00001
00002
00003
00004 #include <algorithm>
00005 #include <map>
00006 #include <vector>
00007
00008 #include "DatabaseInterface/DbiCache.h"
00009 #include "DatabaseInterface/DbiBinaryFile.h"
00010 #include "DatabaseInterface/DbiDBProxy.h"
00011 #include "DatabaseInterface/DbiResultAgg.h"
00012 #include "DatabaseInterface/DbiResultNonAgg.h"
00013 #include "DatabaseInterface/DbiResultKey.h"
00014 #include "DatabaseInterface/DbiResultSet.h"
00015 #include "DatabaseInterface/DbiTableRow.h"
00016 #include "DatabaseInterface/DbiTimerManager.h"
00017 #include "DatabaseInterface/DbiValidityRecBuilder.h"
00018 #include "MessageService/MsgService.h"
00019 #include "Validity/VldRange.h"
00020 #include "Util/UtilString.h"
00021
00022 ClassImp(DbiResultAgg)
00023
00024 typedef vector<const DbiResult*>::const_iterator ConstResultItr_t;
00025
00026
00027
00028
00029
00030 CVSID("$Id: DbiResultAgg.cxx,v 1.29 2008/05/12 09:59:11 nwest Exp $");
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 DbiResultAgg::DbiResultAgg(const string& tableName,
00041 const DbiTableRow* tableRow,
00042 DbiCache* cache,
00043 const DbiValidityRecBuilder* vrecBuilder,
00044 const DbiDBProxy* proxy,
00045 const string& sqlQualifiers) :
00046 DbiResult(0,0,sqlQualifiers),
00047 fSize(0)
00048 {
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078 LEA_CTOR
00079
00080 typedef map<UInt_t,UInt_t> seqToRow_t;
00081
00082 MSG("Dbi", Msg::kVerbose) << "Creating DbiResultAgg" << endl;
00083 SetTableName(tableName);
00084 if ( ! tableRow || ! cache || ! vrecBuilder || ! proxy ) return;
00085
00086
00087
00088
00089
00090 string::size_type loc = sqlQualifiers.find(';');
00091 string::size_type loc2 = sqlQualifiers.find(';',loc+1);
00092 string sqlData = string(sqlQualifiers,loc+1,loc2-loc-1);
00093 string fillOpts = string(sqlQualifiers,loc2+1);
00094
00095
00096
00097
00098 vector<UInt_t> reqSeqNos;
00099 seqToRow_t seqToRow;
00100
00101
00102 UInt_t dbNo = 0;
00103 Int_t maxRowNo = vrecBuilder->GetNumValidityRec() - 1;
00104
00105
00106
00107 for ( Int_t rowNo = 1; rowNo <= maxRowNo; ++rowNo ) {
00108 const DbiValidityRec& vrecRow = vrecBuilder->GetValidityRec(rowNo);
00109
00110
00111 const DbiResult* res = cache->Search(vrecRow,sqlQualifiers);
00112 MSG("Dbi", Msg::kVerbose) << "Checking validity rec " << rowNo
00113 << " " << vrecRow
00114 << "SQL qual: " << sqlQualifiers
00115 << " cache search: " << (void*) res << endl;
00116 if ( res ) {
00117 fResults.push_back(res);
00118 res->Connect();
00119 fSize += res->GetNumRows();
00120 }
00121
00122
00123
00124 else if ( vrecRow.IsGap() ) {
00125 DbiResult* newRes = new DbiResultNonAgg(0, tableRow, &vrecRow);
00126 cache->Adopt(newRes,false);
00127 fResults.push_back(newRes);
00128 newRes->Connect();
00129 }
00130
00131
00132 else {
00133 UInt_t seqNo = vrecRow.GetSeqNo();
00134 reqSeqNos.push_back(seqNo);
00135 seqToRow[seqNo] = rowNo;
00136 fResults.push_back(0);
00137
00138
00139 dbNo = vrecRow.GetDbNo();
00140 }
00141 }
00142
00143
00144
00145
00146 if ( reqSeqNos.size() ) {
00147
00148
00149 sort(reqSeqNos.begin(),reqSeqNos.end());
00150 DbiResultSet* rs = proxy->QuerySeqNos(reqSeqNos,dbNo,sqlData,fillOpts);
00151
00152 this->SetResultsFromDb();
00153 DbiTimerManager::gTimerManager.StartSubWatch(1);
00154 while ( ! rs->IsExhausted() ) {
00155 Int_t seqNo;
00156 *rs >> seqNo;
00157 rs->DecrementCurCol();
00158 Int_t rowNo = -2;
00159 if ( seqToRow.find(seqNo) == seqToRow.end() ) {
00160 MSG("Dbi", Msg::kError)
00161 << "Unexpected SeqNo: " << seqNo << endl;
00162 }
00163 else {
00164 rowNo = seqToRow[seqNo];
00165 MSG("Dbi", Msg::kVerbose)
00166 << "Procesing SeqNo: " << seqNo
00167 << " for row " << rowNo << endl;
00168 }
00169
00170 const DbiValidityRec& vrecRow = vrecBuilder->GetValidityRec(rowNo);
00171 DbiResultNonAgg* newRes = new DbiResultNonAgg(rs,tableRow,&vrecRow);
00172
00173 if ( this->IsExtendedContext() ) newRes->SetCanReuse(false);
00174 if ( rowNo == -2 ) {
00175 delete newRes;
00176 }
00177 else {
00178 MSG("Dbi", Msg::kVerbose)
00179 << "SeqNo: " << seqNo
00180 << " produced " << newRes->GetNumRows() << " rows" << endl;
00181
00182
00183 cache->Adopt(newRes,false);
00184 fResults[rowNo-1] = newRes;
00185 newRes->Connect();
00186 fSize += newRes->GetNumRows();
00187 }
00188 }
00189
00190
00191 delete rs;
00192 }
00193
00194
00195
00196
00197
00198 fRowKeys.reserve(fSize);
00199
00200 DbiValidityRec vRec = vrecBuilder->GetValidityRec(1);
00201 for ( Int_t rowNo = 1; rowNo <= maxRowNo; ++rowNo ) {
00202
00203 const DbiValidityRec& vrecRow = vrecBuilder->GetValidityRec(rowNo);
00204 VldRange r = vrecRow.GetVldRange();
00205 vRec.AndTimeWindow(r.GetTimeStart(),r.GetTimeEnd());
00206
00207 const DbiResult* res = fResults[rowNo-1];
00208 if ( res ) {
00209 UInt_t numEnt = res->GetNumRows();
00210 for (UInt_t entNo = 0; entNo < numEnt; ++entNo )
00211 fRowKeys.push_back(res->GetTableRow(entNo));
00212 }
00213 }
00214
00215
00216
00217 this->BuildLookUpTable();
00218
00219
00220 vRec.SetAggregateNo(-1);
00221 SetValidityRec(vRec);
00222
00223 MSG("Dbi",Msg::kDebug)
00224 << "Aggregate contains " << fSize << " entries. vRec:-" << endl
00225 << vRec << endl;
00226
00227 MSG("Dbi",Msg::kSynopsis) << "Created aggregated result set no. of rows: "
00228 << this->GetNumRows() << endl;
00229
00230 }
00231
00232
00233
00234 DbiResultAgg::~DbiResultAgg() {
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257 LEA_DTOR
00258
00259 MSG("Dbi", Msg::kVerbose) << "Destroying DbiResultAgg." << endl;
00260
00261 for ( ConstResultItr_t itr = fResults.begin();
00262 itr != fResults.end();
00263 ++itr) if ( *itr ) (*itr)->Disconnect();
00264
00265 }
00266
00267
00268 DbiResultKey* DbiResultAgg::CreateKey() const {
00269
00270
00271
00272
00273
00274 DbiResultKey* key = 0;
00275 for ( ConstResultItr_t itr = fResults.begin();
00276 itr != fResults.end();
00277 ++itr ) {
00278 const DbiResult* result = *itr;
00279 if ( result ) {
00280
00281 if ( ! key ) key = result->CreateKey();
00282
00283 else {
00284 const DbiValidityRec& vrec = result->GetValidityRec();
00285 key->AddVRecKey(vrec.GetSeqNo(),vrec.GetCreationDate());
00286 }
00287 }
00288 }
00289
00290
00291 if ( ! key ) key = new DbiResultKey();
00292
00293 return key;
00294
00295 }
00296
00297
00298 const DbiTableRow* DbiResultAgg::GetTableRow(UInt_t row) const {
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315 return ( row >= fRowKeys.size() ) ? 0 : fRowKeys[row];
00316
00317 }
00318
00319
00320
00321 const DbiValidityRec& DbiResultAgg::GetValidityRec(
00322 const DbiTableRow* row) const {
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340 if ( ! row ) return this->GetValidityRecGlobal();
00341 DbiResult* owner = row->GetOwner();
00342 return owner ? owner->GetValidityRecGlobal() : this->GetValidityRecGlobal();
00343
00344 }
00345
00346
00347 Bool_t DbiResultAgg::Satisfies(const string& sqlQualifiers) {
00348
00349
00350
00351
00352
00353 MSG("Dbi",Msg::kDebug)
00354 << "Trying to satisfy: SQL: " << sqlQualifiers
00355 << "\n with CanReuse: " << this->CanReuse()
00356 << " sqlQualifiers: " << this->GetSqlQualifiers()
00357 << endl;
00358 return this->CanReuse()
00359 && this->GetSqlQualifiers() == sqlQualifiers;
00360 }
00361
00362
00363
00364 void DbiResultAgg::Streamer(DbiBinaryFile& bf) {
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374 vector<const DbiResult*>::const_iterator itr = fResults.begin();
00375 vector<const DbiResult*>::const_iterator end = fResults.end();
00376
00377 UInt_t numNonAgg = 0;
00378 for (; itr != end; ++itr) {
00379 const DbiResultNonAgg* rna = dynamic_cast<const DbiResultNonAgg*>(*itr);
00380 if ( rna && ! rna->GetValidityRecGlobal().IsGap() ) ++numNonAgg;
00381 }
00382 bf << numNonAgg;
00383
00384
00385 for (itr = fResults.begin(); itr != end; ++itr) {
00386 const DbiResultNonAgg* rna = dynamic_cast<const DbiResultNonAgg*>(*itr);
00387 if ( rna && ! rna->GetValidityRecGlobal().IsGap() ) bf << *rna;
00388 }
00389 }
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421