0001-Made-tuple-ids-and-info-about-HOT-updates-available-.patch
application/x-patch
Filename: 0001-Made-tuple-ids-and-info-about-HOT-updates-available-.patch
Type: application/x-patch
Part: 0
From e2a28c6d4cd0d735fd0ff1c287b0e289c2d29022 Mon Sep 17 00:00:00 2001
From: Hannu Krosing <hannuk@google.com>
Date: Thu, 4 Dec 2025 21:21:04 +0100
Subject: [PATCH] Made tuple ids and info about HOT updates available to
logical decoding
Modified test_decoding to show both the tuple ids (TIDs) and the HOT flag:
old tid has format -(pageno,offset)
new tid has format +(pageno,offset)
if it is a HOT update, the change is printed as 'HOT UPDATE'
Sample usage:
hannu=# SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
pg_create_logical_replication_slot
------------------------------------
(test_slot,0/1BF1B38)
(1 row)
hannu=# CREATE TABLE nokey(data text);
CREATE TABLE
hannu=# insert into nokey (data) values('a');
INSERT 0 1
hannu=# update nokey set data = 'b';
UPDATE 1
hannu=# delete from nokey ;
DELETE 1
hannu=# SELECT lsn, xid, data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL);
lsn | xid | data
-----------+-----+------------------------------------------------------------
0/1C20538 | 767 | BEGIN 767
0/1C2B1E8 | 767 | COMMIT 767
0/1C2B220 | 768 | BEGIN 768
0/1C2B220 | 768 | table public.nokey: INSERT:+(0,1) data[text]:'a'
0/1C2B290 | 768 | COMMIT 768
0/1C2B300 | 769 | BEGIN 769
0/1C2B300 | 769 | table public.nokey: HOT UPDATE:-(0,1)+(0,2) data[text]:'b'
0/1C2B378 | 769 | COMMIT 769
0/1C2B3B0 | 770 | BEGIN 770
0/1C2B3B0 | 770 | table public.nokey: DELETE:-(0,2) (no-tuple-data)
0/1C2B418 | 770 | COMMIT 770
(11 rows)
---
contrib/test_decoding/test_decoding.c | 33 +++++++++++++++++++-
src/backend/replication/logical/decode.c | 38 +++++++++++++++++++-----
src/include/replication/reorderbuffer.h | 7 +++++
3 files changed, 70 insertions(+), 8 deletions(-)
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index bb495563200..7a1805d5a97 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -596,6 +596,23 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
}
}
+
+static inline char *
+_format_tid(char *tidbuf, char prefix, ItemPointer itemPtr)
+{
+	/*
+	 * Render itemPtr as <prefix>(block,offset), e.g. "+(0,1)", into tidbuf
+	 * and return tidbuf.  tidbuf must hold at least 32 bytes; the longest
+	 * possible result, prefix + "(4294967295,65535)" + NUL, takes 20.
+	 */
+	snprintf(tidbuf, 32, "%c(%u,%u)", prefix,
+			 ItemPointerGetBlockNumberNoCheck(itemPtr),
+			 ItemPointerGetOffsetNumberNoCheck(itemPtr));
+
+	return tidbuf;
+}
+
+
/*
* callback for individual changed tuples
*/
@@ -608,6 +625,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Form_pg_class class_form;
TupleDesc tupdesc;
MemoryContext old;
+ char tidbuf[32];
data = ctx->output_plugin_private;
txndata = txn->output_plugin_private;
@@ -639,6 +657,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
appendStringInfoString(ctx->out, " INSERT:");
+ if (ItemPointerIsValid(&change->data.tp.newctid))
+ appendStringInfoString(ctx->out,
+ _format_tid(tidbuf, '+', &(change->data.tp.newctid)));
if (change->data.tp.newtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
else
@@ -647,7 +668,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
false);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
+ if (change->data.tp.is_hot_update)
+ appendStringInfoString(ctx->out, " HOT");
appendStringInfoString(ctx->out, " UPDATE:");
+ if (ItemPointerIsValid(&change->data.tp.oldctid))
+ appendStringInfoString(ctx->out,
+ _format_tid(tidbuf, '-', &(change->data.tp.oldctid)));
+ if (ItemPointerIsValid(&change->data.tp.newctid))
+ appendStringInfoString(ctx->out,
+ _format_tid(tidbuf, '+', &(change->data.tp.newctid)));
if (change->data.tp.oldtuple != NULL)
{
appendStringInfoString(ctx->out, " old-key:");
@@ -666,7 +695,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break;
case REORDER_BUFFER_CHANGE_DELETE:
appendStringInfoString(ctx->out, " DELETE:");
-
+ if (ItemPointerIsValid(&change->data.tp.oldctid))
+ appendStringInfoString(ctx->out,
+ _format_tid(tidbuf, '-', &(change->data.tp.oldctid)));
/* if there was no PK, we only know that a delete happened */
if (change->data.tp.oldtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..a04ae2a717a 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,7 +42,7 @@
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -502,7 +502,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_HEAP_UPDATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
- DecodeUpdate(ctx, buf);
+ DecodeUpdate(ctx, buf, info == XLOG_HEAP_HOT_UPDATE);
break;
case XLOG_HEAP_DELETE:
@@ -909,6 +909,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_heap_insert *xlrec;
ReorderBufferChange *change;
RelFileLocator target_locator;
+ BlockNumber blkno;
xlrec = (xl_heap_insert *) XLogRecGetData(r);
@@ -920,7 +921,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
return;
/* only interested in our database */
- XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+ XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
if (target_locator.dbOid != ctx->slot->data.database)
return;
@@ -945,6 +946,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
+ ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->offnum);
change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -959,18 +961,20 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* Updates can possibly contain a new tuple and the old primary key.
*/
static void
-DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot)
{
XLogReaderState *r = buf->record;
xl_heap_update *xlrec;
ReorderBufferChange *change;
char *data;
RelFileLocator target_locator;
+ BlockNumber blkno;
+	BlockNumber	old_blkno;
xlrec = (xl_heap_update *) XLogRecGetData(r);
/* only interested in our database */
- XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+ XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
if (target_locator.dbOid != ctx->slot->data.database)
return;
@@ -983,6 +987,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
+	ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->new_offnum);
+
+	/*
+	 * Block 1 is registered only when the old tuple version lives on a
+	 * different page than the new one (never the case for a HOT update);
+	 * otherwise both versions are on block 0.
+	 */
+	if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &old_blkno, NULL))
+		old_blkno = blkno;
+	Assert(!is_hot || old_blkno == blkno);
+	ItemPointerSet(&change->data.tp.oldctid, old_blkno, xlrec->old_offnum);
+
if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
{
Size datalen;
@@ -1015,6 +1031,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
change->data.tp.clear_toast_afterwards = true;
+ change->data.tp.is_hot_update = is_hot;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
@@ -1032,11 +1049,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_heap_delete *xlrec;
ReorderBufferChange *change;
RelFileLocator target_locator;
+ BlockNumber blkno;
xlrec = (xl_heap_delete *) XLogRecGetData(r);
/* only interested in our database */
- XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+ XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
if (target_locator.dbOid != ctx->slot->data.database)
return;
@@ -1070,6 +1088,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
datalen, change->data.tp.oldtuple);
}
+ ItemPointerSet(&change->data.tp.oldctid, blkno, xlrec->offnum);
change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1127,6 +1146,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
char *tupledata;
Size tuplelen;
RelFileLocator rlocator;
+ BlockNumber blkno;
+	bool		isinit = (XLogRecGetInfo(buf->record) & XLOG_HEAP_INIT_PAGE) != 0;
xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
@@ -1138,7 +1159,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
return;
/* only interested in our database */
- XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
+ XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blkno);
if (rlocator.dbOid != ctx->slot->data.database)
return;
@@ -1168,6 +1189,9 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
+		/* offsets[] is not logged when the page was (re)initialized */
+		ItemPointerSet(&change->data.tp.newctid, blkno,
+					   isinit ? FirstOffsetNumber + i : xlrec->offsets[i]);
xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
datalen = xlhdr->datalen;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..8c305ec6e41 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -104,6 +104,13 @@ typedef struct ReorderBufferChange
HeapTuple oldtuple;
/* valid for INSERT || UPDATE */
HeapTuple newtuple;
+
+ /* TID of the old tuple version; valid for DELETE || UPDATE */
+ ItemPointerData oldctid;
+ /* TID of the new tuple version; valid for INSERT || UPDATE */
+ ItemPointerData newctid;
+ /* true if decoded from an XLOG_HEAP_HOT_UPDATE record; valid for UPDATE */
+ bool is_hot_update;
} tp;
/*
--
2.43.0