From: hadaq Date: Sat, 5 Jun 2010 23:02:30 +0000 (+0000) Subject: Variable queue sizes fixed. Sergey. X-Git-Url: https://jspc29.x-matter.uni-frankfurt.de/git/?a=commitdiff_plain;h=c67119b553cb3a30a4beb897e2a3f93e5ca91f3a;p=daqdata.git Variable queue sizes fixed. Sergey. --- diff --git a/hadaq/netmem.c b/hadaq/netmem.c index 61ffc52..52390af 100644 --- a/hadaq/netmem.c +++ b/hadaq/netmem.c @@ -1,4 +1,4 @@ -static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.36 2009-12-10 15:39:55 hadaq Exp $"; +static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.37 2010-06-05 23:02:30 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L @@ -29,12 +29,18 @@ typedef struct TheArgsS { unsigned long nrOfMsgs; unsigned long isStandalone; unsigned long priority; - unsigned long queueSize; - unsigned long varQSize[MAXINPATH]; - int varQSizeCnt; + unsigned long queueSize[MAXINPATH]; + int queueSizeCnt; + + /* the following arguments are for the variable queue size */ + char *queueVarSize[MAXINPATH]; + char queueVarSizeS[MAXINPATH][PARAM_MAX_NAME_LEN]; + int queueVarSizeCnt; + char verbosity[PARAM_MAX_VALUE_LEN]; unsigned int buffStat; char shmname[PARAM_MAX_VALUE_LEN]; + char evtstat[PARAM_MAX_VALUE_LEN]; } TheArgs; typedef struct TheStatsS { @@ -55,6 +61,43 @@ static void usage(const char *progName) syslog(LOG_ERR, "Usage: %s -i inPath [-i inPath] -m nrOfMsgs [-p priority]", progName); syslog(LOG_ERR, "[-b] show fill levels of buffers]"); syslog(LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used"); + syslog(LOG_ERR, "[-q size] queue size"); + syslog(LOG_ERR, "[-Q num:size] used to set specific queue size for given queue number"); +} + +static int makeQueues(TheArgs *my) +{ + int retVal = 0; + int i; + + int queueNum; + char newQueueSize[30]; + + /* Init all queue sizes */ + for (i = 1; i < my->nrOfMsgs; i++) { + my->queueSize[i] = my->queueSize[0]; + } + + /* Extract specific values for queue sizes */ + for (i = 0; i < my->queueVarSizeCnt; i++) { + + if (0 == (sscanf(my->queueVarSize[i], "%d:%s", &queueNum, newQueueSize))) { + syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", "queueVarSize option is wrong."); + retVal = -1; + break; + } else { + if (queueNum >= my->nrOfMsgs) { + syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", + "queue number in queueVarSize option exceeds maximum number of data sources."); + retVal = -1; + break; + } + + my->queueSize[queueNum] = atoi(newQueueSize); + } + } + + return retVal; } static void argsDump(TheArgs *my) @@ -67,7 +110,7 @@ static void argsDump(TheArgs *my) syslog(LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs); syslog(LOG_DEBUG, "priority: %d", my->priority); syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone); - syslog(LOG_DEBUG, "queueSize: %d", my->queueSize); + syslog(LOG_DEBUG, "queueSize: %d", my->queueSize[0]); syslog(LOG_DEBUG, "verbosity: %s", my->verbosity); if (strcmp(my->shmname, "") != 0) { syslog(LOG_DEBUG, "shmem name: %s", my->shmname); @@ -85,16 +128,22 @@ static void argsDefault(TheArgs *my) my->nrOfMsgs = 0; my->priority = 0; my->isStandalone = 1; - my->queueSize = 4 * 1024 * 1024; + my->queueSize[0] = 64 * 1024 * 1024UL; strcpy(my->verbosity, "info"); for (i = 0; i < MAXINPATH; i++) { - my->varQSize[i] = 4 * 1024 * 1024; + my->queueVarSize[i] = my->queueVarSizeS[i]; } + my->queueVarSizeCnt = 0; /* no varible queue sizes by default */ + my->buffStat = 0; strcpy(my->shmname, ""); + + sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP")); } + + static int argsFromCL(TheArgs *my, int argc, char *argv[]) { extern char *optarg; @@ -105,7 +154,7 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) inPathCntFromParam = my->inPathCnt; my->inPathCnt = 0; - while ((i = getopt(argc, argv, "ai:m:q:p:v:bS:")) != -1) { + while ((i = getopt(argc, argv, "ai:m:q:Q:p:v:bS:")) != -1) { switch (i) { case 'm': my->nrOfMsgs = atoi(optarg); @@ -120,7 +169,10 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) my->priority = atoi(optarg); break; case 'q': - my->queueSize = atoi(optarg); + my->queueSize[0] = strtoul(optarg, NULL, 0); + break; + case 'Q': + strcpy(my->queueVarSize[my->queueVarSizeCnt++], optarg); break; case 'v': strcpy(my->verbosity, optarg); @@ -158,11 +210,11 @@ static int argsFromParam(TheArgs *my, int argc, char *argv[]) Param_getInt(param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs); Param_getInt(param, name, "stndln", ¶mWasFound, &my->isStandalone); Param_getInt(param, name, "prio", ¶mWasFound, &my->priority); - Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize); + Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize[0]); Param_getString(param, name, "verb", ¶mWasFound, my->verbosity); - /* this is an argument for a variable queue size. S.Y. */ - Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); + /* this is an argument for a variable queue size */ + /* Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); */ desParam(param); } @@ -208,7 +260,7 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans ** /* Add statistic for fill levels of buffers in percentage. */ static time_t t_0 = 0; - float buffSize, queueSize; + unsigned long buffSize, queueSize; time_t t, dT; int i; unsigned long fillLevel; @@ -219,21 +271,22 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans ** if (dT >= interval) { for (i = 0; i < theArgs->nrOfMsgs; i++) { - buffSize = 2 * theArgs->varQSize[i]; + buffSize = 2 * theArgs->queueSize[i]; queueSize = HadTuQueue_size(shmtr[i]->wrQueue); /* Add here statistic for fill levels of buffers */ fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize; (*my->netmemBuff[i]) = fillLevel; + /* printf("q[%02d]%5d\%\n",i,fillLevel); */ + /* Add more statistic for recv bytes per second */ (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i]; lastBytesRecv[i] = *nettr[i]->bytesReceived; } + t_0 = t; } - - t_0 = t; } static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, NetTrans **my) @@ -259,7 +312,7 @@ static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, fprintf(stderr, "q[%2d]: ", i); if (shmtr[i] != NULL) - if (theArgs->varQSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) { + if (theArgs->queueSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) { fprintf(stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue)); } else { fprintf(stderr, "%8s ", "EXCEEDED"); @@ -283,7 +336,7 @@ static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, for (j = 0; j < maxnorm; j++) { fprintf(stderr, "%1d ", maxnorm - j - 1); for (i = 0; i < theArgs->nrOfMsgs; i++) { - buffSize = 2 * theArgs->varQSize[i]; + buffSize = 2 * theArgs->queueSize[i]; queueSize = HadTuQueue_size(shmtr[i]->wrQueue); if (maxnorm - maxnorm * queueSize / buffSize < j) { @@ -339,14 +392,30 @@ static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, t0 = t; } +static void storeEvtStat(TheArgs *myArgs, unsigned long pktsDiscard, unsigned long msgsDiscard, unsigned long queue) +{ + + FILE *fp; + char ltime[20]; /* local time */ + time_t t = time(NULL); + strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime(&t)); + + fp = fopen(myArgs->evtstat, "a+"); + fprintf(fp, "%s %d %s %d %d %d\n", "daq_netmem: ", (int) getpid(), ltime, queue, pktsDiscard, msgsDiscard); + fclose(fp); +} + static void statsDump(TheArgs *theArgs, NetTrans **my, int interval) { + static unsigned long lastPR[MAXINPATH]; + static unsigned long lastPD[MAXINPATH]; + static unsigned long lastMD[MAXINPATH]; + static unsigned long lastBR[MAXINPATH]; static time_t lastTime = 0; time_t dT; int i; - if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval && !(theArgs->buffStat)) { fputs("==============================================================================\n", stderr); @@ -361,7 +430,21 @@ static void statsDump(TheArgs *theArgs, NetTrans **my, int interval) lastBR[i] = *my[i]->bytesReceived; fputc('\n', stderr); fputs("------------------------------------------------------------------------------\n", stderr); + + if ((long) (*my[i]->pktsReceived) - (long) lastPR < 0) { + lastPR[i] = 0; + lastPD[i] = 0; + lastMD[i] = 0; + } + + if ((*my[i]->pktsDiscarded - lastPD[i]) > 0 || (*my[i]->msgsDiscarded - lastMD[i]) > 0) + storeEvtStat(theArgs, *my[i]->pktsDiscarded - lastPD[i], *my[i]->msgsDiscarded - lastMD[i], i); + + lastPR[i] = *my[i]->pktsReceived; + lastPD[i] = *my[i]->pktsDiscarded; + lastMD[i] = *my[i]->msgsDiscarded; } + lastTime += dT; } } @@ -383,7 +466,7 @@ static void storeInfoStart(const char *n, time_t t, TheArgs *my) Param_storeInt(p, n, "nrofmsgs", my->nrOfMsgs); Param_storeInt(p, n, "stndln", my->isStandalone); Param_storeInt(p, n, "prio", my->priority); - Param_storeInt(p, n, "qsize", my->queueSize); + Param_storeInt(p, n, "qsize", my->queueSize[0]); Param_storeString(p, n, "verb", my->verbosity); desParam(p); } @@ -425,6 +508,12 @@ int main(int argc, char *argv[]) usage(argv[0]); exit(EXIT_FAILURE); } + + if (0 > makeQueues(theArgs)) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + for (i = 0; prioritynames[i].c_name != NULL && 0 != strcmp(prioritynames[i].c_name, theArgs->verbosity); i++) { } if (prioritynames[i].c_name == NULL) { @@ -452,7 +541,7 @@ int main(int argc, char *argv[]) exit(EXIT_FAILURE); } for (i = 0; i < theArgs->nrOfMsgs; i++) { - char buf[80]; + char buf[_POSIX_PATH_MAX]; netTrans[i] = NetTrans_create(theArgs->inPath[i], 0, worker); if (netTrans[i] == NULL) { @@ -463,13 +552,7 @@ int main(int argc, char *argv[]) /* use shared mem name from CL if given */ sprintf(buf, "netqueue%s%d", theArgs->shmname, i); - if (theArgs->queueSize > 0) { - /* if the queueSize is given in CL */ - shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize); - } else { - /* otherwise use variable queue size */ - shmTrans[i] = ShmTrans_open(buf, 2 * (long) theArgs->varQSize[i]); - } + shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize[i]); if (shmTrans[i] == NULL) { syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno)); @@ -477,15 +560,19 @@ int main(int argc, char *argv[]) } hadTu[i] = NULL; - if (theArgs->queueSize > 0) { + if (theArgs->queueSize[i] > 0) { /* if the queueSize is given in CL */ - hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize(); + /* if( theArgs->queueSize[i] > 102400 + HadTu_hdrSize() ){ */ +/* hadTuSize[i] = 102400; */ +/* } */ +/* else */ + hadTuSize[i] = theArgs->queueSize[i] - HadTu_hdrSize(); } else { /* otherwise use variable queue size */ /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */ - hadTuSize[i] = 51200; + hadTuSize[i] = 204800; /* - * This 50 kB must be bigger than a size of a message which + * This 200 kB must be bigger than a size of a message which * should usually be less than 32kB (UDP_packet_size - water_mark). * However some HUGE messages can appear. Those HUGE messages must * be below hadTuSize!!! Otherwise the code will crash.