#!/bin/sh
# ----------------------------------------------------------------------
# pgloader -
#
#	A data conversion utility for PostgreSQL
#
# ----------------------------------------------------------------------
# Start tclsh: /bin/sh executes the exec line below and replaces itself
# with tclsh running this same file.  Tcl never executes that line,
# because the backslash that ends this comment continues the comment
# onto the next line. \
exec tclsh "$0" "$@"

# ----
# Load the required Tcl extensions Pgtcl and CSV
# ----
# Pgtcl provides the pg_* database commands used by the loader.
if {[catch {
		package require Pgtcl
	} err]} {
	puts "Cannot load PostgreSQL DB support: $err"
	puts "Make sure the shared libpgtcl is installed properly"
	exit -1
}

# The csv package provides ::csv::split for the CSV and MSCSV formats.
# The underlying error text is reported as well, the same way the Pgtcl
# check above does, so that a broken installation can be diagnosed.
if {[catch {
		package require csv
	} err]} {
	puts "Cannot load CSV parsing module: $err"
	puts "Make sure the tcllib package is installed properly"
	exit -1
}


# ----------------------------------------------------------------------
# main
# ----------------------------------------------------------------------
proc main {argv0 argc argv} {
	# Command line driver.  Checks the argument count, silences libpq
	# and runs pgloader.  This proc never returns: the process exits
	# with the value returned by pgloader, or with -1 after a usage
	# error or an uncaught Tcl error.
	#
	#	argv0	script name, only used for the usage message
	#	argc	number of command line arguments
	#	argv	argument list: config-fname data-fname db-conninfo

	if {$argc != 3} {
		puts stderr "usage: [file tail $argv0] config-fname data-fname db-conninfo"
		exit -1
	}

	# Closing stderr is the only way to silence libpq from inside
	# Pgtcl.  From here on all diagnostics have to go to stdout.
	close stderr

	# Run the loader with the arguments as separate words.  On
	# failure the catch result variable holds the error message, on
	# success it holds pgloader's return value.
	if {[catch {eval [linsert $argv 0 pgloader]} status]} {
		global errorInfo
		puts $errorInfo
		exit -1
	}
	exit $status
}

# ----------------------------------------------------------------------
# pgloader - 
#
#	The entire conversion and load control framework
# ----------------------------------------------------------------------
proc pgloader {cfg_fname data_fname db_conninfo} {
	# Runs one complete load: creates the converter from the config
	# file, opens the data file, connects to the database, verifies
	# the COPY statement and then feeds the input to the database in
	# groups of at most opt.group_size lines.
	#
	#	cfg_fname	name of the config file
	#	data_fname	name of the input data file, also passed to the
	#				converter as base name for the reject files
	#	db_conninfo	conninfo string handed to pg_connect
	#
	# Returns the result of pgdl_close_converter.  Any setup failure
	# prints a message on stdout and exits the process with -1.

	# ----
	# Create the converter; on failure the catch variable holds
	# the error message instead of the converter ID
	# ----
	if {[catch {pgdl_create_converter $cfg_fname $data_fname} id]} {
		puts $id
		exit -1
	}
	upvar #0 ConvStatus_$id conv

	# ----
	# Open the input file
	# ----
	if {[catch {open $data_fname r} fd]} {
		unset conv
		puts $fd
		exit -1
	}

	# ----
	# Open the database connection
	# ----
	if {[catch {pg_connect -conninfo $db_conninfo} db]} {
		close $fd
		unset conv
		puts $db
		exit -1
	}

	# ----
	# Syntax check: start the COPY and terminate it right away
	# without sending any rows
	# ----
	set res [pg_exec $db $conv(copy_stmt)]
	if {[string compare [pg_result $res -status] "PGRES_COPY_IN"] != 0} {
		puts -nonewline [pg_result $res -error]
		puts "The command was '$conv(copy_stmt)'"
		close $fd
		unset conv
		pg_result $res -clear
		pg_disconnect $db
		exit -1
	}
	puts $db "\\."
	pg_result $res -clear

	# ----
	# Collect up to opt.group_size converted lines per round and
	# apply them; a round that collects nothing means the input
	# is exhausted
	# ----
	while {1} {
		set lines {}
		set rawlines {}
		set gsize 0
		while {$gsize < $conv(opt.group_size)} {
			if {[pgdl_get_copyline $id $fd line rline] < 0} {
				break
			}
			lappend lines $line
			lappend rawlines $rline
			incr gsize
		}
		if {$gsize == 0} {
			break
		}

		# The two lists are passed by name, not by value
		pgdl_apply_group $id $db lines rawlines 0 $gsize
	}
	unset lines
	unset rawlines

	# ----
	# Shut everything down in the same order as before: data file,
	# converter, database connection
	# ----
	close $fd
	set rc [pgdl_close_converter $id]
	pg_disconnect $db

	return $rc
}

