DDSChildServer Class Reference

#include <DDSChildServer.h>

List of all members.

Public Member Functions

 DDSChildServer (Int_t sockfd, Int_t maxinactive)
virtual ~DDSChildServer ()
bool IsValid () const
std::ostream & Print (std::ostream &ms) const
Int_t Run ()

Private Member Functions

void Get ()
void GoToFile ()
bool IsClientConnected () const
void Next ()
void Shutdown ()
void Subscribe ()

Private Attributes

TSocket * fTSocket
DDSSubscription * fSubscription
DDSFileHandler * fFileHandler
bool fShutdown
int fPid
MomNavigator * fMom
TMessage * fMessageIn
TMessage fMessageOut
PerInputStreamManager fInputStreamManager
TInetAddress fServerAddress
TInetAddress fClientAddress
Int_t fMaxInactive


Detailed Description

Definition at line 32 of file DDSChildServer.h.


Constructor & Destructor Documentation

DDSChildServer::DDSChildServer ( Int_t  sockfd,
Int_t  maxinactive 
)

Definition at line 52 of file DDSChildServer.cxx.

References gSystem(), Msg::kWarning, and MSG.

00052                                                               : fTSocket(0), 
00053 fSubscription(0), fFileHandler(0), fShutdown(false), fPid(0), fMom(0), 
00054 fMessageIn(0), fMaxInactive(maxinactive) {
00055   // Purpose: Normal constructor for DDSChildServer object.  This constructor 
00056   //          creates a ROOT TSocket attached to the connected socket 
00057   //          descriptor sockfd (previously established and authorized
00058   //          by the parent server).
00059   //  
00060   // Argument: sockfd   connected socket descriptor.
00061   //
00062   // Return: n/a.
00063   //
00064   // Contact: S. Kasahara
00065   //
00066   // Notes: Use IsValid() to check if the DDSChildServer was created 
00067   //        successfully.
00068   //  
00069   
00070   // Get process id of child server for identification purposes
00071   fPid = gSystem -> GetPid();  
00072 
00073   // Create ROOT TSocket from connected socket descriptor
00074   fTSocket = new TSocket(sockfd);   
00075 
00076   if (!fTSocket || !fTSocket -> IsValid()) {
00077     // An error occurred during the creation of the TSocket
00078     MSG("DDS",Msg::kWarning) << "CS_" << fPid  
00079       << ": Unable to create child server socket with socket descriptor "
00080       << sockfd << "." << endl;
00081     if (fTSocket) delete fTSocket; fTSocket = 0;
00082     fShutdown = true;
00083   }
00084   else {
00085     fServerAddress = fTSocket -> GetLocalInetAddress();
00086     fClientAddress = fTSocket -> GetInetAddress();
00087   }
00088 
00089   fInputStreamManager.SetUpdateMode(true);
00090   fInputStreamManager.SetPrintOpt("brief");
00091 
00092   // Remove default plugin (TXNetFile) for "root:" prefaced file so that
00093   // TNetFile will be used to connect to rootd server used with dispatcher
00094   // This was done beginning with root v5.10/00, but apparently should
00095   // no longer be done beginning with root v5.16/00.
00096   if ( gROOT -> GetVersionInt() < 51600 )   
00097     gROOT -> GetPluginManager() -> RemoveHandler("TFile","^root:");
00098   // Default subscription
00099   fSubscription = new DDSSubscription();
00100   fFileHandler = new DDSFileHandler(fSubscription->GetDataSource(),
00101                                     fSubscription->IsOffLine());  
00102 
00103 }

DDSChildServer::~DDSChildServer (  )  [virtual]

Definition at line 105 of file DDSChildServer.cxx.

References PerInputStreamManager::CloseStream(), fFileHandler, fInputStreamManager, fMessageIn, fMom, fSubscription, fTSocket, and IsValid().

00105                                 {
00106   // Purpose: Destructor.
00107   //
00108   // Argument: n/a.
00109   //
00110   // Return: n/a.
00111   //
00112   // Contact: S. Kasahara
00113   //
00114 
00115   fInputStreamManager.CloseStream();
00116   if ( IsValid() ) delete fTSocket;  fTSocket = 0;
00117   if ( fSubscription ) delete fSubscription;  fSubscription = 0;
00118   if ( fFileHandler ) delete fFileHandler; fFileHandler = 0;
00119   if ( fMom ) delete fMom; fMom = 0;
00120   if ( fMessageIn ) delete fMessageIn; fMessageIn = 0;
00121 
00122 }


