v20240111-0005-CREATE-SUBSCRIPTION-flag-to-enable-sequenc.patch
text/x-patch
Filename: v20240111-0005-CREATE-SUBSCRIPTION-flag-to-enable-sequenc.patch
Type: text/x-patch
Part: 4
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v20240111-0005
Subject: CREATE SUBSCRIPTION flag to enable sequences
| File | + | − |
|---|---|---|
| contrib/test_decoding/test_decoding.c | 3 | 10 |
| doc/src/sgml/logicaldecoding.sgml | 8 | 0 |
| doc/src/sgml/ref/alter_subscription.sgml | 3 | 2 |
| doc/src/sgml/ref/create_subscription.sgml | 12 | 0 |
| src/backend/catalog/pg_subscription.c | 1 | 0 |
| src/backend/commands/subscriptioncmds.c | 38 | 5 |
| src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 4 | 0 |
| src/backend/replication/logical/logical.c | 20 | 0 |
| src/backend/replication/logical/worker.c | 10 | 0 |
| src/backend/replication/pgoutput/pgoutput.c | 14 | 0 |
| src/include/catalog/pg_subscription.h | 4 | 0 |
| src/include/replication/output_plugin.h | 1 | 0 |
| src/include/replication/pgoutput.h | 1 | 0 |
| src/include/replication/walreceiver.h | 1 | 0 |
| src/test/subscription/t/034_sequences.pl | 1 | 1 |
From 58fd0bd09edfa2814e163e75e05449946ad1795b Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 10 Jan 2024 17:22:07 +0100
Subject: [PATCH v20240111 5/7] CREATE SUBSCRIPTION flag to enable sequences
---
contrib/test_decoding/test_decoding.c | 13 ++----
doc/src/sgml/logicaldecoding.sgml | 8 ++++
doc/src/sgml/ref/alter_subscription.sgml | 5 ++-
doc/src/sgml/ref/create_subscription.sgml | 12 ++++++
src/backend/catalog/pg_subscription.c | 1 +
src/backend/commands/subscriptioncmds.c | 43 ++++++++++++++++---
.../libpqwalreceiver/libpqwalreceiver.c | 4 ++
src/backend/replication/logical/logical.c | 20 +++++++++
src/backend/replication/logical/worker.c | 10 +++++
src/backend/replication/pgoutput/pgoutput.c | 14 ++++++
src/include/catalog/pg_subscription.h | 4 ++
src/include/replication/output_plugin.h | 1 +
src/include/replication/pgoutput.h | 1 +
src/include/replication/walreceiver.h | 1 +
src/test/subscription/t/034_sequences.pl | 2 +-
15 files changed, 121 insertions(+), 18 deletions(-)
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 200caad6863..072bbba575b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -29,7 +29,6 @@ typedef struct
MemoryContext context;
bool include_xids;
bool include_timestamp;
- bool include_sequences;
bool skip_empty_xacts;
bool only_local;
} TestDecodingData;
@@ -179,12 +178,12 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_timestamp = false;
data->skip_empty_xacts = false;
data->only_local = false;
- data->include_sequences = false;
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
opt->receive_rewrites = false;
+ opt->receive_sequences = false;
foreach(option, ctx->output_plugin_options)
{
@@ -275,8 +274,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
{
if (elem->arg == NULL)
- data->include_sequences = true;
- else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
+ opt->receive_sequences = true;
+ else if (!parse_bool(strVal(elem->arg), &opt->receive_sequences))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
@@ -794,9 +793,6 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TestDecodingData *data = ctx->output_plugin_private;
TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (!data->include_sequences)
- return;
-
/* output BEGIN if we haven't yet, but only for the transactional case */
if (transactional)
{
@@ -1038,9 +1034,6 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TestDecodingData *data = ctx->output_plugin_private;
TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (!data->include_sequences)
- return;
-
/* output BEGIN if we haven't yet, but only for the transactional case */
if (transactional)
{
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 90e99a95128..31f8aa7f932 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -646,6 +646,7 @@ typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
+ bool receive_sequences;
} OutputPluginOptions;
</programlisting>
<literal>output_type</literal> has to either be set to
@@ -658,6 +659,13 @@ typedef struct OutputPluginOptions
replication, but they require special handling.
</para>
+ <para>
+ If <literal>receive_sequences</literal> is true, the output plugin will
+ also be called for changes made by operations on sequences. These are
+ of interest to plugins that need to maintain sequences consistent with
+ the rest of the data, for example for purposes of upgrade or failover.
+ </para>
+
<para>
The startup callback should validate the options present in
<literal>ctx->output_plugin_options</literal>. If the output plugin
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 479ec495896..2d24ff5768e 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -226,8 +226,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<link linkend="sql-createsubscription-params-with-streaming"><literal>streaming</literal></link>,
<link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
<link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
- <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>, and
- <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>.
+ <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
+ <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link> and
+ <link linkend="sql-createsubscription-params-with-sequences"><literal>sequences</literal></link>.
Only a superuser can set <literal>password_required = false</literal>.
</para>
</listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index f1c20b3a465..4591e37dac7 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -399,6 +399,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="sql-createsubscription-params-with-sequences">
+ <term><literal>sequences</literal> (<type>boolean</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether the subscription will request the publisher to
+ decode and send sequence changes. Note that for sequences to be
+ decoded, the sequence also has to be added to the publication.
+ The default is <literal>false</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist></para>
</listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c516c25ac7b..e52abf61791 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->disableonerr = subform->subdisableonerr;
sub->passwordrequired = subform->subpasswordrequired;
sub->runasowner = subform->subrunasowner;
+ sub->sequences = subform->subsequences;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 144609c1b9f..76b1bbd9b2f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -71,6 +71,7 @@
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_LSN 0x00002000
#define SUBOPT_ORIGIN 0x00004000
+#define SUBOPT_SEQUENCES 0x00008000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
bool runasowner;
char *origin;
XLogRecPtr lsn;
+ bool sequences;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -158,6 +160,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->runasowner = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+ if (IsSet(supported_opts, SUBOPT_SEQUENCES))
+ opts->sequences = false;
/* Parse options */
foreach(lc, stmt_options)
@@ -354,6 +358,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_LSN;
opts->lsn = lsn;
}
+ else if (IsSet(supported_opts, SUBOPT_SEQUENCES) &&
+ strcmp(defel->defname, "sequences") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_SEQUENCES))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_SEQUENCES;
+ opts->sequences = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -592,7 +605,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_SEQUENCES);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -698,6 +711,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
+ values[Anum_pg_subscription_subsequences - 1] = BoolGetDatum(opts.sequences);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -762,8 +776,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* relation sync status info.
*/
relations = fetch_table_list(wrconn, publications);
- relations = list_concat(relations,
- fetch_sequence_list(wrconn, publications));
+
+ /*
+ * Add sequences, but only if the subscription explicitly enabled
+ * them to be replicated.
+ */
+ if (opts.sequences)
+ relations = list_concat(relations,
+ fetch_sequence_list(wrconn, publications));
foreach(lc, relations)
{
@@ -888,8 +908,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Get the list of relations from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
- pubrel_names = list_concat(pubrel_names,
- fetch_sequence_list(wrconn, sub->publications));
+
+ /*
+ * Add sequences, but only if the subscription explicitly enabled
+ * them to be replicated.
+ */
+ if (sub->sequences)
+ pubrel_names = list_concat(pubrel_names,
+ fetch_sequence_list(wrconn, sub->publications));
/* Get local table list. */
subrel_states = GetSubscriptionRelations(sub->oid, false);
@@ -1224,6 +1250,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_suborigin - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_SEQUENCES))
+ {
+ values[Anum_pg_subscription_subsequences - 1] =
+ BoolGetDatum(opts.sequences);
+ replaces[Anum_pg_subscription_subsequences - 1] = true;
+ }
+
update_tuple = true;
break;
}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 78344a03615..07a0e55c21d 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -475,6 +475,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, ", origin '%s'",
options->proto.logical.origin);
+ if (options->proto.logical.sequences &&
+ PQserverVersion(conn->streamConn) >= 170000)
+ appendStringInfo(&cmd, ", sequences 'on'");
+
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8cadc0e9e6f..86557745e18 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -475,6 +475,16 @@ CreateInitDecodingContext(const char *plugin,
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
+ /*
+ * StartupDecodingContext only checks which callbacks are defined, and
+ * startup_cb_wrapper sets receiver_sequences flag. We need to combine
+ * those two pieces and disable sequences if receiver_sequences=false.
+ *
+ * XXX We could also leave this up to the plugin startup, but it seems
+ * cleaner to just do it here.
+ */
+ ctx->sequences &= ctx->options.receive_sequences;
+
return ctx;
}
@@ -622,6 +632,16 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
+ /*
+ * StartupDecodingContext only checks which callbacks are defined, and
+ * startup_cb_wrapper sets receiver_sequences flag. We need to combine
+ * those two pieces and disable sequences if receiver_sequences=false.
+ *
+ * XXX We could also leave this up to the plugin startup, but it seems
+ * cleaner to just do it here.
+ */
+ ctx->sequences &= ctx->options.receive_sequences;
+
ereport(LOG,
(errmsg("starting logical decoding for slot \"%s\"",
NameStr(slot->data.name)),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 01fe9527b6b..6a71db354a3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4040,6 +4040,7 @@ maybe_reread_subscription(void)
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
+ newsub->sequences != MySubscription->sequences ||
newsub->passwordrequired != MySubscription->passwordrequired ||
strcmp(newsub->origin, MySubscription->origin) != 0 ||
newsub->owner != MySubscription->owner ||
@@ -4470,6 +4471,15 @@ set_stream_options(WalRcvStreamOptions *options,
options->proto.logical.publication_names = MySubscription->publications;
options->proto.logical.binary = MySubscription->binary;
+ /*
+ * FIXME maybe this should depend on server_version too? We don't want
+ * to request sequences from old releases, but maybe we should not just
+ * ignore that? Might easily lead to surprises.
+ *
+ * XXX Now there's a server_version check in libpqrcv_startstreaming.
+ */
+ options->proto.logical.sequences = MySubscription->sequences;
+
/*
* Assign the appropriate option value for streaming option according to
* the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 8b8f81c5f8e..5a19306d26c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -291,6 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool streaming_given = false;
bool two_phase_option_given = false;
bool origin_option_given = false;
+ bool sequences_option_given = false;
data->binary = false;
data->streaming = LOGICALREP_STREAM_OFF;
@@ -404,6 +405,16 @@ parse_output_parameters(List *options, PGOutputData *data)
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized origin value: \"%s\"", origin));
}
+ else if (strcmp(defel->defname, "sequences") == 0)
+ {
+ if (sequences_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ sequences_option_given = true;
+
+ data->sequences = defGetBoolean(defel);
+ }
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
@@ -442,6 +453,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* This plugin uses binary protocol. */
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+ opt->receive_sequences = false;
/*
* This is replication start and not slot initialization.
@@ -540,6 +552,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ctx->streaming = false;
ctx->twophase = false;
}
+
+ opt->receive_sequences = data->sequences;
}
/*
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index ca326255852..7205f9ed13b 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -93,6 +93,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subrunasowner; /* True if replication should execute as the
* subscription owner */
+ bool subsequences; /* True if replication stream should contain
+ * decoded sequence changes */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -139,6 +142,7 @@ typedef struct Subscription
* occurs */
bool passwordrequired; /* Must connection use a password? */
bool runasowner; /* Run replication as subscription owner */
+ bool sequences; /* Request sequences from the publisher */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2ae25c6f4fe..f9ca2994fa7 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -27,6 +27,7 @@ typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
+ bool receive_sequences;
} OutputPluginOptions;
/*
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e11472..a7461277a00 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
bool messages;
bool two_phase;
bool publish_no_origin;
+ bool sequences;
} PGOutputData;
#endif /* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0899891cdb8..c0526fd2cd3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
* prepare time */
char *origin; /* Only publish data originating from the
* specified origin */
+ bool sequences; /* Request sequences from publisher */
} logical;
} proto;
} WalRcvStreamOptions;
diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl
index 2e9317642fc..ca706631203 100644
--- a/src/test/subscription/t/034_sequences.pl
+++ b/src/test/subscription/t/034_sequences.pl
@@ -54,7 +54,7 @@ EXCEPTION WHEN others THEN NULL;
END; \$\$ LANGUAGE plpgsql");
$node_subscriber->safe_psql('postgres',
- "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub"
+ "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (sequences = true)"
);
$node_publisher->wait_for_catchup('seq_sub');
--
2.43.0