ReadDispatcherModule Class Reference

#include <ReadDispatcherModule.h>

Inheritance diagram for ReadDispatcherModule:
JobCModule

List of all members.

Public Member Functions

 ReadDispatcherModule ()
 ~ReadDispatcherModule ()
JobCResult Get ()
JobCResult Get (MomNavigator *mom)
const RegistryDefaultConfig () const
void Config (const Registry &r)
Bool_t IsNewEventReady ()

Private Member Functions

Int_t ConnectToServer (void)

Private Attributes

Bool_t fConnected
DDSClientfClient
MomNavigatorfSurrogateMom
std::string fDDSServer
Int_t fDDSPort
std::string fStreams
std::string fDDSKeepUpMode
std::string fDDSDataSource
Int_t fDDSTimeOut
std::string fSelectRule

Detailed Description

Definition at line 17 of file ReadDispatcherModule.h.


Constructor & Destructor Documentation

ReadDispatcherModule::ReadDispatcherModule (  ) 

Definition at line 42 of file ReadDispatcherModule.cxx.

References fClient, fConnected, and fSurrogateMom.

00043 {
00044   fClient = 0;
00045   fConnected = false;
00046   fSurrogateMom = 0;
00047 }

ReadDispatcherModule::~ReadDispatcherModule (  ) 

Definition at line 52 of file ReadDispatcherModule.cxx.

References fClient.

00053 {
00054   if(fClient) delete fClient;
00055 }


Member Function Documentation

void ReadDispatcherModule::Config ( const Registry r  )  [virtual]

Return the actual configuration. If your module directly pulls its configuration from the fConfig Registry, you don't need to override this. Override if you have local config variables.

Reimplemented from JobCModule.

Definition at line 244 of file ReadDispatcherModule.cxx.

References fDDSDataSource, fDDSKeepUpMode, fDDSPort, fDDSServer, fDDSTimeOut, fSelectRule, fStreams, and Registry::Get().

00245 {
00246 //======================================================================
00247 // Configure the module given the Registry r
00248 //======================================================================
00249   int    tmpi;
00250   const char* tmps;
00251 
00252   if (r.Get("DDSServer",tmps))       { fDDSServer = tmps; }
00253   if (r.Get("DDSPort",tmpi))         { fDDSPort   = tmpi; }
00254   if (r.Get("Streams",tmps))         { fStreams = tmps; }
00255   if (r.Get("DDSKeepUpMode",tmps))   { fDDSKeepUpMode = tmps; }
00256   if (r.Get("DDSDataSource",tmps))   { fDDSDataSource = tmps; }
00257   if (r.Get("DDSTimeOut",tmpi))      { fDDSTimeOut = tmpi; }
00258   if (r.Get("SelectRule",tmps))      { fSelectRule = tmps; }
00259 }

Int_t ReadDispatcherModule::ConnectToServer ( void   )  [private]

Definition at line 58 of file ReadDispatcherModule.cxx.

References bfld::AsString(), fClient, fConnected, fDDSDataSource, fDDSKeepUpMode, fDDSPort, fDDSServer, fSelectRule, fStreams, DDS::GetDataSourceCode(), DDS::GetKeepUpCode(), DDSClient::GetSubscription(), Msg::kInfo, DDS::kOk, MSG, DDSSubscription::SetDataSource(), DDSSubscription::SetStreams(), and DDSClient::Subscribe().

Referenced by IsNewEventReady().

