#!/bin/bash

####################
### Declarations ###
####################

## Publisher instance: listening port, data directory, server log file
PORT_PUB='6633'
DATA_PUB='data_pub'
LOG_PUB='pub.log'

## Subscriber instance: listening port, data directory, server log file
PORT_SUB='6634'
DATA_SUB='data_sub'
LOG_SUB='sub.log'

## Leading component of each result file name
PREFIX='patched'

## How many times every configuration is measured
NUMRUN='5'

## Measurement parameters
# Values tried for max_parallel_apply_workers_per_subscription
declare -a WORKERS_ARRAY=(0 1 2 4 8 16)
# Number of UPDATE transactions, and rows touched by each of them
NUMTXN='1000'
NUMLINEPERTXN='1000'
# Number of rows initially inserted into the table
TOTAL=$(( NUMTXN * NUMLINEPERTXN ))

# Build a fresh publisher/subscriber pair and connect them with logical
# replication. Any previous instances, data directories and logs are removed.
#
# Globals (read): PORT_PUB, DATA_PUB, LOG_PUB, PORT_SUB, DATA_SUB, LOG_SUB
# Arguments:      $1 - value written to the subscriber's
#                      max_parallel_apply_workers_per_subscription
function setup () {
    local workers=$1

    ################
    ### clean up ###
    ################

    # Best effort: on the very first run there is nothing to stop, so a
    # failure of pg_ctl here is expected and deliberately not treated as fatal.
    pg_ctl stop -D "$DATA_PUB" -w
    pg_ctl stop -D "$DATA_SUB" -w
    rm -rf -- "$DATA_PUB" "$DATA_SUB" "$LOG_PUB" "$LOG_SUB"

    #######################
    ### setup publisher ###
    #######################

    # Use $DATA_PUB everywhere (not a hard-coded directory name) so that the
    # cluster that is initialized is the same one that is started below.
    initdb -D "$DATA_PUB" -U postgres

    # NOTE(review): the publisher gets shared_buffers = 100GB while the
    # subscriber gets 10GB -- confirm this asymmetry is intended.
    cat << EOF >> "$DATA_PUB/postgresql.conf"
port=$PORT_PUB
autovacuum = false
shared_buffers = '100GB'
max_wal_size = 20GB
min_wal_size = 10GB
wal_level = logical
logical_decoding_work_mem = 10GB
EOF
    pg_ctl -D "$DATA_PUB" start -w -l "$LOG_PUB"

    psql -U postgres -p "$PORT_PUB" -c "CREATE TABLE foo (id int PRIMARY KEY, value double precision)"
    psql -U postgres -p "$PORT_PUB" -c "CREATE PUBLICATION pub FOR ALL TABLES;"

    ########################
    ### setup subscriber ###
    ########################
    initdb -D "$DATA_SUB" -U postgres
    cat << EOF >> "$DATA_SUB/postgresql.conf"
port=$PORT_SUB
autovacuum = false
shared_buffers = '10GB'
max_wal_size = 20GB
min_wal_size = 10GB
max_worker_processes = 100
max_logical_replication_workers = 50
max_parallel_apply_workers_per_subscription = $workers
logical_decoding_work_mem=10GB
EOF
    pg_ctl -D "$DATA_SUB" start -w -l "$LOG_SUB"

    # The table starts empty on both sides, so no initial copy is requested.
    psql -U postgres -p "$PORT_SUB" -c "CREATE TABLE foo (id int PRIMARY KEY, value double precision)"
    psql -U postgres -p "$PORT_SUB" -c "CREATE SUBSCRIPTION sub CONNECTION 'port=$PORT_PUB user=postgres' PUBLICATION pub with (copy_data=false);"
}

# Insert the initial data set on the publisher: TOTAL rows with ids
# 1..TOTAL and a random value each, all inside one transaction.
#
# Globals (read): PORT_PUB, TOTAL
function insert_tuples () {
    # The port is quoted so it always reaches psql as a single argument.
    psql -U postgres -p "$PORT_PUB" -c "BEGIN; INSERT INTO foo VALUES (generate_series(1, $TOTAL), random()); COMMIT;"
}