# ----------------------------------------------------------------------
# pgdl_apply_group -
#
#	Load a group of rows via COPY. On errors work recursively
#	by splitting the group.
# ----------------------------------------------------------------------
proc pgdl_apply_group {id db _group _rawgroup idx1 idx2} {
	# Loads the lines with index idx1 up to, but not including, idx2
	# of a group through one COPY command.  If the COPY fails and the
	# range holds more than one line, the range is split in two and
	# each half is retried recursively, so that eventually only the
	# offending lines end up rejected.
	#
	#	id			converter ID
	#	db			Pgtcl connection handle; the COPY data is written
	#				to it with puts
	#	_group		name of the caller's list of converted lines
	#	_rawgroup	name of the caller's list of raw input lines
	#	idx1		first index to load
	#	idx2		index one past the last line to load
	#
	# Returns nothing.  Exits the process with -1 if the COPY command
	# cannot be started at all.
	upvar #0	ConvStatus_$id	conv
	upvar 1		$_group			group
	upvar 1		$_rawgroup		rawgroup

	set ok 1
	set errormsg {}

	# ----
	# Start the COPY command
	# ----
	set res [pg_exec $db $conv(copy_stmt)]
	if {"[pg_result $res -status]" != "PGRES_COPY_IN"} {
		puts -nonewline "[pg_result $res -error]"
		puts "The command was '$conv(copy_stmt)'"
		# The data file handle is not known in this proc.  Trying
		# to close it here raised "no such variable" and skipped
		# the cleanup and the exit below; the process exit
		# releases the file anyway.
		unset conv
		pg_result $res -clear
		pg_disconnect $db
		exit -1
	}

	# ----
	# Send the (sub)group of data lines
	# ----
	for {set i $idx1} {$i < $idx2} {incr i} {
		if {[catch {
				puts $db [lindex $group $i]
			} err]} {
			lappend errormsg $err
			set ok 0
			break
		}
	}

	# ----
	# End the COPY command with the backslash-dot marker and
	# check for errors
	# ----
	if {[catch {
			puts $db "\\."
		} err]} {
		set ok 0
	}
	if {"[pg_result $res -status]" != "PGRES_COMMAND_OK"} {
		set ok 0
		lappend errormsg [string trim [pg_result $res -error]]
	}
	pg_result $res -clear

	# ----
	# On success count rows and stop recursion
	# ----
	if {$ok} {
		incr conv(apply.count) [expr {$idx2 - $idx1}]
		pgdl_status $id
		return
	}

	# ----
	# If the error affected a single line, drop it into the
	# .rej file and stop recursion.
	# ----
	if {$idx1 == $idx2 - 1} {
		pgdl_reject $id [lindex $rawgroup $idx1] $errormsg
		pgdl_status $id
		return
	}

	# ----
	# If the error affected multiple lines, split the block
	# into two subgroups and try each of them separately.
	# The loop doubles idxs up to the smallest power of two that
	# is not below the range size; half of that is the size of
	# the first subgroup.
	# ----
	for {set idxs 1} {$idxs < ($idx2 - $idx1)} {incr idxs $idxs} {}
	set idxs [expr {($idxs / 2) + $idx1}]
	pgdl_apply_group $id $db group rawgroup $idx1 $idxs
	pgdl_apply_group $id $db group rawgroup $idxs $idx2

	pgdl_status $id
}


