]> jspc29.x-matter.uni-frankfurt.de Git - daqdata.git/commitdiff
buffer size equal to evtbuild. Sergey
authorhadaq <hadaq>
Thu, 10 Dec 2009 15:39:55 +0000 (15:39 +0000)
committerhadaq <hadaq>
Thu, 10 Dec 2009 15:39:55 +0000 (15:39 +0000)
hadaq/netmem.c

index 0834a90cdca35cf871ec6cfcff428c123ad6b47a..61ffc527f74b59e45245981260171d1a0c174133 100644 (file)
@@ -1,4 +1,4 @@
-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.35 2008-06-17 18:56:23 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.36 2009-12-10 15:39:55 hadaq Exp $";
 
 
 #define _POSIX_C_SOURCE 199309L
@@ -30,18 +30,17 @@ typedef struct TheArgsS {
        unsigned long isStandalone;
        unsigned long priority;
        unsigned long queueSize;
-        unsigned long varQSize[MAXINPATH];
-        int varQSizeCnt;
+       unsigned long varQSize[MAXINPATH];
+       int varQSizeCnt;
        char verbosity[PARAM_MAX_VALUE_LEN];
-        unsigned int buffStat;        
-        char shmname[PARAM_MAX_VALUE_LEN];
+       unsigned int buffStat;
+       char shmname[PARAM_MAX_VALUE_LEN];
 } TheArgs;
 
-typedef struct TheStatsS
-{
-  unsigned long *netmemBuff[MAXINPATH];
-  unsigned long *nrOfMsgs;
-  unsigned long *recvBytesRate[MAXINPATH];
+typedef struct TheStatsS {
+       unsigned long *netmemBuff[MAXINPATH];
+       unsigned long *nrOfMsgs;
+       unsigned long *recvBytesRate[MAXINPATH];
 } TheStats;
 
 static jmp_buf terminateJmp;
@@ -54,8 +53,8 @@ void sigHandler(int sig)
 static void usage(const char *progName)
 {
        syslog(LOG_ERR, "Usage: %s -i inPath [-i inPath] -m nrOfMsgs [-p priority]", progName);
-       syslog( LOG_ERR, "[-b] show fill levels of buffers]" );
-       syslog( LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used" );
+       syslog(LOG_ERR, "[-b] show fill levels of buffers]");
+       syslog(LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used");
 }
 
 static void argsDump(TheArgs *my)
@@ -70,8 +69,8 @@ static void argsDump(TheArgs *my)
        syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone);
        syslog(LOG_DEBUG, "queueSize: %d", my->queueSize);
        syslog(LOG_DEBUG, "verbosity: %s", my->verbosity);
-        if( strcmp( my->shmname, "" ) != 0 ) {
-                syslog (LOG_DEBUG, "shmem name: %s", my->shmname);
+       if (strcmp(my->shmname, "") != 0) {
+               syslog(LOG_DEBUG, "shmem name: %s", my->shmname);
        }
 }
 
@@ -86,14 +85,14 @@ static void argsDefault(TheArgs *my)
        my->nrOfMsgs = 0;
        my->priority = 0;
        my->isStandalone = 1;
-       my->queueSize = 1 * 1024 * 1024;
+       my->queueSize = 4 * 1024 * 1024;
        strcpy(my->verbosity, "info");
 
        for (i = 0; i < MAXINPATH; i++) {
-               my->varQSize[i] = 1 * 1024 * 1024;
+               my->varQSize[i] = 4 * 1024 * 1024;
        }
        my->buffStat = 0;
-        strcpy (my->shmname, "");
+       strcpy(my->shmname, "");
 }
 
 static int argsFromCL(TheArgs *my, int argc, char *argv[])
@@ -127,8 +126,8 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[])
                        strcpy(my->verbosity, optarg);
                        break;
                case 'b':
-                       my->buffStat = 1;
-                       break;
+                       my->buffStat = 1;
+                       break;
                case 'S':
                        strcpy(my->shmname, optarg);
                        break;
@@ -162,7 +161,7 @@ static int argsFromParam(TheArgs *my, int argc, char *argv[])
        Param_getInt(param, name, "qsize", &paramWasFound, &my->queueSize);
        Param_getString(param, name, "verb", &paramWasFound, my->verbosity);
 
-       /* this is an argument for a variable queue size. S.Y.*/
+       /* this is an argument for a variable queue size. S.Y. */
        Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
        desParam(param);
 }
