-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.154 2011-08-16 10:54:41 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.155 2011-11-24 18:09:02 hadaq Exp $";
#define _POSIX_C_SOURCE 199309L
#define SYSLOG_NAMES
#define RUNID_INFO_DT 600 /* time in s until not available runid info is logged */
#define LOST_SUBEVT_LIMIT 10000 /* number of popped subevents per trigger sequence until warning */
-#define LOST_SUBEVT_DROP 2 /* drop all queue contents after this number of lost sub warnings per trigger sequence */
+#define LOST_SUBEVT_DROP 4 /* drop all queue contents after this number of lost sub warnings per trigger sequence */
+#define QUEUE_FLUSH_EVENTS 10000
+#define QUEUE_FLUSH_REPEAT 5
#define NEVTIDS 64UL /* must be 2^n */
#define NEVTIDS_IN_FILE 0UL /* must be 2^n */
if (theArgs->maxTrigMismatch == -1) {
theArgs->maxTrigMismatch = 90; /* JAM: set reasonable default if not specified */
- sprintf(msglog, "<I> Critical trigger mismatch ratio takes default of %d %% ", theArgs->maxTrigMismatch);
+ sprintf(msglog, "<I> Critical trigger mismatch ratio takes default of %d %% ", (int) theArgs->maxTrigMismatch);
storeLogInfo(theArgs, msglog);
/*return retVal; */
}
int brokenEvtCnt = 0;
unsigned long popCnt = 0;
unsigned long popTermCnt = 0;
+ int discSubCnt = 0;
+ int discEvCnt = 0;
currId = 0;
conHadTuQueue_voidP(hadTuQueue[i], storage);
}
subEvt = HadTuQueue_front(hadTuQueue[i]);
+
#ifndef NDEBUG
syslog(LOG_DEBUG, "hadTuQueue[%d]: %p = subEvt: %s", i, subEvt, SubEvt_2charP(subEvt));
#endif
step = 0;
popCnt++;
if (popCnt >= LOST_SUBEVT_LIMIT) {
- if (theArgs->debugOptsCnt)
- Debug_printPopTrigMismatch(theArgs, theStats, theDebug, i);
-
+ /*if (theArgs->debugOptsCnt)
+ Debug_printPopTrigMismatch(theArgs, theStats, theDebug, i);
+ */
/* JAM: do not bail out immediately, but display problem message first.
* Scale down warning message frequency:*/
if ((popCnt % LOST_SUBEVT_LIMIT) == 0) {
- sprintf(msglog, "<W> Subevent stream %d has popped nr:%d queue entries to sync with master stream!", i,
- popCnt);
+ sprintf(msglog,
+ "<W> Subevent stream %d has popped nr:%d queue entries to sync with master stream of trig no %d after found stream trig no %d",
+ i, popCnt, currTrigNr, trigNr);
storeLogInfo(theArgs, msglog);
popTermCnt++;
}
if (popTermCnt > LOST_SUBEVT_DROP) {
/* storeLogInfo(theArgs, "<E> too many lost subevents. Exiting now!");
goto bailOut1; */
+ int jj = 0;
+ int discEventSum = 0;
+ popCnt = 0;
+ popTermCnt = 0; /*reset globally */
+
+ storeLogInfo(theArgs, "<W> too many lost subevents. Dropping historic buffers...");
+ /* avoid here that other channels run full during clearing */
+ for (jj = 0; jj < QUEUE_FLUSH_REPEAT; jj++) {
+ for (q = 0; q < theArgs->nrOfMsgs; q++) {
+ /*sprintf(msglog, "<I> Clearing queue %d ...", q);
+ storeLogInfo(theArgs, msglog); */
+
+ discSubCnt = 0;
+ while (discSubCnt < QUEUE_FLUSH_EVENTS) {
+ if (hadTuQueue[q] != NULL) {
+
+
+ while (!HadTuQueue_empty(hadTuQueue[q])) {
+ /*subEvt = HadTuQueue_front(hadTuQueue[q]);
+ trigNo[q] = SubEvt_trigNr(subEvt) >> 8; */
+ HadTuQueue_pop(hadTuQueue[q]);
+ discSubCnt++;
+ }
+ /* NOTE; HadTuQueue_clear() will not at all make queue "empty" JAM */
+ /* HadTuQueue_clear(hadTuQueue[q]); */
+ desHadTuQueue(hadTuQueue[q]);
+ free(hadTuQueue[q]);
+ hadTuQueue[q] = NULL;
+ ShmTrans_free(shmTrans[q]);
+ /* JAM: at this point, we still did not switch to other double buffer, but
+ * just reach next queue segment. Only if this is at buffer end,
+ * buffer switching is initiated by ShmTrans_recv() at the begin of subevent loop*/
+ }
+ /* discarding complete double buffer does not help at all, once channels are out of sync */
+ /*if(HadTuQueue_front(shmTrans[q]->rdQueue) == NULL)
+ {
+ were at the end of complete double buffer.
+ sprintf(msglog, "<I> Reached end of double buffer stream %d ", q);
+ storeLogInfo(theArgs, msglog);
+ break;
+ } */
+
+ if (hadTuQueue[q] == NULL) {
+ void *storage;
+ storage = ShmTrans_recv(shmTrans[q]);
+ hadTuQueue[q] = malloc(HadTuQueue_sizeOf());
+ conHadTuQueue_voidP(hadTuQueue[q], storage);
+ }
+
+ } /*while(discSubCnt < QUEUE_FLUSH_EVENTS* */
+
+ if (discSubCnt > discEvCnt)
+ discEvCnt = discSubCnt;
+ sprintf(msglog, "<I> Cleared queue %d from %d subevents.", q, discSubCnt);
+ storeLogInfo(theArgs, msglog);
+
+ } /* for q */
+ (*theStats->evtsDiscarded) += discEvCnt; /* account max of discarded subevents as discarded events */
+ discEventSum += discEvCnt;
+ } /* for jj */
+ i = 0; /* start again with next trigger id */
+ step = 0;
+
+ sprintf(msglog, "<I> leaving queue cleanup section after discarding %d events", discEventSum);
+ storeLogInfo(theArgs, msglog);
+ discEvCnt = 0;
+ discSubCnt = 0;
- /* throw away all queues and start again: */
- storeLogInfo(theArgs, "<E> too many lost subevents. Dropping all queues!");
- for (q = 0; q < theArgs->nrOfMsgs; q++) {
- HadTuQueue_clear(hadTuQueue[q]);
- desHadTuQueue(hadTuQueue[q]);
- free(hadTuQueue[q]);
- hadTuQueue[q] = NULL;
- ShmTrans_free(shmTrans[q]);
- }
- i = 0; /* start again with evaluating other double buffer */
- step = 1;
}
}
} else {
}
evtIsBroken = 1;
}
- if (HadTuQueue_empty(hadTuQueue[i])) {
+ if ((hadTuQueue[i] != NULL) && HadTuQueue_empty(hadTuQueue[i])) {
desHadTuQueue(hadTuQueue[i]);
free(hadTuQueue[i]);
hadTuQueue[i] = NULL;
ShmTrans_free(shmTrans[i]);
+ /* JAM: at this point, we still did not switch to other double buffer, but
+ * just reach next queue segment. Only if this is at buffer end,
+ * buffer switching is initiated by ShmTrans_recv() at the begin of subevent loop*/
}
}