# ----------------------------------------------------------------------
# pgdl_status -
#
#	Print a status message telling where we are
# ----------------------------------------------------------------------
proc pgdl_status {id} {
	# Writes the loaded and rejected row counters of converter id to
	# stdout as one progress line.  The line ends in a carriage return
	# instead of a newline and stdout is flushed right away.
	upvar #0 ConvStatus_$id st

	set report [format " %s row(s) loaded - %s row(s) rejected\r" \
			$st(apply.count) $st(reject.count)]
	puts -nonewline $report
	flush stdout
}


# ----------------------------------------------------------------------
# pgdl_create_converter -
#
#	Initialize the converter and parse the config file
# ----------------------------------------------------------------------
proc pgdl_create_converter {cfg_name base_rejname} {
	# Allocates a new converter, parses and validates the config file
	# and assembles the COPY statement.
	#
	#	cfg_name		name of the config file.  Lines have the form
	#					"key = value"; empty lines and lines starting
	#					with # are skipped
	#	base_rejname	base name for the .rej and .rejlog files
	#
	# Returns the new converter ID.  The converter state lives in the
	# global array ConvStatus_<id>.  Raises an error, after dropping
	# that array again, if the config file cannot be opened or if it
	# contains errors; the individual problems are printed on stdout.
	upvar #0	ConvStatus		cstat

	if {![info exists cstat]} {
		array set cstat {
			id_count		0
		}
	}

	# ----
	# Allocate a converter ID and initialize the status.
	# opt.* holds the option values, oln.* the config file line
	# each option was defined on.
	# ----
	incr cstat(id_count)
	set id $cstat(id_count)
	upvar #0	ConvStatus_$id	conv
	array set conv {
		opt.table_name		{}
		opt.table_columns	{}
		opt.file_format		{}
		opt.file_sepchar	{}
		opt.nulls			{}
		opt.group_size		{}
		oln.table_name		{<not specified>}
		oln.table_columns	{<not specified>}
		oln.file_format		{<not specified>}
		oln.file_sepchar	{<not specified>}
		oln.nulls			{<not specified>}
		oln.group_size		{<not specified>}
	}

	# ----
	# Read the config file
	# ----
	if {[catch {
			set fd [open $cfg_name r]
		} err]} {
		# Do not leave a half initialized converter behind
		unset conv
		return -code error $err
	}
	set lno 0
	set errors 0
	while {[gets $fd line] >= 0} {
		incr lno
		set line [string trim $line]
		if {[string compare [string index $line 0] "#"] == 0} continue
		if {[string compare $line ""] == 0} continue

		if {![regexp -nocase "^(\[_a-z0-9\]+)\[ \t\]*=\[ \t\]*(.*)" $line {} key val]} {
			puts "$cfg_name line $lno: cannot parse"
			incr errors
			continue
		}
		set key [string tolower $key]
		if {![info exists conv(opt.$key)]} {
			puts "$cfg_name line $lno: unknown option '$key'"
			incr errors
			continue
		}
		if {[string compare $conv(opt.$key) ""] != 0} {
			puts "$cfg_name line $lno: option '$key' already defined"
			incr errors
			continue
		}

		set conv(opt.$key) $val
		set conv(oln.$key) $lno
	}
	close $fd

	# ----
	# Check the file format
	# ----
	switch -- $conv(opt.file_format) {
		COPY {
		}
		CSV -
		MSCSV {
			switch -- $conv(opt.file_sepchar) {
				"\"" {
					# Report the line of the offending option itself
					puts "$cfg_name line $conv(oln.file_sepchar): illegal separator character"
					incr errors
				}
				"" {
					# The default belongs into the option slot.  The
					# old key is still set as well in case something
					# outside this proc reads it.
					set conv(opt.file_sepchar) ","
					set conv(file_sepchar) ","
				}
			}
		}
		"" {
			set conv(opt.file_format) COPY
		}
		default {
			puts "$cfg_name line $conv(oln.file_format): unknown file format '$conv(opt.file_format)'"
			incr errors
		}
	}

	# ----
	# Check that we have a table name.  Without one no usable COPY
	# statement can be built, so this counts as an error.
	# ----
	if {[string compare $conv(opt.table_name) ""] == 0} {
		puts "$cfg_name line $conv(oln.table_name): option table_name missing or empty"
		incr errors
	}

	# ----
	# Assuming the table_columns are a CSV, count the number
	# ----
	set conv(num_columns) 0
	if {[catch {
			set conv(num_columns) [llength [::csv::split $conv(opt.table_columns)]]
		} err]} {
		puts "$cfg_name line $conv(oln.table_columns): $err"
		incr errors
	} else {
		if {$conv(num_columns) == 0} {
			puts "$cfg_name line $conv(oln.table_columns): option table_columns missing or empty"
			incr errors
		}
	}

	# ----
	# fallback groupsize; an explicit value must be a positive
	# integer, because a group size of 0 would end the load loop
	# without reading a single line
	# ----
	if {[string compare $conv(opt.group_size) ""] == 0} {
		set conv(opt.group_size) 1000
	} elseif {![regexp {^[1-9][0-9]*$} $conv(opt.group_size)]} {
		puts "$cfg_name line $conv(oln.group_size): option group_size must be a positive integer"
		incr errors
	}

	# ----
	# Forget everything on errors
	# ----
	if {$errors > 0} {
		unset conv
		return -code error "$errors error(s) in config file"
	}

	# ----
	# assemble the COPY statement.  The keyword of the COPY clause
	# is NULL, not NULLS.  The value is inserted as written in the
	# config file, so it has to carry its own quotes there.
	# ----
	if {[string compare $conv(oln.nulls) "<not specified>"] != 0} {
		set nulls " WITH NULL AS $conv(opt.nulls)"
	} else {
		set nulls ""
	}
	set conv(copy_stmt) "COPY $conv(opt.table_name) ($conv(opt.table_columns)) FROM stdin${nulls};"

	# ----
	# remember the basename for the reject files
	# ----
	set conv(base_rejname)	$base_rejname
	set conv(reject.count)	0
	set conv(apply.count)	0

	return $id
}


