#!/usr/bin/perl

##########################################################################
#
# This script originally should be called with the following shebang:
#
#  #!/usr/bin/perl -CSAD
#
# Unfortunately the perl version of SLES11 is buggy and does not like
# a script being called this way. So the parameters need to be given
# manually like this:
#
# perl -CSAD /usr/bin/spacewalk-dump-schema --db=susemanager --user=susemanager --password=susemanager > /tmp/db.dump
#
# It is very important to use those options! If you fail to do so,
# the encoding will be incorrect and the dump cannot be imported correctly
#
##########################################################################

use strict;
use warnings FATAL => 'all';

use Spacewalk::Setup ();
use Getopt::Long ();
use DBI ();

# Directory with optional helper data for the dump; the code below looks
# for the subdirectories pre/, post/, sql/, sequences/ and tables/ in it.
my $DATA_DIR = '/etc/sysconfig/rhn/schema-dump';
# Location of the configuration file, as provided by Spacewalk::Setup.
my $config_file = Spacewalk::Setup::DEFAULT_RHN_CONF_LOCATION;
# Settings read from the configuration file. The same hash is later extended
# with the connect parameters from the command line and with the callbacks
# and constants selected for the source and target backend.
my %options;
# FILE is the bareword handle used by process_table_start_oracle() and
# process_table_end_oracle() for the per-table data files.
local *FILE;
Spacewalk::Setup::read_config($config_file, \%options);
# Backend names accepted for --from and --to.
my @supported_backends = qw( oracle postgresql );
# Targets of the command line options parsed by GetOptions below.
my ($db, $user, $password, $from_backend, $to_backend, $raw, $host, $port, $dump_dir, $upgrade);

# Print the command line synopsis to the standard output and terminate.
#
# Exit status: when the sub is called with exactly one numeric argument
# (e.g. usage(1)) that value is used; in every other case the status is 0.
# Getopt::Long calls this sub as the --help handler with the option name
# and its value, and those two arguments must not be taken for an exit code.
#
# Never returns.
sub usage {
        print "Usage:\n\n";
        print "$0 [--from=oracle] [--to=postgresql] [--db=SID --user=USERNAME --password=PASSWORD]\n\n";
        print "$0 [--from=oracle] [--to=oracle] [--db=SID --user=USERNAME --password=PASSWORD] [--dump-dir=/tmp/dir]\n\n";
        print "$0 [--from=postgresql] [--to=postgresql] [--db=DBNAME --host=HOSTNAME --port=PORT --user=USERNAME --password=PASSWORD]\n\n";
        print "$0 [--from=postgresql] [--to=oracle] [--db=DBNAME --host=HOSTNAME --port=PORT --user=USERNAME --password=PASSWORD] [--dump-dir=/tmp/dir]\n\n";
        print "$0 [ --help ]\n\n";
        # Inspect the arguments (@_), not the eval error variable ($@).
        my $status = 0;
        if (@_ == 1 and defined $_[0] and $_[0] =~ /^\d+$/) {
                $status = $_[0];
        }
        exit $status;
}

# Parse the command line. Options taking a value are declared with '=s';
# --raw and --upgrade are plain flags and --help calls usage(), which
# exits. If GetOptions reports a failure the script ends with status 1.
Getopt::Long::GetOptions (
        'db=s' => \$db,
        'host=s' => \$host,
        'port=s' => \$port,
        'user=s' => \$user,
        'password=s' => \$password,
        'from=s' => \$from_backend,
        'to=s' => \$to_backend,
        'raw' => \$raw,
        'dump-dir=s' => \$dump_dir,
        'help' => \&usage,
        'upgrade' => \$upgrade,
) or exit 1;

# Refuse to run while the rhn-upgrade package is installed, unless the
# caller explicitly asked for it with --upgrade. A zero status from rpm -q
# means the package is present.
my $rhn_upgrade_status = system('rpm -q rhn-upgrade > /dev/null');
if ($rhn_upgrade_status == 0 and not $upgrade) {
        print "Package rhn-upgrade detected, which may lead to errors during dump process. Please remove rhn-upgrade package and proceed with dump.\n";
        exit 1;
}

# As soon as one connect parameter is given on the command line, all five
# of them replace the values read from the configuration file.
my @connect_args = ($db, $user, $password, $host, $port);
if (grep { defined } @connect_args) {
        @options{qw( db_name db_user db_password db_host db_port )} = @connect_args;
}

# Source backend: --from wins over the configuration, Oracle is the fallback.
if (defined $from_backend) {
        $options{db_backend} = $from_backend;
}
unless (defined $options{db_backend}) {
        $options{db_backend} = "oracle";
}
# Target backend: PostgreSQL unless --to says otherwise.
$options{db_backend_target} = "postgresql";
$options{db_backend_target} = $to_backend if defined $to_backend;

# Both the source and the target backend have to be supported ones.
for my $backend ($options{db_backend}, $options{db_backend_target}) {
        next if grep { $_ eq $backend } @supported_backends;
        die "The $0 can only work with Oracle or PostgreSQL database schema.\n";
}

# Raw mode (no helper data applied) is forced when the helper directory is
# missing or when the source database is PostgreSQL.
unless (-d $DATA_DIR and $options{db_backend} ne "postgresql") {
  $raw = 1;
}

# NLS_* environment: character set and date/timestamp formats. The format
# strings are also interpolated into the generated Oracle statements.
@ENV{qw( NLS_LANG NLS_DATE_FORMAT NLS_TIMESTAMP_FORMAT NLS_TIMESTAMP_TZ_FORMAT )} = (
        'AMERICAN_AMERICA.UTF8',
        'YYYY-MM-DD HH24:MI:SS',
        'YYYY-MM-DD HH24:MI:SS.FF',
        'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM',
);