# Block until the publisher reports that the connection named 'sub' is in
# state 'streaming' and has replayed up to the publisher's current WAL position.
# The publisher's pg_stat_replication view is polled every 0.1 seconds.
#
# Globals (read): PORT_PUB
# Returns:        0 once replication has caught up,
#                 1 if psql itself fails (e.g. the publisher is unreachable)
function ensure_replication_is_done () {
    local caught_up=""
    local -r query="SELECT pg_current_wal_lsn() <= replay_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = 'sub'"

    # Wait for an explicit "t". An empty result (no row for 'sub' yet, e.g.
    # right after the subscription was enabled, or a NULL replay_lsn) means
    # "not connected yet" and must NOT be mistaken for "done".
    until [[ "$caught_up" == "t" ]]
    do
        sleep 0.1s
        if ! caught_up=$(psql -qtA -U postgres -p "$PORT_PUB" -c "$query"); then
            # Give up instead of spinning forever against a dead server.
            echo "ensure_replication_is_done: could not query publisher on port $PORT_PUB" >&2
            return 1
        fi
    done
}

# Update every row of foo on the publisher (the statements are sent to
# PORT_PUB), using NUMTXN separate psql invocations that each touch
# NUMLINEPERTXN consecutive ids.
#
# Globals (read): PORT_PUB, NUMTXN, NUMLINEPERTXN
function update_tuples () {
    local i first last

    for ((i = 1; i <= NUMTXN; i++))
    do
        # ids start at 1, so batch i covers (i-1)*N+1 .. i*N. This keeps every
        # batch at exactly NUMLINEPERTXN rows and leaves no id unvisited.
        first=$(( (i - 1) * NUMLINEPERTXN + 1 ))
        last=$(( i * NUMLINEPERTXN ))
        psql -U postgres -p "$PORT_PUB" -c "UPDATE foo SET value = random() WHERE id BETWEEN $first AND $last;"
    done
}

###########################
### measure performance ###
###########################

# Outer loop: one pass per parallel-apply-worker setting.
for WORKER in "${WORKERS_ARRAY[@]}"
do
    # One elapsed time (in seconds) per run is appended to this file.
    OUTPUT_SUB=${PREFIX}_${WORKER}_${NUMTXN}_${NUMLINEPERTXN}.dat
    # -f: the file does not exist on a first run; that is not an error.
    rm -f -- "$OUTPUT_SUB"

    # Inner loop: repeat the measurement with the same configuration.
    for ((run = 1; run <= NUMRUN; run++))
    do
        echo "${run}th try"

        # Build a fresh publisher/subscriber pair for this run
        setup "$WORKER"

        # Insert initial tuples on publisher
        insert_tuples

        # Ensure the insertion is replicated on subscriber
        ensure_replication_is_done

        # Pause the subscription so the updates below pile up on the publisher
        psql -U postgres -p "$PORT_SUB" -c "ALTER SUBSCRIPTION sub DISABLE;"
        sleep 1s

        update_tuples

        # Restart instances just in case. The subscriber's server log is
        # recreated here so it only covers the measured phase.
        pg_ctl -D "$DATA_PUB" restart -w -l "$LOG_PUB"
        pg_ctl -D "$DATA_SUB" stop
        rm -f -- "$LOG_SUB"
        pg_ctl -D "$DATA_SUB" start -w -l "$LOG_SUB"

        # Enable the subscription again and measure the time till replication is done
        START_TIME=$(date +'%s.%6N')
        psql -U postgres -p "$PORT_SUB" -c "ALTER SUBSCRIPTION sub ENABLE;"
        ensure_replication_is_done
        END_TIME=$(date +'%s.%6N')

        # Record results to file
        ELAPSED=$(echo "scale=6; $END_TIME - $START_TIME" | bc)
        echo "$ELAPSED" >> "$OUTPUT_SUB"
        # Keep the subscriber log; each run overwrites the previous copy for
        # the same worker count.
        cp -- "$LOG_SUB" "sub_${WORKER}.log"

    done
done
