StRoot  1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
daqReader.cxx
1 #include <stdio.h>
2 #include <ctype.h>
3 #include <sys/stat.h>
4 #include <fcntl.h>
5 #include <string.h>
6 #include <sys/mman.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <errno.h>
10 #include <time.h>
11 #include <arpa/inet.h>
12 #include <pwd.h>
13 #include <assert.h>
14 #include <iccp2k.h>
15 
16 #if defined(__linux__) || defined(__APPLE__)
17 #include <sched.h>
18 // some older Linuxes (notably 6.2) crash and burn
19 // because of not having madvise so...
20 // Redhat EL3 crashes the kernel! with madvise... christ...
21 #define madvise(x,y,z)
22 
23 #else
24 #include <procfs.h>
25 #endif
26 
27 // MUST BE INCLUDED BEFORE ANY OTHER RTS INCLUDE!
28 //#include <daqReader.hh>
29 
30 #include <rtsLog.h>
31 #include <daqFormats.h>
32 #include <iccp.h>
33 #include <SFS/sfs_index.h>
34 #include <rts.h>
35 
36 #include "daqReader.h"
37 #include "msgNQLib.h"
38 #include "cfgutil.h"
39 #include "daq_det.h"
40 
41 #ifndef MADV_DONTNEED
42 #define madvise(x,y,z)
43 #endif
44 
45 
// File-scope variable with external linkage.
// NOTE(review): copySummaryInfoIn()/hackSummaryInfo() assign to "evp_daqbits";
// presumably that is this variable unless the class declares a member of the
// same name -- confirm against daqReader.h.
46 u_int evp_daqbits ;
47 
48 //Tonko:
// CVS revision stamp of this file; daqReader::init() logs it at INFO level
// when RTS_ONLINE is not defined.
49 static const char cvs_id_string[] = "$Id: daqReader.cxx,v 1.74 2023/02/13 14:37:41 tonko Exp $" ;
50 
// Forward declarations of file-local helpers; their definitions are not in
// this part of the file.
51 static int evtwait(int task, ic_msg *m) ;
52 static int ask(int desc, ic_msg *m) ;
53 
// Declared here, defined elsewhere.
54 DATAP *getlegacydatap(char *mem, int bytes);
55 
56 //static int thisNode = EVP_NODE ;
57 //static int evpDesc ;
// Forward declaration of a file-local helper (definition not visible here).
58 static const char *getCommand(void) ;
59 
60 
61 
62 
63 // CONSTRUCTOR!
64 daqReader::daqReader(char *mem, int size)
65 {
66 
67 
68  init();
69  input_type = pointer;
70 
71  data_memory = mem;
72  event_memory = mem;
73  evt_offset_in_file = 0;
74 
75  data_size = size;
76 
77  file_size = size;
78  // LOG("JEFF", "filesize=%d mem=%p",file_size,mem);
79 
80  crit_cou = 0;
81 
82 
83 }
84 
85 daqReader::daqReader(const char *name)
86 {
87  struct stat64 stat_buf ;
88 
89  // Default map flags
90  map_prot = PROT_READ;
91  map_flags = MAP_SHARED | MAP_NORESERVE;
92 
93  init();
94 
95  if(name == NULL) { // EVP
96  input_type = live;
97  isevp = 1;
98  if(reconnect() < 0) status = EVP_STAT_CRIT; // will loop until success or Cntrl-C I guess...
99  return ; // that's it....
100  }
101 
102  // Move the filename in...
103  strcpy(fname, name);
104 
105 
106  // This code is reached only if the argument was non-NULL
107  // file or directory?
108  if(stat64(fname, &stat_buf) < 0) { // error
109  LOG(CRIT,"Can't stat \"%s\" [%s]",fname,strerror(errno),0,0,0);
110  status = EVP_STAT_CRIT;
111  sleep(1);
112  return;
113  }
114 
115  LOG(NOTE,"Using file \"%s\"...",fname,0,0,0,0) ;
116 
117  // directory?
118  if(stat_buf.st_mode & S_IFDIR) {
119  LOG(DBG,"Running through a directory %s...",fname,0,0,0,0) ;
120  input_type = dir;
121 
122  // change to that directory...
123  // if(chdir(fname) < 0) {
124 // LOG(ERR,"Can't chdir %s [%s]",fname,strerror(errno),0,0,0) ;
125 // sleep(1) ;
126 // return ;
127 // }
128 
129  status = EVP_STAT_OK ;
130  return;
131  }
132 
133  // If not, it must be a file...
134  file_size = stat_buf.st_size ;
135  evt_offset_in_file = 0;
136  input_type = file;
137 
138  strcpy(file_name,fname) ;
139 
140  // descriptor for ".daq" view of file...
141  LOG(DBG,"Running through a file %s of %d bytes",fname,file_size,0,0,0) ;
142 
143  desc = open64(fname,O_RDONLY,0444) ;
144  if(desc < 0) { // error
145  LOG(ERR,"Can't open %s [%s]",fname,strerror(errno),0,0,0) ;
146  sleep(1) ;
147  return ;
148  }
149 
150  status = EVP_STAT_OK ;
151 
152  return ;
153 }
154 
155 int daqReader::getDetectorSize(const char *det)
156 {
157  if (!sfs) return 0;
158 
159  SfsDirsize sz;
160 
161  sfs->getDirSize((char *)det, &sz);
162  // printf("sizeof(%s) : %lld (%3.1f%% overhead) %lld %lld\n",det, sz.dataSize, ((double)(sz.size - sz.dataSize)) / ((double)(sz.dataSize)), sz.dataSize, sz.size);
163 
164  return (int)sz.size;
165 }
166 
167 void daqReader::init()
168 {
169  // Get evphostname
170  EVP_HOSTNAME = (char *)_EVP_HOSTNAME;
171  static char evp_hostname[100];
172  char *str = getenv((char *)"EVP_HOSTNAME");
173 
174  if(str) {
175  strcpy(evp_hostname, str);
176  EVP_HOSTNAME = evp_hostname;
177  }
178 
179 
180 #ifndef RTS_ONLINE
181  // this is only for Offline!
182 
183  // rtsLogLevel(WARN) ;
184  // rtsLogAddDest("130.199.60.86",RTS_LOG_PORT_READER) ; // reader.log to daqman
185 
186  // allow this one message to daqman's reader log...
187  //rtsLogOutput(RTS_LOG_NET) ;
188  //LOG(INFO,"%s",cvs_id_string) ;
189  // back to STDERR
190  rtsLogOutput(RTS_LOG_STDERR) ; // STDERR only!
191  LOG(INFO,"%s",cvs_id_string) ;
192 #endif
193 
194 
195  // if(strcmp(EVP_HOSTNAME, _EVP_HOSTNAME) != 0) {
196  LOG(DBG,"EVP_HOSTNAME set to %s", EVP_HOSTNAME);
197  // }
198 
199  isevp = 0; // assume not...
200  fname[0] = '\0';
201  memmap = new MemMap();
202  sfs = new sfs_index();
203  sfs_lastevt = 0;
204 
205  runconfig = (rccnf *) valloc(sizeof(rccnf)) ;
206  memset(runconfig,0,sizeof(rccnf));
207 
208  memset(dets,0,sizeof(dets)) ;
209  memset(pseudo_dets,0,sizeof(pseudo_dets));
210 
211  // setup...
212  do_open = 1 ;
213  do_mmap = 1 ;
214  strcpy(evp_disk,"") ;
215 
216  desc = -1 ;
217  //mem_mapped = NULL ;
218  //bytes_mapped = 0 ;
219 
220  issued = 0 ;
221  last_issued = time(NULL) ;
222  status = EVP_STAT_CRIT ;
223  // tot_bytes = 0 ;
224 
225  input_type = none;
226 
227  event_number = 0 ; // last known event...
228  total_events = 0 ;
229  readall_reset();
230  bytes = 0 ;
231 
232  file_size = 0 ; // only if it's a real big file...
233 
234  status = EVP_STAT_OK ;
235 
236  return ;
237 }
238 
239 int daqReader::setOpen(int flg)
240 {
241  int ret ;
242 
243  ret = do_open ;
244  do_open = flg ;
245 
246  return ret ;
247 }
248 
249 int daqReader::setMmap(int flg)
250 {
251  int ret ;
252 
253  ret = do_mmap ;
254  do_mmap = flg ;
255 
256  return ret ;
257 }
258 
259 int daqReader::setLog(int flg)
260 {
261 
262  if(flg) {
263  rtsLogOutput(RTS_LOG_STDERR|RTS_LOG_NET) ;
264  }
265  else {
266  rtsLogOutput(RTS_LOG_NET) ;
267  }
268 
269  return 0 ;
270 }
271 
272 
273 char *daqReader::setEvpDisk(char *name)
274 {
275  char *saved = _static_str_return_;
276 
277  strcpy(saved, evp_disk) ;
278  strncpy(evp_disk,name,sizeof(evp_disk)-1) ;
279 
280  return saved ;
281 }
282 
283 daqReader::~daqReader(void)
284 {
285  LOG(DBG,"Destructor %s",fname,0,0,0,0) ;
286 
287  // clean-up input file...
288  if(desc >= 0) close(desc) ;
289 
290 
291  if(input_type == live) {
292  msgNQDelete(evpDesc) ;
293  }
294 
295  if(memmap) delete(memmap);
296  if(sfs) delete(sfs) ;
297  if(runconfig) free(runconfig) ;
298 
299  // call detector destructors!
300  for(int i=0;i<32;i++) {
301  if(dets[i]) {
302  LOG(DBG,"Will destruct det %d",i) ;
303  delete(dets[i]) ;
304  }
305  if(pseudo_dets[i]) {
306  LOG(DBG,"Will destruct pseudo det %d",i) ;
307  delete(pseudo_dets[i]) ;
308  }
309  }
310 
311 
312  return ;
313 }
314 
315 /*
316  Args
317  =====
318  If num == 0 Next event
319  num > 0 That particula event (if it makes sense...)
320 
321 
322  Output
323  ======
324  Returns a valid pointer on success or NULL on error...
325 */
326 
327 
328 char *daqReader::get(int num, int type)
329 {
330  // static int crit_cou = 30 ;
331  //static struct LOGREC lrhd ;
332  int ret ;
333  //int leftover, tmp ;
334  // char _evp_basedir_[40];
335  //static char _last_evp_dir_[40];
336 
337  memset(trgIds, 0xffffffff, sizeof(trgIds));
338  trgIdsSet = 0;
339  trgIdsNotPresent = 0;
340 
341  event_number++;
342  LOG(DBG, "processing nth event. n=%d from %s %lld",event_number, getInputType(),evt_offset_in_file);
343 
344  // nix some variables which may have no meaning for all streams...
345  evb_type = 0 ;
346  evb_type_cou = 0 ;
347  evb_cou = 0 ;
348  run = 0 ;
349 
350  memset(streaming_node, 0, sizeof(streaming_node));
351  streaming_evb = -1;
352  streaming_seq = -1;
353 
354  // check status of constructor or previous get()
355 
356  int delay = getStatusBasedEventDelay();
357  if(delay) {
358  if(delay == -1) {
359  LOG(CRIT, "Exiting because of critical errors");
360  exit(0);
361  }
362 
363  LOG(DBG, "Delay of %d usec because of previous event status %d",delay, status);
364  usleep(delay);
365  }
366 
367  // unmap previous event if any...
368  if(memmap->mem) {
369  memmap->unmap();
370  }
371 
372  if((input_type == dir) ||
373  (input_type == live)) {
374 
375  if(getNextEventFilename(num, type) < 0) {
376  event_number--;
377  return NULL;
378  }
379 
380  if(openEventFile() < 0) {
381  if(input_type == dir) {
382  LOG(NOTE,"No File, but didn't see token 0 - stopping...",event_number) ;
383  status = EVP_STAT_EOR ;
384  event_number--;
385  //event_number = 1;
386  }
387  LOG("JEFF", "x");
388  return NULL;
389  }
390  }
391 
392  // at this point:
393  // 1. there is no file and we are memory mapped already
394  // 2. (or) the file is opened...
395  //
396  // Why? either:
397  //
398  // 1. Is a single event file in which case it was opened above
399  // the current position is set to the begining of the file
400  //
401  // (or)
402  //
403  // 2. it is a multi-event file and the file is opened in the constructer
404  // or remains open from the previous event. The current position is set to
405  // the begining of the next event (points at: LRHD or DATAP or sfs FILE record)
406  //
407  //if((input_type == file) || (input_type == pointer)) event_number++ ;
408 
409  if(input_type == pointer) { // start at beginning...
410  if(event_memory == NULL)
411  event_memory = data_memory;
412  }
413 
414  // Sets:
415  // event_size, lrhd_offset, datap_offset & status if failure...
416 
417  repeat:
418 
419  int error = getEventSize();
420 
421  if(status == EVP_STAT_LOG) {
422  // we know this is a stand alone SFS file
423  // and we know it is a log message, or at least has no event
424  // try the next directory
425 
426  LOG(DBG, "Skipping a non-event SFS file...");
427 
428  lseek64(desc, event_size, SEEK_CUR);
429  evt_offset_in_file += event_size;
430  //LOG("JEFF", "evt_offset_in_file = %d",evt_offset_in_file);
431  event_size = 0;
432  goto repeat;
433  }
434 
435  // We handle the possible padding bug by
436  // searching and tacking the extra padding to the end of the event
437  // where it can do us no harm...
438  //
439  int padchecks = 0;
440  for(;;) {
441  padchecks++;
442  int inc = addToEventSize(event_size);
443 
444  if(inc == 0) {
445  LOG(DBG, "No extra increment... event size=%d",event_size);
446  break;
447  }
448 
449  LOG(WARN, "Found a padding bug. Adding %d to event size",inc);
450  event_size += inc;
451 
452  if(padchecks > 5) {
453  LOG(ERR, "Error finding next event...");
454  status = EVP_STAT_EOR;
455  return NULL;
456  }
457  }
458 
459  LOG(DBG, "Event size is %d (%d) %d",event_size, error, status);
460 
461  if(status == EVP_STAT_EOR) {
462  LOG(DBG, "Status = EOR");
463 // LOG("JEFF", "x");
464  return NULL;
465  }
466 
467  if(error == 0) {
468  status = EVP_STAT_EOR;
469  LOG(NOTE, "x");
470  return NULL;
471  }
472 
473  if(error < 0) {
474  status = EVP_STAT_EVT;
475  LOG("JEFF", "x");
476  return NULL;
477  }
478 
479  if((event_size + evt_offset_in_file) > file_size) {
480  LOG(WARN,"This event is truncated... Good events %d [%d+%d > %d]...", total_events,evt_offset_in_file,event_size,file_size,0) ;
481  if((input_type == pointer) ||
482  (input_type == dir)) {
483  status = EVP_STAT_EVT ;
484  }
485  else {
486  status = EVP_STAT_EOR ;
487  }
488  LOG("JEFF", "x");
489  return NULL;
490  }
491 
492 
493  if(input_type != pointer) {
494  LOG(DBG, "Mapping event file %s, offset %d, size %d",
495  file_name, evt_offset_in_file, event_size);
496 
497  char *mapmem = memmap->map(desc, evt_offset_in_file, event_size, map_prot, map_flags);
498  if(!mapmem) {
499  LOG(CRIT, "Error mapping memory for event");
500  exit(0);
501  }
502  }
503  else {
504  memmap->map_real_mem(event_memory, event_size);
505  }
506 
507  LOG(DBG, "Event is now in memory: start=0x%x, length=%d",memmap->mem,event_size);
508 
509  // Neccessary?
510  if(input_type == pointer) {
511  if(run == 0) {
512  LOG(DBG, "Does this ever get called?");
513  run = readall_run;
514  }
515  }
516 
517  LOG(DBG, "about to mount sfs file: %s %d 0x%x",file_name, evt_offset_in_file, sfs);
518 
519  // Now, mount the sfs file...
520  // The mount unmounts and closes previous mount...
521  LOG(DBG, "mounting single dir(mem): off=%d sz=%d",evt_offset_in_file, event_size);
522 
523 
524 
525  LOG(DBG, "[%c%c%c%c]",memmap->mem[0],memmap->mem[1],memmap->mem[2],memmap->mem[3]);
526 
527  //printf("%p, %d %lld\n",memmap->mem,event_size, evt_offset_in_file);
528  ret = sfs->mountSingleDirMem(memmap->mem, event_size, evt_offset_in_file);
529 
530 
531  if(ret < 0) {
532  LOG(ERR, "Error mounting sfs?");
533  status = EVP_STAT_EVT;
534  LOG("JEFF", "x");
535  return NULL;
536  }
537 
538  // CD to the current event...
539  fs_dir *fsdir = sfs->opendir("/");
540  for(;;) {
541  fs_dirent *ent = sfs->readdir(fsdir);
542  if(!ent) {
543  sfs->closedir(fsdir);
544  LOG(ERR, "Error finding event directory in sfs?");
545 
546  // Skip directory... go to next
547  status = EVP_STAT_EVT;
548  LOG("JEFF", "x");
549  return NULL;
550  }
551 
552  LOG(DBG, "does dir (%s) satisfy '/#' or '/nnnn'",ent->full_name);
553 
554  if(memcmp(ent->full_name, "/#", 2) == 0) {
555  LOG(DBG, "change sfs dir to %s",ent->full_name);
556 
557  seq = atoi(&ent->full_name[2]);
558 
559  sfs->cd(ent->full_name);
560  sfs->closedir(fsdir);
561  break;
562  }
563 
564  if(memcmp(ent->full_name, "/%", 2) == 0) {
565  char _node[10];
566  char _evb[10];
567  char _idx[10];
568  memset(_node, 0, sizeof(_node));
569  memset(_evb, 0, sizeof(_evb));
570  memset(_idx, 0, sizeof(_idx));
571 
572  char *s = &ent->full_name[2];
573  char *d = _node;
574  while(*s != '-') {
575  *d++ = *s++;
576  }
577  *s++;
578  d = &_evb[0];
579  while(*s != '-') {
580  *d++ = *s++;
581  }
582  *s++;
583  d = &_idx[0];
584  while(*s != '\0') {
585  *d++ = *s++;
586  }
587 
588  strcpy(streaming_node, _node);
589  streaming_evb = atoi(_evb);
590  streaming_seq = atoi(_idx);
591 
592  LOG(DBG, "change sfs dir to %s (%s) (%s) (%s)",
593  ent->full_name, _node, _evb, _idx);
594 
595  sfs->cd(ent->full_name);
596  sfs->closedir(fsdir);
597  break;
598  }
599 
600  if(allnumeric(&ent->full_name[1])) {
601  seq = atoi(&ent->full_name[1]);
602  sfs->cd(ent->full_name);
603  sfs->closedir(fsdir);
604  break;
605  }
606 
607 
608  LOG(DBG, "SFS event directory not yet found: %s",ent->full_name);
609  }
610 
611  fs_dirent *datap = sfs->opendirent("legacy");
612  if(datap) {
613  mem = memmap->mem + datap->offset;
614  LOG(DBG, "Event has a datap bank at 0x%x",mem);
615  }
616  else {
617  mem = NULL;
618  LOG(DBG, "Event has no DATAP bank");
619  }
620 
621 
622  bytes = 0;
623  if(mem) {
624  bytes = event_size - (mem - memmap->mem);
625  LOG(DBG, "size = %d %d",event_size, bytes);
626  }
627 
628  // Fill in run number
629  //
630  run = 0;
631  fs_dirent *lrhd_ent = sfs->opendirent("lrhd");
632  if(lrhd_ent) {
633  char *lrhd_buff = memmap->mem + lrhd_ent->offset;
634  LOGREC *lrhd_rec = (LOGREC *)lrhd_buff;
635  run = lrhd_rec->lh.run;
636 
637  if(lrhd_rec->lh.byte_order != 0x04030201) {
638  run = swap32(run);
639  }
640  }
641 
642 
643  // Now we need to fill in the summary information from datap
644  //
645  SummaryInfo info;
646  fs_dirent *summary = sfs->opendirent("EventSummary");
647  if(summary) {
648  char *buff = memmap->mem + summary->offset;
649  fillSummaryInfo(&info,(gbPayload *)buff);
650  copySummaryInfoIn(&info);
651  }
652  else { // take it from datap
653  LOG(DBG, "No EventSummary, search for legacy datap");
654  summary = sfs->opendirent("legacy");
655  if(!summary) {
656  LOG(DBG, "No EventSummary and no DATAP... hacking summary info");
657  hackSummaryInfo();
658  }
659  else {
660  char *buff = memmap->mem + summary->offset;
661  fillSummaryInfo(&info,(DATAP *)buff);
662  copySummaryInfoIn(&info);
663  }
664  }
665 
666  // all done - all OK
667  status = EVP_STAT_OK ;
668  //tot_bytes += bytes ;
669  total_events++ ;
670 
671 
672  // move to next event although it may make no sense...
673  long long int endpos = lseek64(desc, 0, SEEK_CUR);
674 
675  long long int nexteventpos = lseek64(desc, event_size, SEEK_CUR) ;
676 
677  LOG(DBG,"End of event: start_offset=%d end_offset=%lld, file size %lld",endpos,nexteventpos,file_size) ;
678 
679  evt_offset_in_file = nexteventpos;
680  //LOG("JEFF", "evt_offset_in_file = %d",evt_offset_in_file);
681 
682  // Now we want to get detsinrun/evpgroupsinrun
683  // First read from the EvbSummary
684 
685  fs_dirent *esum = sfs->opendirent("EvbSummary");
686  if(esum) {
687  LOG(DBG, "We've got an EvbSummary");
688  char *buff = (char *)memmap->mem + esum->offset;
689  EvbSummary *evbsum = (EvbSummary *)buff;
690  detsinrun = evbsum->detectorsInRun;
691  detsinrun64 = evbsum->detectorsInRun;
692  evpgroupsinrun = 0xffffffff;
693 
694  if(evbsum->version > 2) {
695  triggerFrequency = evbsum->triggerFrequency;
696  triggerFrequency_valid = 1;
697  }
698  else {
699  triggerFrequency = 9.3e6;
700  triggerFrequency_valid = 0;
701  }
702  }
703  else {
704  LOG(DBG, "No EvbSummary Record");
705  if(run != (unsigned int)runconfig->run) {
706  char rccnf_file[256];
707 
708  runconfig->run = 0; // set invalid to start...
709 
710 
711  if(input_type == live) {
712  sprintf(rccnf_file,"%s%s/%d/0",evp_disk,_evp_basedir_,run) ;
713  }
714  else if(input_type == dir) {
715  sprintf(rccnf_file,"%s/0",fname);
716  }
717  else if (input_type == file) {
718  sprintf(rccnf_file,"rccnf_%d.txt",run);
719  }
720 
721  if(input_type != pointer) {
722  if(getRccnf(rccnf_file, runconfig) < 0) {
723  LOG(DBG, "No runconfig file %s",rccnf_file,0,0,0,0);
724  }
725  }
726 
727  if(runconfig->run == 0) {
728  detsinrun = 0xffffffff;
729  detsinrun64 = 0xffffffffffffffff;
730  evpgroupsinrun = 0xffffffff;
731  }
732  else {
733  detsinrun = runconfig->detMask;
734  detsinrun64 = runconfig->detMask;
735  evpgroupsinrun = runconfig->grpMask;
736  }
737  }
738  }
739 
740  /* Tonko; May 11, 2012
741  Catch the TPC FY12 UU future-protection bug
742  */
743  detector_bugs = 0; // clear them all first
744  detector_bugs64 = 0;
745 
746  if(detectors & (1 << TPX_ID)) {
747  if((run >= 13114025) && (run < 13130030)) {
748  if(trgcmd == 4) {
749 
750  for(int s=1;s<=24;s++) {
751  for(int r=1;r<=6;r++) {
752  // skip known dead RDOs during this period
753  if((s==5) && (r==1)) continue ;
754  if((s==6) && (r==1)) continue ;
755  if((s==7) && (r==1)) continue ;
756  if((s==14) && (r==3)) continue ;
757  if((s==21) && (r==1)) continue ;
758  if((s==22) && (r==2)) continue ;
759 
760 
761  char name[32] ;
762 
763  sprintf(name,"tpx/sec%02d/cld%02d",s,r) ;
764  if(!get_sfs_name(name)) {
765  detectors &= ~(1<<TPX_ID) ;
766  LOG(WARN,"run %d, seq %d -- removing TPX due to FY12 UU future-protection bug",run,seq) ;
767 
768  detector_bugs |= (1<<TPX_ID) ; // set tje bug status
769  detector_bugs64 |= (1<<TPX_ID);
770  goto bug_check_done ;
771  }
772  }
773  }
774  }
775  }
776  bug_check_done:;
777  }
778 
779  // Tonko: before we return, call Make which prepares the DETs for operation...
780  //Make() ;
781 
782  // *****
783  // jml 2/13/07
784  // now we return pointer to this
785  // get datap by evp->mem
786  //
787  // why? each reader needs to know whether it is acting on
788  // a sfs file or a .daq buffer
789  //
790  // the current design is messed up in that every detector bank
791  // is global and static while the evpReader is a class
792  // that can have multiple instances. the detReader functions
793  // used to take datap pointers so nothing at all could be passed
794  // to them. At least this hack doesn't require modifications to
795  // existing code...
796  //
797 
798  // ****
799  // old false comment...
800  // return the pointer at the beginning of DATAP!
801  // at this point we return the pointer to READ_ONLY DATAP
802  // the event size is "bytes"
803  // ****
804  return (char *)this;
805 }
806 
807 
808 // Skip n events then, call get...
809 char *daqReader::skip_then_get(int numToSkip, int num, int type)
810 {
811  //LOG("JEFF", "into skip");
812  if(input_type != file) {
813  LOG(CRIT, "Can't call skip_then_get unless running from file...");
814  status = EVP_STAT_EVT;
815  return NULL;
816  }
817 
818  event_number++;
819 
820  int error;
821 
822  for(int i=0;i<numToSkip;i++) {
823  event_number++;
824 
825  repeat2:
826  error = getEventSize();
827 
828  if(status == EVP_STAT_LOG) {
829  lseek64(desc, event_size, SEEK_CUR);
830  evt_offset_in_file += event_size;
831  //LOG("JEFF", "evt_offset_in_file = %d",evt_offset_in_file);
832  event_size = 0;
833  goto repeat2;
834  }
835 
836 
837  // handle potential padding bug...
838  int padchecks = 0;
839  for(;;) {
840  padchecks++;
841  int inc = addToEventSize(event_size);
842 
843  if(inc == 0) {
844  LOG(DBG, "No extra increment... event size=%d",event_size);
845  break;
846  }
847 
848  LOG(WARN, "Found a padding bug. Adding %d to event size",inc);
849  event_size += inc;
850 
851  if(padchecks > 5) {
852  LOG(ERR, "Error finding next event...");
853  status = EVP_STAT_EOR;
854  return NULL;
855  }
856  }
857 
858  if(status == EVP_STAT_EOR) {
859  return NULL;
860  }
861 
862  if(error == 0) {
863  status = EVP_STAT_EOR;
864  return NULL;
865  }
866 
867  if(error < 0) {
868  status = EVP_STAT_EVT;
869  return NULL;
870  }
871 
872  if((event_size + evt_offset_in_file) > file_size) {
873  LOG(WARN, "This event is truncated");
874  status = EVP_STAT_EOR;
875  return NULL;
876  }
877 
878  long long int nexteventpos = lseek64(desc, event_size, SEEK_CUR);
879  LOG(DBG, "skip evt pos = %lld", nexteventpos);
880 
881  evt_offset_in_file += event_size;
882  //LOG("JEFF", "evt_offset_in_file = %d",evt_offset_in_file);
883  }
884 
885  LOG(DBG, "out of skip");
886  return get(num, type);
887 }
888 
889  // Get event size...
890  int daqReader::addToEventSize(int sz)
891  {
892  if(input_type == pointer) return 0;
893 
894  long long int orig_offset = lseek64(desc, 0, SEEK_CUR);
895 
896  LOG(DBG, "orig_offset = %lld sz=%d",orig_offset,sz);
897 
898  lseek64(desc, sz, SEEK_CUR);
899 
900  char buff[10];
901  int ret = read(desc, buff, 8);
902  if(ret == 0) {
903  lseek64(desc, orig_offset, SEEK_SET);
904  return 0;
905  }
906 
907  if(memcmp(buff, "LRHD",4) == 0) {
908  lseek64(desc, orig_offset, SEEK_SET);
909  return 0;
910  }
911 
912  if(memcmp(buff, "DATAP",4) == 0) {
913  lseek64(desc, orig_offset, SEEK_SET);
914  return 0;
915  }
916 
917  if(memcmp(buff, "SFS", 3) == 0) {
918  lseek64(desc, orig_offset, SEEK_SET);
919  return 0;
920  }
921 
922  if(memcmp(buff, "FILE", 4) == 0) {
923  lseek64(desc, orig_offset, SEEK_SET);
924  return 0;
925  }
926 
927  LOG(ERR, "buff = %c%c%c off=%lld",buff[0],buff[1],buff[2], orig_offset);
928 
929  lseek64(desc, orig_offset, SEEK_SET);
930 
931 
932  return 8192;
933  }
934 
935  int daqReader::getEventSize()
936  {
937  MemMap headermap;
938  char *m;
939  int swap;
940 
941  int ret = -1; // lets be a pessimist and assume failure...
942  long long int offset = 0;
943  long long int space_left;
944 
945  //LOG("JEFF", "Get event size");
946 
947  status = EVP_STAT_OK;
948  event_size = 0;
949 
950  if(input_type == pointer) {
951  m = event_memory;
952  space_left = data_size - (m - data_memory);
953 
954 
955  //LOG("JEFF", "space_left=%d ds=%d m-dm=%d",space_left,data_size,m-data_memory);
956  }
957  else {
958  offset = lseek64(desc, 0, SEEK_CUR);
959  space_left = file_size - offset;
960 
961  if(space_left > 1024) space_left = 1024;
962 
963  LOG(DBG, "Space left = %d",space_left);
964 
965  if(space_left == 0) return 0;
966  if(space_left < (long long int)sizeof(LOGREC)) return -1;
967 
968  m = headermap.map(desc, offset, space_left, map_prot, map_flags);
969  if(!m) {
970  LOG(ERR, "Error mapping header information");
971  return -1;
972  }
973  }
974 
975 
976  //LOG("JEFF", "space_left=%d",space_left);
977 
978  if(space_left == 0) return 0;
979  if(space_left < (long long int)sizeof(LOGREC)) {
980  LOG(NOTE, "File truncated: only %lld bytes left",space_left);
981  status = EVP_STAT_EOR;
982  goto done;
983  }
984 
985  LOG(DBG, "OFFSET = %lld", offset);
986 
987  // Wait, we might not have a LRHD or DATAP...
988  while((memcmp(m, "LRHD", 4) != 0) &&
989  (memcmp(m, "DATAP", 5) != 0)) {
990 
991  //LOG("JEFF", "Event starts with %c%c%c%c%c not LRHD or DATAP. Check if sfs file...",m[0],m[1],m[2],m[3],m[4]);
992 
993  sfs_index *tmp_sfs = new sfs_index();
994  int sz = tmp_sfs->getSingleDirSize(file_name, evt_offset_in_file);
995 
996  //LOG("JEFF", "single dir size = %d",sz);
997 
998  // Check to see if its a valid directory...
999  if(sz > 0) {
1000  tmp_sfs->mountSingleDir(file_name, evt_offset_in_file);
1001 
1002  int satisfy=0;
1003 
1004  // CD to the current event...
1005  fs_dir *fsdir = tmp_sfs->opendir("/");
1006  fs_dirent *ent = tmp_sfs->readdir(fsdir);
1007  if(!ent) {
1008  tmp_sfs->closedir(fsdir);
1009  LOG(ERR, "Error finding event directory in sfs?");
1010 
1011  // Skip directory... go to next
1012  status = EVP_STAT_EVT;
1013  sz = -1;
1014  goto done;
1015  }
1016 
1017  LOG(DBG, "does dir (%s) satisfy '/#' or '/nnnn'",ent->full_name);
1018 
1019  if(memcmp(ent->full_name, "/#", 2) == 0) {
1020  satisfy = 1;
1021  }
1022 
1023  // streaming...
1024  if(memcmp(ent->full_name, "/%", 2) == 0) {
1025  satisfy = 1;
1026  }
1027 
1028  if(allnumeric(&ent->full_name[1])) {
1029  satisfy = 1;
1030  }
1031 
1032  tmp_sfs->closedir(fsdir);
1033 
1034  if(satisfy == 0) {
1035  status = EVP_STAT_LOG;
1036  //LOG("JEFF", "STATLOG");
1037  }
1038 
1039  tmp_sfs->umount();
1040  }
1041 
1042  delete tmp_sfs;
1043 
1044  if(sz < 0) {
1045  LOG(ERR, "Event starts with %c%c%c%c%c not LRHD or DATAP and not a SFS file... bad event",m[0],m[1],m[2],m[3],m[4]);
1046 
1047  status = EVP_STAT_EVT;
1048  goto done;
1049  }
1050 
1051  event_size = sz;
1052  ret = 0;
1053  goto done;
1054  }
1055 
1056  // LOG("JEFF", "here?");
1057 
1058  // Now at the start of the real event!
1059  //
1060  // if datap, simple...
1061  if(memcmp(m, "DATAP", 5) == 0) {
1062  DATAP *datap = (DATAP *)m;
1063  int swap = (datap->bh.byte_order == 0x04030201) ? 0 : 1;
1064 
1065  event_size = qswap32(swap, datap->len) * 4;
1066  ret = 0;
1067  goto done;
1068  }
1069 
1070  // Ok... typical event with LRHD starting point
1071  // first skip non-data banks
1072  LOGREC *lrhd;
1073 
1074  for(;;) {
1075  if(memcmp(m, "LRHD", 4) == 0) {
1076  lrhd = (LOGREC *)m;
1077 
1078  LOG(DBG, "record_type = %c%c%c%c",
1079  lrhd->record_type[0],
1080  lrhd->record_type[1],
1081  lrhd->record_type[2],
1082  lrhd->record_type[3]);
1083 
1084  if(memcmp(lrhd->record_type, "DATA", 4) == 0) break;
1085 
1086  if(memcmp(lrhd->record_type, "ENDR", 4) == 0) {
1087 
1088  LOG(DBG, "Got ENDR record");
1089 
1090  status = EVP_STAT_EOR;
1091  goto done;
1092  }
1093 
1094  LOG(DBG, "Skipping LRHD record type %c%c%c%c",
1095  lrhd->record_type[0],
1096  lrhd->record_type[1],
1097  lrhd->record_type[2],
1098  lrhd->record_type[3]);
1099 
1100  space_left -= sizeof(LOGREC);
1101  offset += sizeof(LOGREC);
1102  m += sizeof(LOGREC);
1103  event_size += sizeof(LOGREC);
1104 
1105  if(space_left < (long long int)sizeof(LOGREC)) {
1106  LOG(NOTE, "File truncated: only %lld bytes left",space_left);
1107  status = EVP_STAT_EOR;
1108  goto done;
1109  }
1110 
1111  }
1112  else {
1113  LOG(DBG, "Corrupted headers: %c%c%c%c%c",
1114  m[0],m[1],m[2],m[3],m[4]);
1115 
1116  goto done;
1117  }
1118  }
1119 
1120  //LOG("JEFF", "here");
1121 
1122  // This is a valid event!
1123  ret = 0;
1124 
1125  // Have the DATA LRHD in *m
1126  //LOG("JEFF", "event_size=%d",event_size);
1127 
1128  swap = (lrhd->lh.byte_order == 0x04030201) ? 0 : 1;
1129  event_size += qswap32(swap,lrhd->length) * 4;
1130 
1131  //LOG("JEFF", "here %d",event_size);
1132  done:
1133 
1134  //LOG("JEFF", "event_size=%d",event_size);
1135  if(ret == 0) {
1136  ret = event_size;
1137  }
1138 
1139  headermap.unmap();
1140  return ret;
1141  }
1142 
1143 
1144  const char *daqReader::getInputType()
1145  {
1146  switch(input_type) {
1147  case none:
1148  return "none";
1149  case pointer:
1150  return "pointer";
1151  case file:
1152  return "file";
1153  case live:
1154  return "live";
1155  case dir:
1156  return "dir";
1157  }
1158 
1159  return "null";
1160  }
1161 
1162  int daqReader::openEventFile()
1163  {
1164  struct stat64 stat_buf ;
1165 
1166  // First close file if any...
1167  if(desc > 0) {
1168  close(desc);
1169  desc = -1;
1170  }
1171 
1172  errno = 0 ;
1173 
1174  desc = open64(file_name,O_RDONLY,0666) ;
1175  if(desc < 0) {
1176  LOG(NOTE,"Error opening file %s [%s] - skipping...",file_name,strerror(errno),0,0,0) ;
1177  status = EVP_STAT_EVT ;
1178  return -1;
1179  }
1180 
1181  // get the file_size ;
1182  int ret = stat64(file_name,&stat_buf) ;
1183  if(ret < 0) {
1184  LOG(ERR,"Can't stat %s",file_name,0,0,0,0) ;
1185  status = EVP_STAT_EVT ;
1186  close(desc) ;
1187  desc = -1 ;
1188  return -1;
1189  }
1190 
1191  file_size = stat_buf.st_size ;
1192  evt_offset_in_file = 0;
1193  bytes = 0 ;
1194  return 0;
1195  }
1196 
1197  int daqReader::getNextEventFilename(int num, int type)
1198  {
1199  if((event_number != 1) && (token == 0)) { // we read at least one event and it was token==0 thus this is it...
1200  LOG(DBG,"Previous event (%d) was Token 0 in directory - stopping...",event_number,0,0,0,0) ;
1201  status = EVP_STAT_EOR ;
1202  if(input_type == live) {
1203  event_number = 1;
1204  }
1205  return -1;
1206  }
1207 
1208  if(input_type == dir) {
1209  return getNextEventFilenameFromDir(num);
1210  }
1211  else if (input_type == live) {
1212  return getNextEventFilenameFromLive(type);
1213  }
1214  else {
1215  LOG(ERR, "Wrong input type");
1216  return -1;
1217  }
1218  }
1219 
1220  int daqReader::copySummaryInfoIn(SummaryInfo *info)
1221  {
1222  // gbPayload is mostly little endian...
1223  token = info->token;
1224  evt_time = info->evt_time;
1225  detectors = info->detectors;
1226  detectors64 = info->detectors;
1227  daqbits_l1 = info->daqbits_l1;
1228  daqbits_l2 = info->daqbits_l2;
1229  evpgroups = info->evpgroups;
1230  daqbits = info->daqbits;
1231  evp_daqbits = info->evp_daqbits;
1232  //seq = info->seq;
1233 
1234  // event descriptor is big endian...
1235  trgword = info->trgword;
1236  trgcmd = info->trgcmd;
1237  daqcmd = info->daqcmd;
1238  flags = info->flags;
1239 
1240  memcpy(L1summary, info->L1summary, sizeof(L1summary));
1241  memcpy(L2summary, info->L2summary, sizeof(L2summary));
1242  memcpy(L3summary, info->L3summary, sizeof(L3summary));
1243 
1244  // 64 bit extension...
1245  daqbits64 = ((u_longlong)L3summary[1]) << 32;
1246  daqbits64 += L3summary[0];
1247  daqbits64_l1 = ((u_longlong)L1summary[1]) << 32;
1248  daqbits64_l1 += L1summary[0];
1249  daqbits64_l2 = ((u_longlong)L2summary[1]) << 32;
1250  daqbits64_l2 += L2summary[0];
1251 
1252  return 0;
1253  }
1254 
1255 
1256  int daqReader::hackSummaryInfo()
1257  {
1258  // gbPayload is mostly little endian...
1259  token = 0;
1260  evt_time = 0;
1261  detectors = 0;
1262  detectors64 = 0;
1263  daqbits_l1 = 0;
1264  daqbits_l2 = 0;
1265  evpgroups = 0;
1266  daqbits = 0;
1267  evp_daqbits = 0;
1268 
1269  daqbits64 = 0ll;
1270  daqbits64_l1 = 0ll;
1271  daqbits64_l2 = 0ll;
1272 
1273  // event descriptor is big endian...
1274  trgword = 0;
1275  trgcmd = 0;
1276  daqcmd = 0;
1277 
1278  memset(L1summary, 0, sizeof(L1summary));
1279  memset(L2summary, 0, sizeof(L2summary));
1280  memset(L3summary, 0, sizeof(L3summary));
1281 
1282  return 0;
1283  }
1284 
1285  int daqReader::fillSummaryInfo(SummaryInfo *info, gbPayload *pay)
1286  {
1287  // First, determine which gbPayload:
1288 
1289  LOG(DBG, "pay=0x%x",pay);
1290 
1291  u_int version = pay->gbPayloadVersion;
1292 
1293  LOG(DBG, "version = 0x%x", version);
1294 
1295  if(((version & 0xff000000) != 0xda000000) && ((b2h32(version) & 0x000000ff ) != 0x40)) { // Version 0x01
1296  LOG(DBG, "gbPayload version 0x10");
1297 
1298  gbPayload_0x01 *pv = (gbPayload_0x01 *)pay;
1299  LOG(DBG, "gbPayload 0x01: v#=0x%x",b2h32(version)); // picked up from big endian evtdes
1300  return fillSummaryInfo_v01(info, pv);
1301  }
1302 
1303  if(((version & 0xff000000) != 0xda000000) && ((b2h32(version) & 0x000000ff ) == 0x40)) { // Version 0x01a
1304  LOG(DBG, "gbPayload version 0x01a");
1305  gbPayload_0x01a *pv = (gbPayload_0x01a *)pay;
1306  LOG(DBG, "gbPayload 0x01a: v#=0x%x", b2h32(version)); // picked up from big endian evtdesc
1307  return fillSummaryInfo_v01a(info, pv);
1308  }
1309 
1310  if(version == 0xda000002) {
1311  LOG(DBG, "gbPayload 0x02: v#=0x%x",version);
1312  return fillSummaryInfo_v02(info, (gbPayload_0x02 *)pay);
1313  }
1314 
1315  if(version == 0xda000003) {
1316  LOG(DBG, "gbPayload 0x03: v=0x%x", version);
1317  return fillSummaryInfo_v03(info, pay);
1318  }
1319 
1320  LOG(ERR, "gbPayload Version Unknown: 0x%x vs 0x%x. Using 02", version, GB_PAYLOAD_VERSION);
1321  return fillSummaryInfo_v03(info, pay);
1322  }
1323 
1324 int daqReader::fillSummaryInfo_v03(SummaryInfo *info, gbPayload *pay) {
1325  // gbPayload is mostly little endian...
1326 
1327  LOG(DBG, "gbPayloadVersion=0x%x, trgVersion=0x%x", pay->gbPayloadVersion, pay->EventDescriptor.TrgDataFmtVer);
1328 
1329  info->token = l2h32(pay->token);
1330  info->evt_time = l2h32(pay->sec);
1331  info->detectors = l2h32(pay->rtsDetMask);
1332  info->detectors64 = l2h32(pay->rtsDetMask);
1333  info->daqbits_l1 = l2h32(pay->L1summary[0]);
1334  info->daqbits_l2 = l2h32(pay->L2summary[0]);
1335  info->evpgroups = l2h32(pay->L3summary[3]);
1336  info->daqbits = l2h32(pay->L3summary[0]);
1337  info->evp_daqbits = daqbits;
1338  info->flags = l2h32(pay->flags);
1339 
1340  // event descriptor is big endian...
1341  //info->trgword = b2h16(pay->EventDescriptor.TriggerWord);
1342  info->trgcmd = pay->EventDescriptor.actionWdTrgCommand;
1343  info->daqcmd = pay->EventDescriptor.actionWdDaqCommand;
1344 
1345  for(int i=0;i<2;i++) info->L1summary[i] = l2h32(pay->L1summary[i]);
1346  for(int i=0;i<2;i++) info->L2summary[i] = l2h32(pay->L2summary[i]);
1347  for(int i=0;i<4;i++) info->L3summary[i] = l2h32(pay->L3summary[i]);
1348 
1349  return 0;
1350  }
1351 
1352 
1353 int daqReader::fillSummaryInfo_v02(SummaryInfo *info, gbPayload_0x02 *pay) {
1354  // gbPayload is mostly little endian...
1355 
1356  LOG(DBG, "gbPayloadVersion=0x%x, trgVersion=0x%x", pay->gbPayloadVersion, pay->EventDescriptor.TrgDataFmtVer);
1357 
1358  info->token = l2h32(pay->token);
1359  info->evt_time = l2h32(pay->sec);
1360  info->detectors = l2h32(pay->rtsDetMask);
1361  info->detectors64 = l2h32(pay->rtsDetMask);
1362  info->daqbits_l1 = l2h32(pay->L1summary[0]);
1363  info->daqbits_l2 = l2h32(pay->L2summary[0]);
1364  info->evpgroups = l2h32(pay->L3summary[3]);
1365  info->daqbits = l2h32(pay->L3summary[0]);
1366  info->evp_daqbits = daqbits;
1367  info->flags = l2h32(pay->flags);
1368 
1369  // event descriptor is big endian...
1370  //info->trgword = b2h16(pay->EventDescriptor.TriggerWord);
1371  info->trgcmd = pay->EventDescriptor.actionWdTrgCommand;
1372  info->daqcmd = pay->EventDescriptor.actionWdDaqCommand;
1373 
1374  for(int i=0;i<2;i++) info->L1summary[i] = l2h32(pay->L1summary[i]);
1375  for(int i=0;i<2;i++) info->L2summary[i] = l2h32(pay->L2summary[i]);
1376  for(int i=0;i<4;i++) info->L3summary[i] = l2h32(pay->L3summary[i]);
1377 
1378  return 0;
1379  }
1380 
1381  int daqReader::fillSummaryInfo_v01a(SummaryInfo *info, gbPayload_0x01a *pay)
1382  {
1383  LOG(DBG, "gbPayloadVersion=0xda000001, trgVersion=0x%x", pay->EventDescriptor.TrgDataFmtVer);
1384 
1385  // gbPayload is mostly little endian...
1386  info->token = l2h32(pay->token);
1387  info->evt_time = l2h32(pay->sec);
1388  info->detectors = l2h32(pay->rtsDetMask);
1389  info->detectors64 = l2h32(pay->rtsDetMask);
1390  info->daqbits_l1 = l2h32(pay->L1summary[0]);
1391  info->daqbits_l2 = l2h32(pay->L2summary[0]);
1392  info->evpgroups = l2h32(pay->L3summary[2]);
1393  info->daqbits = l2h32(pay->L3summary[0]);
1394  info->evp_daqbits = daqbits;
1395  info->flags = l2h32(pay->flags);
1396 
1397  // event descriptor is big endian...
1398  info->trgword = b2h16(pay->EventDescriptor.TriggerWord);
1399  info->trgcmd = pay->EventDescriptor.actionWdTrgCommand;
1400  info->daqcmd = pay->EventDescriptor.actionWdDaqCommand;
1401 
1402  for(int i=0;i<2;i++) info->L1summary[i] = l2h32(pay->L1summary[i]);
1403  for(int i=0;i<2;i++) info->L2summary[i] = l2h32(pay->L2summary[i]);
1404  for(int i=0;i<4;i++) info->L3summary[i] = l2h32(pay->L3summary[i]);
1405 
1406  return 0;
1407  }
1408 
1409  int daqReader::fillSummaryInfo_v01(SummaryInfo *info, gbPayload_0x01 *pay)
1410  {
1411  LOG(DBG, "gbPayloadVersion=0xda000001, trgVersion=0x%x", pay->EventDescriptor.TrgDataFmtVer);
1412 
1413  // gbPayload is mostly little endian...
1414  info->token = l2h32(pay->token);
1415  info->evt_time = l2h32(pay->sec);
1416  info->detectors = l2h32(pay->rtsDetMask);
1417  info->detectors64 = l2h32(pay->rtsDetMask);
1418  info->daqbits_l1 = l2h32(pay->L1summary[0]);
1419  info->daqbits_l2 = l2h32(pay->L2summary[0]);
1420  info->evpgroups = l2h32(pay->L3summary[2]);
1421  info->daqbits = l2h32(pay->L3summary[0]);
1422  info->evp_daqbits = daqbits;
1423 
1424  // event descriptor is big endian...
1425  info->trgword = b2h16(pay->EventDescriptor.TriggerWord);
1426  info->trgcmd = pay->EventDescriptor.actionWdTrgCommand;
1427  info->daqcmd = pay->EventDescriptor.actionWdDaqCommand;
1428 
1429  for(int i=0;i<2;i++) info->L1summary[i] = l2h32(pay->L1summary[i]);
1430  for(int i=0;i<2;i++) info->L2summary[i] = l2h32(pay->L2summary[i]);
1431  for(int i=0;i<4;i++) info->L3summary[i] = l2h32(pay->L3summary[i]);
1432 
1433  return 0;
1434  }
1435 
1436 
1437  int daqReader::fillSummaryInfo(SummaryInfo *info, DATAP *datap)
1438  {
1439  int swap = (datap->bh.byte_order == 0x04030201) ? 0 : 1;
1440 
1441  info->token = qswap32(swap, datap->bh.token);
1442  info->evt_time = qswap32(swap, datap->time);
1443  info->detectors = qswap32(swap, datap->detector);
1444  info->detectors64 = qswap32(swap, datap->detector);
1445  info->seq = qswap32(swap, datap->seq);
1446  info->daqbits_l1 = qswap32(swap, datap->TRG_L1_summary[0]);
1447  info->daqbits_l2 = qswap32(swap, datap->TRG_L2_summary[0]);
1448  info->evpgroups = qswap32(swap, datap->L3_Summary[2]);
1449  info->trgword = qswap32(swap, datap->trg_word);
1450 
1451  info->trgcmd = (qswap32(swap, datap->trg_in_word) >> 12) & 0xF ; // _just_ the trigger command
1452  info->daqcmd = (qswap32(swap, datap->trg_in_word) >> 8) & 0xF ; // DAQ command
1453 
1454 
1455  info->daqbits = qswap32(swap, datap->L3_Summary[0]);
1456  info->evp_daqbits = daqbits;
1457 
1458  for(int i=0;i<2;i++) info->L1summary[i] = qswap32(swap, datap->TRG_L1_summary[i]);
1459  for(int i=0;i<2;i++) info->L2summary[i] = qswap32(swap, datap->TRG_L2_summary[i]);
1460  for(int i=0;i<4;i++) info->L3summary[i] = qswap32(swap, datap->L3_Summary[i]);
1461 
1462  return 0;
1463  }
1464 
1465 
1466 
1467  // Only called when data source is directory
1468  // Fills in "file_name"
1469  // status = EOR if end of run
1470  int daqReader::getNextEventFilenameFromDir(int eventNum)
1471  {
1472  LOG(DBG, "Getting next event from dir: event_number=%d eventNum=%d",event_number, eventNum);
1473 
1474  if(eventNum==0) eventNum = event_number;
1475  sprintf(file_name,"%s/%d",fname,eventNum) ;
1476 
1477  LOG(DBG, "Getting next event from dir: event_number=%s",file_name);
1478 
1479  event_number = eventNum ;
1480  return STAT_OK;
1481  }
1482 
1483  int daqReader::getNextEventFilenameFromLive(int type)
1484  {
1485  int ret;
1486  ic_msg m ;
1487 
1488  // evp no longer requests events to be shipped to the pool...
1489  // issue get event only if not issued before
1490 
1491  if(issued) {
1492  if((time(NULL) - last_issued) > 10) { // reissue
1493  LOG(DBG,"Re-issueing request...",0,0,0,0,0) ;
1494  issued = 0 ;
1495  }
1496  }
1497 
1498  if(!issued) {
1499 
1500  m.ld.dword[0] = htonl(type) ; // event type...
1501 
1502  LOG(DBG, "dword[0] is type=%d",type);
1503 
1504  ret = ask(evpDesc,&m) ;
1505  if(ret != STAT_OK) { // some error...
1506 
1507  LOG(ERR,"Queue error %d - recheck EVP...",ret,0,0,0,0) ;
1508  reconnect() ;
1509  status = EVP_STAT_EVT ;
1510  return -1;
1511  }
1512  issued = 1 ;
1513  last_issued = time(NULL) ;
1514  }
1515 
1516 
1517  // wait for the event (unless run is finished but still checking)
1518  //if((num == 0) || !readall_rundone) {
1519  if(!readall_rundone) {
1520  int timedout=0;
1521 
1522  ret = evtwait(evpDesc, &m) ;
1523 
1524  // got some reply here so nix issued
1525  issued = 0;
1526 
1527  // If queues are broken we have problems...
1528  if((ret != STAT_OK) && (ret != STAT_TIMED_OUT)) {
1529  reconnect() ;
1530  LOG(ERR,"Queue error %d - recheck EVP...",ret,0,0,0,0) ;
1531  status = EVP_STAT_EVT ; // treat is as temporary...
1532  return -1;
1533  }
1534 
1535  if(ret == STAT_TIMED_OUT) { // retry ...
1536  timedout = 1;
1537 
1538 #if defined(__linux__) || defined(__APPLE__)
1539  sched_yield() ;
1540 #else
1541  yield() ; // give up time slice?
1542 #endif
1543 
1544  LOG(DBG, "Waiting 1 second, no event yet...");
1545  usleep(100000) ; // .1 ms?
1546  status = EVP_STAT_OK ; // no error...
1547  return -1; // but also noevent - only on wait!
1548  }
1549 
1550  LOG(DBG, "m.head.status = %d EOR=%d",m.head.status,EVP_STAT_EOR);
1551 
1552  // check misc.
1553 
1554  switch(m.head.status) {
1555  case STAT_SEQ:
1556  { // end of run!
1557 
1558  LOG(DBG,"End of Run!",0,0,0,0,0) ;
1559 
1560  status = EVP_STAT_EOR ;
1561  return -1;
1562  }
1563  break;
1564 
1565  case STAT_OK:
1566  {
1567  // if run_is_done, never overwrote m!
1568 
1569  if(!timedout) {
1570  evb_type = ntohl(m.ld.dword[2]) ;
1571  evb_type_cou = ntohl(m.ld.dword[3]) ;
1572  evb_cou = ntohl(m.ld.dword[4]) ;
1573  run = ntohl(m.ld.dword[0]) ;
1574  readall_run = run;
1575  readall_lastevt = ntohl(m.ld.dword[1]);
1576  strcpy(_evp_basedir_, (char *)&m.ld.dword[5]);
1577 
1578  event_number = readall_lastevt; // overwritten by num later...
1579 
1580  sprintf(_last_evp_dir_, "%s%s/%d",evp_disk,_evp_basedir_,run);
1581  }
1582  }
1583  break;
1584 
1585  default:
1586  { // some event failure
1587  if(!timedout) {
1588  LOG(WARN,"Event in error - not stored...",0,0,0,0,0) ;
1589  status = EVP_STAT_EVT ; // repeat
1590  return -1;
1591  }
1592  }
1593  break;
1594  }
1595  }
1596 
1597  /*
1598  // A few last checks...
1599  if(num != 0) {
1600 
1601  event_number = num;
1602 
1603  if(readall_lastevt == 0) {
1604  LOG(DBG, "EVP_ readall but don't have any events rundone=%d run=%d",readall_rundone,run);
1605  // readall_reset();
1606  sleep(1);
1607  status = EVP_STAT_OK;
1608  return -1;
1609  }
1610 
1611  if(readall_lastevt < event_number) {
1612  LOG(WARN, "requesting an event that hasn't arrived rundone=%d run=%d",readall_rundone, run);
1613  sleep(1);
1614  status = EVP_STAT_OK;
1615  return -1;
1616  }
1617  }
1618  */
1619 
1620  // Now we've got the event number...
1621  sprintf(file_name,"%s/%d",_last_evp_dir_, event_number) ;
1622 
1623  struct stat64 s;
1624  if(stat64(file_name, &s) < 0) {
1625  LOG(DBG, "No file %s, try _DELETE",file_name);
1626  sprintf(file_name,"%s_DELETE/%d",_last_evp_dir_,event_number);
1627  }
1628 
1629  LOG(DBG,"Live Event: file->%s",file_name,0,0,0,0) ;
1630  return 0;
1631  }
1632 
1633  int daqReader::getOfflineId(int bit)
1634  {
1635  if(trgIdsNotPresent) return -1;
1636  if(bit > 63) return -1;
1637 
1638  if(trgIdsSet) {
1639  return trgIds[bit];
1640  }
1641 
1642  trgIdsSet = 1;
1643 
1644  fs_dirent *trgid_dir = sfs->opendirent("TRGID");
1645  if(trgid_dir) {
1646  UINT32 *trgid_buff = (UINT32 *)(memmap->mem + trgid_dir->offset);
1647  memset(trgIds,0xffffffff,sizeof(trgIds));
1648 
1649  int sz = trgid_dir->sz;
1650  sz /= 4;
1651  if(sz > 64) sz = 64;
1652 
1653  for(int i=0;i<sz;i++) {
1654  trgIds[i] = trgid_buff[i];
1655  }
1656  }
1657  else {
1658  LOG(ERR, "Can't find TRGID bank, can't get the offline id");
1659  memset(trgIds, 0xffffffff, sizeof(trgIds));
1660  trgIdsNotPresent = 1;
1661  }
1662 
1663  return trgIds[bit];
1664  }
1665 
1667 
1668  char *daqReader::get_sfs_name(const char *right)
1669  {
1670  if(sfs == 0) return 0 ;
1671 
1672  if(right == 0) right = "/" ;
1673 
1674  fs_dirent *d = sfs->opendirent(right) ;
1675  if(d == 0) return 0 ;
1676 
1677  LOG(DBG,"opendirent(%s) returns %s as full name, %s as d_name ",right,d->full_name,d->d_name) ;
1678  return d->full_name ;
1679 
1680  }
1681 
1682 #if 0 // unused
1683  /*
1684  parse the string of the form i.e. "tpc ssd tpx" and
1685  return a bitlist of RTS detectors
1686  */
1687  static u_int parse_det_string(const char *list)
1688  {
1689  u_int mask = 0 ;
1690 
1691  //LOG(DBG,"Parsing \"%s\"",list) ;
1692 
1693 
1694 
1695  reparse:;
1696 
1697  for(int i=0;i<32;i++) {
1698  const char *name = rts2name(i) ;
1699 
1700  if(name==0) continue ;
1701 
1702  //LOG(DBG,"Checking id %d: %s",i,name) ;
1703 
1704  if(strncasecmp(list,name,strlen(name))==0) {
1705  //LOG(DBG,"********* Got %d: %s",i,name) ;
1706  mask |= (1<<i) ;
1707  break ;
1708  }
1709  }
1710 
1711  // move to either end or to the next space
1712  int got_space = 0 ;
1713  while(*list != 0) {
1714  if(*list == ' ') got_space = 1 ;
1715  list++ ;
1716 
1717  if(got_space) goto reparse ;
1718 
1719  } ;
1720 
1721  // LOG(DBG,"Returning 0x%08X",mask) ;
1722  return mask ;
1723  }
1724 
1725 #endif
1726 
1727  daq_det *daqReader::det(const char *which)
1728  {
1729  assert(which) ;
1730 
1731 
1732  // for speed, first we try what we already created...
1733  for(int i=0;i<DAQ_READER_MAX_DETS;i++) {
1734 
1735  if(dets[i]) {
1736  if(strcasecmp(which, dets[i]->name)==0) return dets[i] ;
1737  }
1738  if(pseudo_dets[i]) {
1739  if(strcasecmp(which, pseudo_dets[i]->name)==0) return pseudo_dets[i] ;
1740  }
1741  }
1742 
1743  LOG(DBG,"det %s not yet created... attempting through factory...",which) ;
1744  int id = -1000 ; // assume not found
1745 
1746  // not yet created; try real dets first...
1747  for(int i=0;i<DAQ_READER_MAX_DETS;i++) {
1748  const char *name = rts2name(i) ;
1749  if(name == 0) continue ;
1750 
1751  //LOG(TERR,"trying %s for %s",name,which) ;
1752 
1753  if(strcasecmp(name,which)==0) {
1754  //LOG(TERR,"Creating %s",which) ;
1755  dets[i] = daq_det_factory::make_det(i) ;
1756  dets[i]->managed_by(this) ;
1757 
1758  return dets[i] ; // done...
1759  }
1760  }
1761 
1762  // not found in DAQ dets, try pseudo dets...
1763  if(strcasecmp(which,"emc_pseudo")==0) id = -BTOW_ID ; // by definition...
1764  if(strcasecmp(which,"hlt")==0) id = -L3_ID ; // by definition...
1765  if(strcasecmp(which,"itpc_pseudo")==0) id = -SVT_ID ; //HACK!!!!
1766 // if(strcasecmp(which,"l4")==0) id = -L4_ID ;
1767 
1768  if(id < -32) { // not found even in pseudo
1769  LOG(CRIT,"Requested det \"%s\" not created -- check spelling!",which) ;
1770  assert(!"UNKNOWN DET") ;
1771  return 0 ;
1772  }
1773 
1774  int wh = -id ; // make positive for pseudo array...
1775  pseudo_dets[wh] = daq_det_factory::make_det(id) ;
1776  pseudo_dets[wh]->managed_by(this) ;
1777 
1778  return pseudo_dets[wh] ;
1779  }
1780 
1781  void daqReader::insert(class daq_det *which, int id)
1782  {
1783  assert(which) ;
1784  LOG(DBG,"calling insert(%d): name %s",id,which->name) ;
1785 
1786  if((id>=0) && (id<DAQ_READER_MAX_DETS)) {
1787  dets[id] = which ;
1788  return ;
1789  }
1790  else if(id <0) {
1791  id *= -1 ;
1792  if(id >= DAQ_READER_MAX_DETS) ;
1793  else {
1794  pseudo_dets[id] = which ;
1795  return ;
1796  }
1797  }
1798 
1799  LOG(ERR,"rts_id %d out of bounds for %s",id,which->name) ;
1800 
1801  }
1802 
1803  void daqReader::Make()
1804  {
1805  for(int i=0;i<DAQ_READER_MAX_DETS;i++) {
1806  if(dets[i]) {
1807  LOG(DBG,"Calling %s make",dets[i]->name) ;
1808  dets[i]->Make() ;
1809  }
1810  }
1811  }
1812 
1813  void daqReader::de_insert(int id)
1814  {
1815  LOG(DBG,"calling de_insert(%d)",id) ;
1816 
1817  if((id>=0) && (id<DAQ_READER_MAX_DETS)) {
1818  if(dets[id]) {
1819  LOG(DBG,"Should destruct %d?",id) ;
1820  }
1821  dets[id] = 0 ; // mark as freed
1822  return ;
1823  }
1824  else if(id < 0) {
1825  id *= -1 ;
1826  if(id >= DAQ_READER_MAX_DETS) ;
1827  else {
1828  if(pseudo_dets[id]) LOG(DBG,"Should destruct %d?",id) ;
1829  pseudo_dets[id] = 0 ;
1830  return ;
1831  }
1832 
1833  }
1834 
1835  LOG(ERR,"rts_id %d out of bounds",id) ;
1836  }
1837 
1838 
1839  /*
1840  char *daqReader::getSFSEventNumber()
1841  {
1842  //fsr_lastevt;
1843 
1844  fs_dir *dir = sfs->opendir("/");
1845  fs_dirent *ent;
1846  int evt=0x7fffffff;
1847  while((ent = sfs->readdir(dir))) {
1848  // need to skip the "#" ;
1849  int i ;
1850  if(ent->d_name[0] == '#') {
1851  i = atoi(ent->d_name+1);
1852  }
1853  else {
1854  i = atoi(ent->d_name);
1855  }
1856 
1857  //LOG(DBG,"atoi of %s is %d",ent->d_name,i) ;
1858  if((i < evt) && (i>sfs_lastevt)) {
1859  evt = i;
1860  }
1861  }
1862  sfs->closedir(dir);
1863 
1864  // check if no events left...
1865  if(evt==0x7fffffff) {
1866  if(isfilelist) {
1867  LOG(WARN,"File Finished: %d events.", total_events) ;
1868  status = EVP_STAT_EOR ;
1869  }
1870  else {
1871  LOG(ERR,"No event?") ;
1872  status = EVP_STAT_EVT ;
1873  }
1874 
1875  return NULL;
1876  }
1877 
1878  sfs_lastevt = evt;
1879 
1880  event_number = evt;
1881  total_events++;
1882 
1883  bytes = 0;
1884  // run = ?
1885 
1886  evb_type = 0;
1887  evb_cou = 0;
1888  evb_type_cou = 0;
1889  token = 0;
1890  trgcmd = 0;
1891  daqcmd = 0;
1892  trgword = 0;
1893  phyword = 0;
1894  daqbits = 0;
1895  evpgroups = 0;
1896 
1897  evt_time = 0;
1898  seq = evt;
1899  detectors = 0;
1900 
1901  detsinrun = 0;
1902  evpgroupsinrun = 0;
1903 
1904  return (char *)this;
1905  }
1906  */
1907 
1908 
1909 
1910 
1911  static int evtwait(int desc, ic_msg *m)
1912  {
1913  int ret ;
1914  static int counter = 0 ;
1915 
1916  // wait with no timeout - handled by higher level code...
1917  ret = msgNQReceive(desc,(char *)m, sizeof(ic_msg),NO_WAIT) ;
1918 
1919  LOG(DBG,"msgNQReceive returned %d",ret,0,0,0,0) ;
1920 
1921  if(ret == MSG_Q_TIMEOUT) {
1922  LOG(DBG, "read a timeout count=%d",counter);
1923  counter++ ;
1924  if(counter >= 100) {
1925  counter = 0 ;
1926  if(msgNQCheck(desc)) {
1927  LOG(DBG, "check returned ok...");
1928  return STAT_TIMED_OUT ;
1929  }
1930  else {
1931  LOG(DBG,"EVP_TASK died",0,0,0,0,0) ;
1932  return STAT_ERROR ;
1933  }
1934  }
1935 
1936  return STAT_TIMED_OUT ;
1937  }
1938 
1939  counter = 0 ;
1940 
1941  if(ret > 0) {
1942  int i, *intp ;
1943  intp = (int *) m ;
1944  LOG(DBG,"0x%08X 0x%08X 0x%08X; %d %d",*intp,*(intp+1),*(intp+2),m->head.daq_cmd,m->head.status) ;
1945 
1946  for(i=0;i<3;i++) {
1947  *(intp+i) = ntohl(*(intp+i)) ;
1948  }
1949  LOG(DBG,"0x%08X 0x%08X 0x%08X; %d %d",*intp,*(intp+1),*(intp+2),m->head.daq_cmd,m->head.status) ;
1950  return STAT_OK ;
1951  }
1952 
1953  return STAT_ERROR ; // critical - please reboot
1954  }
1955 
1956  // Fixes the summary fields for a randomly supplied datap
1957  // Returns -1 if error
1958  int daqReader::fixDatapSummary(DATAP *datap)
1959  {
1960  DATAP *sumdatap;
1961 
1962  fs_dirent *ent = sfs->opendirent("legacy");
1963  if(!ent) {
1964  LOG(ERR, "Can't find legacy data");
1965  return -1;
1966  }
1967 
1968  sumdatap = (DATAP *)(memmap->mem + ent->offset);
1969 
1970  if(memcmp(sumdatap->bh.bank_type, "DATAP", 5) != 0) {
1971  char *x = sumdatap->bh.bank_type;
1972  LOG(ERR, "fixDatapSummary... legacy not DATAP: %c%c%c%c%c",
1973  x[0],x[1],x[2],x[3],x[4]);
1974  return -1;
1975  }
1976 
1977 
1978  datap->len = sumdatap->len;
1979  datap->time = sumdatap->time;
1980  datap->seq = sumdatap->seq;
1981  datap->trg_word = sumdatap->trg_word;
1982  datap->trg_in_word = sumdatap->trg_in_word;
1983  datap->detector = sumdatap->detector;
1984  memcpy(datap->TRG_L1_summary, sumdatap->TRG_L1_summary, sizeof(datap->TRG_L1_summary));
1985  memcpy(datap->TRG_L2_summary, sumdatap->TRG_L2_summary, sizeof(datap->TRG_L2_summary));
1986  memcpy(datap->L3_Summary, sumdatap->L3_Summary, sizeof(datap->L3_Summary));
1987  memcpy(&datap->evtdes, &sumdatap->evtdes, sizeof(datap->evtdes));
1988 
1989  return 0;
1990  }
1991 
1992  int daqReader::reconnect(void)
1993  {
1994  int ret ;
1995  int retries ;
1996  ic_msg msg ;
1997 
1998  if(evpDesc != -1) close(evpDesc);
1999 
2000  evpDesc = -1 ; // mark as disconnected
2001 
2002  retries = 0 ;
2003 
2004  for(;;) { // until success...
2005 
2006  evpDesc = msgNQCreate(EVP_HOSTNAME,EVP_PORT,120) ;
2007 
2008  if(evpDesc < 0) {
2009  LOG(ERR,"Can't create connection to %s:%d [%s] - will retry...",EVP_HOSTNAME,EVP_PORT,
2010  strerror(errno),0,0) ;
2011  fprintf(stderr,"CRITICAL: Can't create connection to %s:%d [%s] - will retry...\n",EVP_HOSTNAME,EVP_PORT,
2012  strerror(errno)) ;
2013  sleep(1) ;
2014  return -1;
2015  }
2016 
2017 
2018  if(retries) {
2019  LOG(WARN,"Connection suceeded!",0,0,0,0,0) ;
2020  }
2021 
2022 
2023  LOG(DBG,"Opened connection to %s, port %d on descriptor %d",EVP_HOSTNAME, EVP_PORT,evpDesc,0,0) ;
2024 
2025  msg.head.daq_cmd = RTS_ETHDOOR_ANNOUNCE ;
2026  msg.head.status = 0 ;
2027  msg.ld.dword[0] = htonl(getpid()) ;
2028 
2029  char *user ;
2030 
2031  struct passwd *passwd = getpwuid(getuid()) ;
2032  if(passwd == NULL) {
2033  LOG(WARN,"User doesn't exist?",0,0,0,0,0) ;
2034  user = (char *)"????" ;
2035  }
2036  else {
2037  user = passwd->pw_name ;
2038  }
2039 
2040 
2041  strncpy((char *)&msg.ld.dword[1],user,12) ;
2042  strncpy((char *)&msg.ld.dword[4],getCommand(),12) ;
2043  msg.head.valid_words = 1+7 ;
2044 
2045 #define BABABA
2046 #ifdef BABABA
2047  {
2048  int jj ;
2049  int *intp = (int *) &msg ;
2050  for(jj=0;jj<3;jj++) {
2051  *(intp+jj) = htonl(*(intp+jj)) ;
2052  }
2053  }
2054 #endif
2055 
2056  ret = msgNQSend(evpDesc,(char *)&msg,120,60) ;
2057  if(ret < 0) {
2058  LOG(ERR,"Can't send data to %s! - will reconnect...",EVP_HOSTNAME,0,0,0,0) ;
2059  msgNQDelete(evpDesc) ;
2060  evpDesc = -1 ;
2061  continue ;
2062  //return ;
2063  }
2064 
2065  // all OK...
2066  status = EVP_STAT_OK ;
2067  LOG(DBG,"Returning to caller, status %d",status,0,0,0,0) ;
2068  break ; // that's it....
2069  }
2070 
2071 
2072 
2073 
2074  return 0 ;
2075  }
2076 
2077  static int ask(int desc, ic_msg *m)
2078  {
2079 
2080  int ret ;
2081  time_t tm ;
2082  int jj ;
2083  int *intp = (int *) m ;
2084 
2085  m->head.daq_cmd = EVP_REQ_EVENT ;
2086  m->head.status = STAT_OK ;
2087  m->head.dst_task = EVP_TASK ;
2088  m->head.valid_words = 1+1 ; // reserve one for type...
2089  m->head.source_id = EVP_NODE ;
2090  m->head.src_task = EVP_TASK_READER ;
2091 
2092  LOG(DBG,"Sending request to EVP_TASK",0,0,0,0,0) ;
2093  tm = time(NULL) ;
2094 
2095 
2096  for(jj=0;jj<3;jj++) {
2097  *(intp+jj) = htonl(*(intp+jj)) ;
2098  }
2099 
2100  ret = msgNQSend(desc, (char *)m, 120,10) ;
2101 
2102  LOG(DBG,"msgNQSend returned %d in %d seconds",ret,time(NULL)-tm,0,0,0) ;
2103 
2104  if(ret < 0) { // communication error
2105  return STAT_ERROR ;
2106  }
2107  else { // OK
2108  return STAT_OK ;
2109  }
2110  }
2111 
2112 
// Returns the short name of the running program.
//
// Linux/macOS branch: reads the second field of /proc/self/stat, which is
// the command name wrapped in parentheses, and strips the parentheses.
// Other platforms: reads pr_fname from /proc/self/psinfo.
// Returns "(no-name)" when the information cannot be obtained.  The result
// points at static storage; do not free it.
// NOTE(review): a command name containing a space is cut at the space
// because the field is read with %s.
static const char *getCommand(void)
{
    static const char *str = "(no-name)" ;
#if defined(__linux__) || defined(__APPLE__)
    static char name[128] ;
    int dummy ;

    FILE *file = fopen("/proc/self/stat","r") ;
    if(file==NULL) return str ;

    // bound the string conversion to the buffer (127 chars + NUL) and make
    // sure both fields were really parsed
    int fields = fscanf(file,"%d %127s",&dummy,name) ;
    fclose(file) ;

    if(fields != 2) return str ;

    // need at least the two parentheses, otherwise the strip below would
    // index before the buffer
    size_t len = strlen(name) ;
    if(len < 2) return str ;

    name[len-1] = 0 ;    // drop the closing ')'
    return name+1 ;      // skip the opening '('
#else   // solaris
    int fd, ret ;
    static struct psinfo ps ;

    fd = open("/proc/self/psinfo",O_RDONLY,0666) ;
    if(fd < 0) return str ;

    ret = read(fd,(char *)&ps,sizeof(ps)) ;
    close(fd) ;

    if(ret != sizeof(ps)) {
        return str ;
    }

    return ps.pr_fname ;
#endif
}
2147 
2148 
2149 
2150 
2151 
2152  // jml 12/28/07
2153  //
2154  // Starts at the 2008 datap, and searches
2155  // for the "legacy" file in the sfs format...
2156  // then grabs DATAP from there...
2157 
2158  // This code is NOT to be taken as the "right" way to do anything :-)
2159  // in general it is not good to ignore the length fields in the various
2160  // banks... I make use of the idea that for all affected data files
2161  // LOGREC and DATAP have the proper sizes...
2162 
2163  // copy of SFS format header, as this hack and I don't want this dependent
2164  // on sfs includes
2165  struct copy_SFS_File {
2166  char type[4]; // "FILE"
2167  UINT32 byte_order;
2168  UINT32 sz; // any number, but file will be padded to be x4
2169  UINT8 head_sz; // must be x4
2170  UINT8 attr;
2171  UINT16 reserved;
2172  char name[4]; // get rid of padding confusions... by alligning
2173  };
2174 
2175 
2176  DATAP *getlegacydatap(char *mem, int bytes)
2177  {
2178  int off = 0;
2179  char *curr = mem;
2180 
2181  LOG(DBG, "off = %d bytes = %d", off, bytes);
2182  while(off < bytes) {
2183  // skip many types of banks...
2184 
2185  if(memcmp(curr, "LRHD", 4) == 0) {
2186  LOG(DBG, "hop over LRHD");
2187  curr += sizeof(LOGREC);
2188  off += sizeof(LOGREC);
2189  }
2190  else if(memcmp(curr, "DATAP", 5) == 0) {
2191  LOG(DBG, "hop over DATAP");
2192  curr += sizeof(DATAP);
2193  off += sizeof(DATAP);
2194  }
2195  else if(memcmp(curr, "SFS", 3) == 0) {
2196  LOG(DBG, "hop over SFS volume spec");
2197  curr += 12;
2198  off += 12;
2199  }
2200  else if(memcmp(curr, "HEAD", 4) == 0) {
2201  LOG(DBG, "hop over SFS header");
2202  curr += 12;
2203  off += 12;
2204  }
2205  else if(memcmp(curr, "FILE", 4) == 0) { // sfs bank...
2206  copy_SFS_File *file = (copy_SFS_File *)curr;
2207 
2208  int swap = (file->byte_order == 0x04030201) ? 0 : 1;
2209 
2210  if(strstr(file->name, "legacy")) {
2211  LOG(DBG, "Found legacy file");
2212  off += file->head_sz;
2213  curr += file->head_sz;
2214 
2215  if(memcmp(curr, "DATAP", 5) != 0) {
2216  LOG(ERR, "Got to legacy file, but not DATAP? is %c%c%c%c%c",
2217  curr[0],curr[1],curr[2],curr[3],curr[4]);
2218  return NULL;
2219  }
2220 
2221  return (DATAP *)curr;
2222  }
2223  else {
2224  LOG(DBG, "hop over SFS File (%s)", file->name);
2225  off += file->head_sz + ((qswap32(swap, file->sz)+3) & 0xfffffffc);
2226  curr += file->head_sz + ((qswap32(swap, file->sz)+3) & 0xfffffffc);
2227  }
2228  }
2229  else {
2230  LOG(DBG, "There is no legacy datap");
2231  return NULL;
2232  }
2233  }
2234 
2235  LOG(ERR, "no legacy datap");
2236  return NULL;
2237  }
2238 
2239 int daqReader::getStatusBasedEventDelay()
2240 {
2241 
2242  int delay = 0;
2243 
2244  switch(status) {
2245  case EVP_STAT_EVT : // something wrong with last event...
2246  delay = 100000;
2247  LOG(DBG, "Delaying for %d usec because of error on last event",delay);
2248  break ;
2249  case EVP_STAT_EOR : // EndOfRun was the last status and yet we are asked again...
2250  delay = 100000;
2251  LOG(DBG, "Delaying for %d usec because last event was end of run",delay);
2252  break ;
2253  case EVP_STAT_CRIT :
2254  delay = 1000000;
2255  LOG(ERR, "Delaying for %d usec because last event had critical status",delay);
2256 
2257  crit_cou++;
2258  if(crit_cou > 10) {
2259  LOG(ERR,"That's IT! Bye...",0,0,0,0,0);
2260  sleep(1) ; // linger...
2261  exit(-1) ;
2262  }
2263 
2264  default : // all OK...
2265  break ;
2266  }
2267 
2268  if(status != EVP_STAT_CRIT) crit_cou = 0;
2269 
2270  return delay;
2271 }
2272 
2273 
2274 int daqReader::writeCurrentEventToDisk(char *ofilename)
2275 {
2276  int fdo;
2277  int ret;
2278 
2279  if(memmap->mem == NULL) {
2280  LOG(ERR, "Can't write current event. No event");
2281  return -1;
2282  }
2283 
2284 
2285  fdo = open(ofilename, O_APPEND | O_WRONLY | O_CREAT, 0666);
2286  if(fdo < 0) {
2287  LOG(ERR, "Error opening output file %s (%s)", ofilename, strerror(errno));
2288  return -1;
2289  }
2290 
2291  ret = write(fdo, memmap->mem, event_size);
2292  if(ret != event_size) {
2293  LOG(ERR, "Error writing event data (%s)",strerror(errno));
2294  close(fdo);
2295  return -1;
2296  }
2297 
2298  close(fdo);
2299  return 0;
2300 }
2301 
2302 MemMap::MemMap()
2303 {
2304  mem=NULL;
2305  actual_mem_start=NULL;
2306  actual_size=0;
2307  fd = -1;
2308  real_mem = 0;
2309  page_size = sysconf(_SC_PAGESIZE);
2310 }
2311 
2312 MemMap::~MemMap()
2313 {
2314  unmap();
2315 }
2316 
2317 char *MemMap::map_real_mem(char *buffer, int _size)
2318 {
2319  offset = 0;
2320  size = _size;
2321  fd = 0;
2322  real_mem = 1;
2323  mem = buffer;
2324  return mem;
2325 }
2326 
2327 void daqReader::setCopyOnWriteMapping() {
2328  map_prot = PROT_READ | PROT_WRITE;
2329  map_flags = MAP_PRIVATE | MAP_NORESERVE;
2330 }
2331 
2332 char *MemMap::map(int _fd, long long int _offset, int _size, int map_prot, int map_flags)
2333 {
2334  offset = _offset;
2335  size = _size;
2336  fd = _fd;
2337 
2338  LOG(DBG, "Calling mmap fd=%d offset=%d size=%d",
2339  _fd, _offset, _size);
2340 
2341  int excess = offset % page_size;
2342  actual_offset = offset - excess;
2343  actual_size = size + excess; // actual size need not be a multiple of pagesize...
2344 
2345  LOG(DBG, " mmap excess=%d aoffset=%d asize=%d",
2346  excess, actual_offset, actual_size);
2347 
2348  actual_mem_start = (char *) mmap64(NULL, actual_size, map_prot, map_flags, fd, actual_offset) ;
2349  madvise(actual_mem_start, actual_size, MADV_SEQUENTIAL);
2350 
2351  if(((void *)actual_mem_start) == MAP_FAILED) {
2352  LOG(ERR,"Error in mmap (%s)",strerror(errno),0,0,0,0) ;
2353 
2354  mem=NULL;
2355  offset=0;
2356  size=0;
2357  actual_offset=0;
2358  actual_mem_start=NULL;
2359  actual_size=0;
2360  return NULL;
2361  }
2362 
2363  mem = actual_mem_start + excess;
2364  return mem;
2365 }
2366 
2367  void MemMap::unmap()
2368  {
2369  if(mem==NULL) return;
2370 
2371  if(real_mem) {
2372  real_mem = 0;
2373  }
2374  else {
2375  madvise(actual_mem_start, actual_size, MADV_DONTNEED);
2376  munmap(actual_mem_start, actual_size);
2377  }
2378 
2379  mem=NULL;
2380  offset=0;
2381  size=0;
2382  actual_offset=0;
2383  actual_mem_start=NULL;
2384  actual_size=0;
2385  }
2386 
2387  // Must be reading from daq file!
2388  // Must have a current event by calling a previous "get()"
2389  // Fills the EvtHeader with the "next" events header
2390  //
2391  // return 0 if next event end of file
2392  // return -1 if other error
2393  int daqReader::readNextFutureSummaryInfo(SummaryInfo *info)
2394  {
2395  memset(info, 0, sizeof(SummaryInfo));
2396 
2397  if(input_type != file) {
2398  LOG(ERR, "Can't read next future evt header unless reading from daq file");
2399  return -1;
2400  }
2401 
2402  if(sfs->singleDirMount == 0) {
2403  LOG(ERR, "Need a current file to read the next one...");
2404  return -1;
2405  }
2406 
2407  LOG(DBG, "(sfs) singleDirOffset = %d singleDirSize = %d", sfs->singleDirOffset, sfs->singleDirSize);
2408 
2409  long long int offset = sfs->singleDirOffset + sfs->singleDirSize;
2410 
2411  sfs_index *nsfs = new sfs_index();
2412  if(!nsfs) {
2413  LOG(ERR, "Couldn't create sfs_index");
2414  return -1;
2415  }
2416 
2417  LOG(DBG, "mounting dir at offset %s:%d",file_name,offset);
2418  int ret = nsfs->mountSingleDir(file_name, offset);
2419  if(ret < 0) {
2420  LOG("ERR", "Error mounting dir at offset %s:%d",file_name,offset);
2421 
2422  delete nsfs;
2423  return ret;
2424  }
2425  if(ret == 0) {
2426  LOG(DBG, "End of file reading next dir...");
2427  delete nsfs;
2428  return ret;
2429  }
2430 
2431  LOG(DBG, "Mounted dir (nsfs) off: %d sz: %d",nsfs->singleDirOffset, nsfs->singleDirSize);
2432 
2433 
2434  // Got to open this myself xxxxxxxxxxx
2435  int fd = open(file_name, O_RDONLY);
2436  if(fd <=0) {
2437  LOG(ERR, "No defined file descriptor");
2438  delete nsfs;
2439  return -1;
2440  }
2441 
2442  MemMap *nmem = new MemMap();
2443  char *mymem = nmem->map(fd, nsfs->singleDirOffset, nsfs->singleDirSize, map_prot, map_flags);
2444  if(!mymem) {
2445  LOG(ERR, "Couldn't map memory");
2446  delete nmem;
2447  delete nsfs;
2448  close(fd);
2449  return -1;
2450  }
2451 
2452  LOG(DBG, "mapped off=%d sz=%d into 0x%x",nsfs->singleDirOffset, nsfs->singleDirSize, mymem);
2453 
2454  fs_dir *rootdir = nsfs->opendir("/");
2455  for(;;) {
2456  fs_dirent *ent = nsfs->readdir(rootdir);
2457  if(!ent) {
2458  nsfs->closedir(rootdir);
2459  LOG(ERR, "Error finding event directory in sfs?");
2460 
2461  // Skip directory... go to next
2462  nsfs->closedir(rootdir);
2463  delete nmem;
2464  delete nsfs;
2465  close(fd);
2466  return -1;
2467  }
2468 
2469  if(memcmp(ent->full_name, "/#", 2) == 0) {
2470  info->seq = atoi(&ent->full_name[2]);
2471 
2472  nsfs->cd(ent->full_name);
2473  nsfs->closedir(rootdir);
2474  break;
2475  }
2476 
2477  if(allnumeric(&ent->full_name[1])) {
2478  info->seq = atoi(&ent->full_name[1]);
2479  nsfs->cd(ent->full_name);
2480  nsfs->closedir(rootdir);
2481  break;
2482  }
2483 
2484  LOG(DBG, "SFS event directory not yet found: %s",ent->full_name);
2485  }
2486 
2487  fs_dirent *summary = nsfs->opendirent("EventSummary");
2488  if(summary) {
2489 
2490  // recal that the memory is mounted from the beginning of the event
2491  // while the offset here is from the beginning of the file
2492  int mem_offset = summary->offset-nsfs->singleDirOffset;
2493  LOG(DBG, "found summary %d: file[%d-%d] sz=%d corr=%d)",summary->offset,nsfs->singleDirOffset, nsfs->singleDirOffset+nsfs->singleDirSize,nsfs->singleDirSize, summary->offset-nsfs->singleDirOffset);
2494 
2495  char *buff = mymem + mem_offset;
2496  fillSummaryInfo(info,(gbPayload *)buff);
2497  }
2498  else { // take it from datap
2499  LOG(NOTE, "No EventSummary, search for legacy datap");
2500  summary = nsfs->opendirent("legacy");
2501  if(!summary) {
2502  LOG(NOTE, "No EventSummary and no DATAP... hacking summary info");
2503  }
2504  else {
2505  long long int mem_offset = summary->offset - nsfs->singleDirOffset;
2506  char *buff = mymem + mem_offset;
2507  fillSummaryInfo(info,(DATAP *)buff);
2508  copySummaryInfoIn(info);
2509  }
2510  }
2511 
2512  delete nsfs;
2513  delete nmem;
2514  close(fd);
2515 
2516  return 1;
2517  }
2518 
2519 
Definition: iccp.h:685
Definition: cfgutil.h:4
void version(std::ostream &os=std::cout)
print HepMC version
Definition: Version.h:27