# Source side: DBI connection string and the row fetching routine.
my $connection_string;
if ($options{db_backend} ne 'postgresql') {
        $connection_string = "dbi:Oracle:$options{db_name}";
        $options{fetch_row} = \&fetch_row_oracle;
} else {
        # host and port are appended only when they hold a true value
        $connection_string = join ';',
                "dbi:Pg:dbname=$options{db_name}",
                ($options{db_host} ? "host=$options{db_host}" : ()),
                ($options{db_port} ? "port=$options{db_port}" : ());
        $options{fetch_row} = \&fetch_row_postgresql;
}

# Target side: output callbacks and constants used while dumping.
if ($options{db_backend_target} eq 'postgresql') {
        print STDERR "Warning: Ignoring Oracle-only parameter --dump-dir\n" if defined $dump_dir;
        @options{qw(
                get_custom_type_string blob_string escape_string
                process_table_start process_table_end
                print_script_header print_script_footer
        )} = (
                \&get_ctstr_postgresql, \&blobstr_postgresql, \&escapestr_postgresql,
                \&process_table_start_postgresql, \&process_table_end_postgresql,
                \&print_script_header_postgresql, \&print_script_footer_postgresql,
        );
        @options{qw( null_string record_separator clear_table_command )}
                = ('\N', "\n", "delete from");
} else {
        @options{qw(
                get_custom_type_string blob_string escape_string
                process_table_start process_table_end
                print_script_header print_script_footer
        )} = (
                \&get_ctstr_oracle, \&blobstr_oracle, \&escapestr_oracle,
                \&process_table_start_oracle, \&process_table_end_oracle,
                \&print_script_header_oracle, \&print_script_footer_oracle,
        );
        @options{qw( null_string record_separator clear_table_command )}
                = ('""', "\x00\x0a", "truncate table");
        $options{dump_dir} = "/tmp/dumped-tables";
        $options{dump_dir} = $dump_dir if defined $dump_dir;
        # For Oracle the table contents go into files, so the directory must exist
        unless (-d $options{dump_dir}) {
                mkdir($options{dump_dir}, 0755)
                        or die "Cannot create directory [$options{dump_dir}]: $!\n";
        }
}

# Connect to database
# RaiseError makes DBI failures die; AutoCommit is off, so the only commits
# are the explicit ones in process_scripts(). LongTruncOk => 0 together with
# LongReadLen limits long values to 20,000,000 instead of truncating them.
# NOTE(review): pg_enable_utf8 looks like a DBD::Pg specific attribute that
# is passed regardless of the chosen driver -- confirm it is harmless for
# DBD::Oracle.
my $dbh = DBI->connect($connection_string, $options{db_user}, $options{db_password}, {
        RaiseError => 1,
        FetchHashKeyName => 'NAME_lc',
        LongReadLen => 20_000_000,
        LongTruncOk => 0,
        AutoCommit => 0,
        pg_enable_utf8 => 1,
});

# Turn SIGPIPE (e.g. the consumer of our output went away) into a regular
# Perl exception.
$SIG{'PIPE'} = sub {
        die "SIGPIPE received.\n";
};

# Run the optional per-table 'pre' scripts before anything is dumped.
process_scripts($dbh, 'pre');

$options{print_script_header}->();

# Translation of source rowids to generated numbers (see process_table)
# and the counter those numbers are taken from.
my %ROWIDS = ();
my $SEQ = 1;
process_tables($dbh);
process_sequences($dbh);
unless ($raw) {
        # Append the extra SQL files, replacing the @@DIST@@ placeholder by
        # the dist suffix of the package owning the helper directory.
        my $dist = `rpm -qf --qf '%{release}\n' /etc/sysconfig/rhn/schema-dump`;
        chomp $dist;
        $dist =~ s/^.+(\.[^\.]+)$/$1/;
        my @sql_files = glob("$DATA_DIR/sql/*");
        for my $sql_file (sort @sql_files) {
                open my $sql_fh, '<', $sql_file or die "Error reading [$sql_file]: $!\n";
                while (defined(my $line = <$sql_fh>)) {
                        $line =~ s/\@\@DIST\@\@/$dist/;
                        print $line;
                }
                close $sql_fh;
        }
}

$options{print_script_footer}->();

# Cleanup at interpreter exit: once a connection exists, run the optional
# 'post' scripts and disconnect. Nothing happens when connect never succeeded.
END {
        if (defined $dbh) {
                process_scripts($dbh, 'post');
                $dbh->disconnect();
        }
}

# Return a hash reference keyed by lowercased table name with the tables of
# the source schema.
#   PostgreSQL: tables owned by the connecting user, except 'dual'.
#   Oracle:     user_tables, except the PLAN_TABLE tables.
sub get_tables {
        my ($handle) = @_;

        my $found;
        if ($options{db_backend} ne 'postgresql') {
                $found = $handle->selectall_hashref(q!
                        select lower(table_name) table_name
                        from user_tables
                        where table_name not in ('PLAN_TABLE', 'PLAN_TABLE_9I') -- plan_table is not part of our schema
                        order by table_name
                !, 'table_name');
        } else {
                $found = $handle->selectall_hashref(q!
                        select lower(tablename) table_name
                        from pg_catalog.pg_tables
                        where tableowner = ?
                        and tablename not in ('dual')
                        order by table_name
                !, 'table_name', {}, $options{db_user});
        }
        return $found;
}

