#! /usr/bin/ruby1.8

require 'optparse'
require 'postgres'

# Parses command-line options and returns them in a hash keyed by symbol.
#
# argv:: array of argument strings to parse; defaults to ARGV. The array is
#        modified in place (recognised options are removed by parse!).
#
# Returns a Hash containing only the options that were actually supplied.
# Flag options store true; options that take a value store that value
# (an Integer for --batch, a String otherwise, or nil if the optional
# value was omitted).
#
# Values given for --delimiter, --null, --csv-quote and --csv-escape are
# later placed verbatim into the COPY statement by DBLoader#do_copy, so
# they must already be written as SQL literals (including quotes).
#
# Prints usage and exits when --help is given. OptionParser raises
# OptionParser::ParseError subclasses for unknown or malformed options.
def get_command_line(argv = ARGV)
  options = {}
  parser = OptionParser.new do |opts|
    opts.banner = "Usage aploader [options]"

    # Verbose displays a running summary of progress.
    # Enabling this requires a prescan of the file or a manual
    # specification of the number of lines. It's ok if the manually
    # specified number isn't exact, but the progress accuracy will be
    # skewed.
    opts.on("-v", "--verbose", "Run verbosely") do |arg|
      options[:verbose] = arg
    end

    # File to read raw data from
    opts.on("-f", "--file [FILE]", String, "Input file name") do |arg|
      options[:file] = arg
    end

    # Batch size indicates how many rows to read at a time.
    # After many tests, 500 seems to give good performance while
    # leaving failed batches manageable at the same time.
    # Values above 500 don't seem to help performance and leave you
    # with a larger group of rejects.
    opts.on("-b", "--batch [BATCH]", Integer, "Batch size") do |arg|
      options[:batch] = arg
    end

    # Database to connect to
    opts.on("-d", "--database [DATABASE]", "Database for connection") do |arg|
      options[:database] = arg
    end

    # Table to write to
    opts.on("-t", "--table [TABLE]", "Tablename for COPY") do |arg|
      options[:table] = arg
    end

    # Host to connect to
    opts.on("-h", "--host [HOST]", "Hostname for connection") do |arg|
      options[:hostname] = arg
    end

    # Port to connect to
    opts.on("--port [PORT]", "Port for connection") do |arg|
      options[:port] = arg
    end

    # Username for connection
    opts.on("-u", "--username [USERNAME]", "Username for connection") do |arg|
      options[:username] = arg
    end

    # Password for connection
    opts.on("-p", "--password [PASSWORD]", "Password for connection") do |arg|
      options[:password] = arg
    end

    # Comma separated column list
    # No spaces allowed.
    opts.on("-c", "--column [COLUMN]", "Column list field1,field2...") do |arg|
      options[:column] = arg
    end

    # Comma separated NOT NULL list
    # No spaces allowed.
    opts.on("--not-null [COLUMN]", "NOT NULL list field1,field2...") do |arg|
      options[:notnull] = arg
    end

    # Manual specification of lines in file to prevent prescan
    # when running in verbose mode.
    opts.on("--lines [LINES]", "Lines in file") do |arg|
      options[:lines] = arg
    end

    # Binary file format
    opts.on("--binary", "Binary format") do |arg|
      options[:binary] = arg
    end

    # Copy OID for each row
    opts.on("--oids", "Copy OID for each row") do |arg|
      options[:oids] = arg
    end

    # Delimiter character
    opts.on("--delimiter [DELIMITER]", "Delimeter") do |arg|
      options[:delimiter] = arg
    end

    # Null string
    opts.on("--null [NULL]", "Null string") do |arg|
      options[:null] = arg
    end

    # CSV file format
    opts.on("--csv", "CSV format") do |arg|
      options[:csv] = arg
    end

    # CSV header included
    opts.on("--csv-header", "CSV header included") do |arg|
      options[:csv_header] = arg
    end

    # CSV quote character. Takes a value: without the placeholder the
    # switch would be a plain flag and store true instead of the character.
    opts.on("--csv-quote [CHAR]", "CSV quote character") do |arg|
      options[:csv_quote] = arg
    end

    # CSV escape character. Takes a value for the same reason as --csv-quote.
    opts.on("--csv-escape [CHAR]", "CSV escape character") do |arg|
      options[:csv_escape] = arg
    end

    # Display help
    opts.on_tail("--help", "Show this message") do
      puts opts
      exit
    end
  end

  parser.parse!(argv)
  options