@@ -182,166 +181,162 @@ static char *unit(unsigned long v)
 
 static void printTime()
 {
-  struct timeval tv;
-  struct tm* ptm;
-  char time_string[40];
-  long milliseconds;
+       struct timeval tv;
+       struct tm *ptm;
+       char time_string[40];
+       long milliseconds;
 
-  gettimeofday( &tv, NULL );
+       gettimeofday(&tv, NULL);
 
-  /* Obtain the time of day, and convert it to a tm struct. */
-  ptm = localtime (&tv.tv_sec);
+       /* Obtain the time of day, and convert it to a tm struct. */
+       ptm = localtime(&tv.tv_sec);
 
-  /* Format the date and time, down to a single second. */
-  strftime (time_string, sizeof (time_string), "%Y-%m-%d %H:%M:%S", ptm);
+       /* Format the date and time, down to a single second. */
+       strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
 
-  /* Compute milliseconds from microseconds. */
-  milliseconds = tv.tv_usec / 1000;
+       /* Compute milliseconds from microseconds. */
+       milliseconds = tv.tv_usec / 1000;
 
-  /* Print the formatted time, in seconds, followed by a decimal point
-     and the milliseconds. */
-  printf ("Time: %s.%03ld\n", time_string, milliseconds);
+       /* Print the formatted time, in seconds, followed by a decimal point
+          and the milliseconds. */
+       printf("Time: %s.%03ld\n", time_string, milliseconds);
 }
 