# Return a hash reference keyed by lowercased sequence name. Every entry
# carries 'last_number'; entries read from PostgreSQL additionally carry
# 'is_called', telling whether the current value has already been used.
sub get_sequences {
        my ($handle) = @_;

        my $found;
        if ($options{db_backend} ne 'postgresql') {
                $found = $handle->selectall_hashref(q!
                        select lower(sequence_name) sequence_name, last_number
                        from user_sequences
                        order by sequence_name
                !, 'sequence_name');
        } else {
                $found = $handle->selectall_hashref(q!
                        select lower(c.relname) sequence_name
                        from pg_class c
                        where c.relkind = 'S'
                        order by sequence_name
                !, 'sequence_name');
                # The state of a PostgreSQL sequence has to be read from the
                # sequence itself, one query per sequence.
                for my $name (keys %{$found}) {
                        @{ $found->{$name} }{qw( last_number is_called )}
                                = $handle->selectrow_array("select last_value, is_called from $name");
                }
        }
        return $found;
}

# Parse a per-table script file.
#
# File format, as implemented below:
#   - a line starting with a non-blank character is a header: a comma
#     separated list of column names (stored sorted and re-joined with ',');
#   - lines starting with whitespace are appended (without their newline)
#     to the script currently being collected;
#   - a blank line ends the current script.
#
# Returns nothing when $filename is not a plain file, otherwise a hash
# reference mapping each header key to an array reference of script texts.
# A script still being collected when the next header starts is stored
# under the header it followed, and scripts of a repeated header accumulate.
# Dies when the file cannot be opened or when script text appears before
# the first header.
sub get_table_scripts {
        my $filename = shift;

        return if (not -f $filename);

        open my $fh, '<', $filename or die "Error reading [$filename]: $!\n";
        my $data = {};
        my $key;
        my $text = '';
        # Store the script collected so far under the current header.
        my $flush = sub {
                if (length $text) {
                        die "Script text found before any column list in [$filename]\n"
                                if not defined $key;
                        push @{ $data->{$key} }, $text;
                }
                $text = '';
        };
        while (defined(my $line = <$fh>)) {
                chomp $line;
                if ($line =~ /^\s*$/) {
                        $flush->();
                } elsif ($line =~ /^\s/) {
                        $text .= $line;
                } else {
                        # finish the previous script before switching header
                        $flush->();
                        $key = join ',', sort split /,/, $line;
                        $data->{$key} = [] if not exists $data->{$key};
                }
        }
        $flush->();
        close $fh;
        return $data;
}

# Execute the helper scripts of one stage against the source database.
#
# Arguments:
#   $dbh   - connected DBI handle
#   $stage - name of a subdirectory of $DATA_DIR ('pre' and 'post' are the
#            stages used by this file)
#
# Does nothing in raw mode or when the stage directory does not exist.
# Every file in the stage directory named after an existing table is parsed
# by get_table_scripts(); the scripts stored for the table's current set of
# columns are executed and committed one by one. Tables are handled in
# sorted order.
sub process_scripts {
        my $dbh = shift;
        my $stage = shift;

        return if ($raw or not -d "$DATA_DIR/$stage");

        my $tables = get_tables($dbh);

        my %scripts;
        {
                opendir my $dir, "$DATA_DIR/$stage" or die "Error reading [$DATA_DIR/$stage]: $!\n";
                # a lexical loop variable keeps the caller's $_ untouched
                while (defined(my $entry = readdir $dir)) {
                        next if $entry =~ /^\.\.?$/ or not exists $tables->{$entry};
                        $scripts{$entry} = get_table_scripts("$DATA_DIR/$stage/$entry");
                }
                closedir $dir;
        }

        foreach my $table (sort keys %scripts) {
                next if (not defined $tables->{$table});
                # get_table_scripts() yields nothing for non-files
                next if (not ref $scripts{$table});

                my $sth = eval {
                        local $dbh->{PrintError} = 0;
                        $dbh->prepare("select * from $table");
                };

                if (not defined $sth or $@) {
                        # Error 942 is tolerated (NOTE(review): presumably
                        # ORA-00942, table or view does not exist -- confirm);
                        # anything else is fatal. Without a column list there
                        # is no script to select, so move on to the next table.
                        die $@ if (not defined $DBI::err or $DBI::err != 942);
                        next;
                }

                # The statement is executed only to learn the column names.
                $sth->execute();
                $sth->fetchrow_arrayref();
                my $columns = join ',', sort @{$sth->{NAME_lc}};
                # do not leave an active statement handle behind
                $sth->finish();

                next if (not exists $scripts{$table}{$columns});

                foreach my $script (@{$scripts{$table}{$columns}}) {
                        $sth = $dbh->prepare($script);
                        $sth->execute();
                        $dbh->commit();
                }
        }
}

# Look up the exception file of a sequence in $DATA_DIR/sequences.
#
# Returns nothing when no such file exists, the string 'skip' as soon as
# a line consisting of the word "skip" is found, and otherwise the whole
# content of the file.
sub get_sequence_exception {
        my ($sequence) = @_;
        my $path = "$DATA_DIR/sequences/$sequence";
        return unless -f $path;

        open my $fh, '<', $path or die "Error reading [$path]: $!\n";
        my $content = '';
        while (defined(my $line = <$fh>)) {
                if ($line =~ /^skip\s*$/) {
                        close $fh;
                        return 'skip';
                }
                $content .= $line;
        }
        close $fh;
        return $content;
}

