Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

RotoClientModule.cxx

Go to the documentation of this file.
00001 
00002 // $Id: RotoClientModule.cxx,v 1.17 2007/07/09 00:50:06 rhatcher Exp $
00003 //
00004 // A JobControl Module for sending raw blocks to the Roto-Rooter
00005 //
00006 // rhatcher@fnal.gov
00008 
00009 #include "Rotorooter/RotoClientModule.h"
00010 #include "Rotorooter/RotoClientBinaryFile.h"
00011 #include "Rotorooter/RotoClient.h"
00012 #include "Rotorooter/RotoObjectifier.h"
00013 
00014 #include "MinosObjectMap/MomNavigator.h"
00015 #include "MessageService/MsgService.h"
00016 #include "JobControl/JobCModuleRegistry.h"
00017 #include "JobControl/JobCommand.h"
00018 
00019 #include "RawData/RawRecord.h"
00020 #include "RawData/RawDaqHeader.h"
00021 #include "RawData/RawDataBlock.h"
00022 
00023 #include "RawData/RawDaqHeaderBlock.h"
00024 #include "RawData/RawRunStartBlock.h"
00025 #include "RawData/RawRunEndBlock.h"
00026 
00027 #include "TSystem.h"
00028 
00029 const Char_t *dfltHost = "localhost";
00030 const Int_t   dfltPort = 9011;
00031 
00032 const Bool_t  dfltPrimary     = true;
00033 const Int_t   dfltBufferWords = 6 * 1024 * 1024;  // 6*sizeof(int) Mbytes
00034 
00035 const UInt_t dbg_DumpHead         = 0x0001;
00036 const UInt_t dbg_DumpAddBlock     = 0x0002;
00037 const UInt_t dbg_PrintAddBlock    = 0x0004;
00038 const UInt_t dbg_PrintHexAddBlock = 0x0008;
00039 
00040 ClassImp(RotoClientModule)
00041 
00042 //......................................................................
00043 
00044 CVSID("$Id: RotoClientModule.cxx,v 1.17 2007/07/09 00:50:06 rhatcher Exp $");
00045 JOBMODULE(RotoClientModule, "RotoClientModule",
00046          "Sends RawRecord info to Roto-Rooter");
00047 
00048 //......................................................................
00049 
00050 RotoClientModule::RotoClientModule() 
00051    : fRotoClient(0), fBufferWords(dfltBufferWords), fBuffer(0), 
00052      fUseRototalk(true), fGenFakeRec(true), fDebugFlags(0)
00053 {
00054    // construct a new "RotoClientModule" JobControl module
00055 
00056    // MSG("Roto", Msg::kInfo) << "RotoClientModule::ctor" << endl;
00057 
00058    // everyone has a shot at being the primary
00059    fPrimary = dfltPrimary;
00060    
00061    // pick up server host from env variable (if possible)
00062    const char *ROTOSERVER = gSystem->Getenv("ROTOSERVER");
00063    if (!ROTOSERVER || strlen(ROTOSERVER) == 0) {
00064       ROTOSERVER = dfltHost;
00065    }
00066    SetHostPort(ROTOSERVER,0);
00067   
00068    MSG("Roto", Msg::kInfo) << "  Initially configured for host \""
00069                            << fHost.c_str() << "\"  Port " << fPort << endl;
00070 }
00071 
00072 //......................................................................
00073 
00074 RotoClientModule::~RotoClientModule() 
00075 {
00076    // destruct this "RotoClientModule"
00077 
00078    // MSG("Roto", Msg::kInfo) << "RotoClientModule::dtor" << endl;
00079 
00080    DestroyClient();
00081    DestroyBuffer();
00082 }
00083 
00084 //......................................................................
00085 
00086 JobCResult RotoClientModule::Put(const MomNavigator *mom)
00087 {
00088    // Turn a RawRecord into a flat Int_t buffer
00089    // pass it on to RotoClient to send to the Roto-rooter (or binary file)
00090    // MSG("Roto", Msg::kInfo) << "RotoClientModule::Put" << endl;
00091 
00092    JobCResult jobcStatus(JobCResult::kAOK); // All Ok
00093 
00094    int* iptr_data   = GetBuffer();  // get a buffer, create one if needed
00095    if (!iptr_data) return JobCResult::kError;
00096 
00097    RotoClient* client = GetClient();
00098    if (!client) return JobCResult::kError;
00099 
00100    Bool_t ok = true;
00101 
00102    // Squish the record(s) flat
00103    // Note there may be more than one RawRecord in MOM
00104    TObject *tobj = 0;
00105    TIter reciter = const_cast<MomNavigator*>(mom)->FragmentIter();
00106    while ( (tobj = reciter() ) ) {
00107       RawRecord *rawrec = dynamic_cast<RawRecord*>(tobj);
00108       if (!rawrec) continue;
00109 
00110       Char_t* iptr_bytes = (Char_t*) iptr_data;
00111       Int_t     maxbytes = fBufferWords * sizeof(Int_t)/sizeof(Char_t);
00112       Int_t  nbytes_used = 
00113          RotoObjectifier::BufferSquish(rawrec,iptr_bytes,maxbytes);
00114 
00115       VldContext vldc = rawrec->GetRawHeader()->GetVldContext(); // always okay
00116 
00117       const RawDaqHeader *rawdaqhead = 
00118          dynamic_cast<const RawDaqHeader *>(rawrec->GetRawHeader());
00119       Int_t run    = rawdaqhead->GetRun();
00120       Int_t subrun = rawdaqhead->GetSubRun();
00121 
00122       ReOpenOutputFile(vldc.GetDetector(),run,subrun,&vldc,fGenFakeRec);
00123    
00124       // send the buffer to the RotoRooter
00125       ok &= client->SendRecordBuffer(iptr_bytes,nbytes_used);
00126    }
00127 
00128    if (ok) return JobCResult::kPassed;  // All Ok
00129    return jobcStatus;
00130 }
00131 
00132 //......................................................................
00133 void RotoClientModule::BeginJob()
00134 {
00135    // 
00136    // MSG("Roto", Msg::kInfo) << "RotoClientModule::BeginJob" << endl;
00137 }
00138 //......................................................................
00139 void RotoClientModule::EndJob()
00140 {
00141    // 
00142    // MSG("Roto", Msg::kInfo) << "RotoClientModule::EndJob" << endl;
00143 
00144    // ensure that last file is closed
00145    ReOpenOutputFile(-1,-1,-1,0,fGenFakeRec);
00146 }
00147 //......................................................................
00148 void RotoClientModule::BeginFile()
00149 {
00150    // 
00151    // MSG("Roto", Msg::kInfo) << "RotoClientModule::BeginFile" << endl;
00152 }
00153 //......................................................................
00154 void RotoClientModule::EndFile()
00155 {
00156    // 
00157    // MSG("Roto", Msg::kInfo) << "RotoClientModule::EndFile" << endl;
00158 }
00159 //......................................................................
00160 void RotoClientModule::BeginRun()
00161 {
00162    // 
00163    // MSG("Roto", Msg::kInfo) << "RotoClientModule::BeginRun" << endl;
00164 }
00165 //......................................................................
00166 void RotoClientModule::EndRun()
00167 {
00168    // 
00169    // MSG("Roto", Msg::kInfo) << "RotoClientModule::EndRun" << endl;
00170 }
00171 //......................................................................
00172 void RotoClientModule::Report()
00173 {
00174    // Report (to message service) current configuration/status
00175    MSG("Roto", Msg::kInfo) << "RotoClientModule::Report" << endl;
00176    
00177    MSG("Roto", Msg::kInfo) << "RotoClientModule has:" << endl;
00178 
00179    // information about the client
00180    if (fRotoClient) {
00181       RotoClient* client = GetClient();
00182       if (client) {
00183          int maxbytes = 1024;
00184          char* buffer = new char[maxbytes];
00185          client->GetRotoStatus(buffer,maxbytes);
00186          if (fPort >= 0) {         
00187             MSG("Roto", Msg::kInfo) 
00188                << "   connected client to \"" 
00189                << fHost.c_str() 
00190                << "\" Port "
00191                << fPort 
00192                << endl
00193                << "   state_report: \"" << buffer << "\""
00194                << endl;            
00195          } else {
00196             MSG("Roto", Msg::kInfo) 
00197                << "   open binary file \"" 
00198                << fHost.c_str() 
00199                << "\" " 
00200                << endl
00201                << buffer
00202                << endl;
00203          }
00204          delete [] buffer;
00205       } else {
00206             MSG("Roto", Msg::kInfo) 
00207                << "   connected client to \"" 
00208                << fHost.c_str() 
00209                << "\" Port "
00210                << fPort 
00211                << endl;
00212       }            
00213    } else {
00214       MSG("Roto", Msg::kInfo) 
00215          << "   no currently connected client" 
00216          << endl
00217          << "      configured to use to \"" 
00218          << fHost.c_str() 
00219          << "\" Port "
00220          << fPort 
00221          << endl;
00222    }
00223 
00224    // information about the buffer
00225    if (fBuffer) {
00226       MSG("Roto", Msg::kInfo) 
00227          << "   buffer of " 
00228          << fBufferWords 
00229          << " words"
00230          << endl;
00231    } else {
00232       MSG("Roto", Msg::kInfo) 
00233          << "   no buffer"
00234          << endl
00235          << "      configured to use "
00236          << fBufferWords 
00237          << " words"
00238          << endl;
00239    }
00240   
00241    // other info
00242    MSG("Roto", Msg::kInfo) 
00243       << "   primary flag set to " << fPrimary
00244       << endl
00245       << "   DebugFlags set to 0x" << hex << setw(8) << fDebugFlags << dec
00246       << endl
00247       << endl;
00248 }
00249 
00250 //......................................................................
00251 void RotoClientModule::HandleCommand(JobCommand *command)
00252 {
00253    //
00254    // Process configuration commands
00255    //
00256 
00257    // MSG("Roto", Msg::kDebug) << "RotoClientModule::HandleCommand" << endl;
00258 
00259    TString cmd = command->PopCmd();
00260    if (cmd == "Set") {
00261       TString opt = command->PopOpt();
00262       if      (opt == "HostPort")  {
00263          string newHost  = command->PopOpt();
00264          Int_t  newPort  = command->PopIntOpt();
00265          if (newHost != fHost || newPort != fPort) {
00266             DestroyClient();  // close any existing client
00267             SetHostPort(newHost.c_str(),newPort);
00268          }
00269       }
00270       else if (opt == "BufferWords") {
00271          Int_t newSize = command->PopIntOpt();
00272          if (newSize != fBufferWords) {
00273             DestroyBuffer();
00274             fBufferWords = newSize;
00275          }
00276       }
00277       else if (opt == "Primary") {
00278          fPrimary = (command->PopIntOpt()) ? 1 : 0;
00279       }
00280       else if (opt == "GenFakeRec") {
00281          fGenFakeRec = (command->PopIntOpt()) ? 1 : 0;
00282       }
00283       else if (opt == "DebugFlags") {
00284          fDebugFlags = command->PopIntOpt();
00285       }
00286       else if (opt == "ClientDebugFlags") {
00287          Int_t flags = command->PopIntOpt();
00288          RotoClient::SetDebugFlags(flags);
00289       }
00290       else if (opt == "ObjectifierDebugFlags") {
00291          Int_t flags = command->PopIntOpt();
00292          RotoObjectifier::SetDebugFlags(flags);
00293       }
00294       else {
00295          MSG("Roto", Msg::kWarning)
00296             << "RotoClientModule: Unrecognized Set option " << opt << endl;
00297       }
00298    } else if (cmd == "Report") {
00299       Report();
00300    } else if (cmd == "CrashAndBurn") {
00301       MSG("Roto", Msg::kFatal)
00302             << "RotoClientModule: CrashAndBurn command " << endl;
00303       exit(0); // kill without core dumping
00304       // assert(0);
00305    } else {
00306       MSG("Roto", Msg::kWarning)
00307          << "RotoClientModule: Unrecognized command " << cmd << endl;
00308    }
00309    
00310 }
00311 
00312 //......................................................................
00313 void RotoClientModule::Config(const Registry& r)
00314 {
00315   // Configure the module using the registry r
00316 
00317   string newHost = fHost;
00318   int    newPort = fPort;
00319   int    newSize = fBufferWords;
00320   const char* tmpcs;
00321   int         tmpi;
00322 
00323   if (r.Get("Host:Port",tmpcs))  {
00324     char host[255];
00325     char* hostptr = host;
00326     // copy up to colon or end-of-string
00327     while (*tmpcs != ':' && *tmpcs != '\0') *hostptr++ = *tmpcs++;
00328     if (host != hostptr) {
00329       *hostptr = '\0';  // trailing end-of-string marker
00330       newHost = host;   // saw new host name (not just port), convert to string
00331     }
00332     if (*tmpcs == ':') {
00333       tmpcs++; // move beyond colon
00334       newPort = atoi(tmpcs);
00335     }
00336   }
00337 
00338   if (r.Get("BufferWords",tmpi)) newSize      = tmpi;
00339   if (r.Get("UseRototalk",tmpi)) fUseRototalk = tmpi;
00340   if (r.Get("Primary",tmpi))     fPrimary     = tmpi;
00341   if (r.Get("GenFakeRec",tmpi))  fGenFakeRec  = tmpi;
00342   if (r.Get("DebugFlags",tmpi))  fDebugFlags  = tmpi;
00343   if (r.Get("ClientDebugFlags",tmpi)) RotoClient::SetDebugFlags(tmpi);
00344 
00345   if ( newHost != fHost || newPort != fPort ) {
00346     DestroyClient();  // close any existing client
00347     SetHostPort(newHost.c_str(),newPort);
00348   }
00349 
00350   if (newSize != fBufferWords) {
00351     DestroyBuffer();
00352     fBufferWords = newSize;
00353   }
00354 
00355 }
00356 //......................................................................
00357 const Registry& RotoClientModule::DefaultConfig() const
00358 {
00359   // The default configuration for this module
00360   static Registry r;
00361 
00362   std::string name = this->GetName();
00363   name += ".config.default";
00364   r.SetName(name.c_str());
00365 
00366   r.UnLockValues();
00367 
00368   // server IP host name and port (or file name)
00369   // if port=0 use default, if <0 "host" is binary file name
00370   char dfltHostPort[255];
00371   sprintf(dfltHostPort,"%s:%d",dfltHost,dfltPort);
00372   r.Set("Host:Port",dfltHostPort);    
00373 
00374   r.Set("UseRototalk",true);   // use DAQ Rototalk vs. duplicate code
00375   r.Set("BufferWords",dfltBufferWords);  // size of client's buffer
00376   r.Set("Primary",true);      // client is data source (false not supported)
00377   r.Set("GenFakeRec",false);  // generate fake RunStart/RunEnd if missing
00378   //  r.Set("DebugFlags",0);     // various bits
00379   //  r.Set("ClientDebugFlags",0);     // various bits
00380   r.LockValues();
00381 
00382   return r;
00383 }
00384 
00385 //......................................................................
00386 void RotoClientModule::Help()
00387 {
00388    MSG("Roto", Msg::kInfo)
00389     << "Help for 'RotoClientModule':" << endl
00390     << "  RotoClientModule is a JobCModule for flattening a RawRecord" << endl
00391     << "  to look like one that would come from the DAQ and then to" << endl
00392     << "  write it out, either by sending the buffer to a Roto-rooter" << endl
00393     << "  or writing it to a binary file" << endl
00394     << endl
00395     << "Commands implemented:" << endl
00396     << endl
00397     << "  /RotoClientModule/Set HostPort <host> <port>" << endl
00398     << "     set the output host (string) and port (int)" << endl
00399     << "     if (port == -1) host is the name of the binary file" << endl
00400     << "     if (port ==  0) use default port #" << endl
00401     << endl
00402     << "  /RotoClientModule/Set BufferWords <nwords>" << endl
00403     << "     sets the buffer size (32 bit words) to use" << endl
00404     << endl
00405     << "  /RotoClientModule/Set Primary {0|1}" << endl
00406     << "     if not primary then only no writing is done" << endl
00407     << "     but Report can be requested from server" << endl
00408     << endl
00409     << "  /RotoClientModule/Set DebugFlags <bits>" << endl
00410     << "     sets the debug flags bits" << endl
00411     << endl;
00412 }
00413 
00414 //......................................................................
00415 void RotoClientModule::SetHostPort(const Char_t* host, Int_t port)
00416 {
00417    // Set Host and Port
00418    // if port == 0 use :<port> from *host, if none use dfltPort
00419 
00420    char* hostBuffer = new Char_t[strlen(host)+1];
00421    strcpy(hostBuffer,host);
00422 
00423    // pick off port if given
00424    char *colon = strchr(hostBuffer,':');
00425    int bport = 0;
00426    if (colon) {
00427       sscanf(colon,":%d",&bport);
00428       *colon = 0;  // truncate hostBuffer so there is no ':'
00429    }
00430    fHost = hostBuffer;
00431    if (fHost == "") fHost = dfltHost;
00432    fPort = port;
00433    if (!fPort) fPort = bport;
00434    if (!fPort) fPort = dfltPort;
00435 
00436    delete [] hostBuffer;
00437 
00438 }
00439 
00440 //......................................................................
00441 RotoClient* RotoClientModule::GetClient()
00442 {
00443    // return ptr to up-and-running client
00444    // create one if necessary
00445    // return 0 if unable to connect
00446 
00447    if (fRotoClient) return fRotoClient; // if a client is in place just move on
00448 
00449    if (fPort >= 0) {
00450       fRotoClient = new RotoClient(fHost.c_str(),fPort,
00451                                    MINOS_ROOTER_DCP,fUseRototalk);
00452    } else {
00453       fRotoClient = new RotoClientBinaryFile(fHost.c_str(),fPort);
00454    }
00455    if (fRotoClient->Connected()) return fRotoClient;
00456    // couldn't connect during construction
00457    delete fRotoClient;
00458    fRotoClient = 0;
00459 
00460    MSG("Roto", Msg::kWarning) 
00461       << "RotoClientModule::GetClient for host " << fHost.c_str()
00462       << " port " << fPort << " failed " << endl;
00463    return 0;
00464 
00465 }
00466 
00467 //......................................................................
00468 void RotoClientModule::DestroyClient()
00469 {
00470    // destroy any current client
00471    // client's dtor should do a clean disconnect
00472    // closing socket or file
00473    if (fRotoClient) { delete fRotoClient; fRotoClient=0; }
00474 }
00475 
00476 //......................................................................
00477 Int_t* RotoClientModule::GetBuffer()
00478 {
00479    // return ptr to buffer
00480    // create one if necessary
00481 
00482    if (fBuffer) return fBuffer; // if a buffer is in place just move on
00483 
00484    fBuffer = new Int_t[fBufferWords];
00485    if (fBuffer) return fBuffer;
00486 
00487    MSG("Roto", Msg::kWarning) 
00488       << "RotoClientModule::GetBuffer for size " << fBufferWords
00489       << " failed " << endl;
00490    return 0;
00491 
00492 }
00493 
00494 //......................................................................
00495 void RotoClientModule::DestroyBuffer()
00496 {
00497    // destroy any current buffer
00498 
00499    if (fBuffer) { delete [] fBuffer; fBuffer=0; }
00500 }
00501 
00502 //......................................................................
00503 void RotoClientModule::ReOpenOutputFile(Int_t detector, Int_t run, Int_t subrun,
00504                                         VldContext* pvldc, Bool_t fakeRunRec)
00505                                                          
00506 {
00507    // open an output file on the rotorooter if necessary
00508    // close the last one if there was one
00509    // this assumes a strictly sequential ordering
00510 
00511    static Int_t last_det = Detector::kUnknown;
00512    static Int_t last_run = -1;
00513    static Int_t last_subrun = -1;
00514    static VldContext last_vldc;
00515    static VldContext start_vldc;
00516 
00517    RotoClient* client = GetClient();
00518 
00519    if (!client) return;
00520 
00521    bool newrun =  (run != last_run || subrun != last_subrun);
00522 
00523    if (newrun) {
00524       // old file exists?
00525       if (last_run != -1 || last_subrun != -1) {
00526 
00527          if (fakeRunRec) {
00528             // pump over a fake RunEnd record
00529 
00530             // advance timestamp by 1 ns so it comes after last event
00531             VldTimeStamp end_timestamp(last_vldc.GetTimeStamp());
00532             end_timestamp.Add(VldTimeStamp((time_t)0,+1));
00533             VldContext end_vldc(last_vldc.GetDetector(),
00534                                 last_vldc.GetSimFlag(),
00535                                 end_timestamp);
00536 
00537             int runtype = 0;
00538             int timeframe = -1;
00539             RawDaqHeaderBlock* daqHdr =
00540                new RawDaqHeaderBlock(end_vldc,last_run,last_subrun,
00541                                      runtype,timeframe);
00542             RawDataBlock* runEnd = 
00543                new RawRunEndBlock(start_vldc,end_timestamp,
00544                                   last_run,last_subrun,runtype);
00545             SendFakeStartEndRecord(daqHdr,runEnd);
00546             delete daqHdr;
00547             delete runEnd;
00548          }
00549          // close the file
00550          client->CloseDAQFile(last_det,last_run,last_subrun);
00551       }
00552    }
00553 
00554    // update values 
00555    // do this no matter what so that vldc can timestamp
00556    // fake runend with the time of the last event
00557    last_det    = detector;
00558    last_run    = run;
00559    last_subrun = subrun;
00560    if (pvldc) last_vldc = *pvldc;
00561 
00562    if (newrun) {
00563       // new file sensible?
00564       if (last_run != -1 || last_subrun != -1) {
00565          // open file
00566          client->OpenDAQFile(last_det,last_run,last_subrun);
00567 
00568          if (fakeRunRec) {
00569             // pump over a fake RunStart record
00570             // backup timestamp by 1 ns so it comes before first event
00571             VldTimeStamp start_timestamp(last_vldc.GetTimeStamp());
00572             start_timestamp.Add(VldTimeStamp((time_t)0,-1));
00573             start_vldc = VldContext(last_vldc.GetDetector(),
00574                                     last_vldc.GetSimFlag(),
00575                                     start_timestamp);
00576 
00577             int runtype = 0;
00578             int timeframe = -1;
00579             RawDaqHeaderBlock* daqHdr =
00580                new RawDaqHeaderBlock(start_vldc,last_run,last_subrun,
00581                                      runtype,timeframe);
00582             RawDataBlock* runStart = 
00583                new RawRunStartBlock(start_vldc,last_run,last_subrun,runtype);
00584             SendFakeStartEndRecord(daqHdr,runStart);
00585             delete daqHdr;
00586             delete runStart;
00587          }
00588       }
00589    }
00590 
00591 }
00592 //......................................................................
00593 void RotoClientModule::SendFakeStartEndRecord(RawDaqHeaderBlock* headerblk,
00594                                               RawDataBlock* startend)
00595                                                          
00596 {
00597    // send these two blocks to the client
00598    // WITHOUT overwriting what's in the main buffer
00599 
00600    RotoClient* client = GetClient();
00601 
00602    if (!client) return;
00603 
00604    int nwords = headerblk->GetSize() + startend->GetSize();
00605    int nbytes = sizeof(Int_t) * nwords;
00606 
00607    // allocate a temporary buffer
00608    Int_t* buffer = new Int_t [nwords];
00609    Int_t* iptr   = buffer;
00610 
00611    // fill the buffer
00612    iptr = headerblk->AppendToBuffer(iptr);
00613    iptr = startend->AppendToBuffer(iptr);
00614 
00615    // send this "record"
00616    client->SendRecordBuffer(buffer,nbytes);
00617             
00618 
00619    // delete temporary buffer
00620    delete [] buffer;
00621 }
00622 //......................................................................

Generated on Mon Nov 23 05:28:20 2009 for loon by  doxygen 1.3.9.1