From: hadaq Date: Fri, 30 Jul 2010 09:19:31 +0000 (+0000) Subject: EB exit condition if no data coming for one of sources. Sergey. X-Git-Url: https://jspc29.x-matter.uni-frankfurt.de/git/?a=commitdiff_plain;h=ff63c5e7a027b755a873d6840108ebfbf0f8a223;p=daqdata.git EB exit condition if no data coming for one of sources. Sergey. --- diff --git a/hadaq/netmem.c b/hadaq/netmem.c index e1949d6..a2b0bda 100644 --- a/hadaq/netmem.c +++ b/hadaq/netmem.c @@ -1,4 +1,4 @@ -static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.42 2010-07-23 12:38:23 hadaq Exp $"; +static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.43 2010-07-30 09:19:31 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L #define SYSLOG_NAMES @@ -42,6 +42,9 @@ typedef struct TheArgsS { unsigned long portNr[MAXINPATH]; int portNrCnt; + + /* Maximu delay of incoming data in seconds. */ + unsigned long dataDelay; } TheArgs; typedef struct TheStatsS { @@ -68,6 +71,7 @@ static void usage(const char *progName) syslog(LOG_ERR, " [-q queueSize] Size of the queue (shared memory segments) in MBytes."); syslog(LOG_ERR, " [-Q queueNr:queueSize] Set different queue sizes in MB for different queue numbers,"); syslog(LOG_ERR, " example: -Q 2:4 -Q 4:8 -Q 5:12"); + syslog(LOG_ERR, " [-d seconds] Maximum delay of incoming data in seconds."); } static int makeQueues(TheArgs *my) @@ -143,17 +147,13 @@ static void argsDefault(TheArgs *my) my->portNrCnt = 0; + my->dataDelay = 5; /* seconds */ + strcpy(my->shmname, ""); sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP")); } -static inPath_short2wide(TheArgs *theArgs, char *optarg) -{ - - -} - static int argsFromCL(TheArgs *my, int argc, char *argv[]) { extern char *optarg; @@ -164,7 +164,7 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) inPathCntFromParam = my->inPathCnt; my->inPathCnt = 0; - while ((i = getopt(argc, argv, "ai:m:q:Q:p:v:bS:")) != -1) { + while ((i = getopt(argc, argv, "ai:m:q:Q:p:v:bS:d:")) != -1) { switch (i) { case 'm': my->nrOfMsgs = atoi(optarg); @@ -190,6 +190,9 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) case 'S': strcpy(my->shmname, optarg); break; + case 'd': + my->dataDelay = atoi(optarg); + break; default: return -1; break; @@ -265,6 +268,61 @@ static char *unit(unsigned long v) return retVal; } +static int checkDataSources(TheArgs *theArgs, TheStats *theStats) +{ + int ready2check = 0; + int i; + static unsigned long dataSources[MAXINPATH]; + static int once = 1; + + static time_t t_0 = 0; + time_t t, dT; + + t = time(NULL); + dT = t - t_0; + + if (dT >= 1) { + + /* Init to zero */ + if (once) { + for (i = 0; i < theArgs->nrOfMsgs; i++) + dataSources[i] = 0; + once = 0; + } + + for (i = 0; i < theArgs->nrOfMsgs; i++) { + if ((*theStats->recvBytesRate[i]) > 0) + dataSources[i]++; + + if (dataSources[i] > theArgs->dataDelay) + ready2check = 1; + } + + int go2exit = 0; + + if (ready2check) { + for (i = 0; i < theArgs->nrOfMsgs; i++) { + if (dataSources[i] == 0) { + fprintf(stderr, " daq_netmem: source %d, port %d: no data received.\n", i, theArgs->portNr[i]); + go2exit = 1; + } + } + + if (go2exit) + return -1; + + /* Init to zero */ + ready2check = 0; + for (i = 0; i < theArgs->nrOfMsgs; i++) + dataSources[i] = 0; + } + + t_0 = t; + } + + return 0; +} + static void printTime() { struct timeval tv; @@ -319,9 +377,14 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans ** printf("%3d", fillLevel); /* Add more statistic for recv bytes per second */ - (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i]; + if (*nettr[i]->bytesReceived - lastBytesRecv[i] > 0) { + (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i]; + } else { + (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived; + } lastBytesRecv[i] = *nettr[i]->bytesReceived; } + t_0 = t; printf("\n"); @@ -546,6 +609,10 @@ int main(int argc, char *argv[]) add2Stat(theArgs, theStats, 1, shmTrans, netTrans); statsDump(theArgs, netTrans, 1); + if (-1 == checkDataSources(theArgs, theStats)) { + fprintf(stderr, "Exit.\n"); + goto bailOut1; + } for (i = 0; i < theArgs->nrOfMsgs; i++) { if (hadTu[i] == NULL) { @@ -578,6 +645,8 @@ int main(int argc, char *argv[]) storeInfoStop(argv[0], time(NULL), worker, theArgs); statsDump(theArgs, netTrans, 0); + bailOut1: + for (i = 0; i < theArgs->nrOfMsgs; i++) { ShmTrans_remove(shmTrans[i]); NetTrans_remove(netTrans[i]);