From 76151cd251455d328f86e9b2155ff1cfd9181015 Mon Sep 17 00:00:00 2001 From: hadaq Date: Thu, 25 Jan 2007 16:08:20 +0000 Subject: [PATCH] this code was running during april06 beamtime --- hadaq/evtbuild.c | 2085 +++++++++++++++++++++++++++++++++------------- 1 file changed, 1492 insertions(+), 593 deletions(-) diff --git a/hadaq/evtbuild.c b/hadaq/evtbuild.c index dbbec6b..f811fdc 100644 --- a/hadaq/evtbuild.c +++ b/hadaq/evtbuild.c @@ -1,11 +1,12 @@ -static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.74 2004-08-28 12:27:09 hadaq Exp $"; +static char *rcsId = + "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.75 2007-01-25 16:08:20 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L #define SYSLOG_NAMES #include - +#include #include #include #include @@ -17,6 +18,7 @@ static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hada #include #include #include +#include #include @@ -32,683 +34,1580 @@ static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hada #include "ansiTape.h" #include "genid32.h" -#define NEVTIDS 64UL /* must be 2^n */ -#define NEVTIDS_IN_FILE 0UL /* must be 2^n */ +#define NEVTIDS 64UL /* must be 2^n */ +#define NEVTIDS_IN_FILE 0UL /* must be 2^n */ +#define EVENT_NUM_OFFSET 100 +#define DEBUG2 0 +#define CHECK_MISMATCH 1 +#define BEAM 1 static FILE *outFile; +static FILE *outSecondFile; +static FILE *outRESFile; static AnsiTape *outTape; static uint32_t seqNr; +static uint32_t res_seqNr; static uint32_t runNr; static time_t ourTime; - -typedef struct TheArgsS { - unsigned long nrOfMsgs; - char outPath[PARAM_MAX_VALUE_LEN]; - char outDev[PARAM_MAX_VALUE_LEN]; - unsigned long runNr; - char expId[PARAM_MAX_VALUE_LEN]; - char *slowCtrlFiles[PARAM_MAX_ARRAY_LEN]; - char slowCtrlFilesS[PARAM_MAX_ARRAY_LEN][PARAM_MAX_NAME_LEN]; - int slowCtrlFileCnt; - unsigned long isStandalone; - unsigned long priority; - unsigned long queueSize; - char verbosity[PARAM_MAX_VALUE_LEN]; - unsigned long evtId; - off_t maxFileSz; +static long file_size; +static long res_file_size; +static double dirSize; +static unsigned long res_dirSize; +static unsigned int res_dirNr; +static time_t res_time; +static int diff_time; +static int trig_mismatch; +typedef struct TheArgsS +{ + unsigned long nrOfMsgs; + char outPath[PARAM_MAX_VALUE_LEN]; + char outDev[PARAM_MAX_VALUE_LEN]; + unsigned long runNr; + char expId[PARAM_MAX_VALUE_LEN]; + char *slowCtrlFiles[PARAM_MAX_ARRAY_LEN]; + char slowCtrlFilesS[PARAM_MAX_ARRAY_LEN][PARAM_MAX_NAME_LEN]; + int slowCtrlFileCnt; + unsigned long isStandalone; + unsigned long priority; + long queueSize; + char verbosity[PARAM_MAX_VALUE_LEN]; + unsigned long evtId; + long maxFileSz; + unsigned short no_rpc; + unsigned int resdownscale; + unsigned int resnumevents; + char respath[PARAM_MAX_VALUE_LEN]; + unsigned short write_data; + char sec_path[PARAM_MAX_VALUE_LEN]; + unsigned int ressizelimit; + double secsizelimit; + double resdown_offset; } TheArgs; -typedef struct TheStatsS { - unsigned long *evtsDiscarded; - unsigned long *evtsComplete; - unsigned long *evtsDataError; - unsigned long *evtsTagError; - unsigned long *bytesWritten; - unsigned long *evtId[NEVTIDS]; - unsigned long *trigNr[32]; +typedef struct TheStatsS +{ + unsigned long *evtsDiscarded; + unsigned long *evtsComplete; + unsigned long *evtsDataError; + unsigned long *evtsTagError; + unsigned long *bytesWritten; + unsigned long *evtId[NEVTIDS]; + unsigned long *trigNr[32]; + unsigned long *evtsRes; } TheStats; static jmp_buf terminateJmp; -void sigHandler(int sig) +void +sigHandler (int sig) { - longjmp(terminateJmp, sig); + longjmp (terminateJmp, sig); } -static void *appendFile(void *my, const char *path) +static void * +appendFile (void *my, const char *path) { - void *subEvt; - char *dataBuf; - FILE *file; - size_t fileSize; - - if (NULL == (file = fopen(path, "r"))) { - syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno)); - return my; - } - subEvt = newSubEvt(SubEvtDecoding_text, SubEvtId_slowTest, 0); - - if (NULL == (dataBuf = malloc(strlen(path + 2)))) { - syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno)); - return my; - } - sprintf(dataBuf, "#%s\n", path); - subEvt = SubEvt_appendData(subEvt, dataBuf, strlen(dataBuf)); - free(dataBuf); - - fseek(file, 0, SEEK_END); - fileSize = ftell(file); - fseek(file, 0, SEEK_SET); - if (NULL == (dataBuf = malloc(fileSize))) { - syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno)); - return my; - } - fread(dataBuf, 1, fileSize, file); - - subEvt = SubEvt_appendData(subEvt, dataBuf, fileSize); - my = Evt_appendSubEvt(my, subEvt); - - deleteSubEvt(subEvt); - free(dataBuf); - fclose(file); - - return my; + void *subEvt; + char *dataBuf; + FILE *file; + size_t fileSize; + + if (NULL == (file = fopen (path, "r"))) + { + syslog (LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror (errno)); + return my; + } + subEvt = newSubEvt (SubEvtDecoding_text, SubEvtId_slowTest, 0); + + if (NULL == (dataBuf = malloc (strlen (path + 2)))) + { + syslog (LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror (errno)); + return my; + } + sprintf (dataBuf, "#%s\n", path); + subEvt = SubEvt_appendData (subEvt, dataBuf, strlen (dataBuf)); + free (dataBuf); + + fseek (file, 0, SEEK_END); + fileSize = ftell (file); + fseek (file, 0, SEEK_SET); + if (NULL == (dataBuf = malloc (fileSize))) + { + syslog (LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror (errno)); + return my; + } + fread (dataBuf, 1, fileSize, file); + + subEvt = SubEvt_appendData (subEvt, dataBuf, fileSize); + my = Evt_appendSubEvt (my, subEvt); + + deleteSubEvt (subEvt); + free (dataBuf); + fclose (file); + + return my; } -static void usage(const char *progName) +static void +usage (const char *progName) { - syslog(LOG_ERR, "Usage: %s [-x expId]", progName); - syslog(LOG_ERR, "Usage: [-m nrOfMsgs] [-f slowCtrlFile ...]"); - syslog(LOG_ERR, "Usage: [-o outPath] [-d null|tape|file|stdout]"); - syslog(LOG_ERR, "Usage: [-q queueSize] [-r runNumber]"); - syslog(LOG_ERR, "Usage: [-a (agent)] [-p priority] [-I evtId]"); - syslog(LOG_ERR, "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]"); + syslog (LOG_ERR, "Usage: %s [-x expId]", progName); + syslog (LOG_ERR, "Usage: [-m nrOfMsgs] [-f slowCtrlFile ...]"); + syslog (LOG_ERR, "Usage: [-o outPath] [-d null|tape|file|stdout]"); + syslog (LOG_ERR, "Usage: [-q queueSize] [-r runNumber]"); + syslog (LOG_ERR, "Usage: [-a (agent)] [-p priority] [-I evtId]"); + syslog (LOG_ERR, + "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]"); + syslog (LOG_ERR, "Usage: [--norpc]"); + syslog (LOG_ERR, "Usage: [--filesize maximum_size_of_output_file[in MB]]"); + syslog (LOG_ERR, "Usage: [--resdownscale resdownscale_of_events]"); + syslog (LOG_ERR, "Usage: [--resnumevents number_of_events_in_one_resfile]"); + syslog (LOG_ERR, + "Usage: [--respath path_where_the_downscaling_data_are_written]"); + syslog (LOG_ERR, "Usage: [--secsizelimit max size of second dir[in MB]]"); + syslog (LOG_ERR, "Usage: [--ressizelimit max number of files in res dir]"); + syslog (LOG_ERR, + "Usage: [--write_data path_where_the_mirroring_data_are_ written]"); } -static void argsDump(TheArgs *my) +static void +argsDump (TheArgs * my) { - int i; - - syslog(LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs); - for (i = 0; i < my->slowCtrlFileCnt; i++) { - syslog(LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]); - } - syslog(LOG_DEBUG, "outPath: %s", my->outPath); - syslog(LOG_DEBUG, "outDev: %s", my->outDev); - syslog(LOG_DEBUG, "runNr: %d", my->runNr); - syslog(LOG_DEBUG, "expId: %s", my->expId); - syslog(LOG_DEBUG, "priority: %d", my->priority); - syslog(LOG_DEBUG, "isStandalone: %d", my->isStandalone); - syslog(LOG_DEBUG, "queueSize: %d", my->queueSize); - syslog(LOG_DEBUG, "verbosity: %s", my->verbosity); - syslog(LOG_DEBUG, "evtId: %ld", my->evtId); - syslog(LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz); + int i; + + syslog (LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs); + for (i = 0; i < my->slowCtrlFileCnt; i++) + { + syslog (LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]); + } + syslog (LOG_DEBUG, "outPath: %s", my->outPath); + syslog (LOG_DEBUG, "outDev: %s", my->outDev); + syslog (LOG_DEBUG, "runNr: %d", my->runNr); + syslog (LOG_DEBUG, "expId: %s", my->expId); + syslog (LOG_DEBUG, "priority: %d", my->priority); + syslog (LOG_DEBUG, "isStandalone: %d", my->isStandalone); + syslog (LOG_DEBUG, "queueSize: %d", my->queueSize); + syslog (LOG_DEBUG, "verbosity: %s", my->verbosity); + syslog (LOG_DEBUG, "evtId: %ld", my->evtId); + syslog (LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz); + if (my->resdownscale != 0) + { + syslog (LOG_DEBUG, "resdownscale: %ld", my->resdownscale); + syslog (LOG_DEBUG, "resnumevents: %ld", my->resnumevents); + syslog (LOG_DEBUG, "respath: %s", my->respath); + syslog (LOG_DEBUG, "secsizelimit: %ld", my->secsizelimit); + syslog (LOG_DEBUG, "ressizelimit: %d", my->ressizelimit); + } + if (my->no_rpc == 1) + { + syslog (LOG_DEBUG, "no rpc is set"); + } + if (my->write_data == 1) + { + syslog (LOG_DEBUG, "sec_path: %s", my->sec_path); + } } -static void argsDefault(TheArgs *my) +static void +argsDefault (TheArgs * my) { - int i; - - my->nrOfMsgs = 0; - for (i = 0; i < PARAM_MAX_ARRAY_LEN; i++) { - my->slowCtrlFiles[i] = my->slowCtrlFilesS[i]; - } - my->slowCtrlFileCnt = 0; - strcpy(my->outPath, ""); - strcpy(my->outDev, "null"); - strcpy(my->expId, "xx"); - my->priority = 0; - my->isStandalone = 1; - my->queueSize = 1 * 1024 * 1024; - strcpy(my->verbosity, "info"); - my->evtId = 0; - my->maxFileSz = (2 * 1024 * 1024 * 1024UL - 1); + int i; + + my->nrOfMsgs = 0; + for (i = 0; i < PARAM_MAX_ARRAY_LEN; i++) + { + my->slowCtrlFiles[i] = my->slowCtrlFilesS[i]; + } + my->slowCtrlFileCnt = 0; + strcpy (my->outPath, ""); + strcpy (my->outDev, "null"); + strcpy (my->expId, "xx"); + my->priority = 0; + my->isStandalone = 1; + my->queueSize = 1 * 1024 * 1024; + strcpy (my->verbosity, "info"); + my->evtId = 0; + my->maxFileSz = (2 * 1024 * 1024 * 1024UL - 1); + my->no_rpc = 0; + my->write_data = 0; + my->resdownscale = 0; + my->resdown_offset = 0; + my->resnumevents = -1; + my->secsizelimit = 0.; + my->ressizelimit = 0; + strcpy (my->respath, ""); } -static int argsFromCL(TheArgs *my, int argc, char *argv[]) +static int +argsFromCL (TheArgs * my, int argc, char *argv[]) { - extern char *optarg; - int i; - - while ((i = getopt(argc, argv, "am:f:r:o:d:q:p:v:x:I:S:")) != -1) { - switch (i) { - case 'm': - my->nrOfMsgs = strtoul(optarg, NULL, 0); - break; - case 'f': - strcpy(my->slowCtrlFiles[my->slowCtrlFileCnt++], optarg); - break; - case 'o': - strcpy(my->outPath, optarg); - break; - case 'd': - strcpy(my->outDev, optarg); - break; - case 'x': - strcpy(my->expId, optarg); - break; - case 'a': - my->isStandalone = 0; - break; - case 'p': - my->priority = strtoul(optarg, NULL, 0); - break; - case 'q': - my->queueSize = strtoul(optarg, NULL, 0); - break; - case 'v': - strcpy(my->verbosity, optarg); - break; - case 'I': - my->evtId = strtoul(optarg, NULL, 0); - break; - case 'S': - my->maxFileSz = strtoul(optarg, NULL, 0); - break; - default: - return -1; - break; - } + extern char *optarg; + int i; + while (1) + { + int this_option_optind = optind ? optind : 1; + int option_index = 0; +#if 0 + second arg:no_argument (0), required_argument (1), optional_argument (2) +#endif + static struct option long_options[] = { + {"norpc", 0, 0, 't'}, + {"filesize", 1, 0, 'z'}, + {"resdownscale", 1, 0, 'e'}, + {"resnumevents", 1, 0, 'n'}, + {"respath", 1, 0, 'h'}, + {"secsizelimit", 1, 0, 'l'}, + {"ressizelimit", 1, 0, 's'}, + {"write_data", 1, 0, 'w'}, + {"help", 0, 0, 'H'}, + {0, 0, 0, 0} + }; + i = + getopt_long (argc, argv, + "am:f:r:o:d:q:p:v:x:I:S:tz:e:n:h:w:tz:e:n:h:w:Hs:l:", + long_options, &option_index); + if (i == -1) + break; + switch (i) + { + case 'm': + my->nrOfMsgs = strtoul (optarg, NULL, 0); + break; + case 'f': + strcpy (my->slowCtrlFiles[my->slowCtrlFileCnt++], optarg); + break; + case 'o': + strcpy (my->outPath, optarg); + break; + case 'd': + strcpy (my->outDev, optarg); + break; + case 'x': + strcpy (my->expId, optarg); + break; + case 'a': + my->isStandalone = 0; + break; + case 'p': + my->priority = strtoul (optarg, NULL, 0); + break; + case 'q': + my->queueSize = strtoul (optarg, NULL, 0); + break; + case 'v': + strcpy (my->verbosity, optarg); + break; + case 'I': + my->evtId = strtoul (optarg, NULL, 0); + break; + case 'S': + my->maxFileSz = strtoul (optarg, NULL, 0); + break; + case 't': /*norpc - no arg */ + my->no_rpc = 1; + break; + case 'z': /*filesize - one arg */ + if (strtoul (optarg, NULL, 0) > 0 + && strtoul (optarg, NULL, 0) < 20000) + { + my->maxFileSz = (1024 * 1024UL * strtoul (optarg, NULL, 0)); + } + else + { + printf + ("filesize option incorrect,it should be >0 and <20000\n"); + exit (0); + } + break; + case 'e': /*resdownscale- one arg & need resnumevents & respath */ + if (strtoul (optarg, NULL, 0) > 0 + && strtoul (optarg, NULL, 0) < 100001) + { + my->resdownscale = strtoul (optarg, NULL, 0); + my->resdown_offset = EVENT_NUM_OFFSET / (my->resdownscale); + } + else + { + printf + ("resdownscale option incorrect,it should be >0 and <100001\n"); + } + break; + case 'n': /*resnumevents - one arg */ + if (strtoul (optarg, NULL, 0) > 99 + && strtoul (optarg, NULL, 0) < 1000000001) + { + my->resnumevents = strtoul (optarg, NULL, 0); + } + else + { + printf + ("resnumevents incorrect, it should be >99 and <1000000001\n"); + } + break; + case 'l': /*secsizelimit - one arg */ + if (strtoul (optarg, NULL,0) > 10 + && strtoul (optarg, NULL,0) < 100001) + { + unsigned long tmp = (/*1024 * 1024UL **/ strtoul (optarg, NULL,0)); +#if 0 + my->secsizelimit = (((double)tmp)/((double)(1024.*1024.))); +#endif + my->secsizelimit = (double)tmp; + printf("set secsizelimit: %d MB \n",my->secsizelimit); + } + break; + case 's': /*ressizelimit - one arg */ + if (strtoul (optarg, NULL, 0) > 5 + && strtoul (optarg, NULL, 0) < 1000) + { + my->ressizelimit = strtoul (optarg, NULL, 0); + } + break; + case 'h': /*respath - one arg as dir_path */ + strcpy (my->respath, optarg); + break; + case 'w': /*write_data - one arg as dir_path */ + my->write_data = 1; + strcpy (my->sec_path, optarg); + break; + case 'H': + usage (argv[0]); + return -1; + break; + default: + usage (argv[0]); + return -1; + break; } - return 0; + } + if (optind < argc) + { + printf ("non-option ARGV-elements: "); + while (optind < argc) + printf ("%s ", argv[optind++]); + printf ("\n"); + } +/* +the condition that resdownscale resnumevents respath have to be together +*/ + if (((my->resdownscale != 0) || (my->resnumevents != -1) + || (strcmp (my->respath, ""))) && ((my->resdownscale == 0) + || (my->resnumevents == -1) + || !strcmp (my->respath, ""))) +#if 0 + if (((my->resdownscale != 0) || (my->resnumevents != -1) + || (my->respath != NULL)) && ((my->resdownscale == 0) + || (my->resnumevents == -1) + || (my->respath == NULL))) +#endif + { + printf + ("if you are using Remote Events Server, you should fill: resdownscale, resnumevents, respath options\n"); + exit (EXIT_FAILURE); + } + if (((my->maxFileSz) < (my->queueSize))) + { + printf ("--filesize has to be > queuesize(-q)\n"); + exit (0); + } +/* +the condition ressizelimit has to be together with respath +*/ + if ((my->ressizelimit) != 0) + { + if (!strcmp (my->respath, "")) +#if 0 + if (my->respath != NULL) +#endif + { + printf + ("you have to define path, where the files will be collect. use 'respath' option\n"); + exit (EXIT_FAILURE); + } + else + + { + + } + } + return 0; } -static int argsFromParam(TheArgs *my, int argc, char *argv[]) +static int +argsFromParam (TheArgs * my, int argc, char *argv[]) { - Param paramS, *param = ¶mS; - int paramWasFound; - char *name; - - conSetupParam(param, getenv("DAQ_SETUP")); - - name = (char *) basename(argv[0]); - Param_getInt(param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs); - Param_getStringArray(param, name, "slwctrlfile", PARAM_MAX_ARRAY_LEN, &my->slowCtrlFileCnt, my->slowCtrlFiles); - Param_getString(param, name, "outpath", ¶mWasFound, my->outPath); - Param_getString(param, name, "outdev", ¶mWasFound, my->outDev); - Param_getString(param, name, "expid", ¶mWasFound, my->expId); - Param_getInt(param, name, "stndln", ¶mWasFound, &my->isStandalone); - Param_getInt(param, name, "prio", ¶mWasFound, &my->priority); - Param_getInt(param, name, "qsize", ¶mWasFound, &my->queueSize); - Param_getString(param, name, "verb", ¶mWasFound, my->verbosity); - Param_getInt(param, name, "evtid", ¶mWasFound, &my->evtId); - Param_getInt(param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz); - desParam(param); + Param paramS, *param = ¶mS; + int paramWasFound; + char *name; + + conSetupParam (param, getenv ("DAQ_SETUP")); + + name = (char *) basename (argv[0]); + Param_getInt (param, name, "nrofmsgs", ¶mWasFound, &my->nrOfMsgs); + Param_getStringArray (param, name, "slwctrlfile", PARAM_MAX_ARRAY_LEN, + &my->slowCtrlFileCnt, my->slowCtrlFiles); + Param_getString (param, name, "outpath", ¶mWasFound, my->outPath); + Param_getString (param, name, "outdev", ¶mWasFound, my->outDev); + Param_getString (param, name, "expid", ¶mWasFound, my->expId); + Param_getInt (param, name, "stndln", ¶mWasFound, &my->isStandalone); + Param_getInt (param, name, "prio", ¶mWasFound, &my->priority); + Param_getInt (param, name, "qsize", ¶mWasFound, &my->queueSize); + Param_getString (param, name, "verb", ¶mWasFound, my->verbosity); + Param_getInt (param, name, "evtid", ¶mWasFound, &my->evtId); + Param_getInt (param, name, "maxfilesz", ¶mWasFound, &my->maxFileSz); + desParam (param); } -static char *unit(unsigned long v) +static char * +unit (unsigned long v) { - static char retVal[6]; - static char u[] = " kM"; - int i; + static char retVal[6]; + static char u[] = " kM"; + int i; - for (i = 0; v >= 10000 && i < sizeof(u) - 2; v /= 1000, i++) { - } - sprintf(retVal, "%4d%c", v, u[i]); + for (i = 0; v >= 10000 && i < sizeof (u) - 2; v /= 1000, i++) + { + } + sprintf (retVal, "%4d%c", v, u[i]); - return retVal; + return retVal; } -static void statsDump(TheArgs *theArgs, TheStats *my, int interval) +static void +statsDump (TheArgs * theArgs, TheStats * my, int interval) { - static unsigned long lastEC; - static unsigned long lastEE; - static unsigned long lastTE; - static unsigned long lastED; - static unsigned long lastBW; - static time_t t0 = 0; - time_t t, dT; - int i; - - - if (theArgs->isStandalone && strcmp(theArgs->verbosity, "info") == 0) { - t = time(NULL); - dT = t - t0; - if (dT >= interval) { - int col = 0; - - fputs("==============================================================================\n", stderr); - fprintf(stderr, "%19s:%6s", "evtsComplete ", unit(*my->evtsComplete)); - fprintf(stderr, "%19s:%6s", "evtsDiscarded ", unit(*my->evtsDiscarded)); - fprintf(stderr, "%19s:%6s", "bytesWritten ", unit(*my->bytesWritten)); - fputc('\n', stderr); - fprintf(stderr, "%19s:%6s", "evtsDataError ", unit(*my->evtsDataError)); - fprintf(stderr, "%19s:%6s", "evtsTagError ", unit(*my->evtsTagError)); - fputc('\n', stderr); - - fprintf(stderr, "%19s:%6s", "evtsComplete/s", unit((*my->evtsComplete - lastEC) / dT)); - fprintf(stderr, "%19s:%6s", "evtsDiscarded/s", unit((*my->evtsDiscarded - lastED) / dT)); - fprintf(stderr, "%19s:%6s", "bytesWritten/s", unit((*my->bytesWritten - lastBW) / dT)); - fputc('\n', stderr); - fprintf(stderr, "%19s:%6s", "evtsDataError/s", unit((*my->evtsDataError - lastEE) / dT)); - fprintf(stderr, "%19s:%6s", "evtsTagError/s", unit((*my->evtsTagError - lastTE) / dT)); - fputc('\n', stderr); - lastEC = *my->evtsComplete; - lastEE = *my->evtsDataError; - lastTE = *my->evtsTagError; - lastED = *my->evtsDiscarded; - lastBW = *my->bytesWritten; - - fputs("------------------------------------------------------------------------------\n", stderr); - col = 0; - for (i = 0; i < NEVTIDS; i++) { - if (*my->evtId[i] != 0) { - fprintf(stderr, "%17s%02x:%6s", "evtId", i, unit(*my->evtId[i])); - if (++col == 3) { - fputc('\n', stderr); - col = 0; - } - } - } - if (col != 0) { - fputc('\n', stderr); - } - col = 0; - - fputs("------------------------------------------------------------------------------\n", stderr); - for (i = 0; i < theArgs->nrOfMsgs; i++) { - fprintf(stderr, "%12s%02d: 0x%08x", "trigNr", i, *my->trigNr[i]); - if (++col == 3) { - fputc('\n', stderr); - col = 0; - } - } - if (col != 0) { - fputc('\n', stderr); - } - t0 = t; + static unsigned long lastEC; + static unsigned long lastEE; + static unsigned long lastTE; + static unsigned long lastED; + static unsigned long lastBW; + static time_t t0 = 0; + time_t t, dT; + int i; + + + if (theArgs->isStandalone && strcmp (theArgs->verbosity, "info") == 0) + { + t = time (NULL); + dT = t - t0; + if (dT >= interval) + { + int col = 0; + + fputs + ("==============================================================================\n", + stderr); + fprintf (stderr, "%19s:%6s", "evtsComplete ", + unit (*my->evtsComplete)); + fprintf (stderr, "%19s:%6s", "evtsDiscarded ", + unit (*my->evtsDiscarded)); + fprintf (stderr, "%19s:%6s", "bytesWritten ", + unit (*my->bytesWritten)); + fputc ('\n', stderr); + fprintf (stderr, "%19s:%6s", "evtsDataError ", + unit (*my->evtsDataError)); + fprintf (stderr, "%19s:%6s", "evtsTagError ", + unit (*my->evtsTagError)); + fputc ('\n', stderr); + + fprintf (stderr, "%19s:%6s", "evtsComplete/s", + unit ((*my->evtsComplete - lastEC) / dT)); + fprintf (stderr, "%19s:%6s", "evtsDiscarded/s", + unit ((*my->evtsDiscarded - lastED) / dT)); + fprintf (stderr, "%19s:%6s", "bytesWritten/s", + unit ((*my->bytesWritten - lastBW) / dT)); + fputc ('\n', stderr); + fprintf (stderr, "%19s:%6s", "evtsDataError/s", + unit ((*my->evtsDataError - lastEE) / dT)); + fprintf (stderr, "%19s:%6s", "evtsTagError/s", + unit ((*my->evtsTagError - lastTE) / dT)); + fputc ('\n', stderr); + lastEC = *my->evtsComplete; + lastEE = *my->evtsDataError; + lastTE = *my->evtsTagError; + lastED = *my->evtsDiscarded; + lastBW = *my->bytesWritten; + + fputs + ("------------------------------------------------------------------------------\n", + stderr); + col = 0; + for (i = 0; i < NEVTIDS; i++) + { + if (*my->evtId[i] != 0) + { + fprintf (stderr, "%17s%02x:%6s", "evtId", i, + unit (*my->evtId[i])); + if (++col == 3) + { + fputc ('\n', stderr); + col = 0; + } } + } + if (col != 0) + { + fputc ('\n', stderr); + } + col = 0; + + fputs + ("------------------------------------------------------------------------------\n", + stderr); + for (i = 0; i < theArgs->nrOfMsgs; i++) + { + fprintf (stderr, "%12s%02d: 0x%08x", "trigNr", i, + *my->trigNr[i]); + if (++col == 3) + { + fputc ('\n', stderr); + col = 0; + } + } + if (col != 0) + { + fputc ('\n', stderr); + } + t0 = t; } + } } #define NJUNK 128 -static void storeInfoStart(const char *n, time_t t, TheArgs *my) +static void +storeInfoStart (const char *n, time_t t, TheArgs * my) { - Param pS, *p = &pS; - int i; - char s[20]; - - conSetupParam(p, getenv("DAQ_SETUP")); - - strftime(s, 20, "%Y-%m-%dT%H:%M:%S", localtime(&t)); - Param_storeString(p, n, "startdate", s); - Param_storeInt(p, n, "nrofmsgs", my->nrOfMsgs); - for (i = 0; i < my->slowCtrlFileCnt; i++) { - sprintf(s, "slwctrlfile%d", i); - Param_storeString(p, n, s, my->slowCtrlFiles[i]); - } - Param_storeString(p, n, "outpath", my->outPath); - Param_storeString(p, n, "outdev", my->outDev); - Param_storeInt(p, n, "runnr", my->runNr); - Param_storeString(p, n, "expid", my->expId); - Param_storeInt(p, n, "prio", my->priority); - Param_storeInt(p, n, "stndln", my->isStandalone); - Param_storeInt(p, n, "qsize", my->queueSize); - Param_storeString(p, n, "verb", my->verbosity); - /* This storing junk is for having the run start in - * oracle definately -- BS - */ - for (i = 0; i < NJUNK; i++) { - Param_storeString(p, n, "junk", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); - } - desParam(p); + Param pS, *p = &pS; + int i; + char s[20]; + + conSetupParam (p, getenv ("DAQ_SETUP")); + + strftime (s, 20, "%Y-%m-%dT%H:%M:%S", localtime (&t)); + Param_storeString (p, n, "startdate", s); + Param_storeInt (p, n, "nrofmsgs", my->nrOfMsgs); + for (i = 0; i < my->slowCtrlFileCnt; i++) + { + sprintf (s, "slwctrlfile%d", i); + Param_storeString (p, n, s, my->slowCtrlFiles[i]); + } + Param_storeString (p, n, "outpath", my->outPath); + Param_storeString (p, n, "outdev", my->outDev); + Param_storeInt (p, n, "runnr", my->runNr); + Param_storeString (p, n, "expid", my->expId); + Param_storeInt (p, n, "prio", my->priority); + Param_storeInt (p, n, "stndln", my->isStandalone); + Param_storeInt (p, n, "qsize", my->queueSize); + Param_storeString (p, n, "verb", my->verbosity); + if (my->resdownscale != 0) + { + Param_storeInt (p, n, "resdownscale", my->resdownscale); + Param_storeInt (p, n, "resnumevents", my->resnumevents); + Param_storeString (p, n, "respath", my->respath); + Param_storeInt (p, n, "ressizelimit", my->ressizelimit); + + } + if (my->secsizelimit != 0.) + { +/* Param_storeString (p, n, "secsizelimit", my->secsizelimit);*/ + } + if (my->no_rpc == 1) + { + Param_storeInt (p, n, "rpc", my->no_rpc); + } + if (my->write_data == 1) + { + Param_storeString (p, n, "sec_path", my->sec_path); + } + /* This storing junk is for having the run start in + * oracle definately -- BS + */ + for (i = 0; i < NJUNK; i++) + { + Param_storeString (p, n, "junk", + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); + } + desParam (p); } -static void storeInfoStop(const char *n, time_t t, Worker *w) +static void +storeInfoStop (const char *n, time_t t, Worker * w) { - Param pS, *p = &pS; - int i; - char s[20]; + Param pS, *p = &pS; + int i; + char s[20]; + + conSetupParam (p, getenv ("DAQ_SETUP")); - conSetupParam(p, getenv("DAQ_SETUP")); + for (i = 0; i < 32 && strcmp (w->statistics[i].name, "") != 0; i++) + { + Param_storeInt (p, n, w->statistics[i].name, w->statistics[i].value); + } + strftime (s, 20, "%Y-%m-%dT%H:%M:%S", localtime (&t)); + Param_storeString (p, n, "stopdate", s); + + desParam (p); +} - for (i = 0; i < 32 && strcmp(w->statistics[i].name, "") != 0; i++) { - Param_storeInt(p, n, w->statistics[i].name, w->statistics[i].value); +int +is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) { + unsigned long filesize_enough = (theArgs->maxFileSz) * 0.02; + unsigned long discarded_enough = (*my->evtsComplete) * 0.03; + unsigned long tagerror_enough = (*my->evtsComplete) * 0.03; + if ((*my->bytesWritten) >= filesize_enough) + { + if ((*my->evtsDiscarded) > discarded_enough) + { + printf("Too many events is broken.\n"); + printf("Debug Inf: filesize_enough: %u,fileWrittem: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete)); + syslog (LOG_ERR, "Too many events is broken.\n"); + syslog (LOG_ERR, "Debug Inf: filesize_enough: %u,fileWrittem: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete)); +#if BEAM + system ("echo tagerror | netcat -w1 hadesdaq 12122"); +#endif + sleep(5); + } + if ((*my->evtsTagError) > tagerror_enough) + { + printf("Too many TagError.\n"); + printf("Debug Inf: filesize_enough: %u,fileWrittem: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete)); + syslog(LOG_ERR, "Too many TagError.\n"); + syslog(LOG_ERR, "Debug Inf: filesize_enough: %u,fileWrittem: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete)); +#if BEAM + system ("echo tagerror | netcat -w1 hadesdaq 12122"); +#endif + sleep(5); + } + } + return 0; +} +int +get_file_number_in_dir (char *path) +{ + int file_number; + file_number = 0; + struct dirent *dirptr; + struct stat bufS, *buf = &bufS; + DIR *dir; + char tmppath[PARAM_MAX_VALUE_LEN]; + + dir = opendir (path); + + if (dir == NULL) + { + printf ("open dir: %s failed\n", path); + exit (EXIT_FAILURE); + } + + while (NULL != (dirptr = readdir (dir))) + { +#if 0 + strcpy (tmppath, path); + strcat (tmppath, "/"); + strcat (tmppath, dirptr->d_name); + lstat (tmppath, buf); + + if (S_ISDIR (buf->st_mode)) + { +#if 0 + printf ("directory %s \n", tmppath); +#endif + } + else + { +#endif + file_number ++; +#if 0 } - strftime(s, 20, "%Y-%m-%dT%H:%M:%S", localtime(&t)); - Param_storeString(p, n, "stopdate", s); +#endif + } + closedir(dir); + return file_number; +} +double +get_directory_size (char *path) +{ + double directory_size; + directory_size = 0.; + struct dirent *dirptr; + struct stat bufS, *buf = &bufS; + DIR *dir; + char tmppath[PARAM_MAX_VALUE_LEN]; + + dir = opendir (path); + + if (dir == NULL) + { + printf ("open dir: %s failed\n", path); + exit (EXIT_FAILURE); + } + + while (NULL != (dirptr = readdir (dir))) + { + strcpy (tmppath, path); + strcat (tmppath, "/"); + strcat (tmppath, dirptr->d_name); + lstat (tmppath, buf); + + if (S_ISDIR (buf->st_mode)) + { +#if 0 + printf ("directory %s \n", tmppath); +#endif + } + else + { + directory_size += (double)(buf->st_size/(double)(1024.*1024.)); + } + } + closedir(dir); + return directory_size; +} - desParam(p); +#define DEBUG 0 +static int +remove_file (char *path) +{ +/*delete files if limit was reached*/ +#if 0 + printf ("limit overflow: ressizelimit: %f, res_dirSize: %f\n", + theArgs->ressizelimit, res_dirSize); +#endif + DIR *dir; + struct dirent *dirptr; + struct stat bufS, *buf = &bufS; + int recover_size; + time_t last_modification_time = 2147483647; + char tmppath[PARAM_MAX_VALUE_LEN]; + char last_modification_file[PARAM_MAX_VALUE_LEN]; + dir = opendir (path); + if (dir == NULL) + { + printf ("open dir: %s failed\n", path); + exit (EXIT_FAILURE); + } + + while (1) + { + if (NULL == (dirptr = readdir (dir))) + { + break; + } + strcpy (tmppath, path); + strcat (tmppath, "/"); + strcat (tmppath, dirptr->d_name); + lstat (tmppath, buf); + if(strstr(tmppath,"hld") == NULL){ + continue; + } + + if (S_ISDIR (buf->st_mode)) + { +#if 0 + printf ("directory %s \n", tmppath); +#endif + } + else + { + if (buf->st_mtime < last_modification_time) + { + last_modification_time = buf->st_mtime; + recover_size = buf->st_size; + strcpy (last_modification_file, tmppath); + } + } + } + closedir(dir); +#if 0 + printf ("unlink: %s\n", last_modification_file); +#endif + if(0!=unlink (last_modification_file)){ + printf("cannot unlink %s\n",last_modification_file); + exit(0); + } + return recover_size; } -static int openFile(TheArgs *theArgs) +static int +openFile (TheArgs * theArgs) { - char fileName[_POSIX_PATH_MAX]; - static char outPath[_POSIX_PATH_MAX]; - static once = 1; - - runNr = genId32(); - seqNr = 0; - - theArgs->runNr = runNr; - - if (once) { - strcpy(outPath, theArgs->outPath); - once = 0; - } else { - strcpy(theArgs->outPath, outPath); - } - - /* construct a default filename */ - strcpy(fileName, theArgs->expId); - strftime(fileName + strlen(fileName), 18, "%y%j%H%M%S.hld", localtime(&ourTime)); - - outTape = NULL; - outFile = NULL; - if (strcmp(theArgs->outDev, "null") == 0) { - outFile = NULL; - } else if (strcmp(theArgs->outDev, "stdout") == 0) { - outFile = stdout; - } else if (strcmp(theArgs->outDev, "file") == 0) { - if (strcmp(theArgs->outPath, "") == 0) { - strcpy(theArgs->outPath, fileName); - } else { - struct stat bufS, *buf = &bufS; - - stat(theArgs->outPath, buf); - if (S_ISDIR(buf->st_mode)) { - strcat(theArgs->outPath, "/"); - strcat(theArgs->outPath, fileName); - } - } +#if DEBUG + printf ("openFile\n"); +#endif + char fileName[_POSIX_PATH_MAX]; + static char outPath[_POSIX_PATH_MAX]; + static char sec_path[_POSIX_PATH_MAX]; + static once = 1; + + diff_time = 1; + runNr = genId32 (); + seqNr = 0; + + theArgs->runNr = runNr; + + if (once) + { + file_size = theArgs->maxFileSz; + strcpy (sec_path, theArgs->sec_path); + strcpy (outPath, theArgs->outPath); + once = 0; + } + else + { + strcpy (theArgs->outPath, outPath); + strcpy (theArgs->sec_path, sec_path); + } + + /* construct a default filename */ + strcpy (fileName, theArgs->expId); + strftime (fileName + strlen (fileName), 18, "%y%j%H%M%S.hld", + localtime (&ourTime)); + + outTape = NULL; + outFile = NULL; + outSecondFile = NULL; + if (strcmp (theArgs->outDev, "null") == 0) + { + outFile = NULL; + } + else if (strcmp (theArgs->outDev, "stdout") == 0) + { + outFile = stdout; + } + else if (strcmp (theArgs->outDev, "file") == 0) + { + if (strcmp (theArgs->outPath, "") == 0) + { + strcpy (theArgs->outPath, fileName); + } + else + { + struct stat bufS, *buf = &bufS; + + stat (theArgs->outPath, buf); + if (S_ISDIR (buf->st_mode)) + { + strcat (theArgs->outPath, "/"); + strcat (theArgs->outPath, fileName); + } + } - if (NULL == (outFile = fopen(theArgs->outPath, "wb"))) { - syslog(LOG_ERR, "opening file %s: %s", theArgs->outPath, strerror(errno)); - outFile = NULL; - return -1; - } else { - struct statfs bufS, *buf = &bufS; - statfs(theArgs->outPath, buf); - if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) { - errno = ENOSPC; - fclose(outFile); - outFile = NULL; - unlink(theArgs->outPath); - syslog(LOG_ERR, "opening file %s: %s", theArgs->outPath, strerror(errno)); - return -1; - } - } - } else if (strcmp(theArgs->outDev, "tape") == 0) { - if (strcmp(theArgs->outPath, "") == 0) { - strcpy(theArgs->outPath, fileName); + if (NULL == (outFile = fopen (theArgs->outPath, "wb"))) + { + syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, + strerror (errno)); + outFile = NULL; + return -1; + } + else + { + struct statfs bufS, *buf = &bufS; + statfs (theArgs->outPath, buf); + if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) + { + errno = ENOSPC; + if (0 != fclose (outFile)) + { +#if DEBUG2 + printf ("fclose 'File' failed\n"); +#endif } - if (NULL == (outTape = openAnsiTape(theArgs->outPath, "/dev/tape"))) { - syslog(LOG_ERR, "opening tape %s: %s", theArgs->outPath, strerror(errno)); - outFile = NULL; - return -1; + outFile = NULL; + unlink (theArgs->outPath); + syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, + strerror (errno)); + return -1; + } + } + } + else if (strcmp (theArgs->outDev, "tape") == 0) + { + if (strcmp (theArgs->outPath, "") == 0) + { + strcpy (theArgs->outPath, fileName); + } + if (NULL == (outTape = openAnsiTape (theArgs->outPath, "/dev/tape"))) + { + syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath, + strerror (errno)); + outFile = NULL; + return -1; + } + } + else + { + syslog (LOG_ERR, "unknown outputDev \"%s\"", theArgs->outDev); + return -1; + } + if (theArgs->write_data) + { + dirSize = get_directory_size (sec_path); +#if 0 + printf("dirSize: %f, secsizelimit: %f\n",dirSize,theArgs->secsizelimit); +#endif + while (dirSize > theArgs->secsizelimit) + { + /*delete files if limit was reached */ +#if 0 + printf ("limit overflow: secsizelimit: %f, dirSize: %f\n", + theArgs->secsizelimit, dirSize); + getchar(); +#endif + remove_file (sec_path); + dirSize = get_directory_size (sec_path); + } + if (strcmp (theArgs->sec_path, "") == 0) + { + strcpy (theArgs->sec_path, fileName); + } + else + { + struct stat bufS, *buf = &bufS; + + stat (theArgs->sec_path, buf); + if (S_ISDIR (buf->st_mode)) + { + strcat (theArgs->sec_path, "/"); + strcat (theArgs->sec_path, fileName); + } + } + if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) + { + syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, + strerror (errno)); + outSecondFile = NULL; + return -1; + } + else + { + struct statfs bufS, *buf = &bufS; + statfs (theArgs->sec_path, buf); + if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) + { + errno = ENOSPC; + if (0 != fclose (outSecondFile)) + { +#if DEBUG2 + printf ("fclose 'outSecondFile' failed\n"); +#endif } - } else { - syslog(LOG_ERR, "unknown outputDev \"%s\"", theArgs->outDev); - return -1; + outSecondFile = NULL; + unlink (theArgs->sec_path); + syslog (LOG_ERR, "opening file %s: %s", + theArgs->sec_path, strerror (errno)); + return -1; + } } - return 0; + } + return 0; } -static int writeFile(void *evt) +static int +openRESFile (TheArgs * theArgs) { - int writeFileR; - - Evt_setSeqNr(evt, seqNr++); - Evt_setRunNr(evt, runNr); - - if (outFile != NULL) { - writeFileR = fwrite(evt, 1, Evt_paddedSize(evt), outFile); - } else if (outTape != NULL) { - writeFileR = writeAnsiTape(outTape, evt, Evt_paddedSize(evt)); +#if 0 + printf ("openRESFile\n"); + getchar (); +#endif + char fileName[_POSIX_PATH_MAX]; + static res_once = 1; + static char respath[_POSIX_PATH_MAX]; + + res_seqNr = 0; + + if (res_once) + { + res_once = 0; + strcpy (respath, theArgs->respath); + } + else + { + strcpy (theArgs->respath, respath); + } + if (theArgs->ressizelimit != 0) + { + res_dirNr = get_file_number_in_dir (respath); +#if 0 + printf("res_dirNr: %d, theArgs->ressizelimit: %d\n,", res_dirNr,theArgs->ressizelimit); +#endif + while (res_dirNr > theArgs->ressizelimit) + { + /*delete files if limit was reached */ +#if 0 + printf ("limit overflow: res_sizelimit: %ul, res_dirSize: %ul\n", + theArgs->ressizelimit, res_dirSize); +#endif + remove_file (respath); + res_dirNr--; } - return writeFileR; + } + strcpy (fileName, theArgs->expId); + strftime (fileName + strlen (fileName), 15, "%y%j%H%M%S_", + localtime (&res_time)); + static int filecounter = 1; + if(diff_time == 1){ + filecounter = 1; + diff_time = 0; + }else { + filecounter++; + } + char app[8]; +#if 0 +// strcpy (theArgs->respath, respath); +// strcat (theArgs->respath,fileName); +#endif + sprintf (app, "%d", filecounter); + strcat (fileName,app); + strcat (fileName,".hld"); + if (strcmp (theArgs->respath, "") == 0) + { + strcpy (theArgs->respath, fileName); + } + else + { + struct stat bufS, *buf = &bufS; + + stat (theArgs->respath, buf); + if (S_ISDIR (buf->st_mode)) + { + strcat (theArgs->respath, "/"); + strcat (theArgs->respath, fileName); + } + } + /* construct a default filename */ + outRESFile = NULL; +#if 0 + /*if files are created more often than 1 sec, postfix is added to their name */ + struct stat bufS, *buf = &bufS; + while (0 == stat (theArgs->respath, buf)) + { /*file exists */ + strcpy (theArgs->respath, respath); + sprintf (file_app, "%d", filecounter); + strcat (theArgs->respath, "/"); + strcat (theArgs->respath, fileName); + strcat (theArgs->respath, file_app); + filecounter++; + } +#endif + if (NULL == (outRESFile = fopen (theArgs->respath, "wb"))) + { + syslog (LOG_ERR, "opening file %s: %s", theArgs->respath, + strerror (errno)); + outRESFile = NULL; + return -1; + } + else + { + struct statfs bufS, *buf = &bufS; + statfs (theArgs->respath, buf); + if ((theArgs->maxFileSz) / buf->f_bsize > buf->f_bavail) + { + errno = ENOSPC; + if (0 != fclose (outRESFile)) + { +#if DEBUG2 + + printf ("fclose 'outRESFile' failed\n"); +#endif + } + outRESFile = NULL; + unlink (theArgs->respath); + syslog (LOG_ERR, "opening file %s: %s", theArgs->respath, + strerror (errno)); + return -1; + } + } +#if 0 + printf ("theArgs->respath %s\n", theArgs->respath); +#endif + return 0; } - -static int closeFile() +static int +writeFile (void *evt) { - int closeFileR; +#if DEBUG + printf ("writeFile\n"); +#endif + int writeFileR; + + Evt_setSeqNr (evt, seqNr++); + Evt_setRunNr (evt, runNr); + + if (outFile != NULL) + { + writeFileR = fwrite (evt, 1, Evt_paddedSize (evt), outFile); + } + else if (outTape != NULL) + { + writeFileR = writeAnsiTape (outTape, evt, Evt_paddedSize (evt)); + } + /* writing file in the second dir */ + if (outSecondFile != NULL) + { + fwrite (evt, 1, Evt_paddedSize (evt), outSecondFile); + } + return writeFileR; +} - if (outFile != NULL) { - closeFileR = fclose(outFile); - } else if (outTape != NULL) { - closeFileR = closeAnsiTape(outTape); - } - return closeFileR; +static int +writeRESFile (void *evt) +{ +#if DEBUG + printf ("writeRESFile\n"); +#endif + int writeFileR; + Evt_setSeqNr (evt, res_seqNr++); + Evt_setRunNr (evt, runNr); + if (outRESFile != NULL) + { + writeFileR = fwrite (evt, 1, Evt_paddedSize (evt), outRESFile); + } + Evt_setSeqNr (evt, seqNr); +#if DEBUG + printf ("END writeRESFile\n"); +#endif + return writeFileR; } -/* BUGBUG bailOut not proper yet */ -int main(int argc, char *argv[]) +static int +closeFile () { - int i; - TheArgs theArgsS, *theArgs = &theArgsS; - TheStats theStatsS, *theStats = &theStatsS; - ShmTrans **shmTrans; - HadTuQueue **hadTuQueue; - Worker *worker; - - int scanWasSuccessful; - uint32_t currTrigNr; - uint32_t currTrigTag; - uint32_t currId; - - openlog(argv[0], LOG_PID | LOG_PERROR, LOG_LOCAL0); - setlogmask(LOG_UPTO(LOG_INFO)); - - argsDefault(theArgs); - argsFromParam(theArgs, argc, argv); - if (0 > argsFromCL(theArgs, argc, argv)) { - usage(argv[0]); - exit(EXIT_FAILURE); - } - for (i = 0; prioritynames[i].c_name != NULL && 0 != strcmp(prioritynames[i].c_name, theArgs->verbosity); i++) { - } - if (prioritynames[i].c_name == NULL) { - exit(EXIT_FAILURE); - } else { - setlogmask(LOG_UPTO(prioritynames[i].c_val)); - } - - /* normalize experiment id */ - theArgs->expId[0] = tolower(theArgs->expId[0]); - theArgs->expId[1] = tolower(theArgs->expId[1]); - theArgs->expId[2] = '\0'; - - if (NULL == (worker = Worker_initBegin(argv[0], sigHandler, theArgs->priority, theArgs->isStandalone))) { - syslog(LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror(errno)); - exit(EXIT_FAILURE); - } - if (-1 == initOnline()) { - syslog(LOG_WARNING, "unable to initialize online service"); - } - if (theArgs->nrOfMsgs == 0) { - /* - no '-m' option was on command line, we assume that the - readout task (daq_readout) is running on the same node and - communicates directly via shared memory - */ - syslog(LOG_NOTICE, "nrOfMsgs not set, assuming readout is local"); - - shmTrans = malloc(sizeof(ShmTrans *)); - hadTuQueue = malloc(sizeof(HadTuQueue *)); - - shmTrans[0] = ShmTrans_create("subevtqueue", 2 * theArgs->queueSize); - hadTuQueue[0] = NULL; - theArgs->nrOfMsgs = 1; - } else { - shmTrans = malloc(theArgs->nrOfMsgs * sizeof(ShmTrans *)); - hadTuQueue = malloc(theArgs->nrOfMsgs * sizeof(HadTuQueue *)); - - for (i = 0; i < theArgs->nrOfMsgs; i++) { - char buf[_POSIX_PATH_MAX]; - - sprintf(buf, "netqueue%d", i); - shmTrans[i] = ShmTrans_create(buf, 2 * theArgs->queueSize); - hadTuQueue[i] = NULL; - } +#if DEBUG + printf ("closeFile\n"); +#endif + int closeFileR; + /* closing file in the second dir */ + if (outSecondFile != NULL) + { + dirSize += file_size; + if (0 != fclose (outSecondFile)) + { +#if DEBUG2 + printf ("fclose 'outSecondFile2' failed\n"); +#endif } + } + else + { +#if DEBUG2 + printf ("closeFile error outSecondFile == NULL\n"); +#endif - theStats->evtsDiscarded = Worker_addStatistic(worker, "evtsDiscarded"); - theStats->evtsComplete = Worker_addStatistic(worker, "evtsComplete"); - theStats->evtsDataError = Worker_addStatistic(worker, "evtsDataError"); - theStats->evtsTagError = Worker_addStatistic(worker, "evtsTagError"); - theStats->bytesWritten = Worker_addStatistic(worker, "bytesWritten"); - for (i = 0; i < NEVTIDS; i++) { - char buf[WORKER_MAX_NAME_LEN]; + } - sprintf(buf, "evtId%d", i); - theStats->evtId[i] = Worker_addStatistic(worker, buf); - } - for (i = 0; i < theArgs->nrOfMsgs; i++) { - char buf[WORKER_MAX_NAME_LEN]; + if (outFile == NULL) + { +#if DEBUG2 + printf ("closeFile error outFile == NULL\n"); +#endif - sprintf(buf, "trigNr%d", i); - theStats->trigNr[i] = Worker_addStatistic(worker, buf); + } + if (outFile != NULL) + { + closeFileR = fclose (outFile); + if (0 != closeFileR) + { +#if DEBUG2 + printf ("fclose 'closeFileR' failed\n"); +#endif } + } + else if (outTape != NULL) + { + closeFileR = closeAnsiTape (outTape); + } + return closeFileR; +} - argsDump(theArgs); +static int +closeRESFile (TheArgs * theArgs) +{ +#if DEBUG + printf ("closeRESFile\n"); +#endif + int closeFileR; + if (outRESFile != NULL) + { + res_dirSize += res_file_size; + closeFileR = fclose (outRESFile); + if (0 != closeFileR) + { +#if DEBUG2 + printf ("fclose 'outRESFile2' failed\n"); +#endif + } + } + else + { +#if DEBUG2 + printf ("closeFile error outRESFile == NULL\n"); +#endif - Worker_initEnd(worker); + } + return closeFileR; +} - currId = 0; - while (setjmp(terminateJmp) == 0) { - void *evt; - void *subEvt; - int step; - int evtIsBroken = 0; - int dataError = 0; - int tagError = 0; +/* BUGBUG bailOut not proper yet */ +int +main (int argc, char *argv[]) +{ + int i; + TheArgs theArgsS, *theArgs = &theArgsS; + TheStats theStatsS, *theStats = &theStatsS; + ShmTrans **shmTrans; + HadTuQueue **hadTuQueue; + Worker *worker; + + int scanWasSuccessful; + uint32_t currTrigNr; + uint32_t currTrigTag; + uint32_t currId; + + openlog (argv[0], LOG_PID | LOG_PERROR, LOG_LOCAL0); + setlogmask (LOG_UPTO (LOG_INFO)); + + argsDefault (theArgs); + argsFromParam (theArgs, argc, argv); + if (0 > argsFromCL (theArgs, argc, argv)) + { + usage (argv[0]); + exit (EXIT_FAILURE); + } + for (i = 0; + prioritynames[i].c_name != NULL + && 0 != strcmp (prioritynames[i].c_name, theArgs->verbosity); i++) + { + } + if (prioritynames[i].c_name == NULL) + { + exit (EXIT_FAILURE); + } + else + { + setlogmask (LOG_UPTO (prioritynames[i].c_val)); + } + + /* normalize experiment id */ + theArgs->expId[0] = tolower (theArgs->expId[0]); + theArgs->expId[1] = tolower (theArgs->expId[1]); + theArgs->expId[2] = '\0'; + + if (NULL == + (worker = + Worker_initBegin (argv[0], sigHandler, theArgs->priority, + theArgs->isStandalone))) + { + syslog (LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror (errno)); + exit (EXIT_FAILURE); + } + if (theArgs->no_rpc) + { + syslog (LOG_WARNING, "DISABLE of online service"); + } + else + { + if (-1 == initOnline ()) + { + syslog (LOG_WARNING, "unable to initialize online service"); + } + } + if (theArgs->nrOfMsgs == 0) + { + /* + no '-m' option was on command line, we assume that the + readout task (daq_readout) is running on the same node and + communicates directly via shared memory + */ + syslog (LOG_NOTICE, "nrOfMsgs not set, assuming readout is local"); + + shmTrans = malloc (sizeof (ShmTrans *)); + hadTuQueue = malloc (sizeof (HadTuQueue *)); + + shmTrans[0] = ShmTrans_create ("subevtqueue", 2 * theArgs->queueSize); + hadTuQueue[0] = NULL; + theArgs->nrOfMsgs = 1; + } + else + { + shmTrans = malloc (theArgs->nrOfMsgs * sizeof (ShmTrans *)); + hadTuQueue = malloc (theArgs->nrOfMsgs * sizeof (HadTuQueue *)); + + for (i = 0; i < theArgs->nrOfMsgs; i++) + { + char buf[_POSIX_PATH_MAX]; + + sprintf (buf, "netqueue%d", i); + shmTrans[i] = ShmTrans_create (buf, 2 * theArgs->queueSize); + hadTuQueue[i] = NULL; + } + } + + theStats->evtsDiscarded = Worker_addStatistic (worker, "evtsDiscarded"); + theStats->evtsComplete = Worker_addStatistic (worker, "evtsComplete"); + theStats->evtsDataError = Worker_addStatistic (worker, "evtsDataError"); + theStats->evtsTagError = Worker_addStatistic (worker, "evtsTagError"); + theStats->bytesWritten = Worker_addStatistic (worker, "bytesWritten"); + unsigned long *retVal = NULL; + theStats->evtsRes = &retVal; + for (i = 0; i < NEVTIDS; i++) + { + char buf[WORKER_MAX_NAME_LEN]; + + sprintf (buf, "evtId%d", i); + theStats->evtId[i] = Worker_addStatistic (worker, buf); + } + for (i = 0; i < theArgs->nrOfMsgs; i++) + { + char buf[WORKER_MAX_NAME_LEN]; + + sprintf (buf, "trigNr%d", i); + theStats->trigNr[i] = Worker_addStatistic (worker, buf); + } + + argsDump (theArgs); + + Worker_initEnd (worker); + + currId = 0; + while (setjmp (terminateJmp) == 0) + { + void *evt; + void *subEvt; + int step; + int evtIsBroken = 0; + int dataError = 0; + int tagError = 0; + + statsDump (theArgs, theStats, 1); + + if (*theStats->bytesWritten == 0) + { + res_time = ourTime = time (NULL); + if (-1 == openFile (theArgs)) + { + syslog (LOG_ERR, "error opening output file, exiting"); + exit (EXIT_FAILURE); + } + storeInfoStart (argv[0], ourTime, theArgs); + + evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart); + for (i = 0; i < theArgs->slowCtrlFileCnt; i++) + { + evt = appendFile (evt, theArgs->slowCtrlFiles[i]); + } + (*theStats->bytesWritten) += Evt_size (evt); + writeFile (evt); + deleteEvt (evt); + } + if ((*theStats->evtsRes) == 0) + { +#if 0 + ourTime = time (NULL); +#endif +/* remote event server - resdownscale - resnumevents*/ + if (theArgs->resdownscale) + { + if (-1 == openRESFile (theArgs)) + { + syslog (LOG_ERR, "error opening RES output file"); + exit (EXIT_FAILURE); + } + evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart); + (*theStats->evtsRes)++; + writeRESFile (evt); + deleteEvt (evt); + } - statsDump(theArgs, theStats, 1); + } + evt = newEvt (EvtDecoding_64bitAligned, EvtId_data); + for (i = 0; i < theArgs->nrOfMsgs && !evtIsBroken; i += step) + { + uint32_t trigNr; + uint32_t trigTag; + + if (hadTuQueue[i] == NULL) + { + void *storage; + + storage = ShmTrans_recv (shmTrans[i]); + hadTuQueue[i] = malloc (HadTuQueue_sizeOf ()); + conHadTuQueue_voidP (hadTuQueue[i], storage); + } + subEvt = HadTuQueue_front (hadTuQueue[i]); +#ifndef NDEBUG + syslog (LOG_DEBUG, "hadTuQueue[%d]: %p = subEvt: %s", i, subEvt, + SubEvt_2charP (subEvt)); +#endif - if (*theStats->bytesWritten == 0) { - ourTime = time(NULL); - if (-1 == openFile(theArgs)) { - syslog(LOG_ERR, "error opening output file, exiting"); - exit(EXIT_FAILURE); - } - storeInfoStart(argv[0], ourTime, theArgs); + (*theStats->trigNr[i]) = SubEvt_trigNr (subEvt); - evt = newEvt(EvtDecoding_64bitAligned, EvtId_runStart); - for (i = 0; i < theArgs->slowCtrlFileCnt; i++) { - evt = appendFile(evt, theArgs->slowCtrlFiles[i]); - } - (*theStats->bytesWritten) += Evt_size(evt); - writeFile(evt); - deleteEvt(evt); + if (i == 0) + { + currTrigNr = SubEvt_trigNr (subEvt) >> 8; + currTrigTag = SubEvt_trigNr (subEvt) & 0xff; + if (theArgs->evtId != 0) + { + currId = theArgs->evtId; } - evt = newEvt(EvtDecoding_64bitAligned, EvtId_data); - for (i = 0; i < theArgs->nrOfMsgs && !evtIsBroken; i += step) { - uint32_t trigNr; - uint32_t trigTag; - - if (hadTuQueue[i] == NULL) { - void *storage; - - storage = ShmTrans_recv(shmTrans[i]); - hadTuQueue[i] = malloc(HadTuQueue_sizeOf()); - conHadTuQueue_voidP(hadTuQueue[i], storage); - } - subEvt = HadTuQueue_front(hadTuQueue[i]); -#ifndef NDEBUG - syslog(LOG_DEBUG, "hadTuQueue[%d]: %p = subEvt: %s", i, subEvt, SubEvt_2charP(subEvt)); -#endif - - (*theStats->trigNr[i]) = SubEvt_trigNr(subEvt); - - if (i == 0) { - currTrigNr = SubEvt_trigNr(subEvt) >> 8; - currTrigTag = SubEvt_trigNr(subEvt) & 0xff; - if (theArgs->evtId != 0) { - currId = theArgs->evtId; - } else { - currId = SubEvt_pureId(subEvt); - } - syslog(LOG_DEBUG, "currTrigNr: 0x%06x, currTrigTag 0x%02x, currId 0x%08x", currTrigNr, currTrigTag, currId); - } - trigNr = SubEvt_trigNr(subEvt) >> 8; - trigTag = SubEvt_trigNr(subEvt) & 0xff; - if (trigNr == currTrigNr) { - if (SubEvt_size(subEvt) > SubEvt_hdrSize()) { - /* sub evt is not empty */ - if (SubEvt_dataError(subEvt)) { - dataError = 1; - } - if (trigTag != currTrigTag) { - tagError = 1; - } - evt = Evt_appendSubEvt(evt, subEvt); - } - HadTuQueue_pop(hadTuQueue[i]); - step = 1; - } else if (trigNr < currTrigNr) { - /* BUGBUG subevt discarded, not in statistic */ - HadTuQueue_pop(hadTuQueue[i]); - step = 0; - } else { - evtIsBroken = 1; - } - if (HadTuQueue_empty(hadTuQueue[i])) { - desHadTuQueue(hadTuQueue[i]); - free(hadTuQueue[i]); - hadTuQueue[i] = NULL; - ShmTrans_free(shmTrans[i]); - } + else + { + currId = SubEvt_pureId (subEvt); } - if (!evtIsBroken) { - (*theStats->evtsComplete)++; - - (*theStats->evtId[currId & (NEVTIDS - 1)])++; - Evt_setId(evt, currId & (NEVTIDS_IN_FILE - 1)); - - if (dataError) { - Evt_setDataError(evt); - (*theStats->evtsDataError)++; - } - if (tagError) { - Evt_setDataError(evt); - (*theStats->evtsTagError)++; - } - (*theStats->bytesWritten) += Evt_size(evt); - writeFile(evt); - Evt_online(evt); - } else { - (*theStats->evtsDiscarded)++; + syslog (LOG_DEBUG, + "currTrigNr: 0x%06x, currTrigTag 0x%02x, currId 0x%08x", + currTrigNr, currTrigTag, currId); + } + trigNr = SubEvt_trigNr (subEvt) >> 8; + trigTag = SubEvt_trigNr (subEvt) & 0xff; + if (trigNr == currTrigNr) + { + if (SubEvt_size (subEvt) > SubEvt_hdrSize ()) + { + /* sub evt is not empty */ + if (SubEvt_dataError (subEvt)) + { + dataError = 1; + } + if (trigTag != currTrigTag) + { + tagError = 1; + } + evt = Evt_appendSubEvt (evt, subEvt); } - deleteEvt(evt); - if (*theStats->bytesWritten >= theArgs->maxFileSz - theArgs->queueSize) { - evt = newEvt(EvtDecoding_64bitAligned, EvtId_runStop); - for (i = 0; i < theArgs->slowCtrlFileCnt; i++) { - evt = appendFile(evt, theArgs->slowCtrlFiles[i]); - } - (*theStats->bytesWritten) += Evt_size(evt); - writeFile(evt); - deleteEvt(evt); - - ourTime = time(NULL); - closeFile(); - storeInfoStop(argv[0], ourTime - 2, worker); - - (*theStats->bytesWritten) = 0; - (*theStats->evtsComplete) = 0; - (*theStats->evtsDiscarded) = 0; - (*theStats->evtsDataError) = 0; - (*theStats->evtsTagError) = 0; - for (i = 0; i < theArgs->nrOfMsgs; i++) { - (*theStats->trigNr[i]) = 0; - } - for (i = 0; i < NEVTIDS; i++) { - (*theStats->evtId[i]) = 0; - } + HadTuQueue_pop (hadTuQueue[i]); + step = 1; + } + else if (trigNr < currTrigNr) + { + /* BUGBUG subevt discarded, not in statistic */ + HadTuQueue_pop (hadTuQueue[i]); + step = 0; + } + else + { + evtIsBroken = 1; + } + if (HadTuQueue_empty (hadTuQueue[i])) + { + desHadTuQueue (hadTuQueue[i]); + free (hadTuQueue[i]); + hadTuQueue[i] = NULL; + ShmTrans_free (shmTrans[i]); + } + } + if (!evtIsBroken) + { + (*theStats->evtsComplete)++; + + (*theStats->evtId[currId & (NEVTIDS - 1)])++; + Evt_setId (evt, currId & (NEVTIDS_IN_FILE - 1)); + + if (dataError) + { + Evt_setDataError (evt); + (*theStats->evtsDataError)++; + } + if (tagError) + { + Evt_setDataError (evt); + (*theStats->evtsTagError)++; +#if CHECK_MISMATCH + is_mismatch_enough_to_stop(theArgs, theStats); +#endif + } + (*theStats->bytesWritten) += Evt_size (evt); + writeFile (evt); + if (theArgs->resdownscale) + { + (*theStats->evtsRes)++; +/* if ((*theStats->evtsRes) % (theArgs->resdownscale) == 0)*/ + if (((*theStats->evtsRes) % EVENT_NUM_OFFSET) < + theArgs->resdown_offset) + { + writeRESFile (evt); } + } + Evt_online (evt); } - - ourTime = time(NULL); - closeFile(); - storeInfoStop(argv[0], ourTime - 2, worker); - - statsDump(theArgs, theStats, 1); - - for (i = 0; i < theArgs->nrOfMsgs; i++) { - ShmTrans_remove(shmTrans[i]); + else + { + (*theStats->evtsDiscarded)++; +#if CHECK_MISMATCH + is_mismatch_enough_to_stop(theArgs, theStats); +#endif } - finiOnline(); - Worker_fini(worker); - - exit(EXIT_SUCCESS); + deleteEvt (evt); + if ((*theStats->evtsRes) % 600 == 0) + { +#if 0 + printf ("condition: %u > %u\n", (*theStats->bytesWritten), + ((theArgs->maxFileSz) - (theArgs->queueSize))); + printf ("maxFileSz %u ; queueSize: %u\n", (theArgs->maxFileSz), + theArgs->queueSize); +#endif + } + if ((*theStats->bytesWritten) >= + ((theArgs->maxFileSz) - (theArgs->queueSize))) + { + evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStop); + for (i = 0; i < theArgs->slowCtrlFileCnt; i++) + { + evt = appendFile (evt, theArgs->slowCtrlFiles[i]); + } + (*theStats->bytesWritten) += Evt_size (evt); + writeFile (evt); + deleteEvt (evt); + + ourTime = time (NULL); + closeFile (); + storeInfoStop (argv[0], ourTime - 2, worker); + + (*theStats->bytesWritten) = 0; + (*theStats->evtsComplete) = 0; + (*theStats->evtsDiscarded) = 0; + (*theStats->evtsDataError) = 0; + (*theStats->evtsTagError) = 0; + for (i = 0; i < theArgs->nrOfMsgs; i++) + { + (*theStats->trigNr[i]) = 0; + } + for (i = 0; i < NEVTIDS; i++) + { + (*theStats->evtId[i]) = 0; + } + } + if (theArgs->resdownscale) + { + if ((*theStats->evtsRes) >= + (theArgs->resdownscale) * (theArgs->resnumevents)) + { +#if 0 + ourTime = time (NULL); +#endif + evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStop); + writeRESFile (evt); + deleteEvt (evt); + closeRESFile (theArgs); + (*theStats->evtsRes) = 0; + } + } + } + ourTime = time (NULL); + closeFile (); + if (theArgs->resdownscale) + { + closeRESFile (theArgs); + } + storeInfoStop (argv[0], ourTime - 2, worker); + + statsDump (theArgs, theStats, 1); + + for (i = 0; i < theArgs->nrOfMsgs; i++) + { + ShmTrans_remove (shmTrans[i]); + } + finiOnline (); + Worker_fini (worker); + exit (EXIT_SUCCESS); } -- 2.43.0