//# ColumnsIndex.cc: Index to a table //# Copyright (C) 1998,1999,2000 //# Associated Universities, Inc. Washington DC, USA. //# //# This library is free software; you can redistribute it and/or modify it //# under the terms of the GNU Library General Public License as published by //# the Free Software Foundation; either version 2 of the License, or (at your //# option) any later version. //# //# This library is distributed in the hope that it will be useful, but WITHOUT //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public //# License for more details. //# //# You should have received a copy of the GNU Library General Public License //# along with this library; if not, write to the Free Software Foundation, //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA. //# //# Correspondence concerning AIPS++ should be addressed as follows: //# Internet email: aips2-request@nrao.edu. //# Postal address: AIPS++ Project Office //# National Radio Astronomy Observatory //# 520 Edgemont Road //# Charlottesville, VA 22903-2475 USA //# //# $Id$ //# Includes #include #include #include #include #include #include #include #include #include #include #include namespace casacore { //# NAMESPACE CASACORE - BEGIN ColumnsIndex::ColumnsIndex (const Table& table, const String& columnName, Compare* compareFunction, Bool noSort) : itsLowerKeyPtr (0), itsUpperKeyPtr (0) { Vector columnNames(1); columnNames(0) = columnName; create (table, columnNames, compareFunction, noSort); } ColumnsIndex::ColumnsIndex (const Table& table, const Vector& columnNames, Compare* compareFunction, Bool noSort) { create (table, columnNames, compareFunction, noSort); } ColumnsIndex::ColumnsIndex (const ColumnsIndex& that) : itsLowerKeyPtr (0), itsUpperKeyPtr (0) { copy (that); } ColumnsIndex::~ColumnsIndex() { deleteObjects(); } ColumnsIndex& ColumnsIndex::operator= (const ColumnsIndex& that) { copy (that); return *this; } void ColumnsIndex::copy (const ColumnsIndex& that) { if (this != &that) { deleteObjects(); itsTable = that.itsTable; itsNrrow = itsTable.nrow(); itsNoSort = that.itsNoSort; itsCompare = that.itsCompare; makeObjects (that.itsLowerKeyPtr->description()); } } Vector ColumnsIndex::columnNames() const { const RecordDesc& desc = itsLowerKeyPtr->description(); const uInt nrfield = desc.nfields(); Vector names(nrfield); for (uInt i=0; i*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpUChar: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpShort: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpInt: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpUInt: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpInt64: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpFloat: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpDouble: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpComplex: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpDComplex: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; case TpString: delete (RecordFieldPtr*)(itsLowerFields[i]); delete (RecordFieldPtr*)(itsUpperFields[i]); delete (Vector*)(itsDataVectors[i]); break; default: throw (TableError ("ColumnsIndex: unknown data type")); } itsLowerFields[i] = 0; itsUpperFields[i] = 0; itsDataVectors[i] = 0; itsData[i] = 0; } delete itsLowerKeyPtr; delete itsUpperKeyPtr; itsLowerKeyPtr = 0; itsUpperKeyPtr = 0; } void ColumnsIndex::addColumnToDesc (RecordDesc& description, const TableColumn& column) { const ColumnDesc& columnDesc = column.columnDesc(); DataType dataType = columnDesc.dataType(); if (! columnDesc.isScalar()) { throw (TableError ("ColumnsIndex: column " + columnDesc.name() + " should be a scalar column")); } description.addField (columnDesc.name(), dataType); } void ColumnsIndex::create (const Table& table, const Vector& columnNames, Compare* compareFunction, Bool noSort) { itsTable = table; itsNrrow = itsTable.nrow(); itsCompare = (compareFunction == 0 ? compare : compareFunction); itsNoSort = noSort; // Loop through all column names. // Always add it to the RecordDesc. RecordDesc description; uInt nrfields = columnNames.nelements(); for (uInt i=0; i(0)); itsData.resize (nrfield, False, False); itsData.set (static_cast(0)); itsLowerFields.resize (nrfield, False, False); itsLowerFields.set (static_cast(0)); itsUpperFields.resize (nrfield, False, False); itsUpperFields.set (static_cast(0)); itsColumnChanged.resize (nrfield, False, False); itsColumnChanged.set (True); itsChanged = True; // Create the correct column object for each field. // Also create a RecordFieldPtr object for each Key. // This makes a fast data copy possible. for (uInt i=0; i(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpUChar: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpShort: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpInt: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpUInt: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpInt64: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpFloat: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpDouble: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpComplex: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpDComplex: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } case TpString: { itsLowerFields[i] = new RecordFieldPtr(*itsLowerKeyPtr, i); itsUpperFields[i] = new RecordFieldPtr(*itsUpperKeyPtr, i); itsDataVectors[i] = new Vector; break; } default: throw (TableError ("ColumnsIndex: unknown data type")); } } } void ColumnsIndex::readData() { // Acquire a lock if needed. TableLocker locker(itsTable, FileLocker::Read); rownr_t nrrow = itsTable.nrow(); if (nrrow != itsNrrow) { itsColumnChanged.set (True); itsChanged = True; itsNrrow = nrrow; } if (!itsChanged) { return; } Sort sort; Bool deleteIt; const RecordDesc& desc = itsLowerKeyPtr->description(); uInt nrfield = itsDataTypes.nelements(); for (uInt i=0; i* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpUChar: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpShort: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpInt: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpUInt: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpInt64: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpFloat: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpDouble: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpComplex: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpDComplex: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } case TpString: { Vector* vecptr = (Vector*)itsDataVectors[i]; if (itsColumnChanged[i]) { ScalarColumn(itsTable, name).getColumn (*vecptr, True); } itsData[i] = vecptr->getStorage (deleteIt); sort.sortKey (itsData[i], desc.type(i)); break; } default: throw (TableError ("ColumnsIndex: unknown data type")); } itsColumnChanged[i] = False; } // Sort the data if needed. // Otherwise fill the index vector with 0..n. itsDataIndex.resize (itsNrrow); if (!itsNoSort) { sort.sort (itsDataIndex, itsNrrow); } else { indgen (itsDataIndex); } // Determine all unique keys (itsUniqueIndex will contain the index of // each first unique entry in itsDataIndex). sort.unique (itsUniqueIndex, itsDataIndex); itsDataInx = itsDataIndex.getStorage (deleteIt); itsUniqueInx = itsUniqueIndex.getStorage (deleteIt); itsChanged = False; } rownr_t ColumnsIndex::bsearch (Bool& found, const Block& fieldPtrs) const { found = False; Int64 lower = 0; Int64 upper = itsUniqueIndex.nelements(); upper--; Int64 middle = 0; while (lower <= upper) { middle = (upper + lower) / 2; Int cmp = itsCompare (fieldPtrs, itsData, itsDataTypes, itsDataInx[itsUniqueInx[middle]]); if (cmp < 0) { upper = middle - 1; // go to left } else if (cmp > 0) { middle++; lower = middle; // go to right } else { found = True; break; } } return middle; } Int ColumnsIndex::compare (const Block& fieldPtrs, const Block& dataPtrs, const Block& dataTypes, rownr_t index) { uInt nfield = fieldPtrs.nelements(); for (uInt i=0; i*)(fieldPtrs[i])); const Bool right = ((const Bool*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpUChar: { const uChar left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const uChar right = ((const uChar*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpShort: { const Short left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const Short right = ((const Short*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpInt: { const Int left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const Int right = ((const Int*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpUInt: { const uInt left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const uInt right = ((const uInt*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpInt64: { const Int64 left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const Int64 right = ((const Int64*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpFloat: { const Float left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const Float right = ((const Float*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpDouble: { const Double left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const Double right = ((const Double*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpComplex: { const Complex& left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const Complex& right = ((const Complex*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpDComplex: { const DComplex& left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const DComplex& right = ((const DComplex*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } case TpString: { const String& left = *(*(RecordFieldPtr*)(fieldPtrs[i])); const String& right = ((const String*)(dataPtrs[i]))[index]; if (left < right) { return -1; } else if (left > right) { return 1; } break; } default: throw (TableError ("ColumnsIndex: unknown data type")); } } return 0; } rownr_t ColumnsIndex::getRowNumber (Bool& found, const Record& key) { copyKey (itsLowerFields, key); return getRowNumber (found); } rownr_t ColumnsIndex::getRowNumber (Bool& found) { if (!isUnique()) { throw (TableError ("ColumnsIndex::getRowNumber only possible " "when the index keys are unique")); } // Read the data (if needed). readData(); rownr_t inx = bsearch (found, itsLowerFields); if (found) { inx = itsDataInx[inx]; } return inx; } RowNumbers ColumnsIndex::getRowNumbers (const Record& key) { copyKey (itsLowerFields, key); return getRowNumbers(); } RowNumbers ColumnsIndex::getRowNumbers() { // Read the data (if needed). readData(); Bool found; rownr_t inx = bsearch (found, itsLowerFields); RowNumbers rows; if (found) { fillRowNumbers (rows, inx, inx+1); } return rows; } RowNumbers ColumnsIndex::getRowNumbers (const Record& lowerKey, const Record& upperKey, Bool lowerInclusive, Bool upperInclusive) { copyKey (itsLowerFields, lowerKey); copyKey (itsUpperFields, upperKey); return getRowNumbers (lowerInclusive, upperInclusive); } RowNumbers ColumnsIndex::getRowNumbers (Bool lowerInclusive, Bool upperInclusive) { // Read the data (if needed). readData(); Bool found; // Try to find the lower key. If not found, bsearch is giving the // index of the next higher key. // So increment the start index if found and is not to be included. rownr_t start = bsearch (found, itsLowerFields); if (found && !lowerInclusive) { start++; } // Try to find the upper key. // Increment the end index such that it is not inclusive // (thus increment if the found end index is to be included). rownr_t end = bsearch (found, itsUpperFields); if (found && upperInclusive) { end++; } RowNumbers rows; if (start < end) { fillRowNumbers (rows, start, end); } return rows; } void ColumnsIndex::fillRowNumbers (Vector& rows, rownr_t start, rownr_t end) const { start = itsUniqueInx[start]; if (end < itsUniqueIndex.nelements()) { end = itsUniqueInx[end]; } else { end = itsDataIndex.nelements(); } rownr_t nr = end-start; rows.resize (nr); Bool deleteIt; rownr_t* rowStorage = rows.getStorage (deleteIt); objcopy (rowStorage, itsDataInx+start, nr); rows.putStorage (rowStorage, deleteIt); } void ColumnsIndex::setChanged() { itsColumnChanged.set (True); itsChanged = True; } void ColumnsIndex::setChanged (const String& columnName) { const RecordDesc& desc = itsLowerKeyPtr->description(); uInt nrfield = itsColumnChanged.nelements(); for (uInt i=0; i*)(fieldPtr), key); break; case TpUChar: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpShort: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpInt: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpUInt: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpInt64: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpFloat: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpDouble: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpComplex: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpDComplex: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; case TpString: copyKeyField(*(RecordFieldPtr*)(fieldPtr), key); break; default: throw (TableError ("ColumnsIndex: unknown data type")); } } void ColumnsIndex::copyKey (Block fields, const Record& key) { for (uInt i=0; i