Dump and reload data into PostgreSQL with massive concurrency
# interpolated load tables step
find . -type f -print0 | xargs -0 -n1 -P50 bash -c "psql -U <postgres_user> <mydatabase> < \$0"
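# Note: run this from inside the export directory; <postgres_user> and
# <mydatabase> are placeholders for your own user and database names. It feeds
# every exported file to psql, 50 loads at a time.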
#!/bin/bash
set -e

readonly PROGNAME=$(basename "$0")
readonly ARGS="$@"
readonly PID=$$

readonly DEFAULT_SCHEMA="public"
readonly DEFAULT_USER="postgres"
readonly DEFAULT_EXPORT_DIR="/tmp"
readonly PARALLELISM=50
readonly BATCH_SIZE=500000

export PGOPTIONS="--statement-timeout=0"

usage() {
  echo "usage: ${PROGNAME} [--dump|--dump-schema|--load] --database <name>"
  echo "                   [--schema <name>] [--export_dir <dir>] [--user <name>] [--verbose]"
  exit 0
}
process_args() {
  local args=""
  # translate long options into the short flags that getopts understands
  for arg; do
    local delim=""
    case "$arg" in
      --database) args="${args}-d ";;
      --schema) args="${args}-s ";;
      --export_dir) args="${args}-e ";;
      --user) args="${args}-U ";;
      --dump) args="${args}-D ";;
      --dump-schema) args="${args}-S ";;
      --load) args="${args}-L ";;
      --help) args="${args}-h ";;
      --verbose) args="${args}-v ";;
      # pass anything else through, quoting values that do not start with -
      *) [[ "${arg:0:1}" == "-" ]] || delim="\""
         args="${args}${delim}${arg}${delim} ";;
    esac
  done
  eval set -- $args
  while getopts "d:e:s:U:DLShv" OPTION
  do
    case $OPTION in
      v)
        set -x
        ;;
      h)
        usage
        ;;
      D)
        readonly DUMP=true
        ;;
      L)
        readonly LOAD=true
        ;;
      S)
        local dump_schema=true
        ;;
      d)
        local database=${OPTARG%/}
        ;;
      e)
        local export_dir=${OPTARG%/}
        ;;
      s)
        local schema=${OPTARG%/}
        ;;
      U)
        local user=${OPTARG%/}
        ;;
    esac
  done

  readonly DATABASE=${database}
  readonly DB_USER=${user:-$DEFAULT_USER}
  readonly EXPORT_DIR=${export_dir:-$DEFAULT_EXPORT_DIR}
  readonly SCHEMA=${schema:-$DEFAULT_SCHEMA}

  if [[ ${DUMP} || ${dump_schema} ]]; then
    readonly DUMP_SCHEMA=true
  fi
}
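# For example, "process_args --database mydb --dump" rewrites the argument
# list to: -d "mydb" -D  before getopts parses it ("mydb" is a hypothetical
# value).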
# Writes out two files. The first includes only the table structure of the schema. The second is
# only the indices associated with those tables.
#
#   $SCHEMA.schema.sql
#   $SCHEMA.indices.sql
#
function dump_schema() {
  pg_dump -U ${DB_USER} -d ${DATABASE} -n ${SCHEMA} -s --section=pre-data > ${EXPORT_DIR}/${SCHEMA}.schema.sql
  pg_dump -U ${DB_USER} -d ${DATABASE} -n ${SCHEMA} -s --section=post-data > ${EXPORT_DIR}/${SCHEMA}.indices.sql
}
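# A typical restore order (a sketch; database names are hypothetical): apply
# schema.sql before --load so the tables exist, and apply indices.sql after the
# data is in, so indexes are built once rather than updated row by row:
#   psql -U postgres newdb < /tmp/public.schema.sql    # before --load
#   psql -U postgres newdb < /tmp/public.indices.sql   # after --load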
# private
function list_tables() {
  # \d output with -qAt is "schema|name|type|owner"; match only rows whose type is "table"
  echo $(psql -qAt -U ${DB_USER} -c "\d" ${DATABASE} | grep '|table|' | cut -d'|' -f2)
}

# private
function list_sequences() {
  echo $(psql -qAt -U ${DB_USER} -c "\ds ${SCHEMA}.*" ${DATABASE} | cut -d'|' -f2)
}
# private
function table_info_filename() {
  echo "${EXPORT_DIR}/table_info"
}

# private
function table_sequence_filename() {
  echo "${EXPORT_DIR}/table_sequences"
}
# For each table in the database, write out data into a file for us to look up later
# when actually exporting the data. This data is used to specify batches of records to
# export.
function dump_table_information() {
  local tables=$(list_tables)
  local table_info_file=$(table_info_filename)
  echo -n '' > ${table_info_file}
  for table in ${tables}; do
    local max_id=$(psql -qAt -U ${DB_USER} -c "select max(id) from ${SCHEMA}.${table}" ${DATABASE})
    # skip empty tables: max(id) is NULL there, and expr would error out on an empty string
    [[ -z "${max_id}" ]] && continue
    local batch_count=$(expr $max_id / ${BATCH_SIZE})
    for i in $(seq 0 ${batch_count}); do
      local min=$(((${i} * ${BATCH_SIZE}) + 1))
      local max=$(((${i} + 1) * ${BATCH_SIZE}))
      echo -n "${table} ${min} ${max} " >> ${table_info_file}
    done
  done
}
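# Example (hypothetical numbers): a table "users" with max(id) = 1200000 and
# the default BATCH_SIZE of 500000 yields batch_count=2 and appends three
# space-separated triples to the file:
#   users 1 500000 users 500001 1000000 users 1000001 1500000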
# Deprecated, but can be used to look up the sequence attached to an :id field
# on a table.
function sequence_for_table() {
  local table=$1
  echo $(psql -qAt -U ${DB_USER} -c "SELECT column_default from information_schema.columns where table_name='${table}' and column_name = 'id' and table_schema = '${SCHEMA}'" ${DATABASE} | cut -d"'" -f2)
}
# For each sequence in the target schema, write out its next value. Calling
# nextval() on the sequence tells us what value to restore when loading it back
# into the database.
function dump_sequence_information() {
  local sequences=$(list_sequences)
  local table_sequence_file=$(table_sequence_filename)
  echo -n '' > ${table_sequence_file}
  for sequence in ${sequences}; do
    local next_sequence=$(psql -qAt -U ${DB_USER} -c "select nextval('${SCHEMA}.${sequence}')" ${DATABASE})
    echo "${sequence} ${next_sequence}" >> ${table_sequence_file}
  done
}
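# The resulting file is one "name value" pair per line, for example
# (hypothetical):
#   users_id_seq 1200001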
# For each batch of each table, write out a CSV file of records.
#
# Note that this currently only dumps tables that have :id columns.
#
function dump_tables() {
  local table_info_file=$(table_info_filename)
  cat ${table_info_file} | xargs -d ' ' -n3 -P${PARALLELISM} bash -c "psql -U ${DB_USER} --set=statement_timeout=0 -c \"COPY (select * from ${SCHEMA}.\$0 where id >= \$1 and id <= \$2) TO '${EXPORT_DIR}/\$0.\$1.\$2.csv'\" ${DATABASE}"
}
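# Each xargs worker expands to a command like (hypothetical values):
#   psql -U postgres --set=statement_timeout=0 \
#     -c "COPY (select * from public.users where id >= 1 and id <= 500000) TO '/tmp/users.1.500000.csv'" mydb
# Note that COPY ... TO a file path runs on the server, so ${EXPORT_DIR} must
# be writable by the PostgreSQL server process.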
# For every data file exported from a previous --dump, load the data into
# the new schema.
#
function load_tables() {
  find ${EXPORT_DIR} -type f -name '*.csv' -size +0c -print0 | xargs -0 -n1 -P${PARALLELISM} bash -c "psql -U ${DB_USER} --set=statement_timeout=0 -c \"COPY ${SCHEMA}.\$(basename \$0 | cut -d'.' -f1) FROM '\$0' \" ${DATABASE}"
}
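# The table name is recovered from the file name, so a file such as
# /tmp/users.1.500000.csv (hypothetical) becomes:
#   psql -U postgres --set=statement_timeout=0 -c "COPY public.users FROM '/tmp/users.1.500000.csv'" mydb
# The -size +0c filter skips the empty CSVs produced for batch ranges that
# fall past max(id).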
# Look up the sequences exported in a previous --dump. Set the next sequence
# value to what we saw before, plus one.
function reset_sequences() {
  local table_sequence_file=$(table_sequence_filename)
  while read sequence count; do
    if [[ ! -z "${count}" ]]; then
      local new_count=$((count + 1))
      psql -U ${DB_USER} --set=statement_timeout=0 -c "ALTER SEQUENCE ${SCHEMA}.${sequence} restart ${new_count}" ${DATABASE}
    fi
  done < $table_sequence_file
}
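# For the hypothetical users_id_seq dumped above, this issues:
#   ALTER SEQUENCE public.users_id_seq restart 1200002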
function main() {
  process_args ${ARGS}

  if [ ${DUMP_SCHEMA} ]; then
    dump_schema
  fi

  if [ ${DUMP} ]; then
    dump_table_information
    dump_sequence_information
    dump_tables
  fi

  if [ ${LOAD} ]; then
    load_tables
    reset_sequences
  fi
}

main

# find . -type f -print0 | xargs -0 -n1 -P50 bash -c "psql -U <postgres_user> <mydatabase> < \$0"
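# A possible end-to-end run (a sketch; the script name and database names are
# hypothetical):
#   ./pg_concurrent_copy --dump --database olddb --export_dir /tmp/export
#   psql -U postgres newdb < /tmp/export/public.schema.sql
#   ./pg_concurrent_copy --load --database newdb --export_dir /tmp/export
#   psql -U postgres newdb < /tmp/export/public.indices.sql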