PerInputStreamManager Class Reference

#include <PerInputStreamManager.h>

Inheritance diagram for PerInputStreamManager:
PerStreamManager

List of all members.

Public Member Functions

 PerInputStreamManager ()
virtual ~PerInputStreamManager ()
VldContext GetCurrentVld () const
VldContext GetLastEntryVld () const
PerInputStream * GetOpenedStream (std::string streamname) const
UInt_t GetMaxSyncDelay () const
bool IsBegin () const
bool IsEnd () const
bool IsFileEnd () const
bool IsValidSelectionString () const
std::ostream & Print (std::ostream &s, const char *option="") const
void CloseFile (string streamName="*")
void CloseStream (string streamName="*")
Int_t Get (MomNavigator *mom)
Int_t Next (MomNavigator *mom=0, UInt_t advanceby=1)
PerInputStream * OpenStream (std::string streamName, std::string treeName)
Int_t Previous (MomNavigator *mom=0, UInt_t rewindby=1)
VldContext RecordsAt (MomNavigator *mom, const VldContext &vld)
bool SetFile (string streamName, string fullFilePathName, Per::EAccessMode accessmode)
void SetFileEnd (bool isFileEnd=true)
void SetMaxSyncDelay (UInt_t maxSyncDelay)
void SetSelection (std::string streamName="*", std::string selection="", bool isRequired=false)
void SetSequenceMode (std::string streamName="*", Per::ESequenceMode seqMode=Per::kKey)
void SetPerOwnedDisabled (std::string streamName="*", bool perOwnedDisabled=true)
void SetWindow (std::string streamName="*", double lower=0, double upper=0)
void SetUpdateMode (bool updatemode)
void SetMaxFileRepeat (std::string streamName, int numRepeat=0)
void SetMeanMom (std::string streamName="*", double mm=0.)
void SetPushRandom (std::string streamName="*", bool tf=true)
void SetRandomSeed (int rSeed=0)
int AddFile (std::string fullfilepathname, int at=-1, std::string streamname="*")
int GoToFile (int n, std::string streamname="*")
int GoToFile (std::string fullfilepathname, std::string streamname="*")
int NextFile (int n=1, std::string streamname="*")
int PrevFile (int n=1, std::string streamname="*")
int RemoveFile (std::string fullfilepathname="*", std::string streamname="*")
int IsBeginOfFiles (std::string streamname="*") const
int IsEndOfFiles (std::string streamname="*") const
std::ostream & ListFile (std::ostream &os, std::string streamname="*") const

Private Member Functions

int AdvanceLowerBoundTags (const VldContext &vld)
int AdvanceWindowTags (const VldContext &vld)
int AdvanceSequentialTags ()
int AdvanceRecordTags ()
int LoadRecord (MomNavigator *mom)
int LoadRecordWithTag (const PerInputStream &instream, const PerRecordTags &tag, MomNavigator *mom)
void RemoveAllFragments (MomNavigator *mom, std::string streamName)
void RemoveFragmentsNotInWindow (MomNavigator *mom, std::string streamName, const VldContext &loVld, const VldContext &hiVld)
int RewindRecordTags ()
VldContext GetCurrentKeyVld (bool isMin=true) const
void UpdateTreeFormula ()
void SetCurrentVld (const VldContext &vld)
bool IsSelectedSet ()

Private Attributes

VldContext fCurrentVld
std::string fGlobalSelection
TTreeFormula * fTTreeFormula
std::map< std::string, std::string > fFormulaMap
bool fUpdateMode
UInt_t fMaxSyncDelay
bool fIsNewCurrentVldToSelect
bool fIsSelectedSet
std::map< std::string, VldContext > fLastServed
TRandom3 * fRanGen
int fRandomSeed
 OWNED HERE BUT USED BY PERINPUTSTREAMs.
bool fRandomOverride

Detailed Description

Definition at line 27 of file PerInputStreamManager.h.


Constructor & Destructor Documentation

PerInputStreamManager::PerInputStreamManager (  ) 

Definition at line 1213 of file PerInputStreamManager.cxx.

References fRandomSeed, fRanGen, Msg::kDebug, and MSG.

01213                                              : fCurrentVld(Per::GetVldEnd()),
01214       fGlobalSelection(""),fTTreeFormula(0),fFormulaMap(),
01215       fUpdateMode(false),fMaxSyncDelay(15), fIsNewCurrentVldToSelect(true), 
01216       fIsSelectedSet(false), fRandomSeed(0), fRandomOverride(false) {
01217   //
01218   //  Purpose:  Default constructor; current vld starts at Per::GetVldEnd().
01219   //
01220   //  Arguments: none.
01221   //
01222   //  Return:  n/a.
01223   //
01224   //  Contact:   S. Kasahara
01225   //
01226   fRanGen = new TRandom3(0);  // owned by this manager; released in the destructor
01227   MSG("Per",Msg::kDebug)
01228     << "PerInputStreamManager ctor : Random Seed set to "
01229     << fRandomSeed << endl;
01230 }

PerInputStreamManager::~PerInputStreamManager (  )  [virtual]

Definition at line 1234 of file PerInputStreamManager.cxx.

References fFormulaMap, fRanGen, and fTTreeFormula.

01234                                               {
01235   //
01236   //  Purpose:  Destructor.  Releases the tree formula and random generator.
01237   //
01238   //  Arguments: none.
01239   //
01240   //  Return:  n/a.
01241   //
01242   //  Contact:   S. Kasahara
01243   //
01244 
01245  if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;  // zeroing is a separate statement: runs unconditionally
01246  fFormulaMap.clear();
01247 
01248  if ( fRanGen ) delete fRanGen; fRanGen = 0;  // same pattern: delete if set, then always zero
01249 
01250 }


Member Function Documentation

int PerInputStreamManager::AddFile ( std::string  fullfilepathname,
int  at = -1,
std::string  streamname = "*" 
)

Definition at line 45 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, PerStreamManager::GetNumStreamOpen(), GetOpenedStream(), Per::GetVldBegin(), Msg::kWarning, MSG, and SetCurrentVld().

Referenced by IoInputStreamItr::AddFile(), PerValidate::StreamFileAdd(), and PerValidate::StreamMgrParallelFileSeq().

00046                                                          {
00047   //
00048   //  Purpose:  Insert a file into the file list of the specified stream(s).
00049   //
00050   //  Arguments: fullfilepathname = full path name of the file to add
00051   //             at = position in file list (-1 => end of list)
00052   //             streamname (if "*" (default), apply to all streams)
00053   //             
00054   //  Return: number of streams on which action is successfully applied.
00055   //
00056   //  Contact:   S. Kasahara
00057 
00058   int oldnstream = this -> GetNumStreamOpen();
00059 
00060   int nstream = 0;
00061   if ( streamname == "*" ) {
00062     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00063       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00064       nstream += instream -> AddFile(fullfilepathname,at); // NOTE(review): cast result is used without a null check
00065     }
00066   }
00067   else {
00068     PerInputStream* instream = this -> GetOpenedStream(streamname);
00069     if ( instream ) nstream += instream -> AddFile(fullfilepathname,at);
00070     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00071                                   << endl;
00072   }
00073   // if no stream was open before and the add succeeded, reset to vld begin; otherwise leave current vld alone
00074   if ( oldnstream == 0 && nstream > 0 ) SetCurrentVld(Per::GetVldBegin());
00075 
00076   return nstream;
00077 
00078 }

int PerInputStreamManager::AdvanceLowerBoundTags ( const VldContext & vld  )  [private]

Definition at line 82 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, PerInputStream::GetSequenceMode(), and Per::kLowerBound.

Referenced by AdvanceRecordTags(), and RewindRecordTags().

00082                                                                          {
00083   //  Purpose:  Advance record tags in Per::kLowerBound streams to
00084   //            be equal to or less than the keyVld given in the argument.
00085   //
00086   //  Argument: keyVld = validity context handed on to each kLowerBound stream.
00087   //
00088   //  Return:  1 if successful, else 0 (=> waiting for new records on streams 
00089   //           when reading in update mode).   
00090   //
00091   //  Contact:   S. Kasahara
00092   //
00093   //  Notes: To return successful when in update mode requires that the
00094   //         method is able to see at least one record set with validity
00095   //         equal to or greater than the specified input validity, or that
00096   //         the method can see that the open file has been closed by
00097   //         the writer.
00098   //
00099 
00100   int nFail = 0;
00101   for ( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end(); ++sitr ){
00102     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00103     if ( instream->GetSequenceMode() != Per::kLowerBound ) continue; // only kLowerBound streams are handled here
00104     if ( !instream -> AdvanceLowerBoundTags(keyVld) ) nFail++;
00105   }
00106 
00107   if ( nFail ) return 0; // a single failing stream fails the whole call
00108   return 1;
00109 
00110 }

int PerInputStreamManager::AdvanceRecordTags (  )  [private]

Definition at line 163 of file PerInputStreamManager.cxx.

References AdvanceLowerBoundTags(), AdvanceWindowTags(), fCurrentVld, PerInputStream::fLastEntryVld, fLastServed, fMaxSyncDelay, PerStreamManager::fStreamMap, fUpdateMode, VldTimeStamp::GetSec(), PerInputStream::GetSequenceMode(), VldContext::GetTimeStamp(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), PerRecordTags::IsBegin(), PerRecordTags::IsEnd(), PerInputStream::IsEndOfFiles(), PerInputStream::IsFileEnd(), Per::kKey, Per::kSequential, Msg::kVerbose, Msg::kWarning, MSG, PerInputStream::NextFile(), PerInputStream::NextTags(), RewindRecordTags(), and SetCurrentVld().

Referenced by Next(), and RecordsAt().