00059 {
00060   // Connect.
00061 
00062   if(fClient) delete fClient;
00063   fConnected = false;
00064 
00065   // Create a new DDSClient object connected to ther server running on
00066   // node "serverhostname" and port 9090.
00067   fClient = new DDSClient(fDDSServer.c_str(),fDDSPort);
00068   
00069   // Check validity of connected socket before using it
00070   if (! fClient -> IsValid() ) {
00071     MSG("ReadDisp",Msg::kInfo) << "Error in creation of socket connected to server." << endl;
00072     delete fClient; fClient = 0;
00073     return 1;  // end of session
00074   }
00075   else {
00076     MSG("ReadDisp",Msg::kInfo) << "Successfully connected to dispatcher server.\n" 
00077        << fClient << endl;
00078   }
00079     
00080   // Subscribe to data source of interest (kDaq or kDcs, default is kDaq).
00081   fClient->GetSubscription()
00082     ->SetDataSource((DDS::EDataSource)DDS::GetDataSourceCode(fDDSDataSource.c_str()));
00083   
00084   // Subscribe to streams of interest
00085   fClient->GetSubscription()
00086     ->SetStreams(fStreams.c_str());
00087   // Keep up mode.
00088   fClient->GetSubscription()->
00089     SetKeepUpMode((DDS::EKeepUpMode)DDS::GetKeepUpCode(fDDSKeepUpMode.c_str()));
00090   
00091   // Selection, assumes DaqSnarl
00092   fClient->GetSubscription()->
00093     SetSelection(fStreams.c_str(),fSelectRule.c_str());
00094   
00095   // Submit the subscription to the ddschildserver and check to make sure that
00096   // it was received okay
00097   DDS::EMessageType msgrc = fClient->Subscribe();
00098   if (msgrc != DDS::kOk) {
00099     MSG("ReadDisp",Msg::kInfo) << "An error message " << DDS::AsString(msgrc) 
00100        << " was received from DDS::Subscribe." << endl;    
00101     return 1;
00102   }
00103 
00104   fConnected = true;
00105   return 0;
00106 }

const Registry & ReadDispatcherModule::DefaultConfig ( void   )  const [virtual]

Get the default configuration registry. This should normally be overridden. One useful idiom is to implement it like:

const Registry& MyModule::DefaultConfig() const { static Registry cfg; // never is destroyed if (cfg.Size()) return cfg; // already filled it // set defaults: cfg.Set("TheAnswer",42); cfg.Set("Units","unknown"); return cfg; }

Reimplemented from JobCModule.

Definition at line 216 of file ReadDispatcherModule.cxx.

References JobCModule::GetName(), Registry::LockValues(), Registry::Set(), and Registry::UnLockValues().

00217 {
00218 //======================================================================
00219 // Supply the default configuration for the module
00220 //======================================================================
00221   static Registry r; // Default configuration for module
00222 
00223   // Set name of config
00224   std::string name = this->GetName();
00225   name += ".config.default";
00226   r.SetName(name.c_str());
00227 
00228   // Set values in configuration
00229   r.UnLockValues();
00230   r.Set("DDSServer",      "localhost");
00231   r.Set("DDSPort",         9090);
00232   r.Set("Streams",        "DaqSnarl");
00233   r.Set("DDSKeepUpMode",  "RecordKeepUp");
00234   r.Set("DDSDataSource",  "Daq");
00235   r.Set("DDSTimeOut",      1);
00236   r.Set("SelectRule",     "true");
00237   r.LockValues();
00238 
00239   return r;
00240 }

JobCResult ReadDispatcherModule::Get ( MomNavigator mom  )  [virtual]

Implement if your module needs to read data from some external source and fill mom

Reimplemented from JobCModule.

Definition at line 181 of file ReadDispatcherModule.cxx.

References MomNavigator::Clear(), fSurrogateMom, IsNewEventReady(), JobCResult::kFailed, and JobCResult::kPassed.

00182 {
00183   //
00184   // Get next event.
00185   //
00186   // Queries dispatcher once before failing.
00187   // Uses a previously cached-but-unused event if available.
00188   //
00189   if(! fSurrogateMom) {
00190     // Dont have one ready.. go look again.
00191     IsNewEventReady();
00192   }
00193 
00194   if(! fSurrogateMom) return JobCResult::kFailed; // Give up.
00195 
00196 
00197   // Copy the surrogate.
00198   // This bit of code ripped out of DDSClient mercilessly.
00199   mom->Clear();
00200   TObjArray* objArray =const_cast<TObjArray*>
00201     (fSurrogateMom -> GetFragmentArray());
00202   Int_t nrecord = objArray -> GetEntries();
00203   for ( Int_t irec=0; irec < nrecord; irec++ ) {
00204     //record is removed fr  om fSurrogateMom to pass to usr's mom for ownership
00205     RecMinos* record = dynamic_cast<RecMinos*>(objArray->RemoveAt(irec));
00206     mom -> AdoptFragment(record); // new owner of record
00207   }
00208   delete fSurrogateMom; fSurrogateMom=0; // clears array and deletes temptags
00209 
00210 
00211   return JobCResult::kPassed;
00212 }

JobCResult ReadDispatcherModule::Get ( void   ) 

Definition at line 169 of file ReadDispatcherModule.cxx.

References JobCResult::kAOK.

00170 {
00171   //
00172   // Get next event, default fMom
00173   //
00174   
00175   // If this were to be a JobCInputModule...
00176   //return Get(GetMom());
00177   return JobCResult::kAOK;
00178 }

Bool_t ReadDispatcherModule::IsNewEventReady ( void   ) 

Definition at line 108 of file ReadDispatcherModule.cxx.

References bfld::AsString(), ConnectToServer(), DataUtil::dump_mom(), fClient, fConnected, fDDSTimeOut, fSurrogateMom, MomNavigator::GetFragmentArray(), MsgService::GetStream(), MsgService::Instance(), MsgStream::IsActive(), Msg::kDebug, Msg::kInfo, DDS::kOk, DDS::kTimeoutNewRecord, MSG, DDSClient::Next(), and DDSClient::Shutdown().

Referenced by DoNext(), and Get().

00109 {
00110   // Query the server and get a new mom if available.
00111   // Hold this mom until the next read cycle.
00112   // If we are already holding a mom, throw it out
00113   // in favour of a new one.
00114 
00115   if(!fConnected) ConnectToServer();
00116 
00117   if(!fConnected) return false;
00118 
00119   DDS::EMessageType msgrc = DDS::kOk;
00120 
00121   MomNavigator* newMom = 0;
00122   msgrc = fClient->Next(newMom,fDDSTimeOut);
00123   if (msgrc == DDS::kOk ) {
00124     //cout << "Got Next reply." << endl;
00125 
00126     if( newMom ) {
00127       // Got something!
00128       //cout << "Got a mom." << endl;
00129 
00130       if(newMom->GetFragmentArray()->IsEmpty()){
00131         cout << "Empty mom." << endl;
00132         delete newMom;
00133       } else {
00134         //cout << "Good mom." << endl;
00135         // Got a real, filled mom.
00136         // Do we already have something in the queue?
00137         // If so, get rid of it.
00138         if( fSurrogateMom ) delete fSurrogateMom;
00139         fSurrogateMom = newMom;
00140         
00141         if(MsgService::Instance()->GetStream("TriD")->IsActive(Msg::kDebug))
00142           DataUtil::dump_mom(newMom,std::cout);
00143       
00144         return true;
00145       }
00146     }
00147     
00148   } else {
00149     if(msgrc != DDS::kTimeoutNewRecord) {
00150       // Failed to get something from server.
00151       MSG("ReadDisp",Msg::kInfo) << "Dispatcher Error " 
00152          << DDS::AsString(msgrc)
00153          << "  Will try reconnect." 
00154          << endl;
00155       
00156       fClient->Shutdown();
00157       delete fClient;
00158       fClient = 0;
00159       fConnected = false;
00160     }
00161   }
00162  
00163   if(fSurrogateMom) return true;
00164   return false;
00165 }


Member Data Documentation

Definition at line 38 of file ReadDispatcherModule.h.

Referenced by ConnectToServer(), IsNewEventReady(), and ReadDispatcherModule().

std::string ReadDispatcherModule::fDDSDataSource [private]

Definition at line 49 of file ReadDispatcherModule.h.

Referenced by Config(), and ConnectToServer().

std::string ReadDispatcherModule::fDDSKeepUpMode [private]

Definition at line 48 of file ReadDispatcherModule.h.

Referenced by Config(), and ConnectToServer().

Definition at line 46 of file ReadDispatcherModule.h.

Referenced by Config(), and ConnectToServer().

std::string ReadDispatcherModule::fDDSServer [private]

Definition at line 45 of file ReadDispatcherModule.h.

Referenced by Config(), and ConnectToServer().

Definition at line 50 of file ReadDispatcherModule.h.

Referenced by Config(), and IsNewEventReady().

std::string ReadDispatcherModule::fSelectRule [private]

Definition at line 51 of file ReadDispatcherModule.h.

Referenced by Config(), and ConnectToServer().

std::string ReadDispatcherModule::fStreams [private]

Definition at line 47 of file ReadDispatcherModule.h.

Referenced by Config(), and ConnectToServer().

Definition at line 41 of file ReadDispatcherModule.h.

Referenced by Get(), IsNewEventReady(), and ReadDispatcherModule().


The documentation for this class was generated from the following files:

Generated on 22 Nov 2017 for loon by  doxygen 1.6.1