multixids.py
text/x-python
import sys;
import threading;
import psycopg2;
def test_multixact(tblname: str):
with psycopg2.connect() as conn:
cur = conn.cursor()
cur.execute(
f"""
DROP TABLE IF EXISTS {tblname};
CREATE TABLE {tblname}(i int primary key, n_updated int) WITH (autovacuum_enabled=false);
INSERT INTO {tblname} select g, 0 from generate_series(1, 50) g;
"""
)
# Lock entries using parallel connections in a round-robin fashion.
nclients = 50
update_every = 97
connections = []
for _ in range(nclients):
# Do not turn on autocommit. We want to hold the key-share locks.
conn = psycopg2.connect()
connections.append(conn)
# On each iteration, we commit the previous transaction on a connection,
# and issue another select. Each SELECT generates a new multixact that
# includes the new XID, and the XIDs of all the other parallel transactions.
# This generates enough traffic on both multixact offsets and members SLRUs
# to cross page boundaries.
for i in range(20000):
conn = connections[i % nclients]
conn.commit()
# Perform some non-key UPDATEs too, to exercise different multixact
# member statuses.
if i % update_every == 0:
conn.cursor().execute(f"update {tblname} set n_updated = n_updated + 1 where i = {i % 50}")
else:
conn.cursor().execute(f"select * from {tblname} for key share")
#nthreads=10
#
#threads = []
#for threadno in range(nthreads):
# tblname = f"tbl{threadno}"
# t = threading.Thread(target=test_multixact, args=(tblname,))
# t.start()
# threads.append(t)
#
#for threadno in range(nthreads):
# threads[threadno].join()
test_multixact(sys.argv[1])