v20250630-0004-Introduce-REFRESH-PUBLICATION-SEQUENCES-fo.patch

application/octet-stream

Filename: v20250630-0004-Introduce-REFRESH-PUBLICATION-SEQUENCES-fo.patch
Type: application/octet-stream
Part: 3
Message: Re: Logical Replication of sequences
From b33b3562ce0945f2c794cf745f95ac854543faf0 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Mon, 9 Jun 2025 20:18:54 +0530
Subject: [PATCH v20250630 4/6] Introduce "REFRESH PUBLICATION SEQUENCES" for
 subscriptions

This patch introduces a new command to synchronize the sequences of
a subscription:
  ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES
---
 src/backend/catalog/pg_publication.c        |  82 +++++
 src/backend/catalog/pg_subscription.c       |  61 +++-
 src/backend/catalog/system_views.sql        |  10 +
 src/backend/commands/subscriptioncmds.c     | 348 +++++++++++++++-----
 src/backend/executor/execReplication.c      |   4 +-
 src/backend/parser/gram.y                   |  11 +-
 src/backend/replication/logical/syncutils.c |   5 +-
 src/bin/pg_dump/common.c                    |   4 +-
 src/bin/pg_dump/pg_dump.c                   |   8 +-
 src/bin/pg_dump/pg_dump.h                   |   2 +-
 src/bin/psql/tab-complete.in.c              |   2 +-
 src/include/catalog/pg_proc.dat             |   5 +
 src/include/catalog/pg_publication.h        |   1 +
 src/include/catalog/pg_subscription_rel.h   |   4 +-
 src/include/nodes/parsenodes.h              |   3 +-
 src/test/regress/expected/rules.out         |  11 +-
 src/test/regress/expected/subscription.out  |   4 +-
 17 files changed, 460 insertions(+), 105 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index dc3f9ed3fbf..ec46b126304 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1062,6 +1062,42 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 	return result;
 }
 
+/*
+ * Return the OIDs of all publishable sequences, i.e. the relations covered
+ * by FOR ALL SEQUENCES publication(s).  Scans pg_class for RELKIND_SEQUENCE.
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+	Relation	pgclass = table_open(RelationRelationId, AccessShareLock);
+	ScanKeyData skey;
+	TableScanDesc seqscan;
+	HeapTuple	seqtup;
+	List	   *seqoids = NIL;
+
+	/* Restrict the catalog scan to rows whose relkind is RELKIND_SEQUENCE. */
+	ScanKeyInit(&skey, Anum_pg_class_relkind,
+				BTEqualStrategyNumber, F_CHAREQ,
+				CharGetDatum(RELKIND_SEQUENCE));
+	seqscan = table_beginscan_catalog(pgclass, 1, &skey);
+
+	for (seqtup = heap_getnext(seqscan, ForwardScanDirection);
+		 seqtup != NULL;
+		 seqtup = heap_getnext(seqscan, ForwardScanDirection))
+	{
+		Form_pg_class classform = (Form_pg_class) GETSTRUCT(seqtup);
+
+		/* Keep only the sequences that is_publishable_class() accepts. */
+		if (is_publishable_class(classform->oid, classform))
+			seqoids = lappend_oid(seqoids, classform->oid);
+	}
+
+	table_endscan(seqscan);
+	table_close(pgclass, AccessShareLock);
+
+	return seqoids;
+}
+
 /*
  * Get publication using oid
  *
@@ -1334,3 +1370,49 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * Set-returning function: emit the OID of each sequence in the publication
+ * named by the first argument (non-empty only for allsequences publications).
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *srf;
+	List	   *seqoids = NIL;
+
+	/* First call only: build the OID list and stash it for later calls. */
+	if (SRF_IS_FIRSTCALL())
+	{
+		char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+		MemoryContext callerctx;
+		Publication *pub;
+
+		srf = SRF_FIRSTCALL_INIT();
+
+		/* The list must survive between calls, so build it in that context. */
+		callerctx = MemoryContextSwitchTo(srf->multi_call_memory_ctx);
+
+		/* Publications not marked allsequences yield an empty list. */
+		pub = GetPublicationByName(pubname, false);
+		if (pub->allsequences)
+			seqoids = GetAllSequencesPublicationRelations();
+
+		srf->user_fctx = (void *) seqoids;
+
+		MemoryContextSwitchTo(callerctx);
+	}
+
+	/* Every call: recover the saved list and return its next entry, if any. */
+	srf = SRF_PERCALL_SETUP();
+	seqoids = (List *) srf->user_fctx;
+
+	if (srf->call_cntr < list_length(seqoids))
+	{
+		Oid			seqoid = list_nth_oid(seqoids, srf->call_cntr);
+
+		SRF_RETURN_NEXT(srf, ObjectIdGetDatum(seqoid));
+	}
+
+	SRF_RETURN_DONE(srf);
+}
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1c71161e723..ebd5605afe3 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -27,6 +27,7 @@
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/memutils.h"
 #include "utils/lsyscache.h"
 #include "utils/pg_lsn.h"
 #include "utils/rel.h"
