0001-Made-tuple-ids-and-info-about-HOT-updates-available-.patch

application/x-patch

Filename: 0001-Made-tuple-ids-and-info-about-HOT-updates-available-.patch
Type: application/x-patch
Part: 0
Message: making tid and HOTness of UPDATE available to logical decoding plugins
From e2a28c6d4cd0d735fd0ff1c287b0e289c2d29022 Mon Sep 17 00:00:00 2001
From: Hannu Krosing <hannuk@google.com>
Date: Thu, 4 Dec 2025 21:21:04 +0100
Subject: [PATCH] Made tuple ids and info about HOT updates available to
 logical decoding

Modified test_decoding to show both
 old tid has format -(pageno, slot)
 new tid has format +(pageno, slot)
 if it is a HOT update, ith is decoded prefixed with 'HOT '

Sample usage:

hannu=# SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
 pg_create_logical_replication_slot
------------------------------------
 (test_slot,0/1BF1B38)
(1 row)
hannu=# CREATE TABLE nokey(data text);
CREATE TABLE
hannu=# insert into nokey (data) values('a');
INSERT 0 1
hannu=# update nokey set data = 'b';
UPDATE 1
hannu=# delete from nokey ;
DELETE 1
hannu=# SELECT lsn, xid, data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL);
    lsn    | xid |                            data
-----------+-----+------------------------------------------------------------
 0/1C20538 | 767 | BEGIN 767
 0/1C2B1E8 | 767 | COMMIT 767
 0/1C2B220 | 768 | BEGIN 768
 0/1C2B220 | 768 | table public.nokey: INSERT:+(0,1) data[text]:'a'
 0/1C2B290 | 768 | COMMIT 768
 0/1C2B300 | 769 | BEGIN 769
 0/1C2B300 | 769 | table public.nokey: HOT UPDATE:-(0,1)+(0,2) data[text]:'b'
 0/1C2B378 | 769 | COMMIT 769
 0/1C2B3B0 | 770 | BEGIN 770
 0/1C2B3B0 | 770 | table public.nokey: DELETE:-(0,2) (no-tuple-data)
 0/1C2B418 | 770 | COMMIT 770
(11 rows)
---
 contrib/test_decoding/test_decoding.c    | 33 +++++++++++++++++++-
 src/backend/replication/logical/decode.c | 38 +++++++++++++++++++-----
 src/include/replication/reorderbuffer.h  |  7 +++++
 3 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index bb495563200..7a1805d5a97 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -596,6 +596,23 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 	}
 }
 
+
+static inline char* _format_tid(char *tidbuf, char prefix, ItemPointer itemPtr)
+{
+	BlockNumber blockNumber;
+	OffsetNumber offsetNumber;
+
+	blockNumber = ItemPointerGetBlockNumberNoCheck(itemPtr);
+	offsetNumber = ItemPointerGetOffsetNumberNoCheck(itemPtr);
+
+	tidbuf[0] = prefix;
+	/* Perhaps someday we should output this as a record. */
+	snprintf(tidbuf+1, 32-1, "(%u,%u)", blockNumber, offsetNumber);
+
+	return tidbuf;
+}
+
+
 /*
  * callback for individual changed tuples
  */
@@ -608,6 +625,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
+	char		tidbuf[32];
 
 	data = ctx->output_plugin_private;
 	txndata = txn->output_plugin_private;
@@ -639,6 +657,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			appendStringInfoString(ctx->out, " INSERT:");
+			if (change->data.tp.newctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '+', &(change->data.tp.newctid)));
 			if (change->data.tp.newtuple == NULL)
 				appendStringInfoString(ctx->out, " (no-tuple-data)");
 			else
@@ -647,7 +668,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									false);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (change->data.tp.is_hot_update)
+				appendStringInfoString(ctx->out, " HOT");
 			appendStringInfoString(ctx->out, " UPDATE:");
+			if (change->data.tp.oldctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '-', &(change->data.tp.oldctid)));
+			if (change->data.tp.newctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '+', &(change->data.tp.newctid)));
 			if (change->data.tp.oldtuple != NULL)
 			{
 				appendStringInfoString(ctx->out, " old-key:");
@@ -666,7 +695,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			appendStringInfoString(ctx->out, " DELETE:");
-
+			if (change->data.tp.oldctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '-', &(change->data.tp.oldctid)));
 			/* if there was no PK, we only know that a delete happened */
 			if (change->data.tp.oldtuple == NULL)
 				appendStringInfoString(ctx->out, " (no-tuple-data)");
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..a04ae2a717a 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,7 +42,7 @@
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot);
 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -502,7 +502,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_HEAP_UPDATE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
 				!ctx->fast_forward)
-				DecodeUpdate(ctx, buf);
+				DecodeUpdate(ctx, buf, info == XLOG_HEAP_HOT_UPDATE);
 			break;
 
 		case XLOG_HEAP_DELETE:
@@ -909,6 +909,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_insert *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber blkno;
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
@@ -920,7 +921,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -945,6 +946,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
 
+	ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->offnum);
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -959,18 +961,20 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Updates can possibly contain a new tuple and the old primary key.
  */
 static void
-DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_update *xlrec;
 	ReorderBufferChange *change;
 	char	   *data;
 	RelFileLocator target_locator;
+	BlockNumber blkno;
+	BlockNumber old_blkno = InvalidBlockNumber;
 
 	xlrec = (xl_heap_update *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -983,6 +987,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	change->origin_id = XLogRecGetOrigin(r);
 	memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
 
+	ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->new_offnum);
+
+	/* If block 1 is present, it contains old tuple */
+	if (XLogRecHasBlockRef(r, 1))
+	{
+		RelFileLocator old_locator;
+		XLogRecGetBlockTag(r, 1, &old_locator, NULL, &old_blkno);
+	}
+
+	if (BlockNumberIsValid(old_blkno))
+		ItemPointerSet(&change->data.tp.oldctid, old_blkno, xlrec->old_offnum);
+	else
+		ItemPointerSet(&change->data.tp.oldctid, blkno, xlrec->old_offnum);
+
 	if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
 	{
 		Size		datalen;
@@ -1015,6 +1033,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 
 	change->data.tp.clear_toast_afterwards = true;
+	change->data.tp.is_hot_update = is_hot;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
@@ -1032,11 +1051,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_delete *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber blkno;
 
 	xlrec = (xl_heap_delete *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1070,6 +1090,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 						datalen, change->data.tp.oldtuple);
 	}
 
+	ItemPointerSet(&change->data.tp.oldctid, blkno, xlrec->offnum);
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1127,6 +1148,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	char	   *tupledata;
 	Size		tuplelen;
 	RelFileLocator rlocator;
+	BlockNumber blkno;
 
 	xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
 
@@ -1138,7 +1160,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blkno);
 	if (rlocator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1168,6 +1190,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
 
+		ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->offsets[i]);
+
 		xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
 		data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
 		datalen = xlhdr->datalen;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..8c305ec6e41 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -104,6 +104,13 @@ typedef struct ReorderBufferChange
 			HeapTuple	oldtuple;
 			/* valid for INSERT || UPDATE */
 			HeapTuple	newtuple;
+
+			/* ctid for old tuple; valid for DELETE || UPDATE */
+			ItemPointerData oldctid;
+			/* ctid for new tuple; valid for INSERT || UPDATE */
+			ItemPointerData newctid;
+			/* update type - only valid for UPDATE */
+			bool		is_hot_update;
 		}			tp;
 
 		/*
-- 
2.43.0