measure_slotsync_memory.sh
text/x-sh
Filename: measure_slotsync_memory.sh
Type: text/x-sh
Part: 1
#!/usr/bin/env bash
set -euo pipefail
pg_bin=""
workdir=""
port_base=55432
slots=100
samples=20
interval=30
dump_timeout=45
keep=0
primary_port=""
standby_port=""
primary_dir=""
standby_dir=""
primary_log=""
standby_log=""
sync_psql_pid=""
usage()
{
cat <<EOF
Usage:
$0 --pg-bin DIR [--workdir DIR] [--slots N] [--samples N]
[--interval SEC] [--dump-timeout SEC] [--port-base PORT] [--keep]
Creates a primary and standby, creates failover logical slots on the primary,
forces pg_sync_replication_slots() on the standby to wait/retry, then samples
the waiting backend with pg_log_backend_memory_contexts().
Output is CSV: timestamp,backend_pid,total_bytes,delta_bytes
EOF
}
die()
{
echo "error: $*" >&2
exit 1
}
run()
{
echo "+ $*" >&2
"$@"
}
psql_primary()
{
"$pg_bin/psql" -X -v ON_ERROR_STOP=1 \
"host=$workdir port=$primary_port dbname=postgres" "$@"
}
psql_standby()
{
"$pg_bin/psql" -X -v ON_ERROR_STOP=1 \
"host=$workdir port=$standby_port dbname=postgres" "$@"
}
log_size()
{
wc -c < "$standby_log" | tr -d '[:space:]'
}
extract_grand_total_since()
{
local offset=$1
tail -c +"$((offset + 1))" "$standby_log" 2>/dev/null |
awk '
/Grand total:/ {
for (i = 1; i <= NF; i++)
{
if ($i == "total:")
{
gsub(/[^0-9]/, "", $(i + 1));
total = $(i + 1);
}
}
}
END {
if (total != "")
print total;
}
'
}
wait_sql()
{
local node=$1
local sql=$2
local timeout=${3:-60}
for _ in $(seq 1 "$timeout"); do
if [ "$node" = primary ]; then
if psql_primary -Atqc "$sql" 2>/dev/null | grep -qx t; then
return 0
fi
else
if psql_standby -Atqc "$sql" 2>/dev/null | grep -qx t; then
return 0
fi
fi
sleep 1
done
return 1
}
wait_for_log()
{
local pattern=$1
local offset=$2
local timeout=${3:-60}
for _ in $(seq 1 "$timeout"); do
if tail -c +"$((offset + 1))" "$standby_log" 2>/dev/null |
grep "$pattern" >/dev/null; then
return 0
fi
if [ -n "$sync_psql_pid" ] && ! kill -0 "$sync_psql_pid" 2>/dev/null; then
break
fi
sleep 1
done
return 1
}
wait_for_file()
{
local file=$1
local timeout=${2:-60}
for _ in $(seq 1 "$timeout"); do
[ -s "$file" ] && return 0
sleep 1
done
return 1
}
stop_cluster()
{
local datadir=$1
if [ -n "$datadir" ] && [ -d "$datadir" ]; then
"$pg_bin/pg_ctl" -D "$datadir" -m immediate stop >/dev/null 2>&1 || true
fi
}
prepare_workdir()
{
mkdir -p "$workdir"
# Allow reusing a --keep workdir by removing only files created by this
# harness. Leave unrelated files in the directory alone.
stop_cluster "$standby_dir"
stop_cluster "$primary_dir"
rm -rf "$primary_dir" "$standby_dir"
rm -f "$primary_log" "$standby_log"
rm -f "$workdir/create_slots.sql" "$workdir/run_slotsync.sql"
rm -f "$workdir/slotsync.out" "$workdir/slotsync.err"
rm -f "$workdir/slotsync_backend.pid"
}
cleanup()
{
if [ -n "$sync_psql_pid" ] && kill -0 "$sync_psql_pid" 2>/dev/null; then
kill "$sync_psql_pid" 2>/dev/null || true
wait "$sync_psql_pid" 2>/dev/null || true
fi
stop_cluster "$standby_dir"
stop_cluster "$primary_dir"
if [ "$keep" -eq 0 ] && [ -n "$workdir" ] && [ -d "$workdir" ]; then
rm -rf "$workdir"
else
echo "kept workdir: $workdir" >&2
fi
}
while [ $# -gt 0 ]; do
case "$1" in
--pg-bin)
pg_bin=${2:?}
shift 2
;;
--workdir)
workdir=${2:?}
shift 2
;;
--slots)
slots=${2:?}
shift 2
;;
--samples)
samples=${2:?}
shift 2
;;
--interval)
interval=${2:?}
shift 2
;;
--dump-timeout)
dump_timeout=${2:?}
shift 2
;;
--port-base)
port_base=${2:?}
shift 2
;;
--keep)
keep=1
shift
;;
-h|--help)
usage
exit 0
;;
*)
die "unknown option: $1"
;;
esac
done
[ -n "$pg_bin" ] || die "--pg-bin is required"
[ -x "$pg_bin/postgres" ] || die "postgres not found in --pg-bin: $pg_bin"
[ "$slots" -gt 0 ] || die "--slots must be positive"
if [ -z "$workdir" ]; then
workdir=$(mktemp -d "${TMPDIR:-/tmp}/slotsync-memory.XXXXXX")
fi
primary_port=$port_base
standby_port=$((port_base + 1))
primary_dir="$workdir/primary"
standby_dir="$workdir/standby"
primary_log="$workdir/primary.log"
standby_log="$workdir/standby.log"
trap cleanup EXIT
prepare_workdir
echo "workdir=$workdir" >&2
echo "primary_port=$primary_port standby_port=$standby_port slots=$slots" >&2
run "$pg_bin/initdb" -D "$primary_dir" -A trust >/dev/null
cat >> "$primary_dir/postgresql.conf" <<EOF
listen_addresses = ''
unix_socket_directories = '$workdir'
port = $primary_port
wal_level = logical
max_wal_senders = 20
max_replication_slots = $((slots + 10))
max_connections = 100
log_min_messages = debug1
EOF
cat >> "$primary_dir/pg_hba.conf" <<EOF
local all all trust
local replication all trust
EOF
run "$pg_bin/pg_ctl" -D "$primary_dir" -l "$primary_log" -w start >/dev/null
wait_sql primary "SELECT true" 30 || die "primary did not start"
psql_primary -qAtc "SELECT pg_create_physical_replication_slot('standby_slot');" >/dev/null
run "$pg_bin/pg_basebackup" -D "$standby_dir" -h "$workdir" -p "$primary_port" \
-d "dbname=postgres" -X stream >/dev/null
cat >> "$standby_dir/postgresql.conf" <<EOF
listen_addresses = ''
unix_socket_directories = '$workdir'
port = $standby_port
hot_standby = on
hot_standby_feedback = on
primary_conninfo = 'host=$workdir port=$primary_port dbname=postgres application_name=standby'
primary_slot_name = 'standby_slot'
max_replication_slots = $((slots + 10))
max_connections = 100
log_min_messages = debug1
EOF
touch "$standby_dir/standby.signal"
run "$pg_bin/pg_ctl" -D "$standby_dir" -l "$standby_log" -w start >/dev/null
wait_sql standby "SELECT pg_is_in_recovery()" 30 || die "standby did not enter recovery"
slot_sql="$workdir/create_slots.sql"
for i in $(seq 1 "$slots"); do
printf "SELECT pg_create_logical_replication_slot('memslot_%s', 'pgoutput', false, false, true);\n" "$i"
done > "$slot_sql"
psql_primary -q -f "$slot_sql" >/dev/null
psql_primary -q <<'SQL' >/dev/null
CREATE TABLE slotsync_memory_wait(a int);
INSERT INTO slotsync_memory_wait SELECT generate_series(1, 10000);
CHECKPOINT;
SELECT pg_switch_wal();
SQL
target_lsn=$(psql_primary -Atqc "SELECT pg_current_wal_flush_lsn();")
wait_sql standby "SELECT pg_last_wal_replay_lsn() >= '$target_lsn'::pg_lsn" 60 ||
die "standby did not catch up to $target_lsn"
# Advance the standby restart point so the local slot reservation is ahead of
# the remote slot positions, forcing pg_sync_replication_slots() to retry.
psql_standby -qAtc "CHECKPOINT;" >/dev/null || true
pid_file="$workdir/slotsync_backend.pid"
sync_sql="$workdir/run_slotsync.sql"
cat > "$sync_sql" <<EOF
\\pset tuples_only on
\\pset format unaligned
\\o $pid_file
SELECT pg_backend_pid();
\\o
SELECT pg_sync_replication_slots();
EOF
log_offset=$(log_size)
"$pg_bin/psql" -X -v ON_ERROR_STOP=1 \
"host=$workdir port=$standby_port dbname=postgres" \
-f "$sync_sql" > "$workdir/slotsync.out" 2> "$workdir/slotsync.err" &
sync_psql_pid=$!
wait_for_file "$pid_file" 30 || die "timed out waiting for slotsync backend pid"
backend_pid=$(tr -d '[:space:]' < "$pid_file")
wait_for_log "could not synchronize replication slot" "$log_offset" 60 ||
die "pg_sync_replication_slots() did not enter the expected retry path"
echo "timestamp,backend_pid,total_bytes,delta_bytes"
previous_total=""
offset=$(log_size)
for _ in $(seq 1 "$samples"); do
timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
if ! kill -0 "$sync_psql_pid" 2>/dev/null; then
echo "$timestamp,$backend_pid,,"
break
fi
ok=$(psql_standby -Atqc "SELECT pg_log_backend_memory_contexts($backend_pid);" 2>/dev/null || true)
if [ "$ok" != "t" ]; then
echo "warning: pg_log_backend_memory_contexts($backend_pid) returned '$ok'" >&2
echo "$timestamp,$backend_pid,,"
break
fi
total=""
for _ in $(seq 1 "$dump_timeout"); do
total=$(extract_grand_total_since "$offset")
[ -n "$total" ] && break
sleep 1
done
if [ -z "$total" ]; then
echo "warning: no memory dump observed within ${dump_timeout}s for pid $backend_pid" >&2
fi
offset=$(log_size)
if [ -n "$total" ] && [ -n "$previous_total" ]; then
delta=$((total - previous_total))
else
delta=""
fi
echo "$timestamp,$backend_pid,$total,$delta"
if [ -n "$total" ]; then
previous_total=$total
fi
sleep "$interval"
done