# Print the statements which set the sequences in the target database.
#
# Argument: $dbh - connected DBI handle of the source database.
#
# Unless in raw mode, every file in $DATA_DIR/sequences is considered as
# well: a name not present in the source database is added as 'missing',
# and the file content (see get_sequence_exception) decides what happens:
#   'skip'                 - only a comment is printed;
#   'missing sequence...'  - a missing sequence starts at the current $SEQ;
#   anything else          - the content is used as the expression which
#                            computes the value for pg_catalog.setval().
# All other sequences are set to their last number. For PostgreSQL a setval
# call is printed; for Oracle the sequence is dropped and recreated.
sub process_sequences {
        my $dbh = shift;

        my $sequences = get_sequences($dbh);

        if (not $raw and -d "$DATA_DIR/sequences") {
                opendir my $dir, "$DATA_DIR/sequences" or die "Error reading [$DATA_DIR/sequences]: $!\n";
                while (defined(my $entry = readdir $dir)) {
                        next if $entry =~ /^\.\.?$/;
                        $sequences->{$entry} = 'missing' if not exists $sequences->{$entry};
                }
                closedir $dir;
        }
        for my $name (sort keys %$sequences) {
                # Declaration and conditional assignment are kept apart:
                # "my $x = ... if COND" has undefined behaviour in Perl.
                my ($last_number, $is_called);
                if (ref $sequences->{$name}) {
                        $last_number = $sequences->{$name}{last_number};
                        $is_called = $sequences->{$name}{is_called};
                }
                if (not $raw) {
                        my $exception = get_sequence_exception($name);
                        if (defined $exception) {
                                if ($exception =~ /^missing sequence/) {
                                        if ($sequences->{$name} eq 'missing') {
                                                $last_number = $SEQ;
                                        }
                                } elsif ($exception eq 'skip') {
                                        print "-- Skipping $name\n";
                                        next;
                                } else {
                                        print "select pg_catalog.setval('$name', ( $exception )::bigint);\n";
                                        next;
                                }
                        }
                }
                $last_number = 1 if not defined $last_number;
                $is_called = 0 if not defined $is_called;
                if ($options{db_backend_target} eq 'postgresql') {
                        my $is_called_value = $is_called ? 'true' : 'false';
                        print "select pg_catalog.setval('$name', $last_number, $is_called_value);\n";
                } else {
                        print "drop sequence $name;\n";
                        # an already used value must not be handed out again
                        $last_number++ if $is_called;
                        print "create sequence $name start with $last_number;\n";
                }
        }
}

# Look up the exception file of a table in $DATA_DIR/tables.
#
# Returns nothing when no such file exists and the string 'skip' as soon
# as a line consisting of the word "skip" is found. Otherwise the result is
# a hash reference: every line starting with a non-whitespace character is
# a comma separated column list (stored sorted), and the following lines
# starting with whitespace form the text stored for that list.
sub get_table_exception {
        my ($table) = @_;
        my $path = "$DATA_DIR/tables/$table";
        return unless -f $path;

        open my $fh, '<', $path or die "Error reading [$path]: $!\n";
        my %text_for;
        my $columns;
        while (defined(my $line = <$fh>)) {
                if ($line =~ /^skip\s*$/) {
                        close $fh;
                        return 'skip';
                }
                if ($line =~ /^\s/) {
                        $text_for{$columns} .= $line;
                        next;
                }
                chomp $line;
                $columns = join ',', sort split /,/, $line;
        }
        close $fh;
        return \%text_for;
}

# Dump all tables of the source schema.
#
# Argument: $dbh - connected DBI handle of the source database.
#
# Output order: statements disabling triggers (and autovacuum or logging)
# for every dumped table, statements clearing the tables, the table data
# itself (quartz tables excepted) and finally the statements which enable
# everything again. Unless in raw mode, the files in $DATA_DIR/tables can
# skip a table or provide a replacement query for it.
sub process_tables {
        my ($handle) = @_;

        my $tables = get_tables($handle);

        if (not $raw and -d "$DATA_DIR/tables") {
                opendir my $dir, "$DATA_DIR/tables" or die "Error reading [$DATA_DIR/tables]: $!\n";
                while (defined(my $entry = readdir $dir)) {
                        next if $entry =~ /^\.\.?$/;
                        $tables->{$entry} = 'missing' unless exists $tables->{$entry};
                }
                closedir $dir;
        }

        my $to_postgresql = $options{db_backend_target} eq 'postgresql';
        my %exceptions;
        my @dumped;     # names of the tables which are not skipped, sorted
        TABLE: for my $table (sort keys %$tables) {
                unless ($raw) {
                        my $exception = get_table_exception($table);
                        if (defined $exception) {
                                $exceptions{$table} = $exception;
                                if ($tables->{$table} eq 'missing' and ref $exceptions{$table}) {
                                        # a table missing in the source is kept only
                                        # when a 'missing table' query exists for it
                                        if (exists $exceptions{$table}{'missing table'}) {
                                                $exceptions{$table} = { 'missing table' => $exceptions{$table}{'missing table'} };
                                        } else {
                                                $exception = $exceptions{$table} = 'skip';
                                        }
                                }
                                if ($exception eq 'skip') {
                                        print "-- Skipping $table\n";
                                        next TABLE;
                                }
                        }
                }
                push @dumped, $table;

                if ($to_postgresql) {
                        print "alter table $table disable trigger all;\n";
                        print "alter table $table set (autovacuum_enabled = false);\n";
                } else {
                        print "alter table $table disable all triggers;\n";
                        print "alter table $table nologging;\n";
                }
        }

        for my $table (@dumped) {
                print "$options{clear_table_command} $table;\n";
        }

        for my $table (@dumped) {
                # skip the quartz tables, they get regenerated anyway
                next if $table =~ /^qrtz_/;
                process_table($handle, $table, $exceptions{$table});
        }

        for my $table (@dumped) {
                if ($to_postgresql) {
                        print "alter table $table enable trigger all;\n";
                        print "alter table $table set (autovacuum_enabled = true);\n";
                        next;
                }
                print "alter table $table enable all triggers;\n";
                my $upper = uc($table);
                unless ($upper eq 'RHNORGERRATACACHEQUEUE' or $upper eq 'RHNSET') {
                        print "alter table $table logging;\n";
                }
        }
}

