]> jspc29.x-matter.uni-frankfurt.de Git - daqdata.git/commitdiff
Variable queue sizes fixed. Sergey.
authorhadaq <hadaq>
Sat, 5 Jun 2010 23:00:27 +0000 (23:00 +0000)
committerhadaq <hadaq>
Sat, 5 Jun 2010 23:00:27 +0000 (23:00 +0000)
hadaq/evtbuild.c

index fef2b0dd40e9d40b412e9e9a1b5d16f0f4820f6e..9cb6ad1b0b08228d672c6bb1c94ada058c14e359 100644 (file)
@@ -1,4 +1,4 @@
-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.105 2010-04-20 16:29:34 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.106 2010-06-05 23:00:27 hadaq Exp $";
 
 #define _POSIX_C_SOURCE 199309L
 #define SYSLOG_NAMES
@@ -35,7 +35,7 @@ static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hada
 #include "ansiTape.h"
 #include "genid32.h"
 
-#ifdef RFIO
+#undef RFIO
 #include "rawapin.h"                   /* for rfio */
 static RFILE *fRemote = NULL;
 #endif
@@ -79,7 +79,8 @@ typedef struct TheArgsS {
        int slowCtrlFileCnt;
        unsigned long isStandalone;
        unsigned long priority;
-       unsigned long queueSize;
+       unsigned long queueSize[MAXINPATH];
+       int queueSizeCnt;
        char verbosity[PARAM_MAX_VALUE_LEN];
        unsigned long evtId;
        unsigned long maxFileSz;
@@ -93,9 +94,10 @@ typedef struct TheArgsS {
        double secsizelimit;
        double resdown_offset;
 
-       /* the following arguments are for the variable queue size. S.Y. */
-       unsigned long varQSize[MAXINPATH];
-       int varQSizeCnt;
+       /* the following arguments are for the variable queue size */
+       char *queueVarSize[MAXINPATH];
+       char queueVarSizeS[MAXINPATH][PARAM_MAX_NAME_LEN];
+       int queueVarSizeCnt;
 
        char rfioRemotePath[PARAM_MAX_VALUE_LEN];
        char rfioLustrePath[PARAM_MAX_VALUE_LEN];
@@ -111,6 +113,7 @@ typedef struct TheArgsS {
        char shmname[PARAM_MAX_VALUE_LEN];
        unsigned int ebnum;
        char runinfo2ora[PARAM_MAX_VALUE_LEN];
+       char evtstat[PARAM_MAX_VALUE_LEN];      /* file name for statistics of discarded event */
 
        char fileName[PARAM_MAX_VALUE_LEN];
 } TheArgs;
@@ -213,6 +216,41 @@ static void usage(const char *progName)
        syslog(LOG_ERR, "Usage: [--ignore] ignore trigger mismatch conditions");
 }
 
+static int makeQueues(TheArgs *my)
+{
+       int retVal = 0;
+       int i;
+
+       int queueNum;
+       char newQueueSize[30];
+
+       /* Init all queue sizes */
+       for (i = 1; i < my->nrOfMsgs; i++) {
+               my->queueSize[i] = my->queueSize[0];
+       }
+
+       /* Extract specific values for queue sizes */
+       for (i = 0; i < my->queueVarSizeCnt; i++) {
+
+               if (0 == (sscanf(my->queueVarSize[i], "%d:%s", &queueNum, newQueueSize))) {
+                       syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", "queueVarSize option is wrong.");
+                       retVal = -1;
+                       break;
+               } else {
+                       if (queueNum >= my->nrOfMsgs) {
+                               syslog(LOG_ERR, "evtbuild.c: makeQueues: %s",
+                                          "queue number in queueVarSize option exceeds maximum number of data sources.");
+                               retVal = -1;
+                               break;
+                       }
+
+                       my->queueSize[queueNum] = atoi(newQueueSize);
+               }
+       }
+
+       return retVal;
+}
+
 static void argsDump(TheArgs *my)
 {
        int i;
@@ -227,7 +265,7 @@ static void argsDump(TheArgs *my)
        syslog(LOG_DEBUG, "expId: %s", my->expId);
        syslog(LOG_DEBUG, "priority: %d", my->priority);
        syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone);
-       syslog(LOG_DEBUG, "queueSize: %d", my->queueSize);
+       syslog(LOG_DEBUG, "queueSize: %d", my->queueSize[0]);
        syslog(LOG_DEBUG, "verbosity: %s", my->verbosity);
        syslog(LOG_DEBUG, "evtId: %ld", my->evtId);
        syslog(LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz);
@@ -276,7 +314,7 @@ static void argsDefault(TheArgs *my)
        strcpy(my->shmname, "");
        my->priority = 0;
        my->isStandalone = 1;
-       my->queueSize = 4 * 1024 * 1024UL;
+       my->queueSize[0] = 64 * 1024 * 1024UL;
        strcpy(my->verbosity, "info");
        my->evtId = 0;
        my->maxFileSz = (1.5 * 1024 * 1024 * 1024UL - 1);
@@ -290,8 +328,9 @@ static void argsDefault(TheArgs *my)
        strcpy(my->respath, "");
 
        for (i = 0; i < MAXINPATH; i++) {
-               my->varQSize[i] = 4 * 1024 * 1024UL;
+               my->queueVarSize[i] = my->queueVarSizeS[i];
        }
+       my->queueVarSizeCnt = 0;        /* no varible queue sizes by default */
 
        strcpy(my->rfioRemotePath, "");
        strcpy(my->rfioLustrePath, "");
@@ -308,6 +347,7 @@ static void argsDefault(TheArgs *my)
 
        /* read path from DAQ_SETUP and define full file name */
        sprintf(my->runinfo2ora, "%s_runinfo2ora.txt", getenv("DAQ_SETUP"));
+       sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP"));
 }
 
 static int argsFromCL(TheArgs *my, int argc, char *argv[])