Member Function Documentation

void DDSChildServer::Get (  )  [private]

Definition at line 124 of file DDSChildServer.cxx.

References MomNavigator::AdoptFragment(), fInputStreamManager, fMessageOut, fMom, MomNavigator::FragmentIter(), PerInputStreamManager::Get(), RecMinos::GetTempTags(), DDS::kOk, and Registry::Registry().

Referenced by Next().

00124                          {
00125   // Purpose: For now a private method to respond to a request to load
00126   //          the current record set from the stream manager.
00127   //
00128   // Argument: none.
00129   //
00130   // Return: none.
00131   //
00132   // Contact: S. Kasahara
00133   //
00134 
00135   // If very first call to Next(), need to create Mom container
00136   if (!fMom) fMom = new MomNavigator();
00137 
00138   // Load current record set in Mom
00139   fInputStreamManager.Get(fMom);
00140   // Because fTempTags stored in records won't survive the i/o trip
00141   // need to unpack them here and repack them on the other side
00142   TIter riter = fMom->FragmentIter();
00143   RecMinos* record = 0;
00144   while ( (record = dynamic_cast<RecMinos*>(riter.Next())) ) {
00145     Registry* temptags = new Registry(record->GetTempTags()); // get copy
00146     fMom->AdoptFragment(temptags); // adopted by mom
00147   }
00148   // Load retrieved mom object into fMessageOut for shipping
00149   fMessageOut.Reset(DDS::kOk);
00150   fMessageOut.WriteObject(fMom);
00151   return;
00152 
00153 }

void DDSChildServer::GoToFile (  )  [private]

Definition at line 155 of file DDSChildServer.cxx.

References PerInputStreamManager::CloseFile(), fFileHandler, fInputStreamManager, fMessageOut, fPid, DDSFileHandler::GoToFile(), DDSFileHandler::GoToNextFile(), DDSFileHandler::GoToSymLinkFile(), IsValid(), Msg::kDebug, DDS::kFileError, DDS::kOk, Per::kRead, Msg::kVerbose, Msg::kWarning, and MSG.

Referenced by Run().

00155                               {
00156   // Purpose: This Service responds to a request from the client to
00157   //          advance to a user-specified filename.
00158   //
00159   // Argument: none.
00160   //
00161   // Return: none.
00162   //
00163   // Contact: S. Kasahara
00164   //
00165 
00166   if ( !fFileHandler || !fFileHandler -> IsValid()) {
00167     fMessageOut.Reset(DDS::kFileError);
00168     MSG("DDS",Msg::kWarning) << "CS_" << fPid 
00169          << " GoToFile called but DDSFileHandler is InValid." << endl; 
00170     return;
00171   }
00172 
00173   // Retrieve filename from fMessageIn 
00174   char filename[512];
00175   (*fMessageIn) >> filename;
00176   std::string newfilename;
00177   if ( !strncmp(filename,"next",4) ) {
00178     MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToNextFile requested." 
00179                            << endl;
00180     newfilename = fFileHandler->GoToNextFile();
00181   }
00182   else if ( !strncmp(filename,"symlink",7) ) {
00183     MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToSymLinkFile requested." 
00184                            << endl;
00185     newfilename = fFileHandler->GoToSymLinkFile();
00186   }
00187   else {
00188     MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToFile " << filename 
00189                            << " requested." << endl;
00190     newfilename = fFileHandler->GoToFile(filename);
00191   }
00192 
00193   if ( !newfilename.empty() ) {
00194     MSG("DDS",Msg::kVerbose) << "CS_" << fPid 
00195                << " PerInputStreamManager status before closing old file:\n" 
00196                << fInputStreamManager;
00197     fInputStreamManager.CloseFile();
00198     MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Advancing to " << newfilename 
00199                            << "." << endl;
00200     if(!fInputStreamManager.SetFile("*",newfilename,Per::kRead)) {
00201       fMessageOut.Reset(DDS::kFileError);
00202     }
00203     else {
00204       fMessageOut.Reset(DDS::kOk);
00205       fMessageOut << newfilename.c_str();
00206       MSG("DDS",Msg::kVerbose) << "CS_" << fPid 
00207           << " PerInputStreamManager status after opening new file:\n" 
00208                                << fInputStreamManager;
00209     }
00210   }
00211   else {
00212     MSG("DDS",Msg::kDebug) << "CS_" << fPid 
00213            << " Failed to retrieve new filename." << endl;     
00214     fMessageOut.Reset(DDS::kFileError);
00215   }
00216 
00217   return;
00218 
00219 }