00163                                              {
00164   //  Purpose:  Advance record tags in managed streams to one past
00165   //            fCurrentVld.
00166   //
00167   //  Argument: none.
00168   //
00169   //  Return:  1 if successful, else 0 (=> end of all 
00170   //           streams or waiting for new records on streams when 
00171   //           reading in update mode).
00172   //
00173   //  Contact:   S. Kasahara
00174   //
00175   //  Notes: This method checks each stream's current record set to see if the 
00176   //         set has a vld that matches or is less than fCurrentVld.  If so, 
00177   //         it advances one record set on that stream.  
00178   //         At the end of the method, if advance has been successful,
00179   //         fCurrentVld is set to point to the lowest validity
00180   //         of all the current stream record tags, or Per::GetVldEnd()
00181   //         if at the end of all streams. 
00182 
00183   int nFail = 0;
00184   VldContext newVld = Per::GetVldEnd();
00185   int  nKey = 0;
00186   int  nSeq = 0;
00187   bool allSeqDone = true;
00188 
00189   for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
00190     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00191     if ( instream->GetSequenceMode() != Per::kKey ) continue;
00192     nKey++;
00193     PerRecordTags tags = instream -> GetTags();
00194     VldContext lastvld = fLastServed[sitr->first];
00195     bool isdone = false;
00196     while ( !isdone ) {
00197       isdone = true;
00198       if ( tags.IsBegin() || (tags.GetVldContext() <= fCurrentVld 
00199                           || lastvld == tags.GetVldContext())   
00200                           || (tags.IsEnd() && !instream->IsEndOfFiles()) ) {
00201         tags = instream->NextTags();
00202         if ( tags.IsEnd() ) {
00203           if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00204           else if (instream->NextFile()) isdone = false; // moved to next file: loop again to read its first tags
00205         }
00206       }
00207     }
00208     if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
00209   }
00210   // if there is no kKey stream, look for a kSequential stream
00211   if (!nKey) {
00212     for ( StreamMapItr sitr=fStreamMap.begin();
00213           sitr != fStreamMap.end();++sitr ){
00214       PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00215       if ( instream->GetSequenceMode() != Per::kSequential ) continue;
00216       nSeq++;
00217       PerRecordTags tags = instream -> GetTags();
00218       VldContext lastvld = fLastServed[sitr->first];
00219       bool isdone = false;
00220       while ( !isdone ) {
00221         isdone = true;
00222         if ( tags.IsBegin()                               ||
00223              ( tags.IsEnd() && !instream->IsEndOfFiles() ) ) {
00224           tags = instream->NextTags();
00225           if ( tags.IsEnd() ) {
00226             if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00227             else if (instream->NextFile()) isdone = false;
00228           }
00229         }
00230       }
00231       if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
00232       if ( !( tags.IsEnd() && instream->IsEndOfFiles() ) ) allSeqDone = false;
00233     } // end for loop over the stream map
00234   }
00235 
00236   if ( !nKey && !nSeq) {
00237     // Must be at least one key or sequential stream in list of managed streams
00238     MSG("Per",Msg::kWarning)
00239       << "No stream of sequence mode Per::kKey or Per::kSequential "
00240       << "found.\n Must be at least one key or sequential stream to "
00241       << "facilitate sequencing." 
00242       << std::endl;
00243     return 0;
00244   }
00245 
00246   // Now sequence any Per::kLowerBound or Per::kWindow streams relative
00247   // to newVld (kSequential and kRandom streams advance themselves)
00248   if ( !this -> AdvanceLowerBoundTags(newVld) ) nFail++;
00249   if ( !this -> AdvanceWindowTags(newVld) ) nFail++;
00250 
00251   if ( nFail ) {
00252     // One or more streams failed to advance but file is not closed and 
00253     // update mode has been requested (e.g. client is dispatcher)
00254     // Check to see if we should declare a successful advance anyway because
00255     // one or more streams indicate that the condition of fMaxSyncDelay has
00256     // been met.
00257     MSG("Per",Msg::kVerbose) << nFail << " streams failed to advance\n" 
00258                              << " checking time to tree end against maxsyncdelay " 
00259                              << fMaxSyncDelay << "." << endl;
00260     
00261     Int_t maxTimeToTreeEnd = fMaxSyncDelay - 1;
00262     for( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end();++sitr ){
00263       PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00264       if ( instream -> IsOpen() ) {
00265         Int_t timeToTreeEnd = 0;
00266         if ( instream -> fLastEntryVld.IsValid() ) {
00267           timeToTreeEnd = (Int_t)(instream->fLastEntryVld.GetTimeStamp().GetSec())
00268             - (Int_t)(newVld.GetTimeStamp().GetSec());
00269         }
00270         MSG("Per",Msg::kVerbose) << sitr->first << " time to end " 
00271                                  << timeToTreeEnd << "(sec)" << endl;
00272         maxTimeToTreeEnd = TMath::Max(maxTimeToTreeEnd,timeToTreeEnd);
00273         if ( timeToTreeEnd < (Int_t)fMaxSyncDelay ) instream -> UpdateTree();
00274       }
00275     }
00276     if ( maxTimeToTreeEnd < (Int_t)fMaxSyncDelay ) {
00277       if ( fCurrentVld != newVld ) {
00278         SetCurrentVld(newVld);
00279         this -> RewindRecordTags(); // to push back to where we were
00280       }
00281       return 0; // failure: no open stream is fMaxSyncDelay or more ahead of newVld
00282     }  
00283   }
00284 
00285   for ( StreamMapItr sitr = fStreamMap.begin();
00286         sitr != fStreamMap.end(); ++sitr)  {
00287     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00288     if ( instream->GetSequenceMode() != Per::kKey       &&
00289          instream->GetSequenceMode() != Per::kSequential ) continue;
00290     PerRecordTags tags = instream -> GetTags();
00291     if ( tags.GetVldContext() == newVld ) {
00292       fLastServed[sitr->first] = newVld; // remember the vld this stream is about to serve
00293     }
00294   }
00295 
00296   if ( (newVld == fCurrentVld && !nSeq ) ) {
00297     return 0; // failure: key streams did not move past fCurrentVld
00298   }
00299 
00300   if ( nSeq && allSeqDone ) {
00301     return 0; // failure: every sequential stream is at end of tags and end of files
00302   }
00303 
00304   SetCurrentVld(newVld); // equals Per::GetVldEnd() when end of all streams reached
00305   return 1; // success
00306 
00307 }

int PerInputStreamManager::AdvanceSequentialTags (  )  [private]

Definition at line 138 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, PerInputStream::GetSequenceMode(), Per::kRandom, and Per::kSequential.

00138                                                  {
00139   //  Purpose: Attempt to advance record tags in Per::kSequential &
00140   //  Per::kRandom streams to make the stream active and at an acceptable
00141   //  record regardless of VldContext.
00142   //
00143   //  Argument: None.
00144   //
00145   //  Return:  1 if successful, else 0 (=> waiting for new records on streams 
00146   //           when reading in update mode).   
00147   //
00148 
00149   int nFail = 0;
00150   for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00151     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00152     if (instream->GetSequenceMode() != Per::kSequential &&
00153         instream->GetSequenceMode() != Per::kRandom      ) continue; // skip every other sequence mode
00154     if ( !instream -> AdvanceTagsList() ) nFail++;
00155   }
00156   
00157   if (nFail) return 0; // a single failing stream fails the whole call
00158   return 1;
00159 }

int PerInputStreamManager::AdvanceWindowTags ( const VldContext & vld  )  [private]

Definition at line 114 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, PerInputStream::GetSequenceMode(), and Per::kWindow.

Referenced by AdvanceRecordTags(), and RewindRecordTags().

00114                                                                      {
00115   //  Purpose: Attempt to advance record tags in Per::kWindow streams
00116   //  such that their VldContext is in the window around the key
00117   //  VldContext.  The window is defined in the individual streams.
00118   //
00119   //  Argument: keyVld = VldContext of the key record.
00120   //
00121   //  Return:  1 if successful, else 0 (=> waiting for new records on streams 
00122   //           when reading in update mode).   
00123   //
00124 
00125   int nFail = 0;
00126   for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00127     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00128     if (instream->GetSequenceMode() != Per::kWindow) continue; // only kWindow streams are handled here
00129     if ( !instream -> AdvanceWindowTags(keyVld) ) nFail++;
00130   }
00131   
00132   if (nFail) return 0; // a single failing stream fails the whole call
00133   return 1;
00134 }

void PerInputStreamManager::CloseFile ( string  streamName = "*"  )  [virtual]

Reimplemented from PerStreamManager.

Definition at line 311 of file PerInputStreamManager.cxx.

References PerStreamManager::GetNumStreamOpen(), Per::GetVldEnd(), and SetCurrentVld().

Referenced by DemoInputModule::EndFile(), DDSChildServer::GoToFile(), DDSChildServer::Next(), PerValidate::StreamMgrFileChangeSeq(), and PerValidate::StreamMgrParallelFileSeq().

00311                                                        {
00312   //  Purpose:  Close current file serving specified stream(s).
00313   //
00314   //  Argument: streamName string  name of stream on which to close file.
00315   //                               if streamName="*" (default), all
00316   //                               streams will have their files closed.
00317   //
00318   //  Return:  none.
00319   //
00320   //  Contact:   S. Kasahara
00321   //
00322 
00323   PerStreamManager::CloseFile(streamName);
00324   // When GetNumStreamOpen() reports none left open, reset fCurrentVld to the end vld
00325   if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd()); 
00326 
00327 }

void PerInputStreamManager::CloseStream ( string  streamName = "*"  )  [virtual]

Reimplemented from PerStreamManager.

Definition at line 331 of file PerInputStreamManager.cxx.

References fLastServed, PerStreamManager::GetNumStreamOpen(), Per::GetVldEnd(), and SetCurrentVld().

Referenced by IoInputStreamItr::DefineStream(), DemoInputModule::EndJob(), DDSChildServer::Shutdown(), IoInputStreamItr::Streams(), DDSChildServer::Subscribe(), DDSChildServer::~DDSChildServer(), and IoInputStreamItr::~IoInputStreamItr().

00331                                                          {
00332   //  Purpose:  Close specified stream(s).
00333   //
00334   //  Argument: streamName string  name of stream to close.
00335   //                               if streamName="*" (default), all
00336   //                               streams will be closed.
00337   //
00338   //  Return:  none.
00339   //
00340   //  Contact:   S. Kasahara
00341   //
00342 
00343   PerStreamManager::CloseStream(streamName);
00344   // When GetNumStreamOpen() reports none left open, reset fCurrentVld to the end vld
00345   if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd()); 
00346   if ( streamName == "*" ) fLastServed.clear(); // drop the last-served vld of every stream
00347   else {
00348     std::map<std::string,VldContext>::iterator itr 
00349                                             = fLastServed.find(streamName);
00350     if ( itr != fLastServed.end() ) fLastServed.erase(itr); // drop only the closed stream's entry
00351   }
00352   
00353 }

Int_t PerInputStreamManager::Get ( MomNavigator * mom  ) 

Definition at line 357 of file PerInputStreamManager.cxx.

References IsSelectedSet(), and LoadRecord().

Referenced by DDSChildServer::Get(), IoInputStreamItr::LoadRecords(), RemoveAllFragments(), and RemoveFragmentsNotInWindow().

00357                                                   {
00358   //  Purpose:  Retrieve current record set from managed record streams
00359   //            with no advancing.  
00360   //
00361   //  Argument: mom        pointer to MomNavigator (null => nothing is loaded)
00362   //
00363   //  Return:  number of records retrieved (0 if mom is null or the set
00364   //           is not selected).
00365   //  Contact:   S. Kasahara
00366   //
00367   //  Notes:  If one record in a record set (sharing common vldcontext) is 
00368   //          rejected due to a selection cut, then entire set is rejected.
00369   //
00370 
00371   if ( !mom ) return 0;
00372 
00373   Int_t nrecord = 0;
00374   if ( IsSelectedSet() ) {
00375     nrecord = this -> LoadRecord(mom);
00376   }
00377 
00378   return nrecord;
00379 
00380 }

VldContext PerInputStreamManager::GetCurrentKeyVld ( bool  isMin = true  )  const [private]

Definition at line 1138 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, PerInputStream::GetTags(), Per::GetVldBegin(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), and PerInputStream::IsOpen().

Referenced by GoToFile(), NextFile(), and PrevFile().

01138                                                                    {
01139   // 
01140   // Purpose:  Determine current minimum or maximum validity of managed 
01141   //           key stream record block tags
01142   //
01143   // Arguments: isMin = true (default) => return minimum validity, else
01144   //            returns max validity
01145   //
01146   // Contact: S. Kasahara
01147   // NOTE(review): the loop scans every open stream, with no check on sequence mode - confirm intent
01148 
01149   VldContext currentVld = Per::GetVldEnd();
01150   if ( !isMin ) currentVld = Per::GetVldBegin(); // searching for a maximum starts from the lowest vld
01151 
01152   for( StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++ ) {
01153     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01154     if ( instream && instream->IsOpen() ) {
01155       VldContext tagVld = instream->GetTags().GetVldContext();
01156       if ( isMin && tagVld < currentVld ) currentVld = tagVld;
01157       else if ( !isMin && tagVld > currentVld ) currentVld = tagVld;
01158     } 
01159   }
01160 
01161   return currentVld;
01162 
01163 }

VldContext PerInputStreamManager::GetCurrentVld (  )  const [inline]

Definition at line 36 of file PerInputStreamManager.h.

References fCurrentVld.

Referenced by DDSChildServer::Next().

00036 { return fCurrentVld; } // accessor: copy of the manager's current validity context

VldContext PerInputStreamManager::GetLastEntryVld (  )  const

Definition at line 384 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, PerInputStream::GetLastEntryVld(), and Per::GetVldBegin().

Referenced by DDSChildServer::Next().

