consume-mxids.patch.txt

text/plain

Filename: consume-mxids.patch.txt
Type: text/plain
Part: 0
Message: Re: POC: make mxidoff 64 bits
diff --git a/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql b/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
index 51d25fc4c63..c24164c480c 100644
--- a/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
+++ b/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
@@ -10,3 +10,7 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION consume_xids_until(targetxid xid8)
 RETURNS xid8 IMMUTABLE PARALLEL SAFE STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION consume_mxids(nmxids bigint)
+RETURNS xid VOLATILE PARALLEL UNSAFE STRICT -- has side effects; C code returns an xid Datum
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/xid_wraparound/xid_wraparound.c b/src/test/modules/xid_wraparound/xid_wraparound.c
index dce81c0c6d6..935a770a683 100644
--- a/src/test/modules/xid_wraparound/xid_wraparound.c
+++ b/src/test/modules/xid_wraparound/xid_wraparound.c
@@ -14,6 +14,7 @@
  */
 #include "postgres.h"
 
+#include "access/multixact.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
@@ -24,6 +25,8 @@ PG_MODULE_MAGIC;
 static int64 consume_xids_shortcut(void);
 static FullTransactionId consume_xids_common(FullTransactionId untilxid, uint64 nxids);
 
+static MultiXactId  consume_multixids_common(uint64 nmxids);
+
 /*
  * Consume the specified number of XIDs.
  */
@@ -151,6 +154,7 @@ consume_xids_common(FullTransactionId untilxid, uint64 nxids)
 	}
 
 	return lastxid;
+#undef REPORT_INTERVAL
 }
 
 /*
@@ -217,3 +221,89 @@ consume_xids_shortcut(void)
 
 	return consumed;
 }
+
+/*
+ * SQL-callable: consume 'nmxids' multixact IDs and return the last one
+ * (or whatever ReadNextMultiXactId() reports, when nmxids is zero).
+ */
+PG_FUNCTION_INFO_V1(consume_mxids);
+Datum
+consume_mxids(PG_FUNCTION_ARGS)
+{
+	int64		count = PG_GETARG_INT64(0);
+	MultiXactId result;
+
+	if (count < 0)
+		elog(ERROR, "invalid nmxids argument: %lld", (long long) count);
+
+	/* count == 0: nothing to consume, just read the counter */
+	result = (count > 0) ? consume_multixids_common((uint64) count) :
+		ReadNextMultiXactId();
+
+	PG_RETURN_TRANSACTIONID(result);
+}
+
+
+/*
+ * Workhorse for consume_mxids(): create 'nmxids' new multixacts.
+ *
+ * Every multixact gets a single member.  The member XID is taken round-robin
+ * from a pool of up to lengthof(xids) XIDs allocated up front, and the member
+ * status flips after each pass over the pool, so an identical (xid, status)
+ * pair recurs only after 2 * lengthof(xids) iterations.  NOTE(review):
+ * presumably this keeps the backend-local multixact cache from handing back
+ * an existing ID instead of allocating a new one -- confirm in multixact.c.
+ *
+ * Returns the last multixact ID created; if nmxids is 0, nothing is created
+ * and the current next multixact ID is returned.
+ */
+static MultiXactId
+consume_multixids_common(uint64 nmxids)
+{
+	MultiXactId lastmxid;
+	uint64		last_reported_at = 0;
+	uint64		consumed = 0;
+	uint64		npool;
+	TransactionId xids[256];
+
+	/* Print a NOTICE every REPORT_INTERVAL multixacts */
+#define REPORT_INTERVAL 1000000
+
+	/* initialize 'lastmxid' with the system's current next multixact ID */
+	lastmxid = ReadNextMultiXactId();
+
+	/*
+	 * Build the pool of member XIDs: our own top-level XID plus freshly
+	 * allocated ones.  At most nmxids entries are ever used.
+	 */
+	npool = Min(lengthof(xids), nmxids);
+	xids[0] = GetTopTransactionId();
+	for (uint64 i = 1; i < npool; i++)
+		xids[i] = XidFromFullTransactionId(GetNewTransactionId(true));
+
+	while (consumed < nmxids)
+	{
+		MultiXactMember member;
+
+		CHECK_FOR_INTERRUPTS();
+
+		member.xid = xids[consumed % lengthof(xids)];
+		member.status = ((consumed / lengthof(xids)) % 2) ?
+			MultiXactStatusForUpdate : MultiXactStatusForKeyShare;
+
+		lastmxid = MultiXactIdCreateFromMembers(1, &member);
+		consumed++;
+
+		/* Report progress */
+		if (consumed - last_reported_at >= REPORT_INTERVAL)
+		{
+			elog(NOTICE, "consumed %llu / %llu multixacts, latest %u",
+				 (unsigned long long) consumed, (unsigned long long) nmxids,
+				 lastmxid);
+			last_reported_at = consumed;
+		}
+	}
+
+	return lastmxid;
+#undef REPORT_INTERVAL
+}