bool DDSChildServer::IsClientConnected (  )  const [private]

Definition at line 221 of file DDSChildServer.cxx.

References fTSocket, gSystem(), and len.

Referenced by Next(), and Run().

00221                                              {
00222   // Purpose: Check client/server socket connection to make sure that client 
00223   //          has not departed abruptly.
00224   //
00225   // Argument: none.
00226   //
00227   // Return: true/false.
00228   //
00229   // Contact: S. Kasahara
00230   //
00231 
00232   TSystem::ResetErrno();
00233   if (!fTSocket || !fTSocket->IsValid()) return false;
00234 
00235   Int_t n;
00236   UInt_t len;
00237   Int_t savevalue;
00238   fTSocket->GetOption(kNoBlock,savevalue);
00239   fTSocket->SetOption(kNoBlock,1); // set no block
00240   Int_t socketdescriptor = fTSocket->GetDescriptor();
00241   n = gSystem->RecvRaw(socketdescriptor, &len, sizeof(UInt_t),kPeek);
00242   fTSocket->SetOption(kNoBlock,savevalue); // reset noblock option
00243 
00244   if ( n <= 0 && n != -4) return false; // socket error
00245   return true; // connection is valid
00246 
00247 }

bool DDSChildServer::IsValid (  )  const [inline]

Definition at line 42 of file DDSChildServer.h.

References fTSocket.

Referenced by GoToFile(), main(), Next(), Print(), and ~DDSChildServer().

00042 { return (fTSocket != (TSocket*)0) ? true : false; }

void DDSChildServer::Next (  )  [private]

Definition at line 249 of file DDSChildServer.cxx.

References DDS::AsString(), MomNavigator::Clear(), PerInputStreamManager::CloseFile(), fFileHandler, fInputStreamManager, fMessageOut, fMom, fPid, fSubscription, Get(), PerInputStreamManager::GetCurrentVld(), PerStream::GetFullFilePathName(), PerInputStreamManager::GetLastEntryVld(), PerFileManager::GetOpenedFile(), VldTimeStamp::GetSec(), PerStreamManager::GetStreamMap(), DDSFileHandler::GetSymLinkTargetName(), VldContext::GetTimeStamp(), DDSFileHandler::GoToNextFile(), DDSFileHandler::GoToSymLinkFile(), gSystem(), PerFileManager::Instance(), IsClientConnected(), PerInputStreamManager::IsFileEnd(), IsValid(), PerInputStreamManager::IsValidSelectionString(), DDS::kAll, Msg::kDebug, DDS::kError, DDS::kFileError, DDS::kFileKeepUp, DDS::kInvalidSelection, Per::kRead, DDS::kRecordKeepUp, DDS::kSocketError, DDS::kTimeoutNewFile, DDS::kTimeoutNewRecord, Msg::kVerbose, Msg::kWarning, MSG, DDSFileHandler::NewFileAvailable(), PerInputStreamManager::Next(), PerInputStreamManager::Previous(), PerInputStreamManager::SetFileEnd(), and timer().

Referenced by Run().