end

# Script entry point: parses the command line, connects to the database,
# then feeds the input file to COPY one batch at a time. In verbose mode a
# single-line progress report is rewritten after every batch.
def main
  options = get_command_line # retrieve command-line options
  loader  = DBLoader.new(options) # main object

  # open the database connection, then the input file
  loader.open_connection
  loader.open_file

  processed   = 0 # line count for verbose mode
  batch_count = 0 # batches processed

  # Read specified batch size from file until finished
  loop do
    batch = loader.read_batch
    # a missing or empty batch means there is nothing left to load
    break if !batch || batch.empty?

    loader.do_copy(batch)

    # load statistics are only calculated when verbose mode is enabled
    next unless options[:verbose]

    batch_count += 1
    total = loader.lines
    # advance by the configured batch size, but never report over 100%
    processed = [processed + loader.batch_size, total].min
    # percentage complete, formatted to two decimals
    percent = '%.2f' % (processed / total.to_f * 100)

    # trailing \r keeps the report on one terminal line
    status = "progress: (#{processed}/#{total} processed) " \
             "(#{percent}% complete) (#{loader.errors} failures) " \
             "(#{batch_count} batches)\r"
    print status
    # prevent output buffering
    STDOUT.flush
  end

  # Cleanup
  loader.close_connection
  loader.close_file
end

