]> jspc29.x-matter.uni-frankfurt.de Git - daqdata.git/commitdiff
JAM: new approach to clean up eventbuilder queues when too many subevents were droppe...
authorhadaq <hadaq>
Thu, 24 Nov 2011 18:09:02 +0000 (18:09 +0000)
committerhadaq <hadaq>
Thu, 24 Nov 2011 18:09:02 +0000 (18:09 +0000)
hadaq/evtbuild.c

index c4fb600ae5813bb1ceb2df6cb3b828767769edb5..0e6f1b68069dd1447a2bedfd543aa7d8076a812d 100644 (file)
@@ -1,4 +1,4 @@
-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.154 2011-08-16 10:54:41 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.155 2011-11-24 18:09:02 hadaq Exp $";
 
 #define _POSIX_C_SOURCE 199309L
 #define SYSLOG_NAMES
@@ -54,7 +54,9 @@ static RFILE *fRemote = NULL;
 
 #define RUNID_INFO_DT  600             /* time in s until not available runid info is logged  */
 #define LOST_SUBEVT_LIMIT 10000        /* number of popped subevents per trigger sequence until warning */
-#define LOST_SUBEVT_DROP  2            /* drop all queue contents after this number of lost sub warnings per trigger sequence */
+#define LOST_SUBEVT_DROP  4            /* drop all queue contents after this number of lost sub warnings per trigger sequence */
+#define QUEUE_FLUSH_EVENTS 10000
+#define QUEUE_FLUSH_REPEAT 5
 
 #define NEVTIDS 64UL                   /* must be 2^n */
 #define NEVTIDS_IN_FILE 0UL            /* must be 2^n */
@@ -297,7 +299,7 @@ int isMismatchCritical(TheArgs *theArgs, TheStats *my)
 
        if (theArgs->maxTrigMismatch == -1) {
                theArgs->maxTrigMismatch = 90;  /* JAM: set reasonable default if not specified */
-               sprintf(msglog, "<I> Critical trigger mismatch ratio takes default of %d %% ", theArgs->maxTrigMismatch);
+               sprintf(msglog, "<I> Critical trigger mismatch ratio takes default of %d %% ", (int) theArgs->maxTrigMismatch);
                storeLogInfo(theArgs, msglog);
                /*return retVal; */
        }
@@ -1108,6 +1110,8 @@ int main(int argc, char *argv[])
        int brokenEvtCnt = 0;
        unsigned long popCnt = 0;
        unsigned long popTermCnt = 0;
+       int discSubCnt = 0;
+       int discEvCnt = 0;
 
        currId = 0;
 
@@ -1200,6 +1204,7 @@ int main(int argc, char *argv[])
                                conHadTuQueue_voidP(hadTuQueue[i], storage);
                        }
                        subEvt = HadTuQueue_front(hadTuQueue[i]);
+
 #ifndef NDEBUG
                        syslog(LOG_DEBUG, "hadTuQueue[%d]: %p = subEvt: %s", i, subEvt, SubEvt_2charP(subEvt));
 #endif
