From: hadaq Date: Tue, 6 Jul 2010 15:36:37 +0000 (+0000) Subject: Write to multiple disks support added. Sergey. X-Git-Url: https://jspc29.x-matter.uni-frankfurt.de/git/?a=commitdiff_plain;h=dd65f0a72d57563241aefb1d2a90ea62508a49cf;p=daqdata.git Write to multiple disks support added. Sergey. --- diff --git a/hadaq/evtbuild.c b/hadaq/evtbuild.c index baad418..c794ae8 100644 --- a/hadaq/evtbuild.c +++ b/hadaq/evtbuild.c @@ -1,4 +1,4 @@ -static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.115 2010-06-18 11:19:30 hadaq Exp $"; +static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.116 2010-07-06 15:36:37 hadaq Exp $"; #define _POSIX_C_SOURCE 199309L #define SYSLOG_NAMES @@ -136,6 +136,7 @@ typedef struct TheArgsS { char evtstat[PARAM_MAX_VALUE_LEN]; /* file name for statistics of discarded event */ char fileName[PARAM_MAX_VALUE_LEN]; + unsigned short multiDisks; /* Write files to diferent disks */ } TheArgs; typedef struct TheStatsS { @@ -153,6 +154,9 @@ typedef struct TheStatsS { unsigned long *evtbuildBuff[MAXINPATH]; unsigned long *nrOfMsgs; unsigned long *runId; + unsigned long *dataMover; + unsigned long *diskNr; + unsigned long *diskNrEB; } TheStats; typedef struct TheDebugS { @@ -230,7 +234,7 @@ static void usage(const char *progName) syslog(LOG_ERR, " [-Q queueNr:queueSize] Set different queue sizes for different queue numbers,"); syslog(LOG_ERR, " example: -Q 2:1000000 -Q 4:4000000 -Q 5:12000000"); syslog(LOG_ERR, " [-r runNumber]"); - syslog(LOG_ERR, " [--shmname shmem_name] Extension of shared memory segment name."); + syslog(LOG_ERR, " [-S|--shmname shmem_name] Extension of shared memory segment name."); syslog(LOG_ERR, " [-a (agent)] [-p priority]"); syslog(LOG_ERR, " [-I trigType] Trigger type can be set by Event Builder."); syslog(LOG_ERR, " [-T evtId:wordNr:bitMask] EB will read trigger type from the first data stream"); @@ -472,6 +476,8 @@ static void argsDefault(TheArgs *my) sprintf(my->runinfo2ora, "%s_runinfo2ora.txt", getenv("DAQ_SETUP")); sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP")); + my->multiDisks = 0; /* Write files only on one disk */ + /* Debugging args */ my->debug_trignr = 0; my->debug_errbit = 0; @@ -517,9 +523,10 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) {"ignore", 0, 0, 'i'}, {"debug", 1, 0, 'D'}, {"trigtype", 1, 0, 'T'}, + {"multidisks", 0, 0, 'k'}, {0, 0, 0, 0} }; - i = getopt_long(argc, argv, "am:f:r:o:d:q:Q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:D:T:", long_options, + i = getopt_long(argc, argv, "am:f:r:o:d:q:Q:p:v:x:I:tz:e:n:h:w:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:D:T:k", long_options, &option_index); if (i == -1) break; @@ -568,6 +575,9 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[]) case 't': /* online: 0=off, 1=on */ my->online = 1; break; + case 'k': + my->multiDisks = 1; + break; case 'z': /* from MBytes to Bytes */ my->maxFileSz = (1024 * 1024UL * strtoul(optarg, NULL, 0)); break; @@ -1056,6 +1066,10 @@ static void statsDump(TheArgs *theArgs, TheStats *my, int interval) t = time(NULL); dT = t - t0; if (dT >= interval) { + + if (theArgs->multiDisks) + printf("Write data to disk nr %d\n", *my->diskNrEB); + int col = 0; fputs @@ -1383,7 +1397,113 @@ static int remove_file(char *path) return recover_size; } -static int openFile(TheArgs *theArgs) +char *strReplace(char const *const original, char const *const pattern, char const *const replacement) +{ + size_t const replen = strlen(replacement); + size_t const patlen = strlen(pattern); + size_t const orilen = strlen(original); + + size_t patcnt = 0; + const char *oriptr; + const char *patloc; + + /* find how many times the pattern occurs in the original string */ + for (oriptr = original; patloc = strstr(oriptr, pattern); oriptr = patloc + patlen) { + patcnt++; + } + + /* allocate memory for the new string */ + size_t const retlen = orilen + patcnt * (replen - patlen); + char *const returned = (char *) malloc(sizeof(char) * (retlen + 1)); + + if (returned != NULL) { + /* copy the original string, replacing all the instances of the pattern */ + char *retptr = returned; + for (oriptr = original; patloc = strstr(oriptr, pattern); oriptr = patloc + patlen) { + size_t const skplen = patloc - oriptr; + + /* copy the section until the occurence of the pattern */ + strncpy(retptr, oriptr, skplen); + retptr += skplen; + + /* copy the replacement */ + strncpy(retptr, replacement, replen); + retptr += replen; + } + + /* copy the rest of the string */ + strcpy(retptr, oriptr); + } + + return returned; +} + +static void changeData(int once, TheArgs *theArgs, TheStats *theStats) +{ + if (theArgs->multiDisks) { + + char ebOutPath[_POSIX_PATH_MAX]; + char newOutPath[_POSIX_PATH_MAX]; + char sec_path[_POSIX_PATH_MAX]; + + sprintf(ebOutPath, "/data%02d", theArgs->ebnum); + + if (once) { + /* + * At the beginning for the first file there might be no diskNr + * in shared memory yet. Thus we take diskNr equal to EB number. + */ + strcpy(theArgs->outPath, strReplace(theArgs->outPath, "/data01", ebOutPath)); + strcpy(theArgs->sec_path, strReplace(theArgs->sec_path, "/data01", ebOutPath)); + + (*theStats->diskNrEB) = theArgs->ebnum; + } else { + /* + * For the next files we should be able to get diskNr from shared memory, + * if there is a diskNr provided. + */ + + unsigned long diskNr; + char buf[_POSIX_PATH_MAX]; + sprintf(buf, "daq_evtbuild%s", theArgs->shmname); + + if (Worker_getStatistic(buf, "diskNum", &diskNr) == -1) { + diskNr = 0; + syslog(LOG_ERR, "Worker_getStatistic failed for diskNum!"); + } + + char *tmpstr; + + if (diskNr == 0) { + (*theStats->diskNrEB) = theArgs->ebnum; + + tmpstr = strReplace(theArgs->outPath, "/data01", ebOutPath); + strcpy(theArgs->outPath, tmpstr); + free(tmpstr); + + syslog(LOG_ERR, "Write data to %s!", theArgs->outPath); + + tmpstr = strReplace(theArgs->sec_path, "/data01", ebOutPath); + strcpy(theArgs->sec_path, tmpstr); + free(tmpstr); + } else { + (*theStats->diskNrEB) = diskNr; + + sprintf(newOutPath, "/data%02d", diskNr); + + tmpstr = strReplace(theArgs->outPath, ebOutPath, newOutPath); + strcpy(theArgs->outPath, tmpstr); + free(tmpstr); + + tmpstr = strReplace(theArgs->sec_path, ebOutPath, newOutPath); + strcpy(theArgs->sec_path, tmpstr); + free(tmpstr); + } + } + } +} + +static int openFile(TheArgs *theArgs, TheStats *theStats) { char fileName[_POSIX_PATH_MAX]; @@ -1407,6 +1527,7 @@ static int openFile(TheArgs *theArgs) theArgs->runNr = runNr; if (once) { + changeData(once, theArgs, theStats); file_size = theArgs->maxFileSz; strcpy(sec_path, theArgs->sec_path); strcpy(outPath, theArgs->outPath); @@ -1416,6 +1537,7 @@ static int openFile(TheArgs *theArgs) strcpy(theArgs->outPath, outPath); strcpy(theArgs->lustrePath, outLustrePath); strcpy(theArgs->sec_path, sec_path); + changeData(once, theArgs, theStats); } /* construct a default filename */ @@ -1439,15 +1561,15 @@ static int openFile(TheArgs *theArgs) sprintf(fileName, "%s.hld", fileName); /* Copy file name to theArgs */ -/* if( strcmp(theArgs->outDev, "file") == 0 || */ -/* #ifdef RFIO */ -/* fRemote != NULL || */ -/* #endif */ -/* strcmp(theArgs->lustrePath, "") != 0 ){ */ + /* if( strcmp(theArgs->outDev, "file") == 0 || */ + /* #ifdef RFIO */ + /* fRemote != NULL || */ + /* #endif */ + /* strcmp(theArgs->lustrePath, "") != 0 ){ */ strcpy(theArgs->fileName, fileName); -/* } */ + /* } */ outTape = NULL; @@ -1777,7 +1899,7 @@ static int closeRESFile(TheArgs *theArgs) } #ifdef RFIO -static int rfio_openConnection(TheArgs *theArgs) +static int rfio_openConnection(TheArgs *theArgs, TheStats *theStats) { if ((strcmp(theArgs->rfioRemotePath, "") != 0)) { @@ -1796,17 +1918,22 @@ static int rfio_openConnection(TheArgs *theArgs) fprintf(stderr, " evtbuild.c, rfio_fopen(): trying to open connection to Data Mover: %s\n", rfioBase); /* fRemote = rfio_fopen( rfioBase, "wb" ); */ - fRemote = rfio_fopen_gsidaq(rfioBase, - theArgs->rfio_pcOption, - theArgs->rfio_iCopyMode, - theArgs->rfioLustrePath, - theArgs->rfio_iCopyFrac, theArgs->rfio_iMaxFile, theArgs->rfio_iPathConv); + char pcDataMover[16]; /* name of the Data Mover */ + int piDataMover; /* number of the Data Mover */ + + fRemote = rfio_fopen_gsidaq_dm(rfioBase, theArgs->rfio_pcOption, + pcDataMover, &piDataMover, + theArgs->rfio_iCopyMode, theArgs->rfioLustrePath, + theArgs->rfio_iCopyFrac, theArgs->rfio_iMaxFile, theArgs->rfio_iPathConv); if (fRemote == NULL) { syslog(LOG_ERR, "%s, %d: trying to open connection to Data Mover %s: %s", __FILE__, __LINE__, rfioBase, strerror(errno)); exit(EXIT_FAILURE); } + + /* Save to stats */ + (*theStats->dataMover) = (unsigned long) piDataMover; } return 0; @@ -1944,8 +2071,22 @@ int main(int argc, char *argv[]) theStats->runId = Worker_addStatistic(worker, "runId"); (*theStats->runId) = 0; /* initialize to zero */ + /* Disk number provided by daq_disks */ + theStats->diskNr = Worker_addStatistic(worker, "diskNum"); + + /* + * Disk number where EB writes the data. + * If daq_disks is dead, EB will use its own + * number to pick up the data disk. This number + * we store in diskNumEB for monitoring and debugging. + */ + theStats->diskNrEB = Worker_addStatistic(worker, "diskNumEB"); + + theStats->dataMover = Worker_addStatistic(worker, "dataMover"); + unsigned long *retVal = NULL; theStats->evtsRes = (unsigned long *) &retVal; + theStats->diskNr = (unsigned long *) &retVal; for (i = 0; i < NEVTIDS; i++) { char buf[WORKER_MAX_NAME_LEN]; @@ -1996,7 +2137,7 @@ int main(int argc, char *argv[]) Worker_initEnd(worker); #ifdef RFIO - rfio_openConnection(theArgs); + rfio_openConnection(theArgs, theStats); #endif uint32_t trigType; @@ -2033,7 +2174,7 @@ int main(int argc, char *argv[]) printf("ioc time: %s\n", s); printf("ioc runid: %d\n", runNr); - if (-1 == openFile(theArgs)) { + if (-1 == openFile(theArgs, theStats)) { syslog(LOG_ERR, "error opening output file, exiting"); exit(EXIT_FAILURE); }