[PATCH v1] POC: Preserve the subscription relations during pg_upgrade
Julien Rouhaud <julien.rouhaud@free.fr>
From: Julien Rouhaud <julien.rouhaud@free.fr>
To:
Date: 2023-02-22T01:19:32Z
Lists: pgsql-hackers
Previously, only the subscription information was preserved. Without the list
of relations and their state it's impossible to re-enable the subscriptions
without missing some records as the list of relations can only be refreshed
after enabling the subscription (and therefore starting the apply worker).
Even if we added a way to refresh the subscription while enabling a
publication, we still wouldn't know which relations are new on the publication
side, and therefore should be fully synced, and which shouldn't.
To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit
additional commands to be able to restore the content of pg_subscription_rel.
This new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has
the following syntax:
ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y'])
The relation is identified by its oid, as it's preserved during pg_upgrade.
The lsn is optional, and defaults to NULL / InvalidXLogRecPtr.
Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
src/backend/commands/subscriptioncmds.c | 57 +++++++++++++++++
src/backend/parser/gram.y | 11 ++++
src/bin/pg_dump/pg_dump.c | 84 +++++++++++++++++++++++++
src/bin/pg_dump/pg_dump.h | 12 ++++
src/include/nodes/parsenodes.h | 3 +-
5 files changed, 166 insertions(+), 1 deletion(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..7f2560faf8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,8 @@
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_LSN 0x00000800
#define SUBOPT_ORIGIN 0x00001000
+#define SUBOPT_RELID 0x00002000
+#define SUBOPT_STATE 0x00004000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -90,6 +92,8 @@ typedef struct SubOpts
bool disableonerr;
char *origin;
XLogRecPtr lsn;
+ Oid relid;
+ char state;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -324,6 +328,38 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_LSN;
opts->lsn = lsn;
}
+		/*
+		 * relid: OID of the table a pg_subscription_rel entry refers to.
+		 * Rejected when given twice or when it is not a valid OID.
+		 */
+		else if (IsSet(supported_opts, SUBOPT_RELID) &&
+				 strcmp(defel->defname, "relid") == 0)
+		{
+			Oid			relid = defGetObjectId(defel);
+
+			if (IsSet(opts->specified_opts, SUBOPT_RELID))
+				errorConflictingDefElem(defel, pstate);
+
+			if (!OidIsValid(relid))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid relation identifier used")));
+
+			opts->specified_opts |= SUBOPT_RELID;
+			opts->relid = relid;
+		}
+		/*
+		 * state: relation state, given as a string of exactly one character.
+		 * Rejected when given twice or when the length is not 1.
+		 */
+		else if (IsSet(supported_opts, SUBOPT_STATE) &&
+				 strcmp(defel->defname, "state") == 0)
+		{
+			char	   *state_str = defGetString(defel);
+
+			if (IsSet(opts->specified_opts, SUBOPT_STATE))
+				errorConflictingDefElem(defel, pstate);
+
+			if (strlen(state_str) != 1)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid relation state used")));
+
+			opts->specified_opts |= SUBOPT_STATE;
+			/* reuse the string fetched above instead of parsing it again */
+			opts->state = state_str[0];
+		}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1341,6 +1377,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
+		case ALTER_SUBSCRIPTION_ADD_TABLE:
+			{
+				/*
+				 * Adds one relation state entry to the subscription.  The
+				 * command is rejected outside of binary upgrade mode.
+				 */
+				if (!IsBinaryUpgrade)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not supported")));
+
+				supported_opts = SUBOPT_RELID | SUBOPT_STATE | SUBOPT_LSN;
+				parse_subscription_options(pstate, stmt->options,
+										   supported_opts, &opts);
+
+				/*
+				 * relid and state are mandatory.  They come from the command
+				 * text, so they must be checked at run time: an Assert is
+				 * compiled out of production builds, and the entry would
+				 * then be created from values that were never supplied.
+				 */
+				if (!IsSet(opts.specified_opts, SUBOPT_RELID))
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("ALTER SUBSCRIPTION ... ADD TABLE requires a relid option")));
+
+				if (!IsSet(opts.specified_opts, SUBOPT_STATE))
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("ALTER SUBSCRIPTION ... ADD TABLE requires a state option")));
+
+				/* lsn is optional and is passed through as parsed */
+				AddSubscriptionRelState(subid, opts.relid, opts.state,
+										opts.lsn);
+
+				break;
+			}
+
default:
elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..0a3448c487 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10670,6 +10670,17 @@ AlterSubscriptionStmt:
n->options = $5;
$$ = (Node *) n;
}
+ /* for binary upgrade only */
+ | ALTER SUBSCRIPTION name ADD_P TABLE definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+
+ n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
+ n->subname = $3;
+ n->options = $6;
+ $$ = (Node *) n;
+ }
;
/*****************************************************************************
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c7651ab..61f54ee549 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4470,6 +4470,69 @@ is_superuser(Archive *fout)
return false;
}
+/*
+ * getSubscriptionRels
+ *	  Collect the pg_subscription_rel rows belonging to subscription "subid".
+ *
+ * Outside of binary upgrade mode no query is issued: *nrels is set to 0 and
+ * NULL is returned.  Otherwise *nrels receives the number of rows found and
+ * the result is a pg_malloc'd array with that many entries, or NULL when the
+ * subscription has no rows.
+ */
+static SubRelInfo *
+getSubscriptionRels(Archive *fout, Oid subid, int *nrels)
+{
+	SubRelInfo *result = NULL;
+	PQExpBuffer buf;
+	PGresult   *res;
+	int			nrows;
+
+	/* Relation states are only collected for binary upgrade. */
+	if (!fout->dopt->binary_upgrade)
+	{
+		*nrels = 0;
+		return NULL;
+	}
+
+	buf = createPQExpBuffer();
+	appendPQExpBuffer(buf, "SELECT srrelid, srsubstate, srsublsn "
+					  " FROM pg_subscription_rel"
+					  " WHERE srsubid = %u", subid);
+	res = ExecuteSqlQuery(fout, buf->data, PGRES_TUPLES_OK);
+
+	nrows = PQntuples(res);
+	*nrels = nrows;
+
+	if (nrows > 0)
+	{
+		/* Look up the column positions once, then copy out every row. */
+		int			col_relid = PQfnumber(res, "srrelid");
+		int			col_state = PQfnumber(res, "srsubstate");
+		int			col_lsn = PQfnumber(res, "srsublsn");
+		int			row;
+
+		result = pg_malloc(nrows * sizeof(SubRelInfo));
+
+		for (row = 0; row < nrows; row++)
+		{
+			SubRelInfo *rel = &result[row];
+
+			rel->srrelid = atooid(PQgetvalue(res, row, col_relid));
+			rel->srsubstate = PQgetvalue(res, row, col_state)[0];
+			/* the LSN is kept in text form, as returned by the server */
+			rel->srsublsn = pg_strdup(PQgetvalue(res, row, col_lsn));
+		}
+	}
+
+	PQclear(res);
+	destroyPQExpBuffer(buf);
+
+	return result;
+}
+
/*
* getSubscriptions
* get information about subscriptions
@@ -4607,6 +4670,10 @@ getSubscriptions(Archive *fout)
pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+ subinfo[i].subrels = getSubscriptionRels(fout,
+ subinfo[i].dobj.catId.oid,
+ &subinfo[i].nrels);
+
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
}
@@ -4690,6 +4757,22 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ");\n");
if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+ {
+ for (i = 0; i < subinfo->nrels; i++)
+ {
+ appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD TABLE "
+ "(RELID = %u, STATE = '%c'",
+ qsubname,
+ subinfo->subrels[i].srrelid,
+ subinfo->subrels[i].srsubstate);
+
+ if (subinfo->subrels[i].srsublsn[0] != '\0')
+ appendPQExpBuffer(query, ", LSN = '%s'",
+ subinfo->subrels[i].srsublsn);
+
+ appendPQExpBufferStr(query, ");");
+ }
+
ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
ARCHIVE_OPTS(.tag = subinfo->dobj.name,
.owner = subinfo->rolname,
@@ -4697,6 +4780,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
.section = SECTION_POST_DATA,
.createStmt = query->data,
.dropStmt = delq->data));
+ }
if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
dumpComment(fout, "SUBSCRIPTION", qsubname,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..03fb0dafe0 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -646,6 +646,16 @@ typedef struct _PublicationSchemaInfo
NamespaceInfo *pubschema;
} PublicationSchemaInfo;
+/*
+ * The SubRelInfo struct is used to represent one pg_subscription_rel row of a subscription.
+ */
+typedef struct _SubRelInfo
+{
+ Oid srrelid; /* srrelid column, parsed as an OID */
+ char srsubstate; /* first character of the srsubstate column */
+ char *srsublsn; /* srsublsn column as text; dumpSubscription omits LSN when empty */
+} SubRelInfo;
+
/*
* The SubscriptionInfo struct is used to represent subscription.
*/
@@ -662,6 +672,8 @@ typedef struct _SubscriptionInfo
char *suborigin;
char *subsynccommit;
char *subpublications;
+ int nrels;
+ SubRelInfo *subrels;
} SubscriptionInfo;
/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index f7d7f10f7d..8f66307287 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3917,7 +3917,8 @@ typedef enum AlterSubscriptionType
ALTER_SUBSCRIPTION_DROP_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH,
ALTER_SUBSCRIPTION_ENABLED,
- ALTER_SUBSCRIPTION_SKIP
+ ALTER_SUBSCRIPTION_SKIP,
+ ALTER_SUBSCRIPTION_ADD_TABLE
} AlterSubscriptionType;
typedef struct AlterSubscriptionStmt
--
2.37.0
--zm7tcmnqvsudpkdn--