00384                                                         {
00385   //
00386   //  Purpose:  Return the latest last-entry vld over all managed streams
00387   //            (UpdateTree() is called on each stream first).  Useful
00388   //            when reading record sets from an open file, e.g. in the
00389   //            context of dispatcher, to see how close we are to file end.
00390   //
00391 
00392   VldContext lastentryvld = Per::GetVldBegin();
00393   for (StreamMapConstItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); itr++){
00394     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00395     instream -> UpdateTree(); // NOTE(review): cast result is used without a null check
00396     if ( instream -> GetLastEntryVld() > lastentryvld ) 
00397       lastentryvld = instream->GetLastEntryVld();
00398   }
00399 
00400   return lastentryvld;
00401 
00402 }

UInt_t PerInputStreamManager::GetMaxSyncDelay (  )  const [inline]

Definition at line 39 of file PerInputStreamManager.h.

References fMaxSyncDelay.

00039 { return fMaxSyncDelay; } // accessor: value last stored in fMaxSyncDelay

PerInputStream * PerInputStreamManager::GetOpenedStream ( std::string  streamname  )  const [inline, virtual]
int PerInputStreamManager::GoToFile ( std::string  fullfilepathname,
std::string  streamname = "*" 
)

Definition at line 463 of file PerInputStreamManager.cxx.

References fCurrentVld, PerStreamManager::fStreamMap, GetCurrentKeyVld(), GetOpenedStream(), PerInputStream::GetTags(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), GoToFile(), Msg::kWarning, MSG, and SetCurrentVld().

00464                                                           {
00465   //
00466   //  Purpose:  Move to specified file in file list for requested stream(s).
00467   //
00468   //  Arguments: fullfilepathname = file name(if "" move to first file in list)
00469   //             streamname = name of stream to apply movement. if "*"(default)
00470   //                          will apply to all managed streams.
00471   //             
00472   //  Return: number of streams on which action is successfully applied.
00473   //
00474   //  Contact:   S. Kasahara
00475   // 
00476   //  Notes: After moving to file, the current record ptr is positioned at
00477   //         the first record of the file.
00478 
00479   VldContext newVld = Per::GetVldEnd();
00480 
00481   int nstream = 0;
00482   if ( streamname == "*" ) {
00483     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00484       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00485       if ( instream -> GoToFile(fullfilepathname) ) {
00486         nstream++;
00487         instream -> NextTags(); // must advance to first record of file
00488         newVld = instream->GetTags().GetVldContext(); // overwritten per stream: last moved stream wins
00489       } 
00490     }
00491   }
00492   else {
00493     PerInputStream* instream = this -> GetOpenedStream(streamname);
00494     if ( instream ) {
00495       if ( instream -> GoToFile(fullfilepathname) ) {
00496         nstream++;
00497         instream -> NextTags(); // must advance to first record of file
00498         newVld = instream->GetTags().GetVldContext();
00499       }
00500     }
00501     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00502                                   << endl;
00503   }
00504 
00505   if ( nstream ) {
00506     // Setting vld to lower or upper bound is determined by movement direction
00507     if ( newVld > fCurrentVld ) {
00508       VldContext minVld = this -> GetCurrentKeyVld(true);
00509       if ( minVld > fCurrentVld ) SetCurrentVld(minVld); // moved forward: raise current vld to the minimum tag vld
00510     }
00511     else {
00512       VldContext maxVld = this -> GetCurrentKeyVld(false);
00513       if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld); // moved backward: lower current vld to the maximum tag vld
00514     }
00515   }
00516 
00517   return nstream;
00518 
00519 }

int PerInputStreamManager::GoToFile ( int  n,
std::string  streamname = "*" 
)

Definition at line 406 of file PerInputStreamManager.cxx.

References fCurrentVld, PerStreamManager::fStreamMap, GetCurrentKeyVld(), GetOpenedStream(), PerInputStream::GetTags(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), Msg::kWarning, MSG, and SetCurrentVld().

Referenced by IoInputStreamItr::GoToFile(), GoToFile(), PerValidate::StreamFileGoToByIndex(), and PerValidate::StreamFileGoToByName().

00406                                                                {
00407   //
00408   //  Purpose:  Move to nth file in file list for requested stream(s).
00409   //
00410   //  Arguments: n = file number (0 is first)
00411   //             streamname = name of stream to apply movement. if "*"(default)
00412   //                          will apply to all managed streams.
00413   //             
00414   //  Return: number of streams on which action is successfully applied.
00415   //
00416   //  Contact:   S. Kasahara
00417   // 
00418 
00419   VldContext newVld = Per::GetVldEnd();
00420 
00421   int nstream = 0;
00422   if ( streamname == "*" ) {
00423     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00424       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00425       if ( instream -> GoToFile(n) ) {
00426         nstream++;
00427         instream -> NextTags(); // must advance to first record of file
00428         newVld = instream->GetTags().GetVldContext(); // overwritten per stream: last moved stream wins
00429       } 
00430     }
00431   }
00432   else {
00433     PerInputStream* instream = this -> GetOpenedStream(streamname);
00434     if ( instream ) {
00435       if ( instream -> GoToFile(n) ) {
00436         nstream++;
00437         instream -> NextTags(); // must advance to first record of file
00438         newVld = instream->GetTags().GetVldContext();
00439       }
00440     }
00441     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00442                                   << endl;
00443   }
00444 
00445   if ( nstream ) {
00446     // Setting vld to lower or upper bound is determined by movement direction
00447     if ( newVld > fCurrentVld ) {
00448       VldContext minVld = this -> GetCurrentKeyVld(true);
00449       if ( minVld > fCurrentVld ) SetCurrentVld(minVld); // moved forward: raise current vld to the minimum tag vld
00450     }
00451     else {
00452       VldContext maxVld = this -> GetCurrentKeyVld(false);
00453       if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld); // moved backward: lower current vld to the maximum tag vld
00454     }
00455   }
00456 
00457   return nstream;
00458 
00459 }

bool PerInputStreamManager::IsBegin (  )  const [inline]

Definition at line 40 of file PerInputStreamManager.h.

References fCurrentVld.

Referenced by DemoInputModule::IsBegin(), IsSelectedSet(), LoadRecord(), and Previous().

00040 {return Per::IsBegin(fCurrentVld); } // delegates to Per::IsBegin on the current vld

int PerInputStreamManager::IsBeginOfFiles ( std::string  streamname = "*"  )  const

Definition at line 523 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, GetOpenedStream(), Msg::kWarning, and MSG.

Referenced by IoInputStreamItr::PrevFile().

00523                                                                     {
00524   //  Purpose:  Return number of streams that have reached begin of filelist.
00525   //
00526   //  Argument: name of stream to check status. If "*" (default), check all 
00527   //            managed streams
00528   //
00529   //  Contact:   S. Kasahara
00530   //  Notes:  returns 0 (after a warning) when the named stream is not open.
00531 
00532   int nbegin = 0;  
00533 
00534   if ( streamname != "*" ) {
00535     PerInputStream* instream = this -> GetOpenedStream(streamname);
00536     if ( instream ) {
00537       nbegin = instream -> IsBeginOfFiles();
00538     }
00539     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00540                                   << endl;
00541   }
00542   else { 
00543     for ( StreamMapConstItr citr = fStreamMap.begin();
00544                             citr!= fStreamMap.end(); ++citr ) { 
00545       PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00546       nbegin += instream -> IsBeginOfFiles(); // sum of the per-stream results
00547     }
00548   }
00549 
00550   return nbegin;
00551 
00552 }

bool PerInputStreamManager::IsEnd (  )  const [inline]

Definition at line 41 of file PerInputStreamManager.h.

References fCurrentVld.

Referenced by DemoInputModule::IsEnd(), IsSelectedSet(), LoadRecord(), and Next().

00041 { return Per::IsEnd(fCurrentVld); } // delegates to Per::IsEnd on the current vld

int PerInputStreamManager::IsEndOfFiles ( std::string  streamname = "*"  )  const

Definition at line 556 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, GetOpenedStream(), Msg::kWarning, and MSG.

Referenced by IoInputStreamItr::NextFile().

00556                                                                   {
00557   //  Purpose:  Return number of streams that have reached end of filelist.
00558   //
00559   //  Argument: name of stream to check status. If "*" (default), check all 
00560   //            managed streams
00561   //  Return:   named stream: its IsEndOfFiles() value; "*": sum over all managed streams.
00562   //  Contact:   S. Kasahara
00563   //  Notes:    A named stream that is not open logs a warning and returns 0.
00564 
00565  
00566   int nend = 0;  
00567 
00568   if ( streamname != "*" ) {
00569     PerInputStream* instream = this -> GetOpenedStream(streamname);
00570     if ( instream ) {
00571       nend = instream -> IsEndOfFiles();
00572     }
00573     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00574                                   << endl;
00575   }
00576   else { 
00577     for ( StreamMapConstItr citr = fStreamMap.begin();
00578                             citr!= fStreamMap.end(); ++citr ) { 
00579       PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second); // NOTE(review): cast result is dereferenced unchecked - assumes every fStreamMap entry is a PerInputStream
00580       nend += instream -> IsEndOfFiles();
00581     }
00582   }
00583 
00584   return nend;
00585 
00586 }

bool PerInputStreamManager::IsFileEnd (  )  const

Definition at line 590 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, and PerInputStream::IsFileEnd().

Referenced by Next(), and DDSChildServer::Next().

00590                                             {
00591   //  Purpose:  Check if input file has been closed by writer on all
00592   //            managed streams.
00593   //
00594   //  Return:  true if file end detected on all streams.
00595   //
00596   //  Contact:   S. Kasahara
00597   //
00598   //  Notes:  Invokes PerInputStream::IsFileEnd for all streams.
00599   //          This method is useful for a user (e.g. dispatcher)
00600   //          reading open files.  Note that detecting file closure
00601   //          by the writer does not mean that the user has reached
00602   //          the end of the entries in the current record map.
00603   //          Use IsEnd()&&IsFileEnd() to determine if the user has
00604   //          reached the end of the current record map *and* all
00605   //          files have been closed by the writer.
00606   //
00607 
00608   bool isFileEnd = true;  // remains true when no streams are managed
00609 
00610   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
00611     PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second); // NOTE(review): cast result is dereferenced unchecked
00612     if (!instream->IsFileEnd()) isFileEnd = false;
00613   }
00614 
00615   return isFileEnd;
00616 
00617 }

bool PerInputStreamManager::IsSelectedSet (  )  [private]

Definition at line 621 of file PerInputStreamManager.cxx.

References fCurrentVld, fGlobalSelection, fIsNewCurrentVldToSelect, fIsSelectedSet, PerInputStream::fSelection, PerStreamManager::fStreamMap, PerStream::fTObject, PerStream::fTTree, PerInputStream::fTTreeFormula, fTTreeFormula, PerRecordTags::GetIndexHi(), PerRecordTags::GetIndexLo(), PerInputStream::GetTags(), PerRecordTags::GetVldContext(), IsBegin(), IsEnd(), PerInputStream::IsRequired(), Msg::kVerbose, MSG, PerStream::Reset(), and UpdateTreeFormula().

Referenced by Get(), Next(), Previous(), and RecordsAt().

