From: hadaq@countinghouse Date: Tue, 10 Jan 2017 12:06:41 +0000 (+0100) Subject: JAM: add first version of cronjob for nightly export of postgres slow control data... X-Git-Tag: pre2018~1 X-Git-Url: https://jspc29.x-matter.uni-frankfurt.de/git/?a=commitdiff_plain;h=717b28e160e2ca89ef31eba974e557e49a73b754;p=hadesdaq.git JAM: add first version of cronjob for nightly export of postgres slow control data to hades oracle Since this runs in context of lxhadesdaq, we put it together with other daq related oracle stuff. --- diff --git a/oracle/postgres2ora.pl b/oracle/postgres2ora.pl new file mode 100755 index 0000000..ff4ab7b --- /dev/null +++ b/oracle/postgres2ora.pl @@ -0,0 +1,681 @@ +#!/usr/bin/perl -w +################### +# JAM2016 - cronjob script to transfer EPICS slow control archiver data from postgres to hades oracle +# following attempt was based on daq2ora_client.pl and other Perl examples +# it tries to implement Ilse Koenigs suggested code with DBI methods +####### +# V0.1 10-Oct-2016 by JAM( j.adamczewski@gsi.de) - first draft for discussion. Did never run +# V0.2 11-Oct-2016 by JAM( j.adamczewski@gsi.de) - added oracle option. Bugfixes. +# V0.3 13-Oct-2016 by JAM( j.adamczewski@gsi.de) - tested with read acccess to both databases +# V0.4 10-Jan-2017 by JAM - renamed to postgres2ora.pl, exclude passwords from code, check if table is empty + +use English; +use strict; +use Getopt::Long; +use Data::Dumper; +#use Scalar::Util qw(reftype); +use IO::Socket; +use IO::Select; +use FileHandle; +use Time::Local; +use DateTime; +use threads; +use threads::shared; + +use DBI; +use DBD::ODBC; + +#my $cmd_server_port = 4799; +#my $cmd_server_prtcl = 'tcp'; +#my $opt_sport = 50998; # open this port for status server +my $opt_help = 0; +my $opt_verb = 0; +my $opt_oracle = 0; + +# JAM optionally change account here: +my $orauser = 'DAQ_PUB'; +my $orapass = 'please_give_oracle_password'; +my $oradatabase = 'db-hades-test'; +#my $database = 'db-hades'; + +# JAM access details for postgres +my $pguser = 'report'; +my $pgpass = 'please_give_postgres_password'; + +my $pgdatabase = 'archive'; +my $pghost = 'lxhadeb05.gsi.de'; +my $pgport = '5432'; + +# JAM real passwords are delivered by command line options now. + + +GetOptions ('h|help' => \$opt_help, + 'v|verb' => \$opt_verb, + 'o|oracle' => \$opt_oracle, + 'opass=s' => \$orapass, + 'ppass=s' => \$pgpass + ); + +if( $opt_help ) { + &help(); + exit(0); +} + + + + + +# JAM here definition of table names +my $pgseverity = 'severity'; +my $oraseverity = 'hades_scs.pgimport_severity'; +my $pgstatus = 'status'; +my $orastatus = 'hades_scs.pgimport_status'; +my $orahcsschannel = 'hades_scs.hcss_import_channel'; +my $pgchannel = 'channel'; +my $pgnummeta = 'num_metadata'; +my $pgsample = 'sample'; +my $orasample = 'hades_scs.pgimport_sample'; +my $orachannel = 'hades_scs.pgimport_channel'; + +my $ExitCode : shared = -1; +my $status : shared = "OK"; + +#- POSIX signal handlers: see signal(7) or kill(1) for available signals +foreach my $signal ( qw(HUP INT QUIT ILL ABRT FPE SEGV TERM USR1 USR2) ){ + $SIG{$signal} = sub { &finishAndExit( $signal ); }; +} + + +my $app_logfile = "/home/hadaq/log/postgres2ora.log"; + +#- Daemonize +# if($opt_daemon){ +# open(STDIN, '>/dev/null'); +# open(STDOUT, ">$app_logfile") || die "Cannot redirect STDOUT"; +# open(STDERR, ">&STDOUT") || die "Cannot dup STDERR"; +# select STDERR; $| = 1; # make unbuffered +# select STDOUT; $| = 1; # make unbuffered +# } + + +# JAM2016 - we do not need statusserver for cronjob +#-------- Start status server thread +# threads->new( \&statusServer ); +# TODO if we should work with statusserver/daemon mode later, still need infinite main loop with check of time + + +#-------- need to explicitely set oracle environment if this was started by icinga restarthandler (not-login ssh!) +$ENV{ORACLE_HOME}='/usr/lib/oracle/12.1/client64'; +$ENV{LD_LIBRARY_PATH}='/usr/lib/oracle/12.1/client64/lib'; + +#-------- Connect the database +#$status = "Try to connect to the Oracle Data Base ..."; +print "Try to connect to the Oracle Data Base ...\n" if($opt_verb); + +#- Disable autocommit and enable error handling +#print "database=$oradatabase, user=$orauser,pass=$orapass \n"; +my $oradbh = DBI->connect( "dbi:Oracle:$oradatabase", $orauser, $orapass, {AutoCommit=>0, RaiseError=>1, 'PrintError'=>1}) + or die "Couldn't connect to oracle database: " . DBI->errstr; +#my $dbh = DBI->connect( "dbi:Oracle:$database", $user,$pass, {AutoCommit=>0}); +print "Oracle Connected!\n" if($opt_verb); +print Dumper $oradbh; + + +# JAM connect the postgresql here with second handle. + +print "Try to connect to the Postgres Data Base ...\n" if($opt_verb); +my $pgdbh = DBI->connect( "dbi:Pg:dbname=$pgdatabase;host=$pghost;port=$pgport", $pguser, $pgpass, {AutoCommit=>0, RaiseError=>0, 'PrintError'=>1}) + or die "Couldn't connect to Postgres database: " . DBI->errstr; +print "Postgres Connected!\n" if($opt_verb); +print Dumper $pgdbh; + + +################################################################ +# JAM: here we implement all functions according to ilses plan: + + +# JAM TODO: test if import table is empty here: + +&testOracleImportTableEmpty() or die "Oracle import table is not empty!"; + +#exit(0); +# test only connection up to here + +# function to get current date in perl. +my $dt = DateTime->now - DateTime::Duration->new( days => 1 ); +# -> one day before today +my $impDay = $dt->ymd; +#my $impDay = '2016-11-28'; +#import day ( = yesterday) defined by perl program + +#my $impDayBegin = '2016-09-20 00:00:00.000000'; +my $impDayBegin = "$impDay 00:00:00.000000"; +#begin of import day +my $impDayEnd = "$impDay 23:59:59.999999"; +#end of import day + + + + +print "Got begin:$impDayBegin end:$impDayEnd\n" if($opt_verb); + +# define list of channels to import, is filled later: +my @channelList =(); +# define list of channel last import times to be filled later: +my @channelTimes =(); + + +# The perl program stores all data read from Postgres in staging tables in Oracle +# and starts an Oracle job to process the data. +# 1. Reads full Postgres table SEVERITY and stores all records in Oracle table +# PGIMPORT_SEVERITY. + +&transferSeverities() or die "Failed to transfer severities\n"; + +# 2. Reads full Postgres table STATUS and stores the records in Oracle table +# PGIMPORT_SEVERITY. + +&transferStatus() or die "Failed to transfer status\n"; + + +# 3. Reads the list of active channels (only defined by name) from Oracle table +# HCSS_IMPORT_CHANNEL and stores them in memory (see remark A). + +&readChannelList() or die "Failed to read channel list\n"; + +print "After readChannelList, got (0 - $#channelList) channels\n" if ($opt_verb); + +# +# 4. Then it loops over this list of channels and reads for each channel: +# 4.1 the channel id and the sampling parameters from Postgres table CHANNEL +# and the precision and unit from table NUM_METADATA, +# 4.2 the data from Postgres table SAMPLE since last import (see remark B) +# and stores them in Oracle table PGIMPORT_SAMPLE. +# This must be done in larger batches, single read/write are too slow. +# (see also remark D) +# 4.3 Then the channel parameters are stored in the Oracle table +# PGIMPORT_CHANNEL together with the the start and end time of the import +# day (independing of the sample timestamps, because there might be no data +# at all in the import date range). + +# global variables to keep the per-channel parameters: +my $ch_id=0; +my $ch_name=0; +my $ch_smplid=0; +my $ch_smplval=0; +my $ch_smplper=0; +my $ch_prec=0; +my $ch_unit=0; + + + + +my $pgsth_channel_id = $pgdbh->prepare("SELECT a.channel_id, a.name, a.smpl_mode_id, a.smpl_val, a.smpl_per, + b.prec, b.unit + FROM $pgchannel a LEFT OUTER JOIN $pgnummeta b ON (a.channel_id = b.channel_id) + WHERE a.name = ? ") + or die "Couldn't prepare channel id statement: " . $pgdbh->errstr; + + +# arguments: channel_id, channel_id, lastImpEndtime, channel_id, +my $pgsth_get_samples = $pgdbh->prepare("SELECT channel_id, + to_char(smpl_time,'YYYY-MM-DD HH24:MI:SS.US') as smpl_time, + severity_id, status_id, num_val, float_val, str_val + FROM $pgsample + WHERE channel_id = ? + AND smpl_time = ( SELECT MAX(smpl_time) + FROM $pgsample + WHERE channel_id = ? + AND smpl_time > to_timestamp( ? ,'YYYY-MM-DD HH24:MI:SS.US') + AND smpl_time < to_timestamp('$impDayBegin','YYYY-MM-DD HH24:MI:SS.US') ) + UNION + SELECT channel_id, + to_char(smpl_time,'YYYY-MM-DD HH24:MI:SS.US') as smpl_time, + severity_id, status_id, num_val, float_val, str_val + FROM $pgsample + WHERE channel_id = ? + AND smpl_time >= to_timestamp('$impDayBegin','YYYY-MM-DD HH24:MI:SS.US') + AND smpl_time <= to_timestamp('$impDayEnd','YYYY-MM-DD HH24:MI:SS.US') + ORDER BY smpl_time") + or die "Couldn't prepare get sample statement: " . $pgdbh->errstr; + + +my $orasth_store_samples; +my $orasth_store_paras; + +if($opt_oracle){ +# arguments: channelId, smplTime, $severity, $statusId, $numVal, $floatVal, $strVal +$orasth_store_samples = $oradbh->prepare_cached("INSERT INTO $orasample + ( channel_id, smpl_time, severity_id, status_id, num_val, float_val, str_val) + VALUES ( ? , + to_timestamp(? , 'YYYY-MM-DD HH24:MI:SS.FF6'), + ? , ? , ? , ? , ? )") + or die "Couldn't prepare store samples statement: " . $oradbh->errstr; + + +# arguments: $channelId, $channelName, $smplModeId, $smplVal, $smplPer, $prec, $unit +$orasth_store_paras = $oradbh->prepare_cached("INSERT INTO $orachannel + ( channel_id, channel_name, smpl_mode_id, smpl_val, smpl_per, prec, unit, starttime, endtime ) + VALUES ( ? , ? , ? , ? , ? , ? , ? , + to_timestamp('$impDayBegin','YYYY-MM-DD HH24:MI:SS.FF6'), to_timestamp('$impDayEnd','YYYY-MM-DD HH24:MI:SS.FF6') )") + or die "Couldn't prepare store parameters statement: " . $oradbh->errstr; + +} + +#foreach my $channelName(@channelList) +for my $i (0 .. $#channelList) +{ + &transferChannelData($channelList[$i], $channelTimes[$i]) or printf "Failed to transfer channel of name $channelList[$i]\n"; +} + + + +# 5. Finally the perl program executes the Oracle procedure +# hades_scs.hcss_move_data_make_summary to process the import data. +# This procedure starts an Oracle job to check the data and to copy new data +# into the final tables. After import it starts two other jobs: the first one +# generates the run-based summaries while the second one running some hours +# later truncates the import tables. +&processOracleImportedData() or die "Failed to start Oracle jobs\n"; + + + + +#-------- Finish and disconnect +&finishAndDisconnect(); +print "The End.\n" if ($opt_verb); +exit(0); + +################### END OF MAIN #################### + + + + + + + +sub help() +{ + print "\n"; + print << 'EOF'; +postgres2ora.pl + + This script exports slow control data from postgres to oracle +Usage: + + Command line: postgres2ora.pl + [-h|--help] : Print this help. + [-o|--oracle] : Really import into Oracle (otherwise just dummy mode) + [-opass orapass] : Specify password for Oracle database + [-ppass pgpass] : Specify password for Posstgres database + [-v|--verb] : More verbose. + +Examples: + + Transfer slow control archiver data from postgres to Oracle and be verbose: + postgres2ora.pl -v -o + +EOF +} + +sub finishAndDisconnect() +{ + + $pgsth_channel_id->finish() if(defined $pgsth_channel_id); + $pgsth_get_samples->finish() if(defined $pgsth_get_samples); + + if($opt_oracle){ + $orasth_store_samples->finish() if(defined $orasth_store_samples); + $orasth_store_paras ->finish() if(defined $orasth_store_paras); + } + + if(defined $oradbh){ + $oradbh->disconnect || die "Failed to disconnect from Oracle\n"; + } + if(defined $pgdbh){ + $pgdbh->disconnect || die "Failed to disconnect from Postgres\n"; + } +} + +sub finishAndExit() +{ + # don't allow nested signal handling + return if ($ExitCode ne "-1"); + + # this will stop the treads, too + $ExitCode = shift; + + print "postgres2ora_client.pl exited (signal/exit code: $ExitCode).\n"; + + &finishAndDisconnect(); + + # wait until all threads ended - don't join the main thread or ourselves +# foreach my $thread (threads->list()){ +# $thread->join() +# if ($thread->tid() && !threads::equal( $thread, threads->self() )); +# } + + close(STDOUT); + close(STDERR); + + exit(1); +} + + + +sub transferSeverities() +{ +my $pgsth_severity = $pgdbh->prepare(" SELECT severity_id, name FROM $pgseverity ORDER BY severity_id") + or die "Couldn't prepare statement: " . $pgdbh->errstr; +my $orasth_severity; +if($opt_oracle) +{ + $orasth_severity = $oradbh->prepare_cached("INSERT INTO $oraseverity (severity_id, severity_name) VALUES (?,?)") + or die "Couldn't prepare statement: " . $oradbh->errstr; +} +# get values from postgres: + print "get severities\n" if ($opt_verb); +$pgsth_severity->execute() or die "Couldn't execute statement: " . $pgdbh->errstr; +my $success = 1; +my @data; +while (@data = $pgsth_severity->fetchrow_array()) { + my $sid = $data[0]; + my $sname = $data[1]; + print "inserting severities \t$sid $sname\n" if ($opt_verb); + if($opt_oracle) + { + $success &&= $orasth_severity->execute($sid, $sname); + } + else + { + print " - just pretend!\n" if ($opt_verb); + } +} +# end while +if($opt_oracle) +{ + my $result = ($success ? $oradbh->commit : $oradbh->rollback); + unless ($result) { + die "Couldn't finish transaction: " . $oradbh->errstr; + } + $orasth_severity->finish(); +} +$pgsth_severity->finish(); +return $success; +} + + +sub transferStatus() +{ +my $pgsth_status = $pgdbh->prepare(" SELECT status_id, name FROM $pgstatus ORDER BY status_id") + or die "Couldn't prepare statement: " . $pgdbh->errstr; +my $orasth_status; + +if($opt_oracle){ +$orasth_status = $oradbh->prepare_cached("INSERT INTO $orastatus (status_id, status_name) values (?,?)") + or die "Couldn't prepare statement: " . $oradbh->errstr; +} +# get values from postgres: + print "get status values\n" if ($opt_verb); +$pgsth_status->execute() or die "Couldn't execute statement: " . $pgdbh->errstr;; +my $success = 1; +my @data; +while (@data = $pgsth_status->fetchrow_array()) { + my $stid = $data[0]; + my $stname = $data[1]; + print "inserting status \t$stid $stname\n" if ($opt_verb); + if($opt_oracle) + { + $success &&= $orasth_status->execute($stid, $stname); + } + else + { + print " - just pretend!\n" if ($opt_verb); + } + + } +#end while + +if($opt_oracle) + { + my $result = ($success ? $oradbh->commit : $oradbh->rollback); + unless ($result) { + die "Couldn't finish transaction: " . $oradbh->errstr + } + $orasth_status->finish(); + } +$pgsth_status->finish(); +return $success; +} + + +sub readChannelList() +{ + +my $orasth_channel_list = $oradbh->prepare(" SELECT channel_name, + to_char(cast(nvl(imp_endtime,trunc(sysdate)-90) as timestamp),'YYYY-MM-DD HH24:MI:SS.FF6') AS last_imp_endtime + FROM $orahcsschannel WHERE active = 1 + AND ( imp_endtime IS NULL OR imp_endtime < to_date('$impDay','YYYY-MM-DD') ) + ORDER BY channel_name") + or die "Couldn't prepare statement: " . $oradbh->errstr; + +# get channel names from oracle: +print "get channel names\n"if ($opt_verb); +$orasth_channel_list->execute() or die "Couldn't execute statement: " . $pgdbh->errstr;; +my $success = 1; +my @data; +while (@data = $orasth_channel_list->fetchrow_array()) { + my $chname = $data[0]; + my $last_imp_endtime = $data[1]; + print "getting channel \t$chname with time $last_imp_endtime\n" if ($opt_verb); + push @channelList, $chname; + push @channelTimes, $last_imp_endtime; + } + +$orasth_channel_list->finish(); +return $success; +} + + + +sub transferChannelData() +{ + my ($currentChannel, $currentTime) = @_; + my @data; + print "transferChannelData for $currentChannel\n" if ($opt_verb); + +# 4.1 the channel id and the sampling parameters from Postgres table CHANNEL +# and the precision and unit from table NUM_METADATA, +# a.channel_id, a.name, a.smpl_mode_id, a.smpl_val, a.smpl_per, b.prec, b.unit + $pgsth_channel_id->execute($currentChannel) or print "Couldn't execute channel id statement: " . $pgdbh->errstr; + + + + + unless (@data = $pgsth_channel_id->fetchrow_array()){ + print "Couldn't get channel id from postgres!!!\n " if ($opt_verb); + return 1; + } + $ch_id = $data[0]; + $ch_name = $data[1]; + $ch_smplid = $data[2]; + $ch_smplval = $data[3]; + $ch_smplper = $data[4]; + $ch_prec = $data[5]; + $ch_unit = $data[6]; + + + print "got id:$ch_id name:$ch_name smplid:$ch_smplid smplper:$ch_smplper prec:$ch_prec unit:$ch_unit \n" if ($opt_verb); + + # check here if there is another result of the id select (should not be?) + if (@data = $pgsth_channel_id->fetchrow_array()){ + print "Warning: found duplicate ids1'!!"; + } + + +# 4.2 the data from Postgres table SAMPLE since last import (see remark B) +# and stores them in Oracle table PGIMPORT_SAMPLE. +# This must be done in larger batches, single read/write are too slow. +# (see also remark D) + +print "Getting samples from postgres...\n" if ($opt_verb); +# arguments: channel_id, channel_id, lastImpEndtime, channel_id, +$pgsth_get_samples->execute($ch_id, $ch_id, $currentTime, $ch_id) or die "Couldn't execute get samples statement: " . $pgdbh->errstr;; + +my $success = 1; +my @sampdata; +while (@sampdata = $pgsth_get_samples->fetchrow_array()) { +# TODO : exit counter? +# TODO: fetch all rows into variable arrays, then insert to oracle outside loop? + + +# my $channel_id = $sampdata[0]; +# my $smpl_time = $sampdata[1]; +# my $severity_id = $sampdata[2]; +# my $status_id = $sampdata[3]; +# my $num_val = $sampdata[4]; +# my $float_val = $sampdata[5]; +# my $str_val = $sampdata[6]; +# +# # supress some warnings if certain type is not available: +# $num_val = -1 unless $num_val; +# $float_val = -1.0 unless $float_val; +# $str_val = "nothing" unless $str_val; +# +# print "transfer data - id:$channel_id time:$smpl_time sev_id: $severity_id st_id:$status_id numval: $num_val floatval:$float_val strval:$str_val \n" if ($opt_verb); +# # arguments: channelId, smplTime, $severity, $statusId, $numVal, $floatVal, $strVal + + if($opt_oracle) + { +## print "before orasth_store_samples: $channel_id \n"; +# $success &&= $orasth_store_samples->execute($channel_id, $smpl_time, $severity_id, $status_id, $num_val, $float_val, $str_val); + $success &&= $orasth_store_samples->execute($sampdata[0],$sampdata[1],$sampdata[2],$sampdata[3],$sampdata[4],$sampdata[5],$sampdata[6]); + +unless( $success){ + print "ERROR during store samples!"; + } + } + else + { + print " - just pretend!\n" if ($opt_verb); + } + +} +#end while + +return $success unless($opt_oracle); + + + +#print "before orasth_store_paras: $ch_id \n"; + +# 4.3 Then the channel parameters are stored in the Oracle table +# PGIMPORT_CHANNEL together with the the start and end time of the import +# day (independing of the sample timestamps, because there might be no data +# at all in the import date range) + +# arguments: $channelId, $channelName, $smplModeId, $smplVal, $smplPer, $prec, $unit $ch_prec, $ch_unit + +$success &&= $orasth_store_paras->execute($ch_id, $ch_name, $ch_smplid, $ch_smplval, $ch_smplper, $ch_prec, $ch_unit); + unless( $success){ + print "ERROR during store parameters!"; + } + + print "success is: $success"; + +my $result = ($success ? $oradbh->commit : $oradbh->rollback); + + + +unless ($result) { + die "Couldn't finish transferChannelData transaction: " . $oradbh->errstr + } +return $success; + +} + + +sub processOracleImportedData() +{ + + return 1 unless($opt_oracle); + + my $result=$oradbh->do('BEGIN hades_scs.hcss_move_data_make_summary(); END;'); + return $result; +} + + +sub testOracleImportTableEmpty() +{ + + return 1 unless($opt_oracle); + + my $orasth_testempty = $oradbh->prepare("SELECT ( a.n + b.n + c.n + d.n ) from + ( SELECT COUNT(*) as n FROM $orachannel ) a, + ( SELECT COUNT(*) as n FROM $orasample ) b, + ( SELECT COUNT(*) as n FROM $oraseverity ) c, + ( SELECT COUNT(*) as n FROM $orastatus ) d") + or die "Couldn't prepare statement: " . $oradbh->errstr; + + my $result=$orasth_testempty->execute() or print "Couldn't execute table empty test statement: " . $pgdbh->errstr; + my $success = 1; + my @data; +while (@data = $orasth_testempty->fetchrow_array()) { + my $entries= $data[0]; + print "import tables have $entries entries \n"; + if ($entries == 0) {return 1;} + else {return 0;} + } + + return 0; +} + + + + +# sub statusServer() +# { +# my $server_socket; +# my $client_socket; +# my $selector; +# +# unless (defined( $server_socket = +# IO::Socket::INET->new( LocalPort => $opt_sport, +# Proto => 'tcp', +# Listen => SOMAXCONN ) )) +# { +# print "ERROR: Cannot start status server!\n"; +# } +# +# $selector = new IO::Select( $server_socket ); +# +# while(1) { +# +# # wait 5 seconds for connections +# while (my @file_handles = $selector->can_read( 5 )) { +# +# foreach my $file_handle (@file_handles) { +# +# if($file_handle == $server_socket) { +# +# # create a new socket for this transaction +# unless (defined( $client_socket = $server_socket->accept() )) +# { +# print "ERROR: Cannot open socket to send status!\n"; +# &exitProgram( 2 ); +# } +# +# print $client_socket $status; +# +# close( $client_socket ); +# } +# } +# } +# +# if( $ExitCode ne "-1" ){ +# print "Exit status server thread.\n"; +# close( $server_socket ); +# return; +# } +# } +# }