From c262ef2bee45672eafdfd9fa87b2ca15b0dbc69b Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 12 Jul 2023 20:08:04 +0200
Subject: [PATCH 5/6] Simplify protocol versioning

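Drop the separate "sequences" option and rely on the protocol version
alone to decide whether sequence changes can be sent.

- Remove the subscription "sequences" option, the
  pg_subscription.subsequences column, and the walreceiver/pgoutput
  plumbing that passed the option to the publisher.
- Remove sequences_opt_given from LogicalDecodingContext.
- pgoutput now raises an error from the sequence callback if it is
  handed a change for a publishable sequence but the requested protocol
  version is older than LOGICALREP_PROTO_SEQUENCES_VERSION_NUM.
- test_decoding keeps "include-sequences" in its own private state and
  skips sequence changes in its callbacks when the option is not enabled.

Discussion: https://www.postgresql.org/message-id/8046273f-ea88-5c97-5540-0ccd5d244fd4@enterprisedb.com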
---
 contrib/test_decoding/test_decoding.c         | 18 ++++----
 src/backend/catalog/pg_subscription.c         |  1 -
 src/backend/commands/subscriptioncmds.c       | 23 +---------
 .../libpqwalreceiver/libpqwalreceiver.c       |  8 ----
 src/backend/replication/logical/logical.c     | 12 -----
 src/backend/replication/logical/worker.c      | 12 -----
 src/backend/replication/pgoutput/pgoutput.c   | 45 ++++---------------
 src/include/catalog/pg_subscription.h         |  4 --
 src/include/replication/logical.h             |  9 ----
 src/include/replication/pgoutput.h            |  1 -
 src/include/replication/walreceiver.h         |  1 -
 11 files changed, 20 insertions(+), 114 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 60ccbacadf8..9c40702da7e 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -29,6 +29,7 @@ typedef struct
 	MemoryContext context;
 	bool		include_xids;
 	bool		include_timestamp;
+	bool		include_sequences;
 	bool		skip_empty_xacts;
 	bool		only_local;
 } TestDecodingData;
@@ -169,7 +170,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	ListCell   *option;
 	TestDecodingData *data;
 	bool		enable_streaming = false;
-	bool		enable_sequences = false;
 
 	data = palloc0(sizeof(TestDecodingData));
 	data->context = AllocSetContextCreate(ctx->context,
@@ -179,6 +179,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->include_sequences = false;
 
 	ctx->output_plugin_private = data;
 
@@ -272,10 +273,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		}
 		else if (strcmp(elem->defname, "include-sequences") == 0)
 		{
-
 			if (elem->arg == NULL)
-				continue;
-			else if (!parse_bool(strVal(elem->arg), &enable_sequences))
+				data->include_sequences = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
@@ -291,11 +291,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		}
 	}
 
-	/* remember the user explicitly requested sequences, otherwise the */
-	ctx->sequences_opt_given = enable_sequences;
-
 	ctx->streaming &= enable_streaming;
-	ctx->sequences &= enable_sequences;
 }
 
 /* cleanup this plugin's resources */
@@ -797,6 +793,9 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TestDecodingData *data = ctx->output_plugin_private;
 	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
+	if (!data->include_sequences)
+		return;
+
 	/* output BEGIN if we haven't yet, but only for the transactional case */
 	if (transactional)
 	{
@@ -1025,6 +1024,9 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TestDecodingData *data = ctx->output_plugin_private;
 	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
+	if (!data->include_sequences)
+		return;
+
 	/* output BEGIN if we haven't yet, but only for the transactional case */
 	if (transactional)
 	{
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 49aa3b9f533..d07f88ce280 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -68,7 +68,6 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
 	sub->binary = subform->subbinary;
-	sub->sequences = subform->subsequences;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 7078b61c773..8be85c8659c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -71,7 +71,6 @@
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_LSN					0x00002000
 #define SUBOPT_ORIGIN				0x00004000
-#define SUBOPT_SEQUENCES			0x00004000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -93,7 +92,6 @@ typedef struct SubOpts
 	bool		binary;
 	char		streaming;
 	bool		twophase;
-	bool		sequences;
 	bool		disableonerr;
 	bool		passwordrequired;
 	bool		runasowner;
@@ -148,8 +146,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
 		opts->binary = false;
-	if (IsSet(supported_opts, SUBOPT_SEQUENCES))
-		opts->sequences = true;
 	if (IsSet(supported_opts, SUBOPT_STREAMING))
 		opts->streaming = LOGICALREP_STREAM_OFF;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
@@ -260,15 +256,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetStreamingMode(defel);
 		}
-		else if (IsSet(supported_opts, SUBOPT_SEQUENCES) &&
-				 strcmp(defel->defname, "sequences") == 0)
-		{
-			if (IsSet(opts->specified_opts, SUBOPT_SEQUENCES))
-				errorConflictingDefElem(defel, pstate);
-
-			opts->specified_opts |= SUBOPT_SEQUENCES;
-			opts->sequences = defGetBoolean(defel);
-		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -605,7 +592,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_SEQUENCES);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -704,7 +691,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
-	values[Anum_pg_subscription_subsequences - 1] = CharGetDatum(opts.sequences);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1194,13 +1180,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subbinary - 1] = true;
 				}
 
-				if (IsSet(opts.specified_opts, SUBOPT_SEQUENCES))
-				{
-					values[Anum_pg_subscription_subsequences - 1] =
-						BoolGetDatum(opts.sequences);
-					replaces[Anum_pg_subscription_subsequences - 1] = true;
-				}
-
 				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
 					values[Anum_pg_subscription_substream - 1] =
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f8420ec4b4b..60d5c1fc403 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -461,14 +461,6 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", streaming '%s'",
 							 options->proto.logical.streaming_str);
 
-		if (PQserverVersion(conn->streamConn) >= 170000)
-		{
-			if (options->proto.logical.sequences)
-				appendStringInfoString(&cmd, ", sequences 'on'");
-			else
-				appendStringInfoString(&cmd, ", sequences 'off'");
-		}
-
 		if (options->proto.logical.twophase &&
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d648a126283..74ba6fd861e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -605,18 +605,6 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		startup_cb_wrapper(ctx, &ctx->options, false);
 	MemoryContextSwitchTo(old_context);
 
-	/*
-	 * We allow decoding of sequences when the option is given at the streaming
-	 * start, provided the plugin supports all the callbacks for two-phase.
-	 *
-	 * XXX Similar behavior to the two-phase block below.
-	 *
-	 * XXX Shouldn't this error out if the callbacks are not defined? That is,
-	 * if the client requests sequences for pluging that does not support
-	 * sequences, maybe we should error-out instead of silently ignoring it.
-	 */
-	ctx->sequences &= ctx->sequences_opt_given;
-
 	/*
 	 * We allow decoding of prepared transactions when the two_phase is
 	 * enabled at the time of slot creation, or when the two_phase option is
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d5220398d25..7ebd13604c5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4741,21 +4741,9 @@ ApplyWorkerMain(Datum main_arg)
 		MyLogicalRepWorker->parallel_apply = false;
 	}
 
-	options.proto.logical.sequences = false;
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
 
-	/*
-	 * Assign the appropriate option value for sequence decoding option according
-	 * to the 'sequences' mode and the publisher's ability to support that mode.
-	 *
-	 * XXX Isn't this redundant with the version check in libpqwalreceiver.c, using
-	 * PQserverVersion(conn->streamConn)?
-	 */
-	if (server_version >= 170000 &&
-		MySubscription->sequences)
-		options.proto.logical.sequences = true;
-
 	if (!am_tablesync_worker())
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index afa420a88cc..80262adf3e5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -290,7 +290,6 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
 	bool		messages_option_given = false;
-	bool		sequences_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
@@ -299,7 +298,6 @@ parse_output_parameters(List *options, PGOutputData *data)
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
-	data->sequences = false;
 
 	foreach(lc, options)
 	{
@@ -368,16 +366,6 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->messages = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "sequences") == 0)
-		{
-			if (sequences_option_given)
-				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
-			sequences_option_given = true;
-
-			data->sequences = defGetBoolean(defel);
-		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -522,27 +510,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		else
 			ctx->twophase_opt_given = true;
 
-		/*
-		 * Here, we just check whether the sequences decoding option is passed
-		 * by plugin and decide whether to enable it at later point of time. It
-		 * remains enabled if the previous start-up has done so. But we only
-		 * allow the option to be passed in with sufficient version of the
-		 * protocol, and when the output plugin supports it.
-		 */
-		if (!data->sequences)
-			ctx->sequences_opt_given = false;
-		else if (data->protocol_version < LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("requested proto_version=%d does not support sequences, need %d or higher",
-							data->protocol_version, LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)));
-		else if (!ctx->sequences)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("sequences requested, but not supported by output plugin")));
-		else
-			ctx->sequences_opt_given = true;
-
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
@@ -1716,12 +1683,18 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
 	TransactionId xid = InvalidTransactionId;
 	RelationSyncEntry *relentry;
 
-	if (!data->sequences)
-		return;
-
 	if (!is_publishable_relation(relation))
 		return;
 
+	/*
+	 * If the requested protocol version is too old to carry sequence changes,
+	 * yet we were handed a change for a publishable sequence, error out.
+	 */
+	if (data->protocol_version < LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("protocol version does not support sequence replication")));
+
 	/*
 	 * Remember the xid for the message in streaming mode. See
 	 * pgoutput_change.
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 047e86fcd9b..1d40eebc789 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -83,9 +83,6 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	char		substream;		/* Stream in-progress transactions. See
 								 * LOGICALREP_STREAM_xxx constants. */
 
-	bool		subsequences;	/* True if the subscription wants the
-								 * publisher to send sequence data */
-
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 	bool		subdisableonerr;	/* True if a worker error should cause the
@@ -136,7 +133,6 @@ typedef struct Subscription
 	char		stream;			/* Allow streaming in-progress transactions.
 								 * See LOGICALREP_STREAM_xxx constants. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
-	bool		sequences;		/* Allow replication of sequence increments. */
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index df31b1276f6..c62ac4247b0 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -106,15 +106,6 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
-	/*
-	 * Is sequences option given by output plugin?
-	 *
-	 * This indicates the plugin passed the sequences option as part of the
-	 * START_STREAMING command.  We can't rely solely on the sequences flag
-	 * which only tells whether the plugin provided the necessary callback.
-	 */
-	bool		sequences_opt_given;
-
 	/*
 	 * State for writing output.
 	 */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 56dfa2a7417..b4a8015403b 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -30,7 +30,6 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	char	   *origin;
-	bool		sequences;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 229cb3ca589..281626fa6f5 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -185,7 +185,6 @@ typedef struct
 			char	   *streaming_str;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
-			bool		sequences;	/* Replication of sequences. */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
 		}			logical;
-- 
2.41.0