00621                                           {
00622   //  Purpose:  Check to see if record set corresponding to fCurrentVld
00623   //            has passed user specified selection cuts.
00624   //
00625   //  Arguments: none.
00626   //
00627   //  Return:  true if all records passed selection cut.
00628   //           Always false when IsEnd() or IsBegin() holds for the manager.
00629   //  Contact:   S. Kasahara
00630   //
00631   //  Notes: If multiple streams have selection cuts applied, the record set
00632   //         is rejected based on an AND of the different
00633   //         selection cuts, i.e. rejecting one record rejects the entire
00634   //         set.
00635 
00636   if ( !fIsNewCurrentVldToSelect ) return fIsSelectedSet; // cached result is returned unless fIsNewCurrentVldToSelect is set
00637   
00638   fIsNewCurrentVldToSelect = false;
00639 
00640   if ( this -> IsEnd() || this -> IsBegin() ) {
00641     fIsSelectedSet = false;
00642     return fIsSelectedSet;
00643   }
00644   
00645   fIsSelectedSet = true;  
00646 
00647   if ( !fGlobalSelection.empty() ) {
00648     MSG("Per",Msg::kVerbose) << "Testing globally applied selection cut " 
00649                              << fGlobalSelection << endl;
00650     // Selection cut was applied globally
00651     this -> UpdateTreeFormula();
00652     if ( fTTreeFormula ) { // when no formula is available the set is accepted (fIsSelectedSet stays true)
00653       for(StreamMapConstItr itr =fStreamMap.begin();
00654                             itr!=fStreamMap.end();++itr) {
00655         PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00656         const PerRecordTags& tags = instream->GetTags();
00657         Int_t idx = -1; // stays -1 when this stream's tags are not at fCurrentVld
00658         if ( tags.GetVldContext() == fCurrentVld ) idx = tags.GetIndexLo();
00659         TTree* ttree = instream->fTTree;
00660         if ( ttree ) {
00661           ttree -> LoadTree(idx);
00662           if ( !instream->fTObject ) {
00663             instream -> fTBranch -> SetAddress(&(instream->fTObject)); // NOTE(review): fTBranch is not null-checked
00664           }
00665         }
00666       }
00667       Int_t arraySize = fTTreeFormula->GetNdata();
00668       bool ispass = false; // set passes if any formula instance evaluates non-zero
00669       for ( int jent = 0; jent < arraySize; jent++ ) {
00670         if ( fTTreeFormula -> EvalInstance(jent) != 0 ) ispass = true;  
00671       }
00672       if ( !ispass) fIsSelectedSet = false;
00673       for (StreamMapConstItr itr = fStreamMap.begin();
00674                              itr!= fStreamMap.end(); ++itr) {
00675         PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00676         instream -> Reset(true);
00677       }
00678     }
00679   }
00680   else {
00681     for(StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();++itr) {
00682       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00683       const PerRecordTags& tags = instream->GetTags();
00684       TTreeFormula* treeformula = instream->fTTreeFormula;
00685       if ( tags.GetVldContext() == fCurrentVld ) {
00686         if ( treeformula ) {
00687           MSG("Per",Msg::kVerbose) << "Testing locally applied selection cut " 
00688                 << instream->fSelection << " on stream " << itr->first << endl;
00689           TTree* ttree = instream -> fTTree;
00690           if ( ttree ) {
00691             for (Int_t idx=tags.GetIndexLo();idx <= tags.GetIndexHi(); idx++) { // every index in the tag range is tested
00692               ttree -> LoadTree(idx);
00693               if ( !instream->fTObject ) {
00694                 instream -> fTBranch -> SetAddress(&(instream->fTObject)); // NOTE(review): fTBranch is not null-checked
00695               }
00696               Int_t arraySize = treeformula->GetNdata();
00697               bool ispass = false; // an index passes if any formula instance evaluates non-zero
00698               for ( int jent = 0; jent < arraySize; jent++ ) {
00699                 if ( treeformula -> EvalInstance(jent) != 0 ) ispass = true;
00700               }
00701               if ( !ispass) fIsSelectedSet = false; // one failing index rejects the whole set
00702               instream->Reset(true);
00703             }
00704           }
00705         }
00706       }
00707       // Check to see if record set is missing a record from a required stream 
00708       else if ( instream->IsRequired() ) fIsSelectedSet = false;  
00709     }
00710 
00711   }
00712   
00713   if (!fIsSelectedSet) 
00714      MSG("Per",Msg::kVerbose) << "Record set failed selection cut" << endl;
00715   else 
00716      MSG("Per",Msg::kVerbose) << "Record set passed selection cut" << endl;
00717 
00718   return fIsSelectedSet;
00719 
00720 }

bool PerInputStreamManager::IsValidSelectionString (  )  const

Definition at line 724 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap.

Referenced by DDSChildServer::Next().

00724                                                          {
00725   // AND of PerInputStream::IsValidSelectionString() results, i.e.
00726   // if any one stream's application of selection string failed, return
00727   // false
00728 
00729   for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00730     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second); // NOTE(review): cast result is dereferenced unchecked
00731     bool isValid = instream -> IsValidSelectionString();
00732     if ( !isValid ) return false; // stop at the first stream reporting an invalid selection string
00733   }
00734 
00735   return true; // also reached when no streams are managed
00736   
00737 }

std::ostream & PerInputStreamManager::ListFile ( std::ostream &  os,
std::string  streamname = "*" 
) const

Definition at line 741 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, and GetOpenedStream().

Referenced by IoInputStreamItr::ListFile(), PerValidate::StreamFileAdd(), PerValidate::StreamFileGoToByIndex(), PerValidate::StreamFileGoToByName(), PerValidate::StreamFileNext(), PerValidate::StreamFilePrev(), PerValidate::StreamFileRemove(), and PerValidate::StreamMgrFileList().

00742                                                                    {
00743  //
00744  //  Purpose:  Print the contents of the fFileList for requested stream(s).
00745  //
00746  //  Arguments: ostream reference to print on and name of stream(s). If
00747  //             streamname == "*" (default), lists files on all streams.
00748  //  Return:    the ostream reference that was passed in.
00749  //  Contact:   S. Kasahara
00750  // 
00751 
00752   if ( streamname == "*" ) {
00753     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00754       os << "Stream " << itr->first << ":" << endl;
00755       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second); // NOTE(review): cast result is dereferenced unchecked
00756       instream -> ListFile(os);
00757     }
00758   }
00759   else {
00760     PerInputStream* instream = this -> GetOpenedStream(streamname);
00761     if ( instream ) {
00762       os << "Stream " << streamname << ":" << endl;
00763       instream -> ListFile(os);
00764     }
00765     else os << "Stream " << streamname << " not open." << endl; // written to os here, not through MSG
00766   }
00767 
00768   return os;
00769 
00770 }

Int_t PerInputStreamManager::LoadRecord ( MomNavigator *  mom  )  [private]

Definition at line 774 of file PerInputStreamManager.cxx.

References PerInputStream::AdvanceTags(), fCurrentVld, fRandomOverride, fRandomSeed, fRanGen, PerStreamManager::fStreamMap, PerInputStream::fSumTags, PerInputStream::GetMeanMom(), PerInputStream::GetPushRandom(), Per::GetSequenceMode(), Per::GetVldBegin(), GetVldContext(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), IsBegin(), PerRecordTags::IsBegin(), IsEnd(), PerRecordTags::IsEnd(), Per::kKey, Per::kLowerBound, Per::kRandom, Per::kSequential, Msg::kVerbose, Per::kWindow, LoadRecordWithTag(), MSG, RemoveAllFragments(), and RemoveFragmentsNotInWindow().

Referenced by Get(), Next(), Previous(), and RecordsAt().

00774                                                          {
00775   //  Purpose:  Loads records with fCurrentVld in input MomNavigator
00776   //            object.
00777   //
00778   //  Argument: pointer to MomNavigator.  
00779   //
00780   //  Return:  Number of records loaded. 
00781   //           0 if mom is null or if IsBegin() or IsEnd() holds for the manager.
00782   //  Contact:   S. Kasahara
00783   //
00784   //  Notes: LoadRecord does not apply user selection cuts to the record set.
00785   //
00786 
00787   MSG("Per",Msg::kVerbose) << "Load Record Set w/currentVld " << fCurrentVld 
00788                            << endl;
00789   
00790   Int_t nrecord = 0;
00791   Int_t lastNRecord = 0; // NOTE(review): written at the end of each stream iteration but never read in this function
00792   if ( !mom || this -> IsBegin() || this -> IsEnd() ) return nrecord;
00793 
00794   for ( StreamMapItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); ++itr ) {
00795     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second); // NOTE(review): cast result is dereferenced unchecked
00796     std::string streamName = itr->first;
00797 
00798     if ( instream -> GetSequenceMode() == Per::kKey ) {
00799       const PerRecordTags& tags = instream -> GetTags();
00800       if ( tags.GetVldContext() == fCurrentVld ) { // key mode loads only when the stream's tags match fCurrentVld
00801         nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00802       }
00803     }
00804     else if ( instream -> GetSequenceMode() == Per::kLowerBound ) {
00805       const PerRecordTags& tags = instream -> GetTags();
00806       // First clean Mom of previously loaded object from this stream if not this tag
00807       VldContext loVld = tags.GetVldContext();
00808       VldContext hiVld = tags.GetVldContext(); // window collapses to the single vld of the current tag
00809       this -> RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00810       if ( !tags.IsBegin() && !tags.IsEnd() ) {
00811         nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00812       }
00813     }
00814     else if ( instream -> GetSequenceMode() == Per::kWindow ) {
00815       std::vector<PerRecordTags> tagslist = instream -> GetWindowTags();
00816       // First clean Mom of any objects not in range of tags on this list
00817       VldContext loVld = Per::GetVldEnd(); // set in this way all records will be removed
00818       VldContext hiVld = Per::GetVldBegin();
00819       if ( !tagslist.empty() ) {
00820         loVld = (tagslist[0]).GetVldContext();
00821         hiVld = (tagslist[tagslist.size()-1]).GetVldContext();
00822       }
00823       this ->  RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00824       if ( !tagslist.empty() ) {
00825         std::vector<PerRecordTags>::iterator itr;
00826         for ( itr = tagslist.begin(); itr != tagslist.end(); itr++ ) {
00827           const PerRecordTags& tags = *itr;
00828           nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00829         }
00830       }
00831     }
00832     else if ( instream -> GetSequenceMode() == Per::kSequential ||
00833               instream -> GetSequenceMode() == Per::kRandom      ){
00834       // clear Mom of all entries of this sort
00835       this->RemoveAllFragments(mom,streamName);
00836 
00837       double meanMom = instream->GetMeanMom();
00838 
00839       if (!fRandomOverride) {
00840         fRandomSeed = // seed derived from stream count, meanMom and the stream's fSumTags
00841           int(10000*fStreamMap.size()+100*meanMom+(instream->fSumTags)+42);
00842         fRanGen->SetSeed(fRandomSeed);
00843       }
00844 
00845       int    numEvts = // fRanGen->Poisson(meanMom) when GetPushRandom() is set, otherwise ceil(meanMom)
00846         ( instream->GetPushRandom() )?
00847         fRanGen->Poisson(meanMom) : int(ceil(meanMom)) ;
00848 
00849       int iEvt = 0;
00850       while (iEvt < numEvts) {
00851         const PerRecordTags& tags = instream -> GetTags();
00852         int iSuccess = this -> LoadRecordWithTag(*instream,tags,mom);
00853         iEvt    += iSuccess; // counts only records actually loaded by the call above
00854         nrecord += iSuccess;
00855         if ( !instream->AdvanceTags() ) break; // stop once AdvanceTags returns a false value
00856       } // end loop over # events to push
00857     } // end if Per::kSequential || Per::kRandom
00858     lastNRecord = nrecord;
00859   } // end loop over stream map
00860 
00861   return nrecord;
00862 
00863 }

Int_t PerInputStreamManager::LoadRecordWithTag ( const PerInputStream &  instream,
const PerRecordTags tag,
MomNavigator *  mom 
) [private]

Definition at line 943 of file PerInputStreamManager.cxx.

References fCurrentVld, PerStream::GetClassName(), PerRecordTags::GetFileName(), PerRecordTags::GetIndexHi(), PerRecordTags::GetIndexLo(), PerInputStream::GetSequenceMode(), PerStream::GetStreamName(), PerRecordTags::GetTreeName(), Per::kKey, Per::kRandom, Per::kSequential, Msg::kVerbose, Registry::LockValues(), MSG, Registry::Set(), and Registry::UnLockValues().

