Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

IoInputModule.cxx

Go to the documentation of this file.
00001 
00002 // $Id: IoInputModule.cxx,v 1.86 2009/01/05 05:55:13 schubert Exp $
00003 //
00004 // Job control interface to input data streams
00005 //
00006 // messier@huhepl.harvard.edu
00008 #include "TSystem.h"
00009 #include "TRegexp.h"
00010 
00011 #include "Dispatcher/DDS.h"
00012 #include "IoModules/IoInputModule.h"
00013 #include <cassert>
00014 #include "MessageService/MsgService.h"
00015 #include "MinosObjectMap/MomNavigator.h"
00016 #include "IoModules/IoDataStreamItr.h"
00017 #include "IoModules/IoDDSStreamItr.h"
00018 #include "IoModules/IoDataStreamFactory.h"
00019 #include "JobControl/JobCInputModule.h"
00020 #include "JobControl/JobCModuleRegistry.h"
00021 #include "JobControl/JobCEnv.h"
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqSnarlHeader.h"
00024 #include "CandData/CandHeader.h"
00025 #include "Record/RecRecord.h"
00026 #include "Record/RecPhysicsHeader.h"
00027 #include "Registry/Registry.h"
00028 #include "Validity/VldContext.h"
00029 #include "Validity/VldTimeStamp.h"
00030 #include "Util/UtilString.h"
00031 #include "TSystem.h"
00032 #include "TRandom3.h"
00033 
00034 #include <algorithm>
00035 #include <cstring>
00036 #include <string>
00037 #include <map>
00038 
00039 #ifdef SITE_HAS_SAM
00040 #include "sam_cpp_api/SamConsumer.hpp"
00041 #include "sam_cpp_api/SamLocate.hpp"
00042 #endif
00043 
// Version-control identification string for this source file. The CVSID
// macro is defined outside this file -- presumably it embeds the string in
// the compiled object; TODO confirm.
00044 CVSID("$Id: IoInputModule.cxx,v 1.86 2009/01/05 05:55:13 schubert Exp $");
// Announces IoInputModule to the job-control framework under the name
// "INPUT" with a one-line description. The JOBMODULE macro is defined
// outside this file -- presumably it registers the class with
// JobCModuleRegistry; TODO confirm.
00045 JOBMODULE(IoInputModule,"INPUT","Read and configure input streams");
00046 
// Read-only iterator over a string -> string map.
00047 typedef std::map<std::string,std::string>::const_iterator mapStrStrItr_t;
00048 
// The navigation methods below (Next, Prev, GoTo, NextFile, ...) assign this
// value to fStatus at the start of an outermost call.
00049 // A result that says "AOK"
00050 static const JobCResult gsAllClear = JobCResult::kAOK;
00051 
// Many of the input methods are recursive, i.e. try to load an event; if
// that fails, try the next file and try again, etc. To prevent
// initializations from being done twice we need a way to keep track of the
// recursion level. This little class helps with that: every live instance
// adds one to the shared counter fsDepth and takes it away again when the
// instance goes out of scope.
class CallDepth {
public:
  CallDepth()  { ++fsDepth; }  // one level deeper
  ~CallDepth() { --fsDepth; }  // back up one level
  static int fsDepth;          // number of CallDepth objects currently alive
private:
  // Not copyable: a compiler-generated copy would skip the increment but
  // still run the destructor, leaving fsDepth one too low. Declared but
  // intentionally never defined.
  CallDepth(const CallDepth&);
  CallDepth& operator=(const CallDepth&);
};
int CallDepth::fsDepth = 0;
00063 
00064 //......................................................................
00065 
00066 IoInputModule::IoInputModule() : 
00067   fDataStreamItr(0),
00068   fFormat(""),
00069   fStreamList(""),
00070   fServer(""),
00071   fPort(0),
00072   fTimeOut(0),
00073   fDataSource(0),
00074   fKeepUpMode(0),
00075   fMaxSyncDelay(0),
00076   fOffLine(false),
00077   fMaxRetry(0),
00078   fRetryDelay(1),
00079   fClientType(DDS::kUnknownClientType),
00080   fClientName(""),
00081   fStatus(JobCResult::kAOK),
00082   fLastRun(-1),
00083   fLastSnarl(-1),
00084   fCurrentRun(-1),
00085   fCurrentSnarl(-1),
00086   fLoadedCommandLineFiles(false)
00087 #ifdef SITE_HAS_SAM
00088   ,fsamProject(0)
00089 #endif
00090 { fStopwatch.Reset(); fStopwatch.Stop(); }
00091 
00092 //......................................................................
00093 
00094 IoInputModule::~IoInputModule() 
00095 { 
00096   if ( fDataStreamItr ) { delete fDataStreamItr; fDataStreamItr = 0; }
00097 }
00098 
00099 //......................................................................
00100 
00101 void IoInputModule::BeginJob() 
00102 {
00103   this->LoadFilesFromCommandLine();
00104   // Delay opening files until first action (Next,Prev,etc.) call
00105 }
00106 
00107 
00108 //......................................................................
00109 
00110 void IoInputModule::EndJob()
00111 {
00112   fStopwatch.Stop();
00113   MSG("Io",Msg::kDebug) << "IoInputModule::EndJob, Time(sec), Real " 
00114         << fStopwatch.RealTime() << ", CPU "
00115         << fStopwatch.CpuTime() << endl;
00116   
00117 }
00118 
00119 //......................................................................
00120 
00121 const Registry& IoInputModule::DefaultConfig() const 
00122 {
00123 //======================================================================
00124 // Get the default configuration for this module
00125 //======================================================================
00126   static Registry r;
00127   r.SetName("INPUT.config");
00128 
00129   r.UnLockValues();
00130 
00131   MSG("Io",Msg::kDebug) << "Loading default config\n";
00132 
00133   // Stream config
00134   r.Set("Format" ,"input");
00135   r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection");
00136   // r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection,DcsMonitor,DcsAlarm");
00137   
00138   // DDS config
00139   r.Set("DDSServer", "daqdds.minos-soudan.org");
00140   r.Set("DDSPort",    DDS::kPort);
00141   r.Set("DDSTimeOut",  120);
00142   r.Set("DDSDataSource","Daq");
00143   r.Set("DDSKeepUpMode", "FileKeepUp");
00144   r.Set("DDSMaxSyncDelay",15);
00145   r.Set("DDSOffLine",false);
00146   r.Set("DDSMaxRetry",0);
00147   r.Set("DDSRetryDelay",1);
00148   r.Set("DDSClientType","Unknown");
00149   r.Set("DDSClientName","");
00150 
00151 #ifdef SITE_HAS_SAM
00152 
00153   // SAM config
00154   r.Set("Station","minos");
00155   r.Set("SnapShotVers",0);
00156   r.Set("WorkGroupName","minos");
00157   r.Set("ApplicationName","loon");
00158   r.Set("ApplicationVers","dev");
00159   r.Set("MaxNumberOfFiles",0);
00160   r.Set("StartNewProject",1);
00161 
00162   // Create default project name
00163   // Get $USER
00164   const char* username = gSystem->Getenv("USER");
00165   if (!username) username = "unknown";
00166   r.Set("ProjectName",username);
00167 
00168 #endif
00169 
00170   r.LockValues();
00171   return r;
00172 }
00173 
00174 //......................................................................
00175 
00176 void IoInputModule::Config(const Registry& r) 
00177 {
00178 //======================================================================
00179 // Configure the module based on the contents of the registry r
00180 //======================================================================
00181   const char* tmps;
00182   int         tmpi;
00183   int         tmpb;  // bools
00184 
00185   MSG("Io",Msg::kDebug) << "Config IoInputModule with r=" << r << "\n";
00186 
00187   // Input data stream configuration
00188   bool doFormatConfig = false;
00189   if (r.Get("Format", tmps)) { fFormat     = tmps; doFormatConfig = true; }
00190   if (doFormatConfig) this->UpdateFormatConfig();
00191   bool doStreamConfig = false;
00192   if (r.Get("Streams",tmps)) { fStreamList = tmps; doStreamConfig = true; }
00193   if (doStreamConfig) this->UpdateStreamConfig();
00194     
00195   // DDS options
00196   bool doDDSConfig = false;
00197   if (r.Get("DDSServer", tmps)) { fServer  = tmps; doDDSConfig = true; }
00198   if (r.Get("DDSPort",   tmpi)) { fPort    = tmpi; doDDSConfig = true; }
00199   if (r.Get("DDSTimeOut",tmpi)) { fTimeOut = tmpi; doDDSConfig = true; }
00200   if (r.Get("DDSClientType",tmps)) { fClientType = DDS::GetClientType(tmps); 
00201                                      doDDSConfig = true; }
00202   if (r.Get("DDSClientName",tmps)) { fClientName = tmps; 
00203                                      doDDSConfig = true; }
00204   if (r.Get("DDSDataSource",tmps)){fDataSource = DDS::GetDataSourceCode(tmps);
00205                                    doDDSConfig = true;}
00206   if (r.Get("DDSKeepUpMode",tmps)) { fKeepUpMode = DDS::GetKeepUpCode(tmps); 
00207                                      doDDSConfig = true; }
00208   if (r.Get("DDSMaxSyncDelay",tmpi)){fMaxSyncDelay = tmpi; doDDSConfig = true;}
00209   if (r.Get("DDSOffLine",tmpb)) {fOffLine = tmpb; doDDSConfig = true;}
00210   if (r.Get("DDSMaxRetry",tmpi)) {fMaxRetry = tmpi; doDDSConfig = true; }
00211   if (r.Get("DDSRetryDelay",tmpi)) {fRetryDelay = tmpi; doDDSConfig = true; }
00212   if (doDDSConfig) this->UpdateDDSConfig();
00213 
00214 
00215 }
00216 
00217 //......................................................................  
00218 
00219 JobCResult IoInputModule::Get()
00220 {
00221 //======================================================================
00222 // Load the data records at the current position in the input stream
00223 //======================================================================
00224 
00225 
00226   if ( fDataStreamItr==0 ) {
00227     if ( this->OpenStreamItr()==0 ) {
00228       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00229       return fStatus;
00230     }
00231     return this->Get();
00232   }
00233 
00234   MSG("Io",Msg::kVerbose) << "IoInputModule::Get " << endl;
00235 
00236   fStopwatch.Start(false);
00237   MomNavigator* mom = this->GetMom();
00238   assert(mom);
00239   mom -> Clear(); // Moving on so clear contents of Mom
00240 
00241   int nrecord = fDataStreamItr->LoadRecords(mom);
00242   bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
00243   // special treatment required because dds doesn't separate advance from load
00244   if ( isDDS && !nrecord ) fStatus.SetEndOfInputStream();
00245 
00246   this->ReadHeader(); // sets beginrun/endrun beginfile/endfile fStatus bits
00247   if ( fStatus.EndOfInputStream() ) 
00248      { fStatus.SetEndFile(); fStatus.SetEndRun(); }
00249   MSG("Io",Msg::kVerbose) 
00250    << "IoInputModule::Get returning status " << fStatus << endl;
00251   fStopwatch.Stop();
00252   return fStatus;
00253 }
00254 
00255 //......................................................................
00256 
00257 JobCResult IoInputModule::Next(int n)
00258 {
00259 //======================================================================
00260 // Advance the position in the stream n record sets. Load the records
00261 // at the last position
00262 //======================================================================
00263   CallDepth d; // Keep track of the call depth
00264   
00265   // Set the input status to "all clear" since advancing
00266   if (d.fsDepth==1) fStatus = gsAllClear;
00267 
00268   MSG("Io",Msg::kVerbose) << "IoInputModule::Next " << n << endl;
00269 
00270   if ( fDataStreamItr==0 ) {
00271     if (this->OpenStreamItr()==0) {
00272       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00273       return fStatus;
00274     }
00275     return this->Next(n);
00276   }
00277 
00278   fStopwatch.Start(false);
00279   MomNavigator* mom = this->GetMom();
00280   assert(mom);
00281   mom -> Clear(); // Moving on so clear contents of Mom
00282 
00283   // Advance the position in the input stream until we run out of
00284   // records and files
00285   int nstep = 0;
00286   int ndone = 0;
00287   int ntry  = 0;
00288   while ( ndone < n ) {
00289     ntry  = n - ndone;
00290     nstep = fDataStreamItr->Increment(ntry);
00291 
00292     if ( nstep < ntry ) {
00293       // Reached end of file, load next one
00294       fStatus |= this->NextFile(); 
00295       
00296       // If this is the end of the input stream, we're done.
00297       if ( fStatus.EndOfInputStream() ) {
00298         fStopwatch.Stop();
00299         return this->Get();
00300       }
00301     }
00302     ndone += nstep;
00303   }
00304 
00305   // Load the current event
00306   fStopwatch.Stop();
00307   return this->Get();
00308 }
00309 
00310 //......................................................................
00311 
00312 JobCResult IoInputModule::Prev(int n) 
00313 {
00314 //======================================================================
00315 // Back up n positions in the input data stream. Load the records at
00316 // the current position
00317 //======================================================================
00318   CallDepth d;
00319 
00320   // Set the input status to "all clear"
00321   if (d.fsDepth == 1) fStatus = gsAllClear;
00322 
00323   MSG("Io",Msg::kVerbose) << "IoInputModule::Prev " << n << endl;
00324   
00325   if (fDataStreamItr==0) {
00326     if (this->OpenStreamItr()==0) {
00327       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00328       return fStatus;
00329     }
00330     return this->Prev(n);
00331   }
00332 
00333   // Back up the the position in the input stream until we run out of
00334   // records and files
00335   fStopwatch.Start(false);
00336   MomNavigator* mom = this->GetMom();
00337   assert(mom);
00338   mom -> Clear(); // Moving on so clear contents of Mom
00339 
00340   int nstep = 0;
00341   int ndone = 0;
00342   int ntry  = 0;
00343   while (ndone < n) {
00344     ntry = n - ndone;
00345     nstep = fDataStreamItr->Decrement(ntry);
00346 
00347     if (nstep < ntry) {
00348       // Reached start of file, load previous file.
00349       fStatus |= this->PrevFile();
00350 
00351       // If there is no previous file, then we're done.
00352       if ( fStatus.BeginOfInputStream() || fStatus.EndOfInputStream() ) {
00353         return this->Get(); // may be end if failed to open stream itr
00354         fStopwatch.Stop();
00355       }
00356 
00357       // Move the position to the end of the current file so we can
00358       // walk backwards over it
00359       fDataStreamItr->GoToEOF();
00360     }
00361     ndone += nstep;
00362   }
00363 
00364   // Load the current event
00365   fStopwatch.Stop();
00366   return this->Get();
00367 
00368 }
00369 
00370 //......................................................................
00371 
00372 JobCResult IoInputModule::GoTo(int run, int snarl, int searchDir) 
00373 {
00374   CallDepth d;
00375   
00376   if (d.fsDepth==1) fStatus = gsAllClear;
00377 
00378   if (fDataStreamItr==0) {
00379     if (this->OpenStreamItr()==0) {
00380       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00381       return fStatus;
00382     }
00383     return this->GoTo(run,snarl,searchDir);
00384   }
00385 
00386   if ( run == fCurrentRun && snarl == fCurrentSnarl ) return this -> Get();
00387 
00388   int dir = searchDir;
00389   if (dir==0) {
00390     if      (run>fLastRun) { dir =  1; }
00391     else if (run<fLastRun || (run==fLastRun && fCurrentRun < 0)) { dir = -1; }
00392     else {
00393       if    (snarl>fLastSnarl) { dir =  1; }
00394       else                     { dir = -1; }
00395     }
00396   }
00397 
00398   // Move position in the stream looking for run/event number
00399   while ( 1 ) {
00400     if ( dir > 0 ) {
00401       this->Next();
00402       if (fCurrentRun>run) {
00403         MSG("Io",Msg::kWarning) << 
00404           "Went to run "<<fCurrentRun<<
00405           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00406         return fStatus;
00407       }
00408       if (fCurrentRun==run && fCurrentSnarl>snarl) {
00409         MSG("Io",Msg::kWarning) << 
00410           "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00411           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00412         return fStatus;
00413       }
00414     }
00415     if ( dir <0 ) {
00416       this->Prev();
00417       if (fCurrentRun<run) {
00418         MSG("Io",Msg::kWarning) << 
00419           "Went to run "<<fCurrentRun<<
00420           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00421         return fStatus;
00422       }
00423       if (fCurrentRun==run && fCurrentSnarl<snarl) {
00424         MSG("Io",Msg::kWarning) << 
00425           "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00426           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00427         return fStatus;
00428       }
00429     }
00430     // Check if we're done
00431     if (fCurrentRun == run && fCurrentSnarl == snarl) return fStatus;
00432     if (dir>0 && fStatus.EndOfInputStream())          return fStatus;
00433     if (dir<0 && fStatus.BeginOfInputStream())        return fStatus;
00434   }
00435   return fStatus;
00436 }
00437 
00438 //......................................................................
00439 
00440 JobCResult IoInputModule::GoTo(const VldContext& vld)
00441 {
00442 //======================================================================
00443 // Go to records that match validity context. If vld is not found, will
00444 // GoTo record set one beyond requested validity.
00445 //======================================================================
00446   CallDepth d;
00447 
00448   // Set the input status to "all clear"
00449   if (d.fsDepth==1) fStatus = gsAllClear;
00450 
00451   if (fDataStreamItr==0) {
00452     if (this->OpenStreamItr()==0) {
00453       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00454       return fStatus;
00455     }
00456     return this->GoTo(vld);
00457   }
00458 
00459   fStatus |= fDataStreamItr -> GoTo(vld);
00460 
00461   // Load the current event
00462   return this->Get();
00463 
00464 }
00465 
00466 //......................................................................
00467 
00468 void IoInputModule::List(const char* streamlist) const
00469 {
00470 //======================================================================
00471 // Print list of files loaded
00472 //======================================================================
00473 
00474   MsgStream& m = MSGSTREAM("Io",Msg::kInfo);
00475 
00476   m << "IoInputModule using data format " << fFormat << endl;
00477 
00478   if ( fDataStreamItr ) {
00479     fDataStreamItr -> ListFile(std::cout,streamlist);
00480     return;
00481   }
00482 
00483   // data stream not yet open, list files from fFileList
00484   m << "File Name\tStream List " << endl;
00485   m << "=========\t=========== " << endl;
00486   std::list<IoFileListItem>::const_iterator itr = fFileList.begin();
00487   std::list<IoFileListItem>::const_iterator itrEnd = fFileList.end();
00488 
00489   for ( ; itr != itrEnd; itr++ ) {
00490     m << *itr;
00491   }
00492  
00493 }
00494 
00495 //......................................................................
00496 
00497 void IoInputModule::AddFile(const char *filepath, const char* streamlist, 
00498                                                                  int at) {
00499 //======================================================================
00500 // Add to the list of attached streams at the position "at". -1 = end
00501 // of list
00502 //======================================================================
00503 
00504 
00505   // Find out by checking the format of the filepath whether this
00506   // is a SAM job. Format will be SAM:samdataset of SAM_FILE::file
00507 
00508   const char *s1 = "SAM";
00509   if ( strstr(filepath,s1) != NULL ) {
00510 
00511 #ifdef SITE_HAS_SAM
00512 
00513   // SAM options
00514 
00515     const char* tmps;
00516     int tmpi;
00517 
00518     Registry& r = GetConfig();
00519     if (r.Get("Station",tmps)) {fStation = tmps;}
00520     if (r.Get("SnapShotVers",tmpi)){fSnapShotVers = tmpi;}
00521     if (r.Get("WorkGroupName",tmps)) {fWorkGroupName = tmps;}
00522     if (r.Get("ApplicationName",tmps)) {fApplicationName = tmps;}
00523     if (r.Get("ApplicationVers",tmps)) {fApplicationVers = tmps;}
00524     if (r.Get("ProjectName",tmps)) {fProjectName = tmps;}
00525     if (r.Get("MaxNumberOfFiles",tmpi)) {fMaxNumberOfFiles = tmpi;}
00526     if (r.Get("StartNewProject",tmpi)) {fStartNewProject = tmpi;}
00527 
00528     // Decide whether we have a dataset or a single file
00529     // dataset will be SAM, file will be SAM_FILE
00530     // Get the samdataset or file from filepath
00531 
00532     std::string temp = filepath;
00533     size_t pos = temp.find(":")+1;
00534     std::string sam_access_type = temp.substr(0,pos-1);
00535     if (sam_access_type == "SAM") {
00536       std::string samdataset = temp.substr(pos,temp.length()-pos);
00537 
00538 
00539       std::string projectname;
00540 
00541       // If this is a new project then append time-stamp
00542       if (fStartNewProject == 1) {
00543         // Construct full project name including timestamp
00544         // Get timestamp as a string
00545         VldTimeStamp ts;
00546         std::string timestamp = ts.AsString("lc");
00547         // Replace blank with "-"
00548         size_t pos = timestamp.find(" ");
00549         timestamp.replace(pos,1,"-");
00550         // Replace : with - as DbServer does not like : in project names
00551         pos = timestamp.find(":");
00552         while ( pos != string::npos ) {
00553           timestamp.replace(pos,1,"-");
00554           pos = timestamp.find(":",pos+1);
00555         }
00556         // Append timestamp to fProjectName taken from registry
00557         fProjectName.append("-");
00558         projectname = fProjectName+timestamp;
00559       }
00560       // Otherwise use project name that is supplied
00561       else if(fStartNewProject == 0) {
00562         projectname = fProjectName;
00563       }
00564       
00565       
00566       MSG("Io",Msg::kDebug) << "Sam Station " << fStation << " Snap Shot " <<
00567         fSnapShotVers << " Work Group Name " << fWorkGroupName 
00568                             << " Application Name "
00569                             << fApplicationName << " Application Version " << fApplicationVers <<
00570         " Project Name " << projectname << endl;
00571 
00572 
00573       // Define snapshot version
00574 
00575       long snapshot;
00576       // Snapshot version = 0 means create New Snapshot
00577       if ( fSnapShotVers == 0 ) {
00578         snapshot = sam::SamProject::NewSnapshotVersion;
00579       }
00580       // Snapshot version < 0 means use last one created
00581       else if (fSnapShotVers < 0) {
00582         snapshot = sam::SamProject::LatestSnapshotVersion;
00583       }
00584       else if (fSnapShotVers > 0 ) {
00585         // Use specified SnapShot version 
00586         snapshot = fSnapShotVers;
00587       }
00588     
00589       MSG("Io",Msg::kDebug) << "SnapShot Version " << snapshot << endl; 
00590 
00591       if (fStartNewProject == 1) {
00592 
00593         // Create SAM project
00594 
00595         fsamProject = new sam::SamProject(projectname,fStation);
00596 
00597         // Start SAM project
00598 
00599         std::list<std::string> projectMasterArgList;
00600 
00601         try {
00602           MSG("Io",Msg::kInfo) << "Starting SAM Project " << projectname << 
00603             " on station " << fStation << endl;
00604           fsamProject->startProject(fWorkGroupName,samdataset,snapshot,
00605                                     projectMasterArgList);
00606         }
00607         catch(const sam::SamProject::StartProjectRequestRejected& ex) {
00608           MSG("Io",Msg::kInfo) << "Rejected start SAM project request " 
00609                                << ex << endl;
00610         }
00611       }
00612 
00613       // Start SAM consumer to deliver files
00614 
00615       const int projectMasterTimeout(60);
00616       const std::string processDescription("Loon Analysis Process");
00617 
00618       try{
00619 
00620 
00621         sam::SamConsumer fsamConsumer(projectname,fStation,fWorkGroupName,
00622                                       fApplicationName,fApplicationVers,
00623                                       processDescription,
00624                                       fMaxNumberOfFiles,
00625                                       projectMasterTimeout);
00626         
00627         MSG("Io",Msg::kInfo) << "Started SAM Consumer" << endl;
00628         
00629         // Now get files. Format of returned files depends on whether
00630         // SAM cache is local disk, dcache disk or AFS file space
00631         
00632         std::map<std::string,std::string> filelist;
00633         map<std::string,std::string>::iterator fitr;
00634         
00635         int location;
00636         int length;
00637         int comp;
00638         std::string fileonly;
00639         std::string restOfPath;
00640         std::string afsroot("afsroot:");
00641         try {
00642           while(true) {
00643             std::string filename = fsamConsumer.getFile().getFullFileName();
00644             // The files need to be sorted as they come back in an undefined
00645             // order. Split them into a filename and the rest of the path. 
00646             // Put them in a map and then iterate over key which is filename 
00647             // - guarantees correct order.
00648             
00649             MSG("Io",Msg::kDebug) << "Filename " << filename << endl;
00650             location = filename.find_last_of("/");
00651             length = filename.length();
00652             fileonly = filename.substr(location+1,length-1);
00653             // Need to look for afsroot: at start of path. If it is there the
00654             // remove it and rest of path is AFS path.
00655             comp = filename.compare(0,8,afsroot);
00656             if (comp == 0 ) {
00657               restOfPath = filename.substr(8,location-8);
00658             }
00659             else {
00660               restOfPath = filename.substr(0,location);
00661             }
00662             
00663             restOfPath.append("/");
00664             filelist.insert(make_pair(fileonly,restOfPath));
00665             
00666             MSG("Io",Msg::kDebug) << "File Only " << fileonly << " Rest Of Path " 
00667                                   << restOfPath << endl;
00668             
00669             // Release file
00670             
00671             fsamConsumer.releaseFile();
00672           }
00673         }
00674         catch(const sam::SamConsumer::EndOfFileStreamReached& ex) {
00675           MSG("Io",Msg::kDebug) << "End of File Stream reached" << endl;
00676         }
00677 
00678         // Got all files. Now need to add them to file list. Iterate over map
00679       
00680         std::string sfile;
00681         const char *samfile = 0;
00682         for (fitr = filelist.begin(); fitr != filelist.end(); fitr++) {
00683           sfile = (fitr->second+fitr->first);
00684           samfile = sfile.data();
00685           MSG("Io",Msg::kInfo) << "Adding File " << samfile << endl;
00686           // Add file to file list
00687           IoFileListItem iofile(samfile,at,streamlist);
00688           fFileList.push_back(iofile);
00689         }
00690       }
00691       catch(const sam::SamConsumer::InitializationError& ex) {
00692         MSG("Io",Msg::kInfo) << "Rejected start SAM Consumer request " 
00693                              << ex << endl;
00694       }
00695 
00696 
00697       if (fsamProject) {
00698         try {
00699           MSG("Io",Msg::kInfo) << "Requesting end of SAM project " << endl;
00700           fsamProject->endProject();
00701         }
00702         catch(const sam::SamProject::EndProjectRequestRejected& ex) {
00703           MSG("Io",Msg::kInfo) << "SAM Project end request rejected "<< ex << endl;
00704           
00705         }
00706         catch(const sam::SamProject::EndProjectRequestFailed& ex) {
00707           MSG("Io",Msg::kInfo) << "SAM Project end request failed "<< ex << endl;
00708         }
00709       }
00710     }
00711     else if (sam_access_type == "SAM_FILE") {
00712       
00713       // User just wants a single file
00714       std::string samfile = temp.substr(pos,temp.length()-pos);
00715       
00716       // Use sam_locate to get pnfs path
00717       
00718       sam::LocationList samFiles;
00719       try {
00720         MSG("Io",Msg::kInfo) << "Locating file  " << samfile << endl;
00721         samFiles = sam::locate(samfile);
00722         // Now need to translate pnfs path into dcache path
00723         // insert fnal.gov/usr/ after /pnfs/
00724         samFiles[0].insert(6,"fnal.gov/usr/");
00725         // There are two dcache ports so chose one at random
00726         TRandom3 rand(0);
00727         Double_t r = rand.Rndm();
00728         if (r <= 0.5) {
00729           samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24125");
00730         }
00731         else if (r > 0.5) {
00732           samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24136");
00733         }
00734         // Finally append filename
00735         samFiles[0].append("/");
00736         samFiles[0].append(samfile);
00737         MSG("Io",Msg::kInfo) <<  "Adding file " << samFiles[0].c_str() << " to input list" << endl;
00738         // Add file to file list
00739         IoFileListItem iofile(samFiles[0].c_str(),at,streamlist);
00740         fFileList.push_back(iofile);
00741       }
00742       catch(const sam::exception::DataFileNotFound& ex) {
00743         MSG("Io",Msg::kInfo) << ex << endl;
00744       }
00745     }
00746 
00747 #endif     // End of ifdef SITE_HAS_SAM
00748 
00749   }
00750   else {
00751 
00752     // Add file to file list
00753     IoFileListItem iofile(filepath,at,streamlist);
00754     fFileList.push_back(iofile);
00755     
00756     if ( !fDataStreamItr ) return;
00757     
00758     // Add files to stream managed lists
00759     const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
00760     
00761     if ( at < 0 ) {
00762       IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
00763       for ( ; itr != filemap.end(); itr++ ) {
00764         std::string filename = itr -> first;
00765         std::string streamlist = itr -> second;
00766         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00767       }
00768     }
00769     else {
00770       // Apply files in reverse to have first file in list inserted at pos At
00771       IoFileListItem::FileStreamMap::const_reverse_iterator itr=filemap.rbegin();
00772       for ( ; itr != filemap.rend(); itr++ ) {
00773         std::string filename = itr -> first;
00774         std::string streamlist = itr -> second;
00775         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00776       }      
00777     }
00778   }
00779 }
00780 
00781 //......................................................................
00782 
00783 void IoInputModule::RemoveFile(const char* filename, const char* streamlist) {
00784 //======================================================================
00785 // Remove the file "filename" from the list of input data files. 
00786 //======================================================================
00787 
00788   if ( fDataStreamItr ) fDataStreamItr -> RemoveFile(filename,streamlist);
00789 
00790   std::string f(filename);
00791   std::list<IoFileListItem>::iterator  itr = fFileList.end();
00792   while ( !fFileList.empty() && itr != fFileList.begin() ) {
00793     itr--;
00794     IoFileListItem& iofile = *itr;
00795     iofile.RemoveFile(filename,streamlist);
00796     if ( iofile.GetNumFile() == 0 ) fFileList.erase(itr);
00797   }
00798 
00799   return;
00800 
00801 }
00802 
00803 //......................................................................
00804 
00805 JobCResult IoInputModule::NextFile(int n, const char* streamlist)
00806 {
00807 //======================================================================
00808 // Move to the next file in the list (move by n positions)
00809 //======================================================================
00810   CallDepth d;
00811 
00812   if (d.fsDepth==1) fStatus = gsAllClear;
00813 
00814   if (fDataStreamItr==0) {
00815     if (this->OpenStreamItr()==0) {
00816       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00817       return fStatus;
00818     }
00819     return this->NextFile(n,streamlist);
00820   }    
00821 
00822   fStatus |= fDataStreamItr -> NextFile(n,streamlist);
00823 
00824   MSG("Io",Msg::kDebug)
00825       << "status is " << fStatus
00826       << " current file is " << fDataStreamItr->GetCurrentFile() << endl;
00827 
00828   return fStatus;
00829 
00830 }
00831 
00832 //......................................................................
00833 
00834 JobCResult IoInputModule::PrevFile(int n, const char* streamlist) 
00835 {
00836 //======================================================================
00837 // Move to the previous list in the file (move back by n files)
00838 //======================================================================
00839   CallDepth d;
00840   
00841   if (d.fsDepth==1) fStatus = gsAllClear;
00842 
00843   if (fDataStreamItr==0) {
00844     if (this->OpenStreamItr()==0) {
00845       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00846       return fStatus;
00847     }
00848     return this->PrevFile(n,streamlist);
00849   }    
00850 
00851   fStatus |= fDataStreamItr -> PrevFile(n,streamlist);
00852 
00853   return fStatus;
00854 
00855 }
00856 
00857 //......................................................................
00858 
00859 JobCResult IoInputModule::GoToFile(int n, const char* streamlist) 
00860 {
00861 //======================================================================
00862 // Move the stream to the nth file in the list (n=0 is first)
00863 //======================================================================
00864   CallDepth d;
00865   
00866   if (d.fsDepth==1) fStatus = gsAllClear;
00867 
00868   if (fDataStreamItr==0) {
00869     if (this->OpenStreamItr()==0) {
00870       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00871       return fStatus;
00872     }
00873     return this->GoToFile(n,streamlist);
00874   }    
00875 
00876   fStatus |= fDataStreamItr -> GoToFile(n,streamlist);
00877 
00878   return fStatus;
00879 
00880 }
00881 
00882 //......................................................................
00883 
00884 JobCResult IoInputModule::GoToFile(const char* filename, const char*streamlist){
00885 //======================================================================
00886 // Move the stream to a named file
00887 //======================================================================
00888   CallDepth d;
00889   
00890   if (d.fsDepth==1) fStatus = gsAllClear;
00891 
00892   if (fDataStreamItr==0) {
00893     if (this->OpenStreamItr()==0) {
00894       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00895       return fStatus;
00896     }
00897     return this->GoToFile(filename,streamlist);
00898   }    
00899 
00900   fStatus |= fDataStreamItr -> GoToFile(filename,streamlist);
00901 
00902   return fStatus;
00903 }
00904 
00905 //......................................................................  
00906 
00907 void IoInputModule::Select(const char* stream, const char* select, 
00908                            bool isRequired)
00909 {
00910 //======================================================================
00911 // Add/Change the selection cuts for a stream
00912 //======================================================================
00913   // Insert the selection into the map
00914   fStreamSelectionMap[stream] = select;
00915   fStreamRequiredMap[stream] = isRequired;
00916   
00917   // Pass the info on to the data stream
00918   if (fDataStreamItr) {
00919     fDataStreamItr->Select(stream, select,isRequired);
00920   }
00921 
00922 }
00923 
00924 //......................................................................  
00925 
00926 void IoInputModule::SetPerOwnedDisabled(const char* stream, 
00927                                         bool perowneddisabled)
00928 {
00929 //======================================================================
00930 // Used to disable Persistency Ownership of records for this stream
00931 //======================================================================
00932   // Insert the perowneddisabled bool into the map
00933   fStreamPerOwnedDisabledMap[stream] = perowneddisabled;
00934   
00935   // Pass the info on to the data stream
00936   if (fDataStreamItr) {
00937     fDataStreamItr->SetPerOwnedDisabled(stream, perowneddisabled);
00938   }
00939 
00940 }
00941 
00942 
00943 //......................................................................  
00944 
00945 void IoInputModule::DefineStream(const char* stream, const char* tree) {
00946 //======================================================================
00947 // Define stream to serve specified tree
00948 //======================================================================
00949   // Insert the definition into the map
00950   fStreamDefMap[stream] = tree;
00951 
00952   // Pass the info on to the data stream
00953   if ( fDataStreamItr ) {
00954     fDataStreamItr->DefineStream(stream, tree);
00955   }
00956 
00957 }
00958 
00959 //......................................................................  
00960 
00961 void IoInputModule::SetSequenceMode(const char* stream,
00962                                     Per::ESequenceMode sequenceMode) {
00963 //======================================================================
00964 // Define stream sequence mode
00965 //======================================================================
00966   // Insert the sequence mode into the map
00967   fStreamSeqModeMap[stream] = sequenceMode;
00968 
00969   // Pass the info on to the data stream
00970   if ( fDataStreamItr ) {
00971     fDataStreamItr->SetSequenceMode(stream, sequenceMode);
00972   }
00973 
00974 }
00975 
00976 //......................................................................  
00977 
00978 void IoInputModule::SetTestMode(const char* stream,
00979                                 bool testMode) {
00980 //======================================================================
00981 // Define stream test mode
00982 //======================================================================
00983   // Insert the test mode into the map
00984   fStreamTestModeMap[stream] = testMode;
00985 
00986   // Pass the info on to the data stream
00987   if ( fDataStreamItr ) {
00988     fDataStreamItr->SetTestMode(stream, testMode);
00989   }
00990 
00991 }
00992 
00993 //......................................................................
00994 
00995 void IoInputModule::SetWindow(const char* stream, double lower, double upper)
00996 {
00997 //======================================================================
00998 // Define stream window if kWindow sequence mode is used
00999 //======================================================================
01000   fStreamWindowMap[stream] = std::pair<double,double>(lower,upper);
01001 
01002   // Pass the info on to the data stream
01003   if ( fDataStreamItr ) {
01004       fDataStreamItr->SetWindow(stream, lower, upper);
01005   }
01006 
01007 }
01008 
01009 //......................................................................
01010 
01011 void IoInputModule::SetMaxFileRepeat(const char* stream, int numRepeat)
01012 {
01013 //======================================================================
01014 // Define maximum number of times to reuse a file in the stream before
01015 // loading the next one; for kSequential and kRandom sequence modes
01016 //======================================================================
01017   fStreamMaxRepeatMap[stream] = numRepeat;
01018 
01019   // Pass it on to the data stream
01020   if ( fDataStreamItr ) fDataStreamItr->SetMaxFileRepeat(stream,numRepeat);
01021 }
01022 
01023 //......................................................................
01024 
01025 void IoInputModule::SetMeanMom(const char* stream, double mean)
01026 {
01027 //======================================================================
01028 // Define mean number of events to push to mom for this stream
01029 // for kSequential and kRandom sequence modes
01030 //======================================================================
01031   fStreamMeanMap[stream] = mean;
01032 
01033   // Pass it on to the data stream
01034   if ( fDataStreamItr ) fDataStreamItr->SetMeanMom(stream,mean);
01035 }
01036 
01037 //......................................................................
01038 
01039 void IoInputModule::SetPushRandom(const char* stream, bool setRandom)
01040 {
01041 //======================================================================
01042 // Define whether to push a random or constant number of events to mom
01043 // for this stream for kSequential and kRandom sequence modes
01044 //======================================================================
01045   fStreamPushRandomMap[stream] = setRandom;
01046 
01047   // Pass it on to the data stream
01048   if ( fDataStreamItr ) fDataStreamItr->SetPushRandom(stream,setRandom);
01049 }
01050 
01051 //......................................................................
01052 
01053 void IoInputModule::SetRandomSeed(int rSeed)
01054 {
01055 //======================================================================
01056 // Set the random seed for SetPushRandom(stream,true) case
01057 // for kSequential and kRandom sequence modes
01058 //======================================================================
01059   fRandomSeed = rSeed;
01060 
01061   // Pass it on to the data stream
01062   if ( fDataStreamItr ) fDataStreamItr->SetRandomSeed(rSeed);
01063 
01064 }
01065 
01066 //......................................................................
01067 
01068 const char* IoInputModule::GetCurrentFile(const char* streamname) const
01069 {
01070     MSG("Io",Msg::kDebug) << "IoInputModule::GetCurrentFile()" << endl;
01071     mapStrStrItr_t it, done=fCurrentFileMap.end();
01072     std::string strmstring = streamname;
01073 
01074     for (it = fCurrentFileMap.begin(); it !=done; ++it) {
01075         MSG("Io",Msg::kVerbose)
01076           << "stream: " << setw(16) << it->first
01077           << " file: " << it->second
01078           <<endl;
01079     }
01080 
01081     for (it = fCurrentFileMap.begin(); it!=done; ++it) {
01082       if ( strmstring == it->first ) return it->second.c_str();
01083     }
01084     // Sue's original approach
01085     if (!fDataStreamItr) return 0;
01086     return fDataStreamItr->GetCurrentFile(streamname);
01087 }
01088 
01089 const char* IoInputModule::GetLastFile(const char* streamname) const
01090 {
01091     MSG("Io",Msg::kInfo) << "IoInputModule::GetLastFile()" << endl;
01092     mapStrStrItr_t it, done = fLastFileMap.end();
01093     std::string strmstring = streamname;
01094 
01095     for (it = fLastFileMap.begin(); it!=done; ++it) {
01096         MSG("Io",Msg::kVerbose)
01097           << "stream: " << setw(16) << it->first
01098           << " file: " << it->second
01099           <<endl;
01100     }
01101 
01102     for (it = fLastFileMap.begin(); it!=done; ++it) {
01103       if ( strmstring == it->first ) return it->second.c_str();
01104     }
01105     return 0;
01106 
01107 }
01108 
01109 //......................................................................
01110 
01111 void IoInputModule::LoadFilesFromCommandLine()
01112 {
01113 //======================================================================
01114 // Load the files listed on the program command line
01115 //======================================================================
01116   JobCEnv& jce = JobCEnv::Instance();
01117   if (!fLoadedCommandLineFiles) {
01118     for (int i=0; i<jce.GetNfile(); ++i) {
01119       this->AddFile(jce.GetFileName(i));
01120     }
01121     fLoadedCommandLineFiles = true;
01122   }
01123 }
01124 
01125 //......................................................................
01126 
01127 int IoInputModule::ReadHeader()
01128 {
01129 //======================================================================
01130 // Read temptags to get file name
01131 // Read header information to get run/snarl info. 
01132 //   if found Run and Snarl info return 2, 
01133 //   if only Run info return 1,
01134 //   else return 0
01135 //======================================================================
01136   const MomNavigator* mom = this->GetMom();
01137   assert(mom);
01138 
01139   // BeginFile/EndFile boundaries may not be in synch across the different
01140   // data streams.  The definition used here is to set file boundary true if 
01141   // the file has changed for any of the managed streams.
01142 
01143   const TObjArray* momarray = mom->GetFragmentArray();
01144   for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01145     TObject* obj = momarray->At(i);
01146     if (!obj) continue;
01147     Registry* temptags = 0;
01148     if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01149       temptags = &(record->GetTempTags());
01150     }
01151     else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01152       temptags = &(record->GetTempTags());
01153     }
01154     if ( ! temptags ) continue;
01155 
01156     // on a named stream?
01157     const char* tagstream = 0;
01158     if ( ! temptags->Get("stream",tagstream) ) continue;
01159 
01160     // stream managed by i/o?
01161     std::string streamname(tagstream);
01162     const char* tagnewfile = 0;
01163     if ( ! temptags->Get("file",tagnewfile) ) continue; 
01164 
01165     std::string lstfilename = fLastFileMap[streamname];
01166     std::string curfilename = fCurrentFileMap[streamname];
01167     std::string newfilename(tagnewfile);
01168 
01169     if ( newfilename != curfilename ) {
01170             
01171       std::string starcur  = fCurrentFileMap.begin()->first;
01172       std::string starlast = fLastFileMap.begin()->first;
01173       
01174       MSG("Io",Msg::kDebug) 
01175         << "SetBeginFile on streamname '" << streamname << "'" << endl
01176         << "   current      '" << fCurrentFileMap[streamname] << "'" << endl
01177         << "   last         '" << fLastFileMap[streamname] << "'" << endl
01178         << "   current['" << starcur << "'] '" 
01179         << fCurrentFileMap[starcur] << "'" << endl
01180         << "      last['" << starlast << "'] '" 
01181         << fLastFileMap[starlast] << "'" << endl
01182         << "   new '" << newfilename << "' != "
01183         << " cur '" << curfilename << "'" << endl
01184         << "   update \"*\" ? " 
01185         << (( newfilename != fCurrentFileMap["*"] ) ? "yes":"no")
01186         << endl;
01187       
01188       // update "*" stream first
01189       if ( newfilename != fCurrentFileMap["*"] ) {
01190         fStatus.SetBeginFile();
01191         //if ( lstfilename != "" ) fStatus.SetEndFile();
01192         if ( fCurrentFileMap["*"] != "" ) fStatus.SetEndFile();
01193         
01194         fLastFileMap["*"]           = fCurrentFileMap["*"];
01195         fCurrentFileMap["*"]        = newfilename;
01196 
01197         MSG("Io",Msg::kDebug) 
01198           << "SetBeginFile on '*'" << endl
01199           << "   current['" << starcur << "'] '" 
01200           << fCurrentFileMap[starcur] << "'" << endl
01201           << "      last['" << starlast << "'] '" 
01202           << fLastFileMap[starlast] << "'" << endl;
01203       }
01204       // update this named stream
01205       fLastFileMap[streamname]    = curfilename;
01206       fCurrentFileMap[streamname] = newfilename;
01207       
01208       // if a stream on a file moved on then presumably all the
01209       // other streams on the same file have also been exhausted
01210       // and are going to move on -- help them along so we don't
01211       // have to wait for that stream to be the next record on
01212       // that stream is the next VldContext
01213       mapStrStrItr_t it, done = fCurrentFileMap.end();
01214       for (it = fCurrentFileMap.begin(); it != done; ++it) {
01215         if ( it->second == curfilename ) {
01216           std::string altstream = it->first;
01217           fLastFileMap[altstream]    = curfilename;
01218           fCurrentFileMap[altstream] = newfilename;
01219         }
01220       }
01221       
01222     } // new != cur filename
01223 
01224     MSG("Io",Msg::kVerbose) 
01225       << " stream '" << streamname << "' set fLastFileMap to '"
01226       << curfilename << "', fCurrentFileMap to '"
01227       << newfilename << "'" 
01228       << " * '" << fLastFileMap["*"] << "'  '" << fCurrentFileMap["*"] << "'"
01229       << endl;
01230         
01231   } // loop over records
01232 
01233   // update for EOF/EOJ condition (which doesn't come throught this
01234   // function) if it isn't a file change
01235   // this won't work is the file has just one record
01236   // ...the whole procedure is fundamentally flawed -- it shouldn't be
01237   // based on what we find in "mom" but rather the stream/file 
01238   // management classes...
01239   if ( ! fStatus.BeginFile() ) fLastFileMap["*"] = fCurrentFileMap["*"];
01240 
01241   // BeginRun/EndRun
01242   int run   = -1;  // default and flag value
01243   int snarl = -1;  // default and flag value
01244   for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01245     TObject* obj = momarray->At(i);
01246     if (!obj) continue;
01247     if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01248       // old style
01249       
01250         // all DAQ generated records can supply run #
01251       const RawDaqHeader* rdh 
01252              = dynamic_cast<const RawDaqHeader*>(record->GetHeader());
01253       if (rdh) run = rdh->GetRun();
01254 
01255       // but only DaqSnarl records can supply snarl #
01256       const RawDaqSnarlHeader* rdsh 
01257              = dynamic_cast<const RawDaqSnarlHeader*>(record->GetHeader());
01258       if (rdsh) snarl = rdsh->GetSnarl();
01259 
01260       if (!rdh) {
01261         // not a DAQ record, perhaps it's a CandRecord
01262         const CandHeader* candhdr
01263           = dynamic_cast<const CandHeader*>(record->GetHeader());
01264         if (candhdr) {
01265           run   = candhdr->GetRun();
01266           snarl = candhdr->GetSnarl();
01267         }
01268       }
01269     }
01270     else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01271       // New style
01272       const RecPhysicsHeader* rph 
01273           = dynamic_cast<const RecPhysicsHeader*>(&(record->GetHeader()));
01274       if ( rph ) {
01275         run   = rph->GetRun();
01276         snarl = rph->GetSnarl();
01277       }
01278     }
01279     // break early only if determined snarl in case it one of those
01280     // crazy record sets with a DaqMonitor and a DaqSnarl record.
01281     if ( snarl >= 0 ) break;
01282   }
01283 
01284   // set the status flags based on what was extracted
01285   fCurrentSnarl = snarl;
01286   if ( run < 0 ) {
01287     fCurrentRun = -1;
01288     return 0;
01289   }
01290   if ( run != fCurrentRun ) {
01291     fStatus.SetBeginRun();
01292     if ( fLastRun >= 0 ) fStatus.SetEndRun();
01293   }
01294   fLastRun      = fCurrentRun;
01295   fCurrentRun   = run;
01296   if ( snarl >= 0 ) {
01297     fLastSnarl    = fCurrentSnarl;
01298     fCurrentSnarl = snarl;
01299     return 2;
01300   }
01301   return 1;
01302 }
01303 
01304 //......................................................................
01305 
01306 void IoInputModule::UpdateDDSConfig() {
01307 //======================================================================
01308 // Update the dispatcher configuration
01309 //======================================================================
01310   if ( fDataStreamItr == 0 ) return;
01311 
01312   IoDDSStreamItr* ddsItr = dynamic_cast<IoDDSStreamItr*>(fDataStreamItr);
01313   if ( ! ddsItr ) return;
01314 
01315   ddsItr->SetTimeOut(fTimeOut);  
01316 
01317   // Need to reinitialize dispatcher if server hostname, port, clienttype
01318   // or clientname have changed
01319   bool reinit = (fServer != ddsItr->GetSourceName() 
01320                 || fPort != ddsItr->GetPort()
01321                 || fClientType != ddsItr->GetClientType() 
01322                 || fClientName != ddsItr->GetClientName() );
01323   
01324   if ( reinit ) {
01325     this -> CloseStreamItr();  // wait for next action to reopen
01326   }
01327   else {
01328     ddsItr -> SetKeepUpMode(fKeepUpMode);
01329     ddsItr -> SetMaxSyncDelay(fMaxSyncDelay);
01330     ddsItr -> SetDataSource(fDataSource);
01331     ddsItr -> SetOffLine(fOffLine);
01332   }
01333 
01334 }
01335 
01336 
01337 //......................................................................
01338 
01339 void IoInputModule::UpdateFormatConfig() {
01340 //======================================================================
01341 // Update the stream itr to match requested format
01342 //======================================================================
01343 
01344   if ( fDataStreamItr == 0 ) return;
01345 
01346   bool reopen = ( fFormat != fDataStreamItr->GetFormat() );
01347   if ( reopen ) {
01348     this -> CloseStreamItr();  // wait for next action to reopen
01349   }
01350 
01351   return;
01352 
01353 }
01354 
01355 //......................................................................
01356 
01357 void IoInputModule::UpdateFileList() 
01358 {
01359 //======================================================================
01360 // Update the file list 
01361 //======================================================================
01362 
01363   if ( fDataStreamItr == 0 ) return;
01364 
01365   // Fresh start. This should typically only be called when data stream 
01366   // iterator is newly opened (i.e. when the format changes)
01367   fDataStreamItr -> RemoveFile("*");
01368 
01369   std::list<IoFileListItem>::iterator itr = fFileList.begin();
01370   for ( ; itr != fFileList.end(); itr++ ) {
01371     IoFileListItem& iofile = *itr;
01372     const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
01373     int at = iofile.GetAt();
01374   
01375     if ( at < 0 ) {
01376       IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
01377       for ( ; itr != filemap.end(); itr++ ) {
01378         std::string filename = itr -> first;
01379         std::string streamlist = itr -> second;
01380         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01381       }
01382     }
01383     else {
01384       // Apply files in reverse to have first file in list inserted at pos At
01385       IoFileListItem::FileStreamMap::const_reverse_iterator itr 
01386                                                          = filemap.rbegin();
01387       for ( ; itr != filemap.rend(); itr++ ) {
01388         std::string filename = itr -> first;
01389         std::string streamlist = itr -> second;
01390         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01391       }      
01392     }
01393   }
01394 
01395   return;
01396   
01397 }
01398 
01399 //......................................................................
01400 
01401 void IoInputModule::UpdateStreamConfig()
01402 {
01403 //======================================================================
01404 // Set the stream and selection cuts for the open streams
01405 //======================================================================
01406   if (fDataStreamItr==0) return;
01407 
01408   // Define streams
01409   mapStrStrItr_t itr;
01410   for ( itr = fStreamDefMap.begin(); itr != fStreamDefMap.end(); itr++ ) {
01411     fDataStreamItr->DefineStream((itr->first).c_str(),  // Stream
01412                                  (itr->second).c_str());  // Definition
01413   }
01414 
01415   // Set the streams to be activated
01416   fDataStreamItr->Streams(fStreamList.c_str());
01417   // Set the selection cuts for each stream
01418   for (itr=fStreamSelectionMap.begin();itr!=fStreamSelectionMap.end();++itr) {
01419     std::map<std::string,bool>::const_iterator reqitr 
01420                            = fStreamRequiredMap.find(itr->first);
01421     bool isrequired = false;
01422     if ( reqitr != fStreamRequiredMap.end() ) isrequired = reqitr -> second;
01423     fDataStreamItr->Select((itr->first).c_str(),   // Stream
01424                            (itr->second).c_str(), // Selection
01425                            isrequired);  // IsRequired
01426   }
01427 
01428   // Set the test mode for each stream
01429   std::map<std::string,bool>::const_iterator testitr;
01430   for ( testitr=fStreamTestModeMap.begin(); testitr !=fStreamTestModeMap.end();
01431                                           ++testitr ) {
01432     fDataStreamItr->SetTestMode((testitr->first).c_str(),   // Stream
01433                                 testitr->second);  // TestMode
01434   }
01435 
01436   // Disable per owned for each stream requested
01437   std::map<std::string,bool>::const_iterator disableitr;
01438   for ( disableitr = fStreamPerOwnedDisabledMap.begin();
01439         disableitr != fStreamPerOwnedDisabledMap.end(); ++disableitr ) {
01440     fDataStreamItr -> SetPerOwnedDisabled((disableitr->first).c_str(),
01441                                           disableitr->second);
01442   }
01443   
01444 
01445   // Set the sequence mode for each stream
01446   bool setRandom = false;
01447   std::map<std::string,Per::ESequenceMode>::const_iterator seqitr;
01448   for ( seqitr=fStreamSeqModeMap.begin(); seqitr!=fStreamSeqModeMap.end();
01449                                         ++seqitr ) {
01450     fDataStreamItr->SetSequenceMode((seqitr->first).c_str(),   // Stream
01451                                      seqitr->second); // Sequence Mode
01452     pair<double,double> window = fStreamWindowMap[seqitr->first];
01453     fDataStreamItr->SetWindow((seqitr->first).c_str(),   // Stream
01454                               window.first,window.second);
01455     if (seqitr->second == Per::kSequential ||
01456         seqitr->second == Per::kRandom      ) {
01457       if (!setRandom) {
01458         setRandom = true;
01459         fDataStreamItr->SetRandomSeed(fRandomSeed);
01460       }
01461       int    repeat = fStreamMaxRepeatMap[seqitr->first];
01462       fDataStreamItr->SetMaxFileRepeat( (seqitr->first).c_str(), repeat );
01463       double mean = fStreamMeanMap[seqitr->first];
01464       fDataStreamItr->SetMeanMom( (seqitr->first).c_str(), mean );
01465       bool   pushRand = fStreamPushRandomMap[seqitr->first];
01466       fDataStreamItr->SetPushRandom( (seqitr->first).c_str(), pushRand );
01467     } // end if kSeq or kRand
01468   }
01469 }
01470 
01471 //......................................................................
01472 
01473 int IoInputModule::OpenStreamItr()
01474 {
01475 //======================================================================
01476 // Open a new stream iterator
01477 //======================================================================
01478   if (fDataStreamItr) this->CloseStreamItr();
01479 
01480   bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
01481   std::string src;
01482   if ( isDDS ) src = fServer;
01483   
01484   fDataStreamItr = IoDataStreamFactory::CreateDataStreamItr(src.c_str(),
01485                      fFormat.c_str(),fPort,fMaxRetry,fRetryDelay,
01486                      fClientType,fClientName);
01487 
01488 
01489   if (fDataStreamItr == 0) {
01490     MSG("Io",Msg::kWarning) << "Failed to open stream '" << src << "'" <<
01491       " using format '" << fFormat << "'" << endl; 
01492     fStatus.SetEndRun();
01493     fStatus.SetEndFile();
01494     fStatus.SetEndOfInputStream();
01495     return 0;
01496   }
01497 
01498   fStatus.SetBeginOfInputStream();
01499   fStatus.SetBeginFile();
01500   fStatus.SetBeginRun();
01501 
01502   // Configure the file and module
01503   this->UpdateStreamConfig(); // this should be called before filelist
01504   this->UpdateFileList();
01505   this->UpdateDDSConfig();
01506   fFormat = fDataStreamItr->GetFormat();
01507   
01508   MSG("Io",Msg::kDebug) 
01509     << "Opened stream itr of format " << fDataStreamItr->GetFormat() << endl;
01510 
01511   return 1;
01512 }
01513 
01514 //......................................................................
01515 
01516 void IoInputModule::CloseStreamItr() 
01517 {
01518 //======================================================================
01519 // Close the currently openned stream
01520 //======================================================================
01521   if (fDataStreamItr) {
01522     MSG("Io",Msg::kDebug) 
01523       << "Close stream itr of format " << fDataStreamItr->GetFormat() << endl;
01524     delete fDataStreamItr;
01525     fDataStreamItr = 0;
01526     fStatus.SetEndRun();
01527     fStatus.SetEndFile();
01528     fStatus.SetEndOfInputStream();
01529   }
01530 }
01531 
01533 
01534 
01535 
01536 
01537 
01538 
01539 
01540 
01541 
01542 
01543 
01544 
01545 
01546 

Generated on Sat Nov 21 22:46:22 2009 for loon by  doxygen 1.3.9.1