From 711e05c4ae42d415fc13e2a6983a05fb8eeec154 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Sun, 14 Sep 2025 12:13:40 +0530 Subject: [PATCH v1 2/2] Create conflict history table if it does not exist --- src/backend/commands/subscriptioncmds.c | 83 +++++++++++++++++++++---- 1 file changed, 70 insertions(+), 13 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index c2f2fdabadb..f919d357a7e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -123,7 +123,9 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); -static void ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel); +static void CreateConflictHistoryTable(Oid namespaceId, char *conflictrel); +static void ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel, + Oid relid); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -576,6 +578,54 @@ publicationListToArray(List *publist) return PointerGetDatum(arr); } +/* + * CreateConflictHistoryTable: Create conflict log history table. + * + * The subscription creator becomes the owner of this table and has all + * privileges on it. + */ +static void +CreateConflictHistoryTable(Oid namespaceId, char *conflictrel) +{ + StringInfoData querybuf; + + initStringInfo(&querybuf); + + /* + * Build and execute the CREATE TABLE query. 
+ */ + appendStringInfo(&querybuf, + "CREATE TABLE %s.%s (" + "relid Oid," /* Oid of relation */ + "local_xid xid," /* local xid at the time of conflict */ + "remote_xid xid," /* remote node xid that produced the conflicting change */ + "local_lsn pg_lsn," /* local lsn at the time of conflict */ + "remote_commit_lsn pg_lsn," /* commit lsn of the remote transaction */ + "local_commit_ts TIMESTAMPTZ," /* commit ts of the local tuple */ + "remote_commit_ts TIMESTAMPTZ," /* commit ts of the remote tuple */ + "table_schema TEXT," /* name of the schema */ + "table_name TEXT," /* name of the table */ + "conflict_type TEXT," /* conflict type */ + "local_origin TEXT," /* origin of local tuple */ + "remote_origin TEXT," /* origin of remote tuple */ + "key_tuple JSON," /* json representation of the key used for searching */ + "local_tuple JSON," /* json representation of the local tuple */ + "remote_tuple JSON)", /* json representation of the remote tuple */ + quote_identifier(get_namespace_name(namespaceId)), + quote_identifier(conflictrel)); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + pfree(querybuf.data); +} + /* * ValidateConflictHistoryTable - Validate conflict history table * @@ -583,7 +633,8 @@ publicationListToArray(List *publist) * conflict log history table. 
*/ static void -ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel) +ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel, + Oid relid) { Datum value; Relation pg_attribute; @@ -592,17 +643,9 @@ ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel) ScanKeyData scankey; SysScanDesc scan; HeapTuple atup; - Oid relid; int attcnt = 0; bool tbl_ok = true; - relid = get_relname_relid(conflictrel, namespaceId); - if (!OidIsValid(relid)) - ereport(ERROR, - errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s.%s\" does not exist", - get_namespace_name(namespaceId), conflictrel)); - /* log history table must be a regular realtion */ if (get_rel_relkind(relid) != RELKIND_RELATION) ereport(ERROR, @@ -959,8 +1002,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* If conflict log history table name is given than create the table. */ if (opts.conflicttable) - ValidateConflictHistoryTable(conflict_table_nspid, - conflict_table); + { + Oid relid = get_relname_relid(conflict_table, conflict_table_nspid); + + if (!OidIsValid(relid)) + CreateConflictHistoryTable(conflict_table_nspid, conflict_table); + else + ValidateConflictHistoryTable(conflict_table_nspid, + conflict_table, relid); + } + /* * Connect to remote side to execute requested commands and fetch table * info. 
@@ -1753,6 +1804,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_TABLE)) { Oid nspid; + Oid relid; char *relname = NULL; List *names = stringToQualifiedNameList(opts.conflicttable, NULL); @@ -1766,7 +1818,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subconflictnspid - 1] = true; replaces[Anum_pg_subscription_subconflicttable - 1] = true; - ValidateConflictHistoryTable(nspid, relname); + relid = get_relname_relid(relname, nspid); + + if (!OidIsValid(relid)) + CreateConflictHistoryTable(nspid, relname); + else + ValidateConflictHistoryTable(nspid, relname, relid); } update_tuple = true; -- 2.49.0