#include <DDSChildServer.h>
Public Member Functions | |
| DDSChildServer (Int_t sockfd, Int_t maxinactive) | |
| virtual | ~DDSChildServer () |
| bool | IsValid () const |
| std::ostream & | Print (std::ostream &ms) const |
| Int_t | Run () |
Private Member Functions | |
| void | Get () |
| void | GoToFile () |
| bool | IsClientConnected () const |
| void | Next () |
| void | Shutdown () |
| void | Subscribe () |
Private Attributes | |
| TSocket * | fTSocket |
| DDSSubscription * | fSubscription |
| DDSFileHandler * | fFileHandler |
| bool | fShutdown |
| int | fPid |
| MomNavigator * | fMom |
| TMessage * | fMessageIn |
| TMessage | fMessageOut |
| PerInputStreamManager | fInputStreamManager |
| TInetAddress | fServerAddress |
| TInetAddress | fClientAddress |
| Int_t | fMaxInactive |
Definition at line 32 of file DDSChildServer.h.
| DDSChildServer::DDSChildServer | ( | Int_t | sockfd, | |
| Int_t | maxinactive | |||
| ) |
Definition at line 52 of file DDSChildServer.cxx.
References gSystem(), Msg::kWarning, and MSG.
00052 : fTSocket(0), 00053 fSubscription(0), fFileHandler(0), fShutdown(false), fPid(0), fMom(0), 00054 fMessageIn(0), fMaxInactive(maxinactive) { 00055 // Purpose: Normal constructor for DDSChildServer object. This constructor 00056 // creates a ROOT TSocket attached to the connected socket 00057 // descriptor sockfd (previously established and authorized 00058 // by the parent server). 00059 // 00060 // Argument: sockfd connected socket descriptor. 00061 // 00062 // Return: n/a. 00063 // 00064 // Contact: S. Kasahara 00065 // 00066 // Notes: Use IsValid() to check if the DDSChildServer was created 00067 // successfully. 00068 // 00069 00070 // Get process id of child server for identification purposes 00071 fPid = gSystem -> GetPid(); 00072 00073 // Create ROOT TSocket from connected socket descriptor 00074 fTSocket = new TSocket(sockfd); 00075 00076 if (!fTSocket || !fTSocket -> IsValid()) { 00077 // An error occured during the creation of the TSocket 00078 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00079 << ": Unable to create child server socket with socket descriptor " 00080 << sockfd << "." << endl; 00081 if (fTSocket) delete fTSocket; fTSocket = 0; 00082 fShutdown = true; 00083 } 00084 else { 00085 fServerAddress = fTSocket -> GetLocalInetAddress(); 00086 fClientAddress = fTSocket -> GetInetAddress(); 00087 } 00088 00089 fInputStreamManager.SetUpdateMode(true); 00090 fInputStreamManager.SetPrintOpt("brief"); 00091 00092 // Remove default plugin (TXNetFile) for "root:" prefaced file so that 00093 // TNetFile will be used to connect to rootd server used with dispatcher 00094 // This was done beginning with root v5.10/00, but apparently should 00095 // no longer be done beginning with root v5.16/00. 00096 if ( gROOT -> GetVersionInt() < 51600 ) 00097 gROOT -> GetPluginManager() -> RemoveHandler("TFile","^root:"); 00098 // Default subscription 00099 fSubscription = new DDSSubscription(); 00100 fFileHandler = new DDSFileHandler(fSubscription->GetDataSource(), 00101 fSubscription->IsOffLine()); 00102 00103 }
| DDSChildServer::~DDSChildServer | ( | ) | [virtual] |
Definition at line 105 of file DDSChildServer.cxx.
References PerInputStreamManager::CloseStream(), fFileHandler, fInputStreamManager, fMessageIn, fMom, fSubscription, fTSocket, and IsValid().
00105 { 00106 // Purpose: Destructor. 00107 // 00108 // Argument: n/a. 00109 // 00110 // Return: n/a. 00111 // 00112 // Contact: S. Kasahara 00113 // 00114 00115 fInputStreamManager.CloseStream(); 00116 if ( IsValid() ) delete fTSocket; fTSocket = 0; 00117 if ( fSubscription ) delete fSubscription; fSubscription = 0; 00118 if ( fFileHandler ) delete fFileHandler; fFileHandler = 0; 00119 if ( fMom ) delete fMom; fMom = 0; 00120 if ( fMessageIn ) delete fMessageIn; fMessageIn = 0; 00121 00122 }
| void DDSChildServer::Get | ( | ) | [private] |
Definition at line 124 of file DDSChildServer.cxx.
References MomNavigator::AdoptFragment(), fInputStreamManager, fMessageOut, fMom, MomNavigator::FragmentIter(), PerInputStreamManager::Get(), RecMinos::GetTempTags(), DDS::kOk, and Registry::Registry().
Referenced by Next().
00124 { 00125 // Purpose: For now a private method to respond to a request to load 00126 // a the current record set from the stream manager. 00127 // 00128 // Argument: none. 00129 // 00130 // Return: none. 00131 // 00132 // Contact: S. Kasahara 00133 // 00134 00135 // If very first call to Next(), need to create Mom container 00136 if (!fMom) fMom = new MomNavigator(); 00137 00138 // Load current record set in Mom 00139 fInputStreamManager.Get(fMom); 00140 // Because fTempTags stored in records won't survive the i/o trip 00141 // need to unpack them here and repack them on the other side 00142 TIter riter = fMom->FragmentIter(); 00143 RecMinos* record = 0; 00144 while ( (record = dynamic_cast<RecMinos*>(riter.Next())) ) { 00145 Registry* temptags = new Registry(record->GetTempTags()); // get copy 00146 fMom->AdoptFragment(temptags); // adopted by mom 00147 } 00148 // Load retrieved mom object into fMessageOut for shipping 00149 fMessageOut.Reset(DDS::kOk); 00150 fMessageOut.WriteObject(fMom); 00151 return; 00152 00153 }
| void DDSChildServer::GoToFile | ( | ) | [private] |
Definition at line 155 of file DDSChildServer.cxx.
References PerInputStreamManager::CloseFile(), fFileHandler, fInputStreamManager, fMessageOut, fPid, DDSFileHandler::GoToFile(), DDSFileHandler::GoToNextFile(), DDSFileHandler::GoToSymLinkFile(), IsValid(), Msg::kDebug, DDS::kFileError, DDS::kOk, Per::kRead, Msg::kVerbose, Msg::kWarning, and MSG.
Referenced by Run().
00155 { 00156 // Purpose: This Service responds to a request from the client to 00157 // advance to a user-specified filename. 00158 // 00159 // Argument: none. 00160 // 00161 // Return: none. 00162 // 00163 // Contact: S. Kasahara 00164 // 00165 00166 if ( !fFileHandler || !fFileHandler -> IsValid()) { 00167 fMessageOut.Reset(DDS::kFileError); 00168 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00169 << " GoToFile called but DDSFileHandler is InValid." << endl; 00170 return; 00171 } 00172 00173 // Retrieve filename from fMessageIn 00174 char filename[512]; 00175 (*fMessageIn) >> filename; 00176 std::string newfilename; 00177 if ( !strncmp(filename,"next",4) ) { 00178 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToNextFile requested." 00179 << endl; 00180 newfilename = fFileHandler->GoToNextFile(); 00181 } 00182 else if ( !strncmp(filename,"symlink",7) ) { 00183 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToSymLinkFile requested." 00184 << endl; 00185 newfilename = fFileHandler->GoToSymLinkFile(); 00186 } 00187 else { 00188 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToFile " << filename 00189 << " requested." << endl; 00190 newfilename = fFileHandler->GoToFile(filename); 00191 } 00192 00193 if ( !newfilename.empty() ) { 00194 MSG("DDS",Msg::kVerbose) << "CS_" << fPid 00195 << " PerInputStreamManager status before closing old file:\n" 00196 << fInputStreamManager; 00197 fInputStreamManager.CloseFile(); 00198 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Advancing to " << newfilename 00199 << "." << endl; 00200 if(!fInputStreamManager.SetFile("*",newfilename,Per::kRead)) { 00201 fMessageOut.Reset(DDS::kFileError); 00202 } 00203 else { 00204 fMessageOut.Reset(DDS::kOk); 00205 fMessageOut << newfilename.c_str(); 00206 MSG("DDS",Msg::kVerbose) << "CS_" << fPid 00207 << " PerInputStreamManager status after opening new file:\n" 00208 << fInputStreamManager; 00209 } 00210 } 00211 else { 00212 MSG("DDS",Msg::kDebug) << "CS_" << fPid 00213 << " Failed to retrieve new filename." << endl; 00214 fMessageOut.Reset(DDS::kFileError); 00215 } 00216 00217 return; 00218 00219 }
| bool DDSChildServer::IsClientConnected | ( | ) | const [private] |
Definition at line 221 of file DDSChildServer.cxx.
References fTSocket, gSystem(), and len.
Referenced by Next(), and Run().
00221 { 00222 // Purpose: Check client/server socket connection to make sure that client 00223 // has not departed abruptly. 00224 // 00225 // Argument: none. 00226 // 00227 // Return: true/false. 00228 // 00229 // Contact: S. Kasahara 00230 // 00231 00232 TSystem::ResetErrno(); 00233 if (!fTSocket || !fTSocket->IsValid()) return false; 00234 00235 Int_t n; 00236 UInt_t len; 00237 Int_t savevalue; 00238 fTSocket->GetOption(kNoBlock,savevalue); 00239 fTSocket->SetOption(kNoBlock,1); // set no block 00240 Int_t socketdescriptor = fTSocket->GetDescriptor(); 00241 n = gSystem->RecvRaw(socketdescriptor, &len, sizeof(UInt_t),kPeek); 00242 fTSocket->SetOption(kNoBlock,savevalue); // reset noblock option 00243 00244 if ( n <= 0 && n != -4) return false; // socket error 00245 return true; // connection is valid 00246 00247 }
| bool DDSChildServer::IsValid | ( | ) | const [inline] |
Definition at line 42 of file DDSChildServer.h.
References fTSocket.
Referenced by GoToFile(), main(), Next(), Print(), and ~DDSChildServer().
00042 { return (fTSocket != (TSocket*)0) ? true : false; }
| void DDSChildServer::Next | ( | ) | [private] |
Definition at line 249 of file DDSChildServer.cxx.
References DDS::AsString(), MomNavigator::Clear(), PerInputStreamManager::CloseFile(), fFileHandler, fInputStreamManager, fMessageOut, fMom, fPid, fSubscription, Get(), PerInputStreamManager::GetCurrentVld(), PerStream::GetFullFilePathName(), PerInputStreamManager::GetLastEntryVld(), PerFileManager::GetOpenedFile(), VldTimeStamp::GetSec(), PerStreamManager::GetStreamMap(), DDSFileHandler::GetSymLinkTargetName(), VldContext::GetTimeStamp(), DDSFileHandler::GoToNextFile(), DDSFileHandler::GoToSymLinkFile(), gSystem(), PerFileManager::Instance(), IsClientConnected(), PerInputStreamManager::IsFileEnd(), IsValid(), PerInputStreamManager::IsValidSelectionString(), DDS::kAll, Msg::kDebug, DDS::kError, DDS::kFileError, DDS::kFileKeepUp, DDS::kInvalidSelection, Per::kRead, DDS::kRecordKeepUp, DDS::kSocketError, DDS::kTimeoutNewFile, DDS::kTimeoutNewRecord, Msg::kVerbose, Msg::kWarning, MSG, DDSFileHandler::NewFileAvailable(), PerInputStreamManager::Next(), PerInputStreamManager::Previous(), PerInputStreamManager::SetFileEnd(), and timer().
Referenced by Run().
00249 { 00250 // Purpose: This Service responds to a request from the client to 00251 // send the next available entry satisfying the client's 00252 // subscription. This method is activated when the DDS::kNext 00253 // message is received in DDSChildServer::Run(). 00254 // 00255 // Argument: none. 00256 // 00257 // Return: none. 00258 // 00259 // Contact: S. Kasahara 00260 // 00261 00262 // Client must have submitted subscription first 00263 if ( !fSubscription ) { 00264 fMessageOut.Reset(DDS::kError); 00265 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00266 << " Next called but no subscription has been submitted." << endl; 00267 return; 00268 } 00269 00270 if ( !fFileHandler || !fFileHandler -> IsValid()) { 00271 fMessageOut.Reset(DDS::kFileError); 00272 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00273 << " Next called but DDSFileHandler is InValid." << endl; 00274 return; 00275 } 00276 00277 // If very first call to Next(), need to create Mom container 00278 if (!fMom) fMom = new MomNavigator(); 00279 00280 // Retrieve waittime (secs) and advanceby from fMessageIn and start timer 00281 UInt_t waittime; 00282 (*fMessageIn) >> waittime; 00283 UInt_t advanceby; 00284 (*fMessageIn) >> advanceby; 00285 TStopwatch timer; timer.Start(); 00286 bool isNewSet = false; // at least one new record set available 00287 00288 Int_t keepUpWindow = fSubscription -> GetKeepUpWindow(); 00289 while ( timer.RealTime() < waittime ) { 00290 timer.Continue(); 00291 fMom->Clear(); // clear mom object 00292 00293 DDS::EKeepUpMode keepupmode = fSubscription -> GetKeepUpMode(); 00294 if ( (keepupmode == DDS::kFileKeepUp || keepupmode == DDS::kRecordKeepUp) 00295 && fFileHandler->NewFileAvailable() ) { 00296 // Set new file for managed streams 00297 std::string newfilename = fFileHandler->GoToSymLinkFile(); 00298 MSG("DDS",Msg::kDebug) << "CS_" << fPid 00299 << " Next detected NewFileAvailable and keepupmode is " 00300 << DDS::AsString(keepupmode) << ", called GoToSymLinkFile." << endl; 00301 if ( !newfilename.empty() ) { 00302 MSG("DDS",Msg::kVerbose) << "CS_" << fPid 00303 << " PerInputStreamManager status before closing old file:\n" 00304 << fInputStreamManager; 00305 fInputStreamManager.CloseFile(); 00306 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Advancing to " 00307 << newfilename << "." << endl; 00308 if(!fInputStreamManager.SetFile("*",newfilename,Per::kRead)) { 00309 fMessageOut.Reset(DDS::kFileError); 00310 return; 00311 } 00312 else { 00313 MSG("DDS",Msg::kVerbose) << "CS_" << fPid 00314 << " PerInputStreamManager status after opening new file:\n" 00315 << fInputStreamManager; 00316 } 00317 } 00318 else { 00319 MSG("DDS",Msg::kDebug) << "CS_" << fPid 00320 << " Failed to retrieve new filename." << endl; 00321 } 00322 } 00323 if ( !fInputStreamManager.IsValidSelectionString() ) { 00324 MSG("DDS",Msg::kDebug) << "CS_" << fPid 00325 << " Has invalid subscription selection string." << endl; 00326 fMessageOut.Reset(DDS::kInvalidSelection); 00327 return; 00328 } 00329 00330 if ( fInputStreamManager.Next(0,advanceby) > 0 ) { 00331 isNewSet = true; 00332 if ( keepupmode != DDS::kRecordKeepUp ) { 00333 // Load current record set in Mom 00334 this -> Get(); 00335 return; 00336 } 00337 else { 00338 Int_t lastTime = (Int_t) 00339 (fInputStreamManager.GetLastEntryVld().GetTimeStamp().GetSec()); 00340 Int_t currentTime = (Int_t) 00341 (fInputStreamManager.GetCurrentVld().GetTimeStamp().GetSec()); 00342 if ( lastTime - currentTime <= keepUpWindow ) { 00343 this -> Get(); 00344 return; 00345 } 00346 } 00347 } 00348 else { 00349 // Determine if file has reached end or has been aborted 00350 if ( fFileHandler->NewFileAvailable() && 00351 !fInputStreamManager.IsFileEnd() ) { 00352 std::string currentfilename = ""; 00353 const PerStreamManager::StreamMap& sm_mgr 00354 = fInputStreamManager.GetStreamMap(); 00355 PerStreamManager::StreamMapConstItr itr_mgr = sm_mgr.begin(); 00356 const PerInputStream* instream = 0; 00357 if ( itr_mgr != sm_mgr.end() ) instream 00358 = dynamic_cast<PerInputStream*>(itr_mgr->second); 00359 if (instream) currentfilename = instream->GetFullFilePathName(); 00360 PerFileManager& filemgr = PerFileManager::Instance(); 00361 if ( !((filemgr.GetOpenedFile(currentfilename))->HasFileEndKey()) ) { 00362 std::string symlinktargetname = fFileHandler->GetSymLinkTargetName(); 00363 if ( symlinktargetname != currentfilename ) { 00364 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Current file " 00365 << currentfilename << "\n not closed but symlink currentfile " 00366 << symlinktargetname << "\n has moved on." 00367 << " Assume abort, finish current file, and move to next file." 00368 << endl; 00369 fInputStreamManager.SetFileEnd(); 00370 } 00371 } 00372 } 00373 else if ( fInputStreamManager.IsFileEnd() ){ 00374 // All streams have reached end and writer has closed file 00375 // Attempt to update file for all managed streams 00376 std::string fullfilepathname=""; 00377 if ( keepupmode == DDS::kAll) { 00378 fullfilepathname = fFileHandler->GoToNextFile(); 00379 if ( !fullfilepathname.empty() ) { 00380 MSG("DDS",Msg::kDebug) << "CS_" << fPid 00381 << " Next call to GoToNextFile retrieved file\n\t" 00382 << fullfilepathname << "." << endl; 00383 } 00384 } 00385 else if ( fFileHandler->NewFileAvailable() ) { 00386 fullfilepathname = fFileHandler->GoToSymLinkFile(); 00387 MSG("DDS",Msg::kDebug) << "CS_" << fPid 00388 << " Next call to GoToSymLinkFile retrieved filename\n\t" 00389 << fullfilepathname << "." << endl; 00390 } 00391 if (!fullfilepathname.empty()) { 00392 MSG("DDS",Msg::kVerbose) << "CS_" << fPid 00393 << " PerInputStreamManager status before closing old file:\n" 00394 << fInputStreamManager; 00395 fInputStreamManager.CloseFile(); 00396 if(!fInputStreamManager.SetFile("*",fullfilepathname,Per::kRead)) { 00397 fMessageOut.Reset(DDS::kFileError); 00398 return; 00399 } 00400 MSG("DDS",Msg::kVerbose) << "CS_" << fPid 00401 << " PerInputStreamManager status after opening new file:\n" 00402 << fInputStreamManager; 00403 } 00404 else { 00405 if ( keepupmode == DDS::kRecordKeepUp && isNewSet ) { 00406 // File end was reached. Must back up a set and then Get 00407 fInputStreamManager.Previous(0,advanceby); 00408 this -> Get(); 00409 return; 00410 } 00411 // Sleep for 1 sec while we wait for new file 00412 gSystem->Sleep(1000); 00413 // After waking up, test socket connection to make sure client is 00414 // still there. 00415 if ( !IsClientConnected() ) { 00416 fMessageOut.Reset(DDS::kSocketError); 00417 return; 00418 } 00419 } 00420 } 00421 else { 00422 if ( keepupmode == DDS::kRecordKeepUp && isNewSet ) { 00423 // don't wait, retrieve most recent new set available 00424 this -> Get(); 00425 return; 00426 } 00427 // Sleep for 1 sec while we wait for updated tree 00428 gSystem->Sleep(1000); 00429 // After waking up, test socket connection to make sure client is 00430 // still there. 00431 if ( !IsClientConnected() ) { 00432 fMessageOut.Reset(DDS::kSocketError); 00433 return; 00434 } 00435 } 00436 } 00437 } 00438 timer.Stop(); 00439 // Next reached "TimeOut", find out if its because it was waiting for 00440 // a new record from the current file or because it was waiting for a new 00441 // file. 00442 if ( !fInputStreamManager.IsFileEnd() ) { 00443 fMessageOut.Reset(DDS::kTimeoutNewRecord); 00444 } 00445 else { 00446 fMessageOut.Reset(DDS::kTimeoutNewFile); 00447 } 00448 00449 return; 00450 00451 }
| std::ostream & DDSChildServer::Print | ( | std::ostream & | ms | ) | const |
Definition at line 453 of file DDSChildServer.cxx.
References fClientAddress, fPid, and IsValid().
Referenced by operator<<().
00453 { 00454 // Purpose: Print DDSChildServer status on std::ostream. 00455 // 00456 // Argument: ms std::ostream to print on. 00457 // 00458 // Return: std::ostream reference. 00459 // 00460 // Contact: S. Kasahara 00461 // 00462 00463 if ( IsValid() ) { 00464 ms << "CS_" << fPid << ": connected to client at " 00465 // Print the client's host name, address, and port number 00466 << fClientAddress.GetHostName() <<"/"<< fClientAddress.GetHostAddress() 00467 << "(port " << fClientAddress.GetPort() << ")." << endl; 00468 } 00469 else { 00470 ms << "CS_" << fPid << ": childserver socket is not connected." << endl; 00471 } 00472 00473 return ms; 00474 00475 }
| Int_t DDSChildServer::Run | ( | ) |
Definition at line 477 of file DDSChildServer.cxx.
References fMaxInactive, fMessageIn, fMessageOut, fPid, fShutdown, fTSocket, GoToFile(), gSystem(), IsClientConnected(), DDS::kGoToFile, DDS::kInactive, DDS::kMessageUnknown, DDS::kNext, DDS::kShutdown, DDS::kSubscribe, Msg::kWarning, MSG, Next(), Shutdown(), Subscribe(), and timer().
00477 { 00478 // Purpose: This is the main method of the DDSChildServer. It listens 00479 // for and responds to client service requests. Each request 00480 // received (of type DDS::EMessageType) is responded to in 00481 // return with a single message of type DDS::EMessageType sent 00482 // to the client upon completion of servicing the request. 00483 // In this way the client can remain in sync with the child 00484 // server's processing of its requests. 00485 // 00486 // The services which the client can currently request from the 00487 // child server are: 00488 // DDS::kGoToFile == Request to advance to new file. 00489 // DDS::kNext == Send next entry satisfying subscription. 00490 // DDS::kShutdown == Request to shutdown child server. 00491 // DDS::kSubscribe == Request to submit new subscription. 00492 // 00493 // The return status with which the child server can respond to 00494 // the client are those returned by the service methods: 00495 // GoToFile(),Next(),Shutdown(), and Subscribe() as well as: 00496 // DDS::kMessageUnknown == Unrecognized message received. 00497 // DDS::kSocketError == An error return was received on the 00498 // socket receipt of the client's message. 00499 // 00500 // Argument: none. 00501 // 00502 // Return: Return code = 0 => normal finish 00503 // = 1 => socket error caused premature end 00504 // 00505 // Contact: S. Kasahara 00506 // 00507 // Notes: The DDSChildServer will remain in the Run method until it receives 00508 // the DDS::kShutdown message from its client. 00509 // 00510 00511 Int_t rc = 0; 00512 00513 TStopwatch timer; 00514 timer.Start(); 00515 00516 while ( !fShutdown ) { 00517 // Wait for new message from client 00518 fTSocket -> SetOption(kNoBlock,1); 00519 Int_t rettype = fTSocket->Recv(fMessageIn); 00520 fTSocket -> SetOption(kNoBlock,0); 00521 if ( rettype != -4 ) { 00522 timer.Reset(); 00523 if ( rettype > 0 ) { 00524 switch ( fMessageIn -> What() ) { 00525 00526 case DDS::kGoToFile: 00527 // Request to send next entry satisfying subscription 00528 GoToFile(); 00529 break; 00530 00531 case DDS::kNext: 00532 // Request to send next entry satisfying subscription 00533 Next(); 00534 break; 00535 00536 case DDS::kShutdown: 00537 // Request to shutdown child server 00538 Shutdown(); 00539 break; 00540 00541 case DDS::kSubscribe: 00542 // Request to submit new subscription 00543 Subscribe(); 00544 break; 00545 00546 default: 00547 MSG("DDS",Msg::kWarning)<< "CS_"<< fPid << ": Unknown client message: " 00548 << fMessageIn -> What() << " received.\n" 00549 << "Unable to process client request." << endl; 00550 fMessageOut.Reset(DDS::kMessageUnknown); 00551 break; 00552 } // end of switch block 00553 00554 if ( fMessageIn ) delete fMessageIn; fMessageIn = 0; 00555 00556 } // end of socket received message block 00557 else { 00558 // Error return on TSocket::Recv call 00559 fShutdown = true; 00560 delete fTSocket; fTSocket = 0; 00561 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00562 << ": Socket error detected on TSocket::Recv." << 00563 "\n ChildServer will be shutdown." << endl; 00564 rc = 1; 00565 } 00566 00567 if ( fTSocket && IsClientConnected() ) { 00568 // send return status & objects to client 00569 if ( fTSocket -> Send(fMessageOut) < 0 ) { 00570 // An error occurred while sending message 00571 if (errno == EPIPE) { 00572 // Broken pipe between child server and client is fatal 00573 fShutdown = true; 00574 delete fTSocket; fTSocket = 0; 00575 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00576 << ": Socket error detected on TSocket::Send." 00577 << "\n ChildServer will be shutdown." << endl; 00578 rc = 1; 00579 } 00580 } 00581 } 00582 else if (!fShutdown) { 00583 // Error detected in socket connection 00584 fShutdown = true; 00585 delete fTSocket; fTSocket = 0; 00586 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00587 << ": Socket error detected indicating client has disconnected.\n" 00588 << " ChildServer will be shutdown."<< endl; 00589 rc = 1; 00590 } 00591 } 00592 else { 00593 if ( timer.RealTime() > fMaxInactive ) { 00594 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00595 << ": Inactivity time limit of " 00596 << fMaxInactive << "(sec) reached.\n" 00597 << "ChildServer will be shutdown." << endl; 00598 Shutdown(); 00599 fMessageOut.Reset(DDS::kInactive); 00600 rc = 2; 00601 if ( fTSocket && IsClientConnected() ) fTSocket->Send(fMessageOut); 00602 } 00603 else { 00604 timer.Continue(); 00605 gSystem->Sleep(10); // sleep 10 msec so as to avoid excessive cpu 00606 } 00607 } 00608 } // End of loop over client requests 00609 00610 return rc; 00611 00612 }
| void DDSChildServer::Shutdown | ( | ) | [private] |
Definition at line 614 of file DDSChildServer.cxx.
References PerInputStreamManager::CloseStream(), fInputStreamManager, fMessageOut, fPid, fShutdown, DDS::kOk, Msg::kVerbose, and MSG.
Referenced by Run().
00614 { 00615 // Purpose: This Service shuts down the child server. This method is 00616 // activated when the DDS::kShutdown message is received in 00617 // DDSChildServer::Run(). 00618 // 00619 // Argument: none. 00620 // 00621 // Return: none. 00622 // 00623 // Contact: S. Kasahara 00624 // 00625 00626 fShutdown = true; // stops processing loop in Run 00627 MSG("DDS",Msg::kVerbose) << "CS_" << fPid 00628 << " PerInputStreamManager status before shutdown:\n" 00629 << fInputStreamManager; 00630 fInputStreamManager.CloseStream(); // close all managed streams 00631 fMessageOut.Reset(DDS::kOk); 00632 00633 return; 00634 00635 }
| void DDSChildServer::Subscribe | ( | ) | [private] |
Definition at line 637 of file DDSChildServer.cxx.
References PerInputStreamManager::CloseStream(), fFileHandler, fInputStreamManager, fMessageIn, fMessageOut, fPid, fSubscription, DDSFileHandler::GetCurrentFileName(), PerInputStreamManager::GetOpenedStream(), PerStreamManager::GetStreamMap(), Msg::kDebug, DDS::kError, DDS::kOk, Msg::kWarning, MSG, PerInputStreamManager::OpenStream(), and PerInputStreamManager::SetMaxSyncDelay().
Referenced by Run().
00637 { 00638 // Purpose: This Service receives and processes a new subscription from 00639 // the client. This method is activated when the DDS::kSubscribe 00640 // message is received in DDSChildServer::Run(). 00641 // 00642 // Argument: none. 00643 // 00644 // Return: none. 00645 // 00646 // Contact: S. Kasahara 00647 // 00648 00649 // If previous subscription exists, must delete it 00650 if ( fSubscription) delete fSubscription; fSubscription = 0; 00651 00652 // Attempt to receive DDSSubscription object from fMessageIn 00653 fSubscription = dynamic_cast<DDSSubscription*> 00654 (fMessageIn -> ReadObject(fMessageIn->GetClass())); 00655 if ( !fSubscription ) { 00656 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00657 << "\nSubscribe failed to receive expected DDSSubscription from client." 00658 << endl; 00659 fMessageOut.Reset(DDS::kError); 00660 } 00661 else { 00662 // Subscription received successfully 00663 MSG("DDS",Msg::kDebug) << "CS_" << fPid 00664 << " Subscribe received subscription:\n" 00665 << fSubscription; 00666 00667 fMessageOut.Reset(DDS::kOk); 00668 if (fFileHandler && ( (fSubscription -> GetDataSource()) 00669 !=(fFileHandler -> GetDataSource()) 00670 || (fSubscription -> IsOffLine()) 00671 !=(fFileHandler -> IsOffLine()) ) ) { 00672 delete fFileHandler; fFileHandler=0; 00673 } 00674 if (!fFileHandler) 00675 fFileHandler = new DDSFileHandler(fSubscription -> GetDataSource(), 00676 fSubscription -> IsOffLine()); 00677 // Determine current file from filehandler 00678 std::string currentFileName = fFileHandler->GetCurrentFileName(); 00679 00680 // Open new streams in subscription list 00681 PerInputStream* instream; 00682 const DDSSubscription::StreamMap& sm_sub = fSubscription->GetStreamMap(); 00683 for (DDSSubscription::StreamMapConstItr itr_sub = sm_sub.begin(); 00684 itr_sub!= sm_sub.end();++itr_sub) { 00685 string streamName = (itr_sub->first).Data(); 00686 if ( !(instream = dynamic_cast<PerInputStream*> 00687 (fInputStreamManager.GetOpenedStream(streamName))) ) { 00688 // For DDS application, streamname & treename are the same; 00689 instream = fInputStreamManager.OpenStream(streamName,streamName); 00690 if (!currentFileName.empty())instream -> SetFile(currentFileName); 00691 } 00692 if ( instream ) instream -> SetSelection((itr_sub->second).Data()); 00693 } 00694 00695 fInputStreamManager.SetMaxSyncDelay(fSubscription -> GetMaxSyncDelay()); 00696 00697 // Finally, close all streams not on subscription list 00698 const PerStreamManager::StreamMap& sm_mgr 00699 = fInputStreamManager.GetStreamMap(); 00700 for ( PerStreamManager::StreamMapConstItr itr_mgr = sm_mgr.begin(); 00701 itr_mgr!= sm_mgr.end(); ++itr_mgr) { 00702 std::string streamName = itr_mgr->first; 00703 DDSSubscription::StreamMapConstItr itr_sub 00704 = sm_sub.find(streamName.c_str()); 00705 if (itr_sub == sm_sub.end())fInputStreamManager.CloseStream(streamName); 00706 } 00707 } 00708 00709 return; 00710 00711 }
TInetAddress DDSChildServer::fClientAddress [private] |
DDSFileHandler* DDSChildServer::fFileHandler [private] |
Definition at line 61 of file DDSChildServer.h.
Referenced by GoToFile(), Next(), Subscribe(), and ~DDSChildServer().
Definition at line 70 of file DDSChildServer.h.
Referenced by Get(), GoToFile(), Next(), Shutdown(), Subscribe(), and ~DDSChildServer().
Int_t DDSChildServer::fMaxInactive [private] |
TMessage* DDSChildServer::fMessageIn [private] |
Definition at line 68 of file DDSChildServer.h.
Referenced by Run(), Subscribe(), and ~DDSChildServer().
TMessage DDSChildServer::fMessageOut [private] |
Definition at line 69 of file DDSChildServer.h.
Referenced by Get(), GoToFile(), Next(), Run(), Shutdown(), and Subscribe().
MomNavigator* DDSChildServer::fMom [private] |
int DDSChildServer::fPid [private] |
Definition at line 63 of file DDSChildServer.h.
Referenced by GoToFile(), Next(), Print(), Run(), Shutdown(), and Subscribe().
TInetAddress DDSChildServer::fServerAddress [private] |
Definition at line 71 of file DDSChildServer.h.
bool DDSChildServer::fShutdown [private] |
DDSSubscription* DDSChildServer::fSubscription [private] |
Definition at line 60 of file DDSChildServer.h.
Referenced by Next(), Subscribe(), and ~DDSChildServer().
TSocket* DDSChildServer::fTSocket [private] |
Definition at line 59 of file DDSChildServer.h.
Referenced by IsClientConnected(), IsValid(), Run(), and ~DDSChildServer().
1.4.7