]> jspc29.x-matter.uni-frankfurt.de Git - daqdata.git/commitdiff
New options: rfio to DM, lustre, synch via ioc (still to be tested), run Id from...
authorhadaq <hadaq>
Wed, 28 May 2008 14:54:22 +0000 (14:54 +0000)
committerhadaq <hadaq>
Wed, 28 May 2008 14:54:22 +0000 (14:54 +0000)
hadaq/evtbuild.c

index 35a04c324c5e0c3d61fcb1ad935b52dbba04f465..7093ecdef5946b1a6a7126441cf4bdf062d9ed97 100644 (file)
@@ -1,5 +1,5 @@
 static char *rcsId =
-  "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.82 2008-04-08 14:45:14 hadaq Exp $";
+  "$Header: /misc/hadesprojects/daq/cvsroot/eventbuilder/hadaq/evtbuild.c,v 6.83 2008-05-28 14:54:22 hadaq Exp $";
 
 
 #define _POSIX_C_SOURCE 199309L
@@ -49,13 +49,16 @@ static RFILE *fRemote = NULL;
 #define DEBUG2 0
 #define CHECK_MISMATCH 1
 #define BEAM 1
+#define TIMEOFFSET 1200000000   /* needed to reconstruct time from runId */
 static FILE *outFile;
+static FILE *outLustreFile;
 static FILE *outSecondFile;
 static FILE *outRESFile;
 static AnsiTape *outTape;
 static uint32_t seqNr;
 static uint32_t res_seqNr;
 static uint32_t runNr;
+static uint32_t newRunId;  /* needed to get new RUN id from epics ctrl */
 static time_t ourTime;
 static long file_size;
 static long res_file_size;
@@ -98,7 +101,9 @@ typedef struct TheArgsS
   
   unsigned int rfioFlag;
   char rfioRemotePath[PARAM_MAX_VALUE_LEN];
-  unsigned int buffStat;
+  char lustrePath[PARAM_MAX_VALUE_LEN];
+  unsigned short buffStat;
+  unsigned short epicsCtrl;
 } TheArgs;
 
 typedef struct TheStatsS
@@ -111,20 +116,21 @@ typedef struct TheStatsS
   unsigned long *evtId[NEVTIDS];
   unsigned long *trigNr[MAXINPATH];
   unsigned long *evtsRes;
+  unsigned long *evtbuildBuff[MAXINPATH];
+  unsigned long *nrOfMsgs;
+  unsigned long *runId;
+  unsigned long *evtsCompletePS;
+  unsigned long *bytesWrittenPS;
 } TheStats;
 
 static jmp_buf terminateJmp;
 
-
-
-void
-sigHandler (int sig)
+void sigHandler (int sig)
 {
   longjmp (terminateJmp, sig);
 }
 
-static void *
-appendFile (void *my, const char *path)
+static void *appendFile (void *my, const char *path)
 {
   void *subEvt;
   char *dataBuf;
@@ -167,43 +173,38 @@ appendFile (void *my, const char *path)
   return my;
 }
 
-static void
-usage (const char *progName)
+static void usage (const char *progName)
 {
   syslog (LOG_ERR, "Usage: %s [-x expId]", progName);
   syslog (LOG_ERR, "Usage: [-m nrOfMsgs] [-f slowCtrlFile ...]");
   syslog (LOG_ERR, "Usage: [-o outPath] [-d null|tape|file|stdout]");
   syslog (LOG_ERR, "Usage: [-q queueSize] [-r runNumber]");
   syslog (LOG_ERR, "Usage: [-a (agent)] [-p priority] [-I evtId]");
-  syslog (LOG_ERR,
-         "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]");
+  syslog (LOG_ERR, "Usage: [-v debug|info|notice|warning|err|alert|crit|emerg]");
   syslog (LOG_ERR, "Usage: [--norpc]");
   syslog (LOG_ERR, "Usage: [--filesize maximum_size_of_output_file[in MB]]");
-  syslog (LOG_ERR, "Usage: [--resdownscale resdownscale_of_events]");
-  syslog (LOG_ERR, "Usage: [--resnumevents number_of_events_in_one_resfile]");
-  syslog (LOG_ERR,
-         "Usage: [--respath path_where_the_downscaling_data_are_written]");
-  syslog (LOG_ERR, "Usage: [--secsizelimit max size of second dir[in MB]]");
-  syslog (LOG_ERR, "Usage: [--ressizelimit max number of files in res dir]");
-  syslog (LOG_ERR,
-         "Usage: [--write_data path_where_the_mirroring_data_are_ written]");
+  syslog (LOG_ERR, "Usage: [--resdownscale resdownscale_factor] downscale factor for the res events");
+  syslog (LOG_ERR, "Usage: [--resnumevents evt_num] maximum number of events in a resfile");
+  syslog (LOG_ERR, "Usage: [--respath path] path for the res directory");
+  syslog (LOG_ERR, "Usage: [--secsizelimit max_size] maximum size of second directory with the mirrored data [in MB]");
+  syslog (LOG_ERR, "Usage: [--ressizelimit max_file_num] maximum  number of files in res dir");
+  syslog (LOG_ERR, "Usage: [--write_data path] path to the directory with mirrored data");
 #ifdef RFIO
-  syslog (LOG_ERR, "[--rfio 0|1] write data to tape via rfio (0=no/1=yes)");
-  syslog (LOG_ERR, "[--rfiopath path_to_tape_archive]");
+  syslog (LOG_ERR, "Usage: [--rfio path_to_tape_archive] example: --rfio rfiodaq:gstore:/hadaqtest/test002");
 #endif
-  syslog (LOG_ERR, "[--buffstat 0|1] show fill levels of buffers]");
+  syslog (LOG_ERR, "Usage: [--buffstat] show fill levels of buffers");
+  syslog (LOG_ERR, "Usage: [--epicsctrl] enable synch and distribution of RUN Id by Epics for parallel event builders");
+  syslog (LOG_ERR, "Usage: [--lustre path_to_lustre] path to the file on the Lustre cluster");
 }
 
-static void
-argsDump (TheArgs * my)
+static void argsDump (TheArgs * my)
 {
   int i;
 
   syslog (LOG_DEBUG, "nrOfMsgs: %d", my->nrOfMsgs);
-  for (i = 0; i < my->slowCtrlFileCnt; i++)
-    {
-      syslog (LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]);
-    }
+  for (i = 0; i < my->slowCtrlFileCnt; i++) {
+    syslog (LOG_DEBUG, "slowCtrlFiles[%d]: %s", i, my->slowCtrlFiles[i]);
+  }
   syslog (LOG_DEBUG, "outPath: %s", my->outPath);
   syslog (LOG_DEBUG, "outDev: %s", my->outDev);
   syslog (LOG_DEBUG, "runNr: %d", my->runNr);
@@ -214,26 +215,28 @@ argsDump (TheArgs * my)
   syslog (LOG_DEBUG, "verbosity: %s", my->verbosity);
   syslog (LOG_DEBUG, "evtId: %ld", my->evtId);
   syslog (LOG_DEBUG, "maxFileSz: %ld", my->maxFileSz);
-  if (my->resdownscale != 0)
-    {
+  if (my->resdownscale != 0) {
       syslog (LOG_DEBUG, "resdownscale: %ld", my->resdownscale);
       syslog (LOG_DEBUG, "resnumevents: %ld", my->resnumevents);
       syslog (LOG_DEBUG, "respath: %s", my->respath);
       syslog (LOG_DEBUG, "secsizelimit: %ld", my->secsizelimit);
       syslog (LOG_DEBUG, "ressizelimit: %d", my->ressizelimit);
-    }
-  if (my->no_rpc == 1)
-    {
+  }
+  if (my->no_rpc == 1) {
       syslog (LOG_DEBUG, "no rpc is set");
-    }
-  if (my->write_data == 1)
-    {
+  }
+  if (my->write_data == 1) {
       syslog (LOG_DEBUG, "sec_path: %s", my->sec_path);
-    }
+  }
+  if( strcmp( my->rfioRemotePath, "" ) != 0 ) {
+      syslog (LOG_DEBUG, "rfio path: %s", my->rfioRemotePath);
+  }
+  if( strcmp( my->lustrePath, "" ) != 0 ) {
+      syslog (LOG_DEBUG, "lustre path: %s", my->lustrePath);
+  }
 }
 
-static void
-argsDefault (TheArgs * my)
+static void argsDefault (TheArgs * my)
 {
   int i;
 
@@ -248,7 +251,7 @@ argsDefault (TheArgs * my)
   strcpy (my->expId, "xx");
   my->priority = 0;
   my->isStandalone = 1;
-  my->queueSize = 1 * 1024 * 1024;
+  my->queueSize = 4 * 1024 * 1024UL;
   strcpy (my->verbosity, "info");
   my->evtId = 0;
   my->maxFileSz = (2 * 1024 * 1024 * 1024UL - 1);
@@ -262,15 +265,16 @@ argsDefault (TheArgs * my)
   strcpy (my->respath, "");
 
   for (i = 0; i < MAXINPATH; i++) {
-    my->varQSize[i] = 1 * 1024 * 1024;
+    my->varQSize[i] = 4 * 1024 * 1024UL;
   }
-  my->rfioFlag = 0;
+
   strcpy (my->rfioRemotePath, "");
+  strcpy (my->lustrePath, "");
   my->buffStat = 0;
+  my->epicsCtrl = 0;
 }
 
