Thread

  1. [PATCH v1] POC: Preserve the subscription relations during pg_upgrade

    Julien Rouhaud <julien.rouhaud@free.fr> — 2023-02-22T01:19:32Z

    Previously, only the subscription information was preserved.  Without the list
    of relations and their state it's impossible to re-enable the subscriptions
    without missing some records as the list of relations can only be refreshed
    after enabling the subscription (and therefore starting the apply worker).
    Even if we added a way to refresh the subscription while enabling a
    publication, we still wouldn't know which relations are new on the publication
    side, and therefore should be fully synced, and which shouldn't.
    
    To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit
    additional commands to be able to restore the content of pg_subscription_rel.
    
    This new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has
    the following syntax:
    
    ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y'])
    
    The relation is identified by its oid, as it's preserved during pg_upgrade.
    The lsn is optional, and defaults to NULL / InvalidXLogRecPtr.
    
    Author: Julien Rouhaud
    Reviewed-by: FIXME
    Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
    ---
     src/backend/commands/subscriptioncmds.c | 57 +++++++++++++++++
     src/backend/parser/gram.y               | 11 ++++
     src/bin/pg_dump/pg_dump.c               | 84 +++++++++++++++++++++++++
     src/bin/pg_dump/pg_dump.h               | 12 ++++
     src/include/nodes/parsenodes.h          |  3 +-
     5 files changed, 166 insertions(+), 1 deletion(-)
    
    diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
    index 464db6d247..7f2560faf8 100644
    --- a/src/backend/commands/subscriptioncmds.c
    +++ b/src/backend/commands/subscriptioncmds.c
    @@ -66,6 +66,8 @@
     #define SUBOPT_DISABLE_ON_ERR		0x00000400
     #define SUBOPT_LSN					0x00000800
     #define SUBOPT_ORIGIN				0x00001000
    +#define SUBOPT_RELID				0x00002000
    +#define SUBOPT_STATE				0x00004000
     
     /* check if the 'val' has 'bits' set */
     #define IsSet(val, bits)  (((val) & (bits)) == (bits))
    @@ -90,6 +92,8 @@ typedef struct SubOpts
     	bool		disableonerr;
     	char	   *origin;
     	XLogRecPtr	lsn;
    +	Oid			relid;
    +	char		state;
     } SubOpts;
     
     static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
    @@ -324,6 +328,38 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
     			opts->specified_opts |= SUBOPT_LSN;
     			opts->lsn = lsn;
     		}
    +		else if (IsSet(supported_opts, SUBOPT_RELID) &&
    +				 strcmp(defel->defname, "relid") == 0)
    +		{
    +			Oid			relid = defGetObjectId(defel);
    +
    +			if (IsSet(opts->specified_opts, SUBOPT_RELID))
    +				errorConflictingDefElem(defel, pstate);
    +
    +			if (!OidIsValid(relid))
    +				ereport(ERROR,
    +						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    +						 errmsg("invalid relation identifier used")));
    +
    +			opts->specified_opts |= SUBOPT_RELID;
    +			opts->relid = relid;
    +		}
    +		else if (IsSet(supported_opts, SUBOPT_STATE) &&
    +				 strcmp(defel->defname, "state") == 0)
    +		{
    +			char	   *state_str = defGetString(defel);
    +
    +			if (IsSet(opts->specified_opts, SUBOPT_STATE))
    +				errorConflictingDefElem(defel, pstate);
    +
    +			if (strlen(state_str) != 1)
    +				ereport(ERROR,
    +						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    +						 errmsg("invalid relation state used")));
    +
    +			opts->specified_opts |= SUBOPT_STATE;
    +			opts->state = defGetString(defel)[0];
    +		}
     		else
     			ereport(ERROR,
     					(errcode(ERRCODE_SYNTAX_ERROR),
    @@ -1341,6 +1377,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
     				break;
     			}
     
    +		case ALTER_SUBSCRIPTION_ADD_TABLE:
    +			{
    +				if (!IsBinaryUpgrade)
    +					ereport(ERROR,
    +							(errcode(ERRCODE_SYNTAX_ERROR)),
    +							errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not supported"));
    +
    +				supported_opts = SUBOPT_RELID | SUBOPT_STATE | SUBOPT_LSN;
    +				parse_subscription_options(pstate, stmt->options,
    +										   supported_opts, &opts);
    +
    +				/* relid and state should always be provided. */
    +				Assert(IsSet(opts.specified_opts, SUBOPT_RELID));
    +				Assert(IsSet(opts.specified_opts, SUBOPT_STATE));
    +
    +				AddSubscriptionRelState(subid, opts.relid, opts.state,
    +										opts.lsn);
    +
    +				break;
    +			}
    +
     		default:
     			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
     				 stmt->kind);
    diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
    index a0138382a1..0a3448c487 100644
    --- a/src/backend/parser/gram.y
    +++ b/src/backend/parser/gram.y
    @@ -10670,6 +10670,17 @@ AlterSubscriptionStmt:
     					n->options = $5;
     					$$ = (Node *) n;
     				}
    +			/* for binary upgrade only */
    +			| ALTER SUBSCRIPTION name ADD_P TABLE definition
    +				{
    +					AlterSubscriptionStmt *n =
    +						makeNode(AlterSubscriptionStmt);
    +
    +					n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
    +					n->subname = $3;
    +					n->options = $6;
    +					$$ = (Node *) n;
    +				}
     		;
     
     /*****************************************************************************
    diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
    index 527c7651ab..61f54ee549 100644
    --- a/src/bin/pg_dump/pg_dump.c
    +++ b/src/bin/pg_dump/pg_dump.c
    @@ -4470,6 +4470,69 @@ is_superuser(Archive *fout)
     	return false;
     }
     
    +/*
    + * getSubscriptionRels
    + *	  fetch subid's pg_subscription_rel rows (binary upgrade only); count in *nrels
    + */
    +static SubRelInfo *
    +getSubscriptionRels(Archive *fout, Oid subid, int *nrels)
    +{
    +	SubRelInfo *rels;
    +	PQExpBuffer query;
    +	PGresult   *res;
    +	int			i_srrelid;
    +	int			i_srsubstate;
    +	int			i_srsublsn;
    +	int			i,
    +				ntups;
    +
    +	if (!fout->dopt->binary_upgrade)	/* relations are only dumped for upgrades */
    +	{
    +		*nrels = 0;
    +
    +		return NULL;
    +	}
    +
    +	query = createPQExpBuffer();
    +
    +	appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn "
    +								" FROM pg_subscription_rel"
    +								" WHERE srsubid = %u", subid);
    +
    +	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
    +
    +	ntups = PQntuples(res);
    +	*nrels = ntups;
    +
    +	if (ntups == 0)		/* no relations: return NULL, still free res and query */
    +	{
    +		rels = NULL;
    +		goto cleanup;
    +	}
    +
    +	/*
    +	 * Look up the result column number of each selected field.
    +	 */
    +	i_srrelid = PQfnumber(res, "srrelid");
    +	i_srsubstate = PQfnumber(res, "srsubstate");
    +	i_srsublsn = PQfnumber(res, "srsublsn");
    +
    +	rels = pg_malloc(ntups * sizeof(SubRelInfo));
    +
    +	for (i = 0; i < ntups; i++)
    +	{
    +		rels[i].srrelid = atooid(PQgetvalue(res, i, i_srrelid));
    +		rels[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
    +		rels[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
    +	}
    +
    +cleanup:
    +	PQclear(res);
    +	destroyPQExpBuffer(query);
    +
    +	return rels;
    +}
    +
     /*
      * getSubscriptions
      *	  get information about subscriptions
    @@ -4607,6 +4670,10 @@ getSubscriptions(Archive *fout)
     			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
     		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
     
    +		subinfo[i].subrels = getSubscriptionRels(fout,
    +												 subinfo[i].dobj.catId.oid,
    +												 &subinfo[i].nrels);
    +
     		/* Decide whether we want to dump it */
     		selectDumpableObject(&(subinfo[i].dobj), fout);
     	}
    @@ -4690,6 +4757,22 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
     	appendPQExpBufferStr(query, ");\n");
     
     	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
    +	{
    +		for (i = 0; i < subinfo->nrels; i++)
    +		{
    +			appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD TABLE "
    +									 "(RELID = %u, STATE = '%c'",
    +									 qsubname,
    +									 subinfo->subrels[i].srrelid,
    +									 subinfo->subrels[i].srsubstate);
    +
    +			if (subinfo->subrels[i].srsublsn[0] != '\0')
    +				appendPQExpBuffer(query, ", LSN = '%s'",
    +								  subinfo->subrels[i].srsublsn);
    +
    +			appendPQExpBufferStr(query, ");");
    +		}
    +
     		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
     					 ARCHIVE_OPTS(.tag = subinfo->dobj.name,
     								  .owner = subinfo->rolname,
    @@ -4697,6 +4780,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
     								  .section = SECTION_POST_DATA,
     								  .createStmt = query->data,
     								  .dropStmt = delq->data));
    +	}
     
     	if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
     		dumpComment(fout, "SUBSCRIPTION", qsubname,
    diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
    index e7cbd8d7ed..03fb0dafe0 100644
    --- a/src/bin/pg_dump/pg_dump.h
    +++ b/src/bin/pg_dump/pg_dump.h
    @@ -646,6 +646,16 @@ typedef struct _PublicationSchemaInfo
     	NamespaceInfo *pubschema;
     } PublicationSchemaInfo;
     
    +/*
    + * The SubRelInfo struct is used to represent a subscription relation.
    + */
    +typedef struct _SubRelInfo
    +{
    +	Oid		srrelid;		/* pg_subscription_rel.srrelid */
    +	char	srsubstate;		/* first character of srsubstate */
    +	char   *srsublsn;		/* srsublsn as text; not dumped when empty */
    +} SubRelInfo;
    +
     /*
      * The SubscriptionInfo struct is used to represent subscription.
      */
    @@ -662,6 +672,8 @@ typedef struct _SubscriptionInfo
     	char	   *suborigin;
     	char	   *subsynccommit;
     	char	   *subpublications;
    +	int			nrels;
    +	SubRelInfo *subrels;
     } SubscriptionInfo;
     
     /*
    diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
    index f7d7f10f7d..8f66307287 100644
    --- a/src/include/nodes/parsenodes.h
    +++ b/src/include/nodes/parsenodes.h
    @@ -3917,7 +3917,8 @@ typedef enum AlterSubscriptionType
     	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
     	ALTER_SUBSCRIPTION_REFRESH,
     	ALTER_SUBSCRIPTION_ENABLED,
    -	ALTER_SUBSCRIPTION_SKIP
    +	ALTER_SUBSCRIPTION_SKIP,
    +	ALTER_SUBSCRIPTION_ADD_TABLE
     } AlterSubscriptionType;
     
     typedef struct AlterSubscriptionStmt
    -- 
    2.37.0
    
    
    --zm7tcmnqvsudpkdn--