From 198a432ed773df6decd3bf2931928ece314cdbf8 Mon Sep 17 00:00:00 2001 From: hadaq Date: Thu, 10 Dec 2009 15:39:55 +0000 Subject: [PATCH] buffer size equal to evtbuild. Sergey --- hadaq/netmem.c | 424 ++++++++++++++++++++++++------------------------- 1 file changed, 209 insertions(+), 215 deletions(-) diff --git a/hadaq/netmem.c b/hadaq/netmem.c index 0834a90..61ffc52 100644 --- a/hadaq/netmem.c +++ b/hadaq/netmem.c @@ -1,4 +1,4 @@ -static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.35 2008-06-17 18:56:23 hadaq Exp $"; +static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.36 2009-12-10 15:39:55 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L @@ -30,18 +30,17 @@ typedef struct TheArgsS { unsigned long isStandalone; unsigned long priority; unsigned long queueSize; - unsigned long varQSize[MAXINPATH]; - int varQSizeCnt; + unsigned long varQSize[MAXINPATH]; + int varQSizeCnt; char verbosity[PARAM_MAX_VALUE_LEN]; - unsigned int buffStat; - char shmname[PARAM_MAX_VALUE_LEN]; + unsigned int buffStat; + char shmname[PARAM_MAX_VALUE_LEN]; } TheArgs; -typedef struct TheStatsS -{ - unsigned long *netmemBuff[MAXINPATH]; - unsigned long *nrOfMsgs; - unsigned long *recvBytesRate[MAXINPATH]; +typedef struct TheStatsS { + unsigned long *netmemBuff[MAXINPATH]; + unsigned long *nrOfMsgs; + unsigned long *recvBytesRate[MAXINPATH]; } TheStats; static jmp_buf terminateJmp; @@ -54,8 +53,8 @@ void sigHandler(int sig) static void usage(const char *progName) { syslog(LOG_ERR, "Usage: %s -i inPath [-i inPath] -m nrOfMsgs [-p priority]", progName); - syslog( LOG_ERR, "[-b] show fill levels of buffers]" ); - syslog( LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used" ); + syslog(LOG_ERR, "[-b] show fill levels of buffers]"); + syslog(LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used"); } static void argsDump(TheArgs *my) @@ -70,8 +69,8 @@ static void argsDump(TheArgs *my) syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone); syslog(LOG_DEBUG, "queueSize: %d", my->queueSize); syslog(LOG_DEBUG, "verbosity: %s", my->verbosity); - if( strcmp( my->shmname, "" ) != 0 ) { - syslog (LOG_DEBUG, "shmem name: %s", my->shmname); + if (strcmp(my->shmname, "") != 0) { + syslog(LOG_DEBUG, "shmem name: %s", my->shmname); } } @@ -86,14 +85,14 @@ static void argsDefault(TheArgs *my) my->nrOfMsgs = 0; my->priority = 0; my->isStandalone = 1; - my->queueSize = 1 * 1024 * 1024; + my->queueSize = 4 * 1024 * 1024; strcpy(my->verbosity, "info"); for (i = 0; i < MAXINPATH; i++) { - my->varQSize[i] = 1 * 1024 * 1024; + my->varQSize[i] = 4 * 1024 * 1024; } my->buffStat = 0; - strcpy (my->shmname, ""); + strcpy(my->shmname, ""); } static int argsFromCL(TheArgs *my, int argc, char *argv[]) @@ -127,8 +126,8 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) strcpy(my->verbosity, optarg); break; case 'b': - my->buffStat = 1; - break; + my->buffStat = 1; + break; case 'S': strcpy(my->shmname, optarg); break; @@ -162,7 +161,7 @@ static int argsFromParam(TheArgs *my, int argc, char *argv[]) Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize); Param_getString(param, name, "verb", ¶mWasFound, my->verbosity); - /* this is an argument for a variable queue size. S.Y.*/ + /* this is an argument for a variable queue size. S.Y. */ Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); desParam(param); } @@ -182,166 +181,162 @@ static char *unit(unsigned long v) static void printTime() { - struct timeval tv; - struct tm* ptm; - char time_string[40]; - long milliseconds; + struct timeval tv; + struct tm *ptm; + char time_string[40]; + long milliseconds; - gettimeofday( &tv, NULL ); + gettimeofday(&tv, NULL); - /* Obtain the time of day, and convert it to a tm struct. */ - ptm = localtime (&tv.tv_sec); + /* Obtain the time of day, and convert it to a tm struct. */ + ptm = localtime(&tv.tv_sec); - /* Format the date and time, down to a single second. */ - strftime (time_string, sizeof (time_string), "%Y-%m-%d %H:%M:%S", ptm); + /* Format the date and time, down to a single second. */ + strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm); - /* Compute milliseconds from microseconds. */ - milliseconds = tv.tv_usec / 1000; + /* Compute milliseconds from microseconds. */ + milliseconds = tv.tv_usec / 1000; - /* Print the formatted time, in seconds, followed by a decimal point - and the milliseconds. */ - printf ("Time: %s.%03ld\n", time_string, milliseconds); + /* Print the formatted time, in seconds, followed by a decimal point + and the milliseconds. */ + printf("Time: %s.%03ld\n", time_string, milliseconds); } -static void add2Stat( TheArgs * theArgs, TheStats * my, float interval, ShmTrans **shmtr, NetTrans **nettr ) { - - /* Add statistic for fill levels of buffers in percentage. */ - - static time_t t_0 = 0; - float buffSize, queueSize; - time_t t, dT; - int i; - unsigned long fillLevel; - static unsigned long lastBytesRecv[MAXINPATH]; - - t = time (NULL); - dT = t - t_0; - - if( dT >= interval ) { - for( i=0; inrOfMsgs; i++ ) { - buffSize = 2*theArgs->varQSize[i]; - queueSize = HadTuQueue_size(shmtr[i]->wrQueue); - - /* Add here statistic for fill levels of buffers */ - fillLevel = (unsigned long) (100*queueSize+0.5)/buffSize; - (*my->netmemBuff[i]) = fillLevel; - - /* Add more statistic for recv bytes per second */ - (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i]; - lastBytesRecv[i] = *nettr[i]->bytesReceived; - - } - } - - t_0 = t; +static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans **shmtr, NetTrans **nettr) +{ + + /* Add statistic for fill levels of buffers in percentage. */ + + static time_t t_0 = 0; + float buffSize, queueSize; + time_t t, dT; + int i; + unsigned long fillLevel; + static unsigned long lastBytesRecv[MAXINPATH]; + + t = time(NULL); + dT = t - t_0; + + if (dT >= interval) { + for (i = 0; i < theArgs->nrOfMsgs; i++) { + buffSize = 2 * theArgs->varQSize[i]; + queueSize = HadTuQueue_size(shmtr[i]->wrQueue); + + /* Add here statistic for fill levels of buffers */ + fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize; + (*my->netmemBuff[i]) = fillLevel; + + /* Add more statistic for recv bytes per second */ + (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i]; + lastBytesRecv[i] = *nettr[i]->bytesReceived; + + } + } + + t_0 = t; } -static void statsBufferDump (TheArgs * theArgs, float interval, ShmTrans **shmtr, NetTrans **my) +static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, NetTrans **my) { - static time_t t0 = 0; - time_t t, dT; - int i, j; - int col = 0; - char emptybuffer[] = "-"; - int outputGraph = 1; /* Graphical output */ - int outputNum = 0; /* Numerical output */ - - t = time (NULL); - dT = t - t0; - - /* fprintf (stderr, " dT: %.0lf interval: %.0lf", (double)dT, interval); */ - - if (dT >= interval && theArgs->buffStat) - { - - if( outputNum == 1) { - fputs("==============================================================================\n\n",stderr); - for(i=0; inrOfMsgs; i++) { - - fprintf (stderr, "q[%2d]: ", i); - if (shmtr[i] != NULL) - if (theArgs->varQSize[i]*2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) { - fprintf (stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue)); - } - else { - fprintf (stderr, "%8s ", "EXCEEDED"); - } - else - fprintf (stderr, "%8s ", emptybuffer); - - col++; - if (col == 6) { - fputc ('\n', stderr); - col = 0; - } - } - } - - if( outputGraph == 1 ){ - fputs ("------------------ buffer fill levels ----------------------------------------\n", stderr); - - float buffSize, queueSize, pktsDisc; - int maxnorm = 10.; - - for( j=0; jnrOfMsgs; i++ ){ - buffSize = 2*theArgs->varQSize[i]; - queueSize = HadTuQueue_size(shmtr[i]->wrQueue); - - if(maxnorm - maxnorm*queueSize/buffSize < j){ - fputc( '|', stderr ); - } - else{ - fputc( ' ', stderr ); - } - } - fputc( '\n', stderr ); - } + static time_t t0 = 0; + time_t t, dT; + int i, j; + int col = 0; + char emptybuffer[] = "-"; + int outputGraph = 1; /* Graphical output */ + int outputNum = 0; /* Numerical output */ - /* The following is just to print the numbers of buffers*/ - int factor, mod; - - fputs( "q:", stderr ); - factor = 0; - for( i=0; inrOfMsgs; i++ ) { - mod = i%10; - fprintf (stderr, "%1d", mod); - } - fputc( '\n', stderr ); - - fputs( " ", stderr ); - for( i=0; inrOfMsgs; i++ ) { - mod = i%10; - if( mod == 0 ) - fprintf (stderr, "%1d", i/10 ); - else - fputc( ' ', stderr ); - } - fputc( '\n', stderr ); - - fputs( "------------------ discarded packets -----------------------------------------\n", stderr ); - for( j=0; jnrOfMsgs; i++ ){ - pktsDisc = (*my[i]->pktsDiscarded); - - if(maxnorm - (pktsDisc + maxnorm - 1)/maxnorm < j){ - fputc( 'D', stderr ); - } - else{ - fputc( ' ', stderr ); - } - } - fputc( '\n', stderr ); + t = time(NULL); + dT = t - t0; + + /* fprintf (stderr, " dT: %.0lf interval: %.0lf", (double)dT, interval); */ + + if (dT >= interval && theArgs->buffStat) { + + if (outputNum == 1) { + fputs("==============================================================================\n\n", stderr); + for (i = 0; i < theArgs->nrOfMsgs; i++) { + + fprintf(stderr, "q[%2d]: ", i); + if (shmtr[i] != NULL) + if (theArgs->varQSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) { + fprintf(stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue)); + } else { + fprintf(stderr, "%8s ", "EXCEEDED"); + } else + fprintf(stderr, "%8s ", emptybuffer); + + col++; + if (col == 6) { + fputc('\n', stderr); + col = 0; + } + } + } + + if (outputGraph == 1) { + fputs("------------------ buffer fill levels ----------------------------------------\n", stderr); + + float buffSize, queueSize, pktsDisc; + int maxnorm = 10.; + + for (j = 0; j < maxnorm; j++) { + fprintf(stderr, "%1d ", maxnorm - j - 1); + for (i = 0; i < theArgs->nrOfMsgs; i++) { + buffSize = 2 * theArgs->varQSize[i]; + queueSize = HadTuQueue_size(shmtr[i]->wrQueue); + + if (maxnorm - maxnorm * queueSize / buffSize < j) { + fputc('|', stderr); + } else { + fputc(' ', stderr); + } + } + fputc('\n', stderr); + } + + /* The following is just to print the numbers of buffers */ + int factor, mod; + + fputs("q:", stderr); + factor = 0; + for (i = 0; i < theArgs->nrOfMsgs; i++) { + mod = i % 10; + fprintf(stderr, "%1d", mod); + } + fputc('\n', stderr); + + fputs(" ", stderr); + for (i = 0; i < theArgs->nrOfMsgs; i++) { + mod = i % 10; + if (mod == 0) + fprintf(stderr, "%1d", i / 10); + else + fputc(' ', stderr); + } + fputc('\n', stderr); + + fputs("------------------ discarded packets -----------------------------------------\n", stderr); + for (j = 0; j < maxnorm; j++) { + fprintf(stderr, "%1d ", maxnorm - j - 1); + for (i = 0; i < theArgs->nrOfMsgs; i++) { + pktsDisc = (*my[i]->pktsDiscarded); + + if (maxnorm - (pktsDisc + maxnorm - 1) / maxnorm < j) { + fputc('D', stderr); + } else { + fputc(' ', stderr); + } + } + fputc('\n', stderr); + } + + /* get wall-clock time */ + printTime(); + } } - /* get wall-clock time */ - printTime(); - } - } - - t0 = t; + t0 = t; } static void statsDump(TheArgs *theArgs, NetTrans **my, int interval) @@ -352,7 +347,8 @@ static void statsDump(TheArgs *theArgs, NetTrans **my, int interval) int i; - if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval && !(theArgs->buffStat)) { + if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval + && !(theArgs->buffStat)) { fputs("==============================================================================\n", stderr); for (i = 0; i < theArgs->nrOfMsgs; i++) { fprintf(stderr, "%17s%02d:%6s", "pktsReceived", i, unit(*my[i]->pktsReceived)); @@ -392,7 +388,7 @@ static void storeInfoStart(const char *n, time_t t, TheArgs *my) desParam(p); } -static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs * my) +static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs *my) { Param pS, *p = &pS; int i; @@ -400,7 +396,7 @@ static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs * my) conSetupParam(p, getenv("DAQ_SETUP")); - for (i = 0; i < my->nrOfMsgs*5+1 && strcmp(w->statistics[i].name, "") != 0; i++) { + for (i = 0; i < my->nrOfMsgs * 5 + 1 && strcmp(w->statistics[i].name, "") != 0; i++) { Param_storeInt(p, n, w->statistics[i].name, w->statistics[i].value); } strftime(s, 20, "%Y-%m-%dT%H:%M:%S", localtime(&t)); @@ -449,7 +445,7 @@ int main(int argc, char *argv[]) hadTuSize = malloc(theArgs->nrOfMsgs * sizeof(size_t)); char buf[_POSIX_PATH_MAX]; - sprintf( buf, "%s%s", argv[0], theArgs->shmname ); + sprintf(buf, "%s%s", argv[0], theArgs->shmname); if (NULL == (worker = Worker_initBegin(buf, sigHandler, theArgs->priority, theArgs->isStandalone))) { syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno)); @@ -467,13 +463,12 @@ int main(int argc, char *argv[]) /* use shared mem name from CL if given */ sprintf(buf, "netqueue%s%d", theArgs->shmname, i); - if( theArgs->queueSize > 0 ){ - /* if the queueSize is given in CL */ - shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize); - } - else{ - /* otherwise use variable queue size */ - shmTrans[i] = ShmTrans_open(buf, 2 * (long)theArgs->varQSize[i]); + if (theArgs->queueSize > 0) { + /* if the queueSize is given in CL */ + shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize); + } else { + /* otherwise use variable queue size */ + shmTrans[i] = ShmTrans_open(buf, 2 * (long) theArgs->varQSize[i]); } if (shmTrans[i] == NULL) { @@ -482,39 +477,38 @@ int main(int argc, char *argv[]) } hadTu[i] = NULL; - if( theArgs->queueSize > 0) { - /* if the queueSize is given in CL */ - hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize(); - } - else{ - /* otherwise use variable queue size */ - /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */ - hadTuSize[i] = 51200; - /* - * This 50 kB must be bigger than a size of a message which - * should usually be less than 32kB (UDP_packet_size - water_mark). - * However some HUGE messages can appear. Those HUGE messages must - * be below hadTuSize!!! Otherwise the code will crash. - * - * 50 kB = 51200 Bytes - * 100 kB = 102400 Bytes - * 500 kB = 512000 Bytes - */ + if (theArgs->queueSize > 0) { + /* if the queueSize is given in CL */ + hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize(); + } else { + /* otherwise use variable queue size */ + /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */ + hadTuSize[i] = 51200; + /* + * This 50 kB must be bigger than a size of a message which + * should usually be less than 32kB (UDP_packet_size - water_mark). + * However some HUGE messages can appear. Those HUGE messages must + * be below hadTuSize!!! Otherwise the code will crash. + * + * 50 kB = 51200 Bytes + * 100 kB = 102400 Bytes + * 500 kB = 512000 Bytes + */ } } storeInfoStart(argv[0], time(NULL), theArgs); /* Add statistic for fill levels of buffers etc. */ - for( i=0; inrOfMsgs; i++ ) { - char buf[WORKER_MAX_NAME_LEN]; - sprintf( buf, "netmemBuff%d", i ); - theStats->netmemBuff[i] = Worker_addStatistic( worker, buf ); + for (i = 0; i < theArgs->nrOfMsgs; i++) { + char buf[WORKER_MAX_NAME_LEN]; + sprintf(buf, "netmemBuff%d", i); + theStats->netmemBuff[i] = Worker_addStatistic(worker, buf); - sprintf( buf, "bytesReceivedRate%d", i ); - theStats->recvBytesRate[i] = Worker_addStatistic( worker, buf ); + sprintf(buf, "bytesReceivedRate%d", i); + theStats->recvBytesRate[i] = Worker_addStatistic(worker, buf); } - theStats->nrOfMsgs = Worker_addStatistic( worker, "nrOfMsgs" ); + theStats->nrOfMsgs = Worker_addStatistic(worker, "nrOfMsgs"); (*theStats->nrOfMsgs) = theArgs->nrOfMsgs; Worker_initEnd(worker); @@ -523,9 +517,9 @@ int main(int argc, char *argv[]) while (1) { long msgsCompleted; - add2Stat( theArgs, theStats, 1, shmTrans, netTrans ); - statsDump( theArgs, netTrans, 1 ); - statsBufferDump( theArgs, 1, shmTrans, netTrans ); + add2Stat(theArgs, theStats, 1, shmTrans, netTrans); + statsDump(theArgs, netTrans, 1); + statsBufferDump(theArgs, 1, shmTrans, netTrans); for (i = 0; i < theArgs->nrOfMsgs; i++) { if (hadTu[i] == NULL) { @@ -542,21 +536,21 @@ int main(int argc, char *argv[]) /* } */ /* } */ - fd_set *fdSet; + fd_set *fdSet; fdSet = NetTrans_multiReceive(netTrans, hadTu, hadTuSize, theArgs->nrOfMsgs); for (i = 0; i < theArgs->nrOfMsgs; i++) { - if (FD_ISSET(netTrans[i]->fd, fdSet)) { - if (assembleMsg(netTrans[i], hadTu[i], hadTuSize[i])) { - ShmTrans_send(shmTrans[i]); - hadTu[i] = NULL; - } + if (FD_ISSET(netTrans[i]->fd, fdSet)) { + if (assembleMsg(netTrans[i], hadTu[i], hadTuSize[i])) { + ShmTrans_send(shmTrans[i]); + hadTu[i] = NULL; + } } - } + } } } - storeInfoStop( argv[0], time(NULL), worker, theArgs ); - statsDump( theArgs, netTrans, 0 ); + storeInfoStop(argv[0], time(NULL), worker, theArgs); + statsDump(theArgs, netTrans, 0); for (i = 0; i < theArgs->nrOfMsgs; i++) { ShmTrans_remove(shmTrans[i]); -- 2.43.0