v20231128-0003-Add-decoding-of-sequences-to-test_decoding.patch

text/x-patch

Filename: v20231128-0003-Add-decoding-of-sequences-to-test_decoding.patch
Type: text/x-patch
Part: 2
Message: Re: logical decoding and replication of sequences, take 2

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v20231128-0003
Subject: Add decoding of sequences to test_decoding
File+
contrib/test_decoding/expected/sequence.out 325 0
contrib/test_decoding/Makefile 2 1
contrib/test_decoding/sql/sequence.sql 119 0
contrib/test_decoding/test_decoding.c 87 0
From 7debb9e0656257d2043e349ebd9d992588157239 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Fri, 13 Jan 2023 20:32:44 +0100
Subject: [PATCH v20231128 3/4] Add decoding of sequences to test_decoding

Extends test_decoding with callbacks to decode sequences. This feature
is disabled by default (i.e. sequence changes are omitted from the
output), but may be enabled by passing 'include-sequences' option to the
test_decoding plugin.

Author: Tomas Vondra, Cary Huang
Reviewed-by: Ashutosh Bapat, Amit Kapila, Peter Eisentraut, Hannu
Krosing, Andres Freund, Zhijie Hou
Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com
Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
---
 contrib/test_decoding/Makefile              |   3 +-
 contrib/test_decoding/expected/sequence.out | 325 ++++++++++++++++++++
 contrib/test_decoding/sql/sequence.sql      | 119 +++++++
 contrib/test_decoding/test_decoding.c       |  87 ++++++
 4 files changed, 533 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/sequence.out
 create mode 100644 contrib/test_decoding/sql/sequence.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c7ce6037064..05c0c5a2f86 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream \
+	sequence
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 00000000000..98a8f25e5ad
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,325 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+      14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+                            data                             
+-------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 1
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 33
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 4
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 334
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 46
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 14
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 4000
+ COMMIT
+ sequence public.test_sequence: transactional: 0 value: 4320
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3830
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3830
+ sequence public.test_sequence: transactional: 0 value: 3500
+ sequence public.test_sequence: transactional: 0 value: 3820
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data 
+------
+(0 rows)
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+ data 
+------
+(0 rows)
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3003 |      30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval 
+---------
+    3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+                              data                              
+----------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional: 1 value: 1
+ COMMIT
+ sequence public.test_table_a_seq: transactional: 0 value: 33
+ sequence public.test_table_a_seq: transactional: 0 value: 3000
+ sequence public.test_table_a_seq: transactional: 0 value: 3033
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+                             data                             
+--------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional: 1 value: 1
+ sequence public.test_table_a_seq: transactional: 1 value: 33
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval 
+--------
+   5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3001 |      32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+                            data                             
+-------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional: 1 value: 1
+ sequence public.test_sequence: transactional: 1 value: 33
+ sequence public.test_sequence: transactional: 1 value: 3000
+ sequence public.test_sequence: transactional: 1 value: 3033
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 00000000000..c19a5d50747
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 288fd0bb4ab..0a781edffc1 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -29,6 +29,7 @@ typedef struct
 	MemoryContext context;
 	bool		include_xids;
 	bool		include_timestamp;
+	bool		include_sequences;
 	bool		skip_empty_xacts;
 	bool		only_local;
 } TestDecodingData;
@@ -72,6 +73,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+							   ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							   Relation rel, bool transactional,
+							   int64 value);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 TransactionId xid,
 									 const char *gid);
@@ -112,6 +117,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
 									 ReorderBufferTXN *txn, XLogRecPtr lsn,
 									 bool transactional, const char *prefix,
 									 Size sz, const char *message);
+static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
+									  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+									  Relation rel, bool transactional,
+									  int64 value);
 static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
 									  int nrelations, Relation relations[],
@@ -135,6 +144,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->sequence_cb = pg_decode_sequence;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
 	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
 	cb->prepare_cb = pg_decode_prepare_txn;
@@ -147,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pg_decode_stream_commit;
 	cb->stream_change_cb = pg_decode_stream_change;
 	cb->stream_message_cb = pg_decode_stream_message;
+	cb->stream_sequence_cb = pg_decode_stream_sequence;
 	cb->stream_truncate_cb = pg_decode_stream_truncate;
 }
 
@@ -168,6 +179,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->include_sequences = false;
 
 	ctx->output_plugin_private = data;
 
@@ -259,6 +271,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include-sequences") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->include_sequences = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -762,6 +785,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				   XLogRecPtr sequence_lsn, Relation rel,
+				   bool transactional,
+				   int64 value)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+	if (!data->include_sequences)
+		return;
+
+	/* output BEGIN if we haven't yet, but only for the transactional case */
+	if (transactional)
+	{
+		if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+		{
+			pg_output_begin(ctx, data, txn, false);
+		}
+		txndata->xact_wrote_changes = true;
+	}
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfoString(ctx->out, "sequence ");
+	appendStringInfoString(ctx->out,
+						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+													  RelationGetRelationName(rel)));
+	appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT,
+					 transactional, value);
+	OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
@@ -974,6 +1029,38 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						  XLogRecPtr sequence_lsn, Relation rel,
+						  bool transactional,
+						  int64 value)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+	if (!data->include_sequences)
+		return;
+
+	/* output BEGIN if we haven't yet, but only for the transactional case */
+	if (transactional)
+	{
+		if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+		{
+			pg_output_stream_start(ctx, data, txn, false);
+		}
+		txndata->xact_wrote_changes = true;
+	}
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfoString(ctx->out, "streaming sequence ");
+	appendStringInfoString(ctx->out,
+						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+													  RelationGetRelationName(rel)));
+	appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT,
+					 transactional, value);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * In streaming mode, we don't display the detailed information of Truncate.
  * See pg_decode_stream_change.
-- 
2.42.0