v20250610-0004-Introduce-REFRESH-PUBLICATION-SEQUENCES-fo.patch
application/x-patch
Filename: v20250610-0004-Introduce-REFRESH-PUBLICATION-SEQUENCES-fo.patch
Type: application/x-patch
Part: 3
Message:
Re: Logical Replication of sequences
From b825cd6549fbbfb1b3d02fc7ff05c38ac1019d1e Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Mon, 9 Jun 2025 20:18:54 +0530
Subject: [PATCH v20250610 4/6] Introduce "REFRESH PUBLICATION SEQUENCES" for
subscriptions
This patch introduces a new command to synchronize the sequences of
a subscription:
ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES
---
src/backend/catalog/pg_publication.c | 82 +++++
src/backend/catalog/pg_subscription.c | 61 +++-
src/backend/catalog/system_views.sql | 10 +
src/backend/commands/subscriptioncmds.c | 322 +++++++++++++++-----
src/backend/executor/execReplication.c | 4 +-
src/backend/parser/gram.y | 11 +-
src/backend/replication/logical/syncutils.c | 5 +-
src/bin/pg_dump/common.c | 4 +-
src/bin/pg_dump/pg_dump.c | 8 +-
src/bin/pg_dump/pg_dump.h | 2 +-
src/bin/psql/tab-complete.in.c | 2 +-
src/include/catalog/pg_proc.dat | 5 +
src/include/catalog/pg_publication.h | 1 +
src/include/catalog/pg_subscription_rel.h | 4 +-
src/include/nodes/parsenodes.h | 3 +-
src/test/regress/expected/rules.out | 11 +-
src/test/regress/expected/subscription.out | 4 +-
17 files changed, 444 insertions(+), 95 deletions(-)
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index dc3f9ed3fbf..ec46b126304 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1062,6 +1062,42 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
return result;
}
+/*
+ * Gets the OIDs of all publishable sequences, for FOR ALL SEQUENCES publication(s).
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+ Relation classRel;
+ ScanKeyData key[1];
+ TableScanDesc scan;
+ HeapTuple tuple;
+ List *result = NIL;
+
+ classRel = table_open(RelationRelationId, AccessShareLock);
+ /* Restrict the pg_class scan to rows whose relkind is RELKIND_SEQUENCE. */
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relkind,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(RELKIND_SEQUENCE));
+
+ scan = table_beginscan_catalog(classRel, 1, key);
+ /* Collect the OID of each sequence accepted by is_publishable_class(). */
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+ Oid relid = relForm->oid;
+
+ if (is_publishable_class(relid, relForm))
+ result = lappend_oid(result, relid);
+ }
+
+ table_endscan(scan);
+
+ table_close(classRel, AccessShareLock);
+ return result;
+}
+
/*
* Get publication using oid
*
@@ -1334,3 +1370,49 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+/*
+ * Returns Oids of sequences in a publication, one Oid per call (set-returning).
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ List *sequences = NIL;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {
+ char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ Publication *publication;
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ publication = GetPublicationByName(pubname, false);
+ /* Only a publication with allsequences set yields sequences; else NIL. */
+ if (publication->allsequences)
+ sequences = GetAllSequencesPublicationRelations();
+
+ funcctx->user_fctx = (void *) sequences;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+ sequences = (List *) funcctx->user_fctx;
+ /* call_cntr indexes the saved list; finish once every Oid was returned */
+ if (funcctx->call_cntr < list_length(sequences))
+ {
+ Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+ SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1c71161e723..37bf385bb60 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -27,6 +27,7 @@
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
@@ -462,7 +463,9 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
* leave tablesync slots or origins in the system when the
* corresponding table is dropped.
*/
- if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
+ if (!OidIsValid(subid) &&
+ get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE &&
+ subrel->srsubstate != SUBREL_STATE_READY)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -499,7 +502,8 @@ HasSubscriptionTables(Oid subid)
Relation rel;
ScanKeyData skey[1];
SysScanDesc scan;
- bool has_subrels;
+ HeapTuple tup;
+ bool has_subrels = false;
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
@@ -511,8 +515,22 @@ HasSubscriptionTables(Oid subid)
scan = systable_beginscan(rel, InvalidOid, false,
NULL, 1, skey);
- /* If even a single tuple exists then the subscription has tables. */
- has_subrels = HeapTupleIsValid(systable_getnext(scan));
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_subscription_rel subrel;
+
+ subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+ /*
+ * Skip sequence tuples. If even a single table tuple exists then the
+ * subscription has tables.
+ */
+ if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
+ {
+ has_subrels = true;
+ break;
+ }
+ }
/* Cleanup */
systable_endscan(scan);
@@ -524,12 +542,22 @@ HasSubscriptionTables(Oid subid)
/*
* Get the relations for the subscription.
*
- * If not_ready is true, return only the relations that are not in a ready
- * state, otherwise return all the relations of the subscription. The
- * returned list is palloc'ed in the current memory context.
+ * get_tables: get relations for tables of the subscription.
+ *
+ * get_sequences: get relations for sequences of the subscription.
+ *
+ * not_ready:
+ * If getting tables, if not_ready is false get all tables, otherwise
+ * only get tables that have not reached READY state.
+ * If getting sequences, if not_ready is false get all sequences,
+ * otherwise only get sequences that have not reached READY state (i.e. are
+ * still in INIT state).
+ *
+ * The returned list is palloc'ed in the current memory context.
*/
List *
-GetSubscriptionRelations(Oid subid, bool not_ready)
+GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
+ bool not_ready)
{
List *res = NIL;
Relation rel;
@@ -538,6 +566,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
ScanKeyData skey[2];
SysScanDesc scan;
+ /* One or both of 'get_tables' and 'get_sequences' must be true. */
+ Assert(get_tables || get_sequences);
+
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
ScanKeyInit(&skey[nkeys++],
@@ -560,9 +591,23 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
SubscriptionRelState *relstate;
Datum d;
bool isnull;
+ bool issequence;
+ bool istable;
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+ /* Relation is either a sequence or a table */
+ issequence = get_rel_relkind(subrel->srrelid) == RELKIND_SEQUENCE;
+ istable = !issequence;
+
+ /* Skip sequences if they were not requested */
+ if (!get_sequences && issequence)
+ continue;
+
+ /* Skip tables if they were not requested */
+ if (!get_tables && istable)
+ continue;
+
relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
relstate->relid = subrel->srrelid;
relstate->state = subrel->srsubstate;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 08f780a2e63..9853fd50b35 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE C.oid = GPT.relid;
+CREATE VIEW pg_publication_sequences AS
+ SELECT
+ P.pubname AS pubname,
+ N.nspname AS schemaname,
+ C.relname AS sequencename
+ FROM pg_publication P,
+ LATERAL pg_get_publication_sequences(P.pubname) GPS,
+ pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+ WHERE C.oid = GPS.relid;
+
CREATE VIEW pg_locks AS
SELECT * FROM pg_lock_status() AS L;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 4aec73bcc6b..32c77ad372c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -26,6 +26,7 @@
#include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
+#include "catalog/pg_sequence.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
@@ -103,6 +104,7 @@ typedef struct SubOpts
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
static void check_publications_origin(WalReceiverConn *wrconn,
List *publications, bool copydata,
char *origin, Oid *subrel_local_oids,
@@ -692,6 +694,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
+ /*
+ * XXX: If the subscription is for a sequence-only publication, creating
+ * a replication origin is unnecessary because incremental synchronization
+ * of sequences is not supported, and sequence data is fully synced during
+ * a REFRESH, which does not rely on the origin. If the publication is
+ * later modified to include tables, the origin can be created during the
+ * ALTER SUBSCRIPTION ... REFRESH command.
+ */
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
@@ -703,9 +713,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
char *err;
WalReceiverConn *wrconn;
- List *tables;
- ListCell *lc;
- char table_state;
bool must_use_password;
/* Try to connect to the publisher. */
@@ -720,6 +727,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY();
{
+ bool has_tables;
+ List *relations;
+ char table_state;
+
check_publications(wrconn, publications);
check_publications_origin(wrconn, publications, opts.copy_data,
opts.origin, NULL, 0, stmt->subname);
@@ -731,13 +742,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
- * Get the table list from publisher and build local table status
- * info.
+ * Build local relation status info. Relations are for both tables
+ * and sequences from the publisher.
*/
- tables = fetch_table_list(wrconn, publications);
- foreach(lc, tables)
+ relations = fetch_table_list(wrconn, publications);
+ has_tables = relations != NIL;
+ relations = list_concat(relations,
+ fetch_sequence_list(wrconn, publications));
+
+ foreach_ptr(RangeVar, rv, relations)
{
- RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
@@ -754,6 +768,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
* export it.
+ *
+ * XXX: If the subscription is for a sequence-only publication,
+ * creating this slot is unnecessary. It can be created later
+ * during the ALTER SUBSCRIPTION ... REFRESH PUBLICATION or ALTER
+ * SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES command, if the
+ * publication is updated to include tables.
*/
if (opts.create_slot)
{
@@ -777,7 +797,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
* PUBLICATION to work.
*/
- if (opts.twophase && !opts.copy_data && tables != NIL)
+ if (opts.twophase && !opts.copy_data && has_tables)
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -816,12 +836,50 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
return myself;
}
+/*
+ * Update the subscription to refresh both the publication and the publication
+ * objects associated with the subscription.
+ *
+ * Parameters:
+ *
+ * If 'copy_data' is true, newly added relations are set to INIT state;
+ * otherwise, they are set to READY state.
+ *
+ * If 'validate_publications' is provided with a publication list, the
+ * function checks that the specified publications exist on the publisher.
+ *
+ * If 'refresh_tables' is true, update the subscription by adding or removing
+ * tables that have been added or removed since the last subscription creation
+ * or refresh publication.
+ *
+ * If 'refresh_sequences' is true, update the subscription by adding or removing
+ * sequences that have been added or removed since the last subscription
+ * creation or refresh publication.
+ *
+ * Note: this is a common function that handles the different REFRESH commands
+ * according to the parameter 'resync_all_sequences':
+ *
+ * 1. ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES
+ * (when parameter resync_all_sequences is true)
+ *
+ * The function will mark all sequences with INIT state.
+ * Assert copy_data is true.
+ * Assert refresh_tables is false.
+ * Assert refresh_sequences is true.
+ *
+ * 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION [WITH (copy_data=true|false)]
+ * (when parameter resync_all_sequences is false)
+ *
+ * The function will update only the newly added tables and/or sequences
+ * based on the copy_data parameter.
+ */
static void
AlterSubscription_refresh(Subscription *sub, bool copy_data,
- List *validate_publications)
+ List *validate_publications, bool refresh_tables,
+ bool refresh_sequences, bool resync_all_sequences)
{
char *err;
- List *pubrel_names;
+ List *pubrel_names = NIL;
List *subrel_states;
Oid *subrel_local_oids;
Oid *pubrel_local_oids;
@@ -839,6 +897,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
WalReceiverConn *wrconn;
bool must_use_password;
+#ifdef USE_ASSERT_CHECKING
+ /* Sanity checks for parameter values */
+ if (resync_all_sequences)
+ Assert(copy_data && !refresh_tables && refresh_sequences);
+#endif
+
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
@@ -858,10 +922,17 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
check_publications(wrconn, validate_publications);
/* Get the table list from publisher. */
- pubrel_names = fetch_table_list(wrconn, sub->publications);
+ if (refresh_tables)
+ pubrel_names = fetch_table_list(wrconn, sub->publications);
+
+ /* Get the sequence list from publisher. */
+ if (refresh_sequences)
+ pubrel_names = list_concat(pubrel_names,
+ fetch_sequence_list(wrconn,
+ sub->publications));
/* Get local table list. */
- subrel_states = GetSubscriptionRelations(sub->oid, false);
+ subrel_states = GetSubscriptionRelations(sub->oid, refresh_tables, refresh_sequences, false);
subrel_count = list_length(subrel_states);
/*
@@ -880,9 +951,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
qsort(subrel_local_oids, subrel_count,
sizeof(Oid), oid_cmp);
- check_publications_origin(wrconn, sub->publications, copy_data,
- sub->origin, subrel_local_oids,
- subrel_count, sub->name);
+ if (refresh_tables)
+ check_publications_origin(wrconn, sub->publications, copy_data,
+ sub->origin, subrel_local_oids,
+ subrel_count, sub->name);
/*
* Rels that we want to remove from subscription and drop any slots
@@ -904,12 +976,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
{
RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
+ char relkind;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
/* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
- rv->schemaname, rv->relname);
+ relkind = get_rel_relkind(relid);
+ CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname);
pubrel_local_oids[off++] = relid;
@@ -920,8 +993,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr, true);
ereport(DEBUG1,
- (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
- rv->schemaname, rv->relname, sub->name)));
+ errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
+ relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+ rv->schemaname, rv->relname, sub->name));
}
}
@@ -937,11 +1011,31 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
{
Oid relid = subrel_local_oids[off];
- if (!bsearch(&relid, pubrel_local_oids,
- list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ if (bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ /*
+ * The resync_all_sequences flag will only be set to true for
+ * the REFRESH PUBLICATION SEQUENCES command, indicating that
+ * the existing sequences need to be re-synchronized by
+ * resetting the relation to its initial state.
+ */
+ if (resync_all_sequences)
+ {
+ UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name));
+ }
+ }
+ else
{
char state;
XLogRecPtr statelsn;
+ char relkind = get_rel_relkind(relid);
/*
* Lock pg_subscription_rel with AccessExclusiveLock to
@@ -963,41 +1057,51 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Last known rel state. */
state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
- sub_remove_rels[remove_rel_len].relid = relid;
- sub_remove_rels[remove_rel_len++].state = state;
-
RemoveSubscriptionRel(sub->oid, relid);
- logicalrep_worker_stop(sub->oid, relid);
-
/*
- * For READY state, we would have already dropped the
- * tablesync origin.
+ * A single sequencesync worker synchronizes all sequences, so
+ * only stop workers when relation kind is not sequence.
*/
- if (state != SUBREL_STATE_READY)
+ if (relkind != RELKIND_SEQUENCE)
{
- char originname[NAMEDATALEN];
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ logicalrep_worker_stop(sub->oid, relid);
/*
- * Drop the tablesync's origin tracking if exists.
- *
- * It is possible that the origin is not yet created for
- * tablesync worker, this can happen for the states before
- * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
- * apply worker can also concurrently try to drop the
- * origin and by this time the origin might be already
- * removed. For these reasons, passing missing_ok = true.
+ * For READY state, we would have already dropped the
+ * tablesync origin.
*/
- ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
- sizeof(originname));
- replorigin_drop_by_name(originname, true, false);
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created
+ * for tablesync worker, this can happen for the
+ * states before SUBREL_STATE_FINISHEDCOPY. The
+ * tablesync worker or apply worker can also
+ * concurrently try to drop the origin and by this
+ * time the origin might be already removed. For these
+ * reasons, passing missing_ok = true.
+ */
+ ReplicationOriginNameForLogicalRep(sub->oid, relid,
+ originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
+ }
}
ereport(DEBUG1,
- (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
- get_namespace_name(get_rel_namespace(relid)),
- get_rel_name(relid),
- sub->name)));
+ errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"",
+ relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name));
}
}
@@ -1393,8 +1497,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
/*
- * See ALTER_SUBSCRIPTION_REFRESH for details why this is
- * not allowed.
+ * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
+ * why this is not allowed.
*/
if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
ereport(ERROR,
@@ -1408,7 +1512,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
sub->publications = stmt->publication;
AlterSubscription_refresh(sub, opts.copy_data,
- stmt->publication);
+ stmt->publication, true, true,
+ false);
}
break;
@@ -1448,8 +1553,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
"ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
/*
- * See ALTER_SUBSCRIPTION_REFRESH for details why this is
- * not allowed.
+ * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
+ * why this is not allowed.
*/
if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
ereport(ERROR,
@@ -1467,18 +1572,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
sub->publications = publist;
AlterSubscription_refresh(sub, opts.copy_data,
- validate_publications);
+ validate_publications, true, true,
+ false);
}
break;
}
- case ALTER_SUBSCRIPTION_REFRESH:
+ case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
{
if (!sub->enabled)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
+ errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions")));
parse_subscription_options(pstate, stmt->options,
SUBOPT_COPY_DATA, &opts);
@@ -1490,8 +1596,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
*
* But, having reached this two-phase commit "enabled" state
* we must not allow any subsequent table initialization to
- * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
- * when the user had requested two_phase = on mode.
+ * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
+ * disallowed when the user had requested two_phase = on mode.
*
* The exception to this restriction is when copy_data =
* false, because when copy_data is false the tablesync will
@@ -1503,12 +1609,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
- errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
+ errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
+ errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
+
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
- PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
+ AlterSubscription_refresh(sub, opts.copy_data, NULL, true, true, false);
- AlterSubscription_refresh(sub, opts.copy_data, NULL);
+ break;
+ }
+
+ case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES:
+ {
+ if (!sub->enabled)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions"));
+
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES");
+
+ AlterSubscription_refresh(sub, true, NULL, false, true, true);
break;
}
@@ -1773,7 +1893,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* the apply and tablesync workers and they can't restart because of
* exclusive lock on the subscription.
*/
- rstates = GetSubscriptionRelations(subid, true);
+ rstates = GetSubscriptionRelations(subid, true, false, true);
foreach(lc, rstates)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -2087,8 +2207,8 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* its partition ancestors (if it's a partition), or its partition children (if
* it's a partitioned table), from some other publishers. This check is
* required only if "copy_data = true" and "origin = none" for CREATE
- * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the
- * user that data having origin might have been copied.
+ * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION statements to
+ * notify the user that data having origin might have been copied.
*
* This check need not be performed on the tables that are already added
* because incremental sync for those tables will happen through WAL and the
@@ -2127,18 +2247,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
appendStringInfoString(&cmd, ")\n");
/*
- * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
- * the list of relation oids that are already present on the subscriber.
- * This check should be skipped for these tables.
+ * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
+ * subrel_local_oids contains the list of relation oids that are already
+ * present on the subscriber. This check should be skipped for these
+ * tables.
*/
for (i = 0; i < subrel_count; i++)
{
Oid relid = subrel_local_oids[i];
- char *schemaname = get_namespace_name(get_rel_namespace(relid));
- char *tablename = get_rel_name(relid);
- appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
- schemaname, tablename);
+ if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+ {
+ char *schemaname = get_namespace_name(get_rel_namespace(relid));
+ char *tablename = get_rel_name(relid);
+
+ appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+ schemaname, tablename);
+ }
}
res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
@@ -2307,6 +2432,63 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+/*
+ * Get the list of sequences (as schema-qualified RangeVars) which belong to
+ * the specified publications on the publisher connection.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {TEXTOID, TEXTOID};
+ List *seqlist = NIL;
+
+ Assert(list_length(publications) > 0);
+
+ initStringInfo(&cmd);
+ /* Query pg_publication_sequences on the publisher for these publications. */
+ appendStringInfoString(&cmd,
+ "SELECT DISTINCT s.schemaname, s.sequencename\n"
+ "FROM pg_catalog.pg_publication_sequences s\n"
+ "WHERE s.pubname IN (");
+ GetPublicationsStr(publications, &cmd, true);
+ appendStringInfoChar(&cmd, ')');
+
+ res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ errmsg("could not receive list of sequences from the publisher: %s",
+ res->err));
+
+ /* Process sequences: build one RangeVar per returned (schema, name) row. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *nspname;
+ char *relname;
+ bool isnull;
+ RangeVar *rv;
+
+ nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ rv = makeRangeVar(nspname, relname, -1);
+ seqlist = lappend(seqlist, rv);
+ ExecClearTuple(slot);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+ walrcv_clear_result(res);
+
+ return seqlist;
+}
+
/*
* This is to report the connection failure while dropping replication slots.
* Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..3dfa086faa8 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -877,7 +877,9 @@ void
CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname)
{
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+ if (relkind != RELKIND_RELATION &&
+ relkind != RELKIND_PARTITIONED_TABLE &&
+ relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 958ff4c226a..fbd0188cd78 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10894,11 +10894,20 @@ AlterSubscriptionStmt:
AlterSubscriptionStmt *n =
makeNode(AlterSubscriptionStmt);
- n->kind = ALTER_SUBSCRIPTION_REFRESH;
+ n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION;
n->subname = $3;
n->options = $6;
$$ = (Node *) n;
}
+ | ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+
+ n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES;
+ n->subname = $3;
+ $$ = (Node *) n;
+ }
| ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
{
AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index e8bbce141b7..db15051f47b 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -152,8 +152,9 @@ SyncFetchRelationStates(bool *started_tx)
*started_tx = true;
}
- /* Fetch tables that are in non-ready state. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true);
+ /* Fetch tables and sequences that are in non-ready state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
+ true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index aa1589e3331..6dc46a78af2 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -243,8 +243,8 @@ getSchemaData(Archive *fout, int *numTablesPtr)
pg_log_info("reading subscriptions");
getSubscriptions(fout);
- pg_log_info("reading subscription membership of tables");
- getSubscriptionTables(fout);
+ pg_log_info("reading subscription membership of relations");
+ getSubscriptionRelations(fout);
free(inhinfo); /* not needed any longer */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8453a3670f3..f038f6ff3e9 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5137,12 +5137,12 @@ getSubscriptions(Archive *fout)
}
/*
- * getSubscriptionTables
- * Get information about subscription membership for dumpable tables. This
+ * getSubscriptionRelations
+ * Get information about subscription membership for dumpable relations. This
* will be used only in binary-upgrade mode for PG17 or later versions.
*/
void
-getSubscriptionTables(Archive *fout)
+getSubscriptionRelations(Archive *fout)
{
DumpOptions *dopt = fout->dopt;
SubscriptionInfo *subinfo = NULL;
@@ -5196,7 +5196,7 @@ getSubscriptionTables(Archive *fout)
tblinfo = findTableByOid(relid);
if (tblinfo == NULL)
- pg_fatal("failed sanity check, table with OID %u not found",
+ pg_fatal("failed sanity check, relation with OID %u not found",
relid);
/* OK, make a DumpableObject for this relationship */
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1f9bd58a4e2..e648adb8a0e 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -818,6 +818,6 @@ extern void getPublicationNamespaces(Archive *fout);
extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
int numTables);
extern void getSubscriptions(Archive *fout);
-extern void getSubscriptionTables(Archive *fout);
+extern void getSubscriptionRelations(Archive *fout);
#endif /* PG_DUMP_H */
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 3dc84074e63..1206c515a0a 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2288,7 +2288,7 @@ match_previous_words(int pattern_id,
"ADD PUBLICATION", "DROP PUBLICATION");
/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION"))
- COMPLETE_WITH("WITH (");
+ COMPLETE_WITH("SEQUENCES", "WITH (");
/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION", "WITH", "("))
COMPLETE_WITH("copy_data");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index eacb553075e..fa824499fa2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12258,6 +12258,11 @@
proargmodes => '{v,o,o,o,o}',
proargnames => '{pubname,pubid,relid,attrs,qual}',
prosrc => 'pg_get_publication_tables' },
+{ oid => '8052', descr => 'get OIDs of sequences in a publication',
+ proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+ provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+ proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+ proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
{ oid => '6121',
descr => 'returns whether a relation can be part of a publication',
proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 843fe784d64..283c0b11195 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -171,6 +171,7 @@ typedef enum PublicationPartOpt
extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllSequencesPublicationRelations(void);
extern List *GetPublicationSchemas(Oid pubid);
extern List *GetSchemaPublications(Oid schemaid);
extern List *GetSchemaPublicationRelations(Oid schemaid,
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index ea869588d84..a541f4843bd 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -90,6 +90,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
extern bool HasSubscriptionTables(Oid subid);
-extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
+extern List *GetSubscriptionRelations(Oid subid, bool get_tables,
+ bool get_sequences,
+ bool not_ready);
#endif /* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 03ec92d2098..4dee92f4089 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4326,7 +4326,8 @@ typedef enum AlterSubscriptionType
ALTER_SUBSCRIPTION_SET_PUBLICATION,
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
ALTER_SUBSCRIPTION_DROP_PUBLICATION,
- ALTER_SUBSCRIPTION_REFRESH,
+ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION,
+ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES,
ALTER_SUBSCRIPTION_ENABLED,
ALTER_SUBSCRIPTION_SKIP,
} AlterSubscriptionType;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6cf828ca8d0..9623240915c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1458,6 +1458,14 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+ n.nspname AS schemaname,
+ c.relname AS sequencename
+ FROM pg_publication p,
+ LATERAL pg_get_publication_sequences((p.pubname)::text) gps(relid),
+ (pg_class c
+ JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+ WHERE (c.oid = gps.relid);
pg_publication_tables| SELECT p.pubname,
n.nspname AS schemaname,
c.relname AS tablename,
@@ -2171,6 +2179,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
ss.apply_error_count,
+ ss.sequence_sync_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
@@ -2181,7 +2190,7 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sequence_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 1443e1d9292..66dcd71eefa 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -107,7 +107,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
ALTER SUBSCRIPTION regress_testsub3 ENABLE;
ERROR: cannot enable subscription that does not have a slot name
ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
-ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions
-- fail - origin must be either none or any
CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
ERROR: unrecognized origin value: "foo"
@@ -352,7 +352,7 @@ ERROR: ALTER SUBSCRIPTION with refresh cannot run inside a transaction block
END;
BEGIN;
ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
-ERROR: ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block
+ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION cannot run inside a transaction block
END;
CREATE FUNCTION func() RETURNS VOID AS
$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
--
2.34.1