-static int
-argsFromCL (TheArgs * my, int argc, char *argv[])
+static int argsFromCL (TheArgs * my, int argc, char *argv[])
 {
   extern char *optarg;
   int i;
@@ -290,13 +294,13 @@ argsFromCL (TheArgs * my, int argc, char *argv[])
        {"write_data",   1, 0, 'w'},
        {"help",         0, 0, 'H'},
        {"rfio",         1, 0, 'R'},
-       {"rfiopath",     1, 0, 'P'},
-       {"buffstat",     1, 0, 'b'},
+       {"buffstat",     0, 0, 'b'},
+       {"epicsctrl",    0, 0, 'E'},
+       {"lustre",       1, 0, 'L'},
        {0, 0, 0, 0}
       };
-      i =
-       getopt_long (argc, argv,
-                    "am:f:r:o:d:q:p:v:x:I:S:tz:e:n:h:w:tz:e:n:Hs:l:R:P:b:",
+      i = getopt_long (argc, argv,
+                    "am:f:r:o:d:q:p:v:x:I:tz:e:n:h:w:tz:e:n:Hs:l:R:bEL:",
                     long_options, &option_index);
       if (i == -1)
        break;
@@ -332,84 +336,44 @@ argsFromCL (TheArgs * my, int argc, char *argv[])
        case 'I':
          my->evtId = strtoul (optarg, NULL, 0);
          break;
-       case 'S':
-         my->maxFileSz = strtoul (optarg, NULL, 0);
-         break;
-       case 't':               /*norpc - no arg */
+       case 't':                           /* norpc - no arg */
          my->no_rpc = 1;
          break;
-       case 'z':               /*filesize - one arg */
-         if (strtoul (optarg, NULL, 0) > 0
-             && strtoul (optarg, NULL, 0) < 20000)
-           {
-             my->maxFileSz = (1024 * 1024UL * strtoul (optarg, NULL, 0));
-           }
-         else
-           {
-             printf
-               ("filesize option incorrect,it should be >0 and <20000\n");
-             exit (0);
-           }
+       case 'z':                           /* filesize - one arg */
+         my->maxFileSz = (1024 * 1024UL * strtoul (optarg, NULL, 0));
          break;
-       case 'e':               /*resdownscale- one arg & need resnumevents & respath */
-         if (strtoul (optarg, NULL, 0) > 0
-             && strtoul (optarg, NULL, 0) < 100001)
-           {
-             my->resdownscale = strtoul (optarg, NULL, 0);
-             my->resdown_offset = EVENT_NUM_OFFSET / (my->resdownscale);
-           }
-         else
-           {
-             printf
-               ("resdownscale option incorrect,it should be >0 and <100001\n");
-           }
+       case 'e':                           /* need resnumevents & respath */
+         my->resdownscale = strtoul (optarg, NULL, 0);
+         if(my->resdownscale > 0)
+           my->resdown_offset = EVENT_NUM_OFFSET / (my->resdownscale);
          break;
-       case 'n':               /*resnumevents - one arg */
-         if (strtoul (optarg, NULL, 0) > 99
-             && strtoul (optarg, NULL, 0) < 1000000001)
-           {
-             my->resnumevents = strtoul (optarg, NULL, 0);
-           }
-         else
-           {
-             printf
-               ("resnumevents incorrect, it should be >99 and <1000000001\n");
-           }
+       case 'n':               
+         my->resnumevents = strtoul (optarg, NULL, 0);
          break;
-       case 'l':               /*secsizelimit - one arg */
-         if (strtoul (optarg, NULL,0) > 10
-             && strtoul (optarg, NULL,0) < 100001)
-           {
-             unsigned long tmp = (/*1024 * 1024UL **/ strtoul (optarg, NULL,0));
-#if 0
-             my->secsizelimit = (((double)tmp)/((double)(1024.*1024.)));
-#endif
-             my->secsizelimit = (double)tmp;
-             printf("set secsizelimit: %d MB \n",my->secsizelimit);
-           }
+       case 'l':         
+         my->secsizelimit = (double) strtoul (optarg, NULL,0);
          break;
-       case 's':               /*ressizelimit - one arg */
-         if (strtoul (optarg, NULL, 0) > 5
-             && strtoul (optarg, NULL, 0) < 1000)
-           {
-             my->ressizelimit = strtoul (optarg, NULL, 0);
-           }
+       case 's':          
+         my->ressizelimit = strtoul (optarg, NULL, 0);
          break;
-       case 'h':               /*respath - one arg as dir_path */
+       case 'h':           
          strcpy (my->respath, optarg);
          break;
-       case 'w':               /*write_data - one arg as dir_path */
+       case 'w':            
          my->write_data = 1;
          strcpy (my->sec_path, optarg);
          break;
        case 'R':
-         my->rfioFlag = strtoul (optarg, NULL, 0);
-         break;
-       case 'P':
          strcpy (my->rfioRemotePath, optarg);
          break;
+       case 'L':
+         strcpy (my->lustrePath, optarg);
+         break;
        case 'b':
-         my->buffStat = strtoul (optarg, NULL, 0);
+         my->buffStat = 1;
+         break;
+       case 'E':
+         my->epicsCtrl = 1;
          break;
        case 'H':
          usage (argv[0]);
@@ -428,43 +392,78 @@ argsFromCL (TheArgs * my, int argc, char *argv[])
        printf ("%s ", argv[optind++]);
       printf ("\n");
     }
-/*
-the condition that resdownscale resnumevents respath have to be together
-*/
-  if (((my->resdownscale != 0) || (my->resnumevents != -1)
-       || (strcmp (my->respath, ""))) && ((my->resdownscale == 0)
-                                         || (my->resnumevents == -1)
-                                         || !strcmp (my->respath, "")))
-#if 0
-    if (((my->resdownscale != 0) || (my->resnumevents != -1)
-        || (my->respath != NULL)) && ((my->resdownscale == 0)
-                                      || (my->resnumevents == -1)
-                                      || (my->respath == NULL)))
-#endif
-      {
-       printf
-         ("if you are using Remote Events Server, you should fill: resdownscale, resnumevents, respath options\n");
-       exit (EXIT_FAILURE);
-      }
-  if (((my->maxFileSz) < (my->queueSize)))
-    {
-      printf ("--filesize has to be > queuesize(-q)\n");
-      exit (0);
-    }
+
+  return 0;
+}
+
+static int argsCheck( TheArgs *my )
+{
+  /*
+   *   Check the content of TheArgs.
+   */
+
+  /* check my->maxFileSz */
+  if( (strcmp(my->outDev, "null") != 0) && (my->maxFileSz <= 0 || my->maxFileSz >= 1024 * 1024UL * 2000) ) {
+    fprintf( stderr, "<E> evtbuild.c, argsCheck(): --filesize must be >0 and <20000\n");
+    return 1;
+  }
+
+  /* Conditions: if the Remote Event Server is used */
+  if ( ( my->resdownscale != 0 && ( my->resnumevents == -1 || 
+                                   (strcmp(my->respath, "") == 0) ) ) ||
+       ( my->resnumevents != -1 && ( my->resdownscale == 0 ||
+                                    (strcmp(my->respath, "") == 0) ) ) ||
+       ( (strcmp(my->respath, "") != 0) && ( my->resnumevents == -1 ||
+                                            my->resdownscale == 0 ) ) ) {
+       
+    fprintf( stderr, "<E> evtbuild.c, argsCheck(): options --resdownscale --resnumevents --respath must be specified together\n");
+    return 1;
+  }
+
+  /* check my->resdownscale */
+  if( (strcmp(my->respath, "") != 0) && 
+      ( my->resdownscale <= 0 || my->resdownscale >= 100001 ) ) { 
+    fprintf( stderr,"<E> evtbuild.c, argsCheck(): --resdownscale must be >0 and <100001\n");
+    return 1;
+  }
+
+  /* check my->resnumevents */
+  if( (strcmp(my->respath, "") != 0) && 
+      ( my->resnumevents <= 99 || my->resnumevents >= 1000000001 ) ) {
+     fprintf( stderr,"<E> evtbuild.c, argsCheck(): --resnumevents must be >99 and <1000000001\n");
+     return 1;
+  }
+
+  /* check my->secsizelimit */
+  if( (strcmp(my->respath, "") != 0) && 
+      ( my->secsizelimit <= -0.0001 || my->secsizelimit >= 0.0001 ) &&
+      ( my->secsizelimit <= 10 || my->secsizelimit >= 100001 ) ) {
+    fprintf( stderr,"<E> evtbuild.c, argsCheck(): --secsizelimit must be >10 and <100000 MB\n");
+    return 1;
+  }
+
+  /* check my->ressizelimit */
+  if( (strcmp(my->respath, "") != 0) && 
+      ( my->ressizelimit <= 5 || my->ressizelimit >= 1000 ) ) {
+    fprintf( stderr, "<E> evtbuild.c, argsCheck(): --ressizelimit must be >5 and <1000\n");
+    return 1;
+  }
+
+  if (((my->maxFileSz) < (my->queueSize))) {
+    fprintf( stderr, "<E> evtbuild.c, argsCheck(): --filesize must be larger than queuesize(-q)\n");
+    return 1;
+  }
+
   /* the condition ressizelimit has to be together with respath */
-  if ((my->ressizelimit) != 0)
-    {
-      if (!strcmp (my->respath, ""))
-         {
-           printf ( "you have to define path, where the files will be collect. use 'respath' option\n" );
-           exit (EXIT_FAILURE);
-         }
-    }
+  if ((my->ressizelimit) != 0 && (strcmp(my->respath, "") == 0) ) {
+    fprintf( stderr, "<E> evtbuild.c, argsCheck(): --respath is not given\n" );
+    return 1;
+  }
+  
   return 0;
 }
 