Referenced by LoadRecord().

00945                                                                   {
00946   //  Purpose:  Loads records from this tag in input MomNavigator
00947   //            object.
00948   //
00949   //  Arguments: instream (stream to read), tags (index range, tree and file names),
00950   //             mom (MomNavigator that adopts each loaded record).
00951   //  Return:  Number of records loaded. 
00952   //           Indices whose fragment is already present in mom are skipped and not counted.
00953   //  Contact:   S. Kasahara
00954   //
00955   //  Notes: LoadRecord does not apply user selection cuts to the record set.
00956   //
00957 
00958   int nrecord = 0;
00959 
00960   // Load the records in the tag set
00961   for (Int_t idx = tags.GetIndexLo(); idx <= tags.GetIndexHi(); idx++) {
00962     std::string streamName = instream.GetStreamName();
00963     TObject* object = mom -> GetFragmentByInputTag(streamName.c_str(),
00964             (tags.GetTreeName()).c_str(),idx,(tags.GetFileName()).c_str());
00965     if ( object ) continue;  // object already in Mom don't bother
00966     object = (const_cast<PerInputStream&>(instream)).GetObject(idx,false); // constness is cast away to call GetObject
00967     if ( object ) {
00968       Registry* temptags = 0; // stays null when object is neither a RecMinos nor a RecRecord
00969       if ( RecMinos* record = dynamic_cast<RecMinos*>(object) ) {
00970         temptags = &(record->GetTempTags());
00971         if ( instream.GetSequenceMode() != Per::kKey        &&
00972              instream.GetSequenceMode() != Per::kSequential &&
00973              instream.GetSequenceMode() != Per::kRandom      )
00974           record -> SetTransient(false);
00975       }
00976       else if ( RecRecord* record = dynamic_cast<RecRecord*>(object) ) {
00977         temptags = &(record->GetTempTags());
00978         if ( instream.GetSequenceMode() != Per::kKey        &&
00979              instream.GetSequenceMode() != Per::kSequential &&
00980              instream.GetSequenceMode() != Per::kRandom      )
00981           record -> SetTransient(false);
00982       }
00983         
00984       if ( temptags ) {
00985         // Stamp record with i/o tags
00986         temptags->UnLockValues();
00987         std::string tagname = instream.GetClassName()+".fTempTags";
00988         temptags->SetName(tagname.c_str());
00989         temptags->Set("stream",streamName.c_str());
00990         temptags->Set("tree",(tags.GetTreeName()).c_str());
00991         temptags->Set("index",idx);
00992         temptags->Set("file",(tags.GetFileName()).c_str());
00993         temptags->LockValues();
00994       }
00995       // And add record to Mom, mom now owns record
00996       if ( nrecord == 0) { // header message is emitted once, before the first record loaded
00997         MSG("Per",Msg::kVerbose) << "..Loading records with Vld" 
00998                                  <<  fCurrentVld << " from stream " 
00999                                  << streamName.c_str() << ":" << endl;
01000       }
01001       MSG("Per",Msg::kVerbose) << "....Record " << nrecord << " from"
01002                                << " stream " << streamName
01003                                << " tree " << tags.GetTreeName()
01004                                << " index " << idx
01005                                << " file "<< tags.GetFileName()
01006                                << endl;
01007       mom -> AdoptFragment(object);
01008         
01009       nrecord++;
01010     } // object retrieved
01011   } // loop over tag tree indices
01012 
01013   return nrecord; 
01014 
01015 }

Int_t PerInputStreamManager::Next ( MomNavigator *  mom = 0,
UInt_t  advanceby = 1 
)

Definition at line 1019 of file PerInputStreamManager.cxx.

References AdvanceRecordTags(), fCurrentVld, fUpdateMode, IsEnd(), IsFileEnd(), IsSelectedSet(), Msg::kVerbose, Msg::kWarning, LoadRecord(), MSG, and RewindRecordTags().

Referenced by IoInputStreamItr::Increment(), DemoInputModule::Next(), DDSChildServer::Next(), PerValidate::StreamMgrFileChangeSeq(), PerValidate::StreamMgrParallelFileSeq(), PerValidate::StreamMgrSelectionSeq(), PerValidate::StreamMgrSkipByThreeSeq(), PerValidate::StreamMgrSkipByTwoSeq(), PerValidate::StreamMgrTagsSeq(), and PerValidate::StreamMgrTagsSeqZigZag().

01019                                                                      {
01020   //  Purpose:  Retrieve next set of records from managed streams of
01021   //            common VldContext which satisfy user's  
01022   //            selection cuts (if specified).  The records are 
01023   //            extracted from the input streams and loaded into the
01024   //            input MomNavigator object.    
01025   //
01026   //  Argument: mom        pointer to MomNavigator. (If set 0 (default), the 
01027   //                       method will advance to the record set of interest
01028   //                       but not load the records.  The user can then
01029   //                       retrieve the record set with the Get method.)
01030   //            advanceby  number of selected record sets to advance by
01031   //                       (default = 1).  Can be used to skip over record
01032   //                       sets.
01033   //
01034   //  Return:  number of selected record sets advanced.
01035   //           0 if already at end, or if an incomplete multi-set advance was rewound in update mode.
01036   //  Contact:   S. Kasahara
01037   //
01038   //  Notes:  Entries are subject to any user-specified selection strings
01039   //          as described under PerInputStream::GetObject and 
01040   //          PerInputStreamManager::IsSelectedSet.
01041   //
01042 
01043   MSG("Per",Msg::kVerbose) << "Next advance by " << advanceby << " sets." 
01044                            << " Start at vld " << fCurrentVld << endl;
01045   
01046   if ( this -> IsEnd() ) return 0;
01047   VldContext saveCurrentVld = fCurrentVld; // remembered so an incomplete advance can be undone below
01048 
01049   // Search for entry satisfying selection cuts
01050   UInt_t nselect = 0;
01051   while ( nselect < advanceby && this -> AdvanceRecordTags() ) {
01052     MSG("Per",Msg::kVerbose) << "Advanced to vld " << fCurrentVld 
01053                              << endl;
01054     if ( IsSelectedSet() ) {
01055       nselect++;
01056       if ( nselect >= advanceby ) { // only the final selected set is loaded
01057         // Load record in Mom
01058         if ( mom ) this -> LoadRecord(mom);
01059       }
01060     }
01061   }
01062 
01063   if ( advanceby > 1 ) {
01064     // current vld is left at last record served if in updatemode
01065     if ( nselect < advanceby && 
01066          fUpdateMode         && 
01067          !IsFileEnd()         ) {
01068       while ( fCurrentVld > saveCurrentVld ) { // NOTE(review): RewindRecordTags' result is ignored; loop ends only once fCurrentVld <= saveCurrentVld
01069         this -> RewindRecordTags();
01070       }
01071       // At this point fCurrentVld should be back to original
01072       if ( saveCurrentVld != fCurrentVld ) {
01073         MSG("Per",Msg::kWarning) 
01074           << "Failure to rewind to original state in Next method." << endl;
01075       }
01076       nselect = 0; 
01077     }
01078   }
01079 
01080   return (Int_t)nselect;
01081 
01082 }

int PerInputStreamManager::NextFile ( int  n = 1,
std::string  streamname = "*" 
)

Definition at line 1086 of file PerInputStreamManager.cxx.

References fCurrentVld, PerStreamManager::fStreamMap, GetCurrentKeyVld(), GetOpenedStream(), Msg::kVerbose, Msg::kWarning, MSG, and SetCurrentVld().

Referenced by IoInputStreamItr::NextFile(), and PerValidate::StreamFileNext().

01086                                                                {
01087   //
01088   //  Purpose:  Move stream(s) forward n files in file list.  If n exceeds 
01089   //            number of files to end of list, ptr is left at eof marker.  
01090   //
01091   //  Arguments: n = number of files to advance (default = 1)
01092   //             streamname = name of stream to apply movement. if "*"(default)
01093   //                          will apply to all managed streams.
01094   //             
01095   //  Return: number of streams on which action is successfully applied.
01096   //          A named stream that is not open logs a warning and contributes 0.
01097   //  Contact:   S. Kasahara
01098   // 
01099   //  Notes: After moving to file, the current record ptr is positioned at
01100   //         the first record of the file.
01101 
01102   MSG("Per",Msg::kVerbose) << "PerInputStreamManager::NextFile("
01103                            << n << "," << streamname << ") called"
01104                            << endl;
01105 
01106   int nstream = 0;
01107   if ( streamname == "*" ) {
01108     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01109       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second); // NOTE(review): cast result is dereferenced unchecked
01110       if ( instream -> NextFile(n) ) {
01111         nstream++;
01112         instream -> NextTags(); // must advance to first record of file
01113       } 
01114     }
01115   }
01116   else {
01117     PerInputStream* instream = this -> GetOpenedStream(streamname);
01118     if ( instream ) { 
01119       if ( instream -> NextFile(n) ) {
01120         nstream++;
01121         instream -> NextTags(); // must advance to first record of file
01122       }
01123     }
01124     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
01125                                   << endl;
01126   }
01127 
01128   // Recalculate fCurrentVld to point to lower bound of new set of tags
01129   VldContext minVld = this -> GetCurrentKeyVld(true);
01130   if ( minVld > fCurrentVld ) SetCurrentVld(minVld); // fCurrentVld is only ever moved forward here
01131 
01132   return nstream;
01133 
01134 }

PerInputStream * PerInputStreamManager::OpenStream ( std::string  streamName,
std::string  treeName 
)

Definition at line 1167 of file PerInputStreamManager.cxx.

References fLastServed, PerStreamManager::fStreamMap, fUpdateMode, Per::GetDefSequenceMode(), Per::GetVldEnd(), Msg::kWarning, MSG, SetSequenceMode(), and SetUpdateMode().

Referenced by DemoInputModule::BeginJob(), IoInputStreamItr::DefineStream(), PerValidate::RunAllTests(), IoInputStreamItr::Streams(), and DDSChildServer::Subscribe().

01168                                                                       {
01169   //
01170   //  Purpose:  Open input stream with name streamName serving tree treeName.
01171   //
01172   //  Arguments: streamName  string   name of stream to be opened. Names of 
01173   //                                  managed streams must be unique.
01174   //             treeName    string   name of tree served by this stream.
01175   //
01176   //  Return:  pointer to PerInputStream. If unable to open stream (because
01177   //           stream with this streamName has already been opened), returns
01178   //           (PerInputStream*)0.
01179   //
01180   //  Contact:   S. Kasahara
01181   //
01182   //  Notes: PerInputStream objects are owned by the PerInputStreamManager 
01183   //         and should only be deleted through the 
01184   //         PerStreamManager::CloseStream method.
01185 
01186   bool openok = false;
01187 
01188   PerInputStream* stream=dynamic_cast<PerInputStream*>(fStreamMap[streamName]); // NOTE(review): operator[] may insert an empty entry when the name is absent - confirm against the map type
01189   if ( !stream ) {
01190     // Stream not found in map, need to create new one
01191     stream = new PerInputStream(treeName);
01192     stream -> SetStreamName(streamName);
01193     stream -> SetUpdateMode(fUpdateMode); // new stream inherits the manager's update mode
01194     Per::ESequenceMode seqMode = Per::GetDefSequenceMode(streamName.c_str());
01195     stream -> SetSequenceMode(seqMode);
01196     fStreamMap[streamName] = stream;
01197     fLastServed[streamName] = Per::GetVldEnd(); // last-served vld for the new stream starts at Per::GetVldEnd()
01198     openok = true;
01199   }
01200   else {
01201     MSG("Per",Msg::kWarning)
01202       <<"Stream manager failed to open\nrequested stream "
01203       << streamName << " because name conflicts with previously opened stream."
01204       << endl;
01205   }
01206 
01207   return (openok) ? stream : (PerInputStream*)0; // the existing stream is deliberately not returned on a name conflict
01208 
01209 }