@@ -462,7 +463,9 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 		 * leave tablesync slots or origins in the system when the
 		 * corresponding table is dropped.
 		 */
-		if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
+		if (!OidIsValid(subid) &&
+			get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE &&
+			subrel->srsubstate != SUBREL_STATE_READY)
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -499,7 +502,8 @@ HasSubscriptionTables(Oid subid)
 	Relation	rel;
 	ScanKeyData skey[1];
 	SysScanDesc scan;
-	bool		has_subrels;
+	HeapTuple	tup;
+	bool		has_subrels = false;
 
 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
@@ -511,8 +515,22 @@ HasSubscriptionTables(Oid subid)
 	scan = systable_beginscan(rel, InvalidOid, false,
 							  NULL, 1, skey);
 
-	/* If even a single tuple exists then the subscription has tables. */
-	has_subrels = HeapTupleIsValid(systable_getnext(scan));
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel subrel;
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+		/*
+		 * Skip sequence tuples. If even a single table tuple exists then the
+		 * subscription has tables.
+		 */
+		if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
+		{
+			has_subrels = true;
+			break;
+		}
+	}
 
 	/* Cleanup */
 	systable_endscan(scan);
@@ -524,12 +542,22 @@ HasSubscriptionTables(Oid subid)
 /*
  * Get the relations for the subscription.
  *
- * If not_ready is true, return only the relations that are not in a ready
- * state, otherwise return all the relations of the subscription.  The
- * returned list is palloc'ed in the current memory context.
+ * get_tables: get relations for tables of the subscription.
+ *
+ * get_sequences: get relations for sequences of the subscription.
+ *
+ * not_ready:
+ * If getting tables and not_ready is false get all tables, otherwise,
+ * only get tables that have not reached READY state.
+ * If getting sequences and not_ready is false get all sequences,
+ * otherwise, only get sequences that have not reached READY state (i.e. are
+ * still in INIT state).
+ *
+ * The returned list is palloc'ed in the current memory context.
  */
 List *
