consume-mxids.patch.txt
text/plain
diff --git a/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql b/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
index 51d25fc4c63..c24164c480c 100644
--- a/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
+++ b/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
@@ -10,3 +10,7 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION consume_xids_until(targetxid xid8)
RETURNS xid8 IMMUTABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION consume_mxids(nmxids bigint)
+RETURNS xid8 IMMUTABLE PARALLEL SAFE STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/xid_wraparound/xid_wraparound.c b/src/test/modules/xid_wraparound/xid_wraparound.c
index dce81c0c6d6..935a770a683 100644
--- a/src/test/modules/xid_wraparound/xid_wraparound.c
+++ b/src/test/modules/xid_wraparound/xid_wraparound.c
@@ -14,6 +14,7 @@
*/
#include "postgres.h"
+#include "access/multixact.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "storage/proc.h"
@@ -24,6 +25,8 @@ PG_MODULE_MAGIC;
static int64 consume_xids_shortcut(void);
static FullTransactionId consume_xids_common(FullTransactionId untilxid, uint64 nxids);
+static MultiXactId consume_multixids_common(uint64 nmxids);
+
/*
* Consume the specified number of XIDs.
*/
@@ -151,6 +154,7 @@ consume_xids_common(FullTransactionId untilxid, uint64 nxids)
}
return lastxid;
+#undef REPORT_INTERVAL
}
/*
@@ -217,3 +221,89 @@ consume_xids_shortcut(void)
return consumed;
}
+
+/*
+ * Consume the specified number of multitransaction IDs.
+ */
+PG_FUNCTION_INFO_V1(consume_mxids);
+Datum
+consume_mxids(PG_FUNCTION_ARGS)
+{
+ int64 nmxids = PG_GETARG_INT64(0);
+ MultiXactId lastmxid;
+
+ if (nmxids < 0)
+ elog(ERROR, "invalid nmxids argument: %lld", (long long) nmxids);
+
+ if (nmxids == 0)
+ lastmxid = ReadNextMultiXactId();
+ else
+ lastmxid = consume_multixids_common((uint64) nmxids);
+
+ PG_RETURN_TRANSACTIONID(lastmxid);
+}
+
+
+/*
+ * Common functionality between the two public functions. XXX
+ */
+static MultiXactId
+consume_multixids_common(uint64 nmxids)
+{
+ MultiXactId lastmxid;
+ uint64 last_reported_at = 0;
+ uint64 consumed = 0;
+ MultiXactMember member;
+ TransactionId xids[256];
+
+ /* Print a NOTICE every REPORT_INTERVAL xids */
+#define REPORT_INTERVAL (10 * 1000000 / 10)
+
+ /* initialize 'lastmxid' with the system's current next XID */
+ lastmxid = ReadNextMultiXactId();
+
+ xids[0] = GetTopTransactionId();
+ for (int i = 1; i < Min(256, nmxids); i++)
+ {
+ xids[i] = XidFromFullTransactionId(GetNewTransactionId(true));
+ }
+
+ for (;;)
+ {
+ //uint64 mxids_left;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* How many XIDs do we have left to consume? */
+ if (nmxids > 0)
+ {
+ if (consumed >= nmxids)
+ break;
+ //mxids_left = nmxids - consumed;
+ }
+
+ /* (no fast path) */
+
+ /* Slow path: Call GetNewTransactionId to allocate a new XID. */
+
+ member = (MultiXactMember) {
+ .xid = xids[consumed % 256],
+ .status = ((consumed / 256) % 2) ? MultiXactStatusForUpdate : MultiXactStatusForKeyShare,
+ };
+
+ lastmxid = MultiXactIdCreateFromMembers(1, &member);
+ consumed++;
+
+ /* Report progress */
+ if (consumed - last_reported_at >= REPORT_INTERVAL)
+ {
+ elog(NOTICE, "consumed %llu / %llu XIDs, latest %u",
+ (unsigned long long) consumed, (unsigned long long) nmxids,
+ lastmxid);
+ last_reported_at = consumed;
+ }
+ }
+
+ return lastmxid;
+#undef REPORT_INTERVAL
+}