]> jspc29.x-matter.uni-frankfurt.de Git - daqdata.git/commitdiff
Write to multiple disks support added. Sergey.
authorhadaq <hadaq>
Tue, 6 Jul 2010 15:36:37 +0000 (15:36 +0000)
committerhadaq <hadaq>
Tue, 6 Jul 2010 15:36:37 +0000 (15:36 +0000)
hadaq/evtbuild.c

index baad418d6833c664984516ca5b0c5cf1f32932b3..c794ae8f0c8242dac68321a99bd90fc9c8a9163f 100644 (file)
@@ -1,4 +1,4 @@
-static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.115 2010-06-18 11:19:30 hadaq Exp $";
+static char *rcsId = "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.116 2010-07-06 15:36:37 hadaq Exp $";
 
 #define _POSIX_C_SOURCE 199309L
 #define SYSLOG_NAMES
@@ -136,6 +136,7 @@ typedef struct TheArgsS {
        char evtstat[PARAM_MAX_VALUE_LEN];      /* file name for statistics of discarded event */
 
        char fileName[PARAM_MAX_VALUE_LEN];
+       unsigned short multiDisks;      /*  Write files to diferent disks */
 } TheArgs;
 
 typedef struct TheStatsS {
@@ -153,6 +154,9 @@ typedef struct TheStatsS {
        unsigned long *evtbuildBuff[MAXINPATH];
        unsigned long *nrOfMsgs;
        unsigned long *runId;
+       unsigned long *dataMover;
+       unsigned long *diskNr;
+       unsigned long *diskNrEB;
 } TheStats;
 
 typedef struct TheDebugS {
@@ -230,7 +234,7 @@ static void usage(const char *progName)
        syslog(LOG_ERR, "       [-Q queueNr:queueSize]       Set different queue sizes for different queue numbers,");
        syslog(LOG_ERR, "                                    example: -Q 2:1000000 -Q 4:4000000 -Q 5:12000000");
        syslog(LOG_ERR, "       [-r runNumber]");
-       syslog(LOG_ERR, "       [--shmname shmem_name]       Extension of shared memory segment name.");
+       syslog(LOG_ERR, "       [-S|--shmname shmem_name]    Extension of shared memory segment name.");
        syslog(LOG_ERR, "       [-a (agent)] [-p priority]");
        syslog(LOG_ERR, "       [-I trigType]                Trigger type can be set by Event Builder.");
        syslog(LOG_ERR, "       [-T evtId:wordNr:bitMask]    EB will read trigger type from the first data stream");
@@ -472,6 +476,8 @@ static void argsDefault(TheArgs *my)
        sprintf(my->runinfo2ora, "%s_runinfo2ora.txt", getenv("DAQ_SETUP"));
        sprintf(my->evtstat, "%s_evtstat.txt", getenv("DAQ_SETUP"));
 
+       my->multiDisks = 0;                     /* Write files only on one disk */
+
        /* Debugging args */
        my->debug_trignr = 0;
        my->debug_errbit = 0;
@@ -517,9 +523,10 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[])
                        {"ignore", 0, 0, 'i'},
                        {"debug", 1, 0, 'D'},
                        {"trigtype", 1, 0, 'T'},
+                       {"multidisks", 0, 0, 'k'},
                        {0, 0, 0, 0}
                };
-               i = getopt_long(argc, argv, "am:f:r:o:d:q:Q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:D:T:", long_options,
+               i = getopt_long(argc, argv, "am:f:r:o:d:q:Q:p:v:x:I:tz:e:n:h:w:Hs:l:R:A:bEL:S:B:O:iW:M:F:X:C:D:T:k", long_options,
                                                &option_index);
                if (i == -1)
                        break;
@@ -568,6 +575,9 @@ static int argsFromCL(TheArgs *my, int argc, char *argv[])
                case 't':                               /* online: 0=off, 1=on */
                        my->online = 1;
                        break;
+               case 'k':
+                       my->multiDisks = 1;
+                       break;
                case 'z':                               /* from MBytes to Bytes */
                        my->maxFileSz = (1024 * 1024UL * strtoul(optarg, NULL, 0));
                        break;
@@ -1056,6 +1066,10 @@ static void statsDump(TheArgs *theArgs, TheStats *my, int interval)
                t = time(NULL);
                dT = t - t0;
                if (dT >= interval) {
+
+                       if (theArgs->multiDisks)
+                               printf("Write data to disk nr %d\n", *my->diskNrEB);
+
                        int col = 0;
 
                        fputs
@@ -1383,7 +1397,113 @@ static int remove_file(char *path)
        return recover_size;
 }
 
-static int openFile(TheArgs *theArgs)
+char *strReplace(char const *const original, char const *const pattern, char const *const replacement)
+{
+       size_t const replen = strlen(replacement);
+       size_t const patlen = strlen(pattern);
+       size_t const orilen = strlen(original);
+
+       size_t patcnt = 0;
+       const char *oriptr;
+       const char *patloc;
+
+       /* find how many times the pattern occurs in the original string */
+       for (oriptr = original; patloc = strstr(oriptr, pattern); oriptr = patloc + patlen) {
+               patcnt++;
+       }
+
+       /* allocate memory for the new string */
+       size_t const retlen = orilen + patcnt * (replen - patlen);
+       char *const returned = (char *) malloc(sizeof(char) * (retlen + 1));
+
+       if (returned != NULL) {
+               /* copy the original string, replacing all the instances of the pattern */
+               char *retptr = returned;
+               for (oriptr = original; patloc = strstr(oriptr, pattern); oriptr = patloc + patlen) {
+                       size_t const skplen = patloc - oriptr;
+
+                       /* copy the section until the occurence of the pattern */
+                       strncpy(retptr, oriptr, skplen);
+                       retptr += skplen;
+
+                       /* copy the replacement */
+                       strncpy(retptr, replacement, replen);
+                       retptr += replen;
+               }
+
+               /* copy the rest of the string */
+               strcpy(retptr, oriptr);
+       }
+
+       return returned;
+}
+
+static void changeData(int once, TheArgs *theArgs, TheStats *theStats)
+{
+       if (theArgs->multiDisks) {
+
+               char ebOutPath[_POSIX_PATH_MAX];
+               char newOutPath[_POSIX_PATH_MAX];
+               char sec_path[_POSIX_PATH_MAX];
+
+               sprintf(ebOutPath, "/data%02d", theArgs->ebnum);
+
+               if (once) {
+                       /* 
+                        *  At the beginning for the first file there might be no diskNr
+                        *  in shared memory yet. Thus we take diskNr equal to EB number.
+                        */
+                       strcpy(theArgs->outPath, strReplace(theArgs->outPath, "/data01", ebOutPath));
+                       strcpy(theArgs->sec_path, strReplace(theArgs->sec_path, "/data01", ebOutPath));
+
+                       (*theStats->diskNrEB) = theArgs->ebnum;
+               } else {
+                       /* 
+                        *  For the next files we should be able to get diskNr from shared memory,
+                        *  if there is a diskNr provided.
+                        */
+
+                       unsigned long diskNr;
+                       char buf[_POSIX_PATH_MAX];
+                       sprintf(buf, "daq_evtbuild%s", theArgs->shmname);
+
+                       if (Worker_getStatistic(buf, "diskNum", &diskNr) == -1) {
+                               diskNr = 0;
+                               syslog(LOG_ERR, "Worker_getStatistic failed for diskNum!");
+                       }
+
+                       char *tmpstr;
+
+                       if (diskNr == 0) {
+                               (*theStats->diskNrEB) = theArgs->ebnum;
+
+                               tmpstr = strReplace(theArgs->outPath, "/data01", ebOutPath);
+                               strcpy(theArgs->outPath, tmpstr);
+                               free(tmpstr);
+
+                               syslog(LOG_ERR, "Write data to %s!", theArgs->outPath);
+
+                               tmpstr = strReplace(theArgs->sec_path, "/data01", ebOutPath);
+                               strcpy(theArgs->sec_path, tmpstr);
+                               free(tmpstr);
+                       } else {
+                               (*theStats->diskNrEB) = diskNr;
+
+                               sprintf(newOutPath, "/data%02d", diskNr);
+
+                               tmpstr = strReplace(theArgs->outPath, ebOutPath, newOutPath);
+                               strcpy(theArgs->outPath, tmpstr);
+                               free(tmpstr);
+
+                               tmpstr = strReplace(theArgs->sec_path, ebOutPath, newOutPath);
+                               strcpy(theArgs->sec_path, tmpstr);
+                               free(tmpstr);
+                       }
+               }
+       }
+}
+
+static int openFile(TheArgs *theArgs, TheStats *theStats)
 {
 
        char fileName[_POSIX_PATH_MAX];
@@ -1407,6 +1527,7 @@ static int openFile(TheArgs *theArgs)
        theArgs->runNr = runNr;
 
        if (once) {
+               changeData(once, theArgs, theStats);
                file_size = theArgs->maxFileSz;
                strcpy(sec_path, theArgs->sec_path);
                strcpy(outPath, theArgs->outPath);
@@ -1416,6 +1537,7 @@ static int openFile(TheArgs *theArgs)
                strcpy(theArgs->outPath, outPath);
                strcpy(theArgs->lustrePath, outLustrePath);
                strcpy(theArgs->sec_path, sec_path);
+               changeData(once, theArgs, theStats);
        }
 
        /* construct a default filename */
@@ -1439,15 +1561,15 @@ static int openFile(TheArgs *theArgs)
                sprintf(fileName, "%s.hld", fileName);
 
        /* Copy file name to theArgs */
-/*     if( strcmp(theArgs->outDev, "file") == 0 || */
-/* #ifdef RFIO */
-/*         fRemote != NULL || */
-/* #endif */
-/*         strcmp(theArgs->lustrePath, "") != 0 ){ */
+       /*  if( strcmp(theArgs->outDev, "file") == 0 || */
+       /* #ifdef RFIO */
+       /*      fRemote != NULL || */
+       /* #endif */
+       /*      strcmp(theArgs->lustrePath, "") != 0 ){ */
 
        strcpy(theArgs->fileName, fileName);
 
-/*     } */
+       /*  } */
 
 
        outTape = NULL;
@@ -1777,7 +1899,7 @@ static int closeRESFile(TheArgs *theArgs)
 }
 
 #ifdef RFIO
-static int rfio_openConnection(TheArgs *theArgs)
+static int rfio_openConnection(TheArgs *theArgs, TheStats *theStats)
 {
 
        if ((strcmp(theArgs->rfioRemotePath, "") != 0)) {
@@ -1796,17 +1918,22 @@ static int rfio_openConnection(TheArgs *theArgs)
                fprintf(stderr, "<I> evtbuild.c, rfio_fopen(): trying to open connection to Data Mover: %s\n", rfioBase);
                /* fRemote = rfio_fopen( rfioBase, "wb" ); */
 
-               fRemote = rfio_fopen_gsidaq(rfioBase,
-                                                                       theArgs->rfio_pcOption,
-                                                                       theArgs->rfio_iCopyMode,
-                                                                       theArgs->rfioLustrePath,
-                                                                       theArgs->rfio_iCopyFrac, theArgs->rfio_iMaxFile, theArgs->rfio_iPathConv);
+               char pcDataMover[16];   /* name of the Data Mover */
+               int piDataMover;                /* number of the Data Mover */
+
+               fRemote = rfio_fopen_gsidaq_dm(rfioBase, theArgs->rfio_pcOption,
+                                                                          pcDataMover, &piDataMover,
+                                                                          theArgs->rfio_iCopyMode, theArgs->rfioLustrePath,
+                                                                          theArgs->rfio_iCopyFrac, theArgs->rfio_iMaxFile, theArgs->rfio_iPathConv);
 
                if (fRemote == NULL) {
                        syslog(LOG_ERR, "%s, %d: trying to open connection to Data Mover %s: %s",
                                   __FILE__, __LINE__, rfioBase, strerror(errno));
                        exit(EXIT_FAILURE);
                }
+
+               /* Save to stats */
+               (*theStats->dataMover) = (unsigned long) piDataMover;
        }
 
        return 0;
@@ -1944,8 +2071,22 @@ int main(int argc, char *argv[])
        theStats->runId = Worker_addStatistic(worker, "runId");
        (*theStats->runId) = 0;         /* initialize to zero */
 
+       /* Disk number provided by daq_disks */
+       theStats->diskNr = Worker_addStatistic(worker, "diskNum");
+
+       /* 
+        *  Disk number where EB writes the data.
+        *  If daq_disks is dead, EB will use its own
+        *  number to pick up the data disk. This number
+        *  we store in diskNumEB for monitoring and debugging.
+        */
+       theStats->diskNrEB = Worker_addStatistic(worker, "diskNumEB");
+
+       theStats->dataMover = Worker_addStatistic(worker, "dataMover");
+
        unsigned long *retVal = NULL;
        theStats->evtsRes = (unsigned long *) &retVal;
+       theStats->diskNr = (unsigned long *) &retVal;
 
        for (i = 0; i < NEVTIDS; i++) {
                char buf[WORKER_MAX_NAME_LEN];
@@ -1996,7 +2137,7 @@ int main(int argc, char *argv[])
        Worker_initEnd(worker);
 
 #ifdef RFIO
-       rfio_openConnection(theArgs);
+       rfio_openConnection(theArgs, theStats);
 #endif
 
        uint32_t trigType;
@@ -2033,7 +2174,7 @@ int main(int argc, char *argv[])
                        printf("ioc time: %s\n", s);
                        printf("ioc runid: %d\n", runNr);
 
-                       if (-1 == openFile(theArgs)) {
+                       if (-1 == openFile(theArgs, theStats)) {
                                syslog(LOG_ERR, "error opening output file, exiting");
                                exit(EXIT_FAILURE);
                        }