From 762b3c92b9cf7108e3c1333a6799515efb149f8a Mon Sep 17 00:00:00 2001 From: hadaq Date: Wed, 28 May 2008 14:54:22 +0000 Subject: [PATCH] New options: rfio to DM, lustre, synch via ioc (still to be tested), run Id from ioc, plus some cleaning. Sergey Yurevich. --- hadaq/evtbuild.c | 1194 +++++++++++++++++++++++----------------------- 1 file changed, 598 insertions(+), 596 deletions(-) diff --git a/hadaq/evtbuild.c b/hadaq/evtbuild.c index 35a04c3..7093ecd 100644 --- a/hadaq/evtbuild.c +++ b/hadaq/evtbuild.c @@ -1,5 +1,5 @@ static char *rcsId = - "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.82 2008-04-08 14:45:14 hadaq Exp $"; + "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.83 2008-05-28 14:54:22 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L @@ -49,13 +49,16 @@ static RFILE *fRemote = NULL; #define DEBUG2 0 #define CHECK_MISMATCH 1 #define BEAM 1 +#define TIMEOFFSET 1200000000 /* needed to reconstruct time from runId */ static FILE *outFile; +static FILE *outLustreFile; static FILE *outSecondFile; static FILE *outRESFile; static AnsiTape *outTape; static uint32_t seqNr; static uint32_t res_seqNr; static uint32_t runNr; +static uint32_t newRunId; /* needed to get new RUN id from epics ctrl */ static time_t ourTime; static long file_size; static long res_file_size; @@ -98,7 +101,9 @@ typedef struct TheArgsS unsigned int rfioFlag; char rfioRemotePath[PARAM_MAX_VALUE_LEN]; - unsigned int buffStat; + char lustrePath[PARAM_MAX_VALUE_LEN]; + unsigned short buffStat; + unsigned short epicsCtrl; } TheArgs; typedef struct TheStatsS @@ -111,20 +116,21 @@ typedef struct TheStatsS unsigned long *evtId[NEVTIDS]; unsigned long *trigNr[MAXINPATH]; unsigned long *evtsRes; + unsigned long *evtbuildBuff[MAXINPATH]; + unsigned long *nrOfMsgs; + unsigned long *runId; + unsigned long *evtsCompletePS; + unsigned long *bytesWrittenPS; } TheStats; static jmp_buf terminateJmp; - - -void -sigHandler (int sig) +void sigHandler (int sig) { longjmp (terminateJmp, sig); } -static void * -appendFile (void *my, const char *path) +static void *appendFile (void *my, const char *path) { void *subEvt; char *dataBuf; @@ -167,43 +173,38 @@ appendFile (void *my, const char *path) return my; } -static void -usage (const char *progName) +static void usage (const char *progName) { syslog (LOG_ERR, "Usage: %s [-x expId]", progName); syslog (LOG_ERR, "Usage: [-m nrOfMsgs] [-f slowCtrlFile ...]"); syslog (LOG_ERR, "Usage: [-o outPath] [-d null|tape|file|stdout]"); syslog (LOG_ERR, "Usage: [-q queueSize] [-r runNumber]"); syslog (LOG_ERR, "Usage: [-a (agent)] [-p priority] [-I evtId]"); - syslog (LOG_ERR, - "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]"); + syslog (LOG_ERR, "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]"); syslog (LOG_ERR, "Usage: [--norpc]"); syslog (LOG_ERR, "Usage: [--filesize maximum_size_of_output_file[in MB]]"); - syslog (LOG_ERR, "Usage: [--resdownscale resdownscale_of_events]"); - syslog (LOG_ERR, "Usage: [--resnumevents number_of_events_in_one_resfile]"); - syslog (LOG_ERR, - "Usage: [--respath path_where_the_downscaling_data_are_written]"); - syslog (LOG_ERR, "Usage: [--secsizelimit max size of second dir[in MB]]"); - syslog (LOG_ERR, "Usage: [--ressizelimit max number of files in res dir]"); - syslog (LOG_ERR, - "Usage: [--write_data path_where_the_mirroring_data_are_ written]"); + syslog (LOG_ERR, "Usage: [--resdownscale resdownscale_factor] downscale factor for the res events"); + syslog (LOG_ERR, "Usage: [--resnumevents evt_num] maximum number of events in a resfile"); + syslog (LOG_ERR, "Usage: [--respath path] path for the res directory"); + syslog (LOG_ERR, "Usage: [--secsizelimit max_size] maximum size of second directory with the mirrored data [in MB]"); + syslog (LOG_ERR, "Usage: [--ressizelimit max_file_num] maximum number of files in res dir"); + syslog (LOG_ERR, "Usage: [--write_data path] path to the directory with mirrored data"); #ifdef RFIO - syslog (LOG_ERR, "[--rfio 0|1] write data to tape via rfio (0=no/1=yes)"); - syslog (LOG_ERR, "[--rfiopath path_to_tape_archive]"); + syslog (LOG_ERR, "Usage: [--rfio path_to_tape_archive] example: --rfio rfiodaq:gstore:/hadaqtest/test002"); #endif - syslog (LOG_ERR, "[--buffstat 0|1] show fill levels of buffers]"); + syslog (LOG_ERR, "Usage: [--buffstat] show fill levels of buffers"); + syslog (LOG_ERR, "Usage: [--epicsctrl] enable synch and distribution of RUN Id by Epics for parallel event builders"); + syslog (LOG_ERR, "Usage: [--lustre path_to_lustre] path to the file on the Lustre cluster"); } -static void -argsDump (TheArgs * my) +static void argsDump (TheArgs * my) { int i; syslog (LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs); - for (i = 0; i < my->slowCtrlFileCnt; i++) - { - syslog (LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]); - } + for (i = 0; i < my->slowCtrlFileCnt; i++) { + syslog (LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]); + } syslog (LOG_DEBUG, "outPath: %s", my->outPath); syslog (LOG_DEBUG, "outDev: %s", my->outDev); syslog (LOG_DEBUG, "runNr: %d", my->runNr); @@ -214,26 +215,28 @@ argsDump (TheArgs * my) syslog (LOG_DEBUG, "verbosity: %s", my->verbosity); syslog (LOG_DEBUG, "evtId: %ld", my->evtId); syslog (LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz); - if (my->resdownscale != 0) - { + if (my->resdownscale != 0) { syslog (LOG_DEBUG, "resdownscale: %ld", my->resdownscale); syslog (LOG_DEBUG, "resnumevents: %ld", my->resnumevents); syslog (LOG_DEBUG, "respath: %s", my->respath); syslog (LOG_DEBUG, "secsizelimit: %ld", my->secsizelimit); syslog (LOG_DEBUG, "ressizelimit: %d", my->ressizelimit); - } - if (my->no_rpc == 1) - { + } + if (my->no_rpc == 1) { syslog (LOG_DEBUG, "no rpc is set"); - } - if (my->write_data == 1) - { + } + if (my->write_data == 1) { syslog (LOG_DEBUG, "sec_path: %s", my->sec_path); - } + } + if( strcmp( my->rfioRemotePath, "" ) != 0 ) { + syslog (LOG_DEBUG, "rfio path: %s", my->rfioRemotePath); + } + if( strcmp( my->lustrePath, "" ) != 0 ) { + syslog (LOG_DEBUG, "lustre path: %s", my->lustrePath); + } } -static void -argsDefault (TheArgs * my) +static void argsDefault (TheArgs * my) { int i; @@ -248,7 +251,7 @@ argsDefault (TheArgs * my) strcpy (my->expId, "xx"); my->priority = 0; my->isStandalone = 1; - my->queueSize = 1 * 1024 * 1024; + my->queueSize = 4 * 1024 * 1024UL; strcpy (my->verbosity, "info"); my->evtId = 0; my->maxFileSz = (2 * 1024 * 1024 * 1024UL - 1); @@ -262,15 +265,16 @@ argsDefault (TheArgs * my) strcpy (my->respath, ""); for (i = 0; i < MAXINPATH; i++) { - my->varQSize[i] = 1 * 1024 * 1024; + my->varQSize[i] = 4 * 1024 * 1024UL; } - my->rfioFlag = 0; + strcpy (my->rfioRemotePath, ""); + strcpy (my->lustrePath, ""); my->buffStat = 0; + my->epicsCtrl = 0; } -static int -argsFromCL (TheArgs * my, int argc, char *argv[]) +static int argsFromCL (TheArgs * my, int argc, char *argv[]) { extern char *optarg; int i; @@ -290,13 +294,13 @@ argsFromCL (TheArgs * my, int argc, char *argv[]) {"write_data", 1, 0, 'w'}, {"help", 0, 0, 'H'}, {"rfio", 1, 0, 'R'}, - {"rfiopath", 1, 0, 'P'}, - {"buffstat", 1, 0, 'b'}, + {"buffstat", 0, 0, 'b'}, + {"epicsctrl", 0, 0, 'E'}, + {"lustre", 1, 0, 'L'}, {0, 0, 0, 0} }; - i = - getopt_long (argc, argv, - "am:f:r:o:d:q:p:v:x:I:S:tz:e:n:h:w:tz:e:n:Hs:l:R:P:b:", + i = getopt_long (argc, argv, + "am:f:r:o:d:q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:bEL:", long_options, &option_index); if (i == -1) break; @@ -332,84 +336,44 @@ argsFromCL (TheArgs * my, int argc, char *argv[]) case 'I': my->evtId = strtoul (optarg, NULL, 0); break; - case 'S': - my->maxFileSz = strtoul (optarg, NULL, 0); - break; - case 't': /*norpc - no arg */ + case 't': /* norpc - no arg */ my->no_rpc = 1; break; - case 'z': /*filesize - one arg */ - if (strtoul (optarg, NULL, 0) > 0 - && strtoul (optarg, NULL, 0) < 20000) - { - my->maxFileSz = (1024 * 1024UL * strtoul (optarg, NULL, 0)); - } - else - { - printf - ("filesize option incorrect,it should be >0 and <20000\n"); - exit (0); - } + case 'z': /* filesize - one arg */ + my->maxFileSz = (1024 * 1024UL * strtoul (optarg, NULL, 0)); break; - case 'e': /*resdownscale- one arg & need resnumevents & respath */ - if (strtoul (optarg, NULL, 0) > 0 - && strtoul (optarg, NULL, 0) < 100001) - { - my->resdownscale = strtoul (optarg, NULL, 0); - my->resdown_offset = EVENT_NUM_OFFSET / (my->resdownscale); - } - else - { - printf - ("resdownscale option incorrect,it should be >0 and <100001\n"); - } + case 'e': /* need resnumevents & respath */ + my->resdownscale = strtoul (optarg, NULL, 0); + if(my->resdownscale > 0) + my->resdown_offset = EVENT_NUM_OFFSET / (my->resdownscale); break; - case 'n': /*resnumevents - one arg */ - if (strtoul (optarg, NULL, 0) > 99 - && strtoul (optarg, NULL, 0) < 1000000001) - { - my->resnumevents = strtoul (optarg, NULL, 0); - } - else - { - printf - ("resnumevents incorrect, it should be >99 and <1000000001\n"); - } + case 'n': + my->resnumevents = strtoul (optarg, NULL, 0); break; - case 'l': /*secsizelimit - one arg */ - if (strtoul (optarg, NULL,0) > 10 - && strtoul (optarg, NULL,0) < 100001) - { - unsigned long tmp = (/*1024 * 1024UL **/ strtoul (optarg, NULL,0)); -#if 0 - my->secsizelimit = (((double)tmp)/((double)(1024.*1024.))); -#endif - my->secsizelimit = (double)tmp; - printf("set secsizelimit: %d MB \n",my->secsizelimit); - } + case 'l': + my->secsizelimit = (double) strtoul (optarg, NULL,0); break; - case 's': /*ressizelimit - one arg */ - if (strtoul (optarg, NULL, 0) > 5 - && strtoul (optarg, NULL, 0) < 1000) - { - my->ressizelimit = strtoul (optarg, NULL, 0); - } + case 's': + my->ressizelimit = strtoul (optarg, NULL, 0); break; - case 'h': /*respath - one arg as dir_path */ + case 'h': strcpy (my->respath, optarg); break; - case 'w': /*write_data - one arg as dir_path */ + case 'w': my->write_data = 1; strcpy (my->sec_path, optarg); break; case 'R': - my->rfioFlag = strtoul (optarg, NULL, 0); - break; - case 'P': strcpy (my->rfioRemotePath, optarg); break; + case 'L': + strcpy (my->lustrePath, optarg); + break; case 'b': - my->buffStat = strtoul (optarg, NULL, 0); + my->buffStat = 1; + break; + case 'E': + my->epicsCtrl = 1; break; case 'H': usage (argv[0]); @@ -428,43 +392,78 @@ argsFromCL (TheArgs * my, int argc, char *argv[]) printf ("%s ", argv[optind++]); printf ("\n"); } -/* -the condition that resdownscale resnumevents respath have to be together -*/ - if (((my->resdownscale != 0) || (my->resnumevents != -1) - || (strcmp (my->respath, ""))) && ((my->resdownscale == 0) - || (my->resnumevents == -1) - || !strcmp (my->respath, ""))) -#if 0 - if (((my->resdownscale != 0) || (my->resnumevents != -1) - || (my->respath != NULL)) && ((my->resdownscale == 0) - || (my->resnumevents == -1) - || (my->respath == NULL))) -#endif - { - printf - ("if you are using Remote Events Server, you should fill: resdownscale, resnumevents, respath options\n"); - exit (EXIT_FAILURE); - } - if (((my->maxFileSz) < (my->queueSize))) - { - printf ("--filesize has to be > queuesize(-q)\n"); - exit (0); - } + + return 0; +} + +static int argsCheck( TheArgs *my ) +{ + /* + * Check the content of TheArgs. + */ + + /* check my->maxFileSz */ + if( (strcmp(my->outDev, "null") != 0) && (my->maxFileSz <= 0 || my->maxFileSz >= 1024 * 1024UL * 2000) ) { + fprintf( stderr, " evtbuild.c, argsCheck(): --filesize must be >0 and <20000\n"); + return 1; + } + + /* Conditions: if the Remote Event Server is used */ + if ( ( my->resdownscale != 0 && ( my->resnumevents == -1 || + (strcmp(my->respath, "") == 0) ) ) || + ( my->resnumevents != -1 && ( my->resdownscale == 0 || + (strcmp(my->respath, "") == 0) ) ) || + ( (strcmp(my->respath, "") != 0) && ( my->resnumevents == -1 || + my->resdownscale == 0 ) ) ) { + + fprintf( stderr, " evtbuild.c, argsCheck(): options --resdownscale --resnumevents --respath must be specified together\n"); + return 1; + } + + /* check my->resdownscale */ + if( (strcmp(my->respath, "") != 0) && + ( my->resdownscale <= 0 || my->resdownscale >= 100001 ) ) { + fprintf( stderr," evtbuild.c, argsCheck(): --resdownscale must be >0 and <100001\n"); + return 1; + } + + /* check my->resnumevents */ + if( (strcmp(my->respath, "") != 0) && + ( my->resnumevents <= 99 || my->resnumevents >= 1000000001 ) ) { + fprintf( stderr," evtbuild.c, argsCheck(): --resnumevents must be >99 and <1000000001\n"); + return 1; + } + + /* check my->secsizelimit */ + if( (strcmp(my->respath, "") != 0) && + ( my->secsizelimit <= -0.0001 || my->secsizelimit >= 0.0001 ) && + ( my->secsizelimit <= 10 || my->secsizelimit >= 100001 ) ) { + fprintf( stderr," evtbuild.c, argsCheck(): --secsizelimit must be >10 and <100000 MB\n"); + return 1; + } + + /* check my->ressizelimit */ + if( (strcmp(my->respath, "") != 0) && + ( my->ressizelimit <= 5 || my->ressizelimit >= 1000 ) ) { + fprintf( stderr, " evtbuild.c, argsCheck(): --ressizelimit must be >5 and <1000\n"); + return 1; + } + + if (((my->maxFileSz) < (my->queueSize))) { + fprintf( stderr, " evtbuild.c, argsCheck(): --filesize must be larger than queuesize(-q)\n"); + return 1; + } + /* the condition ressizelimit has to be together with respath */ - if ((my->ressizelimit) != 0) - { - if (!strcmp (my->respath, "")) - { - printf ( "you have to define path, where the files will be collect. use 'respath' option\n" ); - exit (EXIT_FAILURE); - } - } + if ((my->ressizelimit) != 0 && (strcmp(my->respath, "") == 0) ) { + fprintf( stderr, " evtbuild.c, argsCheck(): --respath is not given\n" ); + return 1; + } + return 0; } -static int -argsFromParam (TheArgs * my, int argc, char *argv[]) +static int argsFromParam (TheArgs *my, int argc, char *argv[]) { Param paramS, *param = ¶mS; int paramWasFound; @@ -473,33 +472,30 @@ argsFromParam (TheArgs * my, int argc, char *argv[]) conSetupParam (param, getenv ("DAQ_SETUP")); name = (char *) basename (argv[0]); - Param_getInt (param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs); + Param_getInt (param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs); Param_getStringArray (param, name, "slwctrlfile", PARAM_MAX_ARRAY_LEN, &my->slowCtrlFileCnt, my->slowCtrlFiles); Param_getString (param, name, "outpath", ¶mWasFound, my->outPath); - Param_getString (param, name, "outdev", ¶mWasFound, my->outDev); - Param_getString (param, name, "expid", ¶mWasFound, my->expId); - Param_getInt (param, name, "stndln", ¶mWasFound, &my->isStandalone); - Param_getInt (param, name, "prio", ¶mWasFound, &my->priority); - Param_getInt (param, name, "qsize", ¶mWasFound, &my->queueSize); - Param_getString (param, name, "verb", ¶mWasFound, my->verbosity); - Param_getInt (param, name, "evtid", ¶mWasFound, &my->evtId); - Param_getInt (param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz); + Param_getString (param, name, "outdev", ¶mWasFound, my->outDev); + Param_getString (param, name, "expid", ¶mWasFound, my->expId); + Param_getInt (param, name, "stndln", ¶mWasFound, &my->isStandalone); + Param_getInt (param, name, "prio", ¶mWasFound, &my->priority); + Param_getInt (param, name, "qsize", ¶mWasFound, &my->queueSize); + Param_getString (param, name, "verb", ¶mWasFound, my->verbosity); + Param_getInt (param, name, "evtid", ¶mWasFound, &my->evtId); + Param_getInt (param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz); Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize); desParam (param); } -static char * -unit (unsigned long v) +static char *unit (unsigned long v) { static char retVal[6]; static char u[] = " kM"; int i; - for (i = 0; v >= 10000 && i < sizeof (u) - 2; v /= 1000, i++) - { - } + for (i = 0; v >= 10000 && i < sizeof (u) - 2; v /= 1000, i++) {} sprintf (retVal, "%4d%c", v, u[i]); return retVal; @@ -528,7 +524,41 @@ static void printTime() printf ("Time: %s.%03ld\n", time_string, milliseconds); } -static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, HadTuQueue **htuq, ShmTrans **shmtr) +static void add2Stat( TheArgs * theArgs, TheStats * my, float interval, ShmTrans **shmtr ) +{ + /* Add statistic for fill levels of buffers in percentage. */ + + static unsigned long lastEvtsComplete; + static unsigned long lastBytesWritten; + static time_t t_0 = 0; + float buffSize, queueSize; + time_t t, dT; + int i; + unsigned long fillLevel; + + t = time (NULL); + dT = t - t_0; + + if( dT >= interval ) { + for( i=0; inrOfMsgs; i++ ) { + buffSize = 2*theArgs->varQSize[i]; + queueSize = HadTuQueue_size(shmtr[i]->rdQueue); + + /* Add here statistic for fill levels of buffers */ + fillLevel = (unsigned long) (100*queueSize+0.5)/buffSize; + (*my->evtbuildBuff[i]) = fillLevel; + } + + /* Add more statistic for evtsComplete and bytesWritten per second */ + (*my->evtsCompletePS) = (*my->evtsComplete - lastEvtsComplete) / dT; + (*my->bytesWrittenPS) = (*my->bytesWritten - lastBytesWritten) / dT; + } + + t_0 = t; +} + +static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, + HadTuQueue **htuq, ShmTrans **shmtr) { static unsigned long lastEC2; static unsigned long lastBW2; @@ -541,7 +571,7 @@ static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, H int outputGraph = 1; int outputNum = 0; - if (theArgs->isStandalone && theArgs->buffStat == 1) { + if (theArgs->isStandalone && theArgs->buffStat) { t = time (NULL); dT = t - t0; @@ -656,6 +686,10 @@ static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, H /* get wall-clock time */ printTime(); + + unsigned long runId2print; + Worker_getStatistic( "daq_evtbuild", "runId", &runId2print); + printf("ioc: RUN Id = %lu\n", runId2print); } } @@ -663,6 +697,27 @@ static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, H } } +unsigned long getRunId( TheArgs *my ) +{ + unsigned long myRunId = 0; + struct timespec tv = { 0, 1e+8 }; + + while( my->epicsCtrl && myRunId == 0 ) { + if( Worker_getStatistic( "daq_evtbuild", "runId", &myRunId) == -1 ) { + fprintf( stderr, " evtbuild.c: getRunId: Worker_getStatistic: cannot get runId!" ); + sleep(1); + } + else { + + if( myRunId == 0 ) + nanosleep( &tv, NULL ); + } + + } + + return myRunId; +} + static void statsDump (TheArgs * theArgs, TheStats * my, int interval) { static unsigned long lastEC; @@ -674,9 +729,8 @@ static void statsDump (TheArgs * theArgs, TheStats * my, int interval) time_t t, dT; int i; - if (theArgs->isStandalone && strcmp (theArgs->verbosity, "info") == 0 && - theArgs->buffStat == 0) + !(theArgs->buffStat)) { t = time (NULL); dT = t - t0; @@ -812,9 +866,7 @@ static void storeRunInfoStop(time_t t, TheArgs *myArgs, TheStats *myStats) fclose(fp); } -#define NJUNK 128 -static void -storeInfoStart (const char *n, time_t t, TheArgs * my) +static void storeInfoStart (const char *n, time_t t, TheArgs * my) { Param pS, *p = &pS; int i; @@ -845,10 +897,6 @@ storeInfoStart (const char *n, time_t t, TheArgs * my) Param_storeString (p, n, "respath", my->respath); Param_storeInt (p, n, "ressizelimit", my->ressizelimit); - } - if (my->secsizelimit != 0.) - { -/* Param_storeString (p, n, "secsizelimit", my->secsizelimit);*/ } if (my->no_rpc == 1) { @@ -858,17 +906,6 @@ storeInfoStart (const char *n, time_t t, TheArgs * my) { Param_storeString (p, n, "sec_path", my->sec_path); } - /* This storing junk is for having the run start in - * oracle definately -- BS - */ - - /* - *for (i = 0; i < NJUNK; i++) - * { - * Param_storeString (p, n, "junk", - * "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); - * } - */ desParam (p); } @@ -891,41 +928,46 @@ static void storeInfoStop (const char *n, time_t t, Worker * w, TheArgs * my) desParam (p); } -int is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) { - /*unsigned long filesize_enough = (theArgs->maxFileSz) * 0.02;*/ - unsigned long filesize_enough = (theArgs->maxFileSz) * 0.1; - /*unsigned long discarded_enough = (*my->evtsComplete) * 0.03;*/ - unsigned long discarded_enough = (*my->evtsComplete) * 0.1; - unsigned long tagerror_enough = (*my->evtsComplete) * 0.03; - if ((*my->bytesWritten) >= filesize_enough) - { - if ((*my->evtsDiscarded) > discarded_enough) - { - printf("Too many events are broken.\n"); - printf("Debug Inf: filesize_enough: %u,fileWritten: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete)); - syslog (LOG_ERR, "Too many events are broken.\n"); - syslog (LOG_ERR, "Debug Inf: filesize_enough: %u,fileWritten: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete)); +int is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) +{ + /* min file size to start checking */ + unsigned long minfilesize = (theArgs->maxFileSz) * 0.1; /* 0.02 */ + /* max number of discarded evts allowed */ + unsigned long maxdiscarded = (*my->evtsComplete) * 0.1; /* 0.03 */ + /* max number of evts with tag error allowed */ + unsigned long maxtagerror = (*my->evtsComplete) * 0.03; + + if ((*my->bytesWritten) >= minfilesize) { + if ((*my->evtsDiscarded) > maxdiscarded) { + + syslog (LOG_ERR, " evtbuild.c: Too many events are broken!"); + syslog (LOG_ERR, " evtbuild.c: bytes written: %u, minimum file size: %u", + minfilesize, (*my->bytesWritten) ); + syslog (LOG_ERR, " evtbuild.c: discarded evts: %u, allowed: %u, total evts num: %u", + (*my->evtsDiscarded), maxdiscarded, (*my->evtsComplete)); #if BEAM - system ("echo tagerror | netcat -w1 hadesdaq 12122"); + system ("echo tagerror | netcat -w1 hadesdaq 12122"); #endif - sleep(5); + sleep(5); } - if ((*my->evtsTagError) > tagerror_enough) - { - printf("Too many TagErrors.\n"); - printf("Debug Inf: filesize_enough: %u,fileWritten: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete)); - syslog(LOG_ERR, "Too many TagErrors.\n"); - syslog(LOG_ERR, "Debug Inf: filesize_enough: %u,fileWritten: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete)); + if ((*my->evtsTagError) > maxtagerror) { + + syslog(LOG_ERR, " evtbuild.c: Too many TagErrors!"); + syslog (LOG_ERR, " evtbuild.c: bytes written: %u, minimum file size: %u", + minfilesize, (*my->bytesWritten) ); + syslog (LOG_ERR, " evtbuild.c: tagerror evts: %u, allowed: %u, total evts num: %u", + (*my->evtsTagError), maxtagerror, (*my->evtsComplete)); + #if BEAM - system ("echo tagerror | netcat -w1 hadesdaq 12122"); + system ("echo tagerror | netcat -w1 hadesdaq 12122"); #endif - sleep(5); + sleep(5); } } return 0; } -int -get_file_number_in_dir (char *path) + +int get_file_number_in_dir (char *path) { int file_number; file_number = 0; @@ -936,39 +978,21 @@ get_file_number_in_dir (char *path) dir = opendir (path); - if (dir == NULL) - { - printf ("open dir: %s failed\n", path); - exit (EXIT_FAILURE); - } + if (dir == NULL) { + syslog(LOG_ERR, "%s, %d: trying to open directory %s: %s", + __FILE__, __LINE__, path, strerror(errno)); + exit (EXIT_FAILURE); + } while (NULL != (dirptr = readdir (dir))) - { -#if 0 - strcpy (tmppath, path); - strcat (tmppath, "/"); - strcat (tmppath, dirptr->d_name); - lstat (tmppath, buf); + file_number ++; - if (S_ISDIR (buf->st_mode)) - { -#if 0 - printf ("directory %s \n", tmppath); -#endif - } - else - { -#endif - file_number ++; -#if 0 - } -#endif - } closedir(dir); + return file_number; } -double -get_directory_size (char *path) + +double get_directory_size (char *path) { double directory_size; directory_size = 0.; @@ -979,43 +1003,32 @@ get_directory_size (char *path) dir = opendir (path); - if (dir == NULL) - { - printf ("open dir: %s failed\n", path); + if (dir == NULL) { + syslog(LOG_ERR, "%s, %d: trying to open directory %s: %s", + __FILE__, __LINE__, path, strerror(errno)); exit (EXIT_FAILURE); - } + } - while (NULL != (dirptr = readdir (dir))) - { + while (NULL != (dirptr = readdir (dir))) { strcpy (tmppath, path); strcat (tmppath, "/"); strcat (tmppath, dirptr->d_name); lstat (tmppath, buf); - if (S_ISDIR (buf->st_mode)) - { -#if 0 - printf ("directory %s \n", tmppath); -#endif - } - else - { + if( !(S_ISDIR (buf->st_mode)) ) { directory_size += (double)(buf->st_size/(double)(1024.*1024.)); - } - } + } + } + closedir(dir); return directory_size; } #define DEBUG 0 -static int -remove_file (char *path) +static int remove_file (char *path) { -/*delete files if limit was reached*/ -#if 0 - printf ("limit overflow: ressizelimit: %f, res_dirSize: %f\n", - theArgs->ressizelimit, res_dirSize); -#endif + /*delete files if limit was reached*/ + DIR *dir; struct dirent *dirptr; struct stat bufS, *buf = &bufS; @@ -1024,48 +1037,39 @@ remove_file (char *path) char tmppath[PARAM_MAX_VALUE_LEN]; char last_modification_file[PARAM_MAX_VALUE_LEN]; dir = opendir (path); - if (dir == NULL) - { - printf ("open dir: %s failed\n", path); + + if (dir == NULL) { + syslog(LOG_ERR, "%s, %d: opening dir %s failed", __FILE__, __LINE__, path); exit (EXIT_FAILURE); - } + } - while (1) - { - if (NULL == (dirptr = readdir (dir))) - { - break; - } - strcpy (tmppath, path); - strcat (tmppath, "/"); - strcat (tmppath, dirptr->d_name); - lstat (tmppath, buf); - if(strstr(tmppath,"hld") == NULL){ - continue; - } + while (1) { - if (S_ISDIR (buf->st_mode)) - { -#if 0 - printf ("directory %s \n", tmppath); -#endif - } - else - { - if (buf->st_mtime < last_modification_time) - { - last_modification_time = buf->st_mtime; - recover_size = buf->st_size; - strcpy (last_modification_file, tmppath); - } - } + if (NULL == (dirptr = readdir (dir))) + break; + + strcpy (tmppath, path); + strcat (tmppath, "/"); + strcat (tmppath, dirptr->d_name); + lstat (tmppath, buf); + + if(strstr(tmppath,"hld") == NULL) + continue; + + + if ( !(S_ISDIR(buf->st_mode)) ) { + if (buf->st_mtime < last_modification_time) { + last_modification_time = buf->st_mtime; + recover_size = buf->st_size; + strcpy (last_modification_file, tmppath); + } } + } + closedir(dir); -#if 0 - printf ("unlink: %s\n", last_modification_file); -#endif - if(0!=unlink (last_modification_file)){ - printf("cannot unlink %s\n",last_modification_file); + + if( 0 != unlink (last_modification_file) ) { + syslog(LOG_ERR, "%s, %d: cannot unlink %s", __FILE__, __LINE__, last_modification_file); exit(0); } return recover_size; @@ -1076,23 +1080,29 @@ static int openFile (TheArgs * theArgs) char fileName[_POSIX_PATH_MAX]; static char outPath[_POSIX_PATH_MAX]; + static char outLustrePath[_POSIX_PATH_MAX]; static char sec_path[_POSIX_PATH_MAX]; static once = 1; diff_time = 1; - runNr = genId32 (); + + if( !(theArgs->epicsCtrl) ) + runNr = genId32 (); + seqNr = 0; theArgs->runNr = runNr; - + if (once) { file_size = theArgs->maxFileSz; strcpy (sec_path, theArgs->sec_path); strcpy (outPath, theArgs->outPath); + strcpy (outLustrePath, theArgs->lustrePath); once = 0; } else { strcpy (theArgs->outPath, outPath); + strcpy (theArgs->lustrePath, outLustrePath); strcpy (theArgs->sec_path, sec_path); } @@ -1104,6 +1114,8 @@ static int openFile (TheArgs * theArgs) outTape = NULL; outFile = NULL; outSecondFile = NULL; + outLustreFile = NULL; + if (strcmp (theArgs->outDev, "null") == 0) { outFile = NULL; } @@ -1123,7 +1135,7 @@ static int openFile (TheArgs * theArgs) strcat (theArgs->outPath, fileName); } } - + if (NULL == (outFile = fopen (theArgs->outPath, "wb"))) { syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, strerror (errno)); @@ -1135,13 +1147,9 @@ static int openFile (TheArgs * theArgs) statfs (theArgs->outPath, buf); if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) { errno = ENOSPC; -/* if (0 != fclose (outFile)) { */ -/* fprintf (stderr, " evtbuild.c, openFile: fclose 'File' failed\n"); */ -/* } */ outFile = NULL; unlink (theArgs->outPath); - syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, - strerror (errno)); + syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, strerror (errno)); return -1; } } @@ -1151,8 +1159,7 @@ static int openFile (TheArgs * theArgs) strcpy (theArgs->outPath, fileName); } if (NULL == (outTape = openAnsiTape (theArgs->outPath, "/dev/tape"))) { - syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath, - strerror (errno)); + syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath, strerror (errno)); outFile = NULL; return -1; } @@ -1161,58 +1168,101 @@ static int openFile (TheArgs * theArgs) syslog (LOG_ERR, "unknown outputDev \"%s\"", theArgs->outDev); return -1; } + if (theArgs->write_data) { dirSize = get_directory_size (sec_path); -#if 0 - printf("dirSize: %f, secsizelimit: %f\n",dirSize,theArgs->secsizelimit); -#endif - while (dirSize > theArgs->secsizelimit) { - /*delete files if limit was reached */ -#if 0 - printf ("limit overflow: secsizelimit: %f, dirSize: %f\n", - theArgs->secsizelimit, dirSize); - getchar(); -#endif - remove_file (sec_path); - dirSize = get_directory_size (sec_path); - } - if (strcmp (theArgs->sec_path, "") == 0) { - strcpy (theArgs->sec_path, fileName); + + while (dirSize > theArgs->secsizelimit) { + /*delete files if limit was reached */ + remove_file (sec_path); + dirSize = get_directory_size (sec_path); + } + if (strcmp (theArgs->sec_path, "") == 0) { + strcpy (theArgs->sec_path, fileName); + } + else { + struct stat bufS, *buf = &bufS; + + stat (theArgs->sec_path, buf); + if (S_ISDIR (buf->st_mode)) { + strcat (theArgs->sec_path, "/"); + strcat (theArgs->sec_path, fileName); } - else { - struct stat bufS, *buf = &bufS; - - stat (theArgs->sec_path, buf); - if (S_ISDIR (buf->st_mode)) { - strcat (theArgs->sec_path, "/"); - strcat (theArgs->sec_path, fileName); + } + if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) { + syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, strerror (errno)); + outSecondFile = NULL; + return -1; + } + else { + struct statfs bufS, *buf = &bufS; + statfs (theArgs->sec_path, buf); + if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) { + errno = ENOSPC; + if (0 != fclose (outSecondFile)) { + fprintf (stderr, " evtbuild.c, openFile: fclose 'outSecondFile' failed\n"); } - } - if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) { - syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, - strerror (errno)); outSecondFile = NULL; + unlink (theArgs->sec_path); + syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, strerror (errno)); return -1; } - else { - struct statfs bufS, *buf = &bufS; - statfs (theArgs->sec_path, buf); - if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) { - errno = ENOSPC; - if (0 != fclose (outSecondFile)) { - fprintf (stderr, " evtbuild.c, openFile: fclose 'outSecondFile' failed\n"); - } - outSecondFile = NULL; - unlink (theArgs->sec_path); - syslog (LOG_ERR, "opening file %s: %s", - theArgs->sec_path, strerror (errno)); - return -1; - } + } + } + + /* Open file on Lustre */ + if( strcmp (theArgs->lustrePath, "") != 0 ) { + struct stat bufS, *buf = &bufS; + stat (theArgs->lustrePath, buf); + + /* If lustrePath is a dir name, add fileName to the lustrePath */ + if (S_ISDIR (buf->st_mode)) { + strcat (theArgs->lustrePath, "/"); + strcat (theArgs->lustrePath, fileName); + } + + if (NULL == (outLustreFile = fopen (theArgs->lustrePath, "wb"))) { + syslog (LOG_ERR, "opening file %s: %s", theArgs->lustrePath, strerror (errno)); + } + else { + struct statfs bufS, *buf = &bufS; + statfs (theArgs->lustrePath, buf); + + /* Check if the number of free available blocks + * is enough to write the file with maximum size. + */ + if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) { + errno = ENOSPC; + + outLustreFile = NULL; + unlink (theArgs->lustrePath); + syslog (LOG_ERR, "%s, %d: opening file %s: %s", + __FILE__, __LINE__, theArgs->lustrePath, strerror (errno)); + return -1; } + } } + +#ifdef RFIO + /* open file on Data Mover */ + if( fRemote != NULL ) { + char rfioPath[_POSIX_PATH_MAX]; + strcpy( rfioPath, theArgs->rfioRemotePath ); + strcat( rfioPath, "/" ); + strcat( rfioPath, fileName ); + + fprintf ( stderr, " evtbuild.c, rfio_fnewfile(): trying to open remote file %s\n", rfioPath ); + if( 0 != rfio_fnewfile( fRemote, rfioPath ) ) { + syslog( LOG_ERR, "%s, %d: cannot open remote file %s: %s", + __FILE__, __LINE__, rfioPath, strerror(errno) ); + exit (EXIT_FAILURE); + } + } +#endif + return 0; } - + static int openRESFile (TheArgs * theArgs) { char fileName[_POSIX_PATH_MAX]; @@ -1231,31 +1281,26 @@ static int openRESFile (TheArgs * theArgs) if (theArgs->ressizelimit != 0) { res_dirNr = get_file_number_in_dir (respath); -#if 0 - printf("res_dirNr: %d, theArgs->ressizelimit: %d\n,", res_dirNr,theArgs->ressizelimit); -#endif while (res_dirNr > theArgs->ressizelimit) { /*delete files if limit was reached */ -#if 0 - printf ("limit overflow: res_sizelimit: %ul, res_dirSize: %ul\n", - theArgs->ressizelimit, res_dirSize); -#endif remove_file (respath); res_dirNr--; } - } + } strcpy (fileName, theArgs->expId); strftime (fileName + strlen (fileName), 15, "%y%j%H%M%S_", localtime (&res_time)); static int filecounter = 1; - if(diff_time == 1){ + + if( diff_time == 1 ) { filecounter = 1; diff_time = 0; - }else { + } else { filecounter++; } + char app[8]; sprintf (app, "%d", filecounter); @@ -1264,7 +1309,7 @@ static int openRESFile (TheArgs * theArgs) if (strcmp (theArgs->respath, "") == 0) { strcpy (theArgs->respath, fileName); - } + } else { struct stat bufS, *buf = &bufS; @@ -1273,28 +1318,15 @@ static int openRESFile (TheArgs * theArgs) if (S_ISDIR (buf->st_mode)) { strcat (theArgs->respath, "/"); strcat (theArgs->respath, fileName); - } - } + } + } /* construct a default filename */ - outRESFile = NULL; + outRESFile = NULL; -#if 0 - /*if files are created more often than 1 sec, postfix is added to their name */ - struct stat bufS, *buf = &bufS; - while (0 == stat (theArgs->respath, buf)) - { /*file exists */ - strcpy (theArgs->respath, respath); - sprintf (file_app, "%d", filecounter); - strcat (theArgs->respath, "/"); - strcat (theArgs->respath, fileName); - strcat (theArgs->respath, file_app); - filecounter++; - } -#endif if (NULL == (outRESFile = fopen (theArgs->respath, "wb"))) { - syslog (LOG_ERR, "opening file %s: %s", theArgs->respath, - strerror (errno)); + syslog (LOG_ERR, "%s, %d: opening file %s: %s", + __FILE__, __LINE__, theArgs->respath, strerror (errno)); outRESFile = NULL; return -1; } @@ -1306,12 +1338,14 @@ static int openRESFile (TheArgs * theArgs) { errno = ENOSPC; if (0 != fclose (outRESFile)) { - fprintf (stderr, " evtbuild.c, openRESFile: fclose 'outRESFile' failed\n"); - } + syslog(LOG_ERR, "%s, %d: trying fclose 'outRESFile': %s", + __FILE__, __LINE__, strerror(errno)); + } outRESFile = NULL; unlink (theArgs->respath); - syslog (LOG_ERR, "opening file %s: %s", theArgs->respath, - strerror (errno)); + + syslog (LOG_ERR, "%s, %d: opening file %s: %s", + __FILE__, __LINE__, theArgs->respath, strerror (errno)); return -1; } } @@ -1333,15 +1367,20 @@ static int writeFile (void *evt) else if (outTape != NULL) { writeFileR = writeAnsiTape (outTape, evt, Evt_paddedSize (evt)); } + #ifdef RFIO - else if( fRemote != NULL ) { - int rfioStatus = 0; - rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote ); - if (rfioStatus <= 0) - fprintf ( stderr, " evtbuild.c, rfio_fwrite: writing file failed\n"); + /* write to Data Mover via RFIO */ + if( fRemote != NULL ) { + if( 0 >= rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote ) ) + syslog(LOG_ERR, "%s, %d: writing file via RFIO: %s", __FILE__, __LINE__, strerror(errno)); } #endif + /* write to Lustre */ + if (outLustreFile != NULL) { + writeFileR = fwrite (evt, 1, Evt_paddedSize (evt), outLustreFile); + } + /* writing file in the second dir */ if (outSecondFile != NULL) { fwrite (evt, 1, Evt_paddedSize (evt), outSecondFile); @@ -1350,8 +1389,7 @@ static int writeFile (void *evt) return writeFileR; } -static int -writeRESFile (void *evt) +static int writeRESFile (void *evt) { int writeFileR; Evt_setSeqNr (evt, res_seqNr++); @@ -1367,25 +1405,49 @@ writeRESFile (void *evt) static int closeFile () { int closeFileR; + int closeLustreFileR; /* closing file in the second dir */ if (outSecondFile != NULL) { dirSize += file_size; if (0 != fclose (outSecondFile)) { - fprintf (stderr, " evtbuild.c, closeFile: fclose 'outSecondFile2' failed\n"); + syslog(LOG_ERR, "%s, %d: trying fclose 'outSecondFile': %s", + __FILE__, __LINE__, strerror(errno)); } } if (outFile != NULL) { closeFileR = fclose (outFile); if (0 != closeFileR) { - printf (" evtbuild.c, closeFile: fclose 'closeFileR' failed\n"); + syslog(LOG_ERR, "%s, %d: trying fclose 'outFile': %s", + __FILE__, __LINE__, strerror(errno)); } } else if (outTape != NULL) { closeFileR = closeAnsiTape (outTape); } + /* close file on Lustre */ + if (outLustreFile != NULL) { + closeLustreFileR = fclose (outLustreFile); + if (0 != closeLustreFileR) { + syslog(LOG_ERR, "%s, %d: trying fclose 'outLustreFile': %s", + __FILE__, __LINE__, strerror(errno)); + exit (EXIT_FAILURE); + } + } + +#ifdef RFIO + /* close file on Data Mover */ + if( fRemote != NULL ) { + if( 0 != rfio_fendfile( fRemote ) ) { + syslog(LOG_ERR, "%s, %d: trying rfio_fendfile: %s", + __FILE__, __LINE__, strerror(errno)); + exit (EXIT_FAILURE); + } + } +#endif + return closeFileR; } @@ -1396,11 +1458,13 @@ static int closeRESFile (TheArgs * theArgs) res_dirSize += res_file_size; closeFileR = fclose (outRESFile); if (0 != closeFileR) { - fprintf ( stderr, " evtbuild.c, closeRESFile: fclose 'outRESFile2' failed!\n"); - } - } - else{ - fprintf ( stderr," evtbuild.c, closeRESFile: closeFile error outRESFile == NULL\n"); + syslog(LOG_ERR, "%s, %d: trying fclose 'outRESFile': %s", + __FILE__, __LINE__, strerror(errno)); + } + } + else { + syslog(LOG_ERR, "%s, %d: closeRESFile failed: outRESFile is NULL", + __FILE__, __LINE__); } return closeFileR; @@ -1409,7 +1473,8 @@ static int closeRESFile (TheArgs * theArgs) #ifdef RFIO static int rfio_openConnection (TheArgs * theArgs) { - if( theArgs->rfioFlag ){ + + if( (strcmp (theArgs->rfioRemotePath, "") != 0) ){ char rfioBase[128] = ""; char *pcc; strcpy( rfioBase, theArgs->rfioRemotePath ); @@ -1422,15 +1487,12 @@ static int rfio_openConnection (TheArgs * theArgs) pcc++; strncpy(pcc, "\0", 1); /* terminates after node name */ - fprintf ( stderr, " evtbuild.c:rfio_openConnection(): try to open connection to server\n" ); + fprintf ( stderr, " evtbuild.c, rfio_fopen(): trying to open connection to Data Mover: %s\n", rfioBase ); fRemote = rfio_fopen( rfioBase, "wb" ); - fprintf ( stderr, " evtbuild.c:rfio_openConnection(): opened connection to server\n" ); - - if (fRemote == NULL) - { - fprintf ( stderr, " evtbuild.c:rfio_openConnection(): cannot open connection to server\n" ); - syslog (LOG_ERR, " evtbuild.c:rfio_openConnection(): cannot open connection to server"); + if (fRemote == NULL) { + syslog(LOG_ERR, "%s, %d: trying to open connection to Data Mover %s: %s", + __FILE__, __LINE__, rfioBase, strerror(errno)); exit (EXIT_FAILURE); } } @@ -1440,99 +1502,23 @@ static int rfio_openConnection (TheArgs * theArgs) static int rfio_closeConnection (TheArgs * theArgs) { - if( theArgs->rfioFlag && fRemote != NULL ){ - int rfioStatus; - - rfioStatus = rfio_fclose( fRemote ); + if( (strcmp (theArgs->rfioRemotePath, "") != 0) && fRemote != NULL ) { - if (rfioStatus) - { - fprintf ( stderr, " evtbuild.c:rfio_closeConnection(): closing connection to server failed\n" ); - syslog (LOG_ERR, " evtbuild.c:rfio_closeConnection(): closing connection to server failed"); + if ( 0 != rfio_fclose( fRemote ) ) { + syslog(LOG_ERR, "%s, %d: trying to close connection to Data Mover: %s", + __FILE__, __LINE__, strerror(errno)); exit (EXIT_FAILURE); } } return 0; } - -static int rfio_openFile (TheArgs * theArgs) -{ - - if( theArgs->rfioFlag && fRemote != NULL){ - int rfioStatus; - char rfioFileName[_POSIX_PATH_MAX]; - char rfioPath[_POSIX_PATH_MAX]; - - strcpy( rfioPath, theArgs->rfioRemotePath ); - -/* Evt_setSeqNr (evt, seqNr++); */ -/* Evt_setRunNr (evt, runNr); */ - - strcpy (rfioFileName, theArgs->expId); - strftime (rfioFileName + strlen (rfioFileName), 18, "%y%j%H%M%S.hld", - localtime (&ourTime)); - strcat (rfioPath, "/"); - strcat (rfioPath, rfioFileName); - fprintf ( stderr, " evtbuild.c:rfio_openConnection(): try to open remote file %s\n", rfioPath ); - rfioStatus = rfio_fnewfile( fRemote, rfioPath ); - fprintf ( stderr, " evtbuild.c:rfio_openConnection(): opened remote file %s\n", rfioPath ); - - if (rfioStatus) { - fprintf ( stderr, " evtbuild.c:rfio_openFile(): cannot open remote file %s\n", rfioPath ); - syslog (LOG_ERR, " evtbuild.c:rfio_openFile(): cannot open remote file %s", rfioPath); - exit (EXIT_FAILURE); - } - } - - return 0; -} - -static int rfio_closeFile (TheArgs * theArgs) -{ - if( theArgs->rfioFlag && fRemote != NULL ){ - int rfioStatus; - - rfioStatus = rfio_fendfile( fRemote ); - - if (rfioStatus) - { - fprintf ( stderr, " evtbuild.c:rfio_openFile(): cannot close remote file\n"); - syslog (LOG_ERR, " evtbuild.c:rfio_openFile(): cannot close remote file"); - exit (EXIT_FAILURE); - - } - } - - return 0; -} - -static int rfio_writeFile (TheArgs * theArgs, void *evt) -{ - if( theArgs->rfioFlag && fRemote != NULL ){ - int rfioStatus = 0; - - Evt_setSeqNr( evt, seqNr++ ); - Evt_setRunNr( evt, runNr ); - - rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote ); - - if (rfioStatus <= 0) - { - fprintf ( stderr, " evtbuild.c:rfio_writeFile(): writing file failed\n"); - - return 1; - } - } - - return 0; -} - -#endif /* ifdef RFIO */ +#endif /* BUGBUG bailOut not proper yet */ int main (int argc, char *argv[]) { + int i; TheArgs theArgsS, *theArgs = &theArgsS; TheStats theStatsS, *theStats = &theStatsS; @@ -1555,62 +1541,42 @@ int main (int argc, char *argv[]) argsDefault (theArgs); argsFromParam (theArgs, argc, argv); - printf("theArgs->varQSizeCnt = %d\n",theArgs->varQSizeCnt); + if (0 > argsFromCL (theArgs, argc, argv)) { + usage (argv[0]); + exit (EXIT_FAILURE); + } - for (i = 0; i < theArgs->varQSizeCnt; i++) { - printf("theArgs->varQSize = %d\n", theArgs->varQSize[i]); + if( argsCheck(theArgs) ) { + sleep(10); + exit (EXIT_FAILURE); } - printf("theArgs->queueSize: %d\n", theArgs->queueSize); - printf("theArgs->nrOfMsgs: %d\n", theArgs->nrOfMsgs); - printf("theArgs->outPath: %s\n", theArgs->outPath); - printf("theArgs->outDev: %s\n", theArgs->outDev); + for (i = 0; prioritynames[i].c_name != NULL && + 0 != strcmp (prioritynames[i].c_name, theArgs->verbosity); i++) {} - if (0 > argsFromCL (theArgs, argc, argv)) - { - usage (argv[0]); - exit (EXIT_FAILURE); - } - for (i = 0; - prioritynames[i].c_name != NULL - && 0 != strcmp (prioritynames[i].c_name, theArgs->verbosity); i++) - { - } if (prioritynames[i].c_name == NULL) - { exit (EXIT_FAILURE); - } else - { setlogmask (LOG_UPTO (prioritynames[i].c_val)); - } /* normalize experiment id */ theArgs->expId[0] = tolower (theArgs->expId[0]); theArgs->expId[1] = tolower (theArgs->expId[1]); theArgs->expId[2] = '\0'; - if (NULL == - (worker = - Worker_initBegin (argv[0], sigHandler, theArgs->priority, - theArgs->isStandalone))) - { + if (NULL == (worker = Worker_initBegin (argv[0], sigHandler, theArgs->priority, + theArgs->isStandalone))) { syslog (LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror (errno)); exit (EXIT_FAILURE); - } - if (theArgs->no_rpc) - { + } + + if (theArgs->no_rpc) syslog (LOG_WARNING, "DISABLE of online service"); - } - else - { - if (-1 == initOnline ()) - { + else + if (-1 == initOnline ()) syslog (LOG_WARNING, "unable to initialize online service"); - } - } - if (theArgs->nrOfMsgs == 0) - { + + if (theArgs->nrOfMsgs == 0) { /* no '-m' option was on command line, we assume that the readout task (daq_readout) is running on the same node and @@ -1624,14 +1590,12 @@ int main (int argc, char *argv[]) shmTrans[0] = ShmTrans_create ("subevtqueue", 2 * theArgs->queueSize); hadTuQueue[0] = NULL; theArgs->nrOfMsgs = 1; - } - else - { + } + else { shmTrans = malloc (theArgs->nrOfMsgs * sizeof (ShmTrans *)); hadTuQueue = malloc (theArgs->nrOfMsgs * sizeof (HadTuQueue *)); - for (i = 0; i < theArgs->nrOfMsgs; i++) - { + for (i = 0; i < theArgs->nrOfMsgs; i++) { char buf[_POSIX_PATH_MAX]; sprintf (buf, "netqueue%d", i); @@ -1649,32 +1613,44 @@ int main (int argc, char *argv[]) } hadTuQueue[i] = NULL; - } - } + } + } - theStats->evtsDiscarded = Worker_addStatistic (worker, "evtsDiscarded"); - theStats->evtsComplete = Worker_addStatistic (worker, "evtsComplete"); - theStats->evtsDataError = Worker_addStatistic (worker, "evtsDataError"); - theStats->evtsTagError = Worker_addStatistic (worker, "evtsTagError"); - theStats->bytesWritten = Worker_addStatistic (worker, "bytesWritten"); + theStats->evtsDiscarded = Worker_addStatistic (worker, "evtsDiscarded"); + theStats->evtsComplete = Worker_addStatistic (worker, "evtsComplete"); + theStats->evtsCompletePS = Worker_addStatistic (worker, "evtsCompletePS"); + theStats->evtsDataError = Worker_addStatistic (worker, "evtsDataError"); + theStats->evtsTagError = Worker_addStatistic (worker, "evtsTagError"); + theStats->bytesWritten = Worker_addStatistic (worker, "bytesWritten"); + theStats->bytesWrittenPS = Worker_addStatistic (worker, "bytesWrittenPS"); + theStats->runId = Worker_addStatistic (worker, "runId"); + (*theStats->runId) = 0; /* initialize to zero */ unsigned long *retVal = NULL; - theStats->evtsRes = (unsigned long *) &retVal; - for (i = 0; i < NEVTIDS; i++) - { - char buf[WORKER_MAX_NAME_LEN]; - sprintf (buf, "evtId%d", i); - theStats->evtId[i] = Worker_addStatistic (worker, buf); - } - for (i = 0; i < theArgs->nrOfMsgs; i++) - { - char buf[WORKER_MAX_NAME_LEN]; + for (i = 0; i < NEVTIDS; i++) { + char buf[WORKER_MAX_NAME_LEN]; + + sprintf (buf, "evtId%d", i); + theStats->evtId[i] = Worker_addStatistic (worker, buf); + } + for (i = 0; i < theArgs->nrOfMsgs; i++) { + char buf[WORKER_MAX_NAME_LEN]; + + sprintf (buf, "trigNr%d", i); + theStats->trigNr[i] = Worker_addStatistic (worker, buf); + } - sprintf (buf, "trigNr%d", i); - theStats->trigNr[i] = Worker_addStatistic (worker, buf); - } + /* Add statistic for fill levels of buffers. */ + for( i=0; inrOfMsgs; i++ ) { + char buf[WORKER_MAX_NAME_LEN]; + sprintf( buf, "evtbuildBuff%d", i ); + theStats->evtbuildBuff[i] = Worker_addStatistic( worker, buf ); + } + + theStats->nrOfMsgs = Worker_addStatistic( worker, "nrOfMsgs" ); + (*theStats->nrOfMsgs) = theArgs->nrOfMsgs; argsDump (theArgs); @@ -1694,57 +1670,64 @@ int main (int argc, char *argv[]) int dataError = 0; int tagError = 0; + add2Stat( theArgs, theStats, 1, shmTrans ); statsBufferDump( theArgs, theStats, 1, hadTuQueue, shmTrans ); statsDump (theArgs, theStats, 1); if (*theStats->bytesWritten == 0) { - res_time = ourTime = time (NULL); + + if( theArgs->epicsCtrl ) { + runNr = getRunId( theArgs ); + res_time = ourTime = runNr + TIMEOFFSET; + } + else + res_time = ourTime = time (NULL); + + if (-1 == openFile (theArgs)) { syslog (LOG_ERR, "error opening output file, exiting"); exit (EXIT_FAILURE); } -#ifdef RFIO - rfio_openFile( theArgs ); -#endif - storeInfoStart (argv[0], ourTime, theArgs); /* store simple start run info */ storeRunInfoStart(ourTime, theArgs); evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart); - for (i = 0; i < theArgs->slowCtrlFileCnt; i++) - { + + for (i = 0; i < theArgs->slowCtrlFileCnt; i++) { evt = appendFile (evt, theArgs->slowCtrlFiles[i]); } + (*theStats->bytesWritten) += Evt_size (evt); writeFile (evt); deleteEvt (evt); } + if ((*theStats->evtsRes) == 0) { #if 0 ourTime = time (NULL); #endif /* remote event server - resdownscale - resnumevents*/ - if (theArgs->resdownscale) - { - if (-1 == openRESFile (theArgs)) - { + if (theArgs->resdownscale) { + if (-1 == openRESFile (theArgs)) { syslog (LOG_ERR, "error opening RES output file"); exit (EXIT_FAILURE); } + evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart); (*theStats->evtsRes)++; writeRESFile (evt); deleteEvt (evt); } - } + evt = newEvt (EvtDecoding_64bitAligned, EvtId_data); + for (i = 0; i < theArgs->nrOfMsgs && !evtIsBroken; i += step) { uint32_t trigNr; @@ -1770,33 +1753,31 @@ int main (int argc, char *argv[]) { currTrigNr = SubEvt_trigNr (subEvt) >> 8; currTrigTag = SubEvt_trigNr (subEvt) & 0xff; + if (theArgs->evtId != 0) - { currId = theArgs->evtId; - } else - { currId = SubEvt_pureId (subEvt); - } + syslog (LOG_DEBUG, "currTrigNr: 0x%06x, currTrigTag 0x%02x, currId 0x%08x", currTrigNr, currTrigTag, currId); } + trigNr = SubEvt_trigNr (subEvt) >> 8; trigTag = SubEvt_trigNr (subEvt) & 0xff; + if (trigNr == currTrigNr) { if (SubEvt_size (subEvt) > SubEvt_hdrSize ()) { /* sub evt is not empty */ if (SubEvt_dataError (subEvt)) - { dataError = 1; - } + if (trigTag != currTrigTag) - { tagError = 1; - } + evt = Evt_appendSubEvt (evt, subEvt); } HadTuQueue_pop (hadTuQueue[i]); @@ -1842,13 +1823,21 @@ int main (int argc, char *argv[]) } (*theStats->bytesWritten) += Evt_size (evt); writeFile (evt); + if (theArgs->resdownscale) { - (*theStats->evtsRes)++; -/* if ((*theStats->evtsRes) % (theArgs->resdownscale) == 0)*/ - if (((*theStats->evtsRes) % EVENT_NUM_OFFSET) < + + /* + * If the setting are as follows: + * EVENT_NUM_OFFSET == 100 + * theArgs->resdown_offset == 5 (100/20) + * then from each 100 events only + * first 5 events are written to refFile. + */ + if (((*theStats->evtsComplete) % EVENT_NUM_OFFSET) < theArgs->resdown_offset) { + (*theStats->evtsRes)++; writeRESFile (evt); } } @@ -1861,33 +1850,43 @@ int main (int argc, char *argv[]) is_mismatch_enough_to_stop(theArgs, theStats); #endif } + deleteEvt (evt); - if ((*theStats->evtsRes) % 600 == 0) - { -#if 0 - printf ("condition: %u > %u\n", (*theStats->bytesWritten), - ((theArgs->maxFileSz) - (theArgs->queueSize))); - printf ("maxFileSz %u ; queueSize: %u\n", (theArgs->maxFileSz), - theArgs->queueSize); -#endif - } - if ((*theStats->bytesWritten) >= - ((theArgs->maxFileSz) - (theArgs->queueSize))) + + newRunId = getRunId( theArgs ); + + /* + * The following conditions mean: + * theArgs->epicsCtrl == 1 && runNr < newRunId + * New RUN Id was generated, close the file and open new one. + * theArgs->epicsCtrl == 1 && (*theStats->bytesWritten) >= 1900000000 + * Due to whatever reason the file size exceeded an allowed limit, + * close the file. + * theArgs->epicsCtrl == 1 && runNr == 0 + * Something went wrong with sinchronization of Event Builders, + * close the file. + */ + if ( ( !(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize)) ) || + (theArgs->epicsCtrl && runNr < newRunId) || + (theArgs->epicsCtrl && (*theStats->bytesWritten) >= 1900000000) || + (theArgs->epicsCtrl && newRunId == 0)) { evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStop); + for (i = 0; i < theArgs->slowCtrlFileCnt; i++) - { evt = appendFile (evt, theArgs->slowCtrlFiles[i]); - } + (*theStats->bytesWritten) += Evt_size (evt); writeFile (evt); deleteEvt (evt); - ourTime = time (NULL); + if( theArgs->epicsCtrl ) + ourTime = newRunId + TIMEOFFSET; + else + ourTime = time (NULL); + closeFile (); -#ifdef RFIO - rfio_closeFile( theArgs ); -#endif + storeInfoStop (argv[0], ourTime - 2, worker, theArgs); /* store simple stop run info */ @@ -1898,19 +1897,22 @@ int main (int argc, char *argv[]) (*theStats->evtsDiscarded) = 0; (*theStats->evtsDataError) = 0; (*theStats->evtsTagError) = 0; + for (i = 0; i < theArgs->nrOfMsgs; i++) - { (*theStats->trigNr[i]) = 0; - } + for (i = 0; i < NEVTIDS; i++) - { (*theStats->evtId[i]) = 0; - } + } + if (theArgs->resdownscale) { - if ((*theStats->evtsRes) >= - (theArgs->resdownscale) * (theArgs->resnumevents)) + /* + * Number of events written to resFile is + * limited to resnumevents. + */ + if ((*theStats->evtsRes) >= theArgs->resnumevents) { #if 0 ourTime = time (NULL); @@ -1923,16 +1925,17 @@ int main (int argc, char *argv[]) } } } + ourTime = time (NULL); closeFile (); + #ifdef RFIO - rfio_closeFile( theArgs ); rfio_closeConnection( theArgs ); #endif + if (theArgs->resdownscale) - { closeRESFile (theArgs); - } + storeInfoStop (argv[0], ourTime - 2, worker, theArgs); /* store simple stop run info */ @@ -1941,9 +1944,8 @@ int main (int argc, char *argv[]) statsDump (theArgs, theStats, 1); for (i = 0; i < theArgs->nrOfMsgs; i++) - { ShmTrans_remove (shmTrans[i]); - } + finiOnline (); Worker_fini (worker); exit (EXIT_SUCCESS); -- 2.43.0