v20231128-0003-Add-decoding-of-sequences-to-test_decoding.patch
text/x-patch
Filename: v20231128-0003-Add-decoding-of-sequences-to-test_decoding.patch
Type: text/x-patch
Part: 2
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v20231128-0003
Subject: Add decoding of sequences to test_decoding
| File | + | − |
|---|---|---|
| contrib/test_decoding/expected/sequence.out | 325 | 0 |
| contrib/test_decoding/Makefile | 2 | 1 |
| contrib/test_decoding/sql/sequence.sql | 119 | 0 |
| contrib/test_decoding/test_decoding.c | 87 | 0 |
From 92c7bb6ae227fa79aac447e0ce31988e82bfdffd Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Fri, 13 Jan 2023 20:32:44 +0100
Subject: [PATCH v20231128 3/8] Add decoding of sequences to test_decoding
Extends test_decoding with callbacks to decode sequences. This feature
is disabled by default (i.e. sequence changes are omitted from the
output), but may be enabled by passing 'include-sequences' option to the
test_decoding plugin.
Author: Tomas Vondra, Cary Huang
Reviewed-by: Ashutosh Bapat, Amit Kapila, Peter Eisentraut, Hannu
Krosing, Andres Freund, Zhijie Hou
Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com
Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
---
contrib/test_decoding/Makefile | 3 +-
contrib/test_decoding/expected/sequence.out | 325 ++++++++++++++++++++
contrib/test_decoding/sql/sequence.sql | 119 +++++++
contrib/test_decoding/test_decoding.c | 87 ++++++
4 files changed, 533 insertions(+), 1 deletion(-)
create mode 100644 contrib/test_decoding/expected/sequence.out
create mode 100644 contrib/test_decoding/sql/sequence.sql
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c7ce6037064..05c0c5a2f86 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats twophase twophase_stream
+ spill slot truncate stream stats twophase twophase_stream \
+ sequence
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 00000000000..98a8f25e5ad
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,325 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 1
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 33
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 4
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 334
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 46
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 14
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 4000
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 4320
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3830
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3830
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3820
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+------
+(0 rows)
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+------
+(0 rows)
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3003 | 30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval
+---------
+ 3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+----------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional: 1 value: 1
+ COMMIT
+ sequence public.test_table_a_seq: transactional: 0 value: 33
+ sequence public.test_table_a_seq: transactional: 0 value: 3000
+ sequence public.test_table_a_seq: transactional: 0 value: 3033
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+--------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional: 1 value: 1
+ sequence public.test_table_a_seq: transactional: 1 value: 33
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval
+--------
+ 5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3001 | 32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 1
+ sequence public.test_sequence: transactional: 1 value: 33
+ sequence public.test_sequence: transactional: 1 value: 3000
+ sequence public.test_sequence: transactional: 1 value: 3033
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 00000000000..c19a5d50747
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 288fd0bb4ab..0a781edffc1 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -29,6 +29,7 @@ typedef struct
MemoryContext context;
bool include_xids;
bool include_timestamp;
+ bool include_sequences;
bool skip_empty_xacts;
bool only_local;
} TestDecodingData;
@@ -72,6 +73,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 value);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
@@ -112,6 +117,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 value);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
@@ -135,6 +144,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->sequence_cb = pg_decode_sequence;
cb->filter_prepare_cb = pg_decode_filter_prepare;
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@@ -147,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
+ cb->stream_sequence_cb = pg_decode_stream_sequence;
cb->stream_truncate_cb = pg_decode_stream_truncate;
}
@@ -168,6 +179,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_timestamp = false;
data->skip_empty_xacts = false;
data->only_local = false;
+ data->include_sequences = false;
ctx->output_plugin_private = data;
@@ -259,6 +271,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "include-sequences") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->include_sequences = true;
+ else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -762,6 +785,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 value)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (!data->include_sequences)
+ return;
+
+ /* output BEGIN if we haven't yet, but only for the transactional case */
+ if (transactional)
+ {
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ {
+ pg_output_begin(ctx, data, txn, false);
+ }
+ txndata->xact_wrote_changes = true;
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT,
+ transactional, value);
+ OutputPluginWrite(ctx, true);
+}
+
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
@@ -974,6 +1029,38 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 value)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (!data->include_sequences)
+ return;
+
+ /* output BEGIN if we haven't yet, but only for the transactional case */
+ if (transactional)
+ {
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ txndata->xact_wrote_changes = true;
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "streaming sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT,
+ transactional, value);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* In streaming mode, we don't display the detailed information of Truncate.
* See pg_decode_stream_change.
--
2.41.0