-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.42 2010-07-23 12:38:23 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.43 2010-07-30 09:19:31 hadaq Exp $";
#define _POSIX_C_SOURCE 199309L
#define SYSLOG_NAMES
unsigned long portNr[MAXINPATH];
int portNrCnt;
+
+ /* Maximu delay of incoming data in seconds. */
+ unsigned long dataDelay;
} TheArgs;
typedef struct TheStatsS {
syslog(LOG_ERR, " [-q queueSize] Size of the queue (shared memory segments) in MBytes.");
syslog(LOG_ERR, " [-Q queueNr:queueSize] Set different queue sizes in MB for different queue numbers,");
syslog(LOG_ERR, " example: -Q 2:4 -Q 4:8 -Q 5:12");
+ syslog(LOG_ERR, " [-d seconds] Maximum delay of incoming data in seconds.");
}
static int makeQueues(TheArgs *my)
my->portNrCnt = 0;
+ my->dataDelay = 5; /* seconds */
+
strcpy(my->shmname, "");
sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP"));
}
-static inPath_short2wide(TheArgs *theArgs, char *optarg)
-{
-
-
-}
-
static int argsFromCL(TheArgs *my, int argc, char *argv[])
{
extern char *optarg;
inPathCntFromParam = my->inPathCnt;
my->inPathCnt = 0;
- while ((i = getopt(argc, argv, "ai:m:q:Q:p:v:bS:")) != -1) {
+ while ((i = getopt(argc, argv, "ai:m:q:Q:p:v:bS:d:")) != -1) {
switch (i) {
case 'm':
my->nrOfMsgs = atoi(optarg);
case 'S':
strcpy(my->shmname, optarg);
break;
+ case 'd':
+ my->dataDelay = atoi(optarg);
+ break;
default:
return -1;
break;
return retVal;
}
+static int checkDataSources(TheArgs *theArgs, TheStats *theStats)
+{
+ int ready2check = 0;
+ int i;
+ static unsigned long dataSources[MAXINPATH];
+ static int once = 1;
+
+ static time_t t_0 = 0;
+ time_t t, dT;
+
+ t = time(NULL);
+ dT = t - t_0;
+
+ if (dT >= 1) {
+
+ /* Init to zero */
+ if (once) {
+ for (i = 0; i < theArgs->nrOfMsgs; i++)
+ dataSources[i] = 0;
+ once = 0;
+ }
+
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ if ((*theStats->recvBytesRate[i]) > 0)
+ dataSources[i]++;
+
+ if (dataSources[i] > theArgs->dataDelay)
+ ready2check = 1;
+ }
+
+ int go2exit = 0;
+
+ if (ready2check) {
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ if (dataSources[i] == 0) {
+ fprintf(stderr, "<E> daq_netmem: source %d, port %d: no data received.\n", i, theArgs->portNr[i]);
+ go2exit = 1;
+ }
+ }
+
+ if (go2exit)
+ return -1;
+
+ /* Init to zero */
+ ready2check = 0;
+ for (i = 0; i < theArgs->nrOfMsgs; i++)
+ dataSources[i] = 0;
+ }
+
+ t_0 = t;
+ }
+
+ return 0;
+}
+
static void printTime()
{
struct timeval tv;
printf("%3d", fillLevel);
/* Add more statistic for recv bytes per second */
- (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
+ if (*nettr[i]->bytesReceived - lastBytesRecv[i] > 0) {
+ (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
+ } else {
+ (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived;
+ }
lastBytesRecv[i] = *nettr[i]->bytesReceived;
}
+
t_0 = t;
printf("\n");
add2Stat(theArgs, theStats, 1, shmTrans, netTrans);
statsDump(theArgs, netTrans, 1);
+ if (-1 == checkDataSources(theArgs, theStats)) {
+ fprintf(stderr, "Exit.\n");
+ goto bailOut1;
+ }
for (i = 0; i < theArgs->nrOfMsgs; i++) {
if (hadTu[i] == NULL) {
storeInfoStop(argv[0], time(NULL), worker, theArgs);
statsDump(theArgs, netTrans, 0);
+ bailOut1:
+
for (i = 0; i < theArgs->nrOfMsgs; i++) {
ShmTrans_remove(shmTrans[i]);
NetTrans_remove(netTrans[i]);