00249                           {
00250   // Purpose: This Service responds to a request from the client to
00251   //          send the next available entry satisfying the client's
00252   //          subscription.  This method is activated when the DDS::kNext
00253   //          message is received in DDSChildServer::Run().
00254   //
00255   // Argument: none.
00256   //
00257   // Return: none.
00258   //
00259   // Contact: S. Kasahara
00260   //
00261 
00262   // Client must have submitted subscription first
00263   if ( !fSubscription ) {
00264     fMessageOut.Reset(DDS::kError);
00265     MSG("DDS",Msg::kWarning) << "CS_" << fPid 
00266          << " Next called but no subscription has been submitted." << endl; 
00267     return;
00268   }
00269 
00270   if ( !fFileHandler || !fFileHandler -> IsValid()) {
00271     fMessageOut.Reset(DDS::kFileError);
00272     MSG("DDS",Msg::kWarning) << "CS_" << fPid 
00273          << " Next called but DDSFileHandler is InValid." << endl; 
00274     return;
00275   }
00276 
00277   // If very first call to Next(), need to create Mom container
00278   if (!fMom) fMom = new MomNavigator();
00279 
00280   // Retrieve waittime (secs) and advanceby from fMessageIn and start timer
00281   UInt_t waittime;
00282   (*fMessageIn) >> waittime;
00283   UInt_t advanceby;
00284   (*fMessageIn) >> advanceby;
00285   TStopwatch timer; timer.Start();
00286   bool isNewSet = false; // at least one new record set available
00287 
00288   Int_t keepUpWindow = fSubscription -> GetKeepUpWindow();
00289   while ( timer.RealTime() < waittime ) {
00290     timer.Continue();
00291     fMom->Clear();  // clear mom object
00292     
00293     DDS::EKeepUpMode keepupmode = fSubscription -> GetKeepUpMode();
00294     if ( (keepupmode == DDS::kFileKeepUp || keepupmode == DDS::kRecordKeepUp)
00295          && fFileHandler->NewFileAvailable() ) {
00296       // Set new file for managed streams
00297       std::string newfilename = fFileHandler->GoToSymLinkFile();
00298       MSG("DDS",Msg::kDebug) << "CS_" << fPid
00299         << " Next detected NewFileAvailable and keepupmode is " 
00300         << DDS::AsString(keepupmode) << ", called GoToSymLinkFile." << endl;
00301       if ( !newfilename.empty() ) {
00302         MSG("DDS",Msg::kVerbose) << "CS_" << fPid 
00303               << " PerInputStreamManager status before closing old file:\n" 
00304                                  << fInputStreamManager;
00305         fInputStreamManager.CloseFile();
00306         MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Advancing to " 
00307                               << newfilename << "." << endl;
00308         if(!fInputStreamManager.SetFile("*",newfilename,Per::kRead)) {
00309           fMessageOut.Reset(DDS::kFileError);
00310           return;
00311         }
00312         else {
00313           MSG("DDS",Msg::kVerbose) << "CS_" << fPid 
00314               << " PerInputStreamManager status after opening new file:\n" 
00315                                    << fInputStreamManager;
00316         }
00317       }
00318       else {
00319         MSG("DDS",Msg::kDebug) << "CS_" << fPid 
00320                         <<  " Failed to retrieve new filename." << endl;     
00321       }
00322     }
00323     if ( !fInputStreamManager.IsValidSelectionString() ) {
00324       MSG("DDS",Msg::kDebug) << "CS_" << fPid
00325                      << " Has invalid subscription selection string." << endl;
00326       fMessageOut.Reset(DDS::kInvalidSelection);
00327       return;
00328     }
00329     
00330     if ( fInputStreamManager.Next(0,advanceby) > 0 ) {
00331       isNewSet = true;
00332       if ( keepupmode != DDS::kRecordKeepUp ) {
00333         // Load current record set in Mom
00334         this -> Get();
00335         return;
00336       }
00337       else {
00338         Int_t lastTime = (Int_t)
00339           (fInputStreamManager.GetLastEntryVld().GetTimeStamp().GetSec());
00340         Int_t currentTime = (Int_t)
00341           (fInputStreamManager.GetCurrentVld().GetTimeStamp().GetSec());
00342         if ( lastTime - currentTime <= keepUpWindow ) {
00343           this -> Get();
00344           return;
00345         }
00346       }
00347     }
00348     else {
00349       // Determine if file has reached end or has been aborted
00350       if ( fFileHandler->NewFileAvailable() && 
00351           !fInputStreamManager.IsFileEnd() ) {
00352         std::string currentfilename = "";
00353         const PerStreamManager::StreamMap& sm_mgr
00354                                       = fInputStreamManager.GetStreamMap();
00355         PerStreamManager::StreamMapConstItr itr_mgr = sm_mgr.begin();
00356         const PerInputStream* instream = 0;
00357         if ( itr_mgr != sm_mgr.end() ) instream 
00358                         = dynamic_cast<PerInputStream*>(itr_mgr->second);
00359         if (instream) currentfilename = instream->GetFullFilePathName();
00360         PerFileManager& filemgr = PerFileManager::Instance();
00361         if ( !((filemgr.GetOpenedFile(currentfilename))->HasFileEndKey()) ) {
00362           std::string symlinktargetname = fFileHandler->GetSymLinkTargetName();
00363           if ( symlinktargetname != currentfilename ) {
00364             MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Current file " 
00365             << currentfilename << "\n not closed but symlink currentfile " 
00366             << symlinktargetname << "\n has moved on." 
00367             << " Assume abort, finish current file, and move to next file." 
00368             << endl;
00369             fInputStreamManager.SetFileEnd();
00370           }
00371         }
00372       }
00373       else if ( fInputStreamManager.IsFileEnd() ){  
00374         // All streams have reached end and writer has closed file
00375         // Attempt to update file for all managed streams
00376         std::string fullfilepathname="";
00377         if ( keepupmode == DDS::kAll) {
00378           fullfilepathname = fFileHandler->GoToNextFile();
00379           if ( !fullfilepathname.empty() ) {
00380             MSG("DDS",Msg::kDebug) << "CS_" << fPid
00381             << " Next call to GoToNextFile retrieved file\n\t" 
00382             << fullfilepathname <<  "." << endl;
00383           }
00384         }
00385         else if ( fFileHandler->NewFileAvailable() ) {
00386           fullfilepathname = fFileHandler->GoToSymLinkFile();
00387           MSG("DDS",Msg::kDebug) << "CS_" << fPid
00388           << " Next call to GoToSymLinkFile retrieved filename\n\t" 
00389           << fullfilepathname << "." << endl;
00390         }
00391         if (!fullfilepathname.empty()) {
00392           MSG("DDS",Msg::kVerbose) << "CS_" << fPid 
00393            << " PerInputStreamManager status before closing old file:\n" 
00394            << fInputStreamManager;
00395           fInputStreamManager.CloseFile();
00396           if(!fInputStreamManager.SetFile("*",fullfilepathname,Per::kRead)) {
00397             fMessageOut.Reset(DDS::kFileError);
00398             return;
00399           }
00400           MSG("DDS",Msg::kVerbose) << "CS_" << fPid 
00401                                    << " PerInputStreamManager status after opening new file:\n" 
00402                                    << fInputStreamManager;
00403         }
00404         else {
00405           if ( keepupmode == DDS::kRecordKeepUp && isNewSet ) {
00406             // File end was reached. Must back up a set and then Get
00407             fInputStreamManager.Previous(0,advanceby);
00408             this -> Get();
00409             return;
00410           }
00411           // Sleep for 1 sec while we wait for new file
00412           gSystem->Sleep(1000);
00413           // After waking up, test socket connection to make sure client is
00414           // still there.
00415           if ( !IsClientConnected() ) {
00416             fMessageOut.Reset(DDS::kSocketError);
00417             return;
00418           } 
00419         }
00420       }
00421       else {
00422         if ( keepupmode == DDS::kRecordKeepUp && isNewSet ) {
00423           // don't wait, retrieve most recent new set available
00424           this -> Get();
00425           return;
00426         }
00427         // Sleep for 1 sec while we wait for updated tree
00428         gSystem->Sleep(1000);
00429         // After waking up, test socket connection to make sure client is
00430         // still there.
00431         if ( !IsClientConnected() ) {
00432           fMessageOut.Reset(DDS::kSocketError);
00433           return;
00434         } 
00435       }
00436     }
00437   }
00438   timer.Stop();
00439   // Next reached "TimeOut", find out if it's because it was waiting for
00440   // a new record from the current file or because it was waiting for a new
00441   // file.
00442   if ( !fInputStreamManager.IsFileEnd() ) {
00443     fMessageOut.Reset(DDS::kTimeoutNewRecord);
00444   }
00445   else {
00446     fMessageOut.Reset(DDS::kTimeoutNewFile);
00447   }
00448   
00449   return;
00450 
00451 }