# Dump the content of one table in the format of the target backend.
#
# Arguments:
#   $dbh       - connected DBI handle of the source database
#   $table     - name of the table
#   $exception - optional hash reference (see get_table_exception) mapping
#                a sorted, comma separated column list -- or the string
#                'missing table' -- to a replacement query
#
# The rows are printed to the currently selected filehandle; everything
# specific to the target backend goes through the callbacks in %options.
sub process_table {
        my ($dbh, $table, $exception) = @_;
        my ($row, $names, $the_names);
        my $sth = eval {
                local $dbh->{PrintError} = 0;
                # PostgreSQL Perl DBI loads whole table into memory :(
                # so a server side cursor is read in chunks of 10000 rows;
                # fetch_row_postgresql() re-executes the fetch statement
                if ($options{db_backend} eq 'postgresql') {
                        $dbh->do("declare csr cursor with hold for select * from $table");
                        $dbh->prepare("fetch 10000 from csr");
                } else {
                        $dbh->prepare("select * from $table");
                }
        };
        my $sorted_the_names = '';
        if (defined $sth and not $@) {
                # the first row is fetched here, the others at the end of the row loop
                $sth->execute();
                $row = $sth->fetchrow_arrayref();
                $names = $sth->{NAME_lc};
                $the_names = join ',', @$names;
                $sorted_the_names = join ',', sort @$names;
        } else {
                # NOTE(review): 942 is presumably ORA-00942 (table or view
                # does not exist) -- confirm. Every other failure is fatal.
                if (not $DBI::err == 942) {
                        die $@;
                }
                $sorted_the_names = $the_names = 'missing table';
        }
        # A replacement query registered for exactly this set of columns
        # takes the place of the plain "select *".
        if (ref $exception) {
                if (exists $exception->{$sorted_the_names}) {
                        $sth = $dbh->prepare($exception->{$sorted_the_names});
                        $sth->execute();
                        $row = $sth->fetchrow_arrayref();
                        $names = $sth->{NAME_lc};
                        print "-- Original columns for $table: $the_names\n";
                        $the_names = join ',', @$names;
                }
        }
        # Columns named rowid_<name> are dumped as <name>, and their values
        # are replaced by generated numbers in the row loop below.
        my @use_rowids = ();
        for (my $i = 0; $i < @$names; $i++) {
                if ($names->[$i] =~ s/^rowid_//) {
                        $use_rowids[$i] = 1;
                }
        }
        if (@use_rowids) {
                $the_names = join ',', @$names;
        }
        # Translate the driver's type codes into type names; a type for
        # which no name can be found is recorded as 'unknown'.
        my $types = $sth->{TYPE};
        my @types;
        for (my $i = 0; $i < @$types; $i++) {
                my $type = eval { $dbh->type_info($types->[$i])->{TYPE_NAME} };
                push @types, ( defined $type ? $type : 'unknown' );
        }
        # counter passed to the blob callback, one value per blob of this table
        my $blob_cnt = 0;
        $options{process_table_start}->($table, \@types, $the_names);
        while ($row) {
                for (my $i = 0; $i < @$row; $i++) {
                        # fields are separated by tabs
                        print "\t" if $i;
                        if (defined $use_rowids[$i]) {
                                # every distinct rowid gets the next number from $SEQ
                                if (not defined $ROWIDS{$row->[$i]}) {
                                        $SEQ++;
                                        $ROWIDS{$row->[$i]} = $SEQ;
                                }
                        print $ROWIDS{$row->[$i]};
                        } elsif (defined $row->[$i]) {
                                # create array from (a,b,c) string - custom types selected from PG
                                if ($types[$i] eq 'unknown' and $row->[$i] =~ s/^\((.*)\)$/$1/) {
                                        $row->[$i] = [ split(/,/, $row->[$i]) ];
                                }
                                if (ref $row->[$i] and ref $row->[$i] eq 'ARRAY') {     # user types
                                        no warnings 'uninitialized';
                                        my @r = @{$row->[$i]};
                                        # escape the commas inside the members
                                        map { s/,/\\\\,/g; } @r;
                                        $row->[$i] = $options{get_custom_type_string}->(\@r);
                                } elsif ($types[$i] eq 'unknown' or $types[$i] eq 'BLOB' or $types[$i] eq 'bytea') {    # blobs
                                        $options{blob_string}->(\$row->[$i], $table, $blob_cnt);
                                        $blob_cnt++;
                                } else {
                                        # the escaping is applied to the UTF-8 encoded
                                        # form, then the value is decoded again
                                        utf8::encode($row->[$i]);
                                        $options{escape_string}->(\$row->[$i]);
                                        utf8::decode($row->[$i]);
                                }
                                print $row->[$i];
                        } else {
                                print $options{null_string};
                        }
                }
                print $options{record_separator};
                # $row becomes undef when there are no more rows
                $options{fetch_row}->($sth, \$row);
        }
        $options{process_table_end}->($table, $dbh);
        # release the cursor declared at the top for a PostgreSQL source
        if ($options{db_backend} eq 'postgresql') {
                $dbh->do("close csr");
        }
}

