-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.105 2010-04-20 16:29:34 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.106 2010-06-05 23:00:27 hadaq Exp $";
#define _POSIX_C_SOURCE 199309L
#define SYSLOG_NAMES
#include "ansiTape.h"
#include "genid32.h"
-#ifdef RFIO
+#undef RFIO
#include "rawapin.h" /* for rfio */
static RFILE *fRemote = NULL;
#endif
int slowCtrlFileCnt;
unsigned long isStandalone;
unsigned long priority;
- unsigned long queueSize;
+ unsigned long queueSize[MAXINPATH];
+ int queueSizeCnt;
char verbosity[PARAM_MAX_VALUE_LEN];
unsigned long evtId;
unsigned long maxFileSz;
double secsizelimit;
double resdown_offset;
- /* the following arguments are for the variable queue size. S.Y. */
- unsigned long varQSize[MAXINPATH];
- int varQSizeCnt;
+ /* the following arguments are for the variable queue size */
+ char *queueVarSize[MAXINPATH];
+ char queueVarSizeS[MAXINPATH][PARAM_MAX_NAME_LEN];
+ int queueVarSizeCnt;
char rfioRemotePath[PARAM_MAX_VALUE_LEN];
char rfioLustrePath[PARAM_MAX_VALUE_LEN];
char shmname[PARAM_MAX_VALUE_LEN];
unsigned int ebnum;
char runinfo2ora[PARAM_MAX_VALUE_LEN];
+ char evtstat[PARAM_MAX_VALUE_LEN]; /* file name for statistics of discarded event */
char fileName[PARAM_MAX_VALUE_LEN];
} TheArgs;
syslog(LOG_ERR, "Usage: [--ignore] ignore trigger mismatch conditions");
}
+static int makeQueues(TheArgs *my)
+{
+ int retVal = 0;
+ int i;
+
+ int queueNum;
+ char newQueueSize[30];
+
+ /* Init all queue sizes */
+ for (i = 1; i < my->nrOfMsgs; i++) {
+ my->queueSize[i] = my->queueSize[0];
+ }
+
+ /* Extract specific values for queue sizes */
+ for (i = 0; i < my->queueVarSizeCnt; i++) {
+
+ if (0 == (sscanf(my->queueVarSize[i], "%d:%s", &queueNum, newQueueSize))) {
+ syslog(LOG_ERR, "evtbuild.c: makeQueues: %s", "queueVarSize option is wrong.");
+ retVal = -1;
+ break;
+ } else {
+ if (queueNum >= my->nrOfMsgs) {
+ syslog(LOG_ERR, "evtbuild.c: makeQueues: %s",
+ "queue number in queueVarSize option exceeds maximum number of data sources.");
+ retVal = -1;
+ break;
+ }
+
+ my->queueSize[queueNum] = atoi(newQueueSize);
+ }
+ }
+
+ return retVal;
+}
+
static void argsDump(TheArgs *my)
{
int i;
syslog(LOG_DEBUG, "expId: %s", my->expId);
syslog(LOG_DEBUG, "priority: %d", my->priority);
syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone);
- syslog(LOG_DEBUG, "queueSize: %d", my->queueSize);
+ syslog(LOG_DEBUG, "queueSize: %d", my->queueSize[0]);
syslog(LOG_DEBUG, "verbosity: %s", my->verbosity);
syslog(LOG_DEBUG, "evtId: %ld", my->evtId);
syslog(LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz);
strcpy(my->shmname, "");
my->priority = 0;
my->isStandalone = 1;
- my->queueSize = 4 * 1024 * 1024UL;
+ my->queueSize[0] = 64 * 1024 * 1024UL;
strcpy(my->verbosity, "info");
my->evtId = 0;
my->maxFileSz = (1.5 * 1024 * 1024 * 1024UL - 1);
strcpy(my->respath, "");
for (i = 0; i < MAXINPATH; i++) {
- my->varQSize[i] = 4 * 1024 * 1024UL;
+ my->queueVarSize[i] = my->queueVarSizeS[i];
}
+ my->queueVarSizeCnt = 0; /* no varible queue sizes by default */
strcpy(my->rfioRemotePath, "");
strcpy(my->rfioLustrePath, "");
/* read path from DAQ_SETUP and define full file name */
sprintf(my->runinfo2ora, "%s_runinfo2ora.txt", getenv("DAQ_SETUP"));
+ sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP"));
}
static int argsFromCL(TheArgs *my, int argc, char *argv[])
{"ignore", 0, 0, 'i'},
{0, 0, 0, 0}
};
- i = getopt_long(argc, argv, "am:f:r:o:d:q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:", long_options,
+ i = getopt_long(argc, argv, "am:f:r:o:d:q:Q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:", long_options,
&option_index);
if (i == -1)
break;
my->priority = strtoul(optarg, NULL, 0);
break;
case 'q':
- my->queueSize = strtoul(optarg, NULL, 0);
+ my->queueSize[0] = strtoul(optarg, NULL, 0);
+ break;
+ case 'Q':
+ strcpy(my->queueVarSize[my->queueVarSizeCnt++], optarg);
break;
case 'v':
strcpy(my->verbosity, optarg);
}
/* Conditions: if the Remote Event Server is used */
- if ((my->resdownscale != 0 && (my->resnumevents == -1 ||
- (strcmp(my->respath, "") == 0))) ||
- (my->resnumevents != -1 && (my->resdownscale == 0 ||
- (strcmp(my->respath, "") == 0))) ||
+ if ((my->resdownscale != 0 && (my->resnumevents == -1 || (strcmp(my->respath, "") == 0))) ||
+ (my->resnumevents != -1 && (my->resdownscale == 0 || (strcmp(my->respath, "") == 0))) ||
((strcmp(my->respath, "") != 0) && (my->resnumevents == -1 || my->resdownscale == 0))) {
fprintf(stderr,
return 1;
}
- if (((my->maxFileSz) < (my->queueSize))) {
+ if (((my->maxFileSz) < (my->queueSize[0]))) {
fprintf(stderr, "<E> evtbuild.c, argsCheck(): --filesize must be larger than queuesize(-q)\n");
return 1;
}
Param_getString(param, name, "expid", ¶mWasFound, my->expId);
Param_getInt(param, name, "stndln", ¶mWasFound, &my->isStandalone);
Param_getInt(param, name, "prio", ¶mWasFound, &my->priority);
- Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize);
+ Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize[0]);
Param_getString(param, name, "verb", ¶mWasFound, my->verbosity);
Param_getInt(param, name, "evtid", ¶mWasFound, &my->evtId);
Param_getInt(param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz);
- Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
+ /* Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); */
desParam(param);
}
/* Add statistic for fill levels of buffers in percentage. */
static time_t t_0 = 0;
- float buffSize, queueSize;
+ unsigned long buffSize, queueSize;
time_t t, dT;
int i;
unsigned long fillLevel;
if (dT >= interval) {
for (i = 0; i < theArgs->nrOfMsgs; i++) {
- buffSize = 2 * theArgs->varQSize[i];
+ buffSize = 2 * theArgs->queueSize[i];
queueSize = HadTuQueue_size(shmtr[i]->rdQueue);
/* Add here statistic for fill levels of buffers */
- fillLevel = (unsigned long) (100 * queueSize + 0.5) / buffSize;
+ fillLevel = (unsigned long) (100. * queueSize + 0.5) / buffSize;
(*my->evtbuildBuff[i]) = fillLevel;
+
+ /* printf("q[%02d]%5d\%\n",i,fillLevel); */
}
+ t_0 = t;
}
-
- t_0 = t;
}
static void statsBufferDump(TheArgs *theArgs, TheStats *my, float interval, HadTuQueue **htuq, ShmTrans **shmtr, char *progName)
for (j = 0; j < maxnorm; j++) {
fprintf(stderr, "%1d ", maxnorm - j - 1);
for (i = 0; i < theArgs->nrOfMsgs; i++) {
- buffSize = 2 * theArgs->varQSize[i];
+ buffSize = 2 * theArgs->queueSize[i];
queueSize = HadTuQueue_size(shmtr[i]->rdQueue);
if (maxnorm - maxnorm * queueSize / buffSize < j) {
return myRunId;
}
-static void statsDump(TheArgs *theArgs, TheStats *my, int interval)
+static void storeEvtStat(TheArgs *myArgs, unsigned long evtstat)
{
- static unsigned long lastEC;
- static unsigned long lastEE;
- static unsigned long lastTE;
- static unsigned long lastED;
- static unsigned long lastBW;
+ FILE *fp;
+ char ltime[20]; /* local time */
+ time_t t = time(NULL);
+ strftime(ltime, 20, "%Y-%m-%d %H:%M:%S", localtime(&t));
+
+ fp = fopen(myArgs->evtstat, "a+");
+ fprintf(fp, "%s %d %s %d %d %d\n", "daq_evtbuild:", (int) getpid(), ltime, myArgs->runNr, myArgs->ebnum, evtstat);
+ fclose(fp);
+}
+
+static void statsDump(TheArgs *theArgs, TheStats *my, int interval)
+{
+ static unsigned long lastEC = 0;
+ static unsigned long lastEE = 0;
+ static unsigned long lastTE = 0;
+ static unsigned long lastED = 0;
+ static unsigned long lastBW = 0;
static time_t t0 = 0;
time_t t, dT;
int i;
fprintf(stderr, "%19s:%6s", "evtsTagError ", unit(*my->evtsTagError));
fputc('\n', stderr);
- if (dT > 0) {
- fprintf(stderr, "%19s:%6s", "evtsComplete/s", unit((*my->evtsComplete - lastEC) / dT));
- fprintf(stderr, "%19s:%6s", "evtsDiscarded/s", unit((*my->evtsDiscarded - lastED) / dT));
- fprintf(stderr, "%19s:%6s", "bytesWritten/s", unit((*my->bytesWritten - lastBW) / dT));
- fputc('\n', stderr);
- fprintf(stderr, "%19s:%6s", "evtsDataError/s", unit((*my->evtsDataError - lastEE) / dT));
- fprintf(stderr, "%19s:%6s", "evtsTagError/s", unit((*my->evtsTagError - lastTE) / dT));
- fputc('\n', stderr);
+ if ((long) (*my->evtsComplete) - (long) lastEC < 0) {
+ lastEC = 0;
+ lastED = 0;
+ lastBW = 0;
+ lastEE = 0;
+ lastTE = 0;
}
+ fprintf(stderr, "%19s:%6s", "evtsComplete/s", unit((*my->evtsComplete - lastEC) / dT));
+ fprintf(stderr, "%19s:%6s", "evtsDiscarded/s", unit((*my->evtsDiscarded - lastED) / dT));
+ fprintf(stderr, "%19s:%6s", "bytesWritten/s", unit((*my->bytesWritten - lastBW) / dT));
+ fputc('\n', stderr);
+ fprintf(stderr, "%19s:%6s", "evtsDataError/s", unit((*my->evtsDataError - lastEE) / dT));
+ fprintf(stderr, "%19s:%6s", "evtsTagError/s", unit((*my->evtsTagError - lastTE) / dT));
+ fputc('\n', stderr);
+
+ if (*my->evtsDiscarded - lastED > 0)
+ storeEvtStat(theArgs, *my->evtsDiscarded - lastED);
+
lastEC = *my->evtsComplete;
lastEE = *my->evtsDataError;
lastTE = *my->evtsTagError;
}
}
-
static void storeRunInfoStart(time_t t, TheArgs *myArgs)
{
/* open ascii file eb_runinfo2ora.txt to store simple information for
Param_storeString(p, n, "expid", my->expId);
Param_storeInt(p, n, "prio", my->priority);
Param_storeInt(p, n, "stndln", my->isStandalone);
- Param_storeInt(p, n, "qsize", my->queueSize);
+ Param_storeInt(p, n, "qsize", my->queueSize[0]);
Param_storeString(p, n, "verb", my->verbosity);
if (my->resdownscale != 0) {
Param_storeInt(p, n, "resdownscale", my->resdownscale);
exit(EXIT_FAILURE);
}
+ if (0 > makeQueues(theArgs)) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
if (argsCheck(theArgs)) {
sleep(10);
exit(EXIT_FAILURE);
hadTuQueue = malloc(sizeof(HadTuQueue *));
sprintf(buf, "subevtqueue%s", theArgs->shmname);
- shmTrans[0] = ShmTrans_create(buf, 2 * theArgs->queueSize);
+ shmTrans[0] = ShmTrans_create(buf, 2 * theArgs->queueSize[0]);
hadTuQueue[0] = NULL;
theArgs->nrOfMsgs = 1;
/* use shared mem name from CL if given */
sprintf(buf, "netqueue%s%d", theArgs->shmname, i);
- if (theArgs->queueSize > 0) {
- /* The queueSize is given in CL. */
- shmTrans[i] = ShmTrans_create(buf, 2 * theArgs->queueSize);
- } else {
- shmTrans[i] = ShmTrans_create(buf, 2 * (long) theArgs->varQSize[i]);
-
- if (shmTrans[i] == NULL) {
- fprintf(stderr, "<E> ShmTrans_create: could not create buffer /dev/shm/%s.shm\n", buf);
- }
+ shmTrans[i] = ShmTrans_create(buf, 2 * theArgs->queueSize[i]);
+ if (shmTrans[i] == NULL) {
+ fprintf(stderr, "<E> ShmTrans_create: could not create buffer /dev/shm/%s.shm\n", buf);
}
hadTuQueue[i] = NULL;
* Something went wrong with sinchronization of Event Builders,
* close the file.
*/
- if ((!(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize))) ||
+ if ((!(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize[0]))) ||
(theArgs->epicsCtrl && runNr < newRunId) ||
(theArgs->epicsCtrl && (*theStats->bytesWritten) >= 1900000000) || (theArgs->epicsCtrl && newRunId == 0)) {
evt = newEvt(EvtDecoding_64bitAligned, EvtId_runStop);
/* store simple stop run info */
storeRunInfoStop(ourTime, theArgs, theStats);
- statsDump(theArgs, theStats, 1);
+ /* statsDump(theArgs, theStats, 1); */
for (i = 0; i < theArgs->nrOfMsgs; i++)
ShmTrans_remove(shmTrans[i]);