00001
00002 #include <stdio.h>
00003
00004 #include <unistd.h>
00005
00006 #include <stdlib.h>
00007
00008 #include <string.h>
00009
00010
00011 #include <stdint.h>
00012
00013 #include "OnlineUtil/msgLogLib/msgLog.h"
00014 #include "OnlineUtil/rototalk.h"
00015
00016
00017 #include <time.h>
00018 #include <sys/time.h>
00019
00020 #include "OnlineUtil/rawBlockIds.h"
00021 #include "OnlineUtil/rdChecksum.h"
00022 #include "OnlineUtil/rotoMessages.h"
00023
00024
00025 int32_t send_bogus_stuff(int32_t* buffer, int32_t bsize, int32_t mxrec);
00026 int32_t parse_detector(const char* detname);
00027 int32_t config_autosave(const char* config);
00028 int32_t config_compress(const char* config);
00029 int32_t config_basketsize(const char* config);
00030 int32_t build_dcs_blockid(int32_t detector, int32_t major, int32_t version);
00031
00032 int32_t* append_dcs_header_block(int32_t* buffer, int32_t* toofar);
00033 int32_t* append_dcs_monitor_block(int32_t* buffer, int32_t* toofar);
00034 int32_t* append_dcs_alarm_block(int32_t* buffer, int32_t* toofar);
00035
00036
00037 #define MAXBUFFER 8388608
00038
00039
00040 int32_t gDetector = kMdBlockSourceFarDetector;
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072 int main(int argc, char **argv)
00073 {
00074
00075
00076 const char* iphost = "localhost";
00077 const char* default_producer = "DCS";
00078 const int default_port = 9012;
00079 const char* producer = default_producer;
00080 const char* autosave_config = 0;
00081 const char* compress_config = 0;
00082 const char* basketsize_config = 0;
00083 int32_t port = default_port;
00084 int32_t whoami = MINOS_ROOTER_DCS;
00085 int32_t bsize = MAXBUFFER;
00086 int32_t* buffer = 0;
00087 int32_t nerr = 0;
00088 int32_t mxrec = 1000;
00089 int32_t nodelay_flag = 1;
00090 int32_t echoMsgLog = 0;
00091 int32_t copt;
00092
00093
00094
00095
00096
00097 while ((copt = getopt(argc, argv, "i:b:d:c:C:B:w:p:ev:n:D:h")) != EOF) {
00098 switch (copt) {
00099 case 'i':
00100 iphost = optarg;
00101 break;
00102 case 'b':
00103 bsize = atoi(optarg);
00104 break;
00105 case 'd':
00106 gDetector = parse_detector(optarg);
00107 break;
00108 case 'c':
00109 autosave_config = optarg;
00110 break;
00111 case 'C':
00112 compress_config = optarg;
00113 break;
00114 case 'B':
00115 basketsize_config = optarg;
00116 break;
00117 case 'w':
00118 producer = optarg;
00119 break;
00120 case 'p':
00121 port = atoi(optarg);
00122 break;
00123 case 'e':
00124 echoMsgLog = 1;
00125 break;
00126 case 'v':
00127 roto_verbose = atoi(optarg);
00128 break;
00129 case 'n':
00130 mxrec = atoi(optarg);
00131 break;
00132 case 'D':
00133 nodelay_flag = atoi(optarg);
00134 break;
00135 case 'h':
00136 printf(" usage: %s -i<hostname> -b<buffer size> -w<whoami> -p<port #> <filenames..>\n", argv[0]);
00137 printf(" -i: hostname where rotorooter is running\n");
00138 printf(" -b: buffer size to use\n");
00139 printf(" -d: detector ('near','far','caldet')\n");
00140 printf(" -w: DCP, DCS or BeamMon\n");
00141 printf(" -p: port number rotorooter is listening on\n");
00142 printf(" -v: how verbose to be\n");
00143 printf(" -c: autosave config \"streamName,nrec,nsec[;streamName,nrec,nsec]\"\n");
00144 printf(" -C: compression config \"streamName,level[;streamName,level]\"\n");
00145 printf(" -B: basketsize config \"streamName,size[;streamName,size]\"\n");
00146
00147 printf(" -n: maximum number of records from each file\n");
00148
00149 printf(" -h: print this message\n");
00150 exit(1);
00151 default:
00152 printf(" unrecognized option '%c' ignored\n",(char)optopt);
00153 break;
00154 }
00155 }
00156
00157 msgLogInit(argv[0]);
00158 msgLogNodeIdSet(whoami);
00159 msgLogLocalEchoSet(echoMsgLog);
00160 logDebugLevelSet(3);
00161 logNotice("starting %s",argv[0]);
00162
00163
00164 buffer = (int32_t*) malloc(bsize);
00165 if (!buffer) {
00166 printf("failed to allocate buffer of size %d\n",bsize);
00167 exit(1);
00168 }
00169 if (roto_verbose>1) printf("allocated buffer of size %d\n",bsize);
00170
00171
00172 if (0 == strcmp(producer,default_producer) &&
00173 port == default_port) {
00174
00175 if (roto_verbose>1)
00176 printf("connect via roto_open_DCP_connection\n");
00177 nerr += roto_open_DCS_connection(iphost);
00178 } else {
00179
00180 if (0 == strcmp(producer,"DCP")) {
00181 whoami = MINOS_ROOTER_DCP;
00182 }
00183 else if (0 == strcmp(producer,"DCS")) {
00184 whoami = MINOS_ROOTER_DCS;
00185 }
00186 else if (0 == strcmp(producer,"BeamMon")) {
00187 whoami = MINOS_ROOTER_BEAMMON;
00188 }
00189 else
00190 whoami = MINOS_ROOTER_UNIDENTIFIED_CLIENT;
00191
00192 if (roto_verbose>1)
00193 printf("connect via roto_open_connection: port %d, whoami %s (0x%2.2x)\n",port,producer,whoami);
00194 nerr += roto_open_connection(iphost,port,whoami);
00195 }
00196 if (nerr) exit(nerr);
00197
00198 nerr += roto_set_connection_nodelay(nodelay_flag);
00199
00200 if (autosave_config) nerr += config_autosave(autosave_config);
00201 if (compress_config) nerr += config_compress(compress_config);
00202 if (basketsize_config) nerr += config_basketsize(basketsize_config);
00203
00204 nerr += send_bogus_stuff(buffer,bsize,mxrec);
00205
00206
00207 nerr += roto_close_connection();
00208
00209
00210 free(buffer);
00211
00212 logNotice("stopping %s",argv[0]);
00213 msgLogCleanup();
00214
00215 exit(nerr);
00216 }
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234 int32_t send_bogus_stuff(int32_t* buffer, int32_t bsize, int32_t mxrec)
00235 {
00236
00237 int32_t nRecords=0;
00238 int32_t nbytes;
00239
00240 int32_t* current;
00241 int32_t* endofbuf = buffer + bsize/sizeof(int32_t);
00242
00243 int32_t nerr = 0;
00244
00245 struct timeval now;
00246 gettimeofday(&now,0);
00247
00248 nerr += roto_open_dcsfile(gDetector,now.tv_sec,now.tv_usec*1000);
00249 if (nerr) {
00250 printf("Rotorooter failed to open output file\n");
00251 return 1;
00252 }
00253
00254 while (1) {
00255 if (mxrec>0 && nRecords==mxrec) break;
00256 nRecords++;
00257
00258
00259
00260
00261
00262 current = buffer;
00263
00264
00265 current = append_dcs_header_block(current,endofbuf);
00266
00267
00268
00269
00270
00271 current = append_dcs_alarm_block(current,endofbuf);
00272 current = append_dcs_monitor_block(current,endofbuf);
00273
00274
00275
00276
00277 nbytes = (current-buffer)*sizeof(int32_t);
00278 nerr += roto_send_record(buffer,nbytes);
00279
00280 logDebug(5,"sent record %d of length %d bytes",nRecords,nbytes);
00281
00282 }
00283
00284 nerr += roto_close_dcsfile();
00285 return nerr;
00286 }
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302 int32_t parse_detector(const char* detname)
00303 {
00304
00305 switch (detname[0]) {
00306 case 'n':
00307 case 'N':
00308 case '1':
00309 return kMdBlockSourceNearDetector;
00310 break;
00311 case 'f':
00312 case 'F':
00313 case '2':
00314 return kMdBlockSourceFarDetector;
00315 break;
00316 case 'c':
00317 case 'C':
00318 case '4':
00319 return kMdBlockSourceCalDetector;
00320 break;
00321 default:
00322 logWarn("parse_detector: could not decipher '%s', using value %d",
00323 detname,gDetector);
00324 return gDetector;
00325 }
00326 }
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344 int32_t config_autosave(const char* config)
00345 {
00346 int32_t nerr = 0;
00347 int32_t len = strlen(config);
00348 char *wcopy = (char*)malloc(len+1);
00349 char stream[1024];
00350 const char *p1;
00351 char *p2;
00352 char achar;
00353 int32_t nrec,nsec, nitems;
00354
00355
00356 p1 = config; p2 = wcopy;
00357 while (p2 < wcopy+len) {
00358 achar = *p1;
00359 if (achar != ',') *p2 = achar;
00360 else *p2 = ' ';
00361 p1++; p2++;
00362 }
00363
00364 p2 = wcopy;
00365 while (p2 < wcopy+len) {
00366 nitems = sscanf(p2,"%s %d %d",stream,&nrec,&nsec);
00367 if (nitems != 3) {
00368 logInfo("config_autosave: only %d items from \"%s\"\n",nitems,p2);
00369 nerr += rototalk_err_badconfig;
00370 }
00371 else
00372 nerr += roto_send_autosave_config(stream,nrec,nsec);
00373
00374 p2 = strchr(p2,';');
00375 if (!p2) break;
00376 p2++;
00377 }
00378
00379
00380 free(wcopy);
00381 wcopy = 0;
00382
00383 return nerr;
00384 }
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402 int32_t config_compress(const char* config)
00403 {
00404 int32_t nerr = 0;
00405 int32_t len = strlen(config);
00406 char *wcopy = (char*)malloc(len+1);
00407 char stream[1024];
00408 const char *p1;
00409 char *p2;
00410 char achar;
00411 int32_t level, nitems;
00412
00413
00414 p1 = config; p2 = wcopy;
00415 while (p2 < wcopy+len) {
00416 achar = *p1;
00417 if (achar != ',') *p2 = achar;
00418 else *p2 = ' ';
00419 p1++; p2++;
00420 }
00421
00422 p2 = wcopy;
00423 while (p2 < wcopy+len) {
00424 nitems = sscanf(p2,"%s %d",stream,&level);
00425 if (nitems != 2) {
00426 logInfo("config_compress: only %d items from \"%s\"\n",nitems,p2);
00427 nerr += rototalk_err_badconfig;
00428 }
00429 else
00430 nerr += roto_send_compress_config(stream,level);
00431
00432 p2 = strchr(p2,';');
00433 if (!p2) break;
00434 p2++;
00435 }
00436
00437
00438 free(wcopy);
00439 wcopy = 0;
00440
00441 return nerr;
00442 }
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460 int32_t config_basketsize(const char* config)
00461 {
00462 int32_t nerr = 0;
00463 int32_t len = strlen(config);
00464 char *wcopy = (char*)malloc(len+1);
00465 char stream[1024];
00466 const char *p1;
00467 char *p2;
00468 char achar;
00469 int32_t bsize, nitems;
00470
00471
00472 p1 = config; p2 = wcopy;
00473 while (p2 < wcopy+len) {
00474 achar = *p1;
00475 if (achar != ',') *p2 = achar;
00476 else *p2 = ' ';
00477 p1++; p2++;
00478 }
00479
00480 p2 = wcopy;
00481 while (p2 < wcopy+len) {
00482 nitems = sscanf(p2,"%s %d",stream,&bsize);
00483 if (nitems != 2) {
00484 logInfo("config_basketsize: only %d items from \"%s\"\n",nitems,p2);
00485 nerr += rototalk_err_badconfig;
00486 }
00487 else
00488 nerr += roto_send_basketsize_config(stream,bsize);
00489
00490 p2 = strchr(p2,';');
00491 if (!p2) break;
00492 p2++;
00493 }
00494
00495
00496 free(wcopy);
00497 wcopy = 0;
00498
00499 return nerr;
00500 }
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
/*
 * Pack the fields of a DCS raw block id into one 32-bit word:
 *
 *   bits 28-29  compact sim flag (always 0 here)
 *   bits 25-27  detector
 *   bit  24     "is DCS" marker (always set here)
 *   bits  8-23  major id
 *   bits  0-7   version (minor id)
 *
 * Each input is shifted into place and then masked, so bits that do not fit
 * in a field are silently dropped. The shifts are carried out on unsigned
 * values because left-shifting a negative or too-large signed int is
 * undefined behaviour.
 */
