From 90743cedf2d097890eac5f7294f56c7f40f6a632 Mon Sep 17 00:00:00 2001 From: hadaq Date: Thu, 24 Nov 2011 18:09:02 +0000 Subject: [PATCH] JAM: new approach to clean up eventbuilder queues when too many subevents were dropped. Actually, my previous try was very wrong, since queues were not cleared correctly. Still under development. --- hadaq/evtbuild.c | 105 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 85 insertions(+), 20 deletions(-) diff --git a/hadaq/evtbuild.c b/hadaq/evtbuild.c index c4fb600..0e6f1b6 100644 --- a/hadaq/evtbuild.c +++ b/hadaq/evtbuild.c @@ -1,4 +1,4 @@ -static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.154 2011-08-16 10:54:41 hadaq Exp $"; +static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.155 2011-11-24 18:09:02 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L #define SYSLOG_NAMES @@ -54,7 +54,9 @@ static RFILE *fRemote = NULL; #define RUNID_INFO_DT 600 /* time in s until not available runid info is logged */ #define LOST_SUBEVT_LIMIT 10000 /* number of popped subevents per trigger sequence until warning */ -#define LOST_SUBEVT_DROP 2 /* drop all queue contents after this number of lost sub warnings per trigger sequence */ +#define LOST_SUBEVT_DROP 4 /* drop all queue contents after this number of lost sub warnings per trigger sequence */ +#define QUEUE_FLUSH_EVENTS 10000 +#define QUEUE_FLUSH_REPEAT 5 #define NEVTIDS 64UL /* must be 2^n */ #define NEVTIDS_IN_FILE 0UL /* must be 2^n */ @@ -297,7 +299,7 @@ int isMismatchCritical(TheArgs *theArgs, TheStats *my) if (theArgs->maxTrigMismatch == -1) { theArgs->maxTrigMismatch = 90; /* JAM: set reasonable default if not specified */ - sprintf(msglog, " Critical trigger mismatch ratio takes default of %d %% ", theArgs->maxTrigMismatch); + sprintf(msglog, " Critical trigger mismatch ratio takes default of %d %% ", (int) theArgs->maxTrigMismatch); storeLogInfo(theArgs, msglog); /*return retVal; */ } @@ -1108,6 +1110,8 @@ int main(int argc, char *argv[]) int brokenEvtCnt = 0; unsigned long popCnt = 0; unsigned long popTermCnt = 0; + int discSubCnt = 0; + int discEvCnt = 0; currId = 0; @@ -1200,6 +1204,7 @@ int main(int argc, char *argv[]) conHadTuQueue_voidP(hadTuQueue[i], storage); } subEvt = HadTuQueue_front(hadTuQueue[i]); + #ifndef NDEBUG syslog(LOG_DEBUG, "hadTuQueue[%d]: %p = subEvt: %s", i, subEvt, SubEvt_2charP(subEvt)); #endif @@ -1291,32 +1296,89 @@ int main(int argc, char *argv[]) step = 0; popCnt++; if (popCnt >= LOST_SUBEVT_LIMIT) { - if (theArgs->debugOptsCnt) - Debug_printPopTrigMismatch(theArgs, theStats, theDebug, i); - + /*if (theArgs->debugOptsCnt) + Debug_printPopTrigMismatch(theArgs, theStats, theDebug, i); + */ /* JAM: do not bail out immediately, but display problem message first. * Scale down warning message frequency:*/ if ((popCnt % LOST_SUBEVT_LIMIT) == 0) { - sprintf(msglog, " Subevent stream %d has popped nr:%d queue entries to sync with master stream!", i, - popCnt); + sprintf(msglog, + " Subevent stream %d has popped nr:%d queue entries to sync with master stream of trig no %d after found stream trig no %d", + i, popCnt, currTrigNr, trigNr); storeLogInfo(theArgs, msglog); popTermCnt++; } if (popTermCnt > LOST_SUBEVT_DROP) { /* storeLogInfo(theArgs, " too many lost subevents. Exiting now!"); goto bailOut1; */ + int jj = 0; + int discEventSum = 0; + popCnt = 0; + popTermCnt = 0; /*reset globally */ + + storeLogInfo(theArgs, " too many lost subevents. Dropping historic buffers..."); + /* avoid here that other channels run full during clearing */ + for (jj = 0; jj < QUEUE_FLUSH_REPEAT; jj++) { + for (q = 0; q < theArgs->nrOfMsgs; q++) { + /*sprintf(msglog, " Clearing queue %d ...", q); + storeLogInfo(theArgs, msglog); */ + + discSubCnt = 0; + while (discSubCnt < QUEUE_FLUSH_EVENTS) { + if (hadTuQueue[q] != NULL) { + + + while (!HadTuQueue_empty(hadTuQueue[q])) { + /*subEvt = HadTuQueue_front(hadTuQueue[q]); + trigNo[q] = SubEvt_trigNr(subEvt) >> 8; */ + HadTuQueue_pop(hadTuQueue[q]); + discSubCnt++; + } + /* NOTE; HadTuQueue_clear() will not at all make queue "empty" JAM */ + /* HadTuQueue_clear(hadTuQueue[q]); */ + desHadTuQueue(hadTuQueue[q]); + free(hadTuQueue[q]); + hadTuQueue[q] = NULL; + ShmTrans_free(shmTrans[q]); + /* JAM: at this point, we still did not switch to other double buffer, but + * just reach next queue segment. Only if this is at buffer end, + * buffer switching is initiated by ShmTrans_recv() at the begin of subevent loop*/ + } + /* discarding complete double buffer does not help at all, once channels are out of sync */ + /*if(HadTuQueue_front(shmTrans[q]->rdQueue) == NULL) + { + were at the end of complete double buffer. + sprintf(msglog, " Reached end of double buffer stream %d ", q); + storeLogInfo(theArgs, msglog); + break; + } */ + + if (hadTuQueue[q] == NULL) { + void *storage; + storage = ShmTrans_recv(shmTrans[q]); + hadTuQueue[q] = malloc(HadTuQueue_sizeOf()); + conHadTuQueue_voidP(hadTuQueue[q], storage); + } + + } /*while(discSubCnt < QUEUE_FLUSH_EVENTS* */ + + if (discSubCnt > discEvCnt) + discEvCnt = discSubCnt; + sprintf(msglog, " Cleared queue %d from %d subevents.", q, discSubCnt); + storeLogInfo(theArgs, msglog); + + } /* for q */ + (*theStats->evtsDiscarded) += discEvCnt; /* account max of discarded subevents as discarded events */ + discEventSum += discEvCnt; + } /* for jj */ + i = 0; /* start again with next trigger id */ + step = 0; + + sprintf(msglog, " leaving queue cleanup section after discarding %d events", discEventSum); + storeLogInfo(theArgs, msglog); + discEvCnt = 0; + discSubCnt = 0; - /* throw away all queues and start again: */ - storeLogInfo(theArgs, " too many lost subevents. Dropping all queues!"); - for (q = 0; q < theArgs->nrOfMsgs; q++) { - HadTuQueue_clear(hadTuQueue[q]); - desHadTuQueue(hadTuQueue[q]); - free(hadTuQueue[q]); - hadTuQueue[q] = NULL; - ShmTrans_free(shmTrans[q]); - } - i = 0; /* start again with evaluating other double buffer */ - step = 1; } } } else { @@ -1326,11 +1388,14 @@ int main(int argc, char *argv[]) } evtIsBroken = 1; } - if (HadTuQueue_empty(hadTuQueue[i])) { + if ((hadTuQueue[i] != NULL) && HadTuQueue_empty(hadTuQueue[i])) { desHadTuQueue(hadTuQueue[i]); free(hadTuQueue[i]); hadTuQueue[i] = NULL; ShmTrans_free(shmTrans[i]); + /* JAM: at this point, we still did not switch to other double buffer, but + * just reach next queue segment. Only if this is at buffer end, + * buffer switching is initiated by ShmTrans_recv() at the begin of subevent loop*/ } } -- 2.43.0