]> jspc29.x-matter.uni-frankfurt.de Git - daqdata.git/commitdiff
Variable queue sizes fixed. Sergey.
authorhadaq <hadaq>
Sat, 5 Jun 2010 23:02:30 +0000 (23:02 +0000)
committerhadaq <hadaq>
Sat, 5 Jun 2010 23:02:30 +0000 (23:02 +0000)
hadaq/netmem.c

index 61ffc527f74b59e45245981260171d1a0c174133..52390af96946d9fbfbccede2f9fd194c30285e76 100644 (file)
@@ -1,4 +1,4 @@
-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.36 2009-12-10 15:39:55 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.37 2010-06-05 23:02:30 hadaq Exp $";
 
 
 #define _POSIX_C_SOURCE 199309L
@@ -29,12 +29,18 @@ typedef struct TheArgsS {
        unsigned long nrOfMsgs;
        unsigned long isStandalone;
        unsigned long priority;
-       unsigned long queueSize;
-       unsigned long varQSize[MAXINPATH];
-       int varQSizeCnt;
+       unsigned long queueSize[MAXINPATH];
+       int queueSizeCnt;
+
+       /* the following arguments are for the variable queue size */
+       char *queueVarSize[MAXINPATH];
+       char queueVarSizeS[MAXINPATH][PARAM_MAX_NAME_LEN];
+       int queueVarSizeCnt;
+
        char verbosity[PARAM_MAX_VALUE_LEN];
        unsigned int buffStat;
        char shmname[PARAM_MAX_VALUE_LEN];
+       char evtstat[PARAM_MAX_VALUE_LEN];
 } TheArgs;
 
 typedef struct TheStatsS {
@@ -55,6 +61,43 @@ static void usage(const char *progName)
        syslog(LOG_ERR, "Usage: %s -i inPath [-i inPath] -m nrOfMsgs [-p priority]", progName);
        syslog(LOG_ERR, "[-b] show fill levels of buffers]");
        syslog(LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used");
+       syslog(LOG_ERR, "[-q size] queue size");
+       syslog(LOG_ERR, "[-Q num:size] used to set specific queue size for given queue number");
+}
+
+static int makeQueues(TheArgs *my)
+{
+       int retVal = 0;
+       int i;
+
+       int queueNum;
+       char newQueueSize[30];
+
+       /* Init all queue sizes */
+       for (i = 1; i < my->nrOfMsgs; i++) {
+               my->queueSize[i] = my->queueSize[0];
+       }
+
+       /* Extract specific values for queue sizes */
+       for (i = 0; i < my->queueVarSizeCnt; i++) {
+
+               if (0 == (sscanf(my->queueVarSize[i], "%d:%s", &queueNum, newQueueSize))) {
+                       syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", "queueVarSize option is wrong.");
+                       retVal = -1;
+                       break;
+               } else {
+                       if (queueNum >= my->nrOfMsgs) {
+                               syslog(LOG_ERR, "evtbuild.c: makeQueues: %s",
+                                          "queue number in queueVarSize option exceeds maximum number of data sources.");
+                               retVal = -1;
+                               break;
+                       }
+
+                       my->queueSize[queueNum] = atoi(newQueueSize);
+               }
+       }
+
+       return retVal;
 }
 
 static void argsDump(TheArgs *my)
@@ -67,7 +110,7 @@ static void argsDump(TheArgs *my)
        syslog(LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs);
        syslog(LOG_DEBUG, "priority: %d", my->priority);
        syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone);
-       syslog(LOG_DEBUG, "queueSize: %d", my->queueSize);
+       syslog(LOG_DEBUG, "queueSize: %d", my->queueSize[0]);
        syslog(LOG_DEBUG, "verbosity: %s", my->verbosity);
        if (strcmp(my->shmname, "") != 0) {
                syslog(LOG_DEBUG, "shmem name: %s", my->shmname);
@@ -85,16 +128,22 @@ static void argsDefault(TheArgs *my)
        my->nrOfMsgs = 0;
        my->priority = 0;
        my->isStandalone = 1;
-       my->queueSize = 4 * 1024 * 1024;
+       my->queueSize[0] = 64 * 1024 * 1024UL;
        strcpy(my->verbosity, "info");
 
        for (i = 0; i < MAXINPATH; i++) {
-               my->varQSize[i] = 4 * 1024 * 1024;
+               my->queueVarSize[i] = my->queueVarSizeS[i];
        }
+       my->queueVarSizeCnt = 0;        /* no varible queue sizes by default */
+
        my->buffStat = 0;
        strcpy(my->shmname, "");
+
+       sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP"));
 }
 
+
+
 static int argsFromCL(TheArgs *my, int argc, char *argv[])
 {
        extern char *optarg;
@@ -105,7 +154,7 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[])
        inPathCntFromParam = my->inPathCnt;
        my->inPathCnt = 0;
 
-       while ((i = getopt(argc, argv, "ai:m:q:p:v:bS:")) != -1) {
+       while ((i = getopt(argc, argv, "ai:m:q:Q:p:v:bS:")) != -1) {
                switch (i) {
                case 'm':
                        my->nrOfMsgs = atoi(optarg);
@@ -120,7 +169,10 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[])
                        my->priority = atoi(optarg);
                        break;
                case 'q':
-                       my->queueSize = atoi(optarg);
+                       my->queueSize[0] = strtoul(optarg, NULL, 0);
+                       break;
+               case 'Q':
+                       strcpy(my->queueVarSize[my->queueVarSizeCnt++], optarg);
                        break;
                case 'v':
                        strcpy(my->verbosity, optarg);
@@ -158,11 +210,11 @@ static int argsFromParam(TheArgs *my, int argc, char *argv[])
        Param_getInt(param, name, "nrofmsgs", &paramWasFound, &my->nrOfMsgs);
        Param_getInt(param, name, "stndln", &paramWasFound, &my->isStandalone);
        Param_getInt(param, name, "prio", &paramWasFound, &my->priority);
-       Param_getInt(param, name, "qsize", &paramWasFound, &my->queueSize);
+       Param_getInt(param, name, "qsize", &paramWasFound, &my->queueSize[0]);
        Param_getString(param, name, "verb", &paramWasFound, my->verbosity);
 
-       /* this is an argument for a variable queue size. S.Y. */
-       Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
+       /* this is an argument for a variable queue size */
+       /* Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); */
        desParam(param);
 }
 
@@ -208,7 +260,7 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans **
        /* Add statistic for fill levels of buffers in percentage. */
 
        static time_t t_0 = 0;
-       float buffSize, queueSize;
+       unsigned long buffSize, queueSize;
        time_t t, dT;
        int i;
        unsigned long fillLevel;
@@ -219,21 +271,22 @@ static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans **
 
        if (dT >= interval) {
                for (i = 0; i < theArgs->nrOfMsgs; i++) {
-                       buffSize = 2 * theArgs->varQSize[i];
+                       buffSize = 2 * theArgs->queueSize[i];
                        queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
 
                        /* Add here statistic for fill levels of buffers */
                        fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize;
                        (*my->netmemBuff[i]) = fillLevel;
 
+                       /* printf("q[%02d]%5d\%\n",i,fillLevel); */
+
                        /* Add more statistic for recv bytes per second */
                        (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
                        lastBytesRecv[i] = *nettr[i]->bytesReceived;
 
                }
+               t_0 = t;
        }
-
-       t_0 = t;
 }
 
 static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, NetTrans **my)
@@ -259,7 +312,7 @@ static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr,
 
                                fprintf(stderr, "q[%2d]: ", i);
                                if (shmtr[i] != NULL)
-                                       if (theArgs->varQSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
+                                       if (theArgs->queueSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
                                                fprintf(stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue));
                                        } else {
                                                fprintf(stderr, "%8s ", "EXCEEDED");
@@ -283,7 +336,7 @@ static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr,
                        for (j = 0; j < maxnorm; j++) {
                                fprintf(stderr, "%1d ", maxnorm - j - 1);
                                for (i = 0; i < theArgs->nrOfMsgs; i++) {
-                                       buffSize = 2 * theArgs->varQSize[i];
+                                       buffSize = 2 * theArgs->queueSize[i];
                                        queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
 
                                        if (maxnorm - maxnorm * queueSize / buffSize < j) {
@@ -339,14 +392,30 @@ static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr,
        t0 = t;
 }
 
+static void storeEvtStat(TheArgs *myArgs, unsigned long pktsDiscard, unsigned long msgsDiscard, unsigned long queue)
+{
+
+       FILE *fp;
+       char ltime[20];                         /* local time */
+       time_t t = time(NULL);
+       strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime(&t));
+
+       fp = fopen(myArgs->evtstat, "a+");
+       fprintf(fp, "%s %d %s %d %d %d\n", "daq_netmem:  ", (int) getpid(), ltime, queue, pktsDiscard, msgsDiscard);
+       fclose(fp);
+}
+
 static void statsDump(TheArgs *theArgs, NetTrans **my, int interval)
 {
+       static unsigned long lastPR[MAXINPATH];
+       static unsigned long lastPD[MAXINPATH];
+       static unsigned long lastMD[MAXINPATH];
+
        static unsigned long lastBR[MAXINPATH];
        static time_t lastTime = 0;
        time_t dT;
        int i;
 
-
        if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval
                && !(theArgs->buffStat)) {
                fputs("==============================================================================\n", stderr);
@@ -361,7 +430,21 @@ static void statsDump(TheArgs *theArgs, NetTrans **my, int interval)
                        lastBR[i] = *my[i]->bytesReceived;
                        fputc('\n', stderr);
                        fputs("------------------------------------------------------------------------------\n", stderr);
+
+                       if ((long) (*my[i]->pktsReceived) - (long) lastPR < 0) {
+                               lastPR[i] = 0;
+                               lastPD[i] = 0;
+                               lastMD[i] = 0;
+                       }
+
+                       if ((*my[i]->pktsDiscarded - lastPD[i]) > 0 || (*my[i]->msgsDiscarded - lastMD[i]) > 0)
+                               storeEvtStat(theArgs, *my[i]->pktsDiscarded - lastPD[i], *my[i]->msgsDiscarded - lastMD[i], i);
+
+                       lastPR[i] = *my[i]->pktsReceived;
+                       lastPD[i] = *my[i]->pktsDiscarded;
+                       lastMD[i] = *my[i]->msgsDiscarded;
                }
+
                lastTime += dT;
        }
 }
@@ -383,7 +466,7 @@ static void storeInfoStart(const char *n, time_t t, TheArgs *my)
        Param_storeInt(p, n, "nrofmsgs", my->nrOfMsgs);
        Param_storeInt(p, n, "stndln", my->isStandalone);
        Param_storeInt(p, n, "prio", my->priority);
-       Param_storeInt(p, n, "qsize", my->queueSize);
+       Param_storeInt(p, n, "qsize", my->queueSize[0]);
        Param_storeString(p, n, "verb", my->verbosity);
        desParam(p);
 }
@@ -425,6 +508,12 @@ int main(int argc, char *argv[])
                usage(argv[0]);
                exit(EXIT_FAILURE);
        }
+
+       if (0 > makeQueues(theArgs)) {
+               usage(argv[0]);
+               exit(EXIT_FAILURE);
+       }
+
        for (i = 0; prioritynames[i].c_name != NULL && 0 != strcmp(prioritynames[i].c_name, theArgs->verbosity); i++) {
        }
        if (prioritynames[i].c_name == NULL) {
@@ -452,7 +541,7 @@ int main(int argc, char *argv[])
                exit(EXIT_FAILURE);
        }
        for (i = 0; i < theArgs->nrOfMsgs; i++) {
-               char buf[80];
+               char buf[_POSIX_PATH_MAX];
 
                netTrans[i] = NetTrans_create(theArgs->inPath[i], 0, worker);
                if (netTrans[i] == NULL) {
@@ -463,13 +552,7 @@ int main(int argc, char *argv[])
                /* use shared mem name from CL if given */
                sprintf(buf, "netqueue%s%d", theArgs->shmname, i);
 
-               if (theArgs->queueSize > 0) {
-                       /* if the queueSize is given in CL */
-                       shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize);
-               } else {
-                       /* otherwise use variable queue size */
-                       shmTrans[i] = ShmTrans_open(buf, 2 * (long) theArgs->varQSize[i]);
-               }
+               shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize[i]);
 
                if (shmTrans[i] == NULL) {
                        syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno));
@@ -477,15 +560,19 @@ int main(int argc, char *argv[])
                }
                hadTu[i] = NULL;
 
-               if (theArgs->queueSize > 0) {
+               if (theArgs->queueSize[i] > 0) {
                        /* if the queueSize is given in CL */
-                       hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize();
+                       /* if( theArgs->queueSize[i] > 102400 + HadTu_hdrSize() ){ */
+/*                 hadTuSize[i] = 102400; */
+/*               } */
+/*               else */
+                       hadTuSize[i] = theArgs->queueSize[i] - HadTu_hdrSize();
                } else {
                        /* otherwise use variable queue size */
                        /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */
-                       hadTuSize[i] = 51200;
+                       hadTuSize[i] = 204800;
                        /*
-                        * This 50 kB must be bigger than a size of a message which 
+                        * This 200 kB must be bigger than a size of a message which 
                         * should usually be less than 32kB (UDP_packet_size - water_mark). 
                         * However some HUGE messages can appear. Those HUGE messages must 
                         * be below hadTuSize!!! Otherwise the code will crash.