#!/bin/bash

####################
### Declarations ###
####################

## Publisher instance: listen port, data directory, server log file
PORT_PUB="6633"
DATA_PUB="data_pub"
LOG_PUB="pub.log"

## Subscriber instance: listen port, data directory, server log file
PORT_SUB="6634"
DATA_SUB="data_sub"
LOG_SUB="sub.log"

## Leading component of the result file name
PREFIX="insert"

## How many times each configuration is measured
NUMRUN="1"

## Measurement parameters
# Each array element is one parallel-apply-worker setting to benchmark.
declare -a WORKERS_ARRAY=("16")
# Both factors appear in the result file name; their product is the
# number of rows loaded on the publisher.
NUMTXN="1000"
NUMLINEPERTXN="1000"
TOTAL=$(( NUMTXN * NUMLINEPERTXN ))

# Build a fresh publisher/subscriber pair connected by logical replication.
#
# Any previous instances are stopped and their data directories and log
# files are removed, so every call starts from empty clusters.
#
# Globals (read): PORT_PUB, DATA_PUB, LOG_PUB, PORT_SUB, DATA_SUB, LOG_SUB
# Arguments:      $1 - value written to
#                      max_parallel_apply_workers_per_subscription on the
#                      subscriber
# Returns:        1 if initdb or a server start fails; otherwise the exit
#                 status of the final CREATE SUBSCRIPTION call
function setup () {
    local workers=$1

    ################
    ### clean up ###
    ################

    # Best effort: on a first run there is nothing to stop, and pg_ctl
    # reporting that is harmless.
    pg_ctl stop -D "$DATA_PUB" -w
    pg_ctl stop -D "$DATA_SUB" -w
    # ":?" aborts instead of running rm -rf with an empty directory name.
    rm -rf -- "${DATA_PUB:?}" "${DATA_SUB:?}" "$LOG_PUB" "$LOG_SUB"

    #######################
    ### setup publisher ###
    #######################
    # The data directory comes from DATA_PUB everywhere, so the cluster that
    # is initialised and configured is the same one pg_ctl starts below.
    initdb -D "$DATA_PUB" -U postgres || return 1
    cat << EOF >> "$DATA_PUB/postgresql.conf"
port=$PORT_PUB
autovacuum = false
shared_buffers = '100GB'
max_wal_size = 20GB
min_wal_size = 10GB
wal_level = logical
logical_decoding_work_mem = 10GB
EOF
    pg_ctl -D "$DATA_PUB" start -w -l "$LOG_PUB" || return 1
    psql -U postgres -p "$PORT_PUB" -c "CREATE TABLE foo (id int PRIMARY KEY, value double precision)"
    psql -U postgres -p "$PORT_PUB" -c "CREATE PUBLICATION pub FOR ALL TABLES;"

    ########################
    ### setup subscriber ###
    ########################
    initdb -D "$DATA_SUB" -U postgres || return 1
    cat << EOF >> "$DATA_SUB/postgresql.conf"
port=$PORT_SUB
autovacuum = false
shared_buffers = '100GB'
max_wal_size = 20GB
min_wal_size = 10GB
max_worker_processes = 100
max_logical_replication_workers = 50
max_parallel_apply_workers_per_subscription = $workers
EOF
    pg_ctl -D "$DATA_SUB" start -w -l "$LOG_SUB" || return 1

    psql -U postgres -p "$PORT_SUB" -c "CREATE TABLE foo (id int PRIMARY KEY, value double precision)"
    # copy_data=false: the table is empty at this point, so there is no
    # initial data to copy.
    psql -U postgres -p "$PORT_SUB" -c "CREATE SUBSCRIPTION sub CONNECTION 'port=$PORT_PUB user=postgres' PUBLICATION pub with (copy_data=false);"
}

# Load the benchmark rows on the publisher.
#
# Inserts ids 1..TOTAL with random values into foo, all inside one explicit
# BEGIN/COMMIT transaction.
#
# Globals (read): PORT_PUB, TOTAL
# Returns:        psql's exit status
function insert_tuples () {
    local sql="BEGIN; INSERT INTO foo VALUES (generate_series(1, $TOTAL), random()); COMMIT;"

    psql -U postgres -p "$PORT_PUB" -c "$sql"
}

# Block until the subscriber has replayed everything the publisher has written.
#
# Polls pg_stat_replication on the publisher for the walsender whose
# application_name is 'sub' and waits until the query yields 't', i.e. the
# connection is streaming and replay_lsn has reached pg_current_wal_lsn().
#
# Globals (read): PORT_PUB
# Returns:        0 once caught up, 1 if psql itself fails
function ensure_replication_is_done () {
    local caught_up=""
    local -r query="SELECT pg_current_wal_lsn() <= replay_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = 'sub'"

    # Wait for an explicit 't'. An empty result (no walsender row yet, or a
    # NULL replay_lsn) means "not known to be done" and must not end the wait.
    until [[ "$caught_up" == "t" ]]
    do
        sleep 0.01s
        # Bail out on a psql failure rather than polling a dead publisher forever.
        caught_up=$(psql -qtA -U postgres -p "$PORT_PUB" -c "$query") || return 1
    done
}

###########################
### measure performance ###
###########################

# Outer loop: one result file per parallel-worker setting.
for WORKER in "${WORKERS_ARRAY[@]}"
do
    OUTPUT_SUB="${PREFIX}_${WORKER}_${NUMTXN}_${NUMLINEPERTXN}.dat"
    # Start from an empty result file; -f because it does not exist on a
    # first run.
    rm -f -- "$OUTPUT_SUB"

    # Inner loop: repeat the measurement NUMRUN times with the same setting.
    for ((i = 1; i <= NUMRUN; i++))
    do
        echo "${i}th try"

        # Rebuild both clusters from scratch. Without a working setup any
        # timing would be meaningless, so stop here.
        if ! setup "$WORKER"
        then
            echo "setup failed for ${WORKER} workers (run ${i})" >&2
            exit 1
        fi

        # Pause the subscription so the inserted rows pile up on the publisher.
        psql -U postgres -p "$PORT_SUB" -c "ALTER SUBSCRIPTION sub DISABLE;"
        sleep 1s

        # Insert tuples on publisher
        insert_tuples

        # NOTE(review): the publisher is restarted before timing starts;
        # presumably to begin from a freshly started server — confirm.
        pg_ctl -D "$DATA_PUB" restart -w -l "$LOG_PUB"

        # Re-enable the subscription and time how long catch-up takes.
        # The recorded value includes the fixed 1s sleep below.
        START_TIME=$(date +'%s.%6N')
        psql -U postgres -p "$PORT_SUB" -c "ALTER SUBSCRIPTION sub ENABLE;"
        sleep 1s
        ensure_replication_is_done
        END_TIME=$(date +'%s.%6N')
        ELAPSED=$(echo "scale=6; $END_TIME - $START_TIME" | bc)
        printf '%s\n' "$ELAPSED" >> "$OUTPUT_SUB"
    done
done