-static void add2Stat( TheArgs * theArgs, TheStats * my, float interval, ShmTrans **shmtr, NetTrans **nettr ) {
-
-  /* Add statistic for fill levels of buffers in percentage. */
-
-  static time_t t_0 = 0;
-  float buffSize, queueSize;
-  time_t t, dT;
-  int i;
-  unsigned long fillLevel;
-  static unsigned long lastBytesRecv[MAXINPATH];
-
-  t = time (NULL);
-  dT = t - t_0;
-  
-  if( dT >= interval ) {    
-    for( i=0; i<theArgs->nrOfMsgs; i++ ) {
-      buffSize  = 2*theArgs->varQSize[i];
-      queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
-      
-      /* Add here statistic for fill levels of buffers */
-      fillLevel = (unsigned long) (100*queueSize+0.5)/buffSize;
-      (*my->netmemBuff[i]) = fillLevel;
-
-      /* Add more statistic for recv bytes per second */
-      (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
-      lastBytesRecv[i] = *nettr[i]->bytesReceived;
-      
-    }
-  }
-
-  t_0 = t;
+static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans **shmtr, NetTrans **nettr)
+{
+
+       /* Add statistic for fill levels of buffers in percentage. */
+
+       static time_t t_0 = 0;
+       float buffSize, queueSize;
+       time_t t, dT;
+       int i;
+       unsigned long fillLevel;
+       static unsigned long lastBytesRecv[MAXINPATH];
+
+       t = time(NULL);
+       dT = t - t_0;
+
+       if (dT >= interval) {
+               for (i = 0; i < theArgs->nrOfMsgs; i++) {
+                       buffSize = 2 * theArgs->varQSize[i];
+                       queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
+
+                       /* Add here statistic for fill levels of buffers */
+                       fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize;
+                       (*my->netmemBuff[i]) = fillLevel;
+
+                       /* Add more statistic for recv bytes per second */
+                       (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
+                       lastBytesRecv[i] = *nettr[i]->bytesReceived;
+
+               }
+       }
+
+       t_0 = t;
 }
 
-static void statsBufferDump (TheArgs * theArgs, float interval, ShmTrans **shmtr, NetTrans **my)
+static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, NetTrans **my)
 {
-  static time_t t0 = 0;
-  time_t t, dT;
-  int i, j;
-  int col = 0;
-  char emptybuffer[] = "-";
-  int outputGraph = 1;   /* Graphical output */
-  int outputNum = 0;     /* Numerical output */
-  
-  t = time (NULL);
-  dT = t - t0;
-
-  /* fprintf (stderr, " dT: %.0lf interval: %.0lf", (double)dT, interval); */
-
-  if (dT >= interval && theArgs->buffStat)
-    {
-
-      if( outputNum == 1) {
-       fputs("==============================================================================\n\n",stderr);
-       for(i=0; i<theArgs->nrOfMsgs; i++) {
-         
-         fprintf (stderr, "q[%2d]: ", i);
-         if (shmtr[i] != NULL)
-           if (theArgs->varQSize[i]*2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
-             fprintf (stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue));
-           }
-           else {
-             fprintf (stderr, "%8s ", "EXCEEDED");
-           } 
-         else
-           fprintf (stderr, "%8s ", emptybuffer);
-         
-         col++;
-         if (col == 6) {
-           fputc ('\n', stderr);
-           col = 0;
-         }
-       }
-      }
-
-      if( outputGraph == 1 ){
-       fputs ("------------------ buffer fill levels ----------------------------------------\n", stderr);
-
-       float buffSize, queueSize, pktsDisc;
-       int maxnorm = 10.;      
-
-       for( j=0; j<maxnorm; j++ ){
-         fprintf (stderr, "%1d ", maxnorm - j - 1); 
-         for( i=0; i<theArgs->nrOfMsgs; i++ ){
-           buffSize  = 2*theArgs->varQSize[i];
-           queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
-           
-           if(maxnorm - maxnorm*queueSize/buffSize < j){ 
-               fputc( '|', stderr );         
-           }
-           else{
-             fputc( ' ', stderr );
-           }
-         }
-         fputc( '\n', stderr );
-       }
+       static time_t t0 = 0;
+       time_t t, dT;
+       int i, j;
+       int col = 0;
+       char emptybuffer[] = "-";
+       int outputGraph = 1;            /* Graphical output */
+       int outputNum = 0;                      /* Numerical output */
 
-       /* The following is just to print the numbers of buffers*/
-       int factor, mod;
-       
-       fputs( "q:", stderr );
-       factor = 0;
-       for( i=0; i<theArgs->nrOfMsgs; i++ ) {
-         mod = i%10;
-         fprintf (stderr, "%1d", mod);
-       }
-       fputc( '\n', stderr );
-       
-       fputs( "  ", stderr );
-       for( i=0; i<theArgs->nrOfMsgs; i++ ) {
-         mod = i%10;
-         if( mod == 0 )
-           fprintf (stderr, "%1d", i/10 );
-         else
-           fputc( ' ', stderr );
-       }
-       fputc( '\n', stderr );
-
-       fputs( "------------------ discarded packets -----------------------------------------\n", stderr );
-       for( j=0; j<maxnorm; j++ ){
-         fprintf (stderr, "%1d ", maxnorm - j - 1); 
-         for( i=0; i<theArgs->nrOfMsgs; i++ ){
-           pktsDisc = (*my[i]->pktsDiscarded);
-           
-           if(maxnorm - (pktsDisc + maxnorm - 1)/maxnorm < j){ 
-               fputc( 'D', stderr );
-           }
-           else{
-             fputc( ' ', stderr );
-           }
-         }
-         fputc( '\n', stderr );
+       t = time(NULL);
+       dT = t - t0;
+
+       /* fprintf (stderr, " dT: %.0lf interval: %.0lf", (double)dT, interval); */
+
+       if (dT >= interval && theArgs->buffStat) {
+
+               if (outputNum == 1) {
+                       fputs("==============================================================================\n\n", stderr);
+                       for (i = 0; i < theArgs->nrOfMsgs; i++) {
+
+                               fprintf(stderr, "q[%2d]: ", i);
+                               if (shmtr[i] != NULL)
+                                       if (theArgs->varQSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
+                                               fprintf(stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue));
+                                       } else {
+                                               fprintf(stderr, "%8s ", "EXCEEDED");
+                               } else
+                                       fprintf(stderr, "%8s ", emptybuffer);
+
+                               col++;
+                               if (col == 6) {
+                                       fputc('\n', stderr);
+                                       col = 0;
+                               }
+                       }
+               }
+
+               if (outputGraph == 1) {
+                       fputs("------------------ buffer fill levels ----------------------------------------\n", stderr);
+
+                       float buffSize, queueSize, pktsDisc;
+                       int maxnorm = 10.;
+
+                       for (j = 0; j < maxnorm; j++) {
+                               fprintf(stderr, "%1d ", maxnorm - j - 1);
+                               for (i = 0; i < theArgs->nrOfMsgs; i++) {
+                                       buffSize = 2 * theArgs->varQSize[i];
+                                       queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
+
+                                       if (maxnorm - maxnorm * queueSize / buffSize < j) {
+                                               fputc('|', stderr);
+                                       } else {
+                                               fputc(' ', stderr);
+                                       }
+                               }
+                               fputc('\n', stderr);
+                       }
+
+                       /* The following is just to print the numbers of buffers */
+                       int factor, mod;
+
+                       fputs("q:", stderr);
+                       factor = 0;
+                       for (i = 0; i < theArgs->nrOfMsgs; i++) {
+                               mod = i % 10;
+                               fprintf(stderr, "%1d", mod);
+                       }
+                       fputc('\n', stderr);
+
+                       fputs("  ", stderr);
+                       for (i = 0; i < theArgs->nrOfMsgs; i++) {
+                               mod = i % 10;
+                               if (mod == 0)
+                                       fprintf(stderr, "%1d", i / 10);
+                               else
+                                       fputc(' ', stderr);
+                       }
+                       fputc('\n', stderr);
+
+                       fputs("------------------ discarded packets -----------------------------------------\n", stderr);
+                       for (j = 0; j < maxnorm; j++) {
+                               fprintf(stderr, "%1d ", maxnorm - j - 1);
+                               for (i = 0; i < theArgs->nrOfMsgs; i++) {
+                                       pktsDisc = (*my[i]->pktsDiscarded);
+
+                                       if (maxnorm - (pktsDisc + maxnorm - 1) / maxnorm < j) {
+                                               fputc('D', stderr);
+                                       } else {
+                                               fputc(' ', stderr);
+                                       }
+                               }
+                               fputc('\n', stderr);
+                       }
+
+                       /* get wall-clock time */
+                       printTime();
+               }
        }
 
-       /* get wall-clock time */
-       printTime();
-      }
-    }
-  
-  t0 = t;
+       t0 = t;
 }
 
 static void statsDump(TheArgs *theArgs, NetTrans **my, int interval)
