0002-Add-decoding-of-sequences-to-test_decoding-20231012.patch
text/x-patch
Filename: 0002-Add-decoding-of-sequences-to-test_decoding-20231012.patch
Type: text/x-patch
Part: 1
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0002
Subject: Add decoding of sequences to test_decoding
| File | + | − |
|---|---|---|
| contrib/test_decoding/expected/sequence.out | 325 | 0 |
| contrib/test_decoding/Makefile | 2 | 1 |
| contrib/test_decoding/sql/sequence.sql | 119 | 0 |
| contrib/test_decoding/test_decoding.c | 87 | 0 |
From 54f3d923685f956d3e07cf72eb07e83bf161ffdb Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Fri, 13 Jan 2023 20:32:44 +0100
Subject: [PATCH 2/4] Add decoding of sequences to test_decoding
Commit 0da92dc530 improved the logical decoding infrastructure to handle
sequences, and did various changes to related parts (WAL logging etc.).
But it did not include any implementation of the new callbacks added to
OutputPluginCallbacks.
This extends test_decoding with two callbacks to decode sequences. The
decoding of sequences may be disabled using 'include-sequences', a new
option of the output plugin.
Author: Tomas Vondra, Cary Huang
Reviewed-by: Ashutosh Bapat, Amit Kapila, Peter Eisentraut, Hannu
Krosing, Andres Freund
Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com
Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
---
contrib/test_decoding/Makefile | 3 +-
contrib/test_decoding/expected/sequence.out | 325 ++++++++++++++++++++
contrib/test_decoding/sql/sequence.sql | 119 +++++++
contrib/test_decoding/test_decoding.c | 87 ++++++
4 files changed, 533 insertions(+), 1 deletion(-)
create mode 100644 contrib/test_decoding/expected/sequence.out
create mode 100644 contrib/test_decoding/sql/sequence.sql
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c7ce6037064..05c0c5a2f86 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats twophase twophase_stream
+ spill slot truncate stream stats twophase twophase_stream \
+ sequence
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 00000000000..98a8f25e5ad
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,325 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 1
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 33
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 4
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 334
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 46
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 14
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 4000
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 4320
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3830
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3830
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3820
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+------
+(0 rows)
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+------
+(0 rows)
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3003 | 30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval
+---------
+ 3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+----------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional: 1 value: 1
+ COMMIT
+ sequence public.test_table_a_seq: transactional: 0 value: 33
+ sequence public.test_table_a_seq: transactional: 0 value: 3000
+ sequence public.test_table_a_seq: transactional: 0 value: 3033
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+--------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional: 1 value: 1
+ sequence public.test_table_a_seq: transactional: 1 value: 33
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval
+--------
+ 5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3001 | 32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 1
+ sequence public.test_sequence: transactional: 1 value: 33
+ sequence public.test_sequence: transactional: 1 value: 3000
+ sequence public.test_sequence: transactional: 1 value: 3033
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 00000000000..c19a5d50747
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ab870d9e4dc..657de720669 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -29,6 +29,7 @@ typedef struct
MemoryContext context;
bool include_xids;
bool include_timestamp;
+ bool include_sequences;
bool skip_empty_xacts;
bool only_local;
} TestDecodingData;
@@ -72,6 +73,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 value);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
@@ -112,6 +117,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 value);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
@@ -135,6 +144,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->sequence_cb = pg_decode_sequence;
cb->filter_prepare_cb = pg_decode_filter_prepare;
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@@ -147,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
+ cb->stream_sequence_cb = pg_decode_stream_sequence;
cb->stream_truncate_cb = pg_decode_stream_truncate;
}
@@ -168,6 +179,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_timestamp = false;
data->skip_empty_xacts = false;
data->only_local = false;
+ data->include_sequences = false;
ctx->output_plugin_private = data;
@@ -259,6 +271,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "include-sequences") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->include_sequences = true;
+ else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -762,6 +785,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 value)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (!data->include_sequences)
+ return;
+
+ /* output BEGIN if we haven't yet, but only for the transactional case */
+ if (transactional)
+ {
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ {
+ pg_output_begin(ctx, data, txn, false);
+ }
+ txndata->xact_wrote_changes = true;
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT,
+ transactional, value);
+ OutputPluginWrite(ctx, true);
+}
+
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
@@ -961,6 +1016,38 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 value)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (!data->include_sequences)
+ return;
+
+ /* output BEGIN if we haven't yet, but only for the transactional case */
+ if (transactional)
+ {
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ {
+ pg_output_begin(ctx, data, txn, false);
+ }
+ txndata->xact_wrote_changes = true;
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "streaming sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT,
+ transactional, value);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* In streaming mode, we don't display the detailed information of Truncate.
* See pg_decode_stream_change.
--
2.41.0