v20240111-0005-CREATE-SUBSCRIPTION-flag-to-enable-sequenc.patch

text/x-patch

Filename: v20240111-0005-CREATE-SUBSCRIPTION-flag-to-enable-sequenc.patch
Type: text/x-patch
Part: 4
Message: Re: logical decoding and replication of sequences, take 2

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v20240111-0005
Subject: CREATE SUBSCRIPTION flag to enable sequences
File+
contrib/test_decoding/test_decoding.c 3 10
doc/src/sgml/logicaldecoding.sgml 8 0
doc/src/sgml/ref/alter_subscription.sgml 3 2
doc/src/sgml/ref/create_subscription.sgml 12 0
src/backend/catalog/pg_subscription.c 1 0
src/backend/commands/subscriptioncmds.c 38 5
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 4 0
src/backend/replication/logical/logical.c 20 0
src/backend/replication/logical/worker.c 10 0
src/backend/replication/pgoutput/pgoutput.c 14 0
src/include/catalog/pg_subscription.h 4 0
src/include/replication/output_plugin.h 1 0
src/include/replication/pgoutput.h 1 0
src/include/replication/walreceiver.h 1 0
src/test/subscription/t/034_sequences.pl 1 1
From 58fd0bd09edfa2814e163e75e05449946ad1795b Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 10 Jan 2024 17:22:07 +0100
Subject: [PATCH v20240111 5/7] CREATE SUBSCRIPTION flag to enable sequences

---
 contrib/test_decoding/test_decoding.c         | 13 ++----
 doc/src/sgml/logicaldecoding.sgml             |  8 ++++
 doc/src/sgml/ref/alter_subscription.sgml      |  5 ++-
 doc/src/sgml/ref/create_subscription.sgml     | 12 ++++++
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/commands/subscriptioncmds.c       | 43 ++++++++++++++++---
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 ++
 src/backend/replication/logical/logical.c     | 20 +++++++++
 src/backend/replication/logical/worker.c      | 10 +++++
 src/backend/replication/pgoutput/pgoutput.c   | 14 ++++++
 src/include/catalog/pg_subscription.h         |  4 ++
 src/include/replication/output_plugin.h       |  1 +
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/subscription/t/034_sequences.pl      |  2 +-
 15 files changed, 121 insertions(+), 18 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 200caad6863..072bbba575b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -29,7 +29,6 @@ typedef struct
 	MemoryContext context;
 	bool		include_xids;
 	bool		include_timestamp;
-	bool		include_sequences;
 	bool		skip_empty_xacts;
 	bool		only_local;
 } TestDecodingData;
@@ -179,12 +178,12 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
-	data->include_sequences = false;
 
 	ctx->output_plugin_private = data;
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 	opt->receive_rewrites = false;
+	opt->receive_sequences = false;
 
 	foreach(option, ctx->output_plugin_options)
 	{
@@ -275,8 +274,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		{
 
 			if (elem->arg == NULL)
-				data->include_sequences = true;
-			else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
+				opt->receive_sequences = true;
+			else if (!parse_bool(strVal(elem->arg), &opt->receive_sequences))
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
@@ -794,9 +793,6 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TestDecodingData *data = ctx->output_plugin_private;
 	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (!data->include_sequences)
-		return;
-
 	/* output BEGIN if we haven't yet, but only for the transactional case */
 	if (transactional)
 	{
@@ -1038,9 +1034,6 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TestDecodingData *data = ctx->output_plugin_private;
 	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (!data->include_sequences)
-		return;
-
 	/* output BEGIN if we haven't yet, but only for the transactional case */
 	if (transactional)
 	{
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 90e99a95128..31f8aa7f932 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -646,6 +646,7 @@ typedef struct OutputPluginOptions
 {
     OutputPluginOutputType output_type;
     bool        receive_rewrites;
+    bool        receive_sequences;
 } OutputPluginOptions;
 </programlisting>
       <literal>output_type</literal> has to either be set to
@@ -658,6 +659,13 @@ typedef struct OutputPluginOptions
       replication, but they require special handling.
      </para>
 
+     <para>
+      If <literal>receive_sequences</literal> is true, the output plugin will
+      also be called for changes made by operations on sequences.  These are
+      of interest to plugins that need to maintain sequences consistent with
+      the rest of the data, for example for purposes of upgrade or failover.
+     </para>
+
      <para>
       The startup callback should validate the options present in
       <literal>ctx-&gt;output_plugin_options</literal>. If the output plugin
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 479ec495896..2d24ff5768e 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -226,8 +226,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-streaming"><literal>streaming</literal></link>,
       <link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
-      <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>.
+      <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
+      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-sequences"><literal>sequences</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
      </para>
     </listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index f1c20b3a465..4591e37dac7 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -399,6 +399,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry id="sql-createsubscription-params-with-sequences">
+        <term><literal>sequences</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription will request the publisher to
+          decode and send sequence changes. Note that for a sequence to be
+          decoded, it also has to be added to the publication.
+          The default is <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c516c25ac7b..e52abf61791 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->disableonerr = subform->subdisableonerr;
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
+	sub->sequences = subform->subsequences;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 144609c1b9f..76b1bbd9b2f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -71,6 +71,7 @@
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_LSN					0x00002000
 #define SUBOPT_ORIGIN				0x00004000
+#define SUBOPT_SEQUENCES			0x00008000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		runasowner;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	bool		sequences;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -158,6 +160,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_SEQUENCES))
+		opts->sequences = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -354,6 +358,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_SEQUENCES) &&
+				 strcmp(defel->defname, "sequences") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SEQUENCES))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SEQUENCES;
+			opts->sequences = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -592,7 +605,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_SEQUENCES);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -698,6 +711,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
+	values[Anum_pg_subscription_subsequences - 1] = BoolGetDatum(opts.sequences);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -762,8 +776,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 * relation sync status info.
 			 */
 			relations = fetch_table_list(wrconn, publications);