-GetSubscriptionRelations(Oid subid, bool not_ready)
+GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
+						 bool not_ready)
 {
 	List	   *res = NIL;
 	Relation	rel;
@@ -538,6 +566,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 	ScanKeyData skey[2];
 	SysScanDesc scan;
 
+	/* One or both of 'get_tables' and 'get_sequences' must be true. */
+	Assert(get_tables || get_sequences);
+
 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
 	ScanKeyInit(&skey[nkeys++],
@@ -560,9 +591,23 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 		SubscriptionRelState *relstate;
 		Datum		d;
 		bool		isnull;
+		bool		issequence;
+		bool		istable;
 
 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
 
+		/* Relation is either a sequence or a table */
+		issequence = get_rel_relkind(subrel->srrelid) == RELKIND_SEQUENCE;
+		istable = !issequence;
+
+		/* Skip sequences if they were not requested */
+		if (!get_sequences && issequence)
+			continue;
+
+		/* Skip tables if they were not requested */
+		if (!get_tables && istable)
+			continue;
+
 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
 		relstate->relid = subrel->srrelid;
 		relstate->state = subrel->srsubstate;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 08f780a2e63..9853fd50b35 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS
          pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
     WHERE C.oid = GPT.relid;
 
+CREATE VIEW pg_publication_sequences AS -- one row per (publication, sequence)
+    SELECT
+        P.pubname AS pubname,
+        N.nspname AS schemaname,
+        C.relname AS sequencename
+    FROM pg_publication P,
+         LATERAL pg_get_publication_sequences(P.pubname) GPS, -- sequence OIDs of P
+         pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) -- name lookup
+    WHERE C.oid = GPS.relid;
+
 CREATE VIEW pg_locks AS
     SELECT * FROM pg_lock_status() AS L;
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 4ff246cd943..09ae7b80722 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -26,6 +26,7 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_sequence.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
@@ -103,6 +104,7 @@ typedef struct SubOpts
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
 static void check_publications_origin(WalReceiverConn *wrconn,
 									  List *publications, bool copydata,
 									  char *origin, Oid *subrel_local_oids,
@@ -692,6 +694,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+	/*
+	 * XXX: If the subscription is for a sequence-only publication, creating a
+	 * replication origin is unnecessary because incremental synchronization
+	 * of sequences is not supported, and sequence data is fully synced during
+	 * a REFRESH, which does not rely on the origin. If the publication is
+	 * later modified to include tables, the origin can be created during the
+	 * ALTER SUBSCRIPTION ... REFRESH command.
+	 */
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
@@ -703,9 +713,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	{
 		char	   *err;
 		WalReceiverConn *wrconn;
-		List	   *tables;
-		ListCell   *lc;
-		char		table_state;
 		bool		must_use_password;
 
 		/* Try to connect to the publisher. */
@@ -720,6 +727,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
+			bool		has_tables;
+			List	   *relations;
+			char		table_state;
+
 			check_publications(wrconn, publications);
 			check_publications_origin(wrconn, publications, opts.copy_data,
 									  opts.origin, NULL, 0, stmt->subname);
@@ -731,13 +742,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
 			/*
-			 * Get the table list from publisher and build local table status
-			 * info.
+			 * Build local relation status info. Relations are for both tables
+			 * and sequences from the publisher.
 			 */
-			tables = fetch_table_list(wrconn, publications);
-			foreach(lc, tables)
+			relations = fetch_table_list(wrconn, publications);
+			has_tables = relations != NIL;
+			relations = list_concat(relations,
+									fetch_sequence_list(wrconn, publications));
+
+			foreach_ptr(RangeVar, rv, relations)
 			{
-				RangeVar   *rv = (RangeVar *) lfirst(lc);
 				Oid			relid;
 
 				relid = RangeVarGetRelid(rv, AccessShareLock, false);
@@ -754,6 +768,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 * If requested, create permanent slot for the subscription. We
 			 * won't use the initial snapshot for anything, so no need to
 			 * export it.
+			 *
+			 * XXX: If the subscription is for a sequence-only publication,
+			 * creating this slot is unnecessary. It can be created later
+			 * during the ALTER SUBSCRIPTION ... REFRESH PUBLICATION or ALTER
+			 * SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES command, if the
+			 * publication is updated to include tables.
 			 */
 			if (opts.create_slot)
 			{
@@ -777,7 +797,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
 				 * PUBLICATION to work.
 				 */
-				if (opts.twophase && !opts.copy_data && tables != NIL)
+				if (opts.twophase && !opts.copy_data && has_tables)
 					twophase_enabled = true;
 
 				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -816,12 +836,50 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	return myself;
 }
 
+/*
+ * Update the subscription to refresh both the publication and the publication
+ * objects associated with the subscription.
+ *
+ * Parameters:
+ *
+ * If 'copy_data' is true, the function will set the state to INIT; otherwise,
+ * it will set the state to READY.
+ *
+ * If 'validate_publications' is provided with a publication list, the
+ * function checks that the specified publications exist on the publisher.
+ *
+ * If 'refresh_tables' is true, update the subscription by adding or removing
+ * tables that have been added or removed since the last subscription creation
+ * or refresh publication.
+ *
+ * If 'refresh_sequences' is true, update the subscription by adding or removing
+ * sequences that have been added or removed since the last subscription
+ * creation or refresh publication.
+ *
+ * Note, this is a common function for handling different REFRESH commands
+ * according to the parameter 'resync_all_sequences':
+ *
+ * 1. ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES
+ *    (when parameter resync_all_sequences is true)
+ *
+ *    The function will mark all sequences with INIT state.
+ *    Assert copy_data is true.
+ *    Assert refresh_tables is false.
+ *    Assert refresh_sequences is true.
+ *
+ * 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION [WITH (copy_data=true|false)]
+ *    (when parameter resync_all_sequences is false)
+ *
+ *    The function will update only the newly added tables and/or sequences
+ *    based on the copy_data parameter.
+ */
 static void
 AlterSubscription_refresh(Subscription *sub, bool copy_data,
-						  List *validate_publications)
+						  List *validate_publications, bool refresh_tables,
+						  bool refresh_sequences, bool resync_all_sequences)
 {
 	char	   *err;
-	List	   *pubrel_names;
+	List	   *pubrel_names = NIL;
 	List	   *subrel_states;
 	Oid		   *subrel_local_oids;
 	Oid		   *pubrel_local_oids;
@@ -839,6 +897,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	WalReceiverConn *wrconn;
 	bool		must_use_password;
 
+#ifdef USE_ASSERT_CHECKING
+	/* Sanity checks for parameter values */
+	if (resync_all_sequences)
+		Assert(copy_data && !refresh_tables && refresh_sequences);
+#endif
+
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
@@ -858,16 +922,23 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			check_publications(wrconn, validate_publications);
 
 		/* Get the table list from publisher. */
-		pubrel_names = fetch_table_list(wrconn, sub->publications);
+		if (refresh_tables)
+			pubrel_names = fetch_table_list(wrconn, sub->publications);
+
+		/* Get the sequence list from publisher. */
+		if (refresh_sequences)
+			pubrel_names = list_concat(pubrel_names,
+									   fetch_sequence_list(wrconn,
+														   sub->publications));
 
-		/* Get local table list. */
-		subrel_states = GetSubscriptionRelations(sub->oid, false);
+		/* Get local relation list. */
+		subrel_states = GetSubscriptionRelations(sub->oid, refresh_tables, refresh_sequences, false);
 		subrel_count = list_length(subrel_states);
 
 		/*
-		 * Build qsorted array of local table oids for faster lookup. This can
-		 * potentially contain all tables in the database so speed of lookup
-		 * is important.
+		 * Build qsorted array of local relation oids for faster lookup. This
+		 * can potentially contain all relations in the database so speed of
+		 * lookup is important.
 		 */
 		subrel_local_oids = palloc(subrel_count * sizeof(Oid));
 		off = 0;
@@ -880,9 +951,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		qsort(subrel_local_oids, subrel_count,
 			  sizeof(Oid), oid_cmp);
 
-		check_publications_origin(wrconn, sub->publications, copy_data,
-								  sub->origin, subrel_local_oids,
-								  subrel_count, sub->name);
+		if (refresh_tables)
+			check_publications_origin(wrconn, sub->publications, copy_data,
+									  sub->origin, subrel_local_oids,
+									  subrel_count, sub->name);
 
 		/*
 		 * Rels that we want to remove from subscription and drop any slots
@@ -891,11 +963,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
 
 		/*
-		 * Walk over the remote tables and try to match them to locally known
-		 * tables. If the table is not known locally create a new state for
-		 * it.
+		 * Walk over the remote relations and try to match them to locally
+		 * known relations. If the relation is not known locally, create a new
+		 * state for it.
 		 *
-		 * Also builds array of local oids of remote tables for the next step.
+		 * Also builds array of local oids of remote relations for the next
+		 * step.
 		 */
 		off = 0;
 		pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
@@ -904,12 +977,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		{
 			RangeVar   *rv = (RangeVar *) lfirst(lc);
 			Oid			relid;
+			char		relkind;
 
 			relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
 			/* Check for supported relkind. */
-			CheckSubscriptionRelkind(get_rel_relkind(relid),
-									 rv->schemaname, rv->relname);
+			relkind = get_rel_relkind(relid);
+			CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname);
 
 			pubrel_local_oids[off++] = relid;
 
@@ -920,14 +994,15 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
 										InvalidXLogRecPtr, true);
 				ereport(DEBUG1,
-						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
-										 rv->schemaname, rv->relname, sub->name)));
+						errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
+										relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+										rv->schemaname, rv->relname, sub->name));
 			}
 		}
 
 		/*
-		 * Next remove state for tables we should not care about anymore using
-		 * the data we collected above
+		 * Next remove state for relations we should not care about anymore
+		 * using the data we collected above
 		 */
 		qsort(pubrel_local_oids, list_length(pubrel_names),
 			  sizeof(Oid), oid_cmp);