# ----------------------------------------------------------------------
# pgdl_close_converter -
#
#	Print final status and remove the converter info.
# ----------------------------------------------------------------------
proc pgdl_close_converter {id} {
	# Prints the final row counts of converter id, closes the reject
	# and reject log files if they were opened and drops the converter
	# state.
	#
	# Returns 1 if a reject file had been opened, 0 otherwise.  Raises
	# an error if no converter with this ID exists.
	upvar #0 ConvStatus_$id conv

	if {![info exists conv]} {
		return -code error "unknown converter ID $id"
	}

	# A line of 79 blanks, presumably to wipe the progress line
	# that pgdl_status leaves on the terminal
	puts [format %79s ""]
	puts "$conv(apply.count) row(s) loaded"

	# info exists yields exactly the 0/1 result code
	set have_rejects [info exists conv(reject.fd)]
	if {$have_rejects} {
		close $conv(reject.fd)
		puts "$conv(reject.count) rejected input row(s) saved in $conv(reject.fname)"
	} else {
		puts "$conv(reject.count) row(s) rejected"
	}

	if {[info exists conv(rejlog.fd)]} {
		close $conv(rejlog.fd)
		puts "See details in $conv(base_rejname).rejlog"
	}

	unset conv
	return $have_rejects
}


# ----------------------------------------------------------------------
# pgdl_reject
#
#	Add a data line to the .rej file and the corresponding
#	log information to the .rejlog
# ----------------------------------------------------------------------
proc pgdl_reject {id data msgs} {
	# Counts one rejected input line.  If the converter has a base
	# name for reject files, the line is also appended to the .rej
	# file and the messages to the .rejlog file; both files are
	# opened on first use.
	#
	#	id		converter ID
	#	data	the raw input line
	#	msgs	list of messages, written one per line to the log
	#
	# Raises an error for an unknown converter ID.  Exits the process
	# with -1 if one of the files cannot be opened.
	upvar #0 ConvStatus_$id conv

	if {![info exists conv]} {
		return -code error "unknown converter ID $id"
	}

	incr conv(reject.count)

	# Without a base name there is nowhere to write to
	if {![info exists conv(base_rejname)]} {
		return
	}

	# ----
	# Lazily open the rejected data file ...
	# ----
	if {![info exists conv(reject.fd)]} {
		set conv(reject.fname) "$conv(base_rejname).rej"
		if {[catch {open $conv(reject.fname) w} chan]} {
			puts $chan
			exit -1
		}
		set conv(reject.fd) $chan
	}

	# ----
	# ... and the log file
	# ----
	if {![info exists conv(rejlog.fd)]} {
		if {[catch {open $conv(base_rejname).rejlog w} chan]} {
			puts $chan
			exit -1
		}
		set conv(rejlog.fd) $chan
	}

	# ----
	# Store the input line, then the messages followed by a
	# pointer to the data file and a separating empty line
	# ----
	puts $conv(reject.fd) $data
	set log $conv(rejlog.fd)
	foreach msg $msgs {
		puts $log $msg
	}
	puts $log "    input data line saved in file $conv(reject.fname)"
	puts $log ""
}