# Oracle-only statements
# Print the Oracle statements which load the dumped file of one table:
# an external table "ext_table" is created over the file, its content is
# inserted into the real table and the external table is dropped again.
#
# Arguments:
#   $dbh   - connected DBI handle of the source database, used to read the
#            column definitions of the table
#   $table - name of the table (also the name of the dumped file)
sub process_ext_table {
        my ($dbh, $table) = @_;
        my ($table_string, $loader_string, $insert_string, $select_string);

        # using column_info because simple $sth->{TYPE} and $sth->{PRECISION} return worse values
        # The keys of the returned rows are spelled differently for the two
        # source backends, hence the *_key variables.
        my ($sth, $name_key, $type_key, $size_key);
        if ($options{db_backend} eq 'postgresql') {
                $sth = $dbh->column_info(undef, undef, $table, undef);
                ($name_key, $type_key, $size_key) = ('COLUMN_NAME','TYPE_NAME','COLUMN_SIZE');
        } else {
                $sth = $dbh->column_info(undef, undef, uc($table), undef);
                ($name_key, $type_key, $size_key) = ('column_name','type_name','column_size');
        }

        # Four lists are built in parallel, one item per column:
        #   @table_schema  - column definitions of the external table
        #   @loader_schema - field definitions for the ORACLE_LOADER driver
        #   @insert_list   - column names of the real table
        #   @select_list   - expressions selected from the external table
        my @table_schema = my @loader_schema = my @insert_list = my @select_list = ();
        my $column = $sth->fetchrow_hashref();
        while ($column) {
                # some column names are enclosed in double quotes, it could be beneficial for $loader_item but unfortunately not all reserved words are covered
                $column->{$name_key} =~ s/"//g;
                my $table_item = my $insert_item = my $select_item = $column->{$name_key};
                # column name may hit reserved word in ORACLE_LOADER driver => enclose in double quotes
                my $loader_item = '"' . uc($column->{$name_key}) . '"';
                my $typename = $column->{$type_key};
                # both the PostgreSQL and the Oracle spelling of a type are recognized
                if ($typename eq 'numeric' or $typename eq 'NUMBER') {
                        $table_item .= " NUMBER";
                } elsif ($typename =~ /char/i) {
                        $table_item .= " VARCHAR2($column->{$size_key})";
                        $loader_item .= " CHAR($column->{$size_key})";
                } elsif ($typename eq 'timestamp without time zone' or $typename =~ /TIMESTAMP\(\d+\)$/) {
                        $table_item .= " TIMESTAMP";
                        $loader_item .= " CHAR DATE_FORMAT TIMESTAMP '$ENV{NLS_TIMESTAMP_FORMAT}'";
                } elsif ($typename =~ /timestamp(\(\d+\))? with( local)? time zone/i or $typename eq 'DATE') {
                        $table_item .= " TIMESTAMP WITH LOCAL TIME ZONE";
                        $loader_item .= " CHAR DATE_FORMAT TIMESTAMP WITH TIMEZONE '$ENV{NLS_TIMESTAMP_TZ_FORMAT}'";
                } elsif ($typename eq 'bytea' or $typename eq 'BLOB') {
                        # the dumped field holds the name of the file with the
                        # blob content (see blobstr_oracle); load_blob() reads it
                        $table_item .= " VARCHAR2(255)";
                        $select_item = "load_blob($select_item)";
                } elsif (lc($typename) eq 'evr_t') {
                        # the custom type is dumped as three separate fields
                        $table_item = "e VARCHAR2(16), v VARCHAR2(512), r VARCHAR2(512)";
                        $loader_item = "e CHAR(16), v CHAR(512), r CHAR(512)";
                        $select_item = "EVR_T(e,v,r)";
                }
                push @table_schema, $table_item;
                push @loader_schema, $loader_item;
                push @insert_list, $insert_item;
                push @select_list, $select_item;
                $column = $sth->fetchrow_hashref();
        }

        $table_string = join(', ', @table_schema);
        $loader_string = join(', ', @loader_schema);
        $insert_string = join(', ', @insert_list);
        $select_string = join(', ', @select_list);

        # The record delimiter 0x'000A' and the tab field terminator match
        # record_separator and the field separator used when dumping for Oracle.
        print <<EOS;
CREATE TABLE ext_table
(
$table_string
)
ORGANIZATION EXTERNAL
(
TYPE ORACLE_LOADER
DEFAULT DIRECTORY ext_tables_dir
ACCESS PARAMETERS
(
 RECORDS DELIMITED BY 0x'000A'
 LOGFILE ext_tables_log_dir:'oracle_import.log'
 BADFILE ext_tables_log_dir:'oracle_import.bad'
 DISCARDFILE ext_tables_log_dir:'oracle_import.dsc'
 FIELDS TERMINATED BY '\\t' OPTIONALLY ENCLOSED BY '"'
 MISSING FIELD VALUES ARE NULL
 (
$loader_string
 )
)
LOCATION('$table')
)
REJECT LIMIT 0;
INSERT /*+ APPEND */
INTO $table
(
$insert_string
)
(
SELECT
$select_string
FROM ext_table
);
DROP TABLE ext_table;
EOS
}

# Format the members of a custom type value for PostgreSQL: the members
# joined by commas and wrapped in parentheses. Takes an array reference;
# undefined members are rendered as empty strings.
sub get_ctstr_postgresql {
        my ($members) = @_;
        no warnings 'uninitialized';
        return '(' . join(',', @{$members}) . ')';
}

# Format the members of a custom type value for the Oracle dump files:
# every member in double quotes, members separated by tabs. Takes an array
# reference; undefined members are rendered as empty strings.
sub get_ctstr_oracle {
        my ($members) = @_;
        no warnings 'uninitialized';
        my $joined = join qq{"\t"}, @{$members};
        return qq{"$joined"};
}

# Encode a blob in place for the PostgreSQL dump: every character,
# newlines included, becomes two backslashes followed by its code as
# three octal digits. Takes a reference to the scalar holding the blob.
sub blobstr_postgresql {
        my ($blob_ref) = @_;
        ${$blob_ref} =~ s{(.)}{ sprintf("\\\\%03o", ord($1)) }gse;
}

# Store a blob in its own file for the Oracle import.
#
# Arguments:
#   $blob     - reference to the scalar holding the blob content
#   $table    - name of the table the blob belongs to
#   $blob_cnt - number of the blob within the table
#
# The content is written in raw mode to <dump_dir>/<table>_blob_<blob_cnt>
# and the scalar is replaced by that file name (without the directory),
# which is what ends up in the dumped row. Dies when the file cannot be
# created, written or closed, so that a truncated blob is never referenced.
sub blobstr_oracle {
        my $blob = shift;
        my $table = shift;
        my $blob_cnt = shift;
        my $blob_name = "${table}_blob_$blob_cnt";
        my $blob_path = "$options{dump_dir}/$blob_name";
        open my $blob_file, '>:raw', $blob_path or die "Error writing [$blob_path]: $!\n";
        print {$blob_file} $$blob or die "Error writing [$blob_path]: $!\n";
        # buffered write errors (e.g. disk full) surface at close
        close $blob_file or die "Error writing [$blob_path]: $!\n";
        # save filename where to find blob content
        $$blob = $blob_name;
}