@@ -937,11 +1012,31 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		{
 			Oid			relid = subrel_local_oids[off];
 
-			if (!bsearch(&relid, pubrel_local_oids,
-						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			if (bsearch(&relid, pubrel_local_oids,
+						list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			{
+				/*
+				 * The resync_all_sequences flag will only be set to true for
+				 * the REFRESH PUBLICATION SEQUENCES command, indicating that
+				 * the existing sequences need to be re-synchronized by
+				 * resetting the relation to its initial state.
+				 */
+				if (resync_all_sequences)
+				{
+					UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
+											   InvalidXLogRecPtr);
+					ereport(DEBUG1,
+							errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
+											get_namespace_name(get_rel_namespace(relid)),
+											get_rel_name(relid),
+											sub->name));
+				}
+			}
+			else
 			{
 				char		state;
 				XLogRecPtr	statelsn;
+				char		relkind = get_rel_relkind(relid);
 
 				/*
 				 * Lock pg_subscription_rel with AccessExclusiveLock to
@@ -963,41 +1058,51 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 				/* Last known rel state. */
 				state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
 
-				sub_remove_rels[remove_rel_len].relid = relid;
-				sub_remove_rels[remove_rel_len++].state = state;
-
 				RemoveSubscriptionRel(sub->oid, relid);
 
-				logicalrep_worker_stop(sub->oid, relid);
-
 				/*
-				 * For READY state, we would have already dropped the
-				 * tablesync origin.
+				 * A single sequencesync worker synchronizes all sequences, so
+				 * only stop workers when relation kind is not sequence.
 				 */
-				if (state != SUBREL_STATE_READY)
+				if (relkind != RELKIND_SEQUENCE)
 				{
-					char		originname[NAMEDATALEN];
+					sub_remove_rels[remove_rel_len].relid = relid;
+					sub_remove_rels[remove_rel_len++].state = state;
+
+					logicalrep_worker_stop(sub->oid, relid);
 
 					/*
-					 * Drop the tablesync's origin tracking if exists.
-					 *
-					 * It is possible that the origin is not yet created for
-					 * tablesync worker, this can happen for the states before
-					 * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
-					 * apply worker can also concurrently try to drop the
-					 * origin and by this time the origin might be already
-					 * removed. For these reasons, passing missing_ok = true.
+					 * For READY state, we would have already dropped the
+					 * tablesync origin.
 					 */
-					ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
-													   sizeof(originname));
-					replorigin_drop_by_name(originname, true, false);
+					if (state != SUBREL_STATE_READY)
+					{
+						char		originname[NAMEDATALEN];
+
+						/*
+						 * Drop the tablesync's origin tracking if exists.
+						 *
+						 * It is possible that the origin is not yet created
+						 * for tablesync worker, this can happen for the
+						 * states before SUBREL_STATE_FINISHEDCOPY. The
+						 * tablesync worker or apply worker can also
+						 * concurrently try to drop the origin and by this
+						 * time the origin might be already removed. For these
+						 * reasons, passing missing_ok = true.
+						 */
+						ReplicationOriginNameForLogicalRep(sub->oid, relid,
+														   originname,
+														   sizeof(originname));
+						replorigin_drop_by_name(originname, true, false);
+					}
 				}
 
 				ereport(DEBUG1,
-						(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
-										 get_namespace_name(get_rel_namespace(relid)),
-										 get_rel_name(relid),
-										 sub->name)));
+						errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"",
+										relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+										get_namespace_name(get_rel_namespace(relid)),
+										get_rel_name(relid),
+										sub->name));
 			}
 		}
 