int PerInputStreamManager::PrevFile ( int  n = 1,
std::string  streamname = "*" 
)

Definition at line 1298 of file PerInputStreamManager.cxx.

References fCurrentVld, PerStreamManager::fStreamMap, GetCurrentKeyVld(), GetOpenedStream(), Msg::kWarning, MSG, and SetCurrentVld().

Referenced by IoInputStreamItr::PrevFile(), RewindRecordTags(), PerValidate::StreamFileNext(), and PerValidate::StreamFilePrev().

01298                                                                {
01299   //
01300   //  Purpose:  Move stream(s) back n files in file list.  If n exceeds 
01301   //            number of files to beginning of list, ptr is left at first
01302   //            file.
01303   //
01304   //  Arguments: n = number of files to rewind (default = 1)
01305   //             streamname = name of stream to apply movement. if "*"(default)
01306   //                          will apply to all managed streams.
01307   //             
01308   //  Return: number of streams on which action is successfully applied.
01309   //          A named stream that is not open logs a warning and contributes 0.
01310   //  Contact:   S. Kasahara
01311   // 
01312   //  Notes: After moving to file, the current record ptr is positioned at
01313   //         the first record of the file.
01314 
01315   int nstream = 0;
01316   if ( streamname == "*" ) {
01317     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01318       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second); // NOTE(review): cast result is dereferenced unchecked
01319       if ( instream -> PrevFile(n) ) {
01320         nstream++;
01321         instream -> NextTags(); // position at first record of file
01322       } 
01323     }
01324   }
01325   else {
01326     PerInputStream* instream = this -> GetOpenedStream(streamname);
01327     if ( instream ) {
01328       if ( instream -> PrevFile(n) ) {
01329         nstream++;
01330         instream -> NextTags(); // position at first record of file
01331       }
01332     }
01333     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
01334                                   << endl;
01335   }
01336 
01337   // Recalculate fCurrentVld to point to upper bound of new set of tags
01338   VldContext maxVld = this -> GetCurrentKeyVld(false);
01339   if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld); // fCurrentVld is only ever moved backward here
01340 
01341   return nstream;
01342 
01343 }

Int_t PerInputStreamManager::Previous ( MomNavigator *  mom = 0,
UInt_t  rewindby = 1 
)

Definition at line 1254 of file PerInputStreamManager.cxx.

References IsBegin(), IsSelectedSet(), LoadRecord(), and RewindRecordTags().

Referenced by IoInputStreamItr::Decrement(), DDSChildServer::Next(), DemoInputModule::Previous(), PerValidate::StreamMgrFileChangeSeq(), PerValidate::StreamMgrParallelFileSeq(), PerValidate::StreamMgrSelectionSeq(), PerValidate::StreamMgrSkipByThreeSeq(), PerValidate::StreamMgrSkipByTwoSeq(), PerValidate::StreamMgrTagsSeq(), and PerValidate::StreamMgrTagsSeqZigZag().

01254                                                                         {
01255   //  Purpose:  Retrieve previous set of records from managed streams of
01256   //            common VldContext which satisfy user's  
01257   //            selection cuts (if specified).  The records are 
01258   //            extracted from the input streams and loaded into the
01259   //            input MomNavigator object.    
01260   //
01261   //  Argument: mom        pointer to MomNavigator (default=0 means 
01262   //                       streams are rewound but no record set is loaded.
01263   //                       The user can then use Get to retrieve the record 
01264   //                       set.)
01265   //            rewindby   number of selected record sets to rewind by
01266   //                       (default = 1).  Can be used to skip over record
01267   //                       sets.
01268   //
01269   //  Return:  number of selected record sets rewound (0 if already at begin).
01270   //
01271   //  Contact:   S. Kasahara
01272   //
01273   //  Notes:  Entries are subject to any user-specified selection strings
01274   //          as described under PerInputStream::GetObject and 
01275   //          PerInputStreamManager::IsSelectedSet.
01276   //
01277 
01278   if ( this -> IsBegin() ) return 0;
01279 
01280   UInt_t nselect = 0;
01281   // Search for entry satisfying selection cuts
01282   while ( nselect < rewindby && this -> RewindRecordTags() ) {
01283     if ( IsSelectedSet() ) {
01284       nselect++;
01285       if ( nselect >= rewindby ) { // only the final selected set is loaded
01286         // Load record in Mom
01287         if ( mom ) this -> LoadRecord(mom);
01288       }
01289     }
01290   }
01291 
01292   return (Int_t)nselect;
01293 
01294 }

std::ostream & PerInputStreamManager::Print ( std::ostream &  s,
const char *  option = "" 
) const

Definition at line 1347 of file PerInputStreamManager.cxx.

References bfld::AsString(), PerStreamManager::fPrintOpt, PerStreamManager::fStreamMap, PerStream::GetEntry(), PerStream::GetFullFilePathName(), PerStream::GetNumEntries(), PerInputStream::GetSequenceMode(), and PerInputStream::IsFileEnd().

Referenced by operator<<(), and DemoInputModule::Print().

01348                                                                      {
01349   //
01350   //  Purpose:  Print status of input stream manager on std::ostream.
01351   //
01352   //  Arguments: os           std::ostream to display on.
01353   //             option       verbosity level ("" (default), or "brief");
01354   //                          the comparison is done on a lower-cased copy.
01355   //  Return:  std::ostream reference (the os that was written to).
01356   //
01357   //  Contact:   S. Kasahara
01358   //
01359   // Brief mode: option is "brief", or option is empty and fPrintOpt is "brief".
01360   TString opt = option;
01361   opt.ToLower();
01362   if ( opt == "brief" || opt.IsNull() && fPrintOpt == "brief" ) { // && binds tighter than ||
01363     Int_t nenabled = 0; // index printed in front of each listed stream
01364     for ( StreamMapConstItr citr  = fStreamMap.begin();
01365                             citr != fStreamMap.end(); ++ citr ) {
01366       PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second); // NOTE(review): cast result is used without a null check
01367       if ( instream -> IsEnabled() && instream -> IsOpen() ) { // only enabled, open streams are listed
01368         os << "    " << nenabled << ")" << citr->first << " serving tree " 
01369            << instream -> GetTreeName()
01370            << " w/" << instream->GetNumEntries() 
01371            << " entries (current entry = " << instream->GetEntry()
01372            << ") using sequence mode " 
01373            << Per::AsString(instream->GetSequenceMode())
01374            << "\n      from the " 
01375            << ((instream->IsFileEnd()) ? "closed file " : "open file ")  
01376            << instream->GetFullFilePathName() << "." << endl;
01377         nenabled++;
01378       }
01379     }
01380   }
01381   else { // every other case: prefix "PerInput" and defer to the base-class printout
01382     os << "PerInput";
01383     PerStreamManager::Print(os);
01384   }
01385   return os;
01386 
01387 }

VldContext PerInputStreamManager::RecordsAt ( MomNavigator mom,
const VldContext vld 
)

Definition at line 1391 of file PerInputStreamManager.cxx.

References AdvanceRecordTags(), fCurrentVld, IsSelectedSet(), LoadRecord(), and RewindRecordTags().

Referenced by IoInputStreamItr::GoTo(), IoInputStreamItr::GoToEOF(), and PerValidate::StreamMgrParallelFileSeq().

01392                                                                    {
01393   //  Purpose:  Move the managed record streams to the record set with the
01394   //            requested VldContext and, if mom is non-null, load that set
01395   //            into mom.  Only record sets passing IsSelectedSet() are
01396   //            accepted.  If no exact match is found, the search stops at
01397   //            the first selected set at or past vld in the direction of
01398   //            travel (later when advancing, earlier when rewinding).
01399   //
01400   //  Argument: mom        pointer to MomNavigator
01401   //                       (if == 0, streams are moved to vld but records are
01402   //                        not loaded)
01403   //            vld        VldContext of requested record set
01404   //
01405   //  Return:  VldContext of new record set (= Per::GetVldEnd() or 
01406   //           Per::GetVldBegin() if end-of-input-streams or 
01407   //           begin-of-input-streams respectively).
01408   //
01409   //  Contact:   S. Kasahara
01410   //
01411 
01412   if ( vld == fCurrentVld && IsSelectedSet() ) { // already positioned on a selected set at vld
01413     if ( mom ) this -> LoadRecord(mom);
01414     return fCurrentVld;
01415   }
01416 
01417   if ( vld < fCurrentVld ) { // target is earlier than the current position: step backwards
01418     while ( RewindRecordTags() ) {
01419       if ( vld >= fCurrentVld && IsSelectedSet() ) {
01420         if ( mom ) this -> LoadRecord(mom);
01421         return fCurrentVld;
01422       }
01423     }
01424   }
01425   else { // vld >= fCurrentVld (including equal but not selected): step forwards
01426     while ( AdvanceRecordTags() ) {
01427       if ( vld <= fCurrentVld && IsSelectedSet() ) {
01428         if ( mom ) this -> LoadRecord(mom);
01429         return fCurrentVld;
01430       }
01431     }
01432   }
01433 
01434   return fCurrentVld; // begin-or-end-of-streams: nothing is loaded into mom on this path
01435 
01436 }

void PerInputStreamManager::RemoveAllFragments ( MomNavigator mom,
std::string  streamName 
) [private]

Definition at line 908 of file PerInputStreamManager.cxx.

References Get(), MomNavigator::GetFragmentArray(), and DataUtil::GetTempTags().

Referenced by LoadRecord().

00909                                                           {
00910   //  Purpose:  Remove from mom, and delete, every fragment whose temp
00911   //            tag "stream" equals streamName.
00912   //  Argument: mom         pointer to MomNavigator (dereferenced unchecked)
00913   //            streamName  name of the stream whose fragments are removed
00914   //  Return:  none
00915   //
00916   //  Contact:   S. Kasahara & K. Arms
00917   //
00918 
00919   TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray()); // const is cast away so the array can be edited
00920   for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) { // walk from the last slot down to 0
00921     TObject* oldobject = fragarray -> At(idx);
00922     Registry* oldiotags = 0;
00923     if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00924       oldiotags = &(oldrecord -> GetTempTags());
00925     }
00926     else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00927       oldiotags = &(oldrecord -> GetTempTags());
00928     } // fragments that are neither RecMinos nor RecRecord are left untouched
00929     if ( oldiotags ) {
00930       const char* oldtagstream = 0;
00931       if ( oldiotags -> Get("stream",oldtagstream)     &&
00932            strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00933           fragarray->RemoveAt(idx);
00934           delete oldobject; oldobject = 0; // the fragment is destroyed here, after it has left the array
00935       } // end if tags are from stream streamName
00936     } // end if tags exist
00937   } // end for loop over mom fragment array
00938   
00939 }

int PerInputStreamManager::RemoveFile ( std::string  fullfilepathname = "*",
std::string  streamname = "*" 
)

Definition at line 1440 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, GetOpenedStream(), Msg::kWarning, and MSG.

Referenced by IoInputStreamItr::RemoveFile(), PerValidate::StreamFileRemove(), and PerValidate::StreamMgrParallelFileSeq().

01441                                                             {
01442   //
01443   //  Purpose:  Remove file from file list of specified stream(s).
01444   //
01445   //  Arguments: fullfilepathname (if "*" (default), remove all files.)
01446   //             streamname (if "*" (default), apply to all streams)
01447   //             
01448   //  Return: number of streams on which action is successfully applied
01449   //          (accumulated from the PerInputStream::RemoveFile return values).
01450   //  Contact:   S. Kasahara
01451   // 
01452 
01453   int nstream = 0;
01454   if ( streamname == "*" ) { // every entry of fStreamMap is visited; no open-stream filter on this path
01455     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01456       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second); // NOTE(review): cast result is used without a null check
01457       nstream += instream -> RemoveFile(fullfilepathname); 
01458     }
01459   }
01460   else { // single named stream: must be found by GetOpenedStream, otherwise only a warning is issued
01461     PerInputStream* instream = this -> GetOpenedStream(streamname);
01462     if ( instream ) nstream += instream -> RemoveFile(fullfilepathname);
01463     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
01464                                   << endl;
01465   }
01466 
01467   return nstream;
01468 
01469 }