-static int
-argsFromParam (TheArgs * my, int argc, char *argv[])
+static int argsFromParam (TheArgs *my, int argc, char *argv[])
 {
   Param paramS, *param = &paramS;
   int paramWasFound;
@@ -473,33 +472,30 @@ argsFromParam (TheArgs * my, int argc, char *argv[])
   conSetupParam (param, getenv ("DAQ_SETUP"));
 
   name = (char *) basename (argv[0]);
-  Param_getInt (param, name, "nrofmsgs", &paramWasFound, &my->nrOfMsgs);
+  Param_getInt    (param, name, "nrofmsgs", &paramWasFound, &my->nrOfMsgs);
   Param_getStringArray (param, name, "slwctrlfile", PARAM_MAX_ARRAY_LEN,
                        &my->slowCtrlFileCnt, my->slowCtrlFiles);
   Param_getString (param, name, "outpath", &paramWasFound, my->outPath);
-  Param_getString (param, name, "outdev", &paramWasFound, my->outDev);
-  Param_getString (param, name, "expid", &paramWasFound, my->expId);
-  Param_getInt (param, name, "stndln", &paramWasFound, &my->isStandalone);
-  Param_getInt (param, name, "prio", &paramWasFound, &my->priority);
-  Param_getInt (param, name, "qsize", &paramWasFound, &my->queueSize);
-  Param_getString (param, name, "verb", &paramWasFound, my->verbosity);
-  Param_getInt (param, name, "evtid", &paramWasFound, &my->evtId);
-  Param_getInt (param, name, "maxfilesz", &paramWasFound, &my->maxFileSz);
+  Param_getString (param, name, "outdev",  &paramWasFound, my->outDev);
+  Param_getString (param, name, "expid",   &paramWasFound, my->expId);
+  Param_getInt    (param, name, "stndln",  &paramWasFound, &my->isStandalone);
+  Param_getInt    (param, name, "prio",    &paramWasFound, &my->priority);
+  Param_getInt    (param, name, "qsize",   &paramWasFound, &my->queueSize);
+  Param_getString (param, name, "verb",    &paramWasFound, my->verbosity);
+  Param_getInt    (param, name, "evtid",   &paramWasFound, &my->evtId);
+  Param_getInt    (param, name, "maxfilesz", &paramWasFound, &my->maxFileSz);
 
   Param_getIntArray(param, name, "varqsize", MAXINPATH, &my->varQSizeCnt, my->varQSize);
   desParam (param);
 }
 
-static char *
-unit (unsigned long v)
+static char *unit (unsigned long v)
 {
   static char retVal[6];
   static char u[] = " kM";
   int i;
 
-  for (i = 0; v >= 10000 && i < sizeof (u) - 2; v /= 1000, i++)
-    {
-    }
+  for (i = 0; v >= 10000 && i < sizeof (u) - 2; v /= 1000, i++) {}
   sprintf (retVal, "%4d%c", v, u[i]);
 
   return retVal;
@@ -528,7 +524,41 @@ static void printTime()
   printf ("Time: %s.%03ld\n", time_string, milliseconds);
 }
 
-static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, HadTuQueue **htuq, ShmTrans **shmtr)
+static void add2Stat( TheArgs * theArgs, TheStats * my, float interval, ShmTrans **shmtr ) 
+{
+  /* Add statistic for fill levels of buffers in percentage. */
+
+  static unsigned long lastEvtsComplete;
+  static unsigned long lastBytesWritten;
+  static time_t t_0 = 0;
+  float buffSize, queueSize;
+  time_t t, dT;
+  int i;
+  unsigned long fillLevel;
+
+  t = time (NULL);
+  dT = t - t_0;
+  
+  if( dT >= interval ) {    
+    for( i=0; i<theArgs->nrOfMsgs; i++ ) {
+      buffSize  = 2*theArgs->varQSize[i];
+      queueSize = HadTuQueue_size(shmtr[i]->rdQueue);
+      
+      /* Add here statistic for fill levels of buffers */
+      fillLevel = (unsigned long) (100*queueSize+0.5)/buffSize;
+      (*my->evtbuildBuff[i]) = fillLevel;
+    }
+
+    /* Add more statistic for evtsComplete and bytesWritten per second */
+    (*my->evtsCompletePS) = (*my->evtsComplete - lastEvtsComplete) / dT;
+    (*my->bytesWrittenPS) = (*my->bytesWritten - lastBytesWritten) / dT;
+  }
+
+  t_0 = t;
+}
+
+static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, 
+                            HadTuQueue **htuq, ShmTrans **shmtr)
 {
   static unsigned long lastEC2;
   static unsigned long lastBW2;
@@ -541,7 +571,7 @@ static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, H
   int outputGraph = 1;
   int outputNum = 0;
 
-  if (theArgs->isStandalone && theArgs->buffStat == 1) {
+  if (theArgs->isStandalone && theArgs->buffStat) {
 
     t = time (NULL);
     dT = t - t0;
@@ -656,6 +686,10 @@ static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, H
        
        /* get wall-clock time */
        printTime();
+
+       unsigned long runId2print;
+       Worker_getStatistic( "daq_evtbuild", "runId", &runId2print);
+       printf("ioc: RUN Id = %lu\n", runId2print);
       }
     }
     
@@ -663,6 +697,27 @@ static void statsBufferDump (TheArgs * theArgs, TheStats * my, float interval, H
   }
 }
 
+unsigned long getRunId( TheArgs *my )
+{
+  unsigned long myRunId = 0;
+  struct timespec tv = { 0, 1e+8 };
+
+  while( my->epicsCtrl && myRunId == 0 ) {
+    if( Worker_getStatistic( "daq_evtbuild", "runId", &myRunId) == -1 ) {
+      fprintf( stderr, "<E> evtbuild.c: getRunId: Worker_getStatistic: cannot get runId!" );
+      sleep(1);
+    }
+    else {
+
+      if( myRunId == 0 ) 
+       nanosleep( &tv, NULL );
+    }
+    
+  }
+
+  return myRunId;
+}
+
 static void statsDump (TheArgs * theArgs, TheStats * my, int interval)
 {
   static unsigned long lastEC;
@@ -674,9 +729,8 @@ static void statsDump (TheArgs * theArgs, TheStats * my, int interval)
   time_t t, dT;
   int i;
 
-
   if (theArgs->isStandalone && strcmp (theArgs->verbosity, "info") == 0 &&
-      theArgs->buffStat == 0)
+      !(theArgs->buffStat))
     {
       t = time (NULL);
       dT = t - t0;
@@ -812,9 +866,7 @@ static void storeRunInfoStop(time_t t, TheArgs *myArgs, TheStats *myStats)
   fclose(fp);  
 }
 
-#define NJUNK 128
-static void
-storeInfoStart (const char *n, time_t t, TheArgs * my)
+static void storeInfoStart (const char *n, time_t t, TheArgs * my)
 {
   Param pS, *p = &pS;
   int i;
@@ -845,10 +897,6 @@ storeInfoStart (const char *n, time_t t, TheArgs * my)
       Param_storeString (p, n, "respath", my->respath);
       Param_storeInt (p, n, "ressizelimit", my->ressizelimit);
 
-    }
-  if (my->secsizelimit != 0.)
-    {
-/*    Param_storeString (p, n, "secsizelimit", my->secsizelimit);*/
     }
   if (my->no_rpc == 1)
     {
@@ -858,17 +906,6 @@ storeInfoStart (const char *n, time_t t, TheArgs * my)
     {
       Param_storeString (p, n, "sec_path", my->sec_path);
     }
-  /* This storing junk is for having the run start in
-   * oracle definately -- BS
-   */
-
-  /*
-   *for (i = 0; i < NJUNK; i++)
-   * {
-   *   Param_storeString (p, n, "junk",
-   *                    "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
-   *  }
-   */
 
   desParam (p);
 }
@@ -891,41 +928,46 @@ static void storeInfoStop (const char *n, time_t t, Worker * w, TheArgs * my)
   desParam (p);
 }
 
