[PATCH v1] POC: Preserve the subscription relations during pg_upgrade

Julien Rouhaud <julien.rouhaud@free.fr>

From: Julien Rouhaud <julien.rouhaud@free.fr>
To:
Date: 2023-02-22T01:19:32Z
Lists: pgsql-hackers
Previously, only the subscription information was preserved.  Without the list
of relations and their state it's impossible to re-enable the subscriptions
without missing some records as the list of relations can only be refreshed
after enabling the subscription (and therefore starting the apply worker).
Even if we added a way to refresh the subscription while enabling a
publication, we still wouldn't know which relations are new on the publication
side, and therefore should be fully synced, and which shouldn't.

To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit
additional commands to be able to restore the content of pg_subscription_rel.

This new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has
the following syntax:

ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y'])

The relation is identified by its oid, as it's preserved during pg_upgrade.
The lsn is optional, and defaults to NULL / InvalidXLogRecPtr.

Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 src/backend/commands/subscriptioncmds.c | 57 +++++++++++++++++
 src/backend/parser/gram.y               | 11 ++++
 src/bin/pg_dump/pg_dump.c               | 84 +++++++++++++++++++++++++
 src/bin/pg_dump/pg_dump.h               | 12 ++++
 src/include/nodes/parsenodes.h          |  3 +-
 5 files changed, 166 insertions(+), 1 deletion(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..7f2560faf8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,8 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_RELID				0x00002000
+#define SUBOPT_STATE				0x00004000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +92,8 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	Oid			relid;
+	char		state;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -324,6 +328,38 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_RELID) &&
+				 strcmp(defel->defname, "relid") == 0)
+		{
+			Oid			relid = defGetObjectId(defel);
+
+			if (IsSet(opts->specified_opts, SUBOPT_RELID))
+				errorConflictingDefElem(defel, pstate);
+
+			if (!OidIsValid(relid))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid relation identifier used")));
+
+			opts->specified_opts |= SUBOPT_RELID;
+			opts->relid = relid;
+		}
+		else if (IsSet(supported_opts, SUBOPT_STATE) &&
+				 strcmp(defel->defname, "state") == 0)
+		{
+			char	   *state_str = defGetString(defel);
+
+			if (IsSet(opts->specified_opts, SUBOPT_STATE))
+				errorConflictingDefElem(defel, pstate);
+
+			if (strlen(state_str) != 1)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid relation state used")));
+
+			opts->specified_opts |= SUBOPT_STATE;
+			opts->state = defGetString(defel)[0];
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1341,6 +1377,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_ADD_TABLE:
+			{
+				if (!IsBinaryUpgrade)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR)),
+							errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not supported"));
+
+				supported_opts = SUBOPT_RELID | SUBOPT_STATE | SUBOPT_LSN;
+				parse_subscription_options(pstate, stmt->options,
+										   supported_opts, &opts);
+
+				/* relid and state should always be provided. */
+				Assert(IsSet(opts.specified_opts, SUBOPT_RELID));
+				Assert(IsSet(opts.specified_opts, SUBOPT_STATE));
+
+				AddSubscriptionRelState(subid, opts.relid, opts.state,
+										opts.lsn);
+
+				break;
+			}
+
 		default:
 			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
 				 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..0a3448c487 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10670,6 +10670,17 @@ AlterSubscriptionStmt:
 					n->options = $5;
 					$$ = (Node *) n;
 				}
+			/* for binary upgrade only */
+			| ALTER SUBSCRIPTION name ADD_P TABLE definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
+					n->subname = $3;
+					n->options = $6;
+					$$ = (Node *) n;
+				}
 		;
 
 /*****************************************************************************
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c7651ab..61f54ee549 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4470,6 +4470,69 @@ is_superuser(Archive *fout)
 	return false;
 }
 
+/*
+ * getSubscriptionRels
+ *	  get information about the given subscription's relations
+ */
+static SubRelInfo *
+getSubscriptionRels(Archive *fout, Oid subid, int *nrels)
+{
+	SubRelInfo *rels;
+	PQExpBuffer query;
+	PGresult   *res;
+	int			i_srrelid;
+	int			i_srsubstate;
+	int			i_srsublsn;
+	int			i,
+				ntups;
+
+	if (!fout->dopt->binary_upgrade)
+	{
+		*nrels = 0;
+
+		return NULL;
+	}
+
+	query = createPQExpBuffer();
+
+	appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn "
+								" FROM pg_subscription_rel"
+								" WHERE srsubid = %u", subid);
+
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+	*nrels = ntups;
+
+	if (ntups == 0)
+	{
+		rels = NULL;
+		goto cleanup;
+	}
+
+	/*
+	 * Get subscription relation fields.
+	 */
+	i_srrelid = PQfnumber(res, "srrelid");
+	i_srsubstate = PQfnumber(res, "srsubstate");
+	i_srsublsn = PQfnumber(res, "srsublsn");
+
+	rels = pg_malloc(ntups * sizeof(SubRelInfo));
+
+	for (i = 0; i < ntups; i++)
+	{
+		rels[i].srrelid = atooid(PQgetvalue(res, i, i_srrelid));
+		rels[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+		rels[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+	}
+
+cleanup:
+	PQclear(res);
+	destroyPQExpBuffer(query);
+
+	return rels;
+}
+
 /*
  * getSubscriptions
  *	  get information about subscriptions
@@ -4607,6 +4670,10 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
 
+		subinfo[i].subrels = getSubscriptionRels(fout,
+												 subinfo[i].dobj.catId.oid,
+												 &subinfo[i].nrels);
+
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
 	}
@@ -4690,6 +4757,22 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		for (i = 0; i < subinfo->nrels; i++)
+		{
+			appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD TABLE "
+									 "(RELID = %u, STATE = '%c'",
+									 qsubname,
+									 subinfo->subrels[i].srrelid,
+									 subinfo->subrels[i].srsubstate);
+
+			if (subinfo->subrels[i].srsublsn[0] != '\0')
+				appendPQExpBuffer(query, ", LSN = '%s'",
+								  subinfo->subrels[i].srsublsn);
+
+			appendPQExpBufferStr(query, ");");
+		}
+
 		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
 					 ARCHIVE_OPTS(.tag = subinfo->dobj.name,
 								  .owner = subinfo->rolname,
@@ -4697,6 +4780,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 								  .section = SECTION_POST_DATA,
 								  .createStmt = query->data,
 								  .dropStmt = delq->data));
+	}
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
 		dumpComment(fout, "SUBSCRIPTION", qsubname,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..03fb0dafe0 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -646,6 +646,16 @@ typedef struct _PublicationSchemaInfo
 	NamespaceInfo *pubschema;
 } PublicationSchemaInfo;
 
+/*
+ * The SubRelInfo struct is used to represent subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+	Oid		srrelid;
+	char	srsubstate;
+	char   *srsublsn;
+} SubRelInfo;
+
 /*
  * The SubscriptionInfo struct is used to represent subscription.
  */
@@ -662,6 +672,8 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	int			nrels;
+	SubRelInfo *subrels;
 } SubscriptionInfo;
 
 /*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index f7d7f10f7d..8f66307287 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3917,7 +3917,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
 	ALTER_SUBSCRIPTION_ENABLED,
-	ALTER_SUBSCRIPTION_SKIP
+	ALTER_SUBSCRIPTION_SKIP,
+	ALTER_SUBSCRIPTION_ADD_TABLE
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
-- 
2.37.0


--zm7tcmnqvsudpkdn--