# Main class to drive DB loading.
# Designed to be reusable enough for other applications if necessary.
#
# Reads an input file in batches of +batch_size+ lines and sends each batch
# to the server with COPY <table> FROM STDIN. A batch whose COPY fails is
# counted in +errors+ and appended to a reject file in the current
# directory so it can be reprocessed.
#
# Typical use: new -> open_connection -> open_file -> repeated
# read_batch / do_copy -> close_connection -> close_file.
#
# Fatal conditions (missing required option, missing input file, failed
# connection, COPY not entering COPY_IN state) print a message to stderr
# and terminate the process with exit status 1.
class DBLoader
  VERSION = "0.1.0"

  # Batch size used when the caller does not supply one.
  DEFAULT_BATCH_SIZE = 500

  # Line that marks end-of-data in the COPY text protocol.
  COPY_TERMINATOR = "\\.\n"

  # lines::      total line count used for progress reporting (nil unless
  #              options[:verbose] was set)
  # batch_size:: number of lines read per batch
  # errors::     number of batches whose COPY failed
  attr_reader :lines, :batch_size, :errors

  # options:: Hash as produced by get_command_line. :file, :table and
  #           :database are required; :batch defaults to 500 (with a
  #           warning on stderr); :hostname defaults to 'localhost'.
  #           When :verbose is set, the line total comes from :lines if
  #           given, otherwise the input file is scanned once.
  def initialize(options)
    @filename = required_option(options, :file,
                                "No file specified. See --help for options.")

    @batch_size = options[:batch]
    unless @batch_size
      $stderr.puts "No batch size specified. See --help for options. Using 500."
      @batch_size = DEFAULT_BATCH_SIZE
    end

    @table    = required_option(options, :table,
                                "No table specified. See --help for options.")
    @database = required_option(options, :database,
                                "No database specified. See --help for options.")

    @hostname = options[:hostname] || 'localhost'
    @port     = options[:port]
    @username = options[:username]
    @password = options[:password]

    @lines = nil
    if options[:verbose]
      # if lines are specified, use them; otherwise, we must scan the file
      @lines = options[:lines] ? options[:lines].to_i : get_line_count
    end

    @errors    = 0
    @options   = options
    # used as the prefix of the reject file name (see dump_batch)
    @timestamp = Time.now.strftime("%Y%m%d%H%M%S")
  end

  # Opens a connection to the database.
  # On failure the driver's error message is reported and the process
  # exits; the connection object does not exist yet at that point, so it
  # cannot be asked for its status.
  def open_connection
    @conn = PGconn.connect(@hostname, @port, nil, nil, @database, @username, @password)
  rescue PGError => e
    abort "Unable to connect to the backend: #{e.message}. Exiting."
  end

  # Closes the connection to the database (no-op if none was opened)
  def close_connection
    @conn.close if @conn
  end

  # Open input file for reading
  def open_file
    @fh = File.new(@filename, 'r')
  rescue Errno::ENOENT
    abort "File (#{@filename}) does not exist. Exiting."
  end

  # Close input file (no-op if it is not open)
  def close_file
    @fh.close if @fh
    @fh = nil
  end

  # Count the number of lines in the file for statistical purposes.
  # Uses its own file handle, so a handle opened with open_file is neither
  # consumed nor closed by the scan.
  def get_line_count
    count = 0
    File.foreach(@filename) { count += 1 }
    count
  rescue Errno::ENOENT
    abort "File (#{@filename}) does not exist. Exiting."
  end

  # Reads up to @batch_size lines from the open input file and returns them
  # as an array of strings (line terminators kept). Returns an empty array
  # at end of file. open_file must have been called first.
  def read_batch
    batch = []
    @batch_size.times do
      row = @fh.gets
      break unless row
      batch << row
    end
    batch
  end

  # Writes a failed batch to a file for reprocessing.
  # The file is named <timestamp><pid>-err and is opened in append mode, so
  # all rejects of one run accumulate in the same file.
  def dump_batch(batch)
    File.open("#{@timestamp}#{Process.pid}-err", 'a') do |fh|
      fh.print batch.join
    end
  end

  # Initiates and executes the copy command for one batch.
  #
  # batch:: array of raw input lines. A line equal to "\\.\n" is treated as
  #         end of data: lines after it are not sent, but the COPY is still
  #         terminated properly so the connection stays usable.
  #
  # If completing the COPY raises PGError the batch is counted as failed
  # and dumped for reprocessing. If the server does not enter COPY_IN state
  # the process exits.
  def do_copy(batch)
    # Make sure we're in COPY_IN mode before sending data
    unless @conn.exec(copy_query).status == PGresult::COPY_IN
      abort "Error initiating COPY_IN. Exiting."
    end

    batch.each do |line|
      break if line == COPY_TERMINATOR # \. indicates end of data
      @conn.putline(line)
    end
    @conn.putline(COPY_TERMINATOR)

    # Complete copy and check for errors
    begin
      @conn.endcopy
    rescue PGError
      # If an error occurs dump it for reprocessing
      @errors += 1
      dump_batch(batch)
    end
  end

  private

  # Returns options[key], or prints message and exits when it is missing.
  def required_option(options, key, message)
    options[key] || abort(message)
  end

  # Builds the COPY statement from the table name and the stored options.
  # Option values are inserted verbatim: delimiter, null string, quote and
  # escape must already be valid SQL literals (including their quotes).
  def copy_query
    clauses = ["COPY", @table]
    clauses << "(#{@options[:column]})" if @options[:column]
    clauses << "FROM STDIN"
    clauses << "BINARY" if @options[:binary]
    clauses << "OIDS" if @options[:oids]
    clauses << "DELIMITER AS #{@options[:delimiter]}" if @options[:delimiter]
    clauses << "NULL AS #{@options[:null]}" if @options[:null]

    # CSV sub-options are only emitted when CSV mode itself is requested
    if @options[:csv]
      clauses << "CSV"
      clauses << "HEADER" if @options[:csv_header]
      clauses << "QUOTE AS #{@options[:csv_quote]}" if @options[:csv_quote]
      clauses << "ESCAPE AS #{@options[:csv_escape]}" if @options[:csv_escape]
      clauses << "FORCE NOT NULL #{@options[:notnull]}" if @options[:notnull]
    end

    clauses.join(' ')
  end
end

# Run the loader only when this file is executed directly,
# not when it is loaded by another script.
main if __FILE__ == $0