void PerInputStreamManager::RemoveFragmentsNotInWindow ( MomNavigator mom,
std::string  streamName,
const VldContext loVld,
const VldContext hiVld 
) [private]

Definition at line 867 of file PerInputStreamManager.cxx.

References Get(), MomNavigator::GetFragmentArray(), DataUtil::GetTempTags(), and GetVldContext().

Referenced by LoadRecord().

00869                                                                   {
00870   //  Purpose:  Remove, and delete, fragments of stream streamName whose
00871   //            VldContext is < loVld || > hiVld.
00872   //  Argument: mom         pointer to MomNavigator (dereferenced unchecked)
00873   //            streamName  only fragments tagged with this stream are examined
00874   //            loVld,hiVld inclusive bounds of the window that is kept
00875   //  Return:  none
00876   //  Contact:   S. Kasahara
00877   //
00878 
00879   TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray()); // const is cast away so the array can be edited
00880   for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) { // walk from the last slot down to 0
00881     TObject* oldobject = fragarray -> At(idx);
00882     Registry* oldiotags = 0;
00883     const VldContext* oldvldc = 0;
00884     if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00885       oldiotags = &(oldrecord -> GetTempTags());
00886       oldvldc = oldrecord -> GetVldContext();
00887     }
00888     else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00889       oldiotags = &(oldrecord -> GetTempTags());
00890       oldvldc = &(oldrecord -> GetHeader().GetVldContext()); // RecRecord keeps its VldContext in the header
00891     } // fragments that are neither RecMinos nor RecRecord are left untouched
00892     if ( oldiotags ) {
00893       const char* oldtagstream = 0;
00894       if ( oldiotags -> Get("stream",oldtagstream) 
00895         && strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00896         if ( oldvldc && (*oldvldc < loVld || *oldvldc > hiVld) ) { // a fragment without a VldContext is kept
00897           fragarray->RemoveAt(idx);
00898           delete oldobject; oldobject = 0; // the fragment is destroyed here, after it has left the array
00899         }
00900       }
00901     }
00902   }
00903   
00904 }

int PerInputStreamManager::RewindRecordTags (  )  [private]

Definition at line 1473 of file PerInputStreamManager.cxx.

References AdvanceLowerBoundTags(), AdvanceWindowTags(), fCurrentVld, fLastServed, PerStreamManager::fStreamMap, PerInputStream::GetSequenceMode(), PerInputStream::GetTags(), Per::GetVldBegin(), PerRecordTags::GetVldContext(), PerRecordTags::IsBegin(), PerInputStream::IsBeginOfFiles(), PerRecordTags::IsEnd(), Per::kKey, Per::kSequential, Msg::kWarning, MSG, PrevFile(), PerInputStream::PrevTags(), and SetCurrentVld().

Referenced by AdvanceRecordTags(), Next(), Previous(), and RecordsAt().

01473                                             {
01474   //  Purpose:  Rewind record tags in managed streams to one previous to
01475   //            fCurrentVld.
01476   //
01477   //  Argument: none.
01478   //
01479   //  Return:  1 if success, else 0 (beginning of all streams reached, or
01480   //           no kKey/kSequential stream is being managed)
01481   //
01482   //  Contact:   S. Kasahara
01483   //
01484   //  Notes: This method checks each stream's current record set to see if the 
01485   //         set has a vld that matches or exceeds fCurrentVld.  If so, it 
01486   //         rewinds one record set on that stream.  
01487   //         At the end of the method, if rewind has been successful,
01488   //         fCurrentVld is set to point to the highest validity
01489   //         of all the current stream record tags, or Per::GetVldBegin()
01490   //         if at the beginning of all streams. 
01491   //         kSequential streams are only consulted when there is no kKey stream.
01492   VldContext newVld = Per::GetVldBegin(); // highest vld found among the streams examined below
01493   int nKey = 0; // number of kKey streams seen
01494   int nSeq = 0; // number of kSequential streams seen (counted only when nKey == 0)
01495   bool allSeqDone = true; // cleared as soon as one kSequential stream is not at begin of all its files
01496 
01497   for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
01498     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second); // NOTE(review): cast result is used without a null check
01499     if ( instream->GetSequenceMode() != Per::kKey ) continue;
01500     nKey++;
01501     PerRecordTags tags = instream->GetTags();
01502     bool isdone = false;
01503     while ( !isdone ) { // loops again only when a step back lands on begin-of-file and PrevFile() succeeds
01504       isdone = true;
01505       if ( tags.IsEnd() || tags.GetVldContext() >= fCurrentVld 
01506                         || (tags.IsBegin() && !instream->IsBeginOfFiles()) ) {
01507         tags = instream->PrevTags();
01508         if ( tags.IsBegin() ) {
01509           if ( instream -> PrevFile() ) isdone = false;
01510         }
01511       }
01512     }
01513     if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01514   }
01515 
01516   // if there is no kKey stream, look for a kSequential stream
01517   if ( !nKey ) {
01518     for ( StreamMapItr sitr=fStreamMap.begin();
01519           sitr != fStreamMap.end();++sitr ){
01520       PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second); // NOTE(review): cast result is used without a null check
01521       if ( instream->GetSequenceMode() != Per::kSequential ) continue;
01522       nSeq++;
01523       PerRecordTags tags = instream->GetTags();
01524       bool isdone = false;
01525       while ( !isdone ) { // same file-boundary loop as above, but without the fCurrentVld comparison
01526         isdone = true;
01527         if ( tags.IsEnd()                                     ||
01528              ( tags.IsBegin() && !instream->IsBeginOfFiles() ) ) {
01529           tags = instream->PrevTags();
01530           if ( tags.IsBegin() ) {
01531             if ( instream -> PrevFile() ) isdone = false;
01532           }
01533         }
01534       }
01535       if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01536       if ( !( tags.IsBegin() && instream->IsBeginOfFiles() ) ) allSeqDone = false;
01537     } // end loop over the stream map
01538   } // end if no kKey stream
01539 
01540   if ( !nKey && !nSeq) {
01541     // Must be at least one key or sequential stream in list of managed streams
01542     MSG("Per",Msg::kWarning)
01543       << "No stream of sequence mode Per::kKey or Per::kSequential "
01544       << "found.\n Must be at least one key or sequential stream to "
01545       << "facilitate sequencing."
01546       << std::endl;
01547     return 0;
01548   }
01549 
01550   if ( fCurrentVld == newVld && !nSeq ) { // key streams only, and the validity did not change
01551     return 0; // failure
01552   }
01553 
01554   if ( nSeq && allSeqDone ) { // every sequential stream is already at the begin of its files
01555     return 0; // failure
01556   }
01557 
01558   for ( StreamMapItr sitr = fStreamMap.begin();
01559         sitr != fStreamMap.end(); ++sitr)  {
01560     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01561     if ( instream->GetSequenceMode() != Per::kKey       &&
01562          instream->GetSequenceMode() != Per::kSequential ) continue;
01563     fLastServed[sitr->first] = Per::GetVldBegin(); // reset the last-served vld of each key/sequential stream
01564   }
01565 
01566   SetCurrentVld(newVld); //equals Per::GetVldBegin() when begin of all strms reached
01567   this -> AdvanceLowerBoundTags(fCurrentVld);
01568   this -> AdvanceWindowTags(fCurrentVld);
01569   // kSequential and kRandom streams advance themselves as appropriate
01570 
01571   return 1;
01572 
01573 }

void PerInputStreamManager::SetCurrentVld ( const VldContext vld  )  [private]

Definition at line 1610 of file PerInputStreamManager.cxx.

References fCurrentVld, and fIsNewCurrentVldToSelect.

Referenced by AddFile(), AdvanceRecordTags(), CloseFile(), CloseStream(), GoToFile(), NextFile(), PrevFile(), RewindRecordTags(), and SetFile().

01610                                                                {
01611   // Set fCurrentVld to vld.  When the value actually changes, raise
01612   // fIsNewCurrentVldToSelect so the selection is re-applied to this validity.
01613   // The flag is only ever set here, never cleared.
01614   if ( vld != fCurrentVld ) fIsNewCurrentVldToSelect = true;
01615   fCurrentVld = vld;
01616 
01617 }

bool PerInputStreamManager::SetFile ( string  streamName,
string  fullFilePathName,
Per::EAccessMode  accessmode 
) [virtual]

Reimplemented from PerStreamManager.

Definition at line 1577 of file PerInputStreamManager.cxx.

References PerStreamManager::GetNumStreamOpen(), Per::GetVldBegin(), and SetCurrentVld().

Referenced by DemoInputModule::BeginFile(), PerValidate::RunAllTests(), PerValidate::StreamMgrFileChangeSeq(), and PerValidate::StreamMgrParallelFileSeq().

01578                                                                {
01579   //  Purpose:  Sets new file for specified stream(s).
01580   //
01581   //  Argument: streamName  string  name of stream to set file.
01582   //                                if streamName="*", all
01583   //                                streams will have their file set.
01584   //            fullFilePathName string  new filename.
01585   //            accessMode          Per::EAccessMode accessMode in which to
01586   //                                open file.
01587   //
01588   //  Return:  bool  the value returned by PerStreamManager::SetFile.
01589   //                 NOTE(review): presumably true if at least one stream's SetFile succeeded - confirm in the base class.
01590   //
01591   //  Contact:   S. Kasahara
01592   //
01593   //  Notes:  Delegates to PerStreamManager::SetFile; the current vld is only
01594   //          reset when no stream was open before the call.
01595 
01596    Int_t oldNumStream = this -> GetNumStreamOpen(); // sampled before the base-class call below
01597    bool openok 
01598    = PerStreamManager::SetFile(streamName,fullFilePathName,accessMode);
01599    // reset if first, otherwise leave it where it is
01600    if ( openok && oldNumStream == 0) {
01601      SetCurrentVld(Per::GetVldBegin());
01602    } 
01603 
01604    return openok;
01605 
01606 }

void PerInputStreamManager::SetFileEnd ( bool  isFileEnd = true  ) 

Definition at line 1621 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, and Munits::second.

Referenced by DDSChildServer::Next().

01621                                                      {
01622   //  Purpose:  Force file end on input file.  Can be used for recovering
01623   //            aborted files.  The flag is forwarded to every stream in
01624   //            fStreamMap.
01625   //  Arguments: isFileEnd  true (default) to force file end
01626   //
01627   //  Return:  none.
01628   //
01629   //  Contact:   S. Kasahara
01630   //
01631 
01632   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01633     PerInputStream* instream = dynamic_cast<PerInputStream*>(citr -> second); // NOTE(review): cast result is used without a null check
01634     instream -> SetFileEnd(isFileEnd);
01635   }
01636 
01637   return;
01638 
01639 }

void PerInputStreamManager::SetMaxFileRepeat ( std::string  streamName,
int  numRepeat = 0 
)

Definition at line 1726 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, GetOpenedStream(), and Munits::second.

Referenced by IoInputStreamItr::SetMaxFileRepeat().