@@ -1393,8 +1498,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
 
 					/*
-					 * See ALTER_SUBSCRIPTION_REFRESH for details why this is
-					 * not allowed.
+					 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
+					 * why this is not allowed.
 					 */
 					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
 						ereport(ERROR,
@@ -1408,7 +1513,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					sub->publications = stmt->publication;
 
 					AlterSubscription_refresh(sub, opts.copy_data,
-											  stmt->publication);
+											  stmt->publication, true, true,
+											  false);
 				}
 
 				break;
@@ -1448,8 +1554,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 										 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
 
 					/*
-					 * See ALTER_SUBSCRIPTION_REFRESH for details why this is
-					 * not allowed.
+					 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
+					 * why this is not allowed.
 					 */
 					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
 						ereport(ERROR,
@@ -1467,18 +1573,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					sub->publications = publist;
 
 					AlterSubscription_refresh(sub, opts.copy_data,
-											  validate_publications);
+											  validate_publications, true, true,
+											  false);
 				}
 
 				break;
 			}
 
-		case ALTER_SUBSCRIPTION_REFRESH:
+		case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
 			{
 				if (!sub->enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
+							 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions")));
 
 				parse_subscription_options(pstate, stmt->options,
 										   SUBOPT_COPY_DATA, &opts);
@@ -1490,8 +1597,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				 *
 				 * But, having reached this two-phase commit "enabled" state
 				 * we must not allow any subsequent table initialization to
-				 * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
-				 * when the user had requested two_phase = on mode.
+				 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
+				 * disallowed when the user had requested two_phase = on mode.
 				 *
 				 * The exception to this restriction is when copy_data =
 				 * false, because when copy_data is false the tablesync will
@@ -1503,12 +1610,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
-							 errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
-							 errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
+							 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
+							 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
+
+				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
+
+				AlterSubscription_refresh(sub, opts.copy_data, NULL, true, true, false);
 
-				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
+				break;
+			}
+
+		case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES:
+			{
+				if (!sub->enabled)
+					ereport(ERROR,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions"));
 
-				AlterSubscription_refresh(sub, opts.copy_data, NULL);
+				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES");
+
+				AlterSubscription_refresh(sub, true, NULL, false, true, true);
 
 				break;
 			}
@@ -1773,7 +1894,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * the apply and tablesync workers and they can't restart because of
 	 * exclusive lock on the subscription.
 	 */
-	rstates = GetSubscriptionRelations(subid, true);
+	rstates = GetSubscriptionRelations(subid, true, false, true);
 	foreach(lc, rstates)
 	{
 		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -2087,8 +2208,8 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
  * its partition ancestors (if it's a partition), or its partition children (if
  * it's a partitioned table), from some other publishers. This check is
  * required only if "copy_data = true" and "origin = none" for CREATE
- * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the
- * user that data having origin might have been copied.
+ * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION statements to
+ * notify the user that data having origin might have been copied.
  *
  * This check need not be performed on the tables that are already added
  * because incremental sync for those tables will happen through WAL and the
@@ -2127,18 +2248,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
 	appendStringInfoString(&cmd, ")\n");
 
 	/*
-	 * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
-	 * the list of relation oids that are already present on the subscriber.
-	 * This check should be skipped for these tables.
+	 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
+	 * subrel_local_oids contains the list of relation oids that are already
+	 * present on the subscriber. This check should be skipped for these
+	 * tables.
 	 */
 	for (i = 0; i < subrel_count; i++)
 	{
 		Oid			relid = subrel_local_oids[i];
-		char	   *schemaname = get_namespace_name(get_rel_namespace(relid));
-		char	   *tablename = get_rel_name(relid);
 
-		appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
-						 schemaname, tablename);
+		if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+		{
+			char	   *schemaname = get_namespace_name(get_rel_namespace(relid));
+			char	   *tablename = get_rel_name(relid);
+
+			appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+							 schemaname, tablename);
+		}
 	}
 
 	res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
@@ -2307,6 +2433,68 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	return tablelist;
 }
 
+/*
+ * Get the list of sequences which belong to the specified publications on the
+ * publisher connection.  The result is a list of RangeVars; it is NIL when the
+ * publisher is older than version 19, in which case no query is sent.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			seqRow[2] = {TEXTOID, TEXTOID};
+	List	   *seqlist = NIL;
+
+	/* Skip sequence fetch if the publisher is older than version 19 */
+	if (walrcv_server_version(wrconn) < 190000)
+		return seqlist;
+
+	Assert(list_length(publications) > 0);
+
+	initStringInfo(&cmd);
+
+	appendStringInfoString(&cmd,
+						   "SELECT DISTINCT s.schemaname, s.sequencename\n"
+						   "FROM pg_catalog.pg_publication_sequences s\n"
+						   "WHERE s.pubname IN (");
+	GetPublicationsStr(publications, &cmd, true);
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 2, seqRow);
+	pfree(cmd.data);
+
+	/* A failed fetch is a publisher-side problem, not an internal error. */
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				errcode(ERRCODE_CONNECTION_FAILURE),
+				errmsg("could not receive list of sequences from the publisher: %s",
+					   res->err));
+
+	/* Turn each (schemaname, sequencename) row into a RangeVar. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *nspname;
+		char	   *relname;
+		bool		isnull;
+
+		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		seqlist = lappend(seqlist, makeRangeVar(nspname, relname, -1));
+		ExecClearTuple(slot);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+	walrcv_clear_result(res);
+
+	return seqlist;
+}
+
 /*
  * This is to report the connection failure while dropping replication slots.
  * Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..3dfa086faa8 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -877,7 +877,9 @@ void
 CheckSubscriptionRelkind(char relkind, const char *nspname,
 						 const char *relname)
 {
-	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+	if (relkind != RELKIND_RELATION &&
+		relkind != RELKIND_PARTITIONED_TABLE &&
+		relkind != RELKIND_SEQUENCE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b6dbb4fe7fc..04d6408580a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10889,11 +10889,20 @@ AlterSubscriptionStmt:
 					AlterSubscriptionStmt *n =
 						makeNode(AlterSubscriptionStmt);
 
-					n->kind = ALTER_SUBSCRIPTION_REFRESH;
+					n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION;
 					n->subname = $3;
 					n->options = $6;
 					$$ = (Node *) n;
 				}
+			| ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES;
+					n->subname = $3;
+					$$ = (Node *) n;
+				}
 			| ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index e8bbce141b7..db15051f47b 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -152,8 +152,9 @@ SyncFetchRelationStates(bool *started_tx)
 			*started_tx = true;
 		}
 
-		/* Fetch tables that are in non-ready state. */
-		rstates = GetSubscriptionRelations(MySubscription->oid, true);
+		/* Fetch tables and sequences that are in non-ready state. */
+		rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
+										   true);
 
 		/* Allocate the tracking info in a permanent memory context. */
 		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index aa1589e3331..6dc46a78af2 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -243,8 +243,8 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);
 
