IoDDSStreamItr Class Reference

#include <IoDDSStreamItr.h>

Inheritance diagram for IoDDSStreamItr:
IoDataStreamItr

List of all members.

Public Member Functions

 IoDDSStreamItr (const char *server, unsigned int port=DDS::kPort, unsigned int nretry=0, unsigned int retrydelay=1, DDS::EClientType=DDS::kUnknownClientType, string clientname="")
 ~IoDDSStreamItr ()
bool IsValid () const
const char * GetFormat () const
int LoadRecords (MomNavigator *m)
int Increment (int n=1, MomNavigator *m=0)
int Decrement (int n=1, MomNavigator *m=0)
JobCResult GoTo (const VldContext &vld, MomNavigator *m=0)
int GoToEOF ()
int Streams (const char *streamlist)
int Select (const char *stream, const char *selection, bool isRequired=false)
void SetTimeOut (unsigned int seconds)
void SetClientType (DDS::EClientType clienttype)
void SetClientName (string clientname)
void AddFile (const char *filename, int at=-1, const char *streamlist="*")
const char * GetCurrentFile (const char *streamlist="*") const
JobCResult GoToFile (const char *filename, const char *streamlist="*")
JobCResult GoToFile (int i, const char *streamlist="*")
JobCResult NextFile (int n=1, const char *streamlist="*")
JobCResult PrevFile (int n=1, const char *streamlist="*")
void RemoveFile (const char *filename="*", const char *streamlist="*")
std::ostream & ListFile (std::ostream &ostream, const char *streamlist="*") const
int SetMaxSyncDelay (unsigned int seconds)
int SetDataSource (unsigned int datasource)
int SetKeepUpMode (unsigned int keepupmode)
int SetOffLine (bool offLine=true)
int Subscribe ()
int GetPort () const
DDS::EClientType GetClientType () const
string GetClientName () const
bool IsModified () const

Private Member Functions

void InitDDSClient ()
void ShutdownDDSClient ()

Private Attributes

unsigned int fPort
unsigned int fTimeOut
unsigned int fMaxRetry
unsigned int fRetryDelay
DDSClient * fDDSClient
bool fModified
DDS::EClientType fClientType
string fClientName

Detailed Description

Definition at line 22 of file IoDDSStreamItr.h.


Constructor & Destructor Documentation

IoDDSStreamItr::IoDDSStreamItr ( const char *  server,
unsigned int  port = DDS::kPort,
unsigned int  nretry = 0,
unsigned int  retrydelay = 1,
DDS::EClientType  clienttype = DDS::kUnknownClientType,
string  clientname = "" 
)

Definition at line 141 of file IoDDSStreamItr.cxx.

References InitDDSClient(), SetClientName(), SetClientType(), and IoDataStreamItr::SetSourceName().

00144                                                   :
00145   fPort(port),fTimeOut(120),fMaxRetry(nretry),fRetryDelay(delay),
00146   fDDSClient(0),fModified(true),fClientType(clienttype),
00147   fClientName(clientname) 
00148 {
00149   this->SetSourceName(server);
00150   this->SetClientType(clienttype);
00151   this->SetClientName(clientname);
00152   this->InitDDSClient();
00153 }

IoDDSStreamItr::~IoDDSStreamItr (  ) 

Definition at line 157 of file IoDDSStreamItr.cxx.

References ShutdownDDSClient().

00158 {
00159   this->ShutdownDDSClient();
00160 }


Member Function Documentation

void IoDDSStreamItr::AddFile ( const char *  filename,
int  at = -1,
const char *  streamlist = "*" 
) [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 96 of file IoDDSStreamItr.cxx.

00097                                                           {
00098   return;
00099 }

int IoDDSStreamItr::Decrement ( int  n = 1,
MomNavigator *  m = 0 
) [virtual]

Implements IoDataStreamItr.

Definition at line 287 of file IoDDSStreamItr.cxx.

References Msg::kWarning, and MSG.

00288 {
00289   MSG("Io",Msg::kWarning) << 
00290     "DDS Stream does not support Decrement(" << n << ").\n";
00291   return 0;
00292 }

string IoDDSStreamItr::GetClientName (  )  const [inline]

Definition at line 67 of file IoDDSStreamItr.h.

References fClientName.

Referenced by IoInputModule::UpdateDDSConfig().

00067 { return fClientName; }

DDS::EClientType IoDDSStreamItr::GetClientType (  )  const [inline]

Definition at line 66 of file IoDDSStreamItr.h.

References fClientType.

Referenced by IoInputModule::UpdateDDSConfig().

00066 { return fClientType; }

const char * IoDDSStreamItr::GetCurrentFile ( const char *  streamlist = "*"  )  const [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 117 of file IoDDSStreamItr.cxx.

References Msg::kWarning, and MSG.

00117                                                                             {
00118   MSG("Io",Msg::kWarning) << " DDS Stream does not support GetCurrentFile" 
00119                           << endl;
00120   return "";
00121 }

const char* IoDDSStreamItr::GetFormat (  )  const [inline, virtual]

Implements IoDataStreamItr.

Definition at line 33 of file IoDDSStreamItr.h.

00033 { return "dds"; }

int IoDDSStreamItr::GetPort (  )  const [inline]

Definition at line 65 of file IoDDSStreamItr.h.

References fPort.

Referenced by IoInputModule::UpdateDDSConfig().

00065 { return fPort; }

JobCResult IoDDSStreamItr::GoTo ( const VldContext &  vld,
MomNavigator *  m = 0 
) [virtual]

Implements IoDataStreamItr.

Definition at line 296 of file IoDDSStreamItr.cxx.

References JobCResult::kWarning, Msg::kWarning, and MSG.

00297 {
00298   MSG("Io",Msg::kWarning) << 
00299     "DDS Stream does not support GoTo(" << vld << ").\n";
00300   return JobCResult::kWarning;
00301 }

int IoDDSStreamItr::GoToEOF (  )  [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 305 of file IoDDSStreamItr.cxx.

References Msg::kWarning, and MSG.

00306 {
00307   MSG("Io",Msg::kWarning) << 
00308     "DDS Stream does not support GoToEOF().\n";
00309   return 0;
00310 }

JobCResult IoDDSStreamItr::GoToFile ( int  i,
const char *  streamlist = "*" 
) [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 125 of file IoDDSStreamItr.cxx.

References JobCResult::kWarning, Msg::kWarning, and MSG.

00126                                                           {
00127   MSG("Io",Msg::kWarning) << " DDS Stream does not support GoToFile n" << endl;
00128   return JobCResult::kWarning;
00129 }

JobCResult IoDDSStreamItr::GoToFile ( const char *  filename,
const char *  streamlist = "*" 
) [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 24 of file IoDDSStreamItr.cxx.

References bfld::AsString(), fDDSClient, DDSClient::GoToFile(), IsModified(), IsValid(), JobCResult::kAOK, JobCResult::kEndOfInputStream, DDS::kOk, JobCResult::kWarning, Msg::kWarning, MSG, and Subscribe().

00025                                                                   {
00026 //======================================================================
00027 // Go to file filename. filename may be a symbolic link, e.g. "currentfile"
00028 // will advance to the target of the symbolic link "currentfile" in the
00029 // data source directory.  If filename is not absolute fullfilepathname,
00030 // its path is assumed relative to the data source directory defined on
00031 // the server.  If filename is missing in this directory, will position
00032 // pointer to the next file in the alphanumerically sorted list in the
00033 // data source directory.  Returns JobCResult::kAOK or JobCResult::kWarning
00034 // if error return from DDS, or JobCResult::kEndOfInputStream if not
00035 // connected.
00036 
00037   if ( !IsValid() ) return JobCResult::kEndOfInputStream; 
00038 
00039   if ( this -> IsModified() ) { // modified since last subscription
00040     bool isOk = this -> Subscribe(); 
00041     if ( !isOk ) {
00042       MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00043       return JobCResult::kWarning;
00044     }
00045   }
00046 
00047   std::string returnedfilename;
00048   DDS::EMessageType msgrc = fDDSClient->GoToFile(filename,returnedfilename);
00049   if (msgrc != DDS::kOk) { 
00050     MSG("Io",Msg::kWarning) << 
00051       "An error message " << DDS::AsString(msgrc) << 
00052       " was received from DDSClient::GoToFile " << filename << "." << endl;
00053     return JobCResult::kWarning;
00054   }
00055 
00056   return JobCResult::kAOK;
00057 
00058 }

int IoDDSStreamItr::Increment ( int  n = 1,
MomNavigator *  m = 0 
) [virtual]

Implements IoDataStreamItr.

Definition at line 251 of file IoDDSStreamItr.cxx.

References bfld::AsString(), fDDSClient, fTimeOut, IsModified(), DDS::kOk, Msg::kWarning, MSG, DDSClient::Next(), and Subscribe().

00252 {
00253 //======================================================================
00254 // Advance the position in the stream n spaces.
00255 //=====================================================================
00256   if (n==0) return 0;
00257 
00258   if ( this -> IsModified() ) { // modified since last subscription
00259     bool isOk = this -> Subscribe(); 
00260     if ( !isOk ) return 0;
00261   }
00262   
00263   // Advance by n-1 since Get() is implemented by Next()...
00264   if (n==1) return 1;
00265 
00266   // For advances beyond just one...
00267   MomNavigator* mtmp = 0;
00268   bool dodel = false;
00269   if (m==0) { mtmp = new MomNavigator; dodel = true; }
00270   else      { mtmp = m; }
00271 
00272   DDS::EMessageType rtn = fDDSClient->Next(mtmp, fTimeOut, n-1);
00273 
00274   if (dodel) { delete mtmp; mtmp = 0; }
00275 
00276   if (rtn != DDS::kOk) {
00277     MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00278                             << DDS::AsString(rtn) << "." << endl;
00279     return 0;
00280   }
00281 
00282   return n;
00283 }

void IoDDSStreamItr::InitDDSClient (  )  [private]

Definition at line 164 of file IoDDSStreamItr.cxx.

References fClientName, fClientType, fDDSClient, fMaxRetry, fModified, fPort, fRetryDelay, IoDataStreamItr::GetSourceName(), gSystem(), DDSClient::IsValid(), DDS::kData, Msg::kError, MSG, and ShutdownDDSClient().

Referenced by IoDDSStreamItr().

00165 {
00166 //======================================================================
00167 // Set up the data dispatcher client
00168 //======================================================================
00169 
00170   unsigned int attempt = fMaxRetry+1;
00171   fDDSClient = 0;
00172   while (!fDDSClient && attempt) {
00173     fDDSClient = new DDSClient(this->GetSourceName(),fPort,DDS::kData,
00174                                fClientType,fClientName);
00175     --attempt;
00176 
00177     if (fDDSClient==0 || !fDDSClient->IsValid()) {
00178       // Warning here...
00179       MSG("Io",Msg::kError)
00180         << "Unable to create DDSClient to host " << this->GetSourceName()
00181         << " on port " << fPort << "," << endl
00182         << "   will attempt " << attempt << " more times." << endl; 
00183       this->ShutdownDDSClient();
00184       gSystem->Sleep(1000*fRetryDelay);
00185     }
00186   }
00187   fModified = true;
00188   
00189 }

bool IoDDSStreamItr::IsModified (  )  const [inline]

Definition at line 68 of file IoDDSStreamItr.h.

References fModified.

Referenced by GoToFile(), Increment(), LoadRecords(), and NextFile().

00068 { return fModified; }

bool IoDDSStreamItr::IsValid (  )  const [inline, virtual]

Implements IoDataStreamItr.

Definition at line 32 of file IoDDSStreamItr.h.

References fDDSClient.

Referenced by GoToFile(), NextFile(), Select(), SetDataSource(), SetKeepUpMode(), SetMaxSyncDelay(), SetOffLine(), Streams(), and Subscribe().

00032 { return ( fDDSClient ) ? true : false; }

std::ostream & IoDDSStreamItr::ListFile ( std::ostream &  ostream,
const char *  streamlist = "*" 
) const [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 133 of file IoDDSStreamItr.cxx.

00135 {
00136   return os;
00137 }

int IoDDSStreamItr::LoadRecords ( MomNavigator *  m  )  [virtual]

Implements IoDataStreamItr.

Definition at line 227 of file IoDDSStreamItr.cxx.

References bfld::AsString(), fDDSClient, fTimeOut, IsModified(), DDS::kOk, Msg::kWarning, MSG, DDSClient::Next(), and Subscribe().

00228 {
00229 //======================================================================
00230 // Load the record at the current position in the stream
00231 //======================================================================
00232 
00233   if ( this -> IsModified() ) { // modified since last subscription
00234     bool isOk = this -> Subscribe(); 
00235     if ( !isOk ) return 0;
00236   }
00237 
00238   // DDSClient only has a Next
00239   DDS::EMessageType rtn = fDDSClient->Next(mom, fTimeOut, 1);
00240   if (rtn != DDS::kOk) {
00241     MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00242                             << DDS::AsString(rtn) << "." << endl;
00243     return 0;
00244   }
00245   
00246   return 1;
00247 }

JobCResult IoDDSStreamItr::NextFile ( int  n = 1,
const char *  streamlist = "*" 
) [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 62 of file IoDDSStreamItr.cxx.

References bfld::AsString(), fDDSClient, DDSClient::GoToNextFile(), IsModified(), IsValid(), JobCResult::kAOK, JobCResult::kEndOfInputStream, DDS::kOk, JobCResult::kWarning, Msg::kWarning, MSG, and Subscribe().

00062                                                                        {
00063 //======================================================================
00064 // Advance by n files in data source directory. Returns JobCResult::kAOK
00065 // or JobCResult::kWarning if error returned from DDS, or
00066 // JobCResult::kEndOfInputStream if not connected.
00067 //======================================================================
00068 
00069   if ( !IsValid() ) return JobCResult::kEndOfInputStream;
00070 
00071   if ( this -> IsModified() ) { // modified since last subscription
00072     bool isOk = this -> Subscribe(); 
00073     if ( !isOk ) {
00074       MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00075       return JobCResult::kWarning;
00076     }
00077   }
00078 
00079   for ( int i = 0; i < n; i++ ) {
00080     std::string returnedfilename;
00081     DDS::EMessageType msgrc = fDDSClient->GoToNextFile(returnedfilename);
00082     if (msgrc != DDS::kOk) {
00083       MSG("Io",Msg::kWarning) << 
00084         "An error message " << DDS::AsString(msgrc) << 
00085         " was received from DDSClient::GoToNextFile." << endl;
00086       return JobCResult::kWarning;
00087     }
00088   }
00089 
00090   return JobCResult::kAOK;
00091 
00092 }

JobCResult IoDDSStreamItr::PrevFile ( int  n = 1,
const char *  streamlist = "*" 
) [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 110 of file IoDDSStreamItr.cxx.

References JobCResult::kWarning, Msg::kWarning, and MSG.

00110                                                                             {
00111   MSG("Io",Msg::kWarning) << " DDS Stream does not support PrevFile" << endl;
00112   return JobCResult::kWarning;
00113 }

void IoDDSStreamItr::RemoveFile ( const char *  filename = "*",
const char *  streamlist = "*" 
) [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 103 of file IoDDSStreamItr.cxx.

00104                                                              {
00105   return;
00106 }

int IoDDSStreamItr::Select ( const char *  stream,
const char *  selection,
bool  isRequired = false 
) [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 415 of file IoDDSStreamItr.cxx.

References fDDSClient, fModified, and IsValid().

00417 { 
00418   // Subscription is modified with the new selection cut but is not sent
00419   // until the IoDDSStreamItr::Subscribe method is invoked.
00420   int isOpen = 0;
00421   if ( this->IsValid() ) {
00422     fDDSClient -> GetSubscription() -> SetSelection(stream,selection);
00423     isOpen = 1;
00424   }
00425   fModified = true;
00426 
00427   return isOpen;
00428 
00429 }

void IoDDSStreamItr::SetClientName ( string  clientname  ) 

Definition at line 394 of file IoDDSStreamItr.cxx.

References fClientName.

Referenced by IoDDSStreamItr().

00395                                                 { fClientName = clientname; }

void IoDDSStreamItr::SetClientType ( DDS::EClientType  clienttype  ) 

Definition at line 389 of file IoDDSStreamItr.cxx.

References fClientType.

Referenced by IoDDSStreamItr().

00390                                                 { fClientType = clienttype; }

int IoDDSStreamItr::SetDataSource ( unsigned int  datasource  ) 

Definition at line 349 of file IoDDSStreamItr.cxx.

References fDDSClient, fModified, and IsValid().

00350 { 
00351   // Subscription is modified with the new data source but is not sent
00352   // until the IoDDSStreamItr::Subscribe method is invoked.
00353   int isOpen = 0;
00354   if ( this -> IsValid() ) {
00355     DDS::EDataSource sourceEnum = static_cast<DDS::EDataSource>(dataSource);
00356     fDDSClient -> GetSubscription() -> SetDataSource(sourceEnum);
00357     isOpen = 1;
00358   }
00359 
00360   fModified = true;
00361   return isOpen;
00362 
00363 }

int IoDDSStreamItr::SetKeepUpMode ( unsigned int  keepupmode  ) 

Definition at line 367 of file IoDDSStreamItr.cxx.

References fDDSClient, fModified, and IsValid().

00368 { 
00369   // Subscription is modified with the new keep up mode but is not sent
00370   // until the IoDDSStreamItr::Subscribe method is invoked.
00371   int isOpen = 0;
00372   if ( this -> IsValid() ) {
00373     DDS::EKeepUpMode keepUpEnum = static_cast<DDS::EKeepUpMode>(keepUpMode);
00374     fDDSClient -> GetSubscription() -> SetKeepUpMode(keepUpEnum);
00375     isOpen = 1;
00376   }
00377 
00378   fModified = true;
00379   return isOpen;
00380 
00381 }

int IoDDSStreamItr::SetMaxSyncDelay ( unsigned int  seconds  ) 

Definition at line 314 of file IoDDSStreamItr.cxx.

References fDDSClient, fModified, and IsValid().

00315 { 
00316   // Subscription is modified with the new max sync delay but is not sent
00317   // until the IoDDSStreamItr::Subscribe method is invoked.
00318   int isOpen = 0;
00319   if ( this -> IsValid() ) {
00320     fDDSClient -> GetSubscription() -> SetMaxSyncDelay(maxSyncDelay);
00321     isOpen = 1;
00322   }
00323 
00324   fModified = true;
00325   return isOpen;
00326 
00327 }

int IoDDSStreamItr::SetOffLine ( bool  offLine = true  ) 

Definition at line 332 of file IoDDSStreamItr.cxx.

References fDDSClient, fModified, and IsValid().

00333 { 
00334   // Subscription is modified with the new offline status but is not sent
00335   // until the IoDDSStreamItr::Subscribe method is invoked.
00336   int isOpen = 0;
00337   if ( this -> IsValid() ) {
00338     fDDSClient -> GetSubscription() -> SetOffLine(offLine);
00339     isOpen = 1;
00340   }
00341 
00342   fModified = true;
00343   return isOpen;
00344 
00345 }

void IoDDSStreamItr::SetTimeOut ( unsigned int  seconds  )  [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 385 of file IoDDSStreamItr.cxx.

References fTimeOut.

Referenced by IoInputModule::UpdateDDSConfig().

00385 { fTimeOut = timeout; }

void IoDDSStreamItr::ShutdownDDSClient (  )  [private]

Definition at line 220 of file IoDDSStreamItr.cxx.

References fDDSClient.

Referenced by InitDDSClient(), Subscribe(), and ~IoDDSStreamItr().

00221 {
00222   if (fDDSClient) { delete fDDSClient; fDDSClient = 0; }
00223 }

int IoDDSStreamItr::Streams ( const char *  streamlist  )  [virtual]

Reimplemented from IoDataStreamItr.

Definition at line 399 of file IoDDSStreamItr.cxx.

References fDDSClient, fModified, and IsValid().

00400 { 
00401   // Subscription is modified with the new streamList but is not sent
00402   // until the IoDDSStreamItr::Subscribe method is invoked.
00403   Int_t numStream = 0;
00404   if ( this->IsValid() ) {
00405     numStream = fDDSClient -> GetSubscription() -> SetStreams(streamList);
00406   }
00407 
00408   fModified = true;
00409   return numStream;
00410 
00411 }

int IoDDSStreamItr::Subscribe (  ) 

Definition at line 192 of file IoDDSStreamItr.cxx.

References bfld::AsString(), fDDSClient, fModified, fPort, IoDataStreamItr::GetSourceName(), IsValid(), Msg::kError, DDS::kOk, MSG, ShutdownDDSClient(), and DDSClient::Subscribe().

Referenced by GoToFile(), Increment(), LoadRecords(), and NextFile().

00193 {
00194 //======================================================================
00195 // Submit subscription to server.  Returns 1 if successful, else 0.
00196 //======================================================================
00197   int isOk = 0;
00198   if ( !IsValid() ) return isOk;
00199 
00200   DDS::EMessageType msgrc = fDDSClient->Subscribe();
00201   if (msgrc != DDS::kOk) {
00202     MSG("Io",Msg::kError) << 
00203       "An error message " << DDS::AsString(msgrc) << 
00204       " was received from DDS::Subscribe." <<
00205       "( host " << this->GetSourceName() << 
00206       ", port " << fPort << ")." << endl;
00207     this->ShutdownDDSClient();
00208   }
00209   else {
00210     isOk = 1;
00211     fModified = false;
00212   }
00213 
00214   return isOk;
00215 
00216 }


Member Data Documentation

string IoDDSStreamItr::fClientName [private]

Definition at line 81 of file IoDDSStreamItr.h.

Referenced by GetClientName(), InitDDSClient(), and SetClientName().

DDS::EClientType IoDDSStreamItr::fClientType [private]

Definition at line 80 of file IoDDSStreamItr.h.

Referenced by GetClientType(), InitDDSClient(), and SetClientType().

unsigned int IoDDSStreamItr::fMaxRetry [private]

Definition at line 76 of file IoDDSStreamItr.h.

Referenced by InitDDSClient().

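bool IoDDSStreamItr::fModified [private]

Referenced by InitDDSClient(), IsModified(), Select(), SetDataSource(), SetKeepUpMode(), SetMaxSyncDelay(), SetOffLine(), Streams(), and Subscribe().
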
unsigned int IoDDSStreamItr::fPort [private]

Definition at line 74 of file IoDDSStreamItr.h.

Referenced by GetPort(), InitDDSClient(), and Subscribe().

unsigned int IoDDSStreamItr::fRetryDelay [private]

Definition at line 77 of file IoDDSStreamItr.h.

Referenced by InitDDSClient().

unsigned int IoDDSStreamItr::fTimeOut [private]

Definition at line 75 of file IoDDSStreamItr.h.

Referenced by Increment(), LoadRecords(), and SetTimeOut().


The documentation for this class was generated from the following files:

IoDDSStreamItr.h
IoDDSStreamItr.cxx

Generated on 22 Nov 2017 for loon by  doxygen 1.6.1