01728 {
01729   // Purpose: Set the maximum number of times to reuse files
01730   //          from this stream before moving to the next file
01731   //          (only used for kSequential & kRandom stream sequence
01732   //          modes)
01733   //
01734   // Argument: stream name of interest ("*" = all streams), max number (int >= 0)
01735   //
01736   // Return: none
01737   //
01738   // Note: Invokes PerInputStream::SetMaxFileRepeat() for each stream
01739   //
01740 
01741   if (numRepeat < 0) numRepeat = 0; // negative requests are clamped to 0
01742 
01743   if (streamName == "*") {
01744     // Set info for all streams
01745     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01746           citr!= fStreamMap.end(); ++citr ) {
01747       ((PerInputStream*)(citr -> second)) -> SetMaxFileRepeat(numRepeat);
01748     } // end loop over streams
01749   }
01750   else {
01751     // Set info for specified streamName
01752     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01753     stream -> SetMaxFileRepeat(numRepeat); // NOTE(review): no null check here, although RemoveFile() tests the GetOpenedStream result
01754   }
01755 }

void PerInputStreamManager::SetMaxSyncDelay ( UInt_t  maxSyncDelay  )  [inline]

Definition at line 57 of file PerInputStreamManager.h.

References fMaxSyncDelay.

Referenced by DDSChildServer::Subscribe().

00057 { fMaxSyncDelay = maxSyncDelay; } // inline setter: stores the value in fMaxSyncDelay, nothing else

void PerInputStreamManager::SetMeanMom ( std::string  streamName = "*",
double  mm = 0. 
)

Definition at line 1759 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, GetOpenedStream(), and Munits::second.

Referenced by IoInputStreamItr::SetMeanMom().

01760 {
01761   // Purpose: Set the mean number of events to push to Mom from this stream
01762   //
01763   // Argument: stream name of interest ("*" = all streams), mean number (double >= 0.)
01764   //
01765   // Return: none
01766   //
01767   // Note: Invokes PerInputStream::SetMeanMom() for each stream
01768   //
01769 
01770   if (mm < 0.) return; // a negative mean is ignored silently; no stream is touched
01771 
01772   if (streamName == "*") {
01773     // Set info for all streams
01774     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01775           citr!= fStreamMap.end(); ++citr ) {
01776       ((PerInputStream*)(citr -> second)) -> SetMeanMom(mm);
01777     } // end loop over streams
01778   }
01779   else {
01780     // Set info for specified streamName
01781     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01782     stream -> SetMeanMom(mm); // NOTE(review): no null check here, although RemoveFile() tests the GetOpenedStream result
01783   }
01784 }

void PerInputStreamManager::SetPerOwnedDisabled ( std::string  streamName = "*",
bool  perOwnedDisabled = true 
)
void PerInputStreamManager::SetPushRandom ( std::string  streamName = "*",
bool  tf = true 
)

Definition at line 1788 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, GetOpenedStream(), and Munits::second.

Referenced by IoInputStreamItr::SetPushRandom().

01789 {
01790   // Purpose: Set whether to push a constant (false) or random (true)
01791   //          number of events to Mom from this stream
01792   //
01793   // Argument: stream name of interest ("*" = all streams), bool
01794   //
01795   // Return: none
01796   //
01797   // Note: Invokes PerInputStream::SetPushRandom() for each stream
01798   //
01799 
01800   if (streamName == "*") {
01801     // Set info for all streams
01802     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01803           citr!= fStreamMap.end(); ++citr ) {
01804       ((PerInputStream*)(citr -> second)) -> SetPushRandom(tf);
01805     } // end loop over streams
01806   }
01807   else {
01808     // Set info for specified streamName
01809     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01810     stream -> SetPushRandom(tf); // NOTE(review): no null check here, although RemoveFile() tests the GetOpenedStream result
01811   }
01812 }

void PerInputStreamManager::SetRandomSeed ( int  rSeed = 0  ) 

Definition at line 1816 of file PerInputStreamManager.cxx.

References fRandomOverride, fRandomSeed, fRanGen, Msg::kInfo, and MSG.

Referenced by IoInputStreamItr::SetRandomSeed().

01817 { 
01818   // User input method. If code modifies the random seed, use fRanGen->SetSeed()
01819   fRanGen->SetSeed(rSeed); // seed the generator held in fRanGen
01820   fRandomSeed = rSeed; // remember the requested seed
01821   fRandomOverride = true; // records that the seed was set through this method
01822   MSG("Per",Msg::kInfo)
01823     << "PerInputStreamManager::SetRandomSeed : Random Seed set to "
01824     << ((rSeed==0)? fRanGen->GetSeed() : fRandomSeed) << endl; // for rSeed == 0 the generator's own seed is reported; presumably 0 lets it choose one - TODO confirm
01825 
01826 }

void PerInputStreamManager::SetSelection ( std::string  streamName = "*",
std::string  selection = "",
bool  isRequired = false 
)
void PerInputStreamManager::SetSequenceMode ( std::string  streamName = "*",
Per::ESequenceMode  seqMode = Per::kKey 
)
void PerInputStreamManager::SetUpdateMode ( bool  updatemode  ) 

Definition at line 1899 of file PerInputStreamManager.cxx.

References PerStreamManager::fStreamMap, fUpdateMode, and Munits::second.

Referenced by OpenStream(), and PerValidate::RunAllTests().

01899                                                          {
01900   //  Purpose:  Set update mode true or false. true should be set
01901   //            when reading open files (e.g. user is dispatcher).
01902   //
01903   //  Argument: updateMode  bool  true => stream tree will be updated when
01904   //                              end of entries is reached. Appropriate
01905   //                              for read of open files.  Default when
01906   //                              stream is opened is false.
01907   //
01908   //  Return:  none.
01909   //
01910   //  Contact:   S. Kasahara
01911   //
01912   // The value is stored in fUpdateMode and pushed to every stream in fStreamMap.
01913   fUpdateMode = updateMode;
01914   for ( StreamMapConstItr citr = fStreamMap.begin(); 
01915                           citr!= fStreamMap.end(); ++citr ) 
01916     ((PerInputStream*)(citr -> second)) -> SetUpdateMode(updateMode);
01917 
01918 }

void PerInputStreamManager::SetWindow ( std::string  streamName = "*",
double  lower = 0,
double  upper = 0 
)
void PerInputStreamManager::UpdateTreeFormula (  )  [private]

Definition at line 1922 of file PerInputStreamManager.cxx.

References fFormulaMap, fGlobalSelection, PerStreamManager::fStreamMap, PerStream::fTTree, fTTreeFormula, PerStream::GetFullFilePathName(), PerStream::GetNumEntries(), PerStreamManager::GetNumStream(), PerStreamManager::GetNumStreamOpen(), and Munits::second.

Referenced by IsSelectedSet().

01922                                               {
01923   //  Purpose:  Private method, called by IsSelectedSet when selection
01924   //            cut has been applied globally.  Builds fTTreeFormula from
01925   //            fGlobalSelection, or re-points it at the current trees,
01926   //            whenever the stream files differ from those in fFormulaMap.
01927   //  Return:  none.
01928   //  Contact:   S. Kasahara
01929   //
01930 
01931   if ( fGlobalSelection.empty() ) return; // nothing to do without a global selection
01932   if ( this -> GetNumStreamOpen() <= 0 ) { // no open stream: drop the formula and the cached file map
01933     if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0; // the reset to 0 is a separate statement and runs unconditionally
01934     fFormulaMap.clear();
01935     return;
01936   }
01937 
01938   bool rebuild = false;
01939   if ( !fTTreeFormula ) rebuild = true;
01940   else {
01941     // Loop through formula map checking for differences
01942     if ( this -> GetNumStream() != fFormulaMap.size() ) rebuild = true;
01943     else {
01944       for ( StreamMapConstItr citr = fStreamMap.begin();
01945                               citr!= fStreamMap.end(); ++citr) {
01946         PerInputStream* stream = (PerInputStream*)(citr->second);
01947         std::string streamname = citr->first;
01948         if ( fFormulaMap[streamname] != stream->GetFullFilePathName() ) { // operator[] inserts an empty entry for an unknown name; the map is cleared below on rebuild
01949           rebuild = true;
01950           break;
01951         }
01952       }
01953     }
01954   }
01955 
01956   if ( !rebuild ) return;  
01957   //  if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01958   // If file boundary has been crossed on one of the streams, must
01959   // update treeformula
01960   fFormulaMap.clear();
01961   std::string minstreamnm = ""; // name of the stream with the fewest entries among those that have a tree
01962   Int_t minentries = -1; 
01963   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01964     PerInputStream* stream = (PerInputStream*)(citr -> second);
01965     if ( stream->fTTree != 0  && 
01966         (stream->GetNumEntries() < minentries || minstreamnm.empty()) ) { // the empty() test accepts the first stream that has a tree
01967       minentries  = stream->GetNumEntries();
01968       minstreamnm = citr->first;
01969     }
01970     fFormulaMap.insert(make_pair(citr->first,stream->GetFullFilePathName())); // cache the file name of every stream, with or without a tree
01971   }
01972 
01973 
01974   // Smallest tree becomes master, else root will complain
01975   TTree* mastertree = ((PerInputStream*)fStreamMap[minstreamnm])->fTTree; // NOTE(review): unguarded if no stream had a tree (minstreamnm still "") - confirm this cannot happen when GetNumStreamOpen() > 0
01976   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01977     if ( citr->first == minstreamnm ) continue;
01978     PerInputStream* stream = (PerInputStream*)(citr->second);
01979     if ( stream->fTTree != 0 ) mastertree -> AddFriend(stream->fTTree); // the other trees are attached as friends of the master
01980   }
01981   if ( !fTTreeFormula ) {
01982     fTTreeFormula 
01983     = new TTreeFormula("PerMgrSelection",fGlobalSelection.c_str(),mastertree);
01984   }
01985   else { // an existing formula is kept and re-pointed at the new master tree
01986     fTTreeFormula -> SetTree(mastertree);
01987     fTTreeFormula -> UpdateFormulaLeaves();
01988   }
01989   if ( fTTreeFormula -> GetNdim() <= 0 ) {
01990     // TTreeformula build failed
01991     delete fTTreeFormula; fTTreeFormula = 0;
01992     fFormulaMap.clear(); // an empty map forces another rebuild attempt on the next call
01993   }
01994 
01995   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01996     if ( citr->first == minstreamnm ) continue;
01997     PerInputStream* stream = (PerInputStream*)(citr->second);
01998     if ( stream->fTTree != 0 ) mastertree -> RemoveFriend(stream->fTTree); // undo the friend links added above
01999   }
02000 
02001   return;
02002 
02003 }


Member Data Documentation

std::map<std::string,std::string> PerInputStreamManager::fFormulaMap [private]

Definition at line 111 of file PerInputStreamManager.h.

Referenced by UpdateTreeFormula(), and ~PerInputStreamManager().

Definition at line 109 of file PerInputStreamManager.h.

Referenced by IsSelectedSet(), and UpdateTreeFormula().

Definition at line 116 of file PerInputStreamManager.h.

Referenced by IsSelectedSet(), and SetCurrentVld().

Definition at line 117 of file PerInputStreamManager.h.

Referenced by IsSelectedSet().

std::map<std::string,VldContext> PerInputStreamManager::fLastServed [private]

Definition at line 115 of file PerInputStreamManager.h.

Referenced by AdvanceRecordTags(), GetMaxSyncDelay(), and SetMaxSyncDelay().

Definition at line 124 of file PerInputStreamManager.h.

Referenced by LoadRecord(), and SetRandomSeed().

OWNED HERE BUT USED BY PERINPUTSTREAMs.

Definition at line 123 of file PerInputStreamManager.h.

Referenced by LoadRecord(), PerInputStreamManager(), and SetRandomSeed().

TRandom3* PerInputStreamManager::fRanGen [private]
TTreeFormula* PerInputStreamManager::fTTreeFormula [private]

Definition at line 114 of file PerInputStreamManager.h.

Referenced by AdvanceRecordTags(), Next(), OpenStream(), and SetUpdateMode().


The documentation for this class was generated from the following files:

Generated on 2 Nov 2017 for loon by  doxygen 1.6.1