From 0e5732b9d7cd7c3baff28aed9fdfff13487199b8 Mon Sep 17 00:00:00 2001 From: hadaq Date: Sat, 5 Jun 2010 23:00:27 +0000 Subject: [PATCH] Variable queue sizes fixed. Sergey. --- hadaq/evtbuild.c | 165 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 52 deletions(-) diff --git a/hadaq/evtbuild.c b/hadaq/evtbuild.c index fef2b0d..9cb6ad1 100644 --- a/hadaq/evtbuild.c +++ b/hadaq/evtbuild.c @@ -1,4 +1,4 @@ -static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.105 2010-04-20 16:29:34 hadaq Exp $"; +static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.106 2010-06-05 23:00:27 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L #define SYSLOG_NAMES @@ -35,7 +35,7 @@ static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hada #include "ansiTape.h" #include "genid32.h" -#ifdef RFIO +#undef RFIO #include "rawapin.h" /* for rfio */ static RFILE *fRemote = NULL; #endif @@ -79,7 +79,8 @@ typedef struct TheArgsS { int slowCtrlFileCnt; unsigned long isStandalone; unsigned long priority; - unsigned long queueSize; + unsigned long queueSize[MAXINPATH]; + int queueSizeCnt; char verbosity[PARAM_MAX_VALUE_LEN]; unsigned long evtId; unsigned long maxFileSz; @@ -93,9 +94,10 @@ typedef struct TheArgsS { double secsizelimit; double resdown_offset; - /* the following arguments are for the variable queue size. S.Y. */ - unsigned long varQSize[MAXINPATH]; - int varQSizeCnt; + /* the following arguments are for the variable queue size */ + char *queueVarSize[MAXINPATH]; + char queueVarSizeS[MAXINPATH][PARAM_MAX_NAME_LEN]; + int queueVarSizeCnt; char rfioRemotePath[PARAM_MAX_VALUE_LEN]; char rfioLustrePath[PARAM_MAX_VALUE_LEN]; @@ -111,6 +113,7 @@ typedef struct TheArgsS { char shmname[PARAM_MAX_VALUE_LEN]; unsigned int ebnum; char runinfo2ora[PARAM_MAX_VALUE_LEN]; + char evtstat[PARAM_MAX_VALUE_LEN]; /* file name for statistics of discarded event */ char fileName[PARAM_MAX_VALUE_LEN]; } TheArgs; @@ -213,6 +216,41 @@ static void usage(const char *progName) syslog(LOG_ERR, "Usage: [--ignore] ignore trigger mismatch conditions"); } +static int makeQueues(TheArgs *my) +{ + int retVal = 0; + int i; + + int queueNum; + char newQueueSize[30]; + + /* Init all queue sizes */ + for (i = 1; i < my->nrOfMsgs; i++) { + my->queueSize[i] = my->queueSize[0]; + } + + /* Extract specific values for queue sizes */ + for (i = 0; i < my->queueVarSizeCnt; i++) { + + if (0 == (sscanf(my->queueVarSize[i], "%d:%s", &queueNum, newQueueSize))) { + syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", "queueVarSize option is wrong."); + retVal = -1; + break; + } else { + if (queueNum >= my->nrOfMsgs) { + syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", + "queue number in queueVarSize option exceeds maximum number of data sources."); + retVal = -1; + break; + } + + my->queueSize[queueNum] = atoi(newQueueSize); + } + } + + return retVal; +} + static void argsDump(TheArgs *my) { int i; @@ -227,7 +265,7 @@ static void argsDump(TheArgs *my) syslog(LOG_DEBUG, "expId: %s", my->expId); syslog(LOG_DEBUG, "priority: %d", my->priority); syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone); - syslog(LOG_DEBUG, "queueSize: %d", my->queueSize); + syslog(LOG_DEBUG, "queueSize: %d", my->queueSize[0]); syslog(LOG_DEBUG, "verbosity: %s", my->verbosity); syslog(LOG_DEBUG, "evtId: %ld", my->evtId); syslog(LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz); @@ -276,7 +314,7 @@ static void argsDefault(TheArgs *my) strcpy(my->shmname, ""); my->priority = 0; my->isStandalone = 1; - my->queueSize = 4 * 1024 * 1024UL; + my->queueSize[0] = 64 * 1024 * 1024UL; strcpy(my->verbosity, "info"); my->evtId = 0; my->maxFileSz = (1.5 * 1024 * 1024 * 1024UL - 1); @@ -290,8 +328,9 @@ static void argsDefault(TheArgs *my) strcpy(my->respath, ""); for (i = 0; i < MAXINPATH; i++) { - my->varQSize[i] = 4 * 1024 * 1024UL; + my->queueVarSize[i] = my->queueVarSizeS[i]; } + my->queueVarSizeCnt = 0; /* no varible queue sizes by default */ strcpy(my->rfioRemotePath, ""); strcpy(my->rfioLustrePath, ""); @@ -308,6 +347,7 @@ static void argsDefault(TheArgs *my) /* read path from DAQ_SETUP and define full file name */ sprintf(my->runinfo2ora, "%s_runinfo2ora.txt", getenv("DAQ_SETUP")); + sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP")); } static int argsFromCL(TheArgs *my, int argc, char *argv[]) @@ -344,7 +384,7 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) {"ignore", 0, 0, 'i'}, {0, 0, 0, 0} }; - i = getopt_long(argc, argv, "am:f:r:o:d:q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:", long_options, + i = getopt_long(argc, argv, "am:f:r:o:d:q:Q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:", long_options, &option_index); if (i == -1) break; @@ -371,7 +411,10 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) my->priority = strtoul(optarg, NULL, 0); break; case 'q': - my->queueSize = strtoul(optarg, NULL, 0); + my->queueSize[0] = strtoul(optarg, NULL, 0); + break; + case 'Q': + strcpy(my->queueVarSize[my->queueVarSizeCnt++], optarg); break; case 'v': strcpy(my->verbosity, optarg); @@ -482,10 +525,8 @@ static int argsCheck(TheArgs *my) } /* Conditions: if the Remote Event Server is used */ - if ((my->resdownscale != 0 && (my->resnumevents == -1 || - (strcmp(my->respath, "") == 0))) || - (my->resnumevents != -1 && (my->resdownscale == 0 || - (strcmp(my->respath, "") == 0))) || + if ((my->resdownscale != 0 && (my->resnumevents == -1 || (strcmp(my->respath, "") == 0))) || + (my->resnumevents != -1 && (my->resdownscale == 0 || (strcmp(my->respath, "") == 0))) || ((strcmp(my->respath, "") != 0) && (my->resnumevents == -1 || my->resdownscale == 0))) { fprintf(stderr, @@ -518,7 +559,7 @@ static int argsCheck(TheArgs *my) return 1; } - if (((my->maxFileSz) < (my->queueSize))) { + if (((my->maxFileSz) < (my->queueSize[0]))) { fprintf(stderr, " evtbuild.c, argsCheck(): --filesize must be larger than queuesize(-q)\n"); return 1; } @@ -549,12 +590,12 @@ static int argsFromParam(TheArgs *my, int argc, char *argv[]) Param_getString(param, name, "expid", ¶mWasFound, my->expId); Param_getInt(param, name, "stndln", ¶mWasFound, &my->isStandalone); Param_getInt(param, name, "prio", ¶mWasFound, &my->priority); - Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize); + Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize[0]); Param_getString(param, name, "verb", ¶mWasFound, my->verbosity); Param_getInt(param, name, "evtid", ¶mWasFound, &my->evtId); Param_getInt(param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz); - Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); + /* Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); */ desParam(param); } @@ -599,7 +640,7 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans ** /* Add statistic for fill levels of buffers in percentage. */ static time_t t_0 = 0; - float buffSize, queueSize; + unsigned long buffSize, queueSize; time_t t, dT; int i; unsigned long fillLevel; @@ -609,16 +650,17 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans ** if (dT >= interval) { for (i = 0; i < theArgs->nrOfMsgs; i++) { - buffSize = 2 * theArgs->varQSize[i]; + buffSize = 2 * theArgs->queueSize[i]; queueSize = HadTuQueue_size(shmtr[i]->rdQueue); /* Add here statistic for fill levels of buffers */ - fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize; + fillLevel = (unsigned long) (100. * queueSize + 0.5) / buffSize; (*my->evtbuildBuff[i]) = fillLevel; + + /* printf("q[%02d]%5d\%\n",i,fillLevel); */ } + t_0 = t; } - - t_0 = t; } static void statsBufferDump(TheArgs *theArgs, TheStats *my, float interval, HadTuQueue **htuq, ShmTrans **shmtr, char *progName) @@ -668,7 +710,7 @@ static void statsBufferDump(TheArgs *theArgs, TheStats *my, float interval, HadT for (j = 0; j < maxnorm; j++) { fprintf(stderr, "%1d ", maxnorm - j - 1); for (i = 0; i < theArgs->nrOfMsgs; i++) { - buffSize = 2 * theArgs->varQSize[i]; + buffSize = 2 * theArgs->queueSize[i]; queueSize = HadTuQueue_size(shmtr[i]->rdQueue); if (maxnorm - maxnorm * queueSize / buffSize < j) { @@ -788,14 +830,26 @@ unsigned long getRunId(TheArgs *my) return myRunId; } -static void statsDump(TheArgs *theArgs, TheStats *my, int interval) +static void storeEvtStat(TheArgs *myArgs, unsigned long evtstat) { - static unsigned long lastEC; - static unsigned long lastEE; - static unsigned long lastTE; - static unsigned long lastED; - static unsigned long lastBW; + FILE *fp; + char ltime[20]; /* local time */ + time_t t = time(NULL); + strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime(&t)); + + fp = fopen(myArgs->evtstat, "a+"); + fprintf(fp, "%s %d %s %d %d %d\n", "daq_evtbuild:", (int) getpid(), ltime, myArgs->runNr, myArgs->ebnum, evtstat); + fclose(fp); +} + +static void statsDump(TheArgs *theArgs, TheStats *my, int interval) +{ + static unsigned long lastEC = 0; + static unsigned long lastEE = 0; + static unsigned long lastTE = 0; + static unsigned long lastED = 0; + static unsigned long lastBW = 0; static time_t t0 = 0; time_t t, dT; int i; @@ -815,16 +869,25 @@ static void statsDump(TheArgs *theArgs, TheStats *my, int interval) fprintf(stderr, "%19s:%6s", "evtsTagError ", unit(*my->evtsTagError)); fputc('\n', stderr); - if (dT > 0) { - fprintf(stderr, "%19s:%6s", "evtsComplete/s", unit((*my->evtsComplete - lastEC) / dT)); - fprintf(stderr, "%19s:%6s", "evtsDiscarded/s", unit((*my->evtsDiscarded - lastED) / dT)); - fprintf(stderr, "%19s:%6s", "bytesWritten/s", unit((*my->bytesWritten - lastBW) / dT)); - fputc('\n', stderr); - fprintf(stderr, "%19s:%6s", "evtsDataError/s", unit((*my->evtsDataError - lastEE) / dT)); - fprintf(stderr, "%19s:%6s", "evtsTagError/s", unit((*my->evtsTagError - lastTE) / dT)); - fputc('\n', stderr); + if ((long) (*my->evtsComplete) - (long) lastEC < 0) { + lastEC = 0; + lastED = 0; + lastBW = 0; + lastEE = 0; + lastTE = 0; } + fprintf(stderr, "%19s:%6s", "evtsComplete/s", unit((*my->evtsComplete - lastEC) / dT)); + fprintf(stderr, "%19s:%6s", "evtsDiscarded/s", unit((*my->evtsDiscarded - lastED) / dT)); + fprintf(stderr, "%19s:%6s", "bytesWritten/s", unit((*my->bytesWritten - lastBW) / dT)); + fputc('\n', stderr); + fprintf(stderr, "%19s:%6s", "evtsDataError/s", unit((*my->evtsDataError - lastEE) / dT)); + fprintf(stderr, "%19s:%6s", "evtsTagError/s", unit((*my->evtsTagError - lastTE) / dT)); + fputc('\n', stderr); + + if (*my->evtsDiscarded - lastED > 0) + storeEvtStat(theArgs, *my->evtsDiscarded - lastED); + lastEC = *my->evtsComplete; lastEE = *my->evtsDataError; lastTE = *my->evtsTagError; @@ -863,7 +926,6 @@ static void statsDump(TheArgs *theArgs, TheStats *my, int interval) } } - static void storeRunInfoStart(time_t t, TheArgs *myArgs) { /* open ascii file eb_runinfo2ora.txt to store simple information for @@ -918,7 +980,7 @@ static void storeInfoStart(const char *n, time_t t, TheArgs *my) Param_storeString(p, n, "expid", my->expId); Param_storeInt(p, n, "prio", my->priority); Param_storeInt(p, n, "stndln", my->isStandalone); - Param_storeInt(p, n, "qsize", my->queueSize); + Param_storeInt(p, n, "qsize", my->queueSize[0]); Param_storeString(p, n, "verb", my->verbosity); if (my->resdownscale != 0) { Param_storeInt(p, n, "resdownscale", my->resdownscale); @@ -1572,6 +1634,11 @@ int main(int argc, char *argv[]) exit(EXIT_FAILURE); } + if (0 > makeQueues(theArgs)) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + if (argsCheck(theArgs)) { sleep(10); exit(EXIT_FAILURE); @@ -1618,7 +1685,7 @@ int main(int argc, char *argv[]) hadTuQueue = malloc(sizeof(HadTuQueue *)); sprintf(buf, "subevtqueue%s", theArgs->shmname); - shmTrans[0] = ShmTrans_create(buf, 2 * theArgs->queueSize); + shmTrans[0] = ShmTrans_create(buf, 2 * theArgs->queueSize[0]); hadTuQueue[0] = NULL; theArgs->nrOfMsgs = 1; @@ -1632,15 +1699,9 @@ int main(int argc, char *argv[]) /* use shared mem name from CL if given */ sprintf(buf, "netqueue%s%d", theArgs->shmname, i); - if (theArgs->queueSize > 0) { - /* The queueSize is given in CL. */ - shmTrans[i] = ShmTrans_create(buf, 2 * theArgs->queueSize); - } else { - shmTrans[i] = ShmTrans_create(buf, 2 * (long) theArgs->varQSize[i]); - - if (shmTrans[i] == NULL) { - fprintf(stderr, " ShmTrans_create: could not create buffer /dev/shm/%s.shm\n", buf); - } + shmTrans[i] = ShmTrans_create(buf, 2 * theArgs->queueSize[i]); + if (shmTrans[i] == NULL) { + fprintf(stderr, " ShmTrans_create: could not create buffer /dev/shm/%s.shm\n", buf); } hadTuQueue[i] = NULL; @@ -1879,7 +1940,7 @@ int main(int argc, char *argv[]) * Something went wrong with sinchronization of Event Builders, * close the file. */ - if ((!(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize))) || + if ((!(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize[0]))) || (theArgs->epicsCtrl && runNr < newRunId) || (theArgs->epicsCtrl && (*theStats->bytesWritten) >= 1900000000) || (theArgs->epicsCtrl && newRunId == 0)) { evt = newEvt(EvtDecoding_64bitAligned, EvtId_runStop); @@ -1950,7 +2011,7 @@ int main(int argc, char *argv[]) /* store simple stop run info */ storeRunInfoStop(ourTime, theArgs, theStats); - statsDump(theArgs, theStats, 1); + /* statsDump(theArgs, theStats, 1); */ for (i = 0; i < theArgs->nrOfMsgs; i++) ShmTrans_remove(shmTrans[i]); -- 2.43.0