From: hadaq Date: Tue, 8 Apr 2008 13:59:51 +0000 (+0000) Subject: 1. RFIO functionality added. 2. Fill levels of buffers can be monitored. S.Y. X-Git-Url: https://jspc29.x-matter.uni-frankfurt.de/git/?a=commitdiff_plain;h=6529c70e8891f4edcbb3c1b84db45766598b8624;p=daqdata.git 1. RFIO functionality added. 2. Fill levels of buffers can be monitored. S.Y. --- diff --git a/hadaq/evtbuild.c b/hadaq/evtbuild.c index 1cfb255..6e05b8b 100644 --- a/hadaq/evtbuild.c +++ b/hadaq/evtbuild.c @@ -1,9 +1,10 @@ static char *rcsId = - "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.80 2008-03-31 17:54:40 hadaq Exp $"; + "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.81 2008-04-08 13:59:51 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L #define SYSLOG_NAMES +#define RFIO #include #include @@ -17,6 +18,7 @@ static char *rcsId = #include #include #include +#include #include #include @@ -34,6 +36,11 @@ static char *rcsId = #include "ansiTape.h" #include "genid32.h" +#ifdef RFIO +#include "rawapin.h" /* for rfio */ +static RFILE *fRemote = NULL; +#endif + #define MAXINPATH 100 #define NEVTIDS 64UL /* must be 2^n */ @@ -58,6 +65,7 @@ static unsigned int res_dirNr; static time_t res_time; static int diff_time; static int trig_mismatch; + typedef struct TheArgsS { unsigned long nrOfMsgs; @@ -87,6 +95,10 @@ typedef struct TheArgsS /* the following arguments are for the variable queue size. S.Y. */ long varQSize[MAXINPATH]; int varQSizeCnt; + + unsigned int rfioFlag; + char rfioRemotePath[PARAM_MAX_VALUE_LEN]; + unsigned int buffStat; } TheArgs; typedef struct TheStatsS @@ -175,6 +187,11 @@ usage (const char *progName) syslog (LOG_ERR, "Usage: [--ressizelimit max number of files in res dir]"); syslog (LOG_ERR, "Usage: [--write_data path_where_the_mirroring_data_are_ written]"); +#ifdef RFIO + syslog (LOG_ERR, "[--rfio 0|1] write data to tape via rfio (0=no/1=yes)"); + syslog (LOG_ERR, "[--rfiopath path_to_tape_archive]"); +#endif + syslog (LOG_ERR, "[--buffstat 0|1] show fill levels of buffers]"); } static void @@ -247,6 +264,9 @@ argsDefault (TheArgs * my) for (i = 0; i < MAXINPATH; i++) { my->varQSize[i] = 1 * 1024 * 1024; } + my->rfioFlag = 0; + strcpy (my->rfioRemotePath, ""); + my->buffStat = 0; } static int @@ -258,24 +278,25 @@ argsFromCL (TheArgs * my, int argc, char *argv[]) { int this_option_optind = optind ? optind : 1; int option_index = 0; -#if 0 - second arg:no_argument (0), required_argument (1), optional_argument (2) -#endif + static struct option long_options[] = { - {"norpc", 0, 0, 't'}, - {"filesize", 1, 0, 'z'}, + {"norpc", 0, 0, 't'}, + {"filesize", 1, 0, 'z'}, {"resdownscale", 1, 0, 'e'}, {"resnumevents", 1, 0, 'n'}, - {"respath", 1, 0, 'h'}, + {"respath", 1, 0, 'h'}, {"secsizelimit", 1, 0, 'l'}, {"ressizelimit", 1, 0, 's'}, - {"write_data", 1, 0, 'w'}, - {"help", 0, 0, 'H'}, + {"write_data", 1, 0, 'w'}, + {"help", 0, 0, 'H'}, + {"rfio", 1, 0, 'R'}, + {"rfiopath", 1, 0, 'P'}, + {"buffstat", 1, 0, 'b'}, {0, 0, 0, 0} }; i = getopt_long (argc, argv, - "am:f:r:o:d:q:p:v:x:I:S:tz:e:n:h:w:tz:e:n:h:w:Hs:l:", + "am:f:r:o:d:q:p:v:x:I:S:tz:e:n:h:w:tz:e:n:Hs:l:R:P:b", long_options, &option_index); if (i == -1) break; @@ -381,6 +402,15 @@ argsFromCL (TheArgs * my, int argc, char *argv[]) my->write_data = 1; strcpy (my->sec_path, optarg); break; + case 'R': + my->rfioFlag = strtoul (optarg, NULL, 0); + break; + case 'P': + strcpy (my->rfioRemotePath, optarg); + break; + case 'b': + my->buffStat = strtoul (optarg, NULL, 0); + break; case 'H': usage (argv[0]); return -1; @@ -421,25 +451,14 @@ the condition that resdownscale resnumevents respath have to be together printf ("--filesize has to be > queuesize(-q)\n"); exit (0); } -/* -the condition ressizelimit has to be together with respath -*/ + /* the condition ressizelimit has to be together with respath */ if ((my->ressizelimit) != 0) { if (!strcmp (my->respath, "")) -#if 0 - if (my->respath != NULL) -#endif { - printf - ("you have to define path, where the files will be collect. use 'respath' option\n"); + printf ( "you have to define path, where the files will be collect. use 'respath' option\n" ); exit (EXIT_FAILURE); } - else - - { - - } } return 0; } @@ -486,8 +505,165 @@ unit (unsigned long v) return retVal; } -static void -statsDump (TheArgs * theArgs, TheStats * my, int interval) +static void printTime() +{ + struct timeval tv; + struct tm* ptm; + char time_string[40]; + long milliseconds; + + gettimeofday( &tv, NULL ); + + /* Obtain the time of day, and convert it to a tm struct. */ + ptm = localtime (&tv.tv_sec); + + /* Format the date and time, down to a single second. */ + strftime (time_string, sizeof (time_string), "%Y-%m-%d %H:%M:%S", ptm); + + /* Compute milliseconds from microseconds. */ + milliseconds = tv.tv_usec / 1000; + + /* Print the formatted time, in seconds, followed by a decimal point + and the milliseconds. */ + printf ("Time: %s.%03ld\n", time_string, milliseconds); +} + +static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, HadTuQueue **htuq, ShmTrans **shmtr) +{ + static unsigned long lastEC2; + static unsigned long lastBW2; + + static time_t t0 = 0; + time_t t, dT; + int i, j; + int col = 0; + char emptybuffer[] = "-"; + int outputGraph = 1; + int outputNum = 0; + + if (theArgs->isStandalone && theArgs->buffStat == 1) { + + t = time (NULL); + dT = t - t0; + + if (dT >= interval) { + + if( outputNum == 1) { + fputs("==============================================================================\n\n",stderr); + + for(i=0; inrOfMsgs; i++) { + + fprintf (stderr, "q[%2d]: ", i); + if (!HadTuQueue_empty(shmtr[i]->rdQueue)) + fprintf (stderr, "%8d ", HadTuQueue_size(shmtr[i]->rdQueue)); + else + fprintf (stderr, "%8s ", emptybuffer); + + col++; + if (col == 6) { + fputc ('\n', stderr); + col = 0; + } + } + } + if( outputGraph == 1 ){ + fputs ("------------------ buffer fill levels ----------------------------------------\n", stderr); + + float buffSize, queueSize; + int maxnorm = 10.; + + for( j=0; jnrOfMsgs; i++ ){ + buffSize = 2*theArgs->varQSize[i]; + queueSize = HadTuQueue_size(shmtr[i]->rdQueue); + + if(maxnorm - maxnorm*queueSize/buffSize < j){ + if (!HadTuQueue_empty(shmtr[i]->rdQueue)) + fputc( '|', stderr ); + else + fputc( '-', stderr ); + + } + else{ + fputc( ' ', stderr ); + } + } + fputc( '\n', stderr ); + } + + /* The following is just to print the numbers of buffers*/ + int factor, mod; + + fputs( "q:", stderr ); + factor = 0; + for( i=0; inrOfMsgs; i++ ) { + mod = i%10; + fprintf (stderr, "%1d", mod); + } + fputc( '\n', stderr ); + + fputs( " ", stderr ); + for( i=0; inrOfMsgs; i++ ) { + mod = i%10; + if( mod == 0 ) + fprintf (stderr, "%1d", i/10 ); + else + fputc( ' ', stderr ); + } + fputc( '\n', stderr ); + fputs( "------------------------------------------------------------------------------\n", stderr ); + + /* Print Trigger Numbers for all queues */ + char trigNum[theArgs->nrOfMsgs][10]; + + for( i=0; i < theArgs->nrOfMsgs; i++ ) { + sprintf( trigNum[i], "%08x", *my->trigNr[i] ); + } + + for( j=0; j < 8; j++ ) { + fputc (' ', stderr); + fputc (' ', stderr); + for( i=0; i < theArgs->nrOfMsgs; i++ ) { + fprintf (stderr, "%c", trigNum[i][j]); + } + fputc ('\n', stderr); + } + fputs( "------------------------------------------------------------------------------\n", stderr ); + + fprintf (stderr, "%7s:%6s", "evtComp", + unit (*my->evtsComplete)); + if( dT > 0 ) + fprintf (stderr, "%10s:%6s", " evtComp/s", + unit ((*my->evtsComplete - lastEC2) / dT)); + fprintf (stderr, "%10s:%6s", " bytesWrit", + unit (*my->bytesWritten)); + if( dT > 0 ) + fprintf (stderr, "%12s:%6s", " bytesWrit/s", + unit ((*my->bytesWritten - lastBW2) / dT)); + fputc ('\n', stderr); + fprintf (stderr, "%7s:%6s", "evtDisc", + unit (*my->evtsDiscarded)); + fprintf (stderr, "%10s:%6s", " evtDatErr", + unit (*my->evtsDataError)); + fprintf (stderr, "%10s:%6s", " evtTagErr", + unit (*my->evtsTagError)); + + fputc ('\n', stderr); + + lastEC2 = *my->evtsComplete; + lastBW2 = *my->bytesWritten; + + /* get wall-clock time */ + printTime(); + } + } + + t0 = t; + } +} + +static void statsDump (TheArgs * theArgs, TheStats * my, int interval) { static unsigned long lastEC; static unsigned long lastEE; @@ -499,7 +675,8 @@ statsDump (TheArgs * theArgs, TheStats * my, int interval) int i; - if (theArgs->isStandalone && strcmp (theArgs->verbosity, "info") == 0) + if (theArgs->isStandalone && strcmp (theArgs->verbosity, "info") == 0 && + theArgs->buffStat == 0) { t = time (NULL); dT = t - t0; @@ -523,18 +700,21 @@ statsDump (TheArgs * theArgs, TheStats * my, int interval) unit (*my->evtsTagError)); fputc ('\n', stderr); - fprintf (stderr, "%19s:%6s", "evtsComplete/s", - unit ((*my->evtsComplete - lastEC) / dT)); - fprintf (stderr, "%19s:%6s", "evtsDiscarded/s", - unit ((*my->evtsDiscarded - lastED) / dT)); - fprintf (stderr, "%19s:%6s", "bytesWritten/s", - unit ((*my->bytesWritten - lastBW) / dT)); - fputc ('\n', stderr); - fprintf (stderr, "%19s:%6s", "evtsDataError/s", - unit ((*my->evtsDataError - lastEE) / dT)); - fprintf (stderr, "%19s:%6s", "evtsTagError/s", - unit ((*my->evtsTagError - lastTE) / dT)); - fputc ('\n', stderr); + if ( dT > 0 ){ + fprintf (stderr, "%19s:%6s", "evtsComplete/s", + unit ((*my->evtsComplete - lastEC) / dT)); + fprintf (stderr, "%19s:%6s", "evtsDiscarded/s", + unit ((*my->evtsDiscarded - lastED) / dT)); + fprintf (stderr, "%19s:%6s", "bytesWritten/s", + unit ((*my->bytesWritten - lastBW) / dT)); + fputc ('\n', stderr); + fprintf (stderr, "%19s:%6s", "evtsDataError/s", + unit ((*my->evtsDataError - lastEE) / dT)); + fprintf (stderr, "%19s:%6s", "evtsTagError/s", + unit ((*my->evtsTagError - lastTE) / dT)); + fputc ('\n', stderr); + } + lastEC = *my->evtsComplete; lastEE = *my->evtsDataError; lastTE = *my->evtsTagError; @@ -693,8 +873,7 @@ storeInfoStart (const char *n, time_t t, TheArgs * my) desParam (p); } -static void -storeInfoStop (const char *n, time_t t, Worker * w, TheArgs * my) +static void storeInfoStop (const char *n, time_t t, Worker * w, TheArgs * my) { Param pS, *p = &pS; int i; @@ -712,8 +891,7 @@ storeInfoStop (const char *n, time_t t, Worker * w, TheArgs * my) desParam (p); } -int -is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) { +int is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) { /*unsigned long filesize_enough = (theArgs->maxFileSz) * 0.02;*/ unsigned long filesize_enough = (theArgs->maxFileSz) * 0.1; /*unsigned long discarded_enough = (*my->evtsComplete) * 0.03;*/ @@ -893,12 +1071,9 @@ remove_file (char *path) return recover_size; } -static int -openFile (TheArgs * theArgs) +static int openFile (TheArgs * theArgs) { -#if DEBUG - printf ("openFile\n"); -#endif + char fileName[_POSIX_PATH_MAX]; static char outPath[_POSIX_PATH_MAX]; static char sec_path[_POSIX_PATH_MAX]; @@ -910,18 +1085,16 @@ openFile (TheArgs * theArgs) theArgs->runNr = runNr; - if (once) - { - file_size = theArgs->maxFileSz; - strcpy (sec_path, theArgs->sec_path); - strcpy (outPath, theArgs->outPath); - once = 0; - } - else - { - strcpy (theArgs->outPath, outPath); - strcpy (theArgs->sec_path, sec_path); - } + if (once) { + file_size = theArgs->maxFileSz; + strcpy (sec_path, theArgs->sec_path); + strcpy (outPath, theArgs->outPath); + once = 0; + } + else { + strcpy (theArgs->outPath, outPath); + strcpy (theArgs->sec_path, sec_path); + } /* construct a default filename */ strcpy (fileName, theArgs->expId); @@ -931,172 +1104,139 @@ openFile (TheArgs * theArgs) outTape = NULL; outFile = NULL; outSecondFile = NULL; - if (strcmp (theArgs->outDev, "null") == 0) - { - outFile = NULL; + if (strcmp (theArgs->outDev, "null") == 0) { + outFile = NULL; + } + else if (strcmp (theArgs->outDev, "stdout") == 0) { + outFile = stdout; + } + else if (strcmp (theArgs->outDev, "file") == 0) { + if (strcmp (theArgs->outPath, "") == 0) { + strcpy (theArgs->outPath, fileName); } - else if (strcmp (theArgs->outDev, "stdout") == 0) - { - outFile = stdout; + else { + struct stat bufS, *buf = &bufS; + + stat (theArgs->outPath, buf); + if (S_ISDIR (buf->st_mode)) { + strcat (theArgs->outPath, "/"); + strcat (theArgs->outPath, fileName); + } } - else if (strcmp (theArgs->outDev, "file") == 0) - { - if (strcmp (theArgs->outPath, "") == 0) - { - strcpy (theArgs->outPath, fileName); - } - else - { - struct stat bufS, *buf = &bufS; - - stat (theArgs->outPath, buf); - if (S_ISDIR (buf->st_mode)) - { - strcat (theArgs->outPath, "/"); - strcat (theArgs->outPath, fileName); - } - } - if (NULL == (outFile = fopen (theArgs->outPath, "wb"))) - { - syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, - strerror (errno)); - outFile = NULL; - return -1; - } - else - { - struct statfs bufS, *buf = &bufS; - statfs (theArgs->outPath, buf); - if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) - { - errno = ENOSPC; - if (0 != fclose (outFile)) - { -#if DEBUG2 - printf ("fclose 'File' failed\n"); -#endif - } - outFile = NULL; - unlink (theArgs->outPath); - syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, - strerror (errno)); - return -1; - } - } + if (NULL == (outFile = fopen (theArgs->outPath, "wb"))) { + syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, + strerror (errno)); + outFile = NULL; + return -1; } - else if (strcmp (theArgs->outDev, "tape") == 0) - { - if (strcmp (theArgs->outPath, "") == 0) - { - strcpy (theArgs->outPath, fileName); - } - if (NULL == (outTape = openAnsiTape (theArgs->outPath, "/dev/tape"))) - { - syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath, - strerror (errno)); - outFile = NULL; - return -1; - } + else { + struct statfs bufS, *buf = &bufS; + statfs (theArgs->outPath, buf); + if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) { + errno = ENOSPC; +/* if (0 != fclose (outFile)) { */ +/* fprintf (stderr, " evtbuild.c, openFile: fclose 'File' failed\n"); */ +/* } */ + outFile = NULL; + unlink (theArgs->outPath); + syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, + strerror (errno)); + return -1; + } } - else - { - syslog (LOG_ERR, "unknown outputDev \"%s\"", theArgs->outDev); + } + else if (strcmp (theArgs->outDev, "tape") == 0) { + if (strcmp (theArgs->outPath, "") == 0) { + strcpy (theArgs->outPath, fileName); + } + if (NULL == (outTape = openAnsiTape (theArgs->outPath, "/dev/tape"))) { + syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath, + strerror (errno)); + outFile = NULL; return -1; } - if (theArgs->write_data) - { - dirSize = get_directory_size (sec_path); + } + else { + syslog (LOG_ERR, "unknown outputDev \"%s\"", theArgs->outDev); + return -1; + } + if (theArgs->write_data) { + dirSize = get_directory_size (sec_path); #if 0 printf("dirSize: %f, secsizelimit: %f\n",dirSize,theArgs->secsizelimit); #endif - while (dirSize > theArgs->secsizelimit) - { - /*delete files if limit was reached */ + while (dirSize > theArgs->secsizelimit) { + /*delete files if limit was reached */ #if 0 - printf ("limit overflow: secsizelimit: %f, dirSize: %f\n", - theArgs->secsizelimit, dirSize); - getchar(); + printf ("limit overflow: secsizelimit: %f, dirSize: %f\n", + theArgs->secsizelimit, dirSize); + getchar(); #endif - remove_file (sec_path); - dirSize = get_directory_size (sec_path); - } - if (strcmp (theArgs->sec_path, "") == 0) - { - strcpy (theArgs->sec_path, fileName); - } - else - { - struct stat bufS, *buf = &bufS; - - stat (theArgs->sec_path, buf); - if (S_ISDIR (buf->st_mode)) - { - strcat (theArgs->sec_path, "/"); - strcat (theArgs->sec_path, fileName); - } + remove_file (sec_path); + dirSize = get_directory_size (sec_path); + } + if (strcmp (theArgs->sec_path, "") == 0) { + strcpy (theArgs->sec_path, fileName); + } + else { + struct stat bufS, *buf = &bufS; + + stat (theArgs->sec_path, buf); + if (S_ISDIR (buf->st_mode)) { + strcat (theArgs->sec_path, "/"); + strcat (theArgs->sec_path, fileName); } - if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) - { - syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, - strerror (errno)); + } + if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) { + syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, + strerror (errno)); + outSecondFile = NULL; + return -1; + } + else { + struct statfs bufS, *buf = &bufS; + statfs (theArgs->sec_path, buf); + if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) { + errno = ENOSPC; + if (0 != fclose (outSecondFile)) { + fprintf (stderr, " evtbuild.c, openFile: fclose 'outSecondFile' failed\n"); + } outSecondFile = NULL; + unlink (theArgs->sec_path); + syslog (LOG_ERR, "opening file %s: %s", + theArgs->sec_path, strerror (errno)); return -1; } - else - { - struct statfs bufS, *buf = &bufS; - statfs (theArgs->sec_path, buf); - if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) - { - errno = ENOSPC; - if (0 != fclose (outSecondFile)) - { -#if DEBUG2 - printf ("fclose 'outSecondFile' failed\n"); -#endif - } - outSecondFile = NULL; - unlink (theArgs->sec_path); - syslog (LOG_ERR, "opening file %s: %s", - theArgs->sec_path, strerror (errno)); - return -1; - } - } - } + } + } return 0; } -static int -openRESFile (TheArgs * theArgs) +static int openRESFile (TheArgs * theArgs) { -#if 0 - printf ("openRESFile\n"); - getchar (); -#endif char fileName[_POSIX_PATH_MAX]; static res_once = 1; static char respath[_POSIX_PATH_MAX]; res_seqNr = 0; - if (res_once) - { - res_once = 0; - strcpy (respath, theArgs->respath); - } - else - { - strcpy (theArgs->respath, respath); - } - if (theArgs->ressizelimit != 0) - { + if (res_once) { + res_once = 0; + strcpy (respath, theArgs->respath); + } + else { + strcpy (theArgs->respath, respath); + } + + if (theArgs->ressizelimit != 0) { res_dirNr = get_file_number_in_dir (respath); #if 0 printf("res_dirNr: %d, theArgs->ressizelimit: %d\n,", res_dirNr,theArgs->ressizelimit); #endif - while (res_dirNr > theArgs->ressizelimit) - { + + while (res_dirNr > theArgs->ressizelimit) { + /*delete files if limit was reached */ #if 0 printf ("limit overflow: res_sizelimit: %ul, res_dirSize: %ul\n", @@ -1117,30 +1257,28 @@ openRESFile (TheArgs * theArgs) filecounter++; } char app[8]; -#if 0 -// strcpy (theArgs->respath, respath); -// strcat (theArgs->respath,fileName); -#endif + sprintf (app, "%d", filecounter); strcat (fileName,app); strcat (fileName,".hld"); - if (strcmp (theArgs->respath, "") == 0) - { + + if (strcmp (theArgs->respath, "") == 0) { strcpy (theArgs->respath, fileName); } - else - { + else { struct stat bufS, *buf = &bufS; stat (theArgs->respath, buf); - if (S_ISDIR (buf->st_mode)) - { + + if (S_ISDIR (buf->st_mode)) { strcat (theArgs->respath, "/"); strcat (theArgs->respath, fileName); } } + /* construct a default filename */ outRESFile = NULL; + #if 0 /*if files are created more often than 1 sec, postfix is added to their name */ struct stat bufS, *buf = &bufS; @@ -1154,8 +1292,7 @@ openRESFile (TheArgs * theArgs) filecounter++; } #endif - if (NULL == (outRESFile = fopen (theArgs->respath, "wb"))) - { + if (NULL == (outRESFile = fopen (theArgs->respath, "wb"))) { syslog (LOG_ERR, "opening file %s: %s", theArgs->respath, strerror (errno)); outRESFile = NULL; @@ -1168,12 +1305,8 @@ openRESFile (TheArgs * theArgs) if ((theArgs->maxFileSz) / buf->f_bsize > buf->f_bavail) { errno = ENOSPC; - if (0 != fclose (outRESFile)) - { -#if DEBUG2 - - printf ("fclose 'outRESFile' failed\n"); -#endif + if (0 != fclose (outRESFile)) { + fprintf (stderr, " evtbuild.c, openRESFile: fclose 'outRESFile' failed\n"); } outRESFile = NULL; unlink (theArgs->respath); @@ -1182,139 +1315,223 @@ openRESFile (TheArgs * theArgs) return -1; } } -#if 0 - printf ("theArgs->respath %s\n", theArgs->respath); -#endif + return 0; } -static int -writeFile (void *evt) + +static int writeFile (void *evt) { -#if DEBUG - printf ("writeFile\n"); -#endif + int writeFileR; Evt_setSeqNr (evt, seqNr++); Evt_setRunNr (evt, runNr); - if (outFile != NULL) - { + if (outFile != NULL) { writeFileR = fwrite (evt, 1, Evt_paddedSize (evt), outFile); - } - else if (outTape != NULL) - { - writeFileR = writeAnsiTape (outTape, evt, Evt_paddedSize (evt)); - } + } + else if (outTape != NULL) { + writeFileR = writeAnsiTape (outTape, evt, Evt_paddedSize (evt)); + } +#ifdef RFIO + else if( fRemote != NULL ) { + int rfioStatus = 0; + rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote ); + if (rfioStatus <= 0) + fprintf ( stderr, " evtbuild.c, rfio_fwrite: writing file failed\n"); + } +#endif + /* writing file in the second dir */ - if (outSecondFile != NULL) - { - fwrite (evt, 1, Evt_paddedSize (evt), outSecondFile); - } + if (outSecondFile != NULL) { + fwrite (evt, 1, Evt_paddedSize (evt), outSecondFile); + } + return writeFileR; } static int writeRESFile (void *evt) { -#if DEBUG - printf ("writeRESFile\n"); -#endif int writeFileR; Evt_setSeqNr (evt, res_seqNr++); Evt_setRunNr (evt, runNr); - if (outRESFile != NULL) - { + if (outRESFile != NULL) { writeFileR = fwrite (evt, 1, Evt_paddedSize (evt), outRESFile); - } + } Evt_setSeqNr (evt, seqNr); -#if DEBUG - printf ("END writeRESFile\n"); -#endif + return writeFileR; } -static int -closeFile () +static int closeFile () { -#if DEBUG - printf ("closeFile\n"); -#endif int closeFileR; + /* closing file in the second dir */ - if (outSecondFile != NULL) - { + if (outSecondFile != NULL) { dirSize += file_size; - if (0 != fclose (outSecondFile)) - { -#if DEBUG2 - printf ("fclose 'outSecondFile2' failed\n"); -#endif - } + if (0 != fclose (outSecondFile)) { + fprintf (stderr, " evtbuild.c, closeFile: fclose 'outSecondFile2' failed\n"); + } } - else - { -#if DEBUG2 - printf ("closeFile error outSecondFile == NULL\n"); -#endif + if (outFile != NULL) { + closeFileR = fclose (outFile); + if (0 != closeFileR) { + printf (" evtbuild.c, closeFile: fclose 'closeFileR' failed\n"); + } } + else if (outTape != NULL) { + closeFileR = closeAnsiTape (outTape); + } - if (outFile == NULL) - { -#if DEBUG2 - printf ("closeFile error outFile == NULL\n"); -#endif + return closeFileR; +} - } - if (outFile != NULL) - { - closeFileR = fclose (outFile); - if (0 != closeFileR) - { -#if DEBUG2 - printf ("fclose 'closeFileR' failed\n"); -#endif +static int closeRESFile (TheArgs * theArgs) +{ + int closeFileR; + if (outRESFile != NULL) { + res_dirSize += res_file_size; + closeFileR = fclose (outRESFile); + if (0 != closeFileR) { + fprintf ( stderr, " evtbuild.c, closeRESFile: fclose 'outRESFile2' failed!\n"); } } - else if (outTape != NULL) + else{ + fprintf ( stderr," evtbuild.c, closeRESFile: closeFile error outRESFile == NULL\n"); + } + + return closeFileR; +} + +#ifdef RFIO +static int rfio_openConnection (TheArgs * theArgs) +{ + if( theArgs->rfioFlag ){ + char rfioBase[128] = ""; + char *pcc; + strcpy( rfioBase, theArgs->rfioRemotePath ); + /* + * rfio_fopen requires a base name (something like rfiodaq:gstore:). + * Get pointer pcc to last ":" in the string rfioBase + * and cut away everything what follows. + */ + pcc = (char *) strrchr( rfioBase, ':'); + pcc++; + strncpy(pcc, "\0", 1); /* terminates after node name */ + + fprintf ( stderr, " evtbuild.c:rfio_openConnection(): try to open connection to server\n" ); + fRemote = rfio_fopen( rfioBase, "wb" ); + fprintf ( stderr, " evtbuild.c:rfio_openConnection(): opened connection to server\n" ); + + if (fRemote == NULL) + { + fprintf ( stderr, " evtbuild.c:rfio_openConnection(): cannot open connection to server\n" ); + syslog (LOG_ERR, " evtbuild.c:rfio_openConnection(): cannot open connection to server"); + + exit (EXIT_FAILURE); + } + } + + return 0; +} + +static int rfio_closeConnection (TheArgs * theArgs) +{ + if( theArgs->rfioFlag && fRemote != NULL ){ + int rfioStatus; + + rfioStatus = rfio_fclose( fRemote ); + + if (rfioStatus) { - closeFileR = closeAnsiTape (outTape); + fprintf ( stderr, " evtbuild.c:rfio_closeConnection(): closing connection to server failed\n" ); + syslog (LOG_ERR, " evtbuild.c:rfio_closeConnection(): closing connection to server failed"); + exit (EXIT_FAILURE); } - return closeFileR; + } + + return 0; } -static int -closeRESFile (TheArgs * theArgs) +static int rfio_openFile (TheArgs * theArgs) { -#if DEBUG - printf ("closeRESFile\n"); -#endif - int closeFileR; - if (outRESFile != NULL) + + if( theArgs->rfioFlag && fRemote != NULL){ + int rfioStatus; + char rfioFileName[_POSIX_PATH_MAX]; + char rfioPath[_POSIX_PATH_MAX]; + + strcpy( rfioPath, theArgs->rfioRemotePath ); + +/* Evt_setSeqNr (evt, seqNr++); */ +/* Evt_setRunNr (evt, runNr); */ + + strcpy (rfioFileName, theArgs->expId); + strftime (rfioFileName + strlen (rfioFileName), 18, "%y%j%H%M%S.hld", + localtime (&ourTime)); + strcat (rfioPath, "/"); + strcat (rfioPath, rfioFileName); + fprintf ( stderr, " evtbuild.c:rfio_openConnection(): try to open remote file %s\n", rfioPath ); + rfioStatus = rfio_fnewfile( fRemote, rfioPath ); + fprintf ( stderr, " evtbuild.c:rfio_openConnection(): opened remote file %s\n", rfioPath ); + + if (rfioStatus) { + fprintf ( stderr, " evtbuild.c:rfio_openFile(): cannot open remote file %s\n", rfioPath ); + syslog (LOG_ERR, " evtbuild.c:rfio_openFile(): cannot open remote file %s", rfioPath); + exit (EXIT_FAILURE); + } + } + + return 0; +} + +static int rfio_closeFile (TheArgs * theArgs) +{ + if( theArgs->rfioFlag && fRemote != NULL ){ + int rfioStatus; + + rfioStatus = rfio_fendfile( fRemote ); + + if (rfioStatus) { - res_dirSize += res_file_size; - closeFileR = fclose (outRESFile); - if (0 != closeFileR) - { -#if DEBUG2 - printf ("fclose 'outRESFile2' failed\n"); -#endif - } + fprintf ( stderr, " evtbuild.c:rfio_openFile(): cannot close remote file\n"); + syslog (LOG_ERR, " evtbuild.c:rfio_openFile(): cannot close remote file"); + exit (EXIT_FAILURE); + } - else + } + + return 0; +} + +static int rfio_writeFile (TheArgs * theArgs, void *evt) +{ + if( theArgs->rfioFlag && fRemote != NULL ){ + int rfioStatus = 0; + + Evt_setSeqNr( evt, seqNr++ ); + Evt_setRunNr( evt, runNr ); + + rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote ); + + if (rfioStatus <= 0) { -#if DEBUG2 - printf ("closeFile error outRESFile == NULL\n"); -#endif + fprintf ( stderr, " evtbuild.c:rfio_writeFile(): writing file failed\n"); + return 1; } - return closeFileR; + } + + return 0; } +#endif /* ifdef RFIO */ + /* BUGBUG bailOut not proper yet */ -int -main (int argc, char *argv[]) +int main (int argc, char *argv[]) { int i; TheArgs theArgsS, *theArgs = &theArgsS; @@ -1332,7 +1549,6 @@ main (int argc, char *argv[]) signal(SIGTERM, sigHandler); signal(SIGHUP, sigHandler); - openlog (argv[0], LOG_PID | LOG_PERROR, LOG_LOCAL0); setlogmask (LOG_UPTO (LOG_INFO)); @@ -1426,6 +1642,10 @@ main (int argc, char *argv[]) } else{ shmTrans[i] = ShmTrans_create (buf, 2 * (long)theArgs->varQSize[i]); + + if( shmTrans[i] == NULL ) { + fprintf ( stderr, " ShmTrans_create: could not create buffer /dev/shm/%s.shm\n", buf ); + } } hadTuQueue[i] = NULL; @@ -1460,6 +1680,10 @@ main (int argc, char *argv[]) Worker_initEnd (worker); +#ifdef RFIO + rfio_openConnection( theArgs ); +#endif + currId = 0; while (setjmp (terminateJmp) == 0) { @@ -1470,6 +1694,7 @@ main (int argc, char *argv[]) int dataError = 0; int tagError = 0; + statsBufferDump( theArgs, theStats, 1, hadTuQueue, shmTrans ); statsDump (theArgs, theStats, 1); if (*theStats->bytesWritten == 0) @@ -1480,6 +1705,11 @@ main (int argc, char *argv[]) syslog (LOG_ERR, "error opening output file, exiting"); exit (EXIT_FAILURE); } + +#ifdef RFIO + rfio_openFile( theArgs ); +#endif + storeInfoStart (argv[0], ourTime, theArgs); /* store simple start run info */ @@ -1655,6 +1885,9 @@ main (int argc, char *argv[]) ourTime = time (NULL); closeFile (); +#ifdef RFIO + rfio_closeFile( theArgs ); +#endif storeInfoStop (argv[0], ourTime - 2, worker, theArgs); /* store simple stop run info */ @@ -1692,6 +1925,10 @@ main (int argc, char *argv[]) } ourTime = time (NULL); closeFile (); +#ifdef RFIO + rfio_closeFile( theArgs ); + rfio_closeConnection( theArgs ); +#endif if (theArgs->resdownscale) { closeRESFile (theArgs);