-	pg_log_info("reading subscription membership of tables");
-	getSubscriptionTables(fout);
+	pg_log_info("reading subscription membership of relations");
+	getSubscriptionRelations(fout);
 
 	free(inhinfo);				/* not needed any longer */
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index eda52fc813a..5c23d3db56e 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5139,12 +5139,12 @@ getSubscriptions(Archive *fout)
 }
 
 /*
- * getSubscriptionTables
- *	  Get information about subscription membership for dumpable tables. This
+ * getSubscriptionRelations
+ *	  Get information about subscription membership for dumpable relations. This
  *    will be used only in binary-upgrade mode for PG17 or later versions.
  */
 void
-getSubscriptionTables(Archive *fout)
+getSubscriptionRelations(Archive *fout)
 {
 	DumpOptions *dopt = fout->dopt;
 	SubscriptionInfo *subinfo = NULL;
@@ -5198,7 +5198,7 @@ getSubscriptionTables(Archive *fout)
 
 		tblinfo = findTableByOid(relid);
 		if (tblinfo == NULL)
-			pg_fatal("failed sanity check, table with OID %u not found",
+			pg_fatal("failed sanity check, relation with OID %u not found",
 					 relid);
 
 		/* OK, make a DumpableObject for this relationship */
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index b1a6a08f52b..1ceb25bdcde 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -819,6 +819,6 @@ extern void getPublicationNamespaces(Archive *fout);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
 extern void getSubscriptions(Archive *fout);
-extern void getSubscriptionTables(Archive *fout);
+extern void getSubscriptionRelations(Archive *fout);
 
 #endif							/* PG_DUMP_H */
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 31030fc212c..29a18913512 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2288,7 +2288,7 @@ match_previous_words(int pattern_id,
 					  "ADD PUBLICATION", "DROP PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION"))
-		COMPLETE_WITH("WITH (");
+		COMPLETE_WITH("SEQUENCES", "WITH (");
 	/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION", "WITH", "("))
 		COMPLETE_WITH("copy_data");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b0e60dfa3ce..52a8a2a9672 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12261,6 +12261,11 @@
   proargmodes => '{v,o,o,o,o}',
   proargnames => '{pubname,pubid,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
+{ oid => '8052', descr => 'get OIDs of sequences in a publication',
+  proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+  provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+  proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+  proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 843fe784d64..283c0b11195 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -171,6 +171,7 @@ typedef enum PublicationPartOpt
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllSequencesPublicationRelations(void);
 extern List *GetPublicationSchemas(Oid pubid);
 extern List *GetSchemaPublications(Oid schemaid);
 extern List *GetSchemaPublicationRelations(Oid schemaid,
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index ea869588d84..a541f4843bd 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -90,6 +90,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern bool HasSubscriptionTables(Oid subid);
-extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
+extern List *GetSubscriptionRelations(Oid subid, bool get_tables,
+									  bool get_sequences,
+									  bool not_ready);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 57d8c129796..3c64e2d17a7 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4325,7 +4325,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
 	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
-	ALTER_SUBSCRIPTION_REFRESH,
+	ALTER_SUBSCRIPTION_REFRESH_PUBLICATION,
+	ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES,
 	ALTER_SUBSCRIPTION_ENABLED,
 	ALTER_SUBSCRIPTION_SKIP,
 } AlterSubscriptionType;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6cf828ca8d0..9623240915c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1458,6 +1458,14 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
      LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+    n.nspname AS schemaname,
+    c.relname AS sequencename
+   FROM pg_publication p,
+    LATERAL pg_get_publication_sequences((p.pubname)::text) gps(relid),
+    (pg_class c
+     JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+  WHERE (c.oid = gps.relid);
 pg_publication_tables| SELECT p.pubname,
     n.nspname AS schemaname,
     c.relname AS tablename,
@@ -2171,6 +2179,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
+    ss.sequence_sync_error_count,
     ss.sync_error_count,
     ss.confl_insert_exists,
     ss.confl_update_origin_differs,
@@ -2181,7 +2190,7 @@ pg_stat_subscription_stats| SELECT ss.subid,
     ss.confl_multiple_unique_conflicts,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sequence_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 1443e1d9292..66dcd71eefa 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -107,7 +107,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
 ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ERROR:  cannot enable subscription that does not have a slot name
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
-ERROR:  ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+ERROR:  ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions
 -- fail - origin must be either none or any
 CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
 ERROR:  unrecognized origin value: "foo"
@@ -352,7 +352,7 @@ ERROR:  ALTER SUBSCRIPTION with refresh cannot run inside a transaction block
 END;
 BEGIN;
 ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
-ERROR:  ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block
+ERROR:  ALTER SUBSCRIPTION ... REFRESH PUBLICATION cannot run inside a transaction block
 END;
 CREATE FUNCTION func() RETURNS VOID AS
 $$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
-- 
2.34.1