# ----------------------------------------------------------------------
# pgdl_get_copyline -
#
#	Read one line from the input file and convert it into
#	the PostgreSQL COPY format
# ----------------------------------------------------------------------
proc pgdl_get_copyline {id fd _data _raw} {
	# Reads the next usable line from the input file and converts it
	# into the PostgreSQL COPY text format.
	#
	#	id		converter ID
	#	fd		open channel of the input file
	#	_data	name of the caller's variable receiving the converted line
	#	_raw	name of the caller's variable receiving the raw line
	#
	# Returns a negative number at end of file, otherwise the length
	# of the line.  CSV/MSCSV lines that cannot be parsed are handed
	# to pgdl_reject and skipped.  Raises an error for an unknown
	# converter ID, a read error or an unsupported file format.
	upvar #0	ConvStatus_$id	conv
	upvar 1		$_data			data
	upvar 1		$_raw			raw

	if {![info exists conv]} {
		return -code error "unknown converter ID $id"
	}

	# ----
	# Field separator for the CSV formats: the configured
	# file_sepchar, or a comma if none was given
	# ----
	set sepchar ","
	if {[info exists conv(opt.file_sepchar)]
			&& [string compare $conv(opt.file_sepchar) ""] != 0} {
		set sepchar $conv(opt.file_sepchar)
	}

	# ----
	# Escapes for the COPY text format.  string map works in a
	# single pass, so the backslashes introduced for tab, newline
	# and carriage return are not doubled again.
	# ----
	set escapes [list \\ \\\\ \t \\t \n \\n \r \\r]

	while {1} {
		if {[catch {
				set n [gets $fd raw]
			} err]} {
			return -code error $err
		}
		if {$n < 0} {
			return $n
		}

		switch -- $conv(opt.file_format) {
			COPY {
				# Already in COPY format, pass it through unchanged
				set data $raw
				return $n
			}
			CSV {
				set failed [catch {::csv::split $raw $sepchar} fields]
			}
			MSCSV {
				set failed [catch {::csv::split -alternate $raw $sepchar} fields]
			}
			default {
				return -code error "file format $conv(opt.file_format) not implemented"
			}
		}

		# ----
		# A line that cannot be parsed has no field list, so after
		# rejecting it move on to the next input line.  On failure
		# the catch variable holds the error message.
		# ----
		if {$failed} {
			pgdl_reject $id $raw [list $fields]
			continue
		}

		set qlist {}
		foreach att $fields {
			lappend qlist [string map $escapes $att]
		}
		set data [join $qlist "\t"]

		return [string length $data]
	}
}


# Script entry point: pass the script name, the argument count and the
# argument list to main, which ends the process itself via exit.
main $argv0 $argc $argv