@@ -352,7 +347,8 @@ static void statsDump(TheArgs *theArgs, NetTrans **my, int interval)
        int i;
 
 
-       if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval && !(theArgs->buffStat)) {
+       if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval
+               && !(theArgs->buffStat)) {
                fputs("==============================================================================\n", stderr);
                for (i = 0; i < theArgs->nrOfMsgs; i++) {
                        fprintf(stderr, "%17s%02d:%6s", "pktsReceived", i, unit(*my[i]->pktsReceived));
@@ -392,7 +388,7 @@ static void storeInfoStart(const char *n, time_t t, TheArgs *my)
        desParam(p);
 }
 
-static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs * my)
+static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs *my)
 {
        Param pS, *p = &pS;
        int i;
@@ -400,7 +396,7 @@ static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs * my)
 
        conSetupParam(p, getenv("DAQ_SETUP"));
 
-       for (i = 0; i < my->nrOfMsgs*5+1 && strcmp(w->statistics[i].name, "") != 0; i++) {
+       for (i = 0; i < my->nrOfMsgs * 5 + 1 && strcmp(w->statistics[i].name, "") != 0; i++) {
                Param_storeInt(p, n, w->statistics[i].name, w->statistics[i].value);
        }
        strftime(s, 20, "%Y-%m-%dT%H:%M:%S", localtime(&t));
@@ -449,7 +445,7 @@ int main(int argc, char *argv[])
        hadTuSize = malloc(theArgs->nrOfMsgs * sizeof(size_t));
 
        char buf[_POSIX_PATH_MAX];
-       sprintf( buf, "%s%s", argv[0], theArgs->shmname );
+       sprintf(buf, "%s%s", argv[0], theArgs->shmname);
 
        if (NULL == (worker = Worker_initBegin(buf, sigHandler, theArgs->priority, theArgs->isStandalone))) {
                syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno));
@@ -467,13 +463,12 @@ int main(int argc, char *argv[])
                /* use shared mem name from CL if given */
                sprintf(buf, "netqueue%s%d", theArgs->shmname, i);
 
-               if( theArgs->queueSize > 0 ){
-                 /* if the queueSize is given in CL */
-                 shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize);
-               }
-               else{
-                 /* otherwise use variable queue size */
-                 shmTrans[i] = ShmTrans_open(buf, 2 * (long)theArgs->varQSize[i]);
+               if (theArgs->queueSize > 0) {
+                       /* if the queueSize is given in CL */
+                       shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize);
+               } else {
+                       /* otherwise use variable queue size */
+                       shmTrans[i] = ShmTrans_open(buf, 2 * (long) theArgs->varQSize[i]);
                }
 
                if (shmTrans[i] == NULL) {
@@ -482,39 +477,38 @@ int main(int argc, char *argv[])
                }
                hadTu[i] = NULL;
 
