0007-protocol-changes-20230402.patch
text/x-patch
Filename: 0007-protocol-changes-20230402.patch
Type: text/x-patch
Part: 6
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0007
Subject: protocol changes
| File | + | − |
|---|---|---|
| src/backend/catalog/pg_subscription.c | 1 | 0 |
| src/backend/commands/subscriptioncmds.c | 22 | 1 |
| src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 3 | 0 |
| src/backend/replication/logical/decode.c | 15 | 0 |
| src/backend/replication/logical/logical.c | 24 | 0 |
| src/backend/replication/logical/worker.c | 10 | 1 |
| src/backend/replication/pgoutput/pgoutput.c | 22 | 1 |
| src/include/catalog/pg_subscription.h | 4 | 0 |
| src/include/replication/logical.h | 15 | 0 |
| src/include/replication/logicalproto.h | 2 | 1 |
| src/include/replication/walreceiver.h | 1 | 0 |
From 84640161f11b80b9104d893ab9cdac0c987bcc1f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Sun, 2 Apr 2023 18:57:53 +0200
Subject: [PATCH 7/7] protocol changes
---
src/backend/catalog/pg_subscription.c | 1 +
src/backend/commands/subscriptioncmds.c | 23 +++++++++++++++++-
.../libpqwalreceiver/libpqwalreceiver.c | 3 +++
src/backend/replication/logical/decode.c | 15 ++++++++++++
src/backend/replication/logical/logical.c | 24 +++++++++++++++++++
src/backend/replication/logical/worker.c | 11 ++++++++-
src/backend/replication/pgoutput/pgoutput.c | 23 +++++++++++++++++-
src/include/catalog/pg_subscription.h | 4 ++++
src/include/replication/logical.h | 15 ++++++++++++
src/include/replication/logicalproto.h | 3 ++-
src/include/replication/walreceiver.h | 1 +
11 files changed, 119 insertions(+), 4 deletions(-)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 87e8ea7efa..bc296a6eaf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->binary = subform->subbinary;
+ sub->sequences = subform->subsequences;
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8f52bc9c37..c1b7de0470 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -70,6 +70,7 @@
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
#define SUBOPT_LSN 0x00001000
#define SUBOPT_ORIGIN 0x00002000
+#define SUBOPT_SEQUENCES 0x00004000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -91,6 +92,7 @@ typedef struct SubOpts
bool binary;
char streaming;
bool twophase;
+ bool sequences;
bool disableonerr;
bool passwordrequired;
char *origin;
@@ -144,6 +146,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->refresh = true;
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
+ if (IsSet(supported_opts, SUBOPT_SEQUENCES))
+ opts->sequences = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
opts->streaming = LOGICALREP_STREAM_OFF;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
@@ -252,6 +256,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetStreamingMode(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_SEQUENCES) &&
+ strcmp(defel->defname, "sequences") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_SEQUENCES))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_SEQUENCES;
+ opts->sequences = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "two_phase") == 0)
{
/*
@@ -579,7 +592,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_ORIGIN);
+ SUBOPT_ORIGIN | SUBOPT_SEQUENCES);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -676,6 +689,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
+ values[Anum_pg_subscription_subsequences - 1] = BoolGetDatum(opts.sequences);
values[Anum_pg_subscription_subtwophasestate - 1] =
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1163,6 +1177,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_SEQUENCES))
+ {
+ values[Anum_pg_subscription_subsequences - 1] =
+ BoolGetDatum(opts.sequences);
+ replaces[Anum_pg_subscription_subsequences - 1] = true;
+ }
+
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
values[Anum_pg_subscription_substream - 1] =
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 052505e46f..6f636647ca 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -461,6 +461,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, ", streaming '%s'",
options->proto.logical.streaming_str);
+ if (options->proto.logical.sequences)
+ appendStringInfoString(&cmd, ", sequences 'on'");
+
if (options->proto.logical.twophase &&
PQserverVersion(conn->streamConn) >= 150000)
appendStringInfoString(&cmd, ", two_phase 'on'");
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cd2f88e0bd..5dd94779dc 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1329,6 +1329,21 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
RepOriginId origin_id = XLogRecGetOrigin(r);
bool transactional;
+ /*
+ * ignore sequences when not requested
+ *
+ * XXX Maybe we should differentiate between "callbacks not defined" or
+ * "subscriber disabled sequence replication" and "subscriber does not
+ * know about sequence replication" (e.g. old subscriber version).
+ *
+ * For the first two it'd be fine to bail out here, but for the last it
+ * might be better to continue and error out only when the sequence
+ * would be replicated (e.g. as part of the publication). We don't know
+ * that here, unfortunately.
+ */
+ if (!ctx->sequences)
+ return;
+
/* only decode changes flagged with XLOG_SEQ_LOG */
if (info != XLOG_SEQ_LOG)
elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 675cf12366..0d1497f66b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -300,6 +300,20 @@ StartupDecodingContext(List *output_plugin_options,
*/
ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
+ /*
+ * To support logical decoding of sequences, we require the sequence
+ * callback. We decide it here, but only check it later in the wrappers.
+ *
+ * XXX Isn't it wrong to define only one of those callbacks? Say we
+ * only define the stream_sequence_cb() - that may get strange results
+ * depending on what gets streamed. Either none or both?
+ *
+ * XXX Shouldn't sequence be defined at slot creation time, similar
+ * to two_phase? Probably not.
+ */
+ ctx->sequences = ((ctx->callbacks.sequence_cb != NULL) ||
+ (ctx->callbacks.stream_sequence_cb != NULL));
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -566,6 +580,16 @@ CreateDecodingContext(XLogRecPtr start_lsn,
startup_cb_wrapper(ctx, &ctx->options, false);
MemoryContextSwitchTo(old_context);
+ /*
+ * We allow decoding of sequences when the option is given at the streaming
+ * start, provided the plugin defines a sequence callback.
+ *
+ * XXX Similar behavior to the two-phase block below.
+ *
+ * XXX Shouldn't this error out if the callbacks are not defined?
+ */
+ ctx->sequences &= ctx->sequences_opt_given;
+
/*
* We allow decoding of prepared transactions when the two_phase is
* enabled at the time of slot creation, or when the two_phase option is
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 28059f9e97..c129123bc4 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4664,7 +4664,7 @@ ApplyWorkerMain(Datum main_arg)
server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
- server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
+ server_version >= 160000 ? LOGICALREP_PROTO_SEQUENCES_VERSION_NUM :
server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM;
@@ -4694,9 +4694,18 @@ ApplyWorkerMain(Datum main_arg)
MyLogicalRepWorker->parallel_apply = false;
}
+ options.proto.logical.sequences = false;
options.proto.logical.twophase = false;
options.proto.logical.origin = pstrdup(MySubscription->origin);
+ /*
+ * Request sequence decoding only when the subscription has 'sequences'
+ * enabled and the publisher is new enough (v16+) to support the option.
+ */
+ if (server_version >= 160000 &&
+ MySubscription->sequences)
+ options.proto.logical.sequences = true;
+
if (!am_tablesync_worker())
{
/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index af037ec546..50dbd66068 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -299,7 +299,7 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = LOGICALREP_STREAM_OFF;
data->messages = false;
data->two_phase = false;
- data->sequences = true;
+ data->sequences = false;
foreach(lc, options)
{
@@ -522,6 +522,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
else
ctx->twophase_opt_given = true;
+ /*
+ * Here, we just record whether the sequences decoding option was passed
+ * in; whether decoding is actually enabled is decided at a later point.
+ * Unlike two_phase, it is not carried over from a previous start-up. We
+ * only allow the option to be passed in with a sufficient version of the
+ * protocol, and when the output plugin supports it.
+ */
+ if (!data->sequences)
+ ctx->sequences_opt_given = false;
+ else if (data->protocol_version < LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support sequences, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)));
+ else if (!ctx->sequences)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("sequences requested, but not supported by output plugin")));
+ else
+ ctx->sequences_opt_given = true;
+
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 6319f598d8..feca2b02ea 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -83,6 +83,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
char substream; /* Stream in-progress transactions. See
* LOGICALREP_STREAM_xxx constants. */
+ bool subsequences; /* True if the subscription wants the
+ * publisher to send sequence data */
+
char subtwophasestate; /* Stream two-phase transactions */
bool subdisableonerr; /* True if a worker error should cause the
@@ -130,6 +133,7 @@ typedef struct Subscription
char stream; /* Allow streaming in-progress transactions.
* See LOGICALREP_STREAM_xxx constants. */
char twophasestate; /* Allow streaming two-phase transactions */
+ bool sequences; /* Allow replication of sequence increments. */
bool disableonerr; /* Indicates if the subscription should be
* automatically disabled if a worker error
* occurs */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..df31b1276f 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -90,6 +90,12 @@ typedef struct LogicalDecodingContext
*/
bool twophase;
+ /*
+ * Does the output plugin support decoding of sequence increments, and
+ * is it enabled?
+ */
+ bool sequences;
+
/*
* Is two-phase option given by output plugin?
*
@@ -100,6 +106,15 @@ typedef struct LogicalDecodingContext
*/
bool twophase_opt_given;
+ /*
+ * Is sequences option given by output plugin?
+ *
+ * This indicates the plugin passed the sequences option as part of the
+ * START_STREAMING command. We can't rely solely on the sequences flag
+ * which only tells whether the plugin provided the necessary callback.
+ */
+ bool sequences_opt_given;
+
/*
* State for writing output.
*/
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 6d82d5b6cc..25de258094 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -42,7 +42,8 @@
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
+#define LOGICALREP_PROTO_SEQUENCES_VERSION_NUM 5
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_SEQUENCES_VERSION_NUM
/*
* Logical message types
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..229cb3ca58 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -185,6 +185,7 @@ typedef struct
char *streaming_str; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
+ bool sequences; /* Replication of sequences. */
char *origin; /* Only publish data originating from the
* specified origin */
} logical;
--
2.39.2