# Escape a text value in place for the PostgreSQL dump: the control
# characters 0x00-0x1f and the backslash are replaced by \xNN, NN being
# the two digit hexadecimal code. Takes a reference to the scalar.
sub escapestr_postgresql {
        my ($text_ref) = @_;
        ${$text_ref} =~ s{([\x00-\x1f\x5c])}{ sprintf("\\x%02x", ord($1)) }gse;
}

# Quote a text value in place for the Oracle dump files: embedded double
# quotes are doubled and the whole value is wrapped in double quotes.
# Takes a reference to the scalar.
sub escapestr_oracle {
        my ($text_ref) = @_;
        (my $doubled = ${$text_ref}) =~ s/"/""/g;
        ${$text_ref} = qq{"$doubled"};
}

# Print what precedes the rows of a table in the PostgreSQL dump: a comment
# listing the column types and the "copy ... from stdin" statement.
#
# Arguments: table name, array reference of type names, comma separated
# list of column names.
sub process_table_start_postgresql {
        my ($table, $types, $the_names) = @_;
        print "-- Types for $table: @{$types}\n",
              "copy $table($the_names) from stdin;\n";
}

# Start the dump of a table for Oracle: open <dump_dir>/<table> for writing
# on the shared FILE handle and make it the default output handle, so that
# the rows printed afterwards land in that file. The handle is closed again
# by process_table_end_oracle().
sub process_table_start_oracle {
        my ($table) = @_;
        my $path = "$options{dump_dir}/$table";
        open FILE, '>', $path or die "Error writing [$path]: $!\n";
        select FILE;
}

# Print the line which ends the data of a "copy ... from stdin" statement:
# a backslash followed by a dot.
sub process_table_end_postgresql {
        print '\.' . "\n";
}

# Finish the dump of a table for Oracle.
#
# Arguments: table name, connected DBI handle of the source database.
#
# Closes the data file opened by process_table_start_oracle(), makes STDOUT
# the default output handle again and prints the statements which load the
# file (see process_ext_table). Dies when closing the data file fails, as
# that is where buffered write errors are reported.
sub process_table_end_oracle {
        my $table = shift;
        my $dbh = shift;
        my $closed = close FILE;
        my $close_error = $!;
        # restore the default output handle before anything else is printed
        select STDOUT;
        die "Error writing [$options{dump_dir}/$table]: $close_error\n" if not $closed;
        process_ext_table($dbh, $table);
}

# Fetch the next row when reading through the PostgreSQL cursor.
#
# Arguments: statement handle, reference to the scalar receiving the row.
#
# When the statement has no row left it is executed once more -- for the
# chunked "fetch ... from csr" statement that reads the next chunk -- and
# the fetch is repeated. The scalar ends up undefined only when the fresh
# execution yields no row either.
sub fetch_row_postgresql {
        my ($cursor_sth, $row_ref) = @_;
        return if defined(${$row_ref} = $cursor_sth->fetchrow_arrayref());
        $cursor_sth->execute();
        ${$row_ref} = $cursor_sth->fetchrow_arrayref();
}

# Fetch the next row of an Oracle statement.
#
# Arguments: statement handle, reference to the scalar receiving the row
# (undefined once the statement has no row left).
sub fetch_row_oracle {
        my ($sth, $row_ref) = @_;
        ${$row_ref} = $sth->fetchrow_arrayref();
}

# Print the beginning of the PostgreSQL script: stop on the first error
# and flag all indexes of the current schema as not valid and not ready.
# print_script_footer_postgresql() flags them back.
sub print_script_header_postgresql {
        my $header = <<'END_OF_PG_HEADER';
\set ON_ERROR_STOP on
update pg_index
  set indisvalid = false,
      indisready = false
where indexrelid in (
      select pi.indexrelid
        from pg_index pi,
             pg_class pc,
             pg_namespace pn
       where pi.indexrelid = pc.oid and
             pc.relnamespace = pn.oid and
             pc.relkind = 'i'::"char" and
             pn.nspname = current_schema()
      );
END_OF_PG_HEADER
        print $header;
}

# Print the beginning of the Oracle control script: exit on the first SQL
# error, create the directory objects for the dumped files (the dump
# directory) and for the loader logs (/tmp), set the session date formats
# from the NLS_* environment, create the load_blob() helper function and
# disable all enabled constraints of the user's tables.
sub print_script_header_oracle {
        my $header = <<END_OF_ORACLE_HEADER;
whenever sqlerror exit sql.sqlcode;
CREATE OR REPLACE DIRECTORY ext_tables_dir AS '$options{dump_dir}';
CREATE OR REPLACE DIRECTORY ext_tables_log_dir AS '/tmp';
alter session set NLS_DATE_FORMAT = '$ENV{NLS_DATE_FORMAT}';
alter session set NLS_TIMESTAMP_FORMAT = '$ENV{NLS_TIMESTAMP_FORMAT}';
alter session set NLS_TIMESTAMP_TZ_FORMAT = '$ENV{NLS_TIMESTAMP_TZ_FORMAT}';

-- function for loading BLOB from file
CREATE OR REPLACE FUNCTION load_blob(filename IN VARCHAR)
RETURN BLOB IS
 src  BFILE;
 dest BLOB;
 file_length INTEGER;
BEGIN
 IF length(filename) > 0
 THEN
  DBMS_LOB.CREATETEMPORARY(dest, true);
  src := BFILENAME('EXT_TABLES_DIR', filename);
  DBMS_LOB.FILEOPEN(src, DBMS_LOB.FILE_READONLY);
  file_length := DBMS_LOB.GETLENGTH(src);
  IF file_length > 0
  THEN
   DBMS_LOB.LOADFROMFILE(dest, src, file_length);
  END IF;
  DBMS_LOB.FILECLOSE(src);
 END IF;
 RETURN dest;
END load_blob;
/

-- disable constraints
BEGIN
 FOR item IN (
select uc.owner, uc.table_name, uc.constraint_name
  from user_constraints uc, user_tables ut
where uc.table_name = ut.table_name and
      uc.status = 'ENABLED'
order by decode ( uc.constraint_type, 'R', 1, 'U', 2, 'P', 3 )
)
 LOOP
        execute immediate 'alter table ' || item.owner || '.' || item.table_name || ' disable constraint ' || item.constraint_name;
 END LOOP;
END;
/
END_OF_ORACLE_HEADER
        print $header;
}