@@ -1291,32 +1296,89 @@ int main(int argc, char *argv[])
                                step = 0;
                                popCnt++;
                                if (popCnt >= LOST_SUBEVT_LIMIT) {
-                                       if (theArgs->debugOptsCnt)
-                                               Debug_printPopTrigMismatch(theArgs, theStats, theDebug, i);
-
+                                       /*if (theArgs->debugOptsCnt)
+                                          Debug_printPopTrigMismatch(theArgs, theStats, theDebug, i);
+                                        */
                                        /* JAM: do not bail out immediately, but display problem message first.
                                         * Scale down warning message frequency:*/
                                        if ((popCnt % LOST_SUBEVT_LIMIT) == 0) {
-                                               sprintf(msglog, "<W> Subevent stream %d has popped  nr:%d queue entries to sync with master stream!", i,
-                                                               popCnt);
+                                               sprintf(msglog,
+                                                               "<W> Subevent stream %d has popped  nr:%d queue entries to sync with master stream of trig no %d after found stream trig no %d",
+                                                               i, popCnt, currTrigNr, trigNr);
                                                storeLogInfo(theArgs, msglog);
                                                popTermCnt++;
                                        }
                                        if (popTermCnt > LOST_SUBEVT_DROP) {
                                                /*  storeLogInfo(theArgs, "<E> too many lost subevents. Exiting now!");
                                                   goto bailOut1; */
+                                               int jj = 0;
+                                               int discEventSum = 0;
+                                               popCnt = 0;
+                                               popTermCnt = 0; /*reset globally */
+
+                                               storeLogInfo(theArgs, "<W> too many lost subevents. Dropping historic buffers...");
+                                               /* avoid here that other channels run full during clearing */
+                                               for (jj = 0; jj < QUEUE_FLUSH_REPEAT; jj++) {
+                                                       for (q = 0; q < theArgs->nrOfMsgs; q++) {
+                                                               /*sprintf(msglog, "<I>  Clearing queue %d ...", q);
+                                                                  storeLogInfo(theArgs, msglog); */
+
+                                                               discSubCnt = 0;
+                                                               while (discSubCnt < QUEUE_FLUSH_EVENTS) {
+                                                                       if (hadTuQueue[q] != NULL) {
+
+
+                                                                               while (!HadTuQueue_empty(hadTuQueue[q])) {
+                                                                                       /*subEvt = HadTuQueue_front(hadTuQueue[q]);
+                                                                                          trigNo[q] = SubEvt_trigNr(subEvt) >> 8; */
+                                                                                       HadTuQueue_pop(hadTuQueue[q]);
+                                                                                       discSubCnt++;
+                                                                               }
+                                                                               /* NOTE; HadTuQueue_clear() will not at all make queue "empty" JAM */
+                                                                               /*  HadTuQueue_clear(hadTuQueue[q]); */
+                                                                               desHadTuQueue(hadTuQueue[q]);
+                                                                               free(hadTuQueue[q]);
+                                                                               hadTuQueue[q] = NULL;
+                                                                               ShmTrans_free(shmTrans[q]);
+                                                                               /* JAM: at this point, we still did not switch to other double buffer, but
+                                                                                * just reach next queue segment. Only if this is at buffer end,
+                                                                                * buffer switching is initiated by ShmTrans_recv() at the begin of subevent loop*/
+                                                                       }
+                                                                       /* discarding complete double buffer does not help at all, once channels are out of sync */
+                                                                       /*if(HadTuQueue_front(shmTrans[q]->rdQueue) == NULL)
+                                                                          {
+                                                                          were at the end of complete double buffer.
+                                                                          sprintf(msglog, "<I>  Reached end of double buffer stream %d ", q);
+                                                                          storeLogInfo(theArgs, msglog);
+                                                                          break;
+                                                                          } */
+
+                                                                       if (hadTuQueue[q] == NULL) {
+                                                                               void *storage;
+                                                                               storage = ShmTrans_recv(shmTrans[q]);
+                                                                               hadTuQueue[q] = malloc(HadTuQueue_sizeOf());
+                                                                               conHadTuQueue_voidP(hadTuQueue[q], storage);
+                                                                       }
+
+                                                               }       /*while(discSubCnt < QUEUE_FLUSH_EVENTS* */
+
+                                                               if (discSubCnt > discEvCnt)
+                                                                       discEvCnt = discSubCnt;
+                                                               sprintf(msglog, "<I>  Cleared queue %d from %d subevents.", q, discSubCnt);
+                                                               storeLogInfo(theArgs, msglog);
+
+                                                       }       /* for q */
+                                                       (*theStats->evtsDiscarded) += discEvCnt;        /* account max of discarded subevents as discarded events */
+                                                       discEventSum += discEvCnt;
+                                               }               /* for jj */
+                                               i = 0;  /* start again with next trigger id */
+                                               step = 0;
+
+                                               sprintf(msglog, "<I>  leaving queue cleanup section after discarding %d events", discEventSum);
+                                               storeLogInfo(theArgs, msglog);
+                                               discEvCnt = 0;
+                                               discSubCnt = 0;
 
-                                               /* throw away all queues and start again: */
-                                               storeLogInfo(theArgs, "<E> too many lost subevents. Dropping all queues!");
-                                               for (q = 0; q < theArgs->nrOfMsgs; q++) {
-                                                       HadTuQueue_clear(hadTuQueue[q]);
-                                                       desHadTuQueue(hadTuQueue[q]);
-                                                       free(hadTuQueue[q]);
-                                                       hadTuQueue[q] = NULL;
-                                                       ShmTrans_free(shmTrans[q]);
-                                               }
-                                               i = 0;  /* start again with evaluating other double buffer */
-                                               step = 1;
                                        }
                                }
                        } else {
@@ -1326,11 +1388,14 @@ int main(int argc, char *argv[])
                                }
                                evtIsBroken = 1;
                        }
-                       if (HadTuQueue_empty(hadTuQueue[i])) {
+                       if ((hadTuQueue[i] != NULL) && HadTuQueue_empty(hadTuQueue[i])) {
                                desHadTuQueue(hadTuQueue[i]);
                                free(hadTuQueue[i]);
                                hadTuQueue[i] = NULL;
                                ShmTrans_free(shmTrans[i]);
+                               /* JAM: at this point, we still did not switch to other double buffer, but
+                                * just reach next queue segment. Only if this is at buffer end,
+                                * buffer switching is initiated by ShmTrans_recv() at the begin of subevent loop*/
                        }
                }