]> jspc29.x-matter.uni-frankfurt.de Git - daqdata.git/commitdiff
Initial revision
authorhadaq <hadaq>
Sat, 1 May 2010 12:05:40 +0000 (12:05 +0000)
committerhadaq <hadaq>
Sat, 1 May 2010 12:05:40 +0000 (12:05 +0000)
disks/Makefile [new file with mode: 0644]
disks/cleanup.pl [new file with mode: 0755]
disks/daq_disks [new file with mode: 0755]
disks/disks.c [new file with mode: 0644]

diff --git a/disks/Makefile b/disks/Makefile
new file mode 100644 (file)
index 0000000..7dc6494
--- /dev/null
@@ -0,0 +1,16 @@
+CPPFLAGS = -I/home/hadaq/soft/evtbuild/include
+LDFLAGS  = -L/home/hadaq/soft/evtbuild/lib
+LIBS = -lhadaq -lcompat -lrt
+
+daq_disks: disks.o
+       $(CC) -o daq_disks $(LDFLAGS) disks.o $(LIBS)
+
+disks.o: disks.c
+       $(CC) -c $(CPPFLAGS) -c disks.c
+
+clean:
+       rm  -f  daq_disks *~ core *.o
+
+
+
+
diff --git a/disks/cleanup.pl b/disks/cleanup.pl
new file mode 100755 (executable)
index 0000000..a9b77ad
--- /dev/null
@@ -0,0 +1,257 @@
+#!/usr/bin/perl -w
+
+use strict;
+use Getopt::Long;
+use Data::Dumper;
+use threads;
+use threads::shared;
+use IO::Socket;
+use IO::Select;
+
+# Assume that there are four (or less) EB processes running per server.
+# Each process writes to disks about 40 MB/s. 
+# Total rate: 4 procs x 40 MB/s x 60 sec x 60 min = 0.5 TB/hour
+#
+# Total disk space per server: 2 TB x 22 disks = 44 TB
+# 20 % of total disk space: 8 TB which means 4000 files (of 2 GB size)
+# or 8 TB = 16 hours of running assuming 0.5 TB/hour
+# It should be enough to remove 250 files each 30 minutes 
+# if the limit of 20 % is reached.
+#
+# The following we are not going to do (we simply rm old files from disk):
+# Secondly, we have to move files from the disks with numbers equal to
+# EB proc numbers (if these disks are getting full) to other disks.
+# If less than 20% disk space left -> we move the files (0.5 TB) to other disks.
+# 0.5 TB / 2 GB = 250 files to move  
+#
+
+my $threshold = 20;  # free disk space in %
+my $opt_test = 0;
+my $opt_verb = 0;
+
+GetOptions ('t|test'      => \$opt_test,
+           'v|verb'      => \$opt_verb);
+
+my %disks_hash;  # Hash of disks with their available free space
+my $disks_href = \%disks_hash;
+
+my $status : shared = "OK";
+my @screenPID;
+
+our $server_port = '50501';
+our $protocol    = 'tcp';
+
+threads->new( \&statusServer);
+
+while(1){
+    &checkDisks();
+    &cleanup();
+    
+    sleep(600);  # 10 minutes
+}
+
+exit(0);
+
+########################### END OF MAIN #########################
+
+sub cleanup(){
+
+    my $tot_space  = 0;
+    my $free_space = 0;
+
+    #- Loop over disk numbers
+    foreach my $num (1..22){
+       $tot_space  = $tot_space  + $disks_href->{$num}->{'tot'};
+       $free_space = $free_space + $disks_href->{$num}->{'free'};
+    }
+
+    my $free_all = int(100*$free_space/$tot_space);
+
+    $status = "";
+
+    print "Total free disk space: $free_all\n" if( $opt_verb );
+
+    if( $free_all < $threshold ){
+       $status = "WARNING - cleaning up, total free disk space: $free_all";
+       &rmData("/data*/data/*", 500);
+    }
+    else{
+       $status = "OK - total free disk space: $free_all";
+    }
+
+    # There are special data disks, where
+    # EB procs start to write their first files before EB procs
+    # got disk numbers from daq_disks process.
+    # The numbers of these disks are equal to the numbers of
+    # shared mem segments of EB procs.
+    # We have to take special care of cleaning these disks up.
+
+    my @eb_list;
+
+    #- Get EB numbers from shared mem segment names
+    foreach my $num (1..16){
+       my $shmem = "/dev/shm/daq_evtbuild" . $num . ".shm";
+
+       #- If the shared memory segment exists
+       if( -e $shmem ){
+           push(@eb_list, $num);
+       } 
+    }
+
+    #- Check free disk space on the disks with the numbers from @eb_list
+    foreach my $ebnum (@eb_list){
+       my $free = $disks_href->{$ebnum}->{'free'};
+       my $tot  = $disks_href->{$ebnum}->{'tot'};
+       
+       #- If free space is below 20%
+       if( 100 * $free / $tot < 20 ){
+           my $path = sprintf("/data%02d/data/*", $ebnum);
+
+           &rmData("/data*/data/*", 100);
+       }
+    }
+}
+
+sub rmData()
+{
+    #- Remove old data
+
+    my ($path, $numOfFiles) = @_;
+
+    my @data = glob($path);
+
+    #- Get the files sorted by size. File with largest size comes first.
+    my @sorted_data = sort {-M $b <=> -M $a} @data;
+
+    my $file_counter = 0;
+
+    foreach my $hldfile (@sorted_data){
+       my $cmd = "rm $hldfile";
+       print "exe: $cmd\n" if( $opt_verb );
+       #system($cmd);
+
+       if($file_counter >= $numOfFiles){
+           last;
+       }
+
+       $file_counter++;
+
+       sleep(1);
+    }
+
+    exit(0);
+}
+
+sub checkDisks()
+{
+
+    my @df_info = `df -m`;
+
+    foreach my $line (@df_info){
+
+       my ($fsys, $tot, $used, $avail, $pers, $mount) = split(/ +/, $line);
+
+       chomp($mount);
+
+       #- Loop over disk numbers
+       foreach my $num (1..22){
+           my $diskName = sprintf("/data%02d", $num);
+           
+           if($diskName eq $mount){
+               #- Same free space for given '/dataxx' disk
+               $disks_href->{$num}->{'tot'}   = $tot;
+               $disks_href->{$num}->{'free'}  = $avail;
+           }
+       }
+    }
+}
+
+sub statusServer{ my $server_socket;
+    my $client_socket;
+    my $selector;
+
+    unless (defined( $server_socket =
+                     IO::Socket::INET->new( LocalPort => $server_port,
+                                            Proto     => 'tcp',
+                                            Listen    => SOMAXCONN ) ))
+    {
+        print "ERROR: Cannot start status server!\n";
+    }
+
+    $selector = new IO::Select( $server_socket );
+
+    while(1) {
+
+        # wait 5 seconds for connections
+        while (my @file_handles = $selector->can_read( 5 )) {
+
+            foreach my $file_handle (@file_handles) {
+
+                if($file_handle == $server_socket) {
+
+                    # create a new socket for this transaction
+                    unless (defined( $client_socket = $server_socket->accept() ))
+                    {
+                        print "ERROR: Cannot open socket to send status!\n";
+                    }
+
+                    print $client_socket $status;
+
+                    close( $client_socket );
+                }
+            }
+        }
+    }
+}
+
+sub mvData()
+{
+    #- Move old data
+    #
+    # This is required for special data disks, where
+    # EB procs start to write their first files before EB procs
+    # got disk numbers from daq_disks process.
+
+    my @eb_list;
+
+    #- Get EB numbers from shared mem segment names
+    foreach my $num (1..16){
+       my $shmem = "/dev/shm/daq_evtbuild" . $num . ".shm";
+
+       #- If the shared memory segment exists
+       if( -e $shmem ){
+           push(@eb_list, $num);
+       } 
+    }
+    
+    #- Check free disk space on the disks with the numbers from @eb_list
+    foreach my $ebnum (@eb_list){
+       my $free = $disks_href->{$ebnum}->{'free'};
+       my $tot  = $disks_href->{$ebnum}->{'tot'};
+
+       #- If free space is below 20%
+       if( 100 * $free / $tot < 20 ){
+
+           my $diskName = sprintf("/data%02d", $ebnum);
+
+           my @data = glob("$diskName/data/*");
+
+           #- In this sorting, oldest file comes first
+           my @sorted_data = sort {-M $b <=> -M $a} @data;
+
+           #- In this sorting, disk with largest free space comes first
+           my @disk_nums = sort { $disks_hash{$b}{'free'} cmp $disks_hash{$a}{'free'} } keys %$disks_href;
+
+           #- Move 0.5 TB (250 files of 2 GB size) to other disks
+           my $i_file = 0;
+           my $i_disk = 0;
+
+           foreach my $file (@sorted_data){
+               
+               my $free_space = $disks_href->{$disk_nums[$i_disk]}->{'free'};
+
+               #print "empty_disk: $empty_disk, $fspace\n";
+           }
+       }
+    }
+}
diff --git a/disks/daq_disks b/disks/daq_disks
new file mode 100755 (executable)
index 0000000..ec4d215
Binary files /dev/null and b/disks/daq_disks differ
diff --git a/disks/disks.c b/disks/disks.c
new file mode 100644 (file)
index 0000000..edf9d20
--- /dev/null
@@ -0,0 +1,276 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <getopt.h>
+#include <sys/stat.h>
+#include <string.h>
+#include <assert.h>
+
+#include "shmtrans.h"
+#include "worker.h" 
+
+#define BUFFSIZE = 10000
+
+FILE *popen(const char *, const char *);
+char *strdup(const char *);
+
+const char data[22][8] = {"/data01", "/data02", "/data03", "/data04", "/data05",
+                         "/data06", "/data07", "/data08", "/data09", "/data10",
+                         "/data11", "/data12", "/data13", "/data14", "/data15",
+                         "/data16", "/data17", "/data18", "/data19", "/data20",
+                         "/data21", "/data22"};
+
+const char space[] = " ";
+
+char diskNum[] = "diskNum";
+
+int data_disks[22];  /* Available disk space */
+
+char shmemBase[] = "daq_evtbuild";  /* Base name for shared memory segments */
+int shmem_nums[16];  /* Numbers for found shared memory segments */
+int nrOfShmSegms;    /* Total number of found shared memory segments */
+
+typedef struct TheArgsS {
+  int sleepTime;   /*seconds*/
+  int debug;
+} TheArgs;
+
+static void usage(const char *progName)
+{
+        printf( "Usage: %s [-h] : Print help.", progName);
+       printf( "       [-s <time>] : Sleep time.");
+}
+
+static void argsDefault(TheArgs *my)
+{
+  my->sleepTime = 1;
+
+  /* Initialize the array with shared mem segment numbers */
+  int i;
+  for(i=0; i<16; i++){
+    shmem_nums[i] = -1;
+  }
+}
+
+static int argsFromCL(TheArgs *my, int argc, char *argv[])
+{
+       extern char *optarg;
+       int i;
+       while (1) {
+               /* int this_option_optind = optind ? optind : 1; */
+               int option_index = 0;
+
+               static struct option long_options[] = {
+                       {"help", 0, 0, 'h'},
+                       {"sleep", 1, 0, 's'},
+                       {"debug", 0, 0, 'D'},
+                       {0, 0, 0, 0}
+               };
+               i = getopt_long(argc, argv, "Dhs:", long_options,
+                               &option_index);
+               if (i == -1)
+                       break;
+               switch (i) {
+               case 'h':
+                       usage(argv[0]);
+                       return -1;
+                       break;
+               case 's':
+                       my->sleepTime = strtoul(optarg, NULL, 0);
+                       break;
+               case 'D':
+                       my->debug = 1;
+                       break;
+               default:
+                       usage(argv[0]);
+                       return -1;
+                       break;
+               }
+       }
+       return 0;
+}
+
+int checkDisks(TheArgs *theArgs)
+{
+  /* Read the available disk space with 'df' */
+
+  int  count;
+  char buff[200] = "";
+  FILE* stream = popen("/bin/df -m", "r");
+
+  char **array;
+  array = malloc( sizeof(char *) * 100 );
+  assert(array != NULL);
+
+  count = 0;
+
+  while (1)
+  {
+    fgets( buff, 200, stream );
+    if( feof(stream) )
+      break;
+
+    array[count++] = strdup(buff);
+  }
+
+  fclose(stream);
+
+  array[count++] = (char *) NULL;
+
+  char tmp[8];
+  char *token;
+  char *ptr;
+
+  for( ; *array != NULL; array++ ){
+
+    int k;
+    for(k=0; k<22; k++){
+
+      /* Find string with info on the data disks */
+      ptr = strstr(*array, data[k]);
+      if(ptr == NULL)
+       continue;
+
+      /* Split string by spaces */
+      strcpy(tmp,*array);
+      token = strtok(tmp, space);     /* token => "Filesystem" */
+      token = strtok(NULL, space);    /* token => "1M-blocks" */
+      token = strtok(NULL, space);    /* token => "Used" */
+      token = strtok(NULL, space);    /* token => "Available" */
+
+      data_disks[k] = atoi(token);
+
+      token = strtok(NULL, space);    /* token => "Use%" */
+      token = strtok(NULL, space);    /* token => "Mounted on" */
+
+    }
+  }
+
+  int i;
+  for(i=0; i<22; i++){
+    printf("sizes[%d] = %d\n", i, data_disks[i]);
+  }
+
+  return 0;
+}
+
+int cleanup(TheArgs *theArgs)
+{
+  /* There is a Perl script which runs as a daemon and performs cleanup 
+   * (removes oldest files from disks) thus we do here nothing.
+   */
+
+  return 0;
+}
+
+int findMaxElement( int tmp[] ){
+  
+  /* Function returns index of max element of the array */
+
+  int k;
+  int maxElement = 0;
+  int maxIndex = 0;
+
+  for( k=0; k<22; k++ ){
+    if( tmp[k] > maxElement ){
+      maxElement = tmp[k];
+      maxIndex   = k;
+    }
+  }
+
+  return k;
+}
+
+int writeShmem(TheArgs *theArgs)
+{
+  /* Loop over all found shared mem segments
+   * and write there disk numbers which have
+   * enough disk space for the data.
+   */
+
+  int i;
+  char shmemName[20];
+  int tmp_data_disks[22];
+  int diskSpace;
+  int diskIndex;
+  unsigned long int iMax;
+
+  /* Temp copy of array with free disk space */
+  for(i=0; i<22; i++){
+    tmp_data_disks[i] = data_disks[i];
+  }
+
+  for(i=0; i<16; i++){
+
+    /* The array contains endings (EB numbers) of shared memory names
+     * which contain diskNum variable. 
+     */
+    if( shmem_nums[i] >= 0 ){
+      sprintf( shmemName, "%s%d", shmemBase, i );
+
+      iMax = findMaxElement(tmp_data_disks);
+
+      if( Worker_setStatistic( shmemName, diskNum, &iMax ) == -1 ) {
+       printf("Error: cannot set '%d' to '%s' in shared memory '%s.shm'\n", iMax, diskNum, shmemName);
+      }
+
+      tmp_data_disks[iMax] = 0;  /* Exclude this disk */
+    }
+  }
+
+  return 0;
+}
+
+int findShmem(TheArgs *theArgs)
+{
+  /*  Loop over all EB numbers (1..16)
+   *  and find shared memory segments opened by daq_evtbuild.
+   *  Save them in a shmem_nums array.
+   */
+
+  char shmemName[20];
+  char shmemPathName[30];
+  int i;
+  int j = 0;
+  unsigned long int tmp;
+
+  for(i=0; i<17; i++){
+    sprintf( shmemName, "%s%d", shmemBase, i );
+    sprintf( shmemPathName, "/dev/shm/%s%d.shm", shmemBase, i );
+    
+    if( access(shmemPathName, F_OK) == 0 ){
+
+      if( Worker_getStatistic( shmemName, diskNum, &tmp ) == -1 ) {
+       if(theArgs->debug)
+         printf("Warning: there is no '%s' in shared memory '%s.shm'\n", diskNum, shmemName);
+      }
+      else{
+       shmem_nums[j] = i;
+       j++;
+      }
+    }
+  }
+
+  return 0;
+}
+
+int main(int argc, char *argv[])
+{
+  TheArgs theArgsS, *theArgs = &theArgsS;
+
+  argsDefault(theArgs);
+  if (0 > argsFromCL(theArgs, argc, argv)) {
+    usage(argv[0]);
+    exit(1);
+  }
+
+  while(1){
+    checkDisks(theArgs);
+    findShmem(theArgs);
+    writeShmem(theArgs);
+
+    sleep(theArgs->sleepTime);
+  }
+
+  return 0;
+}