-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.35 2008-06-17 18:56:23 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.36 2009-12-10 15:39:55 hadaq Exp $";
#define _POSIX_C_SOURCE 199309L
unsigned long isStandalone;
unsigned long priority;
unsigned long queueSize;
- unsigned long varQSize[MAXINPATH];
- int varQSizeCnt;
+ unsigned long varQSize[MAXINPATH];
+ int varQSizeCnt;
char verbosity[PARAM_MAX_VALUE_LEN];
- unsigned int buffStat;
- char shmname[PARAM_MAX_VALUE_LEN];
+ unsigned int buffStat;
+ char shmname[PARAM_MAX_VALUE_LEN];
} TheArgs;
-typedef struct TheStatsS
-{
- unsigned long *netmemBuff[MAXINPATH];
- unsigned long *nrOfMsgs;
- unsigned long *recvBytesRate[MAXINPATH];
+typedef struct TheStatsS {
+ unsigned long *netmemBuff[MAXINPATH];
+ unsigned long *nrOfMsgs;
+ unsigned long *recvBytesRate[MAXINPATH];
} TheStats;
static jmp_buf terminateJmp;
static void usage(const char *progName)
{
syslog(LOG_ERR, "Usage: %s -i inPath [-i inPath] -m nrOfMsgs [-p priority]", progName);
- syslog( LOG_ERR, "[-b] show fill levels of buffers]" );
- syslog( LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used" );
+ syslog(LOG_ERR, "[-b] show fill levels of buffers]");
+ syslog(LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used");
}
static void argsDump(TheArgs *my)
syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone);
syslog(LOG_DEBUG, "queueSize: %d", my->queueSize);
syslog(LOG_DEBUG, "verbosity: %s", my->verbosity);
- if( strcmp( my->shmname, "" ) != 0 ) {
- syslog (LOG_DEBUG, "shmem name: %s", my->shmname);
+ if (strcmp(my->shmname, "") != 0) {
+ syslog(LOG_DEBUG, "shmem name: %s", my->shmname);
}
}
my->nrOfMsgs = 0;
my->priority = 0;
my->isStandalone = 1;
- my->queueSize = 1 * 1024 * 1024;
+ my->queueSize = 4 * 1024 * 1024;
strcpy(my->verbosity, "info");
for (i = 0; i < MAXINPATH; i++) {
- my->varQSize[i] = 1 * 1024 * 1024;
+ my->varQSize[i] = 4 * 1024 * 1024;
}
my->buffStat = 0;
- strcpy (my->shmname, "");
+ strcpy(my->shmname, "");
}
static int argsFromCL(TheArgs *my, int argc, char *argv[])
strcpy(my->verbosity, optarg);
break;
case 'b':
- my->buffStat = 1;
- break;
+ my->buffStat = 1;
+ break;
case 'S':
strcpy(my->shmname, optarg);
break;
Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize);
Param_getString(param, name, "verb", ¶mWasFound, my->verbosity);
- /* this is an argument for a variable queue size. S.Y.*/
+ /* this is an argument for a variable queue size. S.Y. */
Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
desParam(param);
}
static void printTime()
{
- struct timeval tv;
- struct tm* ptm;
- char time_string[40];
- long milliseconds;
+ struct timeval tv;
+ struct tm *ptm;
+ char time_string[40];
+ long milliseconds;
- gettimeofday( &tv, NULL );
+ gettimeofday(&tv, NULL);
- /* Obtain the time of day, and convert it to a tm struct. */
- ptm = localtime (&tv.tv_sec);
+ /* Obtain the time of day, and convert it to a tm struct. */
+ ptm = localtime(&tv.tv_sec);
- /* Format the date and time, down to a single second. */
- strftime (time_string, sizeof (time_string), "%Y-%m-%d %H:%M:%S", ptm);
+ /* Format the date and time, down to a single second. */
+ strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
- /* Compute milliseconds from microseconds. */
- milliseconds = tv.tv_usec / 1000;
+ /* Compute milliseconds from microseconds. */
+ milliseconds = tv.tv_usec / 1000;
- /* Print the formatted time, in seconds, followed by a decimal point
- and the milliseconds. */
- printf ("Time: %s.%03ld\n", time_string, milliseconds);
+ /* Print the formatted time, in seconds, followed by a decimal point
+ and the milliseconds. */
+ printf("Time: %s.%03ld\n", time_string, milliseconds);
}
-static void add2Stat( TheArgs * theArgs, TheStats * my, float interval, ShmTrans **shmtr, NetTrans **nettr ) {
-
- /* Add statistic for fill levels of buffers in percentage. */
-
- static time_t t_0 = 0;
- float buffSize, queueSize;
- time_t t, dT;
- int i;
- unsigned long fillLevel;
- static unsigned long lastBytesRecv[MAXINPATH];
-
- t = time (NULL);
- dT = t - t_0;
-
- if( dT >= interval ) {
- for( i=0; i<theArgs->nrOfMsgs; i++ ) {
- buffSize = 2*theArgs->varQSize[i];
- queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
-
- /* Add here statistic for fill levels of buffers */
- fillLevel = (unsigned long) (100*queueSize+0.5)/buffSize;
- (*my->netmemBuff[i]) = fillLevel;
-
- /* Add more statistic for recv bytes per second */
- (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
- lastBytesRecv[i] = *nettr[i]->bytesReceived;
-
- }
- }
-
- t_0 = t;
+static void add2Stat(TheArgs *theArgs, TheStats *my, float interval, ShmTrans **shmtr, NetTrans **nettr)
+{
+
+ /* Add statistic for fill levels of buffers in percentage. */
+
+ static time_t t_0 = 0;
+ float buffSize, queueSize;
+ time_t t, dT;
+ int i;
+ unsigned long fillLevel;
+ static unsigned long lastBytesRecv[MAXINPATH];
+
+ t = time(NULL);
+ dT = t - t_0;
+
+ if (dT >= interval) {
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ buffSize = 2 * theArgs->varQSize[i];
+ queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
+
+ /* Add here statistic for fill levels of buffers */
+ fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize;
+ (*my->netmemBuff[i]) = fillLevel;
+
+ /* Add more statistic for recv bytes per second */
+ (*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
+ lastBytesRecv[i] = *nettr[i]->bytesReceived;
+
+ }
+ }
+
+ t_0 = t;
}
-static void statsBufferDump (TheArgs * theArgs, float interval, ShmTrans **shmtr, NetTrans **my)
+static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, NetTrans **my)
{
- static time_t t0 = 0;
- time_t t, dT;
- int i, j;
- int col = 0;
- char emptybuffer[] = "-";
- int outputGraph = 1; /* Graphical output */
- int outputNum = 0; /* Numerical output */
-
- t = time (NULL);
- dT = t - t0;
-
- /* fprintf (stderr, " dT: %.0lf interval: %.0lf", (double)dT, interval); */
-
- if (dT >= interval && theArgs->buffStat)
- {
-
- if( outputNum == 1) {
- fputs("==============================================================================\n\n",stderr);
- for(i=0; i<theArgs->nrOfMsgs; i++) {
-
- fprintf (stderr, "q[%2d]: ", i);
- if (shmtr[i] != NULL)
- if (theArgs->varQSize[i]*2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
- fprintf (stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue));
- }
- else {
- fprintf (stderr, "%8s ", "EXCEEDED");
- }
- else
- fprintf (stderr, "%8s ", emptybuffer);
-
- col++;
- if (col == 6) {
- fputc ('\n', stderr);
- col = 0;
- }
- }
- }
-
- if( outputGraph == 1 ){
- fputs ("------------------ buffer fill levels ----------------------------------------\n", stderr);
-
- float buffSize, queueSize, pktsDisc;
- int maxnorm = 10.;
-
- for( j=0; j<maxnorm; j++ ){
- fprintf (stderr, "%1d ", maxnorm - j - 1);
- for( i=0; i<theArgs->nrOfMsgs; i++ ){
- buffSize = 2*theArgs->varQSize[i];
- queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
-
- if(maxnorm - maxnorm*queueSize/buffSize < j){
- fputc( '|', stderr );
- }
- else{
- fputc( ' ', stderr );
- }
- }
- fputc( '\n', stderr );
- }
+ static time_t t0 = 0;
+ time_t t, dT;
+ int i, j;
+ int col = 0;
+ char emptybuffer[] = "-";
+ int outputGraph = 1; /* Graphical output */
+ int outputNum = 0; /* Numerical output */
- /* The following is just to print the numbers of buffers*/
- int factor, mod;
-
- fputs( "q:", stderr );
- factor = 0;
- for( i=0; i<theArgs->nrOfMsgs; i++ ) {
- mod = i%10;
- fprintf (stderr, "%1d", mod);
- }
- fputc( '\n', stderr );
-
- fputs( " ", stderr );
- for( i=0; i<theArgs->nrOfMsgs; i++ ) {
- mod = i%10;
- if( mod == 0 )
- fprintf (stderr, "%1d", i/10 );
- else
- fputc( ' ', stderr );
- }
- fputc( '\n', stderr );
-
- fputs( "------------------ discarded packets -----------------------------------------\n", stderr );
- for( j=0; j<maxnorm; j++ ){
- fprintf (stderr, "%1d ", maxnorm - j - 1);
- for( i=0; i<theArgs->nrOfMsgs; i++ ){
- pktsDisc = (*my[i]->pktsDiscarded);
-
- if(maxnorm - (pktsDisc + maxnorm - 1)/maxnorm < j){
- fputc( 'D', stderr );
- }
- else{
- fputc( ' ', stderr );
- }
- }
- fputc( '\n', stderr );
+ t = time(NULL);
+ dT = t - t0;
+
+ /* fprintf (stderr, " dT: %.0lf interval: %.0lf", (double)dT, interval); */
+
+ if (dT >= interval && theArgs->buffStat) {
+
+ if (outputNum == 1) {
+ fputs("==============================================================================\n\n", stderr);
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+
+ fprintf(stderr, "q[%2d]: ", i);
+ if (shmtr[i] != NULL)
+ if (theArgs->varQSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
+ fprintf(stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue));
+ } else {
+ fprintf(stderr, "%8s ", "EXCEEDED");
+ } else
+ fprintf(stderr, "%8s ", emptybuffer);
+
+ col++;
+ if (col == 6) {
+ fputc('\n', stderr);
+ col = 0;
+ }
+ }
+ }
+
+ if (outputGraph == 1) {
+ fputs("------------------ buffer fill levels ----------------------------------------\n", stderr);
+
+ float buffSize, queueSize, pktsDisc;
+ int maxnorm = 10.;
+
+ for (j = 0; j < maxnorm; j++) {
+ fprintf(stderr, "%1d ", maxnorm - j - 1);
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ buffSize = 2 * theArgs->varQSize[i];
+ queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
+
+ if (maxnorm - maxnorm * queueSize / buffSize < j) {
+ fputc('|', stderr);
+ } else {
+ fputc(' ', stderr);
+ }
+ }
+ fputc('\n', stderr);
+ }
+
+ /* The following is just to print the numbers of buffers */
+ int factor, mod;
+
+ fputs("q:", stderr);
+ factor = 0;
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ mod = i % 10;
+ fprintf(stderr, "%1d", mod);
+ }
+ fputc('\n', stderr);
+
+ fputs(" ", stderr);
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ mod = i % 10;
+ if (mod == 0)
+ fprintf(stderr, "%1d", i / 10);
+ else
+ fputc(' ', stderr);
+ }
+ fputc('\n', stderr);
+
+ fputs("------------------ discarded packets -----------------------------------------\n", stderr);
+ for (j = 0; j < maxnorm; j++) {
+ fprintf(stderr, "%1d ", maxnorm - j - 1);
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ pktsDisc = (*my[i]->pktsDiscarded);
+
+ if (maxnorm - (pktsDisc + maxnorm - 1) / maxnorm < j) {
+ fputc('D', stderr);
+ } else {
+ fputc(' ', stderr);
+ }
+ }
+ fputc('\n', stderr);
+ }
+
+ /* get wall-clock time */
+ printTime();
+ }
}
- /* get wall-clock time */
- printTime();
- }
- }
-
- t0 = t;
+ t0 = t;
}
static void statsDump(TheArgs *theArgs, NetTrans **my, int interval)
int i;
- if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval && !(theArgs->buffStat)) {
+ if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval
+ && !(theArgs->buffStat)) {
fputs("==============================================================================\n", stderr);
for (i = 0; i < theArgs->nrOfMsgs; i++) {
fprintf(stderr, "%17s%02d:%6s", "pktsReceived", i, unit(*my[i]->pktsReceived));
desParam(p);
}
-static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs * my)
+static void storeInfoStop(const char *n, time_t t, Worker *w, TheArgs *my)
{
Param pS, *p = &pS;
int i;
conSetupParam(p, getenv("DAQ_SETUP"));
- for (i = 0; i < my->nrOfMsgs*5+1 && strcmp(w->statistics[i].name, "") != 0; i++) {
+ for (i = 0; i < my->nrOfMsgs * 5 + 1 && strcmp(w->statistics[i].name, "") != 0; i++) {
Param_storeInt(p, n, w->statistics[i].name, w->statistics[i].value);
}
strftime(s, 20, "%Y-%m-%dT%H:%M:%S", localtime(&t));
hadTuSize = malloc(theArgs->nrOfMsgs * sizeof(size_t));
char buf[_POSIX_PATH_MAX];
- sprintf( buf, "%s%s", argv[0], theArgs->shmname );
+ sprintf(buf, "%s%s", argv[0], theArgs->shmname);
if (NULL == (worker = Worker_initBegin(buf, sigHandler, theArgs->priority, theArgs->isStandalone))) {
syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno));
/* use shared mem name from CL if given */
sprintf(buf, "netqueue%s%d", theArgs->shmname, i);
- if( theArgs->queueSize > 0 ){
- /* if the queueSize is given in CL */
- shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize);
- }
- else{
- /* otherwise use variable queue size */
- shmTrans[i] = ShmTrans_open(buf, 2 * (long)theArgs->varQSize[i]);
+ if (theArgs->queueSize > 0) {
+ /* if the queueSize is given in CL */
+ shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize);
+ } else {
+ /* otherwise use variable queue size */
+ shmTrans[i] = ShmTrans_open(buf, 2 * (long) theArgs->varQSize[i]);
}
if (shmTrans[i] == NULL) {
}
hadTu[i] = NULL;
- if( theArgs->queueSize > 0) {
- /* if the queueSize is given in CL */
- hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize();
- }
- else{
- /* otherwise use variable queue size */
- /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */
- hadTuSize[i] = 51200;
- /*
- * This 50 kB must be bigger than a size of a message which
- * should usually be less than 32kB (UDP_packet_size - water_mark).
- * However some HUGE messages can appear. Those HUGE messages must
- * be below hadTuSize!!! Otherwise the code will crash.
- *
- * 50 kB = 51200 Bytes
- * 100 kB = 102400 Bytes
- * 500 kB = 512000 Bytes
- */
+ if (theArgs->queueSize > 0) {
+ /* if the queueSize is given in CL */
+ hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize();
+ } else {
+ /* otherwise use variable queue size */
+ /* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */
+ hadTuSize[i] = 51200;
+ /*
+ * This 50 kB must be bigger than a size of a message which
+ * should usually be less than 32kB (UDP_packet_size - water_mark).
+ * However some HUGE messages can appear. Those HUGE messages must
+ * be below hadTuSize!!! Otherwise the code will crash.
+ *
+ * 50 kB = 51200 Bytes
+ * 100 kB = 102400 Bytes
+ * 500 kB = 512000 Bytes
+ */
}
}
storeInfoStart(argv[0], time(NULL), theArgs);
/* Add statistic for fill levels of buffers etc. */
- for( i=0; i<theArgs->nrOfMsgs; i++ ) {
- char buf[WORKER_MAX_NAME_LEN];
- sprintf( buf, "netmemBuff%d", i );
- theStats->netmemBuff[i] = Worker_addStatistic( worker, buf );
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ char buf[WORKER_MAX_NAME_LEN];
+ sprintf(buf, "netmemBuff%d", i);
+ theStats->netmemBuff[i] = Worker_addStatistic(worker, buf);
- sprintf( buf, "bytesReceivedRate%d", i );
- theStats->recvBytesRate[i] = Worker_addStatistic( worker, buf );
+ sprintf(buf, "bytesReceivedRate%d", i);
+ theStats->recvBytesRate[i] = Worker_addStatistic(worker, buf);
}
- theStats->nrOfMsgs = Worker_addStatistic( worker, "nrOfMsgs" );
+ theStats->nrOfMsgs = Worker_addStatistic(worker, "nrOfMsgs");
(*theStats->nrOfMsgs) = theArgs->nrOfMsgs;
Worker_initEnd(worker);
while (1) {
long msgsCompleted;
- add2Stat( theArgs, theStats, 1, shmTrans, netTrans );
- statsDump( theArgs, netTrans, 1 );
- statsBufferDump( theArgs, 1, shmTrans, netTrans );
+ add2Stat(theArgs, theStats, 1, shmTrans, netTrans);
+ statsDump(theArgs, netTrans, 1);
+ statsBufferDump(theArgs, 1, shmTrans, netTrans);
for (i = 0; i < theArgs->nrOfMsgs; i++) {
if (hadTu[i] == NULL) {
/* } */
/* } */
- fd_set *fdSet;
+ fd_set *fdSet;
fdSet = NetTrans_multiReceive(netTrans, hadTu, hadTuSize, theArgs->nrOfMsgs);
for (i = 0; i < theArgs->nrOfMsgs; i++) {
- if (FD_ISSET(netTrans[i]->fd, fdSet)) {
- if (assembleMsg(netTrans[i], hadTu[i], hadTuSize[i])) {
- ShmTrans_send(shmTrans[i]);
- hadTu[i] = NULL;
- }
+ if (FD_ISSET(netTrans[i]->fd, fdSet)) {
+ if (assembleMsg(netTrans[i], hadTu[i], hadTuSize[i])) {
+ ShmTrans_send(shmTrans[i]);
+ hadTu[i] = NULL;
+ }
}
- }
+ }
}
}
- storeInfoStop( argv[0], time(NULL), worker, theArgs );
- statsDump( theArgs, netTrans, 0 );
+ storeInfoStop(argv[0], time(NULL), worker, theArgs);
+ statsDump(theArgs, netTrans, 0);
for (i = 0; i < theArgs->nrOfMsgs; i++) {
ShmTrans_remove(shmTrans[i]);