std::ostream & DDSChildServer::Print ( std::ostream &  ms  )  const

Definition at line 453 of file DDSChildServer.cxx.

References fClientAddress, fPid, and IsValid().

Referenced by operator<<().

00453                                                       {
00454   // Purpose: Print DDSChildServer status on std::ostream.
00455   //
00456   // Argument: ms  std::ostream to print on.
00457   //
00458   // Return: std::ostream reference.
00459   //
00460   // Contact: S. Kasahara
00461   //
00462 
00463   if ( IsValid() ) {
00464     ms << "CS_" << fPid << ": connected to client at " 
00465     // Print the client's host name, address, and port number
00466        << fClientAddress.GetHostName() <<"/"<< fClientAddress.GetHostAddress()
00467        << "(port " << fClientAddress.GetPort() << ")." << endl;
00468   }
00469   else {
00470     ms << "CS_" << fPid << ": childserver socket is not connected." << endl;
00471   }
00472 
00473   return ms;
00474 
00475 }

Int_t DDSChildServer::Run (  ) 

Definition at line 477 of file DDSChildServer.cxx.

References fMaxInactive, fMessageIn, fMessageOut, fPid, fShutdown, fTSocket, GoToFile(), gSystem(), IsClientConnected(), DDS::kGoToFile, DDS::kInactive, DDS::kMessageUnknown, DDS::kNext, DDS::kShutdown, DDS::kSubscribe, Msg::kWarning, MSG, Next(), Shutdown(), Subscribe(), and timer().

00477                           {
00478   // Purpose: This is the main method of the DDSChildServer.  It listens
00479   //          for and responds to client service requests.  Each request
00480   //          received (of type DDS::EMessageType) is responded to in
00481   //          return with a single message of type DDS::EMessageType sent
00482   //          to the client upon completion of servicing the request.
00483   //          In this way the client can remain in sync with the child
00484   //          server's processing of its requests.
00485   //
00486   //          The services which the client can currently request from the
00487   //          child server are:
00488   //          DDS::kGoToFile == Request to advance to new file.
00489   //          DDS::kNext == Send next entry satisfying subscription.
00490   //          DDS::kShutdown == Request to shutdown child server.
00491   //          DDS::kSubscribe == Request to submit new subscription.
00492   //          
00493   //          The return status with which the child server can respond to
00494   //          the client are those returned by the service methods: 
00495   //          GoToFile(),Next(),Shutdown(), and Subscribe() as well as:
00496   //          DDS::kMessageUnknown == Unrecognized message received.
00497   //          DDS::kSocketError == An error return was received on the
00498   //                               socket receipt of the client's message.   
00499   //
00500   // Argument: none.
00501   //
00502   // Return: Return code = 0 => normal finish
00503   //                     = 1 => socket error caused premature end
00504   //
00505   // Contact: S. Kasahara
00506   //
00507   // Notes: The DDSChildServer will remain in the Run method until it receives
00508   //        the DDS::kShutdown message from its client.  
00509   // 
00510 
00511   Int_t rc = 0;
00512 
00513   TStopwatch timer;
00514   timer.Start();
00515 
00516   while ( !fShutdown ) {
00517     // Wait for new message from client
00518     fTSocket -> SetOption(kNoBlock,1);
00519     Int_t rettype = fTSocket->Recv(fMessageIn);
00520     fTSocket -> SetOption(kNoBlock,0);
00521     if ( rettype != -4 ) {
00522       timer.Reset();
00523       if ( rettype > 0 ) {
00524         switch ( fMessageIn -> What() ) {
00525 
00526         case DDS::kGoToFile:
00527           // Request to advance to new file
00528           GoToFile();
00529           break;
00530 
00531         case DDS::kNext:
00532            // Request to send next entry satisfying subscription
00533           Next();
00534           break;
00535 
00536         case DDS::kShutdown:
00537           // Request to shutdown child server
00538           Shutdown();
00539           break;
00540 
00541         case DDS::kSubscribe:
00542           // Request to submit new subscription
00543           Subscribe();
00544           break;
00545 
00546         default:
00547           MSG("DDS",Msg::kWarning)<< "CS_"<< fPid << ": Unknown client message: "
00548                                 << fMessageIn -> What() << " received.\n"
00549                                 << "Unable to process client request." << endl;
00550           fMessageOut.Reset(DDS::kMessageUnknown);
00551           break;
00552         } // end of switch block
00553 
00554         if ( fMessageIn ) delete fMessageIn; fMessageIn = 0; 
00555 
00556       } // end of socket received message block
00557       else {
00558         // Error return on TSocket::Recv call
00559         fShutdown = true;
00560         delete fTSocket; fTSocket = 0;
00561         MSG("DDS",Msg::kWarning) << "CS_" << fPid
00562               << ": Socket error detected on TSocket::Recv." <<
00563               "\n ChildServer will be shutdown." << endl;
00564         rc = 1;
00565       }
00566     
00567       if ( fTSocket && IsClientConnected() ) {
00568         // send return status & objects to client
00569         if ( fTSocket -> Send(fMessageOut) < 0 ) {
00570           // An error occurred while sending message
00571           if (errno == EPIPE) {
00572             // Broken pipe between child server and client is fatal
00573             fShutdown = true;
00574             delete fTSocket; fTSocket = 0;
00575             MSG("DDS",Msg::kWarning) << "CS_" << fPid
00576                << ": Socket error detected on TSocket::Send." 
00577                << "\n ChildServer will be shutdown." << endl;
00578             rc = 1;
00579           }
00580         }
00581       }
00582       else if (!fShutdown) {
00583         // Error detected in socket connection
00584         fShutdown = true;
00585         delete fTSocket; fTSocket = 0;
00586         MSG("DDS",Msg::kWarning) << "CS_" << fPid
00587         << ": Socket error detected indicating client has disconnected.\n"
00588                                << " ChildServer will be shutdown."<< endl;
00589         rc = 1;
00590       }
00591     }
00592     else {
00593       if ( timer.RealTime() > fMaxInactive ) {
00594         MSG("DDS",Msg::kWarning) << "CS_" << fPid
00595                                  << ": Inactivity time limit of "
00596                                  << fMaxInactive << "(sec) reached.\n"
00597                                  << "ChildServer will be shutdown." << endl;
00598         Shutdown();
00599         fMessageOut.Reset(DDS::kInactive);
00600         rc = 2;
00601         if ( fTSocket && IsClientConnected() ) fTSocket->Send(fMessageOut);
00602       }
00603       else { 
00604         timer.Continue();
00605         gSystem->Sleep(10); // sleep 10 msec so as to avoid excessive cpu
00606       }
00607     }
00608   } // End of loop over client requests 
00609 
00610   return rc;
00611 
00612 }  

void DDSChildServer::Shutdown (  )  [private]

Definition at line 614 of file DDSChildServer.cxx.

References PerInputStreamManager::CloseStream(), fInputStreamManager, fMessageOut, fPid, fShutdown, DDS::kOk, Msg::kVerbose, and MSG.

Referenced by Run().

00614                               {
00615   // Purpose: This Service shuts down the child server.  This method is 
00616   //          activated when the DDS::kShutdown message is received in
00617   //          DDSChildServer::Run().
00618   //
00619   // Argument: none.
00620   //
00621   // Return: none.
00622   //
00623   // Contact: S. Kasahara
00624   //
00625 
00626   fShutdown = true;   // stops processing loop in Run
00627   MSG("DDS",Msg::kVerbose) << "CS_" << fPid 
00628      << " PerInputStreamManager status before shutdown:\n" 
00629      << fInputStreamManager; 
00630   fInputStreamManager.CloseStream();  // close all managed streams
00631   fMessageOut.Reset(DDS::kOk);
00632 
00633   return;
00634 
00635 }

void DDSChildServer::Subscribe (  )  [private]

Definition at line 637 of file DDSChildServer.cxx.

References PerInputStreamManager::CloseStream(), fFileHandler, fInputStreamManager, fMessageIn, fMessageOut, fPid, fSubscription, DDSFileHandler::GetCurrentFileName(), PerInputStreamManager::GetOpenedStream(), PerStreamManager::GetStreamMap(), Msg::kDebug, DDS::kError, DDS::kOk, Msg::kWarning, MSG, PerInputStreamManager::OpenStream(), and PerInputStreamManager::SetMaxSyncDelay().

Referenced by Run().

00637                                {
00638   // Purpose: This Service receives and processes a new subscription from
00639   //          the client.  This method is activated when the DDS::kSubscribe
00640   //          message is received in DDSChildServer::Run().
00641   //
00642   // Argument: none.
00643   //
00644   // Return: none.
00645   //
00646   // Contact: S. Kasahara
00647   //
00648 
00649   // If previous subscription exists, must delete it
00650   if ( fSubscription) delete fSubscription; fSubscription = 0;
00651 
00652   // Attempt to receive DDSSubscription object from fMessageIn
00653   fSubscription = dynamic_cast<DDSSubscription*>
00654                  (fMessageIn -> ReadObject(fMessageIn->GetClass()));
00655   if ( !fSubscription ) {
00656     MSG("DDS",Msg::kWarning) << "CS_" << fPid
00657     << "\nSubscribe failed to receive expected DDSSubscription from client." 
00658     << endl;
00659     fMessageOut.Reset(DDS::kError);
00660   }
00661   else {
00662     // Subscription received successfully
00663     MSG("DDS",Msg::kDebug) << "CS_" << fPid
00664                            << " Subscribe received subscription:\n"
00665                            << fSubscription;
00666 
00667     fMessageOut.Reset(DDS::kOk);
00668     if (fFileHandler && ( (fSubscription -> GetDataSource())  
00669                         !=(fFileHandler  -> GetDataSource())
00670                        || (fSubscription -> IsOffLine())
00671                         !=(fFileHandler -> IsOffLine()) ) ) {
00672       delete fFileHandler; fFileHandler=0;
00673     }
00674     if (!fFileHandler) 
00675       fFileHandler = new DDSFileHandler(fSubscription -> GetDataSource(),
00676                                         fSubscription -> IsOffLine());
00677     // Determine current file from filehandler
00678     std::string currentFileName = fFileHandler->GetCurrentFileName();
00679 
00680     // Open new streams in subscription list
00681     PerInputStream* instream;
00682     const DDSSubscription::StreamMap& sm_sub = fSubscription->GetStreamMap();
00683     for (DDSSubscription::StreamMapConstItr itr_sub = sm_sub.begin();
00684                                             itr_sub!= sm_sub.end();++itr_sub) {
00685       string streamName = (itr_sub->first).Data();
00686       if ( !(instream = dynamic_cast<PerInputStream*>
00687                         (fInputStreamManager.GetOpenedStream(streamName))) ) {
00688         // For DDS application, streamname & treename are the same;
00689         instream = fInputStreamManager.OpenStream(streamName,streamName);
00690         if (!currentFileName.empty())instream -> SetFile(currentFileName);
00691       }
00692       if ( instream ) instream -> SetSelection((itr_sub->second).Data());
00693     }
00694 
00695     fInputStreamManager.SetMaxSyncDelay(fSubscription -> GetMaxSyncDelay());
00696 
00697     // Finally,  close all streams not on subscription list
00698     const PerStreamManager::StreamMap& sm_mgr
00699                                       = fInputStreamManager.GetStreamMap();
00700     for ( PerStreamManager::StreamMapConstItr itr_mgr = sm_mgr.begin();
00701                                            itr_mgr!= sm_mgr.end(); ++itr_mgr) {
00702       std::string streamName = itr_mgr->first;
00703       DDSSubscription::StreamMapConstItr itr_sub 
00704                                         = sm_sub.find(streamName.c_str());
00705       if (itr_sub == sm_sub.end())fInputStreamManager.CloseStream(streamName);
00706     }
00707   }
00708   
00709   return;
00710 
00711 }


Member Data Documentation

TInetAddress DDSChildServer::fClientAddress [private]

Definition at line 72 of file DDSChildServer.h.

Referenced by Print().

DDSFileHandler* DDSChildServer::fFileHandler [private]

Definition at line 61 of file DDSChildServer.h.

Referenced by GoToFile(), Next(), Subscribe(), and ~DDSChildServer().

PerInputStreamManager DDSChildServer::fInputStreamManager [private]

Definition at line 70 of file DDSChildServer.h.

Referenced by Get(), GoToFile(), Next(), Shutdown(), Subscribe(), and ~DDSChildServer().

Int_t DDSChildServer::fMaxInactive [private]

Definition at line 73 of file DDSChildServer.h.

Referenced by Run().

TMessage* DDSChildServer::fMessageIn [private]

Definition at line 68 of file DDSChildServer.h.

Referenced by Run(), Subscribe(), and ~DDSChildServer().

TMessage DDSChildServer::fMessageOut [private]

Definition at line 69 of file DDSChildServer.h.

Referenced by Get(), GoToFile(), Next(), Run(), Shutdown(), and Subscribe().

MomNavigator* DDSChildServer::fMom [private]

Definition at line 67 of file DDSChildServer.h.

Referenced by Get(), Next(), and ~DDSChildServer().

int DDSChildServer::fPid [private]

Definition at line 63 of file DDSChildServer.h.

Referenced by GoToFile(), Next(), Print(), Run(), Shutdown(), and Subscribe().

TInetAddress DDSChildServer::fServerAddress [private]

Definition at line 71 of file DDSChildServer.h.

bool DDSChildServer::fShutdown [private]

Definition at line 62 of file DDSChildServer.h.

Referenced by Run(), and Shutdown().

DDSSubscription* DDSChildServer::fSubscription [private]

Definition at line 60 of file DDSChildServer.h.

Referenced by Next(), Subscribe(), and ~DDSChildServer().

TSocket* DDSChildServer::fTSocket [private]

Definition at line 59 of file DDSChildServer.h.

Referenced by IsClientConnected(), IsValid(), Run(), and ~DDSChildServer().


The documentation for this class was generated from the following files:
DDSChildServer.h
DDSChildServer.cxx
Generated on Wed Dec 10 22:49:28 2014 for loon by  doxygen 1.4.7