0005-Simplify-protocol-versioning-20230720.patch
text/x-patch
Filename: 0005-Simplify-protocol-versioning-20230720.patch
Type: text/x-patch
Part: 4
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
Returns the parsed metadata as JSON — format, series position, and per-file stats; it never includes the diff bytes.
API reference →
Format: format-patch
Series: patch 0005
Subject: Simplify protocol versioning
| File | + | − |
|---|---|---|
| contrib/test_decoding/test_decoding.c | 10 | 8 |
| src/backend/catalog/pg_subscription.c | 0 | 1 |
| src/backend/commands/subscriptioncmds.c | 1 | 22 |
| src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 0 | 8 |
| src/backend/replication/logical/logical.c | 0 | 12 |
| src/backend/replication/logical/worker.c | 0 | 12 |
| src/backend/replication/pgoutput/pgoutput.c | 9 | 36 |
| src/include/catalog/pg_subscription.h | 0 | 4 |
| src/include/replication/logical.h | 0 | 9 |
| src/include/replication/pgoutput.h | 0 | 1 |
| src/include/replication/walreceiver.h | 0 | 1 |
From c262ef2bee45672eafdfd9fa87b2ca15b0dbc69b Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 12 Jul 2023 20:08:04 +0200
Subject: [PATCH 5/6] Simplify protocol versioning
https://www.postgresql.org/message-id/8046273f-ea88-5c97-5540-0ccd5d244fd4@enterprisedb.com
---
contrib/test_decoding/test_decoding.c | 18 ++++----
src/backend/catalog/pg_subscription.c | 1 -
src/backend/commands/subscriptioncmds.c | 23 +---------
.../libpqwalreceiver/libpqwalreceiver.c | 8 ----
src/backend/replication/logical/logical.c | 12 -----
src/backend/replication/logical/worker.c | 12 -----
src/backend/replication/pgoutput/pgoutput.c | 45 ++++---------------
src/include/catalog/pg_subscription.h | 4 --
src/include/replication/logical.h | 9 ----
src/include/replication/pgoutput.h | 1 -
src/include/replication/walreceiver.h | 1 -
11 files changed, 20 insertions(+), 114 deletions(-)
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 60ccbacadf8..9c40702da7e 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -29,6 +29,7 @@ typedef struct
MemoryContext context;
bool include_xids;
bool include_timestamp;
+ bool include_sequences;
bool skip_empty_xacts;
bool only_local;
} TestDecodingData;
@@ -169,7 +170,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ListCell *option;
TestDecodingData *data;
bool enable_streaming = false;
- bool enable_sequences = false;
data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
@@ -179,6 +179,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_timestamp = false;
data->skip_empty_xacts = false;
data->only_local = false;
+ data->include_sequences = false;
ctx->output_plugin_private = data;
@@ -272,10 +273,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
else if (strcmp(elem->defname, "include-sequences") == 0)
{
-
if (elem->arg == NULL)
- continue;
- else if (!parse_bool(strVal(elem->arg), &enable_sequences))
+ data->include_sequences = true;
+ else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
@@ -291,11 +291,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
}
- /* remember the user explicitly requested sequences, otherwise the */
- ctx->sequences_opt_given = enable_sequences;
-
ctx->streaming &= enable_streaming;
- ctx->sequences &= enable_sequences;
}
/* cleanup this plugin's resources */
@@ -797,6 +793,9 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TestDecodingData *data = ctx->output_plugin_private;
TestDecodingTxnData *txndata = txn->output_plugin_private;
+ if (!data->include_sequences)
+ return;
+
/* output BEGIN if we haven't yet, but only for the transactional case */
if (transactional)
{
@@ -1025,6 +1024,9 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TestDecodingData *data = ctx->output_plugin_private;
TestDecodingTxnData *txndata = txn->output_plugin_private;
+ if (!data->include_sequences)
+ return;
+
/* output BEGIN if we haven't yet, but only for the transactional case */
if (transactional)
{
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 49aa3b9f533..d07f88ce280 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -68,7 +68,6 @@ GetSubscription(Oid subid, bool missing_ok)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->binary = subform->subbinary;
- sub->sequences = subform->subsequences;
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 7078b61c773..8be85c8659c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -71,7 +71,6 @@
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_LSN 0x00002000
#define SUBOPT_ORIGIN 0x00004000
-#define SUBOPT_SEQUENCES 0x00004000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -93,7 +92,6 @@ typedef struct SubOpts
bool binary;
char streaming;
bool twophase;
- bool sequences;
bool disableonerr;
bool passwordrequired;
bool runasowner;
@@ -148,8 +146,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->refresh = true;
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
- if (IsSet(supported_opts, SUBOPT_SEQUENCES))
- opts->sequences = true;
if (IsSet(supported_opts, SUBOPT_STREAMING))
opts->streaming = LOGICALREP_STREAM_OFF;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
@@ -260,15 +256,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetStreamingMode(defel);
}
- else if (IsSet(supported_opts, SUBOPT_SEQUENCES) &&
- strcmp(defel->defname, "sequences") == 0)
- {
- if (IsSet(opts->specified_opts, SUBOPT_SEQUENCES))
- errorConflictingDefElem(defel, pstate);
-
- opts->specified_opts |= SUBOPT_SEQUENCES;
- opts->sequences = defGetBoolean(defel);
- }
else if (strcmp(defel->defname, "two_phase") == 0)
{
/*
@@ -605,7 +592,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_SEQUENCES);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -704,7 +691,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
- values[Anum_pg_subscription_subsequences - 1] = CharGetDatum(opts.sequences);
values[Anum_pg_subscription_subtwophasestate - 1] =
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1194,13 +1180,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
- if (IsSet(opts.specified_opts, SUBOPT_SEQUENCES))
- {
- values[Anum_pg_subscription_subsequences - 1] =
- BoolGetDatum(opts.sequences);
- replaces[Anum_pg_subscription_subsequences - 1] = true;
- }
-
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
values[Anum_pg_subscription_substream - 1] =
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f8420ec4b4b..60d5c1fc403 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -461,14 +461,6 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, ", streaming '%s'",
options->proto.logical.streaming_str);
- if (PQserverVersion(conn->streamConn) >= 170000)
- {
- if (options->proto.logical.sequences)
- appendStringInfoString(&cmd, ", sequences 'on'");
- else
- appendStringInfoString(&cmd, ", sequences 'off'");
- }
-
if (options->proto.logical.twophase &&
PQserverVersion(conn->streamConn) >= 150000)
appendStringInfoString(&cmd, ", two_phase 'on'");
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d648a126283..74ba6fd861e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -605,18 +605,6 @@ CreateDecodingContext(XLogRecPtr start_lsn,
startup_cb_wrapper(ctx, &ctx->options, false);
MemoryContextSwitchTo(old_context);
- /*
- * We allow decoding of sequences when the option is given at the streaming
- * start, provided the plugin supports all the callbacks for two-phase.
- *
- * XXX Similar behavior to the two-phase block below.
- *
- * XXX Shouldn't this error out if the callbacks are not defined? That is,
- * if the client requests sequences for pluging that does not support
- * sequences, maybe we should error-out instead of silently ignoring it.
- */
- ctx->sequences &= ctx->sequences_opt_given;
-
/*
* We allow decoding of prepared transactions when the two_phase is
* enabled at the time of slot creation, or when the two_phase option is
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d5220398d25..7ebd13604c5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4741,21 +4741,9 @@ ApplyWorkerMain(Datum main_arg)
MyLogicalRepWorker->parallel_apply = false;
}
- options.proto.logical.sequences = false;
options.proto.logical.twophase = false;
options.proto.logical.origin = pstrdup(MySubscription->origin);
- /*
- * Assign the appropriate option value for sequence decoding option according
- * to the 'sequences' mode and the publisher's ability to support that mode.
- *
- * XXX Isn't this redundant with the version check in libpqwalreceiver.c, using
- * PQserverVersion(conn->streamConn)?
- */
- if (server_version >= 170000 &&
- MySubscription->sequences)
- options.proto.logical.sequences = true;
-
if (!am_tablesync_worker())
{
/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index afa420a88cc..80262adf3e5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -290,7 +290,6 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
- bool sequences_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
bool origin_option_given = false;
@@ -299,7 +298,6 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = LOGICALREP_STREAM_OFF;
data->messages = false;
data->two_phase = false;
- data->sequences = false;
foreach(lc, options)
{
@@ -368,16 +366,6 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "sequences") == 0)
- {
- if (sequences_option_given)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
- sequences_option_given = true;
-
- data->sequences = defGetBoolean(defel);
- }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -522,27 +510,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
else
ctx->twophase_opt_given = true;
- /*
- * Here, we just check whether the sequences decoding option is passed
- * by plugin and decide whether to enable it at later point of time. It
- * remains enabled if the previous start-up has done so. But we only
- * allow the option to be passed in with sufficient version of the
- * protocol, and when the output plugin supports it.
- */
- if (!data->sequences)
- ctx->sequences_opt_given = false;
- else if (data->protocol_version < LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("requested proto_version=%d does not support sequences, need %d or higher",
- data->protocol_version, LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)));
- else if (!ctx->sequences)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("sequences requested, but not supported by output plugin")));
- else
- ctx->sequences_opt_given = true;
-
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
@@ -1716,12 +1683,18 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
TransactionId xid = InvalidTransactionId;
RelationSyncEntry *relentry;
- if (!data->sequences)
- return;
-
if (!is_publishable_relation(relation))
return;
+ /*
+ * If the negotiated protocol version does not support sequences, yet we
+ * found a sequence in the publication, error out.
+ */
+ if (data->protocol_version < LOGICALREP_PROTO_SEQUENCES_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("protocol version does not support sequence replication")));
+
/*
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 047e86fcd9b..1d40eebc789 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -83,9 +83,6 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
char substream; /* Stream in-progress transactions. See
* LOGICALREP_STREAM_xxx constants. */
- bool subsequences; /* True if the subscription wants the
- * publisher to send sequence data */
-
char subtwophasestate; /* Stream two-phase transactions */
bool subdisableonerr; /* True if a worker error should cause the
@@ -136,7 +133,6 @@ typedef struct Subscription
char stream; /* Allow streaming in-progress transactions.
* See LOGICALREP_STREAM_xxx constants. */
char twophasestate; /* Allow streaming two-phase transactions */
- bool sequences; /* Allow replication of sequence increments. */
bool disableonerr; /* Indicates if the subscription should be
* automatically disabled if a worker error
* occurs */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index df31b1276f6..c62ac4247b0 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -106,15 +106,6 @@ typedef struct LogicalDecodingContext
*/
bool twophase_opt_given;
- /*
- * Is sequences option given by output plugin?
- *
- * This indicates the plugin passed the sequences option as part of the
- * START_STREAMING command. We can't rely solely on the sequences flag
- * which only tells whether the plugin provided the necessary callback.
- */
- bool sequences_opt_given;
-
/*
* State for writing output.
*/
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 56dfa2a7417..b4a8015403b 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -30,7 +30,6 @@ typedef struct PGOutputData
bool messages;
bool two_phase;
char *origin;
- bool sequences;
} PGOutputData;
#endif /* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 229cb3ca589..281626fa6f5 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -185,7 +185,6 @@ typedef struct
char *streaming_str; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
- bool sequences; /* Replication of sequences. */
char *origin; /* Only publish data originating from the
* specified origin */
} logical;
--
2.41.0