v1-0002-Create-conflict-history-table-if-it-does-not-exis.patch
application/octet-stream
Filename: v1-0002-Create-conflict-history-table-if-it-does-not-exis.patch
Type: application/octet-stream
Part: 0
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v1-0002
Subject: Create conflict history table if it does not exist
| File | + | − |
|---|---|---|
| src/backend/commands/subscriptioncmds.c | 70 | 13 |
From 711e05c4ae42d415fc13e2a6983a05fb8eeec154 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sun, 14 Sep 2025 12:13:40 +0530
Subject: [PATCH v1 2/2] Create conflict history table if it does not exist
---
src/backend/commands/subscriptioncmds.c | 83 +++++++++++++++++++++----
1 file changed, 70 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c2f2fdabadb..f919d357a7e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -123,7 +123,9 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
static void CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel);
-static void ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel);
+static void CreateConflictHistoryTable(Oid namespaceId, char *conflictrel);
+static void ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel,
+ Oid relid);
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -576,6 +578,54 @@ publicationListToArray(List *publist)
return PointerGetDatum(arr);
}
+/*
+ * CreateConflictHistoryTable: Create conflict log history table via SPI.
+ *
+ * The table is created as the user running CREATE/ALTER SUBSCRIPTION, who
+ * thereby owns it and has all privileges on it.
+ */
+static void
+CreateConflictHistoryTable(Oid namespaceId, char *conflictrel)
+{
+ StringInfoData querybuf;
+
+ initStringInfo(&querybuf);
+
+ /*
+ * Build the CREATE TABLE query; schema and table names are quoted below.
+ */
+ appendStringInfo(&querybuf,
+ "CREATE TABLE %s.%s ("
+ "relid Oid," /* Oid of relation */
+ "local_xid xid," /* local xid at the time of conflict */
+ "remote_xid xid," /* remote node xid that produced the conflicting change */
+ "local_lsn pg_lsn," /* local lsn at the time of conflict */
+ "remote_commit_lsn pg_lsn," /* commit lsn of the remote transaction */
+ "local_commit_ts TIMESTAMPTZ," /* commit ts of the local tuple */
+ "remote_commit_ts TIMESTAMPTZ," /* commit ts of the remote tuple */
+ "table_schema TEXT," /* name of the schema */
+ "table_name TEXT," /* name of the table */
+ "conflict_type TEXT," /* conflict type */
+ "local_origin TEXT," /* origin of local tuple */
+ "remote_origin TEXT," /* origin of remote tuple */
+ "key_tuple JSON," /* json representation of the key used for searching */
+ "local_tuple JSON," /* json representation of the local tuple */
+ "remote_tuple JSON)", /* json representation of the remote tuple */
+ quote_identifier(get_namespace_name(namespaceId)),
+ quote_identifier(conflictrel));
+
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY) /* not read-only, no row limit */
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+
+ pfree(querybuf.data);
+}
+
/*
* ValidateConflictHistoryTable - Validate conflict history table
*
@@ -583,7 +633,8 @@ publicationListToArray(List *publist)
* conflict log history table.
*/
static void
-ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel)
+ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel,
+ Oid relid)
{
Datum value;
Relation pg_attribute;
@@ -592,17 +643,9 @@ ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel)
ScanKeyData scankey;
SysScanDesc scan;
HeapTuple atup;
- Oid relid;
int attcnt = 0;
bool tbl_ok = true;
- relid = get_relname_relid(conflictrel, namespaceId);
- if (!OidIsValid(relid))
- ereport(ERROR,
- errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s.%s\" does not exist",
- get_namespace_name(namespaceId), conflictrel));
-
/* log history table must be a regular realtion */
if (get_rel_relkind(relid) != RELKIND_RELATION)
ereport(ERROR,
@@ -959,8 +1002,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
/* If conflict log history table name is given than create the table. */
if (opts.conflicttable)
- ValidateConflictHistoryTable(conflict_table_nspid,
- conflict_table);
+ {
+ Oid relid = get_relname_relid(conflict_table, conflict_table_nspid);
+
+ if (!OidIsValid(relid))
+ CreateConflictHistoryTable(conflict_table_nspid, conflict_table);
+ else
+ ValidateConflictHistoryTable(conflict_table_nspid,
+ conflict_table, relid);
+ }
+
/*
* Connect to remote side to execute requested commands and fetch table
* info.
@@ -1753,6 +1804,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_TABLE))
{
Oid nspid;
+ Oid relid;
char *relname = NULL;
List *names =
stringToQualifiedNameList(opts.conflicttable, NULL);
@@ -1766,7 +1818,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subconflictnspid - 1] = true;
replaces[Anum_pg_subscription_subconflicttable - 1] = true;
- ValidateConflictHistoryTable(nspid, relname);
+ relid = get_relname_relid(relname, nspid);
+
+ if (!OidIsValid(relid))
+ CreateConflictHistoryTable(nspid, relname);
+ else
+ ValidateConflictHistoryTable(nspid, relname, relid);
}
update_tuple = true;
--
2.49.0