From 84640161f11b80b9104d893ab9cdac0c987bcc1f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Sun, 2 Apr 2023 18:57:53 +0200
Subject: [PATCH 7/7] protocol changes

---
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/commands/subscriptioncmds.c       | 23 +++++++++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  3 +++
 src/backend/replication/logical/decode.c      | 15 ++++++++++++
 src/backend/replication/logical/logical.c     | 24 +++++++++++++++++++
 src/backend/replication/logical/worker.c      | 11 ++++++++-
 src/backend/replication/pgoutput/pgoutput.c   | 23 +++++++++++++++++-
 src/include/catalog/pg_subscription.h         |  4 ++++
 src/include/replication/logical.h             | 15 ++++++++++++
 src/include/replication/logicalproto.h        |  3 ++-
 src/include/replication/walreceiver.h         |  1 +
 11 files changed, 119 insertions(+), 4 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 87e8ea7efa..bc296a6eaf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
 	sub->binary = subform->subbinary;
+	sub->sequences = subform->subsequences;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8f52bc9c37..c1b7de0470 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -70,6 +70,7 @@
 #define SUBOPT_PASSWORD_REQUIRED	0x00000800
 #define SUBOPT_LSN					0x00001000
 #define SUBOPT_ORIGIN				0x00002000
+#define SUBOPT_SEQUENCES			0x00004000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -91,6 +92,7 @@ typedef struct SubOpts
 	bool		binary;
 	char		streaming;
 	bool		twophase;
+	bool		sequences;
 	bool		disableonerr;
 	bool		passwordrequired;
 	char	   *origin;
@@ -144,6 +146,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
 		opts->binary = false;
+	if (IsSet(supported_opts, SUBOPT_SEQUENCES))
+		opts->sequences = false;
 	if (IsSet(supported_opts, SUBOPT_STREAMING))
 		opts->streaming = LOGICALREP_STREAM_OFF;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