# Print the end of the PostgreSQL script: flag all indexes of the current
# schema as valid and ready again and request a reindex of the database.
sub print_script_footer_postgresql {
        my $footer = <<'END_OF_PG_FOOTER';
update pg_index
  set indisvalid = true,
      indisready = true
where indexrelid in (
      select pi.indexrelid
        from pg_index pi,
             pg_class pc,
             pg_namespace pn
       where pi.indexrelid = pc.oid and
             pc.relnamespace = pn.oid and
             pc.relkind = 'i'::"char" and
             pn.nspname = current_schema()
      );
select pg_dblink_exec('reindex database "' || current_database() || '";');
END_OF_PG_FOOTER
        print $footer;
}

# Print the end of the Oracle control script: enable (without validating)
# all disabled constraints of the user's tables, then drop the load_blob()
# function and the two directory objects created by the script header.
sub print_script_footer_oracle {
        my $footer = <<'END_OF_ORACLE_FOOTER';
-- enable constraints
BEGIN
 FOR item IN (
select uc.owner, uc.table_name, uc.constraint_name
  from user_constraints uc, user_tables ut
where uc.table_name = ut.table_name and
      uc.status = 'DISABLED'
order by decode ( uc.constraint_type, 'P', 1, 'U', 2, 'R', 3 )
)
 LOOP
    execute immediate 'alter table ' || item.owner || '.' || item.table_name || ' enable novalidate constraint ' || item.constraint_name;
 END LOOP;
END;
/
DROP FUNCTION load_blob;
DROP DIRECTORY ext_tables_log_dir;
DROP DIRECTORY ext_tables_dir;
END_OF_ORACLE_FOOTER
        print $footer;
}

1;

__END__

=head1 NAME

spacewalk-dump-schema

=head1 SYNOPSIS

        spacewalk-dump-schema [--from=oracle] [--to=DB_BACKEND]
            [--db=SID --user=USERNAME --password=PASSWORD]

        spacewalk-dump-schema [--from=postgresql] [--to=DB_BACKEND]
            [--db=DBNAME --host=HOSTNAME --port=PORT --user=USERNAME --password=PASSWORD]

        spacewalk-dump-schema --help

=head1 DESCRIPTION

The B<spacewalk-dump-schema> script dumps the content of Spacewalk's
database schema in a format which can be fed to PostgreSQL's B<psql>
or Oracle's B<sqlplus>. Thus it can be used to migrate between a Spacewalk
with the Oracle database backend and one which uses the PostgreSQL database
backend.

The script connects to the Oracle or PostgreSQL instance, so that instance
must be running and accessible with the given connection parameters.

The output is printed to the standard output. It can be piped to
B<psql> or B<sqlplus> directly in which case the PostgreSQL or Oracle
server must also be running, or you can redirect it to a file and use
that file as input for B<psql> or B<sqlplus> later.

When migrating to the Oracle backend, only the control SQL script is printed
to the standard output. The content of the tables is dumped into separate
files, which are then used as external tables. The location of the table
files can be specified with the B<--dump-dir> parameter. The table files
must be accessible on the machine where the Oracle database is running, by
the user running the database instance. The location of the table files on
the database machine may need to be altered manually in the control script.
If the parameter is not specified, the table files are written to
'/tmp/dumped-tables/' by default.

The database schema in target backend must already exist, created by
(probably) B<spacewalk-setup>. The output of this script does not
create any tables or other database objects, it only emits commands
to set sequences and table contents in the PostgreSQL or Oracle
database schema. Any existing content in tables is discarded, data
is not appended.

When migrating to Oracle, make sure your UNDO, TEMP and DATA tablespaces
are big enough otherwise migration fails.

The exit value is 0 upon success, non-zero value upon error.

=head1 EXAMPLE

        spacewalk-dump-schema --db=xe \
                --user=spacewalk --password=o9k2HInsl \
                | PGPASSWORD=o9k2HInsl psql -h localhost \
                        -U spaceuser spaceschema

        spacewalk-dump-schema --from=oracle --to=postgresql --db=xe \
                --user=spacewalk --password=o9k2HInsl \
                | PGPASSWORD=o9k2HInsl psql -h localhost \
                        -U spaceuser spaceschema

        spacewalk-dump-schema --to=oracle --db=spaceschema \
                --host=localhost --port=5433 \
                --user=spaceuser --password=o9k2HInsl \
                | NLS_LANG=AMERICAN_AMERICA.UTF8 sqlplus spaceuser/spacepw@//localhost/XE

        spacewalk-dump-schema --to=oracle --dump-dir=/tmp/dumped-tables > control_script.sql
        ...
        spacewalk-sql -i < control_script.sql

=head1 AUTHOR

Jan Pazdziora
Jan Dobes

=cut

