-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.36 2009-12-10 15:39:55 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/netmem.c,v 6.37 2010-06-05 23:02:30 hadaq Exp $";
#define _POSIX_C_SOURCE 199309L
unsigned long nrOfMsgs;
unsigned long isStandalone;
unsigned long priority;
- unsigned long queueSize;
- unsigned long varQSize[MAXINPATH];
- int varQSizeCnt;
+ unsigned long queueSize[MAXINPATH];
+ int queueSizeCnt;
+
+ /* the following arguments are for the variable queue size */
+ char *queueVarSize[MAXINPATH];
+ char queueVarSizeS[MAXINPATH][PARAM_MAX_NAME_LEN];
+ int queueVarSizeCnt;
+
char verbosity[PARAM_MAX_VALUE_LEN];
unsigned int buffStat;
char shmname[PARAM_MAX_VALUE_LEN];
+ char evtstat[PARAM_MAX_VALUE_LEN];
} TheArgs;
typedef struct TheStatsS {
syslog(LOG_ERR, "Usage: %s -i inPath [-i inPath] -m nrOfMsgs [-p priority]", progName);
syslog(LOG_ERR, "[-b] show fill levels of buffers]");
syslog(LOG_ERR, "[-S shmem_name] extension of a shared memory name to be used");
+ syslog(LOG_ERR, "[-q size] queue size");
+ syslog(LOG_ERR, "[-Q num:size] used to set specific queue size for given queue number");
+}
+
+static int makeQueues(TheArgs *my)
+{
+ int retVal = 0;
+ int i;
+
+ int queueNum;
+ char newQueueSize[30];
+
+ /* Init all queue sizes */
+ for (i = 1; i < my->nrOfMsgs; i++) {
+ my->queueSize[i] = my->queueSize[0];
+ }
+
+ /* Extract specific values for queue sizes */
+ for (i = 0; i < my->queueVarSizeCnt; i++) {
+
+ if (0 == (sscanf(my->queueVarSize[i], "%d:%s", &queueNum, newQueueSize))) {
+ syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", "queueVarSize option is wrong.");
+ retVal = -1;
+ break;
+ } else {
+ if (queueNum >= my->nrOfMsgs) {
+ syslog(LOG_ERR, "evtbuild.c: makeQueues: %s",
+ "queue number in queueVarSize option exceeds maximum number of data sources.");
+ retVal = -1;
+ break;
+ }
+
+ my->queueSize[queueNum] = atoi(newQueueSize);
+ }
+ }
+
+ return retVal;
}
static void argsDump(TheArgs *my)
syslog(LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs);
syslog(LOG_DEBUG, "priority: %d", my->priority);
syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone);
- syslog(LOG_DEBUG, "queueSize: %d", my->queueSize);
+ syslog(LOG_DEBUG, "queueSize: %d", my->queueSize[0]);
syslog(LOG_DEBUG, "verbosity: %s", my->verbosity);
if (strcmp(my->shmname, "") != 0) {
syslog(LOG_DEBUG, "shmem name: %s", my->shmname);
my->nrOfMsgs = 0;
my->priority = 0;
my->isStandalone = 1;
- my->queueSize = 4 * 1024 * 1024;
+ my->queueSize[0] = 64 * 1024 * 1024UL;
strcpy(my->verbosity, "info");
for (i = 0; i < MAXINPATH; i++) {
- my->varQSize[i] = 4 * 1024 * 1024;
+ my->queueVarSize[i] = my->queueVarSizeS[i];
}
+ my->queueVarSizeCnt = 0; /* no varible queue sizes by default */
+
my->buffStat = 0;
strcpy(my->shmname, "");
+
+ sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP"));
}
+
+
static int argsFromCL(TheArgs *my, int argc, char *argv[])
{
extern char *optarg;
inPathCntFromParam = my->inPathCnt;
my->inPathCnt = 0;
- while ((i = getopt(argc, argv, "ai:m:q:p:v:bS:")) != -1) {
+ while ((i = getopt(argc, argv, "ai:m:q:Q:p:v:bS:")) != -1) {
switch (i) {
case 'm':
my->nrOfMsgs = atoi(optarg);
my->priority = atoi(optarg);
break;
case 'q':
- my->queueSize = atoi(optarg);
+ my->queueSize[0] = strtoul(optarg, NULL, 0);
+ break;
+ case 'Q':
+ strcpy(my->queueVarSize[my->queueVarSizeCnt++], optarg);
break;
case 'v':
strcpy(my->verbosity, optarg);
Param_getInt(param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs);
Param_getInt(param, name, "stndln", ¶mWasFound, &my->isStandalone);
Param_getInt(param, name, "prio", ¶mWasFound, &my->priority);
- Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize);
+ Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize[0]);
Param_getString(param, name, "verb", ¶mWasFound, my->verbosity);
- /* this is an argument for a variable queue size. S.Y. */
- Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
+ /* this is an argument for a variable queue size */
+ /* Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); */
desParam(param);
}
/* Add statistic for fill levels of buffers in percentage. */
static time_t t_0 = 0;
- float buffSize, queueSize;
+ unsigned long buffSize, queueSize;
time_t t, dT;
int i;
unsigned long fillLevel;
if (dT >= interval) {
for (i = 0; i < theArgs->nrOfMsgs; i++) {
- buffSize = 2 * theArgs->varQSize[i];
+ buffSize = 2 * theArgs->queueSize[i];
queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
/* Add here statistic for fill levels of buffers */
fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize;
(*my->netmemBuff[i]) = fillLevel;
+ /* printf("q[%02d]%5d\%\n",i,fillLevel); */
+
/* Add more statistic for recv bytes per second */
(*my->recvBytesRate[i]) = *nettr[i]->bytesReceived - lastBytesRecv[i];
lastBytesRecv[i] = *nettr[i]->bytesReceived;
}
+ t_0 = t;
}
-
- t_0 = t;
}
static void statsBufferDump(TheArgs *theArgs, float interval, ShmTrans **shmtr, NetTrans **my)
fprintf(stderr, "q[%2d]: ", i);
if (shmtr[i] != NULL)
- if (theArgs->varQSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
+ if (theArgs->queueSize[i] * 2 - HadTuQueue_size(shmtr[i]->wrQueue) > 0) {
fprintf(stderr, "%8d ", HadTuQueue_size(shmtr[i]->wrQueue));
} else {
fprintf(stderr, "%8s ", "EXCEEDED");
for (j = 0; j < maxnorm; j++) {
fprintf(stderr, "%1d ", maxnorm - j - 1);
for (i = 0; i < theArgs->nrOfMsgs; i++) {
- buffSize = 2 * theArgs->varQSize[i];
+ buffSize = 2 * theArgs->queueSize[i];
queueSize = HadTuQueue_size(shmtr[i]->wrQueue);
if (maxnorm - maxnorm * queueSize / buffSize < j) {
t0 = t;
}
+static void storeEvtStat(TheArgs *myArgs, unsigned long pktsDiscard, unsigned long msgsDiscard, unsigned long queue)
+{
+
+ FILE *fp;
+ char ltime[20]; /* local time */
+ time_t t = time(NULL);
+ strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime(&t));
+
+ fp = fopen(myArgs->evtstat, "a+");
+ fprintf(fp, "%s %d %s %d %d %d\n", "daq_netmem: ", (int) getpid(), ltime, queue, pktsDiscard, msgsDiscard);
+ fclose(fp);
+}
+
static void statsDump(TheArgs *theArgs, NetTrans **my, int interval)
{
+ static unsigned long lastPR[MAXINPATH];
+ static unsigned long lastPD[MAXINPATH];
+ static unsigned long lastMD[MAXINPATH];
+
static unsigned long lastBR[MAXINPATH];
static time_t lastTime = 0;
time_t dT;
int i;
-
if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0 && (dT = (time(NULL) - lastTime)) >= interval
&& !(theArgs->buffStat)) {
fputs("==============================================================================\n", stderr);
lastBR[i] = *my[i]->bytesReceived;
fputc('\n', stderr);
fputs("------------------------------------------------------------------------------\n", stderr);
+
+ if ((long) (*my[i]->pktsReceived) - (long) lastPR < 0) {
+ lastPR[i] = 0;
+ lastPD[i] = 0;
+ lastMD[i] = 0;
+ }
+
+ if ((*my[i]->pktsDiscarded - lastPD[i]) > 0 || (*my[i]->msgsDiscarded - lastMD[i]) > 0)
+ storeEvtStat(theArgs, *my[i]->pktsDiscarded - lastPD[i], *my[i]->msgsDiscarded - lastMD[i], i);
+
+ lastPR[i] = *my[i]->pktsReceived;
+ lastPD[i] = *my[i]->pktsDiscarded;
+ lastMD[i] = *my[i]->msgsDiscarded;
}
+
lastTime += dT;
}
}
Param_storeInt(p, n, "nrofmsgs", my->nrOfMsgs);
Param_storeInt(p, n, "stndln", my->isStandalone);
Param_storeInt(p, n, "prio", my->priority);
- Param_storeInt(p, n, "qsize", my->queueSize);
+ Param_storeInt(p, n, "qsize", my->queueSize[0]);
Param_storeString(p, n, "verb", my->verbosity);
desParam(p);
}
usage(argv[0]);
exit(EXIT_FAILURE);
}
+
+ if (0 > makeQueues(theArgs)) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
for (i = 0; prioritynames[i].c_name != NULL && 0 != strcmp(prioritynames[i].c_name, theArgs->verbosity); i++) {
}
if (prioritynames[i].c_name == NULL) {
exit(EXIT_FAILURE);
}
for (i = 0; i < theArgs->nrOfMsgs; i++) {
- char buf[80];
+ char buf[_POSIX_PATH_MAX];
netTrans[i] = NetTrans_create(theArgs->inPath[i], 0, worker);
if (netTrans[i] == NULL) {
/* use shared mem name from CL if given */
sprintf(buf, "netqueue%s%d", theArgs->shmname, i);
- if (theArgs->queueSize > 0) {
- /* if the queueSize is given in CL */
- shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize);
- } else {
- /* otherwise use variable queue size */
- shmTrans[i] = ShmTrans_open(buf, 2 * (long) theArgs->varQSize[i]);
- }
+ shmTrans[i] = ShmTrans_open(buf, 2 * theArgs->queueSize[i]);
if (shmTrans[i] == NULL) {
syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno));
}
hadTu[i] = NULL;
- if (theArgs->queueSize > 0) {
+ if (theArgs->queueSize[i] > 0) {
/* if the queueSize is given in CL */
- hadTuSize[i] = theArgs->queueSize - HadTu_hdrSize();
+ /* if( theArgs->queueSize[i] > 102400 + HadTu_hdrSize() ){ */
+/* hadTuSize[i] = 102400; */
+/* } */
+/* else */
+ hadTuSize[i] = theArgs->queueSize[i] - HadTu_hdrSize();
} else {
/* otherwise use variable queue size */
/* hadTuSize[i] = theArgs->varQSize[i] - HadTu_hdrSize(); */
- hadTuSize[i] = 51200;
+ hadTuSize[i] = 204800;
/*
- * This 50 kB must be bigger than a size of a message which
+ * This 200 kB must be bigger than a size of a message which
* should usually be less than 32kB (UDP_packet_size - water_mark).
* However some HUGE messages can appear. Those HUGE messages must
* be below hadTuSize!!! Otherwise the code will crash.