-			relations = list_concat(relations,
-									fetch_sequence_list(wrconn, publications));
+
+			/*
+			 * Add sequences, but only if the subscription explicitly enabled
+			 * them to be replicated.
+			 */
+			if (opts.sequences)
+				relations = list_concat(relations,
+										fetch_sequence_list(wrconn, publications));
 
 			foreach(lc, relations)
 			{
@@ -888,8 +908,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 		/* Get the list of relations from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
-		pubrel_names = list_concat(pubrel_names,
-								   fetch_sequence_list(wrconn, sub->publications));
+
+		/*
+		 * Add sequences, but only if the subscription explicitly enabled
+		 * them to be replicated.
+		 */
+		if (sub->sequences)
+			pubrel_names = list_concat(pubrel_names,
+									   fetch_sequence_list(wrconn, sub->publications));
 
 		/* Get local table list. */
 		subrel_states = GetSubscriptionRelations(sub->oid, false);
@@ -1224,6 +1250,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_suborigin - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SEQUENCES))
+				{
+					values[Anum_pg_subscription_subsequences - 1] =
+						BoolGetDatum(opts.sequences);
+					replaces[Anum_pg_subscription_subsequences - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 78344a03615..07a0e55c21d 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -475,6 +475,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.sequences &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfo(&cmd, ", sequences 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8cadc0e9e6f..86557745e18 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -475,6 +475,16 @@ CreateInitDecodingContext(const char *plugin,
 
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
+	/*
+	 * StartupDecodingContext only checks which callbacks are defined, and
+	 * startup_cb_wrapper sets the receive_sequences flag. We need to combine
+	 * those two pieces and disable sequences if receive_sequences=false.
+	 *
+	 * XXX We could also leave this up to the plugin startup, but it seems
+	 * cleaner to just do it here.
+	 */
+	ctx->sequences &= ctx->options.receive_sequences;
+
 	return ctx;
 }
 
@@ -622,6 +632,16 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
+	/*
+	 * StartupDecodingContext only checks which callbacks are defined, and
+	 * startup_cb_wrapper sets the receive_sequences flag. We need to combine
+	 * those two pieces and disable sequences if receive_sequences=false.
+	 *
+	 * XXX We could also leave this up to the plugin startup, but it seems
+	 * cleaner to just do it here.
+	 */
+	ctx->sequences &= ctx->options.receive_sequences;
+
 	ereport(LOG,
 			(errmsg("starting logical decoding for slot \"%s\"",
 					NameStr(slot->data.name)),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 01fe9527b6b..6a71db354a3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4040,6 +4040,7 @@ maybe_reread_subscription(void)
 		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
+		newsub->sequences != MySubscription->sequences ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
@@ -4470,6 +4471,15 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	/*
+	 * FIXME maybe this should depend on server_version too? We don't want
+	 * to request sequences from old releases, but maybe we should not just
+	 * ignore that? Might easily lead to surprises.
+	 *
+	 * XXX Now there's a server_version check in libpqrcv_startstreaming.
+	 */
+	options->proto.logical.sequences = MySubscription->sequences;
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 8b8f81c5f8e..5a19306d26c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -291,6 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		sequences_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
@@ -404,6 +405,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "sequences") == 0)
+		{
+			if (sequences_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			sequences_option_given = true;
+
+			data->sequences = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -442,6 +453,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 
 	/* This plugin uses binary protocol. */
 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+	opt->receive_sequences = false;
 
 	/*
 	 * This is replication start and not slot initialization.
@@ -540,6 +552,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		ctx->streaming = false;
 		ctx->twophase = false;
 	}
+
+	opt->receive_sequences = data->sequences;
 }
 
 /*
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index ca326255852..7205f9ed13b 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -93,6 +93,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subrunasowner;	/* True if replication should execute as the
 								 * subscription owner */
 
+	bool		subsequences;	/* True if replication stream should contain
+								 * decoded sequence changes */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -139,6 +142,7 @@ typedef struct Subscription
 								 * occurs */
 	bool		passwordrequired;	/* Must connection use a password? */
 	bool		runasowner;		/* Run replication as subscription owner */
+	bool		sequences;		/* Request sequences from the publisher */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2ae25c6f4fe..f9ca2994fa7 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -27,6 +27,7 @@ typedef struct OutputPluginOptions
 {
 	OutputPluginOutputType output_type;
 	bool		receive_rewrites;
+	bool		receive_sequences;
 } OutputPluginOptions;
 
 /*
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e11472..a7461277a00 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		sequences;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0899891cdb8..c0526fd2cd3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		sequences;	/* Request sequences from publisher */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl
index 2e9317642fc..ca706631203 100644
--- a/src/test/subscription/t/034_sequences.pl
+++ b/src/test/subscription/t/034_sequences.pl
@@ -54,7 +54,7 @@ EXCEPTION WHEN others THEN NULL;
 END; \$\$ LANGUAGE plpgsql");
 
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub"
+	"CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (sequences = true)"
 );
 
 $node_publisher->wait_for_catchup('seq_sub');
-- 
2.43.0