@@ -344,7 +384,7 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[])
                        {"ignore", 0, 0, 'i'},
                        {0, 0, 0, 0}
                };
-               i = getopt_long(argc, argv, "am:f:r:o:d:q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:", long_options,
+               i = getopt_long(argc, argv, "am:f:r:o:d:q:Q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:", long_options,
                                                &option_index);
                if (i == -1)
                        break;
@@ -371,7 +411,10 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[])
                        my->priority = strtoul(optarg, NULL, 0);
                        break;
                case 'q':
-                       my->queueSize = strtoul(optarg, NULL, 0);
+                       my->queueSize[0] = strtoul(optarg, NULL, 0);
+                       break;
+               case 'Q':
+                       strcpy(my->queueVarSize[my->queueVarSizeCnt++], optarg);
                        break;
                case 'v':
                        strcpy(my->verbosity, optarg);
@@ -482,10 +525,8 @@ static int argsCheck(TheArgs *my)
        }
 
        /* Conditions: if the Remote Event Server is used */
-       if ((my->resdownscale != 0 && (my->resnumevents == -1 ||
-                                                                  (strcmp(my->respath, "") == 0))) ||
-               (my->resnumevents != -1 && (my->resdownscale == 0 ||
-                                                                       (strcmp(my->respath, "") == 0))) ||
+       if ((my->resdownscale != 0 && (my->resnumevents == -1 || (strcmp(my->respath, "") == 0))) ||
+               (my->resnumevents != -1 && (my->resdownscale == 0 || (strcmp(my->respath, "") == 0))) ||
                ((strcmp(my->respath, "") != 0) && (my->resnumevents == -1 || my->resdownscale == 0))) {
 
                fprintf(stderr,
@@ -518,7 +559,7 @@ static int argsCheck(TheArgs *my)
                return 1;
        }
 
-       if (((my->maxFileSz) < (my->queueSize))) {
+       if (((my->maxFileSz) < (my->queueSize[0]))) {
                fprintf(stderr, "<E> evtbuild.c, argsCheck(): --filesize must be larger than queuesize(-q)\n");
                return 1;
        }
@@ -549,12 +590,12 @@ static int argsFromParam(TheArgs *my, int argc, char *argv[])
        Param_getString(param, name, "expid", &paramWasFound, my->expId);
        Param_getInt(param, name, "stndln", &paramWasFound, &my->isStandalone);
        Param_getInt(param, name, "prio", &paramWasFound, &my->priority);
-       Param_getInt(param, name, "qsize", &paramWasFound, &my->queueSize);
+       Param_getInt(param, name, "qsize", &paramWasFound, &my->queueSize[0]);
        Param_getString(param, name, "verb", &paramWasFound, my->verbosity);
        Param_getInt(param, name, "evtid", &paramWasFound, &my->evtId);
        Param_getInt(param, name, "maxfilesz", &paramWasFound, &my->maxFileSz);
 
-       Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
+       /* Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); */
        desParam(param);
 }
 
@@ -599,7 +640,7 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans **
        /* Add statistic for fill levels of buffers in percentage. */
 
        static time_t t_0 = 0;
-       float buffSize, queueSize;
+       unsigned long buffSize, queueSize;
        time_t t, dT;
        int i;
        unsigned long fillLevel;
@@ -609,16 +650,17 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans **
 
        if (dT >= interval) {
                for (i = 0; i < theArgs->nrOfMsgs; i++) {
-                       buffSize = 2 * theArgs->varQSize[i];
+                       buffSize = 2 * theArgs->queueSize[i];
                        queueSize = HadTuQueue_size(shmtr[i]->rdQueue);
 
                        /* Add here statistic for fill levels of buffers */
-                       fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize;
+                       fillLevel = (unsigned long) (100. * queueSize + 0.5) / buffSize;
                        (*my->evtbuildBuff[i]) = fillLevel;
+
+                       /* printf("q[%02d]%5d\%\n",i,fillLevel); */
                }
+               t_0 = t;
        }
-
-       t_0 = t;
 }
 
 static void statsBufferDump(TheArgs *theArgs, TheStats *my, float interval, HadTuQueue **htuq, ShmTrans **shmtr, char *progName)
@@ -668,7 +710,7 @@ static void statsBufferDump(TheArgs *theArgs, TheStats *my, float interval, HadT
                                for (j = 0; j < maxnorm; j++) {
                                        fprintf(stderr, "%1d ", maxnorm - j - 1);
                                        for (i = 0; i < theArgs->nrOfMsgs; i++) {
-                                               buffSize = 2 * theArgs->varQSize[i];
+                                               buffSize = 2 * theArgs->queueSize[i];
                                                queueSize = HadTuQueue_size(shmtr[i]->rdQueue);
 
                                                if (maxnorm - maxnorm * queueSize / buffSize < j) {
@@ -788,14 +830,26 @@ unsigned long getRunId(TheArgs *my)
        return myRunId;
 }
 
-static void statsDump(TheArgs *theArgs, TheStats *my, int interval)
+static void storeEvtStat(TheArgs *myArgs, unsigned long evtstat)
 {
-       static unsigned long lastEC;
 
-       static unsigned long lastEE;
-       static unsigned long lastTE;
-       static unsigned long lastED;
-       static unsigned long lastBW;
+       FILE *fp;
+       char ltime[20];                         /* local time */
+       time_t t = time(NULL);
+       strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime(&t));
+
+       fp = fopen(myArgs->evtstat, "a+");
+       fprintf(fp, "%s %d %s %d %d %d\n", "daq_evtbuild:", (int) getpid(), ltime, myArgs->runNr, myArgs->ebnum, evtstat);
+       fclose(fp);
+}
+
+static void statsDump(TheArgs *theArgs, TheStats *my, int interval)
+{
+       static unsigned long lastEC = 0;
+       static unsigned long lastEE = 0;
+       static unsigned long lastTE = 0;
+       static unsigned long lastED = 0;
+       static unsigned long lastBW = 0;
        static time_t t0 = 0;
        time_t t, dT;
        int i;
@@ -815,16 +869,25 @@ static void statsDump(TheArgs *theArgs, TheStats *my, int interval)
                        fprintf(stderr, "%19s:%6s", "evtsTagError  ", unit(*my->evtsTagError));
                        fputc('\n', stderr);
 
-                       if (dT > 0) {
-                               fprintf(stderr, "%19s:%6s", "evtsComplete/s", unit((*my->evtsComplete - lastEC) / dT));
-                               fprintf(stderr, "%19s:%6s", "evtsDiscarded/s", unit((*my->evtsDiscarded - lastED) / dT));
-                               fprintf(stderr, "%19s:%6s", "bytesWritten/s", unit((*my->bytesWritten - lastBW) / dT));
-                               fputc('\n', stderr);
-                               fprintf(stderr, "%19s:%6s", "evtsDataError/s", unit((*my->evtsDataError - lastEE) / dT));
-                               fprintf(stderr, "%19s:%6s", "evtsTagError/s", unit((*my->evtsTagError - lastTE) / dT));
-                               fputc('\n', stderr);
+                       if ((long) (*my->evtsComplete) - (long) lastEC < 0) {
+                               lastEC = 0;
+                               lastED = 0;
+                               lastBW = 0;
+                               lastEE = 0;
+                               lastTE = 0;
                        }
 
+                       fprintf(stderr, "%19s:%6s", "evtsComplete/s", unit((*my->evtsComplete - lastEC) / dT));
+                       fprintf(stderr, "%19s:%6s", "evtsDiscarded/s", unit((*my->evtsDiscarded - lastED) / dT));
+                       fprintf(stderr, "%19s:%6s", "bytesWritten/s", unit((*my->bytesWritten - lastBW) / dT));
+                       fputc('\n', stderr);
+                       fprintf(stderr, "%19s:%6s", "evtsDataError/s", unit((*my->evtsDataError - lastEE) / dT));
+                       fprintf(stderr, "%19s:%6s", "evtsTagError/s", unit((*my->evtsTagError - lastTE) / dT));
+                       fputc('\n', stderr);
+
+                       if (*my->evtsDiscarded - lastED > 0)
+                               storeEvtStat(theArgs, *my->evtsDiscarded - lastED);
+
                        lastEC = *my->evtsComplete;
                        lastEE = *my->evtsDataError;
                        lastTE = *my->evtsTagError;
@@ -863,7 +926,6 @@ static void statsDump(TheArgs *theArgs, TheStats *my, int interval)
        }
 }
 
-
 static void storeRunInfoStart(time_t t, TheArgs *myArgs)
 {
        /* open ascii file eb_runinfo2ora.txt to store simple information for 
@@ -918,7 +980,7 @@ static void storeInfoStart(const char *n, time_t t, TheArgs *my)
        Param_storeString(p, n, "expid", my->expId);
        Param_storeInt(p, n, "prio", my->priority);
        Param_storeInt(p, n, "stndln", my->isStandalone);
-       Param_storeInt(p, n, "qsize", my->queueSize);
+       Param_storeInt(p, n, "qsize", my->queueSize[0]);
        Param_storeString(p, n, "verb", my->verbosity);
        if (my->resdownscale != 0) {
                Param_storeInt(p, n, "resdownscale", my->resdownscale);
@@ -1572,6 +1634,11 @@ int main(int argc, char *argv[])
                exit(EXIT_FAILURE);
        }
 
+       if (0 > makeQueues(theArgs)) {
+               usage(argv[0]);
+               exit(EXIT_FAILURE);
+       }
+
        if (argsCheck(theArgs)) {
                sleep(10);
                exit(EXIT_FAILURE);
@@ -1618,7 +1685,7 @@ int main(int argc, char *argv[])
                hadTuQueue = malloc(sizeof(HadTuQueue *));
 
                sprintf(buf, "subevtqueue%s", theArgs->shmname);
-               shmTrans[0] = ShmTrans_create(buf, 2 * theArgs->queueSize);
+               shmTrans[0] = ShmTrans_create(buf, 2 * theArgs->queueSize[0]);
 
                hadTuQueue[0] = NULL;
                theArgs->nrOfMsgs = 1;
@@ -1632,15 +1699,9 @@ int main(int argc, char *argv[])
                        /* use shared mem name from CL if given */
                        sprintf(buf, "netqueue%s%d", theArgs->shmname, i);
 
-                       if (theArgs->queueSize > 0) {
-                               /* The queueSize is given in CL.  */
-                               shmTrans[i] = ShmTrans_create(buf, 2 * theArgs->queueSize);
-                       } else {
-                               shmTrans[i] = ShmTrans_create(buf, 2 * (long) theArgs->varQSize[i]);
-
-                               if (shmTrans[i] == NULL) {
-                                       fprintf(stderr, "<E> ShmTrans_create: could not create buffer /dev/shm/%s.shm\n", buf);
-                               }
+                       shmTrans[i] = ShmTrans_create(buf, 2 * theArgs->queueSize[i]);
+                       if (shmTrans[i] == NULL) {
+                               fprintf(stderr, "<E> ShmTrans_create: could not create buffer /dev/shm/%s.shm\n", buf);
                        }
 
                        hadTuQueue[i] = NULL;
@@ -1879,7 +1940,7 @@ int main(int argc, char *argv[])
                 *           Something went wrong with sinchronization of Event Builders,
                 *           close the file.
                 */
-               if ((!(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize))) ||
+               if ((!(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize[0]))) ||
                        (theArgs->epicsCtrl && runNr < newRunId) ||
                        (theArgs->epicsCtrl && (*theStats->bytesWritten) >= 1900000000) || (theArgs->epicsCtrl && newRunId == 0)) {
                        evt = newEvt(EvtDecoding_64bitAligned, EvtId_runStop);
@@ -1950,7 +2011,7 @@ int main(int argc, char *argv[])
        /* store simple stop run info */
        storeRunInfoStop(ourTime, theArgs, theStats);
 
-       statsDump(theArgs, theStats, 1);
+       /* statsDump(theArgs, theStats, 1); */
 
        for (i = 0; i < theArgs->nrOfMsgs; i++)
                ShmTrans_remove(shmTrans[i]);