static char *rcsId =
- "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.82 2008-04-08 14:45:14 hadaq Exp $";
+ "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.83 2008-05-28 14:54:22 hadaq Exp $";
#define _POSIX_C_SOURCE 199309L
#define DEBUG2 0
#define CHECK_MISMATCH 1
#define BEAM 1
+#define TIMEOFFSET 1200000000 /* needed to reconstruct time from runId */
static FILE *outFile;
+static FILE *outLustreFile;
static FILE *outSecondFile;
static FILE *outRESFile;
static AnsiTape *outTape;
static uint32_t seqNr;
static uint32_t res_seqNr;
static uint32_t runNr;
+static uint32_t newRunId; /* needed to get new RUN id from epics ctrl */
static time_t ourTime;
static long file_size;
static long res_file_size;
unsigned int rfioFlag;
char rfioRemotePath[PARAM_MAX_VALUE_LEN];
- unsigned int buffStat;
+ char lustrePath[PARAM_MAX_VALUE_LEN];
+ unsigned short buffStat;
+ unsigned short epicsCtrl;
} TheArgs;
typedef struct TheStatsS
unsigned long *evtId[NEVTIDS];
unsigned long *trigNr[MAXINPATH];
unsigned long *evtsRes;
+ unsigned long *evtbuildBuff[MAXINPATH];
+ unsigned long *nrOfMsgs;
+ unsigned long *runId;
+ unsigned long *evtsCompletePS;
+ unsigned long *bytesWrittenPS;
} TheStats;
static jmp_buf terminateJmp;
-
-
-void
-sigHandler (int sig)
+void sigHandler (int sig)
{
longjmp (terminateJmp, sig);
}
-static void *
-appendFile (void *my, const char *path)
+static void *appendFile (void *my, const char *path)
{
void *subEvt;
char *dataBuf;
return my;
}
-static void
-usage (const char *progName)
+static void usage (const char *progName)
{
syslog (LOG_ERR, "Usage: %s [-x expId]", progName);
syslog (LOG_ERR, "Usage: [-m nrOfMsgs] [-f slowCtrlFile ...]");
syslog (LOG_ERR, "Usage: [-o outPath] [-d null|tape|file|stdout]");
syslog (LOG_ERR, "Usage: [-q queueSize] [-r runNumber]");
syslog (LOG_ERR, "Usage: [-a (agent)] [-p priority] [-I evtId]");
- syslog (LOG_ERR,
- "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]");
+ syslog (LOG_ERR, "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]");
syslog (LOG_ERR, "Usage: [--norpc]");
syslog (LOG_ERR, "Usage: [--filesize maximum_size_of_output_file[in MB]]");
- syslog (LOG_ERR, "Usage: [--resdownscale resdownscale_of_events]");
- syslog (LOG_ERR, "Usage: [--resnumevents number_of_events_in_one_resfile]");
- syslog (LOG_ERR,
- "Usage: [--respath path_where_the_downscaling_data_are_written]");
- syslog (LOG_ERR, "Usage: [--secsizelimit max size of second dir[in MB]]");
- syslog (LOG_ERR, "Usage: [--ressizelimit max number of files in res dir]");
- syslog (LOG_ERR,
- "Usage: [--write_data path_where_the_mirroring_data_are_ written]");
+ syslog (LOG_ERR, "Usage: [--resdownscale resdownscale_factor] downscale factor for the res events");
+ syslog (LOG_ERR, "Usage: [--resnumevents evt_num] maximum number of events in a resfile");
+ syslog (LOG_ERR, "Usage: [--respath path] path for the res directory");
+ syslog (LOG_ERR, "Usage: [--secsizelimit max_size] maximum size of second directory with the mirrored data [in MB]");
+ syslog (LOG_ERR, "Usage: [--ressizelimit max_file_num] maximum number of files in res dir");
+ syslog (LOG_ERR, "Usage: [--write_data path] path to the directory with mirrored data");
#ifdef RFIO
- syslog (LOG_ERR, "[--rfio 0|1] write data to tape via rfio (0=no/1=yes)");
- syslog (LOG_ERR, "[--rfiopath path_to_tape_archive]");
+ syslog (LOG_ERR, "Usage: [--rfio path_to_tape_archive] example: --rfio rfiodaq:gstore:/hadaqtest/test002");
#endif
- syslog (LOG_ERR, "[--buffstat 0|1] show fill levels of buffers]");
+ syslog (LOG_ERR, "Usage: [--buffstat] show fill levels of buffers");
+ syslog (LOG_ERR, "Usage: [--epicsctrl] enable synch and distribution of RUN Id by Epics for parallel event builders");
+ syslog (LOG_ERR, "Usage: [--lustre path_to_lustre] path to the file on the Lustre cluster");
}
-static void
-argsDump (TheArgs * my)
+static void argsDump (TheArgs * my)
{
int i;
syslog (LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs);
- for (i = 0; i < my->slowCtrlFileCnt; i++)
- {
- syslog (LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]);
- }
+ for (i = 0; i < my->slowCtrlFileCnt; i++) {
+ syslog (LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]);
+ }
syslog (LOG_DEBUG, "outPath: %s", my->outPath);
syslog (LOG_DEBUG, "outDev: %s", my->outDev);
syslog (LOG_DEBUG, "runNr: %d", my->runNr);
syslog (LOG_DEBUG, "verbosity: %s", my->verbosity);
syslog (LOG_DEBUG, "evtId: %ld", my->evtId);
syslog (LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz);
- if (my->resdownscale != 0)
- {
+ if (my->resdownscale != 0) {
syslog (LOG_DEBUG, "resdownscale: %ld", my->resdownscale);
syslog (LOG_DEBUG, "resnumevents: %ld", my->resnumevents);
syslog (LOG_DEBUG, "respath: %s", my->respath);
syslog (LOG_DEBUG, "secsizelimit: %ld", my->secsizelimit);
syslog (LOG_DEBUG, "ressizelimit: %d", my->ressizelimit);
- }
- if (my->no_rpc == 1)
- {
+ }
+ if (my->no_rpc == 1) {
syslog (LOG_DEBUG, "no rpc is set");
- }
- if (my->write_data == 1)
- {
+ }
+ if (my->write_data == 1) {
syslog (LOG_DEBUG, "sec_path: %s", my->sec_path);
- }
+ }
+ if( strcmp( my->rfioRemotePath, "" ) != 0 ) {
+ syslog (LOG_DEBUG, "rfio path: %s", my->rfioRemotePath);
+ }
+ if( strcmp( my->lustrePath, "" ) != 0 ) {
+ syslog (LOG_DEBUG, "lustre path: %s", my->lustrePath);
+ }
}
-static void
-argsDefault (TheArgs * my)
+static void argsDefault (TheArgs * my)
{
int i;
strcpy (my->expId, "xx");
my->priority = 0;
my->isStandalone = 1;
- my->queueSize = 1 * 1024 * 1024;
+ my->queueSize = 4 * 1024 * 1024UL;
strcpy (my->verbosity, "info");
my->evtId = 0;
my->maxFileSz = (2 * 1024 * 1024 * 1024UL - 1);
strcpy (my->respath, "");
for (i = 0; i < MAXINPATH; i++) {
- my->varQSize[i] = 1 * 1024 * 1024;
+ my->varQSize[i] = 4 * 1024 * 1024UL;
}
- my->rfioFlag = 0;
+
strcpy (my->rfioRemotePath, "");
+ strcpy (my->lustrePath, "");
my->buffStat = 0;
+ my->epicsCtrl = 0;
}
-static int
-argsFromCL (TheArgs * my, int argc, char *argv[])
+static int argsFromCL (TheArgs * my, int argc, char *argv[])
{
extern char *optarg;
int i;
{"write_data", 1, 0, 'w'},
{"help", 0, 0, 'H'},
{"rfio", 1, 0, 'R'},
- {"rfiopath", 1, 0, 'P'},
- {"buffstat", 1, 0, 'b'},
+ {"buffstat", 0, 0, 'b'},
+ {"epicsctrl", 0, 0, 'E'},
+ {"lustre", 1, 0, 'L'},
{0, 0, 0, 0}
};
- i =
- getopt_long (argc, argv,
- "am:f:r:o:d:q:p:v:x:I:S:tz:e:n:h:w:tz:e:n:Hs:l:R:P:b:",
+ i = getopt_long (argc, argv,
+ "am:f:r:o:d:q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:bEL:",
long_options, &option_index);
if (i == -1)
break;
case 'I':
my->evtId = strtoul (optarg, NULL, 0);
break;
- case 'S':
- my->maxFileSz = strtoul (optarg, NULL, 0);
- break;
- case 't': /*norpc - no arg */
+ case 't': /* norpc - no arg */
my->no_rpc = 1;
break;
- case 'z': /*filesize - one arg */
- if (strtoul (optarg, NULL, 0) > 0
- && strtoul (optarg, NULL, 0) < 20000)
- {
- my->maxFileSz = (1024 * 1024UL * strtoul (optarg, NULL, 0));
- }
- else
- {
- printf
- ("filesize option incorrect,it should be >0 and <20000\n");
- exit (0);
- }
+ case 'z': /* filesize - one arg */
+ my->maxFileSz = (1024 * 1024UL * strtoul (optarg, NULL, 0));
break;
- case 'e': /*resdownscale- one arg & need resnumevents & respath */
- if (strtoul (optarg, NULL, 0) > 0
- && strtoul (optarg, NULL, 0) < 100001)
- {
- my->resdownscale = strtoul (optarg, NULL, 0);
- my->resdown_offset = EVENT_NUM_OFFSET / (my->resdownscale);
- }
- else
- {
- printf
- ("resdownscale option incorrect,it should be >0 and <100001\n");
- }
+ case 'e': /* need resnumevents & respath */
+ my->resdownscale = strtoul (optarg, NULL, 0);
+ if(my->resdownscale > 0)
+ my->resdown_offset = EVENT_NUM_OFFSET / (my->resdownscale);
break;
- case 'n': /*resnumevents - one arg */
- if (strtoul (optarg, NULL, 0) > 99
- && strtoul (optarg, NULL, 0) < 1000000001)
- {
- my->resnumevents = strtoul (optarg, NULL, 0);
- }
- else
- {
- printf
- ("resnumevents incorrect, it should be >99 and <1000000001\n");
- }
+ case 'n':
+ my->resnumevents = strtoul (optarg, NULL, 0);
break;
- case 'l': /*secsizelimit - one arg */
- if (strtoul (optarg, NULL,0) > 10
- && strtoul (optarg, NULL,0) < 100001)
- {
- unsigned long tmp = (/*1024 * 1024UL **/ strtoul (optarg, NULL,0));
-#if 0
- my->secsizelimit = (((double)tmp)/((double)(1024.*1024.)));
-#endif
- my->secsizelimit = (double)tmp;
- printf("set secsizelimit: %d MB \n",my->secsizelimit);
- }
+ case 'l':
+ my->secsizelimit = (double) strtoul (optarg, NULL,0);
break;
- case 's': /*ressizelimit - one arg */
- if (strtoul (optarg, NULL, 0) > 5
- && strtoul (optarg, NULL, 0) < 1000)
- {
- my->ressizelimit = strtoul (optarg, NULL, 0);
- }
+ case 's':
+ my->ressizelimit = strtoul (optarg, NULL, 0);
break;
- case 'h': /*respath - one arg as dir_path */
+ case 'h':
strcpy (my->respath, optarg);
break;
- case 'w': /*write_data - one arg as dir_path */
+ case 'w':
my->write_data = 1;
strcpy (my->sec_path, optarg);
break;
case 'R':
- my->rfioFlag = strtoul (optarg, NULL, 0);
- break;
- case 'P':
strcpy (my->rfioRemotePath, optarg);
break;
+ case 'L':
+ strcpy (my->lustrePath, optarg);
+ break;
case 'b':
- my->buffStat = strtoul (optarg, NULL, 0);
+ my->buffStat = 1;
+ break;
+ case 'E':
+ my->epicsCtrl = 1;
break;
case 'H':
usage (argv[0]);
printf ("%s ", argv[optind++]);
printf ("\n");
}
-/*
-the condition that resdownscale resnumevents respath have to be together
-*/
- if (((my->resdownscale != 0) || (my->resnumevents != -1)
- || (strcmp (my->respath, ""))) && ((my->resdownscale == 0)
- || (my->resnumevents == -1)
- || !strcmp (my->respath, "")))
-#if 0
- if (((my->resdownscale != 0) || (my->resnumevents != -1)
- || (my->respath != NULL)) && ((my->resdownscale == 0)
- || (my->resnumevents == -1)
- || (my->respath == NULL)))
-#endif
- {
- printf
- ("if you are using Remote Events Server, you should fill: resdownscale, resnumevents, respath options\n");
- exit (EXIT_FAILURE);
- }
- if (((my->maxFileSz) < (my->queueSize)))
- {
- printf ("--filesize has to be > queuesize(-q)\n");
- exit (0);
- }
+
+ return 0;
+}
+
+static int argsCheck( TheArgs *my )
+{
+ /*
+ * Check the content of TheArgs.
+ */
+
+ /* check my->maxFileSz */
+ if( (strcmp(my->outDev, "null") != 0) && (my->maxFileSz <= 0 || my->maxFileSz >= 1024 * 1024UL * 2000) ) {
+ fprintf( stderr, "<E> evtbuild.c, argsCheck(): --filesize must be >0 and <20000\n");
+ return 1;
+ }
+
+ /* Conditions: if the Remote Event Server is used */
+ if ( ( my->resdownscale != 0 && ( my->resnumevents == -1 ||
+ (strcmp(my->respath, "") == 0) ) ) ||
+ ( my->resnumevents != -1 && ( my->resdownscale == 0 ||
+ (strcmp(my->respath, "") == 0) ) ) ||
+ ( (strcmp(my->respath, "") != 0) && ( my->resnumevents == -1 ||
+ my->resdownscale == 0 ) ) ) {
+
+ fprintf( stderr, "<E> evtbuild.c, argsCheck(): options --resdownscale --resnumevents --respath must be specified together\n");
+ return 1;
+ }
+
+ /* check my->resdownscale */
+ if( (strcmp(my->respath, "") != 0) &&
+ ( my->resdownscale <= 0 || my->resdownscale >= 100001 ) ) {
+ fprintf( stderr,"<E> evtbuild.c, argsCheck(): --resdownscale must be >0 and <100001\n");
+ return 1;
+ }
+
+ /* check my->resnumevents */
+ if( (strcmp(my->respath, "") != 0) &&
+ ( my->resnumevents <= 99 || my->resnumevents >= 1000000001 ) ) {
+ fprintf( stderr,"<E> evtbuild.c, argsCheck(): --resnumevents must be >99 and <1000000001\n");
+ return 1;
+ }
+
+ /* check my->secsizelimit */
+ if( (strcmp(my->respath, "") != 0) &&
+ ( my->secsizelimit <= -0.0001 || my->secsizelimit >= 0.0001 ) &&
+ ( my->secsizelimit <= 10 || my->secsizelimit >= 100001 ) ) {
+ fprintf( stderr,"<E> evtbuild.c, argsCheck(): --secsizelimit must be >10 and <100000 MB\n");
+ return 1;
+ }
+
+ /* check my->ressizelimit */
+ if( (strcmp(my->respath, "") != 0) &&
+ ( my->ressizelimit <= 5 || my->ressizelimit >= 1000 ) ) {
+ fprintf( stderr, "<E> evtbuild.c, argsCheck(): --ressizelimit must be >5 and <1000\n");
+ return 1;
+ }
+
+ if (((my->maxFileSz) < (my->queueSize))) {
+ fprintf( stderr, "<E> evtbuild.c, argsCheck(): --filesize must be larger than queuesize(-q)\n");
+ return 1;
+ }
+
/* the condition ressizelimit has to be together with respath */
- if ((my->ressizelimit) != 0)
- {
- if (!strcmp (my->respath, ""))
- {
- printf ( "you have to define path, where the files will be collect. use 'respath' option\n" );
- exit (EXIT_FAILURE);
- }
- }
+ if ((my->ressizelimit) != 0 && (strcmp(my->respath, "") == 0) ) {
+ fprintf( stderr, "<E> evtbuild.c, argsCheck(): --respath is not given\n" );
+ return 1;
+ }
+
return 0;
}
-static int
-argsFromParam (TheArgs * my, int argc, char *argv[])
+static int argsFromParam (TheArgs *my, int argc, char *argv[])
{
Param paramS, *param = ¶mS;
int paramWasFound;
conSetupParam (param, getenv ("DAQ_SETUP"));
name = (char *) basename (argv[0]);
- Param_getInt (param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs);
+ Param_getInt (param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs);
Param_getStringArray (param, name, "slwctrlfile", PARAM_MAX_ARRAY_LEN,
&my->slowCtrlFileCnt, my->slowCtrlFiles);
Param_getString (param, name, "outpath", ¶mWasFound, my->outPath);
- Param_getString (param, name, "outdev", ¶mWasFound, my->outDev);
- Param_getString (param, name, "expid", ¶mWasFound, my->expId);
- Param_getInt (param, name, "stndln", ¶mWasFound, &my->isStandalone);
- Param_getInt (param, name, "prio", ¶mWasFound, &my->priority);
- Param_getInt (param, name, "qsize", ¶mWasFound, &my->queueSize);
- Param_getString (param, name, "verb", ¶mWasFound, my->verbosity);
- Param_getInt (param, name, "evtid", ¶mWasFound, &my->evtId);
- Param_getInt (param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz);
+ Param_getString (param, name, "outdev", ¶mWasFound, my->outDev);
+ Param_getString (param, name, "expid", ¶mWasFound, my->expId);
+ Param_getInt (param, name, "stndln", ¶mWasFound, &my->isStandalone);
+ Param_getInt (param, name, "prio", ¶mWasFound, &my->priority);
+ Param_getInt (param, name, "qsize", ¶mWasFound, &my->queueSize);
+ Param_getString (param, name, "verb", ¶mWasFound, my->verbosity);
+ Param_getInt (param, name, "evtid", ¶mWasFound, &my->evtId);
+ Param_getInt (param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz);
Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
desParam (param);
}
-static char *
-unit (unsigned long v)
+static char *unit (unsigned long v)
{
static char retVal[6];
static char u[] = " kM";
int i;
- for (i = 0; v >= 10000 && i < sizeof (u) - 2; v /= 1000, i++)
- {
- }
+ for (i = 0; v >= 10000 && i < sizeof (u) - 2; v /= 1000, i++) {}
sprintf (retVal, "%4d%c", v, u[i]);
return retVal;
printf ("Time: %s.%03ld\n", time_string, milliseconds);
}
-static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, HadTuQueue **htuq, ShmTrans **shmtr)
+static void add2Stat( TheArgs * theArgs, TheStats * my, float interval, ShmTrans **shmtr )
+{
+ /* Add statistic for fill levels of buffers in percentage. */
+
+ static unsigned long lastEvtsComplete;
+ static unsigned long lastBytesWritten;
+ static time_t t_0 = 0;
+ float buffSize, queueSize;
+ time_t t, dT;
+ int i;
+ unsigned long fillLevel;
+
+ t = time (NULL);
+ dT = t - t_0;
+
+ if( dT >= interval ) {
+ for( i=0; i<theArgs->nrOfMsgs; i++ ) {
+ buffSize = 2*theArgs->varQSize[i];
+ queueSize = HadTuQueue_size(shmtr[i]->rdQueue);
+
+ /* Add here statistic for fill levels of buffers */
+ fillLevel = (unsigned long) (100*queueSize+0.5)/buffSize;
+ (*my->evtbuildBuff[i]) = fillLevel;
+ }
+
+ /* Add more statistic for evtsComplete and bytesWritten per second */
+ (*my->evtsCompletePS) = (*my->evtsComplete - lastEvtsComplete) / dT;
+ (*my->bytesWrittenPS) = (*my->bytesWritten - lastBytesWritten) / dT;
+ }
+
+ t_0 = t;
+}
+
+static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval,
+ HadTuQueue **htuq, ShmTrans **shmtr)
{
static unsigned long lastEC2;
static unsigned long lastBW2;
int outputGraph = 1;
int outputNum = 0;
- if (theArgs->isStandalone && theArgs->buffStat == 1) {
+ if (theArgs->isStandalone && theArgs->buffStat) {
t = time (NULL);
dT = t - t0;
/* get wall-clock time */
printTime();
+
+ unsigned long runId2print;
+ Worker_getStatistic( "daq_evtbuild", "runId", &runId2print);
+ printf("ioc: RUN Id = %lu\n", runId2print);
}
}
}
}
+unsigned long getRunId( TheArgs *my )
+{
+ unsigned long myRunId = 0;
+ struct timespec tv = { 0, 1e+8 };
+
+ while( my->epicsCtrl && myRunId == 0 ) {
+ if( Worker_getStatistic( "daq_evtbuild", "runId", &myRunId) == -1 ) {
+ fprintf( stderr, "<E> evtbuild.c: getRunId: Worker_getStatistic: cannot get runId!" );
+ sleep(1);
+ }
+ else {
+
+ if( myRunId == 0 )
+ nanosleep( &tv, NULL );
+ }
+
+ }
+
+ return myRunId;
+}
+
static void statsDump (TheArgs * theArgs, TheStats * my, int interval)
{
static unsigned long lastEC;
time_t t, dT;
int i;
-
if (theArgs->isStandalone && strcmp (theArgs->verbosity, "info") == 0 &&
- theArgs->buffStat == 0)
+ !(theArgs->buffStat))
{
t = time (NULL);
dT = t - t0;
fclose(fp);
}
-#define NJUNK 128
-static void
-storeInfoStart (const char *n, time_t t, TheArgs * my)
+static void storeInfoStart (const char *n, time_t t, TheArgs * my)
{
Param pS, *p = &pS;
int i;
Param_storeString (p, n, "respath", my->respath);
Param_storeInt (p, n, "ressizelimit", my->ressizelimit);
- }
- if (my->secsizelimit != 0.)
- {
-/* Param_storeString (p, n, "secsizelimit", my->secsizelimit);*/
}
if (my->no_rpc == 1)
{
{
Param_storeString (p, n, "sec_path", my->sec_path);
}
- /* This storing junk is for having the run start in
- * oracle definately -- BS
- */
-
- /*
- *for (i = 0; i < NJUNK; i++)
- * {
- * Param_storeString (p, n, "junk",
- * "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
- * }
- */
desParam (p);
}
desParam (p);
}
-int is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) {
- /*unsigned long filesize_enough = (theArgs->maxFileSz) * 0.02;*/
- unsigned long filesize_enough = (theArgs->maxFileSz) * 0.1;
- /*unsigned long discarded_enough = (*my->evtsComplete) * 0.03;*/
- unsigned long discarded_enough = (*my->evtsComplete) * 0.1;
- unsigned long tagerror_enough = (*my->evtsComplete) * 0.03;
- if ((*my->bytesWritten) >= filesize_enough)
- {
- if ((*my->evtsDiscarded) > discarded_enough)
- {
- printf("Too many events are broken.\n");
- printf("Debug Inf: filesize_enough: %u,fileWritten: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete));
- syslog (LOG_ERR, "Too many events are broken.\n");
- syslog (LOG_ERR, "Debug Inf: filesize_enough: %u,fileWritten: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete));
+int is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my)
+{
+ /* min file size to start checking */
+ unsigned long minfilesize = (theArgs->maxFileSz) * 0.1; /* 0.02 */
+ /* max number of discarded evts allowed */
+ unsigned long maxdiscarded = (*my->evtsComplete) * 0.1; /* 0.03 */
+ /* max number of evts with tag error allowed */
+ unsigned long maxtagerror = (*my->evtsComplete) * 0.03;
+
+ if ((*my->bytesWritten) >= minfilesize) {
+ if ((*my->evtsDiscarded) > maxdiscarded) {
+
+ syslog (LOG_ERR, "<E> evtbuild.c: Too many events are broken!");
+ syslog (LOG_ERR, "<D> evtbuild.c: bytes written: %u, minimum file size: %u",
+ minfilesize, (*my->bytesWritten) );
+ syslog (LOG_ERR, "<D> evtbuild.c: discarded evts: %u, allowed: %u, total evts num: %u",
+ (*my->evtsDiscarded), maxdiscarded, (*my->evtsComplete));
#if BEAM
- system ("echo tagerror | netcat -w1 hadesdaq 12122");
+ system ("echo tagerror | netcat -w1 hadesdaq 12122");
#endif
- sleep(5);
+ sleep(5);
}
- if ((*my->evtsTagError) > tagerror_enough)
- {
- printf("Too many TagErrors.\n");
- printf("Debug Inf: filesize_enough: %u,fileWritten: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete));
- syslog(LOG_ERR, "Too many TagErrors.\n");
- syslog(LOG_ERR, "Debug Inf: filesize_enough: %u,fileWritten: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete));
+ if ((*my->evtsTagError) > maxtagerror) {
+
+ syslog(LOG_ERR, "<E> evtbuild.c: Too many TagErrors!");
+ syslog (LOG_ERR, "<D> evtbuild.c: bytes written: %u, minimum file size: %u",
+ minfilesize, (*my->bytesWritten) );
+ syslog (LOG_ERR, "<D> evtbuild.c: tagerror evts: %u, allowed: %u, total evts num: %u",
+ (*my->evtsTagError), maxtagerror, (*my->evtsComplete));
+
#if BEAM
- system ("echo tagerror | netcat -w1 hadesdaq 12122");
+ system ("echo tagerror | netcat -w1 hadesdaq 12122");
#endif
- sleep(5);
+ sleep(5);
}
}
return 0;
}
-int
-get_file_number_in_dir (char *path)
+
+int get_file_number_in_dir (char *path)
{
int file_number;
file_number = 0;
dir = opendir (path);
- if (dir == NULL)
- {
- printf ("open dir: %s failed\n", path);
- exit (EXIT_FAILURE);
- }
+ if (dir == NULL) {
+ syslog(LOG_ERR, "%s, %d: trying to open directory %s: %s",
+ __FILE__, __LINE__, path, strerror(errno));
+ exit (EXIT_FAILURE);
+ }
while (NULL != (dirptr = readdir (dir)))
- {
-#if 0
- strcpy (tmppath, path);
- strcat (tmppath, "/");
- strcat (tmppath, dirptr->d_name);
- lstat (tmppath, buf);
+ file_number ++;
- if (S_ISDIR (buf->st_mode))
- {
-#if 0
- printf ("directory %s \n", tmppath);
-#endif
- }
- else
- {
-#endif
- file_number ++;
-#if 0
- }
-#endif
- }
closedir(dir);
+
return file_number;
}
-double
-get_directory_size (char *path)
+
+double get_directory_size (char *path)
{
double directory_size;
directory_size = 0.;
dir = opendir (path);
- if (dir == NULL)
- {
- printf ("open dir: %s failed\n", path);
+ if (dir == NULL) {
+ syslog(LOG_ERR, "%s, %d: trying to open directory %s: %s",
+ __FILE__, __LINE__, path, strerror(errno));
exit (EXIT_FAILURE);
- }
+ }
- while (NULL != (dirptr = readdir (dir)))
- {
+ while (NULL != (dirptr = readdir (dir))) {
strcpy (tmppath, path);
strcat (tmppath, "/");
strcat (tmppath, dirptr->d_name);
lstat (tmppath, buf);
- if (S_ISDIR (buf->st_mode))
- {
-#if 0
- printf ("directory %s \n", tmppath);
-#endif
- }
- else
- {
+ if( !(S_ISDIR (buf->st_mode)) ) {
directory_size += (double)(buf->st_size/(double)(1024.*1024.));
- }
- }
+ }
+ }
+
closedir(dir);
return directory_size;
}
#define DEBUG 0
-static int
-remove_file (char *path)
+static int remove_file (char *path)
{
-/*delete files if limit was reached*/
-#if 0
- printf ("limit overflow: ressizelimit: %f, res_dirSize: %f\n",
- theArgs->ressizelimit, res_dirSize);
-#endif
+ /*delete files if limit was reached*/
+
DIR *dir;
struct dirent *dirptr;
struct stat bufS, *buf = &bufS;
char tmppath[PARAM_MAX_VALUE_LEN];
char last_modification_file[PARAM_MAX_VALUE_LEN];
dir = opendir (path);
- if (dir == NULL)
- {
- printf ("open dir: %s failed\n", path);
+
+ if (dir == NULL) {
+ syslog(LOG_ERR, "%s, %d: opening dir %s failed", __FILE__, __LINE__, path);
exit (EXIT_FAILURE);
- }
+ }
- while (1)
- {
- if (NULL == (dirptr = readdir (dir)))
- {
- break;
- }
- strcpy (tmppath, path);
- strcat (tmppath, "/");
- strcat (tmppath, dirptr->d_name);
- lstat (tmppath, buf);
- if(strstr(tmppath,"hld") == NULL){
- continue;
- }
+ while (1) {
- if (S_ISDIR (buf->st_mode))
- {
-#if 0
- printf ("directory %s \n", tmppath);
-#endif
- }
- else
- {
- if (buf->st_mtime < last_modification_time)
- {
- last_modification_time = buf->st_mtime;
- recover_size = buf->st_size;
- strcpy (last_modification_file, tmppath);
- }
- }
+ if (NULL == (dirptr = readdir (dir)))
+ break;
+
+ strcpy (tmppath, path);
+ strcat (tmppath, "/");
+ strcat (tmppath, dirptr->d_name);
+ lstat (tmppath, buf);
+
+ if(strstr(tmppath,"hld") == NULL)
+ continue;
+
+
+ if ( !(S_ISDIR(buf->st_mode)) ) {
+ if (buf->st_mtime < last_modification_time) {
+ last_modification_time = buf->st_mtime;
+ recover_size = buf->st_size;
+ strcpy (last_modification_file, tmppath);
+ }
}
+ }
+
closedir(dir);
-#if 0
- printf ("unlink: %s\n", last_modification_file);
-#endif
- if(0!=unlink (last_modification_file)){
- printf("cannot unlink %s\n",last_modification_file);
+
+ if( 0 != unlink (last_modification_file) ) {
+ syslog(LOG_ERR, "%s, %d: cannot unlink %s", __FILE__, __LINE__, last_modification_file);
exit(0);
}
return recover_size;
char fileName[_POSIX_PATH_MAX];
static char outPath[_POSIX_PATH_MAX];
+ static char outLustrePath[_POSIX_PATH_MAX];
static char sec_path[_POSIX_PATH_MAX];
static once = 1;
diff_time = 1;
- runNr = genId32 ();
+
+ if( !(theArgs->epicsCtrl) )
+ runNr = genId32 ();
+
seqNr = 0;
theArgs->runNr = runNr;
-
+
if (once) {
file_size = theArgs->maxFileSz;
strcpy (sec_path, theArgs->sec_path);
strcpy (outPath, theArgs->outPath);
+ strcpy (outLustrePath, theArgs->lustrePath);
once = 0;
}
else {
strcpy (theArgs->outPath, outPath);
+ strcpy (theArgs->lustrePath, outLustrePath);
strcpy (theArgs->sec_path, sec_path);
}
outTape = NULL;
outFile = NULL;
outSecondFile = NULL;
+ outLustreFile = NULL;
+
if (strcmp (theArgs->outDev, "null") == 0) {
outFile = NULL;
}
strcat (theArgs->outPath, fileName);
}
}
-
+
if (NULL == (outFile = fopen (theArgs->outPath, "wb"))) {
syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath,
strerror (errno));
statfs (theArgs->outPath, buf);
if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
errno = ENOSPC;
-/* if (0 != fclose (outFile)) { */
-/* fprintf (stderr, "<E> evtbuild.c, openFile: fclose 'File' failed\n"); */
-/* } */
outFile = NULL;
unlink (theArgs->outPath);
- syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath,
- strerror (errno));
+ syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, strerror (errno));
return -1;
}
}
strcpy (theArgs->outPath, fileName);
}
if (NULL == (outTape = openAnsiTape (theArgs->outPath, "/dev/tape"))) {
- syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath,
- strerror (errno));
+ syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath, strerror (errno));
outFile = NULL;
return -1;
}
syslog (LOG_ERR, "unknown outputDev \"%s\"", theArgs->outDev);
return -1;
}
+
if (theArgs->write_data) {
dirSize = get_directory_size (sec_path);
-#if 0
- printf("dirSize: %f, secsizelimit: %f\n",dirSize,theArgs->secsizelimit);
-#endif
- while (dirSize > theArgs->secsizelimit) {
- /*delete files if limit was reached */
-#if 0
- printf ("limit overflow: secsizelimit: %f, dirSize: %f\n",
- theArgs->secsizelimit, dirSize);
- getchar();
-#endif
- remove_file (sec_path);
- dirSize = get_directory_size (sec_path);
- }
- if (strcmp (theArgs->sec_path, "") == 0) {
- strcpy (theArgs->sec_path, fileName);
+
+ while (dirSize > theArgs->secsizelimit) {
+ /*delete files if limit was reached */
+ remove_file (sec_path);
+ dirSize = get_directory_size (sec_path);
+ }
+ if (strcmp (theArgs->sec_path, "") == 0) {
+ strcpy (theArgs->sec_path, fileName);
+ }
+ else {
+ struct stat bufS, *buf = &bufS;
+
+ stat (theArgs->sec_path, buf);
+ if (S_ISDIR (buf->st_mode)) {
+ strcat (theArgs->sec_path, "/");
+ strcat (theArgs->sec_path, fileName);
}
- else {
- struct stat bufS, *buf = &bufS;
-
- stat (theArgs->sec_path, buf);
- if (S_ISDIR (buf->st_mode)) {
- strcat (theArgs->sec_path, "/");
- strcat (theArgs->sec_path, fileName);
+ }
+ if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) {
+ syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, strerror (errno));
+ outSecondFile = NULL;
+ return -1;
+ }
+ else {
+ struct statfs bufS, *buf = &bufS;
+ statfs (theArgs->sec_path, buf);
+ if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
+ errno = ENOSPC;
+ if (0 != fclose (outSecondFile)) {
+ fprintf (stderr, "<E> evtbuild.c, openFile: fclose 'outSecondFile' failed\n");
}
- }
- if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) {
- syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path,
- strerror (errno));
outSecondFile = NULL;
+ unlink (theArgs->sec_path);
+ syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, strerror (errno));
return -1;
}
- else {
- struct statfs bufS, *buf = &bufS;
- statfs (theArgs->sec_path, buf);
- if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
- errno = ENOSPC;
- if (0 != fclose (outSecondFile)) {
- fprintf (stderr, "<E> evtbuild.c, openFile: fclose 'outSecondFile' failed\n");
- }
- outSecondFile = NULL;
- unlink (theArgs->sec_path);
- syslog (LOG_ERR, "opening file %s: %s",
- theArgs->sec_path, strerror (errno));
- return -1;
- }
+ }
+ }
+
+ /* Open file on Lustre */
+ if( strcmp (theArgs->lustrePath, "") != 0 ) {
+ struct stat bufS, *buf = &bufS;
+ stat (theArgs->lustrePath, buf);
+
+ /* If lustrePath is a dir name, add fileName to the lustrePath */
+ if (S_ISDIR (buf->st_mode)) {
+ strcat (theArgs->lustrePath, "/");
+ strcat (theArgs->lustrePath, fileName);
+ }
+
+ if (NULL == (outLustreFile = fopen (theArgs->lustrePath, "wb"))) {
+ syslog (LOG_ERR, "opening file %s: %s", theArgs->lustrePath, strerror (errno));
+ }
+ else {
+ struct statfs bufS, *buf = &bufS;
+ statfs (theArgs->lustrePath, buf);
+
+ /* Check if the number of free available blocks
+ * is enough to write the file with maximum size.
+ */
+ if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
+ errno = ENOSPC;
+
+ outLustreFile = NULL;
+ unlink (theArgs->lustrePath);
+ syslog (LOG_ERR, "%s, %d: opening file %s: %s",
+ __FILE__, __LINE__, theArgs->lustrePath, strerror (errno));
+ return -1;
}
+ }
}
+
+#ifdef RFIO
+ /* open file on Data Mover */
+ if( fRemote != NULL ) {
+ char rfioPath[_POSIX_PATH_MAX];
+ strcpy( rfioPath, theArgs->rfioRemotePath );
+ strcat( rfioPath, "/" );
+ strcat( rfioPath, fileName );
+
+ fprintf ( stderr, "<D> evtbuild.c, rfio_fnewfile(): trying to open remote file %s\n", rfioPath );
+ if( 0 != rfio_fnewfile( fRemote, rfioPath ) ) {
+ syslog( LOG_ERR, "%s, %d: cannot open remote file %s: %s",
+ __FILE__, __LINE__, rfioPath, strerror(errno) );
+ exit (EXIT_FAILURE);
+ }
+ }
+#endif
+
return 0;
}
-
+
static int openRESFile (TheArgs * theArgs)
{
char fileName[_POSIX_PATH_MAX];
if (theArgs->ressizelimit != 0) {
res_dirNr = get_file_number_in_dir (respath);
-#if 0
- printf("res_dirNr: %d, theArgs->ressizelimit: %d\n,", res_dirNr,theArgs->ressizelimit);
-#endif
while (res_dirNr > theArgs->ressizelimit) {
/*delete files if limit was reached */
-#if 0
- printf ("limit overflow: res_sizelimit: %ul, res_dirSize: %ul\n",
- theArgs->ressizelimit, res_dirSize);
-#endif
remove_file (respath);
res_dirNr--;
}
- }
+ }
strcpy (fileName, theArgs->expId);
strftime (fileName + strlen (fileName), 15, "%y%j%H%M%S_",
localtime (&res_time));
static int filecounter = 1;
- if(diff_time == 1){
+
+ if( diff_time == 1 ) {
filecounter = 1;
diff_time = 0;
- }else {
+ } else {
filecounter++;
}
+
char app[8];
sprintf (app, "%d", filecounter);
if (strcmp (theArgs->respath, "") == 0) {
strcpy (theArgs->respath, fileName);
- }
+ }
else {
struct stat bufS, *buf = &bufS;
if (S_ISDIR (buf->st_mode)) {
strcat (theArgs->respath, "/");
strcat (theArgs->respath, fileName);
- }
- }
+ }
+ }
/* construct a default filename */
- outRESFile = NULL;
+ outRESFile = NULL;
-#if 0
- /*if files are created more often than 1 sec, postfix is added to their name */
- struct stat bufS, *buf = &bufS;
- while (0 == stat (theArgs->respath, buf))
- { /*file exists */
- strcpy (theArgs->respath, respath);
- sprintf (file_app, "%d", filecounter);
- strcat (theArgs->respath, "/");
- strcat (theArgs->respath, fileName);
- strcat (theArgs->respath, file_app);
- filecounter++;
- }
-#endif
if (NULL == (outRESFile = fopen (theArgs->respath, "wb"))) {
- syslog (LOG_ERR, "opening file %s: %s", theArgs->respath,
- strerror (errno));
+ syslog (LOG_ERR, "%s, %d: opening file %s: %s",
+ __FILE__, __LINE__, theArgs->respath, strerror (errno));
outRESFile = NULL;
return -1;
}
{
errno = ENOSPC;
if (0 != fclose (outRESFile)) {
- fprintf (stderr, "<E> evtbuild.c, openRESFile: fclose 'outRESFile' failed\n");
- }
+ syslog(LOG_ERR, "%s, %d: trying fclose 'outRESFile': %s",
+ __FILE__, __LINE__, strerror(errno));
+ }
outRESFile = NULL;
unlink (theArgs->respath);
- syslog (LOG_ERR, "opening file %s: %s", theArgs->respath,
- strerror (errno));
+
+ syslog (LOG_ERR, "%s, %d: opening file %s: %s",
+ __FILE__, __LINE__, theArgs->respath, strerror (errno));
return -1;
}
}
else if (outTape != NULL) {
writeFileR = writeAnsiTape (outTape, evt, Evt_paddedSize (evt));
}
+
#ifdef RFIO
- else if( fRemote != NULL ) {
- int rfioStatus = 0;
- rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote );
- if (rfioStatus <= 0)
- fprintf ( stderr, "<E> evtbuild.c, rfio_fwrite: writing file failed\n");
+ /* write to Data Mover via RFIO */
+ if( fRemote != NULL ) {
+ if( 0 >= rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote ) )
+ syslog(LOG_ERR, "%s, %d: writing file via RFIO: %s", __FILE__, __LINE__, strerror(errno));
}
#endif
+ /* write to Lustre */
+ if (outLustreFile != NULL) {
+ writeFileR = fwrite (evt, 1, Evt_paddedSize (evt), outLustreFile);
+ }
+
/* writing file in the second dir */
if (outSecondFile != NULL) {
fwrite (evt, 1, Evt_paddedSize (evt), outSecondFile);
return writeFileR;
}
-static int
-writeRESFile (void *evt)
+static int writeRESFile (void *evt)
{
int writeFileR;
Evt_setSeqNr (evt, res_seqNr++);
static int closeFile ()
{
int closeFileR;
+ int closeLustreFileR;
/* closing file in the second dir */
if (outSecondFile != NULL) {
dirSize += file_size;
if (0 != fclose (outSecondFile)) {
- fprintf (stderr, "<E> evtbuild.c, closeFile: fclose 'outSecondFile2' failed\n");
+ syslog(LOG_ERR, "%s, %d: trying fclose 'outSecondFile': %s",
+ __FILE__, __LINE__, strerror(errno));
}
}
if (outFile != NULL) {
closeFileR = fclose (outFile);
if (0 != closeFileR) {
- printf ("<E> evtbuild.c, closeFile: fclose 'closeFileR' failed\n");
+ syslog(LOG_ERR, "%s, %d: trying fclose 'outFile': %s",
+ __FILE__, __LINE__, strerror(errno));
}
}
else if (outTape != NULL) {
closeFileR = closeAnsiTape (outTape);
}
+ /* close file on Lustre */
+ if (outLustreFile != NULL) {
+ closeLustreFileR = fclose (outLustreFile);
+ if (0 != closeLustreFileR) {
+ syslog(LOG_ERR, "%s, %d: trying fclose 'outLustreFile': %s",
+ __FILE__, __LINE__, strerror(errno));
+ exit (EXIT_FAILURE);
+ }
+ }
+
+#ifdef RFIO
+ /* close file on Data Mover */
+ if( fRemote != NULL ) {
+ if( 0 != rfio_fendfile( fRemote ) ) {
+ syslog(LOG_ERR, "%s, %d: trying rfio_fendfile: %s",
+ __FILE__, __LINE__, strerror(errno));
+ exit (EXIT_FAILURE);
+ }
+ }
+#endif
+
return closeFileR;
}
res_dirSize += res_file_size;
closeFileR = fclose (outRESFile);
if (0 != closeFileR) {
- fprintf ( stderr, "<E> evtbuild.c, closeRESFile: fclose 'outRESFile2' failed!\n");
- }
- }
- else{
- fprintf ( stderr,"<E> evtbuild.c, closeRESFile: closeFile error outRESFile == NULL\n");
+ syslog(LOG_ERR, "%s, %d: trying fclose 'outRESFile': %s",
+ __FILE__, __LINE__, strerror(errno));
+ }
+ }
+ else {
+ syslog(LOG_ERR, "%s, %d: closeRESFile failed: outRESFile is NULL",
+ __FILE__, __LINE__);
}
return closeFileR;
#ifdef RFIO
static int rfio_openConnection (TheArgs * theArgs)
{
- if( theArgs->rfioFlag ){
+
+ if( (strcmp (theArgs->rfioRemotePath, "") != 0) ){
char rfioBase[128] = "";
char *pcc;
strcpy( rfioBase, theArgs->rfioRemotePath );
pcc++;
strncpy(pcc, "\0", 1); /* terminates after node name */
- fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): try to open connection to server\n" );
+ fprintf ( stderr, "<I> evtbuild.c, rfio_fopen(): trying to open connection to Data Mover: %s\n", rfioBase );
fRemote = rfio_fopen( rfioBase, "wb" );
- fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): opened connection to server\n" );
-
- if (fRemote == NULL)
- {
- fprintf ( stderr, "<E> evtbuild.c:rfio_openConnection(): cannot open connection to server\n" );
- syslog (LOG_ERR, "<E> evtbuild.c:rfio_openConnection(): cannot open connection to server");
+ if (fRemote == NULL) {
+ syslog(LOG_ERR, "%s, %d: trying to open connection to Data Mover %s: %s",
+ __FILE__, __LINE__, rfioBase, strerror(errno));
exit (EXIT_FAILURE);
}
}
static int rfio_closeConnection (TheArgs * theArgs)
{
- if( theArgs->rfioFlag && fRemote != NULL ){
- int rfioStatus;
-
- rfioStatus = rfio_fclose( fRemote );
+ if( (strcmp (theArgs->rfioRemotePath, "") != 0) && fRemote != NULL ) {
- if (rfioStatus)
- {
- fprintf ( stderr, "<E> evtbuild.c:rfio_closeConnection(): closing connection to server failed\n" );
- syslog (LOG_ERR, "<E> evtbuild.c:rfio_closeConnection(): closing connection to server failed");
+ if ( 0 != rfio_fclose( fRemote ) ) {
+ syslog(LOG_ERR, "%s, %d: trying to close connection to Data Mover: %s",
+ __FILE__, __LINE__, strerror(errno));
exit (EXIT_FAILURE);
}
}
return 0;
}
-
-static int rfio_openFile (TheArgs * theArgs)
-{
-
- if( theArgs->rfioFlag && fRemote != NULL){
- int rfioStatus;
- char rfioFileName[_POSIX_PATH_MAX];
- char rfioPath[_POSIX_PATH_MAX];
-
- strcpy( rfioPath, theArgs->rfioRemotePath );
-
-/* Evt_setSeqNr (evt, seqNr++); */
-/* Evt_setRunNr (evt, runNr); */
-
- strcpy (rfioFileName, theArgs->expId);
- strftime (rfioFileName + strlen (rfioFileName), 18, "%y%j%H%M%S.hld",
- localtime (&ourTime));
- strcat (rfioPath, "/");
- strcat (rfioPath, rfioFileName);
- fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): try to open remote file %s\n", rfioPath );
- rfioStatus = rfio_fnewfile( fRemote, rfioPath );
- fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): opened remote file %s\n", rfioPath );
-
- if (rfioStatus) {
- fprintf ( stderr, "<E> evtbuild.c:rfio_openFile(): cannot open remote file %s\n", rfioPath );
- syslog (LOG_ERR, "<E> evtbuild.c:rfio_openFile(): cannot open remote file %s", rfioPath);
- exit (EXIT_FAILURE);
- }
- }
-
- return 0;
-}
-
-static int rfio_closeFile (TheArgs * theArgs)
-{
- if( theArgs->rfioFlag && fRemote != NULL ){
- int rfioStatus;
-
- rfioStatus = rfio_fendfile( fRemote );
-
- if (rfioStatus)
- {
- fprintf ( stderr, "<E> evtbuild.c:rfio_openFile(): cannot close remote file\n");
- syslog (LOG_ERR, "<E> evtbuild.c:rfio_openFile(): cannot close remote file");
- exit (EXIT_FAILURE);
-
- }
- }
-
- return 0;
-}
-
-static int rfio_writeFile (TheArgs * theArgs, void *evt)
-{
- if( theArgs->rfioFlag && fRemote != NULL ){
- int rfioStatus = 0;
-
- Evt_setSeqNr( evt, seqNr++ );
- Evt_setRunNr( evt, runNr );
-
- rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote );
-
- if (rfioStatus <= 0)
- {
- fprintf ( stderr, "<E> evtbuild.c:rfio_writeFile(): writing file failed\n");
-
- return 1;
- }
- }
-
- return 0;
-}
-
-#endif /* ifdef RFIO */
+#endif
/* BUGBUG bailOut not proper yet */
int main (int argc, char *argv[])
{
+
int i;
TheArgs theArgsS, *theArgs = &theArgsS;
TheStats theStatsS, *theStats = &theStatsS;
argsDefault (theArgs);
argsFromParam (theArgs, argc, argv);
- printf("theArgs->varQSizeCnt = %d\n",theArgs->varQSizeCnt);
+ if (0 > argsFromCL (theArgs, argc, argv)) {
+ usage (argv[0]);
+ exit (EXIT_FAILURE);
+ }
- for (i = 0; i < theArgs->varQSizeCnt; i++) {
- printf("theArgs->varQSize = %d\n", theArgs->varQSize[i]);
+ if( argsCheck(theArgs) ) {
+ sleep(10);
+ exit (EXIT_FAILURE);
}
- printf("theArgs->queueSize: %d\n", theArgs->queueSize);
- printf("theArgs->nrOfMsgs: %d\n", theArgs->nrOfMsgs);
- printf("theArgs->outPath: %s\n", theArgs->outPath);
- printf("theArgs->outDev: %s\n", theArgs->outDev);
+ for (i = 0; prioritynames[i].c_name != NULL &&
+ 0 != strcmp (prioritynames[i].c_name, theArgs->verbosity); i++) {}
- if (0 > argsFromCL (theArgs, argc, argv))
- {
- usage (argv[0]);
- exit (EXIT_FAILURE);
- }
- for (i = 0;
- prioritynames[i].c_name != NULL
- && 0 != strcmp (prioritynames[i].c_name, theArgs->verbosity); i++)
- {
- }
if (prioritynames[i].c_name == NULL)
- {
exit (EXIT_FAILURE);
- }
else
- {
setlogmask (LOG_UPTO (prioritynames[i].c_val));
- }
/* normalize experiment id */
theArgs->expId[0] = tolower (theArgs->expId[0]);
theArgs->expId[1] = tolower (theArgs->expId[1]);
theArgs->expId[2] = '\0';
- if (NULL ==
- (worker =
- Worker_initBegin (argv[0], sigHandler, theArgs->priority,
- theArgs->isStandalone)))
- {
+ if (NULL == (worker = Worker_initBegin (argv[0], sigHandler, theArgs->priority,
+ theArgs->isStandalone))) {
syslog (LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror (errno));
exit (EXIT_FAILURE);
- }
- if (theArgs->no_rpc)
- {
+ }
+
+ if (theArgs->no_rpc)
syslog (LOG_WARNING, "DISABLE of online service");
- }
- else
- {
- if (-1 == initOnline ())
- {
+ else
+ if (-1 == initOnline ())
syslog (LOG_WARNING, "unable to initialize online service");
- }
- }
- if (theArgs->nrOfMsgs == 0)
- {
+
+ if (theArgs->nrOfMsgs == 0) {
/*
no '-m' option was on command line, we assume that the
readout task (daq_readout) is running on the same node and
shmTrans[0] = ShmTrans_create ("subevtqueue", 2 * theArgs->queueSize);
hadTuQueue[0] = NULL;
theArgs->nrOfMsgs = 1;
- }
- else
- {
+ }
+ else {
shmTrans = malloc (theArgs->nrOfMsgs * sizeof (ShmTrans *));
hadTuQueue = malloc (theArgs->nrOfMsgs * sizeof (HadTuQueue *));
- for (i = 0; i < theArgs->nrOfMsgs; i++)
- {
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
char buf[_POSIX_PATH_MAX];
sprintf (buf, "netqueue%d", i);
}
hadTuQueue[i] = NULL;
- }
- }
+ }
+ }
- theStats->evtsDiscarded = Worker_addStatistic (worker, "evtsDiscarded");
- theStats->evtsComplete = Worker_addStatistic (worker, "evtsComplete");
- theStats->evtsDataError = Worker_addStatistic (worker, "evtsDataError");
- theStats->evtsTagError = Worker_addStatistic (worker, "evtsTagError");
- theStats->bytesWritten = Worker_addStatistic (worker, "bytesWritten");
+ theStats->evtsDiscarded = Worker_addStatistic (worker, "evtsDiscarded");
+ theStats->evtsComplete = Worker_addStatistic (worker, "evtsComplete");
+ theStats->evtsCompletePS = Worker_addStatistic (worker, "evtsCompletePS");
+ theStats->evtsDataError = Worker_addStatistic (worker, "evtsDataError");
+ theStats->evtsTagError = Worker_addStatistic (worker, "evtsTagError");
+ theStats->bytesWritten = Worker_addStatistic (worker, "bytesWritten");
+ theStats->bytesWrittenPS = Worker_addStatistic (worker, "bytesWrittenPS");
+ theStats->runId = Worker_addStatistic (worker, "runId");
+ (*theStats->runId) = 0; /* initialize to zero */
unsigned long *retVal = NULL;
-
theStats->evtsRes = (unsigned long *) &retVal;
- for (i = 0; i < NEVTIDS; i++)
- {
- char buf[WORKER_MAX_NAME_LEN];
- sprintf (buf, "evtId%d", i);
- theStats->evtId[i] = Worker_addStatistic (worker, buf);
- }
- for (i = 0; i < theArgs->nrOfMsgs; i++)
- {
- char buf[WORKER_MAX_NAME_LEN];
+ for (i = 0; i < NEVTIDS; i++) {
+ char buf[WORKER_MAX_NAME_LEN];
+
+ sprintf (buf, "evtId%d", i);
+ theStats->evtId[i] = Worker_addStatistic (worker, buf);
+ }
+ for (i = 0; i < theArgs->nrOfMsgs; i++) {
+ char buf[WORKER_MAX_NAME_LEN];
+
+ sprintf (buf, "trigNr%d", i);
+ theStats->trigNr[i] = Worker_addStatistic (worker, buf);
+ }
- sprintf (buf, "trigNr%d", i);
- theStats->trigNr[i] = Worker_addStatistic (worker, buf);
- }
+ /* Add statistic for fill levels of buffers. */
+ for( i=0; i<theArgs->nrOfMsgs; i++ ) {
+ char buf[WORKER_MAX_NAME_LEN];
+ sprintf( buf, "evtbuildBuff%d", i );
+ theStats->evtbuildBuff[i] = Worker_addStatistic( worker, buf );
+ }
+
+ theStats->nrOfMsgs = Worker_addStatistic( worker, "nrOfMsgs" );
+ (*theStats->nrOfMsgs) = theArgs->nrOfMsgs;
argsDump (theArgs);
int dataError = 0;
int tagError = 0;
+ add2Stat( theArgs, theStats, 1, shmTrans );
statsBufferDump( theArgs, theStats, 1, hadTuQueue, shmTrans );
statsDump (theArgs, theStats, 1);
if (*theStats->bytesWritten == 0)
{
- res_time = ourTime = time (NULL);
+
+ if( theArgs->epicsCtrl ) {
+ runNr = getRunId( theArgs );
+ res_time = ourTime = runNr + TIMEOFFSET;
+ }
+ else
+ res_time = ourTime = time (NULL);
+
+
if (-1 == openFile (theArgs))
{
syslog (LOG_ERR, "error opening output file, exiting");
exit (EXIT_FAILURE);
}
-#ifdef RFIO
- rfio_openFile( theArgs );
-#endif
-
storeInfoStart (argv[0], ourTime, theArgs);
/* store simple start run info */
storeRunInfoStart(ourTime, theArgs);
evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart);
- for (i = 0; i < theArgs->slowCtrlFileCnt; i++)
- {
+
+ for (i = 0; i < theArgs->slowCtrlFileCnt; i++) {
evt = appendFile (evt, theArgs->slowCtrlFiles[i]);
}
+
(*theStats->bytesWritten) += Evt_size (evt);
writeFile (evt);
deleteEvt (evt);
}
+
if ((*theStats->evtsRes) == 0)
{
#if 0
ourTime = time (NULL);
#endif
/* remote event server - resdownscale - resnumevents*/
- if (theArgs->resdownscale)
- {
- if (-1 == openRESFile (theArgs))
- {
+ if (theArgs->resdownscale) {
+ if (-1 == openRESFile (theArgs)) {
syslog (LOG_ERR, "error opening RES output file");
exit (EXIT_FAILURE);
}
+
evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart);
(*theStats->evtsRes)++;
writeRESFile (evt);
deleteEvt (evt);
}
-
}
+
evt = newEvt (EvtDecoding_64bitAligned, EvtId_data);
+
for (i = 0; i < theArgs->nrOfMsgs && !evtIsBroken; i += step)
{
uint32_t trigNr;
{
currTrigNr = SubEvt_trigNr (subEvt) >> 8;
currTrigTag = SubEvt_trigNr (subEvt) & 0xff;
+
if (theArgs->evtId != 0)
- {
currId = theArgs->evtId;
- }
else
- {
currId = SubEvt_pureId (subEvt);
- }
+
syslog (LOG_DEBUG,
"currTrigNr: 0x%06x, currTrigTag 0x%02x, currId 0x%08x",
currTrigNr, currTrigTag, currId);
}
+
trigNr = SubEvt_trigNr (subEvt) >> 8;
trigTag = SubEvt_trigNr (subEvt) & 0xff;
+
if (trigNr == currTrigNr)
{
if (SubEvt_size (subEvt) > SubEvt_hdrSize ())
{
/* sub evt is not empty */
if (SubEvt_dataError (subEvt))
- {
dataError = 1;
- }
+
if (trigTag != currTrigTag)
- {
tagError = 1;
- }
+
evt = Evt_appendSubEvt (evt, subEvt);
}
HadTuQueue_pop (hadTuQueue[i]);
}
(*theStats->bytesWritten) += Evt_size (evt);
writeFile (evt);
+
if (theArgs->resdownscale)
{
- (*theStats->evtsRes)++;
-/* if ((*theStats->evtsRes) % (theArgs->resdownscale) == 0)*/
- if (((*theStats->evtsRes) % EVENT_NUM_OFFSET) <
+
+ /*
+ * If the setting are as follows:
+ * EVENT_NUM_OFFSET == 100
+ * theArgs->resdown_offset == 5 (100/20)
+ * then from each 100 events only
+ * first 5 events are written to refFile.
+ */
+ if (((*theStats->evtsComplete) % EVENT_NUM_OFFSET) <
theArgs->resdown_offset)
{
+ (*theStats->evtsRes)++;
writeRESFile (evt);
}
}
is_mismatch_enough_to_stop(theArgs, theStats);
#endif
}
+
deleteEvt (evt);
- if ((*theStats->evtsRes) % 600 == 0)
- {
-#if 0
- printf ("condition: %u > %u\n", (*theStats->bytesWritten),
- ((theArgs->maxFileSz) - (theArgs->queueSize)));
- printf ("maxFileSz %u ; queueSize: %u\n", (theArgs->maxFileSz),
- theArgs->queueSize);
-#endif
- }
- if ((*theStats->bytesWritten) >=
- ((theArgs->maxFileSz) - (theArgs->queueSize)))
+
+ newRunId = getRunId( theArgs );
+
+ /*
+ * The following conditions mean:
+ * theArgs->epicsCtrl == 1 && runNr < newRunId
+ * New RUN Id was generated, close the file and open new one.
+ * theArgs->epicsCtrl == 1 && (*theStats->bytesWritten) >= 1900000000
+ * Due to whatever reason the file size exceeded an allowed limit,
+ * close the file.
+ * theArgs->epicsCtrl == 1 && runNr == 0
+ * Something went wrong with sinchronization of Event Builders,
+ * close the file.
+ */
+ if ( ( !(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize)) ) ||
+ (theArgs->epicsCtrl && runNr < newRunId) ||
+ (theArgs->epicsCtrl && (*theStats->bytesWritten) >= 1900000000) ||
+ (theArgs->epicsCtrl && newRunId == 0))
{
evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStop);
+
for (i = 0; i < theArgs->slowCtrlFileCnt; i++)
- {
evt = appendFile (evt, theArgs->slowCtrlFiles[i]);
- }
+
(*theStats->bytesWritten) += Evt_size (evt);
writeFile (evt);
deleteEvt (evt);
- ourTime = time (NULL);
+ if( theArgs->epicsCtrl )
+ ourTime = newRunId + TIMEOFFSET;
+ else
+ ourTime = time (NULL);
+
closeFile ();
-#ifdef RFIO
- rfio_closeFile( theArgs );
-#endif
+
storeInfoStop (argv[0], ourTime - 2, worker, theArgs);
/* store simple stop run info */
(*theStats->evtsDiscarded) = 0;
(*theStats->evtsDataError) = 0;
(*theStats->evtsTagError) = 0;
+
for (i = 0; i < theArgs->nrOfMsgs; i++)
- {
(*theStats->trigNr[i]) = 0;
- }
+
for (i = 0; i < NEVTIDS; i++)
- {
(*theStats->evtId[i]) = 0;
- }
+
}
+
if (theArgs->resdownscale)
{
- if ((*theStats->evtsRes) >=
- (theArgs->resdownscale) * (theArgs->resnumevents))
+ /*
+ * Number of events written to resFile is
+ * limited to resnumevents.
+ */
+ if ((*theStats->evtsRes) >= theArgs->resnumevents)
{
#if 0
ourTime = time (NULL);
}
}
}
+
ourTime = time (NULL);
closeFile ();
+
#ifdef RFIO
- rfio_closeFile( theArgs );
rfio_closeConnection( theArgs );
#endif
+
if (theArgs->resdownscale)
- {
closeRESFile (theArgs);
- }
+
storeInfoStop (argv[0], ourTime - 2, worker, theArgs);
/* store simple stop run info */
statsDump (theArgs, theStats, 1);
for (i = 0; i < theArgs->nrOfMsgs; i++)
- {
ShmTrans_remove (shmTrans[i]);
- }
+
finiOnline ();
Worker_fini (worker);
exit (EXIT_SUCCESS);