@@ -252,6 +256,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetStreamingMode(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SEQUENCES) &&
+				 strcmp(defel->defname, "sequences") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SEQUENCES))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SEQUENCES;
+			opts->sequences = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -579,7 +592,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_ORIGIN);
+					  SUBOPT_ORIGIN | SUBOPT_SEQUENCES);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -676,6 +689,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
+	values[Anum_pg_subscription_subsequences - 1] = BoolGetDatum(opts.sequences);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1163,6 +1177,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subbinary - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SEQUENCES))
+				{
+					values[Anum_pg_subscription_subsequences - 1] =
+						BoolGetDatum(opts.sequences);
+					replaces[Anum_pg_subscription_subsequences - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
 					values[Anum_pg_subscription_substream - 1] =
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 052505e46f..6f636647ca 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -461,6 +461,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", streaming '%s'",
 							 options->proto.logical.streaming_str);
 
+		if (options->proto.logical.sequences)
+			appendStringInfoString(&cmd, ", sequences 'on'");
+
 		if (options->proto.logical.twophase &&
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cd2f88e0bd..5dd94779dc 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1329,6 +1329,21 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	RepOriginId origin_id = XLogRecGetOrigin(r);
 	bool		transactional;
 
+	/*
+	 * ignore sequences when not requested
+	 *
+	 * XXX Maybe we should differentiate between "callbacks not defined",
+	 * "subscriber disabled sequence replication" and "subscriber does not
+	 * know about sequence replication" (e.g. old subscriber version).
+	 *
+	 * For the first two it'd be fine to bail out here, but for the last it
+	 * might be better to continue and error out only when the sequence
+	 * would be replicated (e.g. as part of the publication). We don't know
+	 * that here, unfortunately.
+	 */
+	if (!ctx->sequences)
+		return;
+
 	/* only decode changes flagged with XLOG_SEQ_LOG */
 	if (info != XLOG_SEQ_LOG)
 		elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 675cf12366..0d1497f66b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -300,6 +300,20 @@ StartupDecodingContext(List *output_plugin_options,
 	 */
 	ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
 
+	/*
+	 * To support logical decoding of sequences, we require the sequence
+	 * callback. We decide it here, but only check it later in the wrappers.
+	 *
+	 * XXX Isn't it wrong to define only one of those callbacks? Say we
+	 * only define the stream_sequence_cb() - that may get strange results
+	 * depending on what gets streamed. Either none or both?
+	 *
+	 * XXX Shouldn't sequence be defined at slot creation time, similar
+	 * to two_phase? Probably not.
+	 */
+	ctx->sequences = ((ctx->callbacks.sequence_cb != NULL) ||
+					  (ctx->callbacks.stream_sequence_cb != NULL));
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -566,6 +580,16 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		startup_cb_wrapper(ctx, &ctx->options, false);
 	MemoryContextSwitchTo(old_context);
 
+	/*
+	 * We allow decoding of sequences when the option is given at the streaming
+	 * start, provided the plugin supports the callbacks for sequences.
+	 *
+	 * XXX Similar behavior to the two-phase block below.
+	 *
+	 * XXX Shouldn't this error out if the callbacks are not defined?
+	 */
+	ctx->sequences &= ctx->sequences_opt_given;
+
 	/*
 	 * We allow decoding of prepared transactions when the two_phase is
 	 * enabled at the time of slot creation, or when the two_phase option is
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 28059f9e97..c129123bc4 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4664,7 +4664,7 @@ ApplyWorkerMain(Datum main_arg)
 
 	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
 	options.proto.logical.proto_version =
-		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
+		server_version >= 160000 ? LOGICALREP_PROTO_SEQUENCES_VERSION_NUM :
 		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
 		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
 		LOGICALREP_PROTO_VERSION_NUM;
@@ -4694,9 +4694,18 @@ ApplyWorkerMain(Datum main_arg)
 		MyLogicalRepWorker->parallel_apply = false;
 	}
 
+	options.proto.logical.sequences = false;
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
 
+	/*
+	 * Request sequence decoding only when the subscription's 'sequences'
+	 * option is set and the publisher is new enough to support it.
+	 */
+	if (server_version >= 160000 &&
+		MySubscription->sequences)
+		options.proto.logical.sequences = true;
+
 	if (!am_tablesync_worker())
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index af037ec546..50dbd66068 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -299,7 +299,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
-	data->sequences = true;
+	data->sequences = false;
 
 	foreach(lc, options)
 	{
@@ -522,6 +522,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		else
 			ctx->twophase_opt_given = true;
 
+		/*
+		 * Here, we just check whether the sequences decoding option is passed
+		 * by plugin and decide whether to enable it at later point of time. It
+		 * remains enabled if the previous start-up has done so. But we only
+		 * allow the option to be passed in with sufficient version of the
+		 * protocol, and when the output plugin supports it.
+		 */
+		if (!data->sequences)
+			ctx->sequences_opt_given = false;
+		else if (data->protocol_version < LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("requested proto_version=%d does not support sequences, need %d or higher",
+							data->protocol_version, LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)));
+		else if (!ctx->sequences)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("sequences requested, but not supported by output plugin")));
+		else
+			ctx->sequences_opt_given = true;
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 6319f598d8..feca2b02ea 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -83,6 +83,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	char		substream;		/* Stream in-progress transactions. See
 								 * LOGICALREP_STREAM_xxx constants. */
 
+	bool		subsequences;	/* True if the subscription wants the
+								 * publisher to send sequence data */
+
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 	bool		subdisableonerr;	/* True if a worker error should cause the
@@ -130,6 +133,7 @@ typedef struct Subscription
 	char		stream;			/* Allow streaming in-progress transactions.
 								 * See LOGICALREP_STREAM_xxx constants. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
+	bool		sequences;		/* Allow replication of sequence increments. */
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..df31b1276f 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -90,6 +90,12 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase;
 
+	/*
+	 * Does the output plugin support decoding of sequence increments, and
+	 * is it enabled?
+	 */
+	bool		sequences;
+
 	/*
 	 * Is two-phase option given by output plugin?
 	 *
@@ -100,6 +106,15 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
+	/*
+	 * Is sequences option given by output plugin?
+	 *
+	 * This indicates the plugin passed the sequences option as part of the
+	 * START_STREAMING command.  We can't rely solely on the sequences flag
+	 * which only tells whether the plugin provided the necessary callback.
+	 */
+	bool		sequences_opt_given;
+
 	/*
 	 * State for writing output.
 	 */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 6d82d5b6cc..25de258094 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -42,7 +42,8 @@
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
 #define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
+#define LOGICALREP_PROTO_SEQUENCES_VERSION_NUM 5
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_SEQUENCES_VERSION_NUM
 
 /*
  * Logical message types
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..229cb3ca58 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -185,6 +185,7 @@ typedef struct
 			char	   *streaming_str;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		sequences;	/* Replication of sequences. */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
 		}			logical;
-- 
2.39.2