-               if( theArgs->queueSize > 0) {
-                 /* if the queueSize is given in CL */
-                 hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize();
-               }
-               else{
-                 /* otherwise use variable queue size */
-                 /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */
-                 hadTuSize[i] = 51200;
-                 /*
-                  * This 50 kB must be bigger than a size of a message which 
-                  * should usually be less than 32kB (UDP_packet_size - water_mark). 
-                  * However some HUGE messages can appear. Those HUGE messages must 
-                  * be below hadTuSize!!! Otherwise the code will crash. 
-                  * 
-                  * 50 kB  = 51200  Bytes 
-                  * 100 kB = 102400 Bytes 
-                  * 500 kB = 512000 Bytes 
-                  */
+               if (theArgs->queueSize > 0) {
+                       /* if the queueSize is given in CL */
+                       hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize();
+               } else {
+                       /* otherwise use variable queue size */
+                       /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */
+                       hadTuSize[i] = 51200;
+                       /*
+                        * This 50 kB must be bigger than a size of a message which 
+                        * should usually be less than 32kB (UDP_packet_size - water_mark). 
+                        * However some HUGE messages can appear. Those HUGE messages must 
+                        * be below hadTuSize!!! Otherwise the code will crash. 
+                        * 
+                        * 50 kB  = 51200  Bytes 
+                        * 100 kB = 102400 Bytes 
+                        * 500 kB = 512000 Bytes 
+                        */
                }
        }
        storeInfoStart(argv[0], time(NULL), theArgs);
 
        /* Add statistic for fill levels of buffers etc. */
-       for( i=0; i<theArgs->nrOfMsgs; i++ ) {
-         char buf[WORKER_MAX_NAME_LEN];
-         sprintf( buf, "netmemBuff%d", i );
-         theStats->netmemBuff[i] = Worker_addStatistic( worker, buf );
+       for (i = 0; i < theArgs->nrOfMsgs; i++) {
+               char buf[WORKER_MAX_NAME_LEN];
+               sprintf(buf, "netmemBuff%d", i);
+               theStats->netmemBuff[i] = Worker_addStatistic(worker, buf);
 
-         sprintf( buf, "bytesReceivedRate%d", i );
-         theStats->recvBytesRate[i] = Worker_addStatistic( worker, buf );
+               sprintf(buf, "bytesReceivedRate%d", i);
+               theStats->recvBytesRate[i] = Worker_addStatistic(worker, buf);
        }
 
-       theStats->nrOfMsgs = Worker_addStatistic( worker, "nrOfMsgs" );
+       theStats->nrOfMsgs = Worker_addStatistic(worker, "nrOfMsgs");
        (*theStats->nrOfMsgs) = theArgs->nrOfMsgs;
 
        Worker_initEnd(worker);
@@ -523,9 +517,9 @@ int main(int argc, char *argv[])
                while (1) {
                        long msgsCompleted;
 
-                       add2Stat( theArgs, theStats, 1, shmTrans, netTrans );
-                       statsDump( theArgs, netTrans, 1 );
-                       statsBufferDump( theArgs, 1, shmTrans, netTrans );
+                       add2Stat(theArgs, theStats, 1, shmTrans, netTrans);
+                       statsDump(theArgs, netTrans, 1);
+                       statsBufferDump(theArgs, 1, shmTrans, netTrans);
 
                        for (i = 0; i < theArgs->nrOfMsgs; i++) {
                                if (hadTu[i] == NULL) {
@@ -542,21 +536,21 @@ int main(int argc, char *argv[])
 /*                             } */
 /*                     } */
 
-                       fd_set *fdSet;
+                       fd_set *fdSet;
                        fdSet = NetTrans_multiReceive(netTrans, hadTu, hadTuSize, theArgs->nrOfMsgs);
                        for (i = 0; i < theArgs->nrOfMsgs; i++) {
-                                if (FD_ISSET(netTrans[i]->fd, fdSet)) {
-                                 if (assembleMsg(netTrans[i], hadTu[i], hadTuSize[i])) {
-                                       ShmTrans_send(shmTrans[i]);
-                                       hadTu[i] = NULL;
-                                 }
+                               if (FD_ISSET(netTrans[i]->fd, fdSet)) {
+                                       if (assembleMsg(netTrans[i], hadTu[i], hadTuSize[i])) {
+                                               ShmTrans_send(shmTrans[i]);
+                                               hadTu[i] = NULL;
+                                       }
                                }
-                       }                       
+                       }
 
                }
        }
-       storeInfoStop( argv[0], time(NULL), worker, theArgs );
-       statsDump( theArgs, netTrans, 0 );
+       storeInfoStop(argv[0], time(NULL), worker, theArgs);
+       statsDump(theArgs, netTrans, 0);
 
        for (i = 0; i < theArgs->nrOfMsgs; i++) {
                ShmTrans_remove(shmTrans[i]);