int32_t build_dcs_blockid(int32_t detector, int32_t major, int32_t version)
{
    const unsigned shiftRawBlkIdMinor    = 0;
    const unsigned shiftRawBlkIdMajor    = 8;
    const unsigned shiftRawBlkIdDetector = 25;
    const unsigned shiftRawBlkIdCSimFlag = 28;

    const uint32_t maskRawBlkIdMinor    = 0x000000ffu;
    const uint32_t maskRawBlkIdMajor    = 0x00ffff00u;
    const uint32_t maskRawBlkIdIsDCS    = 0x01000000u;
    const uint32_t maskRawBlkIdDetector = 0x0e000000u;
    const uint32_t maskRawBlkIdCSimFlag = 0x30000000u;

    const uint32_t cmptSimFlag = 0;
    const int isDCS = 1;

    uint32_t id = 0;

    id |= (cmptSimFlag << shiftRawBlkIdCSimFlag) & maskRawBlkIdCSimFlag;
    id |= ((uint32_t)detector << shiftRawBlkIdDetector) & maskRawBlkIdDetector;
    id |= isDCS ? maskRawBlkIdIsDCS : 0u;
    id |= ((uint32_t)major << shiftRawBlkIdMajor) & maskRawBlkIdMajor;
    id |= ((uint32_t)version << shiftRawBlkIdMinor) & maskRawBlkIdMinor;

    /* The masks never reach bits 30-31, so the value fits in int32_t. */
    return (int32_t)id;
}
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576 int32_t* append_dcs_header_block(int32_t* buffer, int32_t* toofar)
00577 {
00578
00579
00580
00581
00582
00583
00584
00585
00586 int32_t version = 0;
00587 int32_t blksize = 5;
00588 struct timeval now;
00589
00590
00591 if (buffer+blksize > toofar) {
00592 logError("append_dcs_header_block: need %ld have %d",
00593 blksize,toofar-buffer);
00594 return buffer;
00595 }
00596
00597
00598
00599 buffer[0] = blksize;
00600
00601 buffer[2] = build_dcs_blockid(gDetector,kMdBlockDcsHeader,version);
00602
00603
00604 gettimeofday(&now,0);
00605 buffer[3] = now.tv_sec;
00606 buffer[4] = now.tv_usec*1000;
00607
00608
00609 buffer[1] = rdxsum_calc(buffer+0,1);
00610
00611 return buffer + buffer[0];
00612 }
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629 int32_t* append_dcs_alarm_block(int32_t* buffer, int32_t* toofar)
00630 {
00631
00632
00633
00634
00635
00636
00637
00638 int32_t version = 0;
00639 int32_t blksize, payload, j;
00640
00641
00642
00643
00644
00645
00646
00647 if ( rand()/(RAND_MAX+1.0) > 0.10 ) return buffer;
00648
00649
00650
00651
00652
00653
00654 payload = 1 + (int32_t)(30.*rand()/(RAND_MAX+1.0));
00655 blksize = 3 + payload;
00656
00657
00658 if (buffer+blksize > toofar) {
00659 logError("append_dcs_alarm_block: need %ld have %d",
00660 blksize,toofar-buffer);
00661 return buffer;
00662 }
00663
00664
00665
00666 buffer[0] = blksize;
00667
00668 buffer[2] = build_dcs_blockid(gDetector,kMdBlockDcsAlarm,version);
00669
00670
00671 for (j=0; j<payload; ++j) buffer[j+3] = rand();
00672
00673
00674 buffer[1] = rdxsum_calc(buffer+0,1);
00675
00676 return buffer + buffer[0];
00677 }
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694 int32_t* append_dcs_monitor_block(int32_t* buffer, int32_t* toofar)
00695 {
00696
00697
00698
00699
00700
00701
00702
00703 int32_t version = 0;
00704 int32_t blksize, payload, j;
00705
00706
00707
00708
00709
00710
00711
00712 if ( rand()/(RAND_MAX+1.0) > 0.90 ) return buffer;
00713
00714
00715
00716
00717
00718
00719 payload = 30 + (int32_t)(300.*rand()/(RAND_MAX+1.0));
00720 blksize = 3 + payload;
00721
00722
00723 if (buffer+blksize > toofar) {
00724 logError("append_dcs_monitor_block: need %ld have %d",
00725 blksize,toofar-buffer);
00726 return buffer;
00727 }
00728
00729
00730
00731 buffer[0] = blksize;
00732
00733 buffer[2] = build_dcs_blockid(gDetector,kMdBlockDcsMonitor,version);
00734
00735
00736 for (j=0; j<payload; ++j) buffer[j+3] = rand();
00737
00738
00739 buffer[1] = rdxsum_calc(buffer+0,1);
00740
00741 return buffer + buffer[0];
00742 }