-int is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) {
-  /*unsigned long filesize_enough = (theArgs->maxFileSz) * 0.02;*/
-  unsigned long filesize_enough = (theArgs->maxFileSz) * 0.1;
-  /*unsigned long discarded_enough = (*my->evtsComplete) * 0.03;*/
-  unsigned long discarded_enough = (*my->evtsComplete) * 0.1;
-  unsigned long tagerror_enough = (*my->evtsComplete) * 0.03;
-  if ((*my->bytesWritten) >= filesize_enough)
-  {
-    if ((*my->evtsDiscarded) > discarded_enough) 
-    {
-     printf("Too many events are broken.\n");
-     printf("Debug Inf: filesize_enough: %u,fileWritten: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete));
-     syslog (LOG_ERR, "Too many events are broken.\n");
-     syslog (LOG_ERR, "Debug Inf: filesize_enough: %u,fileWritten: %u, DISCARDED_enough: %u, discarded: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),discarded_enough, (*my->evtsDiscarded), (*my->evtsComplete));
+int is_mismatch_enough_to_stop(TheArgs * theArgs, TheStats * my) 
+{
+  /* min file size to start checking */
+  unsigned long minfilesize  = (theArgs->maxFileSz) * 0.1; /* 0.02 */
+  /* max number of discarded evts allowed */
+  unsigned long maxdiscarded = (*my->evtsComplete) * 0.1; /* 0.03 */
+  /* max number of evts with tag error allowed */
+  unsigned long maxtagerror  = (*my->evtsComplete) * 0.03;
+
+  if ((*my->bytesWritten) >= minfilesize) {
+    if ((*my->evtsDiscarded) > maxdiscarded) {
+
+      syslog (LOG_ERR, "<E> evtbuild.c: Too many events are broken!");
+      syslog (LOG_ERR, "<D> evtbuild.c: bytes written: %u, minimum file size: %u",
+             minfilesize, (*my->bytesWritten) );
+      syslog (LOG_ERR, "<D> evtbuild.c: discarded evts: %u, allowed: %u, total evts num: %u", 
+             (*my->evtsDiscarded), maxdiscarded, (*my->evtsComplete));
 #if BEAM
-     system ("echo tagerror | netcat -w1 hadesdaq 12122");
+      system ("echo tagerror | netcat -w1 hadesdaq 12122");
 #endif
-     sleep(5); 
+      sleep(5); 
     }
-    if ((*my->evtsTagError) > tagerror_enough) 
-    {
-     printf("Too many TagErrors.\n");
-     printf("Debug Inf: filesize_enough: %u,fileWritten: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete));
-     syslog(LOG_ERR, "Too many TagErrors.\n");
-     syslog(LOG_ERR, "Debug Inf: filesize_enough: %u,fileWritten: %u, TAGERROR_enough: %u, tag_error: %u, all_events: %u\n",filesize_enough , (*my->bytesWritten),tagerror_enough, (*my->evtsTagError), (*my->evtsComplete));
+    if ((*my->evtsTagError) > maxtagerror) {
+
+      syslog(LOG_ERR, "<E> evtbuild.c: Too many TagErrors!");
+      syslog (LOG_ERR, "<D> evtbuild.c: bytes written: %u, minimum file size: %u",
+             minfilesize, (*my->bytesWritten) );
+      syslog (LOG_ERR, "<D> evtbuild.c: tagerror evts: %u, allowed: %u, total evts num: %u", 
+             (*my->evtsTagError), maxtagerror, (*my->evtsComplete));
+
 #if BEAM
-     system ("echo tagerror | netcat -w1 hadesdaq 12122");
+      system ("echo tagerror | netcat -w1 hadesdaq 12122");
 #endif
-     sleep(5); 
+      sleep(5); 
     }
   }
   return 0;
 }
-int
-get_file_number_in_dir (char *path)
+
+int get_file_number_in_dir (char *path)
 {
   int file_number;
   file_number = 0;
@@ -936,39 +978,21 @@ get_file_number_in_dir (char *path)
 
   dir = opendir (path);
 
-  if (dir == NULL)
-    {
-      printf ("open dir: %s failed\n", path);
-      exit (EXIT_FAILURE);
-    }
+  if (dir == NULL) {
+    syslog(LOG_ERR, "%s, %d: trying to open directory %s: %s", 
+          __FILE__, __LINE__, path, strerror(errno));
+    exit (EXIT_FAILURE);
+  }
 
   while (NULL != (dirptr = readdir (dir)))
-    {
-#if 0
-      strcpy (tmppath, path);
-      strcat (tmppath, "/");
-      strcat (tmppath, dirptr->d_name);
-      lstat (tmppath, buf);
+    file_number ++;
 
-      if (S_ISDIR (buf->st_mode))
-       {
-#if 0
-         printf ("directory %s \n", tmppath);
-#endif
-       }
-      else
-       {
-#endif
-         file_number ++;
-#if 0
-       }
-#endif
-    }
   closedir(dir);
+
   return file_number;
 }
-double
-get_directory_size (char *path)
+
+double get_directory_size (char *path)
 {
   double directory_size;
   directory_size = 0.;
@@ -979,43 +1003,32 @@ get_directory_size (char *path)
 
   dir = opendir (path);
 
-  if (dir == NULL)
-    {
-      printf ("open dir: %s failed\n", path);
+  if (dir == NULL) {
+      syslog(LOG_ERR, "%s, %d: trying to open directory %s: %s", 
+            __FILE__, __LINE__, path, strerror(errno));
       exit (EXIT_FAILURE);
-    }
+  }
 
-  while (NULL != (dirptr = readdir (dir)))
-    {
+  while (NULL != (dirptr = readdir (dir))) {
       strcpy (tmppath, path);
       strcat (tmppath, "/");
       strcat (tmppath, dirptr->d_name);
       lstat (tmppath, buf);
 
-      if (S_ISDIR (buf->st_mode))
-       {
-#if 0
-         printf ("directory %s \n", tmppath);
-#endif
-       }
-      else
-       {
+      if( !(S_ISDIR (buf->st_mode)) ) {
          directory_size += (double)(buf->st_size/(double)(1024.*1024.));
-       }
-    }
+      }
+  }
+
   closedir(dir);
   return directory_size;
 }
 
 #define DEBUG 0
-static int
-remove_file (char *path)
+static int remove_file (char *path)
 {
-/*delete files if limit was reached*/
-#if 0
-  printf ("limit overflow: ressizelimit: %f, res_dirSize: %f\n",
-         theArgs->ressizelimit, res_dirSize);
-#endif
+  /*delete files if limit was reached*/
+
   DIR *dir;
   struct dirent *dirptr;
   struct stat bufS, *buf = &bufS;
@@ -1024,48 +1037,39 @@ remove_file (char *path)
   char tmppath[PARAM_MAX_VALUE_LEN];
   char last_modification_file[PARAM_MAX_VALUE_LEN];
   dir = opendir (path);
-  if (dir == NULL)
-    {
-      printf ("open dir: %s failed\n", path);
+
+  if (dir == NULL) {
+      syslog(LOG_ERR, "%s, %d: opening dir %s failed", __FILE__, __LINE__, path);
       exit (EXIT_FAILURE);
-    }
+  }
 
-  while (1)
-    {
-      if (NULL == (dirptr = readdir (dir)))
-       {
-         break;
-       }
-      strcpy (tmppath, path);
-      strcat (tmppath, "/");
-      strcat (tmppath, dirptr->d_name);
-      lstat (tmppath, buf);
-      if(strstr(tmppath,"hld") == NULL){
-       continue;
-      }
+  while (1) {
 
-      if (S_ISDIR (buf->st_mode))
-       {
-#if 0
-         printf ("directory %s \n", tmppath);
-#endif
-       }
-      else
-       {
-         if (buf->st_mtime < last_modification_time)
-           {
-             last_modification_time = buf->st_mtime;
-             recover_size = buf->st_size;
-             strcpy (last_modification_file, tmppath);
-           }
-       }
+    if (NULL == (dirptr = readdir (dir)))
+        break;
+    
+    strcpy (tmppath, path);
+    strcat (tmppath, "/");
+    strcat (tmppath, dirptr->d_name);
+    lstat (tmppath, buf);
+
+    if(strstr(tmppath,"hld") == NULL)
+        continue;
+      
+
+    if ( !(S_ISDIR(buf->st_mode)) ) {
+      if (buf->st_mtime < last_modification_time) {
+       last_modification_time = buf->st_mtime;
+       recover_size = buf->st_size;
+       strcpy (last_modification_file, tmppath);
+      }
     }
+  }
+
   closedir(dir);
-#if 0
-  printf ("unlink: %s\n", last_modification_file);
-#endif
-  if(0!=unlink (last_modification_file)){
-    printf("cannot unlink %s\n",last_modification_file);
+
+  if( 0 != unlink (last_modification_file) ) {
+    syslog(LOG_ERR, "%s, %d: cannot unlink %s", __FILE__, __LINE__, last_modification_file);
     exit(0);
   }
   return recover_size;
@@ -1076,23 +1080,29 @@ static int openFile (TheArgs * theArgs)
 
   char fileName[_POSIX_PATH_MAX];
   static char outPath[_POSIX_PATH_MAX];
+  static char outLustrePath[_POSIX_PATH_MAX];
   static char sec_path[_POSIX_PATH_MAX];
   static once = 1;
 
   diff_time = 1;
-  runNr = genId32 ();
+  
+  if( !(theArgs->epicsCtrl) ) 
+    runNr = genId32 ();
+
   seqNr = 0;
   
   theArgs->runNr = runNr;
-
+  
   if (once) {
     file_size = theArgs->maxFileSz;
     strcpy (sec_path, theArgs->sec_path);
     strcpy (outPath, theArgs->outPath);
+    strcpy (outLustrePath, theArgs->lustrePath);
     once = 0;
   }
   else {
     strcpy (theArgs->outPath, outPath);
+    strcpy (theArgs->lustrePath, outLustrePath);
     strcpy (theArgs->sec_path, sec_path);
   }
 
@@ -1104,6 +1114,8 @@ static int openFile (TheArgs * theArgs)
   outTape = NULL;
   outFile = NULL;
   outSecondFile = NULL;
+  outLustreFile = NULL;
+
   if (strcmp (theArgs->outDev, "null") == 0) {
     outFile = NULL;
   }
@@ -1123,7 +1135,7 @@ static int openFile (TheArgs * theArgs)
        strcat (theArgs->outPath, fileName);
       }
     }
-
+    
     if (NULL == (outFile = fopen (theArgs->outPath, "wb"))) {
       syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath,
              strerror (errno));
@@ -1135,13 +1147,9 @@ static int openFile (TheArgs * theArgs)
       statfs (theArgs->outPath, buf);
       if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
        errno = ENOSPC;
-/*     if (0 != fclose (outFile)) { */
-/*       fprintf (stderr, "<E> evtbuild.c, openFile: fclose 'File' failed\n"); */
-/*     } */
        outFile = NULL;
        unlink (theArgs->outPath);
-       syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath,
-               strerror (errno));
+       syslog (LOG_ERR, "opening file %s: %s", theArgs->outPath, strerror (errno));
        return -1;
       }
     }
@@ -1151,8 +1159,7 @@ static int openFile (TheArgs * theArgs)
       strcpy (theArgs->outPath, fileName);
     }
     if (NULL == (outTape = openAnsiTape (theArgs->outPath, "/dev/tape"))) {
-      syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath,
-             strerror (errno));
+      syslog (LOG_ERR, "opening tape %s: %s", theArgs->outPath, strerror (errno));
       outFile = NULL;
       return -1;
     }
@@ -1161,58 +1168,101 @@ static int openFile (TheArgs * theArgs)
     syslog (LOG_ERR, "unknown outputDev \"%s\"", theArgs->outDev);
     return -1;
   }
+
   if (theArgs->write_data) {
     dirSize = get_directory_size (sec_path);
-#if 0
-      printf("dirSize: %f, secsizelimit: %f\n",dirSize,theArgs->secsizelimit);
-#endif
-      while (dirSize > theArgs->secsizelimit) {
-       /*delete files if limit was reached */
-#if 0
-       printf ("limit overflow: secsizelimit: %f, dirSize: %f\n",
-               theArgs->secsizelimit, dirSize);
-       getchar();
-#endif
-       remove_file (sec_path);
-       dirSize = get_directory_size (sec_path);
-      }
-      if (strcmp (theArgs->sec_path, "") == 0) {
-       strcpy (theArgs->sec_path, fileName);
+    
+    while (dirSize > theArgs->secsizelimit) {
+      /*delete files if limit was reached */
+      remove_file (sec_path);
+      dirSize = get_directory_size (sec_path);
+    }
+    if (strcmp (theArgs->sec_path, "") == 0) {
+      strcpy (theArgs->sec_path, fileName);
+    }
+    else {
+      struct stat bufS, *buf = &bufS;
+      
+      stat (theArgs->sec_path, buf);
+      if (S_ISDIR (buf->st_mode)) {
+       strcat (theArgs->sec_path, "/");
+       strcat (theArgs->sec_path, fileName);
       }
-      else {
-       struct stat bufS, *buf = &bufS;
-       
-       stat (theArgs->sec_path, buf);
-       if (S_ISDIR (buf->st_mode)) {
-         strcat (theArgs->sec_path, "/");
-         strcat (theArgs->sec_path, fileName);
+    }
+    if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) {
+      syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, strerror (errno));
+      outSecondFile = NULL;
+      return -1;
+    }
+    else {
+      struct statfs bufS, *buf = &bufS;
+      statfs (theArgs->sec_path, buf);
+      if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
+       errno = ENOSPC;
+       if (0 != fclose (outSecondFile)) {
+         fprintf (stderr, "<E> evtbuild.c, openFile: fclose 'outSecondFile' failed\n");
        }
-      }
-      if (NULL == (outSecondFile = fopen (theArgs->sec_path, "wb"))) {
-       syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path,
-               strerror (errno));
        outSecondFile = NULL;
+       unlink (theArgs->sec_path);
+       syslog (LOG_ERR, "opening file %s: %s", theArgs->sec_path, strerror (errno));
        return -1;
       }
-      else {
-       struct statfs bufS, *buf = &bufS;
-       statfs (theArgs->sec_path, buf);
-       if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
-         errno = ENOSPC;
-         if (0 != fclose (outSecondFile)) {
-           fprintf (stderr, "<E> evtbuild.c, openFile: fclose 'outSecondFile' failed\n");
-         }
-         outSecondFile = NULL;
-         unlink (theArgs->sec_path);
-         syslog (LOG_ERR, "opening file %s: %s",
-                 theArgs->sec_path, strerror (errno));
-         return -1;
-       }
+    }
+  }
+  
+  /* Open file on Lustre */
+  if( strcmp (theArgs->lustrePath, "") != 0 ) {
+    struct stat bufS, *buf = &bufS;
+    stat (theArgs->lustrePath, buf);
+    
+    /* If lustrePath is a dir name, add fileName to the lustrePath */
+    if (S_ISDIR (buf->st_mode)) {
+      strcat (theArgs->lustrePath, "/");
+      strcat (theArgs->lustrePath, fileName);
+    }    
+    
+    if (NULL == (outLustreFile = fopen (theArgs->lustrePath, "wb"))) {
+      syslog (LOG_ERR, "opening file %s: %s", theArgs->lustrePath, strerror (errno));
+    }
+    else {
+      struct statfs bufS, *buf = &bufS;
+      statfs (theArgs->lustrePath, buf);
+      
+      /*   Check if the number of free available blocks
+       *   is enough to write the file with maximum size.
+       */
+      if (theArgs->maxFileSz / buf->f_bsize > buf->f_bavail) {
+       errno = ENOSPC;
+       
+       outLustreFile = NULL;
+       unlink (theArgs->lustrePath);
+       syslog (LOG_ERR, "%s, %d: opening file %s: %s", 
+               __FILE__, __LINE__, theArgs->lustrePath, strerror (errno));
+       return -1;      
       }
+    }
   }
+
+#ifdef RFIO
+  /* open file on Data Mover */
+  if( fRemote != NULL ) {
+    char rfioPath[_POSIX_PATH_MAX];
+    strcpy( rfioPath, theArgs->rfioRemotePath );
+    strcat( rfioPath, "/" );
+    strcat( rfioPath, fileName );    
+
+    fprintf ( stderr, "<D> evtbuild.c, rfio_fnewfile(): trying to open remote file %s\n", rfioPath );
+    if( 0 != rfio_fnewfile( fRemote, rfioPath ) ) {
+      syslog( LOG_ERR, "%s, %d: cannot open remote file %s: %s", 
+             __FILE__, __LINE__, rfioPath, strerror(errno) );
+      exit (EXIT_FAILURE);
+    }
+  }
+#endif
+
   return 0;
 }
-
+  
 static int openRESFile (TheArgs * theArgs)
 {
   char fileName[_POSIX_PATH_MAX];
@@ -1231,31 +1281,26 @@ static int openRESFile (TheArgs * theArgs)
 
   if (theArgs->ressizelimit != 0) {
       res_dirNr = get_file_number_in_dir (respath);
-#if 0
-      printf("res_dirNr: %d, theArgs->ressizelimit: %d\n,", res_dirNr,theArgs->ressizelimit);
-#endif
 
       while (res_dirNr > theArgs->ressizelimit) {
 
          /*delete files if limit was reached */
-#if 0
-         printf ("limit overflow: res_sizelimit: %ul, res_dirSize: %ul\n",
-                 theArgs->ressizelimit, res_dirSize);
-#endif
          remove_file (respath);
          res_dirNr--;
        }
- }
 }
   strcpy (fileName, theArgs->expId);
   strftime (fileName + strlen (fileName), 15, "%y%j%H%M%S_",
            localtime (&res_time));
   static int filecounter = 1;
-  if(diff_time == 1){
+
+  if( diff_time == 1 ) {
          filecounter = 1;
          diff_time = 0;
-  }else { 
+  } else { 
          filecounter++;
   }
+
   char app[8];
 
   sprintf (app, "%d", filecounter);
@@ -1264,7 +1309,7 @@ static int openRESFile (TheArgs * theArgs)
 
   if (strcmp (theArgs->respath, "") == 0) {
       strcpy (theArgs->respath, fileName);
-    }
+  }
   else {
       struct stat bufS, *buf = &bufS;
 
@@ -1273,28 +1318,15 @@ static int openRESFile (TheArgs * theArgs)
       if (S_ISDIR (buf->st_mode)) {
          strcat (theArgs->respath, "/");
          strcat (theArgs->respath, fileName);
-       }
-    }
+      }
+  }
 
   /* construct a default filename */
-    outRESFile = NULL;
+  outRESFile = NULL;
 
-#if 0
-  /*if files are created more often than 1 sec, postfix is added to their name */
-  struct stat bufS, *buf = &bufS;
-  while (0 == stat (theArgs->respath, buf))
-    {                          /*file exists */
-      strcpy (theArgs->respath, respath);
-      sprintf (file_app, "%d", filecounter);
-      strcat (theArgs->respath, "/");
-      strcat (theArgs->respath, fileName);
-      strcat (theArgs->respath, file_app);
-      filecounter++;
-    }
-#endif
   if (NULL == (outRESFile = fopen (theArgs->respath, "wb"))) {
-      syslog (LOG_ERR, "opening file %s: %s", theArgs->respath,
-             strerror (errno));
+      syslog (LOG_ERR, "%s, %d: opening file %s: %s", 
+             __FILE__, __LINE__, theArgs->respath, strerror (errno));
       outRESFile = NULL;
       return -1;
     }
@@ -1306,12 +1338,14 @@ static int openRESFile (TheArgs * theArgs)
        {
          errno = ENOSPC;
          if (0 != fclose (outRESFile)) {
-           fprintf (stderr, "<E> evtbuild.c, openRESFile: fclose 'outRESFile' failed\n");
-           }
+           syslog(LOG_ERR, "%s, %d: trying fclose 'outRESFile': %s", 
+                  __FILE__, __LINE__, strerror(errno));
+         }
          outRESFile = NULL;
          unlink (theArgs->respath);
-         syslog (LOG_ERR, "opening file %s: %s", theArgs->respath,
-                 strerror (errno));
+
+         syslog (LOG_ERR, "%s, %d: opening file %s: %s", 
+                 __FILE__, __LINE__, theArgs->respath, strerror (errno));
          return -1;
        }
     }
@@ -1333,15 +1367,20 @@ static int writeFile (void *evt)
   else if (outTape != NULL) {
     writeFileR = writeAnsiTape (outTape, evt, Evt_paddedSize (evt));
   }
+
 #ifdef RFIO
-  else if( fRemote != NULL ) {
-    int rfioStatus = 0;
-    rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote );
-    if (rfioStatus <= 0)
-      fprintf ( stderr, "<E> evtbuild.c, rfio_fwrite: writing file failed\n");
+  /* write to Data Mover via RFIO */
+  if( fRemote != NULL ) {
+    if( 0 >= rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote ) )
+      syslog(LOG_ERR, "%s, %d: writing file via RFIO: %s", __FILE__, __LINE__, strerror(errno));
   }
 #endif
 
+  /* write to Lustre */
+  if (outLustreFile != NULL) {
+      writeFileR = fwrite (evt, 1, Evt_paddedSize (evt), outLustreFile);
+  }
+
   /* writing file in the second dir  */
   if (outSecondFile != NULL) {
     fwrite (evt, 1, Evt_paddedSize (evt), outSecondFile);
@@ -1350,8 +1389,7 @@ static int writeFile (void *evt)
   return writeFileR;
 }
 
-static int
-writeRESFile (void *evt)
+static int writeRESFile (void *evt)
 {
   int writeFileR;
   Evt_setSeqNr (evt, res_seqNr++);
@@ -1367,25 +1405,49 @@ writeRESFile (void *evt)
 static int closeFile ()
 {
   int closeFileR;
+  int closeLustreFileR;
 
   /* closing file in the second dir  */
   if (outSecondFile != NULL) {
       dirSize += file_size;
       if (0 != fclose (outSecondFile)) {
-       fprintf (stderr, "<E> evtbuild.c, closeFile: fclose 'outSecondFile2' failed\n");
+       syslog(LOG_ERR, "%s, %d: trying fclose 'outSecondFile': %s", 
+              __FILE__, __LINE__, strerror(errno));
       }
     }
 
   if (outFile != NULL) {
       closeFileR = fclose (outFile);
       if (0 != closeFileR) {
-         printf ("<E> evtbuild.c, closeFile: fclose 'closeFileR' failed\n");
+       syslog(LOG_ERR, "%s, %d: trying fclose 'outFile': %s", 
+              __FILE__, __LINE__, strerror(errno));
       }
     }
   else if (outTape != NULL) {
     closeFileR = closeAnsiTape (outTape);
   }
 
+  /* close file on Lustre */
+  if (outLustreFile != NULL) {
+    closeLustreFileR = fclose (outLustreFile);
+    if (0 != closeLustreFileR) {
+      syslog(LOG_ERR, "%s, %d: trying fclose 'outLustreFile': %s", 
+            __FILE__, __LINE__, strerror(errno));
+      exit (EXIT_FAILURE);
+    }
+  }
+
+#ifdef RFIO
+  /* close file on Data Mover */
+  if( fRemote != NULL ) { 
+    if( 0 != rfio_fendfile( fRemote ) ) {
+      syslog(LOG_ERR, "%s, %d: trying rfio_fendfile: %s", 
+            __FILE__, __LINE__, strerror(errno));
+      exit (EXIT_FAILURE);
+    }
+  }
+#endif
+
   return closeFileR;
 }
 
@@ -1396,11 +1458,13 @@ static int closeRESFile (TheArgs * theArgs)
       res_dirSize += res_file_size;
       closeFileR = fclose (outRESFile);
       if (0 != closeFileR) {
-       fprintf ( stderr, "<E> evtbuild.c, closeRESFile: fclose 'outRESFile2' failed!\n");
-       }
-    }
-  else{
-      fprintf ( stderr,"<E> evtbuild.c, closeRESFile: closeFile error outRESFile == NULL\n");
+       syslog(LOG_ERR, "%s, %d: trying fclose 'outRESFile': %s", 
+              __FILE__, __LINE__, strerror(errno));
+      }
+  }
+  else {
+    syslog(LOG_ERR, "%s, %d: closeRESFile failed: outRESFile is NULL", 
+          __FILE__, __LINE__);
   }
 
   return closeFileR;
@@ -1409,7 +1473,8 @@ static int closeRESFile (TheArgs * theArgs)
 #ifdef RFIO
 static int rfio_openConnection (TheArgs * theArgs)
 {
-  if( theArgs->rfioFlag ){
+
+  if( (strcmp (theArgs->rfioRemotePath, "") != 0) ){
     char rfioBase[128] = "";
     char *pcc;
     strcpy( rfioBase, theArgs->rfioRemotePath );
@@ -1422,15 +1487,12 @@ static int rfio_openConnection (TheArgs * theArgs)
     pcc++;
     strncpy(pcc, "\0", 1);             /* terminates after node name */
 
-    fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): try to open connection to server\n" );
+    fprintf ( stderr, "<I> evtbuild.c, rfio_fopen(): trying to open connection to Data Mover: %s\n", rfioBase );
     fRemote = rfio_fopen( rfioBase, "wb" );
-    fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): opened connection to server\n" );
-
-    if (fRemote == NULL)
-    {
-      fprintf ( stderr, "<E> evtbuild.c:rfio_openConnection(): cannot open connection to server\n" );
-      syslog (LOG_ERR, "<E> evtbuild.c:rfio_openConnection(): cannot open connection to server");
 
+    if (fRemote == NULL) {
+      syslog(LOG_ERR, "%s, %d: trying to open connection to Data Mover %s: %s", 
+            __FILE__, __LINE__, rfioBase, strerror(errno));
       exit (EXIT_FAILURE);
     }
   }
@@ -1440,99 +1502,23 @@ static int rfio_openConnection (TheArgs * theArgs)
 
 static int rfio_closeConnection (TheArgs * theArgs)
 {
-  if( theArgs->rfioFlag && fRemote != NULL ){
-    int rfioStatus;
-
-    rfioStatus = rfio_fclose( fRemote );
+  if( (strcmp (theArgs->rfioRemotePath, "") != 0) && fRemote != NULL ) {
     
-    if (rfioStatus)
-    {
-      fprintf ( stderr, "<E> evtbuild.c:rfio_closeConnection(): closing connection to server failed\n" );
-      syslog (LOG_ERR, "<E> evtbuild.c:rfio_closeConnection(): closing connection to server failed");
+    if ( 0 != rfio_fclose( fRemote ) ) {
+      syslog(LOG_ERR, "%s, %d: trying to close connection to Data Mover: %s", 
+            __FILE__, __LINE__, strerror(errno));
       exit (EXIT_FAILURE);
     }
   }
 
   return 0;
 }
-
-static int rfio_openFile (TheArgs * theArgs)
-{
-
-  if( theArgs->rfioFlag && fRemote != NULL){
-    int rfioStatus;
-    char rfioFileName[_POSIX_PATH_MAX];
-    char rfioPath[_POSIX_PATH_MAX];
-
-    strcpy( rfioPath, theArgs->rfioRemotePath );
-
-/*     Evt_setSeqNr (evt, seqNr++); */
-/*     Evt_setRunNr (evt, runNr); */
-
-    strcpy (rfioFileName, theArgs->expId);
-    strftime (rfioFileName + strlen (rfioFileName), 18, "%y%j%H%M%S.hld",
-             localtime (&ourTime));    
-    strcat (rfioPath, "/");
-    strcat (rfioPath, rfioFileName);
-    fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): try to open remote file %s\n", rfioPath );
-    rfioStatus = rfio_fnewfile( fRemote, rfioPath );
-    fprintf ( stderr, "<D> evtbuild.c:rfio_openConnection(): opened remote file %s\n", rfioPath );
-
-    if (rfioStatus) {
-      fprintf ( stderr, "<E> evtbuild.c:rfio_openFile(): cannot open remote file %s\n", rfioPath );
-      syslog (LOG_ERR, "<E> evtbuild.c:rfio_openFile(): cannot open remote file %s", rfioPath);
-      exit (EXIT_FAILURE);
-    }
-  }
-
-  return 0;
-}
-
-static int rfio_closeFile (TheArgs * theArgs)
-{
-  if( theArgs->rfioFlag && fRemote != NULL ){
-    int rfioStatus;
-
-    rfioStatus = rfio_fendfile( fRemote );
-    
-    if (rfioStatus)
-    {
-      fprintf ( stderr, "<E> evtbuild.c:rfio_openFile(): cannot close remote file\n");
-      syslog (LOG_ERR, "<E> evtbuild.c:rfio_openFile(): cannot close remote file");
-      exit (EXIT_FAILURE);
-
-    }
-  }
-
-  return 0;
-}
-
-static int rfio_writeFile (TheArgs * theArgs, void *evt)
-{
-  if( theArgs->rfioFlag && fRemote != NULL ){
-    int rfioStatus = 0;
-
-    Evt_setSeqNr( evt, seqNr++ );
-    Evt_setRunNr( evt, runNr );
-
-    rfioStatus = rfio_fwrite( evt, 1, Evt_paddedSize(evt), fRemote );
-    
-    if (rfioStatus <= 0)
-    {
-      fprintf ( stderr, "<E> evtbuild.c:rfio_writeFile(): writing file failed\n");
-
-      return 1;
-    }
-  }
-
-  return 0;
-}
-
-#endif /* ifdef RFIO */
+#endif
 
 /* BUGBUG bailOut not proper yet */
 int main (int argc, char *argv[])
 {
+
   int i;
   TheArgs theArgsS, *theArgs = &theArgsS;
   TheStats theStatsS, *theStats = &theStatsS;
@@ -1555,62 +1541,42 @@ int main (int argc, char *argv[])
   argsDefault (theArgs);
   argsFromParam (theArgs, argc, argv);
 
-  printf("theArgs->varQSizeCnt = %d\n",theArgs->varQSizeCnt);
+  if (0 > argsFromCL (theArgs, argc, argv)) {
+      usage (argv[0]);
+      exit (EXIT_FAILURE);
+  }
 
-  for (i = 0; i < theArgs->varQSizeCnt; i++) {
-    printf("theArgs->varQSize = %d\n", theArgs->varQSize[i]);
+  if( argsCheck(theArgs) ) {
+    sleep(10);
+    exit (EXIT_FAILURE);
   }
 
-  printf("theArgs->queueSize: %d\n", theArgs->queueSize);
-  printf("theArgs->nrOfMsgs: %d\n", theArgs->nrOfMsgs);
-  printf("theArgs->outPath: %s\n", theArgs->outPath);
-  printf("theArgs->outDev: %s\n", theArgs->outDev);
+  for (i = 0; prioritynames[i].c_name != NULL && 
+        0 != strcmp (prioritynames[i].c_name, theArgs->verbosity); i++) {}
 
-  if (0 > argsFromCL (theArgs, argc, argv))
-    {
-      usage (argv[0]);
-      exit (EXIT_FAILURE);
-    }
-  for (i = 0;
-       prioritynames[i].c_name != NULL
-       && 0 != strcmp (prioritynames[i].c_name, theArgs->verbosity); i++)
-    {
-    }
   if (prioritynames[i].c_name == NULL)
-    {
       exit (EXIT_FAILURE);
-    }
   else
-    {
       setlogmask (LOG_UPTO (prioritynames[i].c_val));
-    }
 
   /* normalize experiment id */
   theArgs->expId[0] = tolower (theArgs->expId[0]);
   theArgs->expId[1] = tolower (theArgs->expId[1]);
   theArgs->expId[2] = '\0';
 
-  if (NULL ==
-      (worker =
-       Worker_initBegin (argv[0], sigHandler, theArgs->priority,
-                        theArgs->isStandalone)))
-    {
+  if (NULL == (worker = Worker_initBegin (argv[0], sigHandler, theArgs->priority,
+                        theArgs->isStandalone))) {
       syslog (LOG_ERR, "%s, %d: %s", __FILE__, __LINE__, strerror (errno));
       exit (EXIT_FAILURE);
-    }
-  if (theArgs->no_rpc)
-    {
+  }
+
+  if (theArgs->no_rpc) 
       syslog (LOG_WARNING, "DISABLE of online service");
-    }
-  else
-    {
-      if (-1 == initOnline ())
-       {
+  else 
+    if (-1 == initOnline ()) 
          syslog (LOG_WARNING, "unable to initialize online service");
-       }
-    }
-  if (theArgs->nrOfMsgs == 0)
-    {
+
+  if (theArgs->nrOfMsgs == 0) {
       /*
          no '-m' option was on command line, we assume that the
          readout task (daq_readout) is running on the same node and
@@ -1624,14 +1590,12 @@ int main (int argc, char *argv[])
       shmTrans[0] = ShmTrans_create ("subevtqueue", 2 * theArgs->queueSize);
       hadTuQueue[0] = NULL;
       theArgs->nrOfMsgs = 1;
-    }
-  else
-    {
+  }
+  else {
       shmTrans = malloc (theArgs->nrOfMsgs * sizeof (ShmTrans *));
       hadTuQueue = malloc (theArgs->nrOfMsgs * sizeof (HadTuQueue *));
 
-      for (i = 0; i < theArgs->nrOfMsgs; i++)
-       {
+      for (i = 0; i < theArgs->nrOfMsgs; i++) {
          char buf[_POSIX_PATH_MAX];
 
          sprintf (buf, "netqueue%d", i);
@@ -1649,32 +1613,44 @@ int main (int argc, char *argv[])
          }
 
          hadTuQueue[i] = NULL;
-       }
-    }
+      }
+  }
 
-  theStats->evtsDiscarded = Worker_addStatistic (worker, "evtsDiscarded");
-  theStats->evtsComplete = Worker_addStatistic (worker, "evtsComplete");
-  theStats->evtsDataError = Worker_addStatistic (worker, "evtsDataError");
-  theStats->evtsTagError = Worker_addStatistic (worker, "evtsTagError");
-  theStats->bytesWritten = Worker_addStatistic (worker, "bytesWritten");
+  theStats->evtsDiscarded    = Worker_addStatistic (worker, "evtsDiscarded");
+  theStats->evtsComplete     = Worker_addStatistic (worker, "evtsComplete");
+  theStats->evtsCompletePS   = Worker_addStatistic (worker, "evtsCompletePS");
+  theStats->evtsDataError    = Worker_addStatistic (worker, "evtsDataError");
+  theStats->evtsTagError     = Worker_addStatistic (worker, "evtsTagError");
+  theStats->bytesWritten     = Worker_addStatistic (worker, "bytesWritten");
+  theStats->bytesWrittenPS   = Worker_addStatistic (worker, "bytesWrittenPS");
+  theStats->runId            = Worker_addStatistic (worker, "runId");
+  (*theStats->runId)         = 0; /* initialize to zero */
 
   unsigned long *retVal = NULL;
-
   theStats->evtsRes = (unsigned long *) &retVal;
-  for (i = 0; i < NEVTIDS; i++)
-    {
-      char buf[WORKER_MAX_NAME_LEN];
 
-      sprintf (buf, "evtId%d", i);
-      theStats->evtId[i] = Worker_addStatistic (worker, buf);
-    }
-  for (i = 0; i < theArgs->nrOfMsgs; i++)
-    {
-      char buf[WORKER_MAX_NAME_LEN];
+  for (i = 0; i < NEVTIDS; i++) {
+    char buf[WORKER_MAX_NAME_LEN];
+    
+    sprintf (buf, "evtId%d", i);
+    theStats->evtId[i] = Worker_addStatistic (worker, buf);
+  }
+  for (i = 0; i < theArgs->nrOfMsgs; i++) {
+    char buf[WORKER_MAX_NAME_LEN];
+    
+    sprintf (buf, "trigNr%d", i);
+    theStats->trigNr[i] = Worker_addStatistic (worker, buf);
+  }
 
-      sprintf (buf, "trigNr%d", i);
-      theStats->trigNr[i] = Worker_addStatistic (worker, buf);
-    }
+  /* Add statistic for fill levels of buffers. */
+  for( i=0; i<theArgs->nrOfMsgs; i++ ) {
+    char buf[WORKER_MAX_NAME_LEN];
+    sprintf( buf, "evtbuildBuff%d", i );
+    theStats->evtbuildBuff[i] = Worker_addStatistic( worker, buf );
+  }
+
+  theStats->nrOfMsgs = Worker_addStatistic( worker, "nrOfMsgs" );
+  (*theStats->nrOfMsgs) = theArgs->nrOfMsgs;  
 
   argsDump (theArgs);
 
@@ -1694,57 +1670,64 @@ int main (int argc, char *argv[])
       int dataError = 0;
       int tagError = 0;
 
+      add2Stat( theArgs, theStats, 1, shmTrans );
       statsBufferDump( theArgs, theStats, 1, hadTuQueue, shmTrans );
       statsDump (theArgs, theStats, 1);
 
       if (*theStats->bytesWritten == 0)
        {
-         res_time = ourTime = time (NULL);
+
+         if( theArgs->epicsCtrl ) {
+           runNr = getRunId( theArgs );
+           res_time = ourTime = runNr + TIMEOFFSET;
+         }
+         else
+           res_time = ourTime = time (NULL);
+
+
          if (-1 == openFile (theArgs))
            {
              syslog (LOG_ERR, "error opening output file, exiting");
              exit (EXIT_FAILURE);
            }
 
-#ifdef RFIO
-         rfio_openFile( theArgs );
-#endif
-
          storeInfoStart (argv[0], ourTime, theArgs);
 
          /* store simple start run info */
          storeRunInfoStart(ourTime, theArgs);    
 
          evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart);
-         for (i = 0; i < theArgs->slowCtrlFileCnt; i++)
-           {
+
+         for (i = 0; i < theArgs->slowCtrlFileCnt; i++) {
              evt = appendFile (evt, theArgs->slowCtrlFiles[i]);
            }
+
          (*theStats->bytesWritten) += Evt_size (evt);
          writeFile (evt);
          deleteEvt (evt);
        }
+
       if ((*theStats->evtsRes) == 0)
        {
 #if 0
          ourTime = time (NULL);
 #endif
 /* remote event server  - resdownscale - resnumevents*/
-         if (theArgs->resdownscale)
-           {
-             if (-1 == openRESFile (theArgs))
-               {
+         if (theArgs->resdownscale) {
+             if (-1 == openRESFile (theArgs)) {
                  syslog (LOG_ERR, "error opening RES output file");
                  exit (EXIT_FAILURE);
                }
+
              evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStart);
              (*theStats->evtsRes)++;
              writeRESFile (evt);
              deleteEvt (evt);
            }
-
        }
+
       evt = newEvt (EvtDecoding_64bitAligned, EvtId_data);
+
       for (i = 0; i < theArgs->nrOfMsgs && !evtIsBroken; i += step)
        {
          uint32_t trigNr;
@@ -1770,33 +1753,31 @@ int main (int argc, char *argv[])
            {
              currTrigNr = SubEvt_trigNr (subEvt) >> 8;
              currTrigTag = SubEvt_trigNr (subEvt) & 0xff;
+
              if (theArgs->evtId != 0)
-               {
                  currId = theArgs->evtId;
-               }
              else
-               {
                  currId = SubEvt_pureId (subEvt);
-               }
+
              syslog (LOG_DEBUG,
                      "currTrigNr: 0x%06x, currTrigTag 0x%02x, currId 0x%08x",
                      currTrigNr, currTrigTag, currId);
            }
+
          trigNr = SubEvt_trigNr (subEvt) >> 8;
          trigTag = SubEvt_trigNr (subEvt) & 0xff;
+
          if (trigNr == currTrigNr)
            {
              if (SubEvt_size (subEvt) > SubEvt_hdrSize ())
                {
                  /* sub evt is not empty */
                  if (SubEvt_dataError (subEvt))
-                   {
                      dataError = 1;
-                   }
+
                  if (trigTag != currTrigTag)
-                   {
                      tagError = 1;
-                   }
+
                  evt = Evt_appendSubEvt (evt, subEvt);
                }
              HadTuQueue_pop (hadTuQueue[i]);
@@ -1842,13 +1823,21 @@ int main (int argc, char *argv[])
            }
          (*theStats->bytesWritten) += Evt_size (evt);
          writeFile (evt);
+
          if (theArgs->resdownscale)
            {
-             (*theStats->evtsRes)++;
-/*           if ((*theStats->evtsRes) % (theArgs->resdownscale) == 0)*/
-             if (((*theStats->evtsRes) % EVENT_NUM_OFFSET) <
+
+             /*
+              *   If the setting are as follows: 
+              *   EVENT_NUM_OFFSET        == 100
+              *   theArgs->resdown_offset == 5     (100/20)
+              *   then from each 100 events only 
+              *   first 5 events are written to refFile. 
+              */
+             if (((*theStats->evtsComplete) % EVENT_NUM_OFFSET) <
                  theArgs->resdown_offset)
                {
+                 (*theStats->evtsRes)++;
                  writeRESFile (evt);
                }
            }
@@ -1861,33 +1850,43 @@ int main (int argc, char *argv[])
           is_mismatch_enough_to_stop(theArgs, theStats);
 #endif
        }
+
       deleteEvt (evt);
-      if ((*theStats->evtsRes) % 600 == 0)
-       {
-#if 0
-         printf ("condition: %u > %u\n", (*theStats->bytesWritten),
-                 ((theArgs->maxFileSz) - (theArgs->queueSize)));
-         printf ("maxFileSz %u ; queueSize: %u\n", (theArgs->maxFileSz),
-                 theArgs->queueSize);
-#endif
-       }
-      if ((*theStats->bytesWritten) >=
-         ((theArgs->maxFileSz) - (theArgs->queueSize)))
+
+      newRunId = getRunId( theArgs );
+
+      /*
+       *  The following conditions mean:
+       *  theArgs->epicsCtrl == 1 && runNr < newRunId
+       *           New RUN Id was generated, close the file and open new one.
+       *  theArgs->epicsCtrl == 1 && (*theStats->bytesWritten) >= 1900000000
+       *           Due to whatever reason the file size exceeded an allowed limit,
+       *           close the file.
+       *  theArgs->epicsCtrl == 1 && runNr == 0
+       *           Something went wrong with sinchronization of Event Builders,
+       *           close the file.
+       */          
+      if ( ( !(theArgs->epicsCtrl) && (*theStats->bytesWritten) >= ((theArgs->maxFileSz) - (theArgs->queueSize)) ) ||
+          (theArgs->epicsCtrl && runNr < newRunId) ||
+          (theArgs->epicsCtrl && (*theStats->bytesWritten) >= 1900000000) ||
+          (theArgs->epicsCtrl && newRunId == 0))
        {
          evt = newEvt (EvtDecoding_64bitAligned, EvtId_runStop);
+
          for (i = 0; i < theArgs->slowCtrlFileCnt; i++)
-           {
              evt = appendFile (evt, theArgs->slowCtrlFiles[i]);
-           }
+
          (*theStats->bytesWritten) += Evt_size (evt);
          writeFile (evt);
          deleteEvt (evt);
 
-         ourTime = time (NULL);
+         if( theArgs->epicsCtrl )
+           ourTime = newRunId + TIMEOFFSET;
+         else
+           ourTime = time (NULL);
+
          closeFile ();
-#ifdef RFIO
-         rfio_closeFile( theArgs );
-#endif
+
          storeInfoStop (argv[0], ourTime - 2, worker, theArgs);
 
          /* store simple stop run info */
@@ -1898,19 +1897,22 @@ int main (int argc, char *argv[])
          (*theStats->evtsDiscarded) = 0;
          (*theStats->evtsDataError) = 0;
          (*theStats->evtsTagError) = 0;
+
          for (i = 0; i < theArgs->nrOfMsgs; i++)
-           {
              (*theStats->trigNr[i]) = 0;
-           }
+
          for (i = 0; i < NEVTIDS; i++)
-           {
              (*theStats->evtId[i]) = 0;
-           }
+
        }
+
       if (theArgs->resdownscale)
        {
-         if ((*theStats->evtsRes) >=
-             (theArgs->resdownscale) * (theArgs->resnumevents))
+         /*
+          *   Number of events written to resFile is 
+          *   limited to resnumevents.
+          */
+         if ((*theStats->evtsRes) >= theArgs->resnumevents)
            {
 #if 0
              ourTime = time (NULL);
@@ -1923,16 +1925,17 @@ int main (int argc, char *argv[])
            }
        }
     }
+  
   ourTime = time (NULL);
   closeFile ();
+
 #ifdef RFIO
-  rfio_closeFile( theArgs );
   rfio_closeConnection( theArgs ); 
 #endif
+
   if (theArgs->resdownscale)
-    {
       closeRESFile (theArgs);
-    }
+
   storeInfoStop (argv[0], ourTime - 2, worker, theArgs);
 
   /* store simple stop run info */
@@ -1941,9 +1944,8 @@ int main (int argc, char *argv[])
   statsDump (theArgs, theStats, 1);
 
   for (i = 0; i < theArgs->nrOfMsgs; i++)
-    {
       ShmTrans_remove (shmTrans[i]);
-    }
+
   finiOnline ();
   Worker_fini (worker);
   exit (EXIT_SUCCESS);