v8-0001-Add-configurable-conflict-log-table-for-Logical-R.patch
application/octet-stream
Filename: v8-0001-Add-configurable-conflict-log-table-for-Logical-R.patch
Type: application/octet-stream
Part: 0
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v8-0001
Subject: Add configurable conflict log table for Logical Replication
| File | + | − |
|---|---|---|
| src/backend/catalog/pg_publication.c | 23 | 2 |
| src/backend/commands/subscriptioncmds.c | 236 | 3 |
| src/backend/replication/logical/conflict.c | 337 | 23 |
| src/backend/replication/logical/launcher.c | 1 | 0 |
| src/backend/replication/logical/worker.c | 29 | 1 |
| src/backend/utils/cache/lsyscache.c | 38 | 0 |
| src/bin/psql/describe.c | 7 | 1 |
| src/bin/psql/tab-complete.in.c | 4 | 4 |
| src/include/catalog/pg_subscription.h | 5 | 0 |
| src/include/commands/subscriptioncmds.h | 2 | 0 |
| src/include/replication/conflict.h | 4 | 0 |
| src/include/replication/worker_internal.h | 7 | 0 |
| src/include/utils/lsyscache.h | 1 | 0 |
| src/test/regress/expected/subscription.out | 231 | 88 |
| src/test/regress/sql/subscription.sql | 87 | 0 |
From e6ec7862f7ef760f5a489dab28d8dcab1cfc02dd Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Wed, 12 Nov 2025 10:43:19 +0530
Subject: [PATCH v8] Add configurable conflict log table for Logical
Replication
This patch adds a feature to provide a structured, queryable record of all
logical replication conflicts. The current approach of logging conflicts as
plain text in the server logs makes it difficult to query, analyze, and
use for external monitoring and automation.
This patch addresses these limitations by introducing a configurable
conflict_log_table option in the CREATE SUBSCRIPTION command. Key design
decisions include:
User-Managed Table: The conflict log is stored in a user-managed table
rather than a system catalog.
Structured Data: Conflict details, including the original and remote tuples,
are stored in JSON columns, providing a flexible format to accommodate different
table schemas.
Comprehensive Information: The log table captures essential attributes such as
local and remote transaction IDs, LSNs, commit timestamps, and conflict type,
providing a complete record for post-mortem analysis.
This feature will make logical replication conflicts easier to monitor and manage,
significantly improving the overall resilience and operability of replication setups.
---
src/backend/catalog/pg_publication.c | 25 +-
src/backend/commands/subscriptioncmds.c | 239 +++++++++++++-
src/backend/replication/logical/conflict.c | 360 +++++++++++++++++++--
src/backend/replication/logical/launcher.c | 1 +
src/backend/replication/logical/worker.c | 30 +-
src/backend/utils/cache/lsyscache.c | 38 +++
src/bin/psql/describe.c | 8 +-
src/bin/psql/tab-complete.in.c | 8 +-
src/include/catalog/pg_subscription.h | 5 +
src/include/commands/subscriptioncmds.h | 2 +
src/include/replication/conflict.h | 4 +
src/include/replication/worker_internal.h | 7 +
src/include/utils/lsyscache.h | 1 +
src/test/regress/expected/subscription.out | 319 +++++++++++++-----
src/test/regress/sql/subscription.sql | 87 +++++
15 files changed, 1012 insertions(+), 122 deletions(-)
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index ac2f4ee3561..7e2f50cafd6 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -31,6 +31,7 @@
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/publicationcmds.h"
+#include "commands/subscriptioncmds.h"
#include "funcapi.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -85,6 +86,14 @@ check_publication_add_relation(Relation targetrel)
errmsg("cannot add relation \"%s\" to publication",
RelationGetRelationName(targetrel)),
errdetail("This operation is not supported for unlogged tables.")));
+
+ /* Can't be conflict log table */
+ if (IsConflictLogRelid(RelationGetRelid(targetrel)))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add relation \"%s\" to publication",
+ RelationGetRelationName(targetrel)),
+ errdetail("This operation is not supported for conflict log tables.")));
}
/*
@@ -145,6 +154,13 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
/*
* Another variant of is_publishable_class(), taking a Relation.
+ *
+ * Note: Conflict log tables are not publishable. However, we intentionally
+ * skip this check here because this function is called for every change and
+ * performing this check during every change publication is costly. To ensure
+ * unpublishable entries are ignored without incurring performance overhead,
+ * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL
+ * flag. This allows the decoding layer to bypass these entries automatically.
*/
bool
is_publishable_relation(Relation rel)
@@ -169,7 +185,10 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
PG_RETURN_NULL();
- result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
+
+ /* Subscription conflict log tables are not published */
+ result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) &&
+ !IsConflictLogRelid(relid);
ReleaseSysCache(tuple);
PG_RETURN_BOOL(result);
}
@@ -890,7 +909,9 @@ GetAllPublicationRelations(char relkind, bool pubviaroot)
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
Oid relid = relForm->oid;
+ /* Subscription conflict log tables are not published */
if (is_publishable_class(relid, relForm) &&
+ !IsConflictLogRelid(relid) &&
!(relForm->relispartition && pubviaroot))
result = lappend_oid(result, relid);
}
@@ -1018,7 +1039,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Oid relid = relForm->oid;
char relkind;
- if (!is_publishable_class(relid, relForm))
+ if (!is_publishable_class(relid, relForm) || IsConflictLogRelid(relid))
continue;
relkind = get_rel_relkind(relid);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 24b70234b35..e96dee29851 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -15,6 +15,7 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
@@ -34,6 +35,7 @@
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "executor/spi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
@@ -47,10 +49,12 @@
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
+#include "utils/regproc.h"
#include "utils/syscache.h"
/*
@@ -75,6 +79,7 @@
#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
#define SUBOPT_LSN 0x00010000
#define SUBOPT_ORIGIN 0x00020000
+#define SUBOPT_CONFLICT_LOG_TABLE 0x00040000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -103,6 +108,7 @@ typedef struct SubOpts
bool retaindeadtuples;
int32 maxretention;
char *origin;
+ char *conflictlogtable;
XLogRecPtr lsn;
} SubOpts;
@@ -135,7 +141,8 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
static void CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel);
-
+static void create_conflict_log_table(Oid namespaceId, char *conflictrel);
+static void drop_conflict_log_table(Oid namespaceId, char *conflictrel);
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -191,6 +198,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->maxretention = 0;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+ if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE))
+ opts->conflictlogtable = NULL;
/* Parse options */
foreach(lc, stmt_options)
@@ -402,6 +411,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_LSN;
opts->lsn = lsn;
}
+ else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE) &&
+ strcmp(defel->defname, "conflict_log_table") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_TABLE))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_CONFLICT_LOG_TABLE;
+ opts->conflictlogtable = defGetString(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -599,6 +617,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
bits32 supported_opts;
SubOpts opts = {0};
AclResult aclresult;
+ Oid conflictlogtable_nspid = InvalidOid;
+ char *conflictlogtable = NULL;
/*
* Parse and check options.
@@ -612,7 +632,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_RETAIN_DEAD_TUPLES |
- SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+ SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+ SUBOPT_CONFLICT_LOG_TABLE);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -747,6 +768,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
+ /*
+ * If a conflict log table name is specified, parse the schema and table
+ * name from the string. Store the namespace OID and the table name in
+ * the pg_subscription catalog tuple.
+ */
+ if (opts.conflictlogtable)
+ {
+ List *names = stringToQualifiedNameList(opts.conflictlogtable, NULL);
+
+ conflictlogtable_nspid =
+ QualifiedNameGetCreationNamespace(names, &conflictlogtable);
+ values[Anum_pg_subscription_subconflictlognspid - 1] =
+ ObjectIdGetDatum(conflictlogtable_nspid);
+ values[Anum_pg_subscription_subconflictlogtable - 1] =
+ CStringGetTextDatum(conflictlogtable);
+ }
+ else
+ nulls[Anum_pg_subscription_subconflictlogtable - 1] = true;
+
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */
@@ -768,6 +808,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
+ /* If a conflict log table name is given then create the table. */
+ if (opts.conflictlogtable)
+ create_conflict_log_table(conflictlogtable_nspid, conflictlogtable);
+
/*
* Connect to remote side to execute requested commands and fetch table
* and sequence info.
@@ -1410,7 +1454,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_RETAIN_DEAD_TUPLES |
SUBOPT_MAX_RETENTION_DURATION |
- SUBOPT_ORIGIN);
+ SUBOPT_ORIGIN |
+ SUBOPT_CONFLICT_LOG_TABLE);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1665,6 +1710,59 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
origin = opts.origin;
}
+ if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_TABLE))
+ {
+ Oid nspid;
+ char *relname = NULL;
+ Oid old_nspid = InvalidOid;
+ char *old_relname = NULL;
+ List *names =
+ stringToQualifiedNameList(opts.conflictlogtable, NULL);
+
+ nspid = QualifiedNameGetCreationNamespace(names, &relname);
+
+ /*
+ * If the subscription already uses this conflict log table
+ * and it exists, just issue a notice.
+ */
+ old_relname =
+ get_subscription_conflict_log_table(subid, &old_nspid);
+ if (old_relname != NULL &&
+ strcmp(old_relname, relname) == 0 &&
+ old_nspid == nspid &&
+ OidIsValid(get_relname_relid(relname, nspid)))
+ {
+ char *nspname = get_namespace_name(nspid);
+
+ ereport(NOTICE,
+ (errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription",
+ nspname, relname)));
+ pfree(nspname);
+ }
+ else
+ {
+ /*
+ * Create the conflict log table after dropping any
+ * pre-existing one.
+ */
+ if (old_relname)
+ drop_conflict_log_table(old_nspid, old_relname);
+ create_conflict_log_table(nspid, relname);
+
+ values[Anum_pg_subscription_subconflictlognspid - 1] =
+ ObjectIdGetDatum(nspid);
+ values[Anum_pg_subscription_subconflictlogtable - 1] =
+ CStringGetTextDatum(relname);
+ replaces[Anum_pg_subscription_subconflictlognspid - 1] =
+ true;
+ replaces[Anum_pg_subscription_subconflictlogtable - 1] =
+ true;
+ }
+
+ if (old_relname != NULL)
+ pfree(old_relname);
+ }
+
update_tuple = true;
break;
}
@@ -2027,6 +2125,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
Form_pg_subscription form;
List *rstates;
bool must_use_password;
+ Oid conflictlogtable_nsp = InvalidOid;
+ char *conflictlogtable = NULL;
/*
* The launcher may concurrently start a new worker for this subscription.
@@ -2110,6 +2210,20 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ObjectAddressSet(myself, SubscriptionRelationId, subid);
EventTriggerSQLDropAddObject(&myself, true, true);
+ /* Fetch the conflict log table information. */
+ conflictlogtable =
+ get_subscription_conflict_log_table(subid, &conflictlogtable_nsp);
+
+ /*
+ * If the subscription had a conflict log table, drop it now. This happens
+ * before deleting the subscription tuple.
+ */
+ if (conflictlogtable)
+ {
+ drop_conflict_log_table(conflictlogtable_nsp, conflictlogtable);
+ pfree(conflictlogtable);
+ }
+
/* Remove the tuple from catalog. */
CatalogTupleDelete(rel, &tup->t_self);
@@ -3188,3 +3302,122 @@ defGetStreamingMode(DefElem *def)
def->defname)));
return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
}
+
+/*
+ * Create conflict log table.
+ *
+ * The subscription owner becomes the owner of this table and has all
+ * privileges on it.
+ */
+static void
+create_conflict_log_table(Oid namespaceId, char *conflictrel)
+{
+ StringInfoData querybuf;
+
+ /* Report an error if the specified conflict log table already exists. */
+ if (OidIsValid(get_relname_relid(conflictrel, namespaceId)))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("cannot create conflict log table \"%s.%s\" because a table with that name already exists",
+ get_namespace_name(namespaceId), conflictrel),
+ errhint("Use a different name for the conflict log table or drop the existing table.")));
+
+ initStringInfo(&querybuf);
+
+ /* build and execute the CREATE TABLE query. */
+ appendStringInfo(&querybuf,
+ "CREATE TABLE %s.%s ("
+ "relid Oid,"
+ "schemaname TEXT,"
+ "relname TEXT,"
+ "conflict_type TEXT,"
+ "local_xid xid,"
+ "remote_xid xid,"
+ "remote_commit_lsn pg_lsn,"
+ "local_commit_ts TIMESTAMPTZ,"
+ "remote_commit_ts TIMESTAMPTZ,"
+ "local_origin TEXT,"
+ "remote_origin TEXT,"
+ "key_tuple JSON,"
+ "local_tuple JSON,"
+ "remote_tuple JSON)",
+ quote_identifier(get_namespace_name(namespaceId)),
+ quote_identifier(conflictrel));
+
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+
+ pfree(querybuf.data);
+}
+
+/*
+ * Drop the conflict log table.
+ *
+ * This function uses SPI to execute DROP TABLE IF EXISTS.
+ * We use IF EXISTS to avoid errors if the user manually dropped it first.
+ */
+static void
+drop_conflict_log_table(Oid namespaceId, char *conflictrel)
+{
+ StringInfoData querybuf;
+
+ initStringInfo(&querybuf);
+
+ /* Drop the conflict log table if it exists. */
+ appendStringInfo(&querybuf,
+ "DROP TABLE IF EXISTS %s.%s",
+ quote_identifier(get_namespace_name(namespaceId)),
+ quote_identifier(conflictrel));
+
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+
+ pfree(querybuf.data);
+}
+
+/*
+ * Check if the specified relation is used as a conflict log table by any
+ * subscription.
+ */
+bool
+IsConflictLogRelid(Oid relid)
+{
+ Relation rel;
+ TableScanDesc scan;
+ HeapTuple tup;
+ bool is_clt = false;
+
+ rel = table_open(SubscriptionRelationId, AccessShareLock);
+ scan = table_beginscan_catalog(rel, 0, NULL);
+
+ while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+ {
+ Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
+ Oid nspid;
+ char *relname;
+
+ relname = get_subscription_conflict_log_table(subform->oid, &nspid);
+ if (relname && relid == get_relname_relid(relname, nspid))
+ {
+ is_clt = true;
+ break;
+ }
+ }
+
+ table_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ return is_clt;
+}
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..16c103ecdd2 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,24 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/tableam.h"
+#include "access/table.h"
+#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace_d.h"
+#include "catalog/pg_type.h"
#include "executor/executor.h"
+#include "executor/spi.h"
+#include "funcapi.h"
#include "pgstat.h"
#include "replication/conflict.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
static const char *const ConflictTypeNames[] = {
[CT_INSERT_EXISTS] = "insert_exists",
@@ -50,8 +61,26 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull);
static char *build_index_value_desc(EState *estate, Relation localrel,
TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_ri_json_datum(EState *estate,
+ Relation localrel,
+ Oid replica_index,
+ TupleTableSlot *slot);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ TransactionId local_xid,
+ TimestampTz local_ts,
+ ConflictType conflict_type,
+ RepOriginId origin_id,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *localslot,
+ TupleTableSlot *remoteslot);
/*
* Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -106,12 +135,14 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
TupleTableSlot *remoteslot, List *conflicttuples)
{
Relation localrel = relinfo->ri_RelationDesc;
+ Relation conflictlogrel = GetConflictLogTableRel();
StringInfoData err_detail;
initStringInfo(&err_detail);
/* Form errdetail message by combining conflicting tuples information. */
foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+ {
errdetail_apply_conflict(estate, relinfo, type, searchslot,
conflicttuple->slot, remoteslot,
conflicttuple->indexoid,
@@ -120,6 +151,30 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
conflicttuple->ts,
&err_detail);
+ /* Insert conflict details to conflict log table. */
+ if (conflictlogrel)
+ {
+ /*
+ * Prepare the conflict log tuple. If the error level is below
+ * ERROR, insert it immediately. Otherwise, defer the insertion to
+ * a new transaction after the current one aborts, ensuring the
+ * insertion of the log tuple is not rolled back.
+ */
+ prepare_conflict_log_tuple(estate,
+ relinfo->ri_RelationDesc,
+ conflictlogrel,
+ conflicttuple->xmin,
+ conflicttuple->ts, type,
+ conflicttuple->origin,
+ searchslot, conflicttuple->slot,
+ remoteslot);
+ if (elevel < ERROR)
+ InsertConflictLogTuple(conflictlogrel);
+
+ table_close(conflictlogrel, AccessExclusiveLock);
+ }
+ }
+
pgstat_report_subscription_conflict(MySubscription->oid, type);
ereport(elevel,
@@ -162,6 +217,69 @@ InitConflictIndexes(ResultRelInfo *relInfo)
relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
}
+/*
+ * GetConflictLogTableRel
+ *
+ * Get the information of the specific conflict log table defined in
+ * pg_subscription and opens the relation for insertion. The caller is
+ * responsible for closing the returned relation handle.
+ */
+Relation
+GetConflictLogTableRel(void)
+{
+ Oid nspid;
+ Oid conflictlogrelid;
+ Relation conflictlogrel = NULL;
+ char *conflictlogtable;
+
+ /* If conflict log table is not set for the subscription just return. */
+ conflictlogtable = get_subscription_conflict_log_table(
+ MyLogicalRepWorker->subid, &nspid);
+ if (conflictlogtable == NULL)
+ {
+ pfree(conflictlogtable);
+ return NULL;
+ }
+
+ conflictlogrelid = get_relname_relid(conflictlogtable, nspid);
+ if (OidIsValid(conflictlogrelid))
+ conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock);
+
+ /* Conflict log table is dropped or not accessible. */
+ if (conflictlogrel == NULL)
+ ereport(WARNING,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("conflict log table \"%s.%s\" does not exist",
+ get_namespace_name(nspid), conflictlogtable)));
+
+ pfree(conflictlogtable);
+
+ return conflictlogrel;
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert conflict log tuple into the conflict log table. It uses
+ * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding
+ * of the tuple inserted into the conflict log table.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+ int options = HEAP_INSERT_NO_LOGICAL;
+
+ /* A valid tuple must be prepared and store into MyLogicalRepWorker. */
+ Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+ heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+ GetCurrentCommandId(true), options, NULL);
+
+ /* Free conflict log tuple. */
+ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
/*
* Add SQLSTATE error code to the current conflict report.
*/
@@ -472,6 +590,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
return tuple_value.data;
}
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.
+ * This is the reusable core logic.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull)
+{
+ TupleTableSlot *tableslot = slot;
+
+ /*
+ * If the slot is a virtual slot, copy it into a heap tuple slot as
+ * FormIndexDatum only works with heap tuple slots.
+ */
+ if (TTS_IS_VIRTUAL(slot))
+ {
+ /* Slot is created within the EState's tuple table */
+ tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+ tableslot = ExecCopySlot(tableslot, slot);
+ }
+
+ /*
+ * Initialize ecxt_scantuple for potential use in FormIndexDatum
+ */
+ GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+ /* Form the index datums */
+ FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+ isnull);
+}
+
/*
* Helper functions to construct a string describing the contents of an index
* entry. See BuildIndexValueDescription for details.
@@ -487,41 +639,203 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
Relation indexDesc;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
- TupleTableSlot *tableslot = slot;
- if (!tableslot)
+ if (!slot)
return NULL;
Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
indexDesc = index_open(indexoid, NoLock);
- /*
- * If the slot is a virtual slot, copy it into a heap tuple slot as
- * FormIndexDatum only works with heap tuple slots.
- */
- if (TTS_IS_VIRTUAL(slot))
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+
+ index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+ index_close(indexDesc, NoLock);
+
+ return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Helper function to convert a TupleTableSlot to Jsonb.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+ HeapTuple tuple;
+ Datum datum;
+ Datum json;
+
+ Assert(slot != NULL);
+
+ tuple = ExecCopySlotHeapTuple(slot);
+ datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+
+ json = DirectFunctionCall1(row_to_json, datum);
+ heap_freetuple(tuple);
+
+ return json;
+}
+
+/*
+ * tuple_table_slot_to_ri_json_datum
+ *
+ * Fetch replica identity key from the tuple table slot and convert into a
+ * jsonb datum.
+ */
+static Datum
+tuple_table_slot_to_ri_json_datum(EState *estate, Relation localrel,
+ Oid replica_index, TupleTableSlot *slot)
+{
+ Relation indexDesc;
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ HeapTuple tuple;
+ TupleDesc tupdesc;
+ Datum datum;
+
+ Assert(slot != NULL);
+
+ Assert(CheckRelationOidLockedByMe(replica_index, RowExclusiveLock, true));
+
+ indexDesc = index_open(replica_index, NoLock);
+
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+ tupdesc = RelationGetDescr(indexDesc);
+
+ /* Bless the tupdesc so it can be looked up by row_to_json. */
+ BlessTupleDesc(tupdesc);
+
+ /* Form the replica identity tuple. */
+ tuple = heap_form_tuple(tupdesc, values, isnull);
+ datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+ index_close(indexDesc, NoLock);
+
+ /* Convert to a JSONB datum. */
+ return DirectFunctionCall1(row_to_json, datum);
+}
+
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ TransactionId local_xid, TimestampTz local_ts,
+ ConflictType conflict_type, RepOriginId origin_id,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *localslot,
+ TupleTableSlot *remoteslot)
+{
+#define MAX_CONFLICT_ATTR_NUM 15
+ Datum values[MAX_CONFLICT_ATTR_NUM];
+ bool nulls[MAX_CONFLICT_ATTR_NUM];
+ int attno;
+ char *origin = NULL;
+ char *remote_origin = NULL;
+ HeapTuple tup;
+ MemoryContext oldctx;
+
+ Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+ /* Initialize values and nulls arrays. */
+ memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM);
+ memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM);
+
+ /* Populate the values and nulls arrays. */
+ attno = 0;
+ values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+ values[attno++] =
+ CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+ values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+ values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+ if (TransactionIdIsValid(local_xid))
+ values[attno++] = TransactionIdGetDatum(local_xid);
+ else
+ nulls[attno++] = true;
+
+ if (TransactionIdIsValid(remote_xid))
+ values[attno++] = TransactionIdGetDatum(remote_xid);
+ else
+ nulls[attno++] = true;
+
+ values[attno++] = LSNGetDatum(remote_final_lsn);
+
+ if (local_ts > 0)
+ values[attno++] = TimestampTzGetDatum(local_ts);
+ else
+ nulls[attno++] = true;
+
+ if (remote_commit_ts > 0)
+ values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+ else
+ nulls[attno++] = true;
+
+ if (origin_id != InvalidRepOriginId)
+ replorigin_by_oid(origin_id, true, &origin);
+
+ if (origin != NULL)
+ values[attno++] = CStringGetTextDatum(origin);
+ else
+ nulls[attno++] = true;
+
+ if (replorigin_session_origin != InvalidRepOriginId)
+ replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+ if (remote_origin != NULL)
+ values[attno++] = CStringGetTextDatum(remote_origin);
+ else
+ nulls[attno++] = true;
+
+ if (!TupIsNull(searchslot))
{
- tableslot = table_slot_create(localrel, &estate->es_tupleTable);
- tableslot = ExecCopySlot(tableslot, slot);
+ Oid replica_index = GetRelationIdentityOrPK(rel);
+
+ /*
+ * If the table has a valid replica identity index, build the index
+ * json datum from key value. Otherwise, construct it from the complete
+ * tuple in REPLICA IDENTITY FULL cases.
+ */
+ if (OidIsValid(replica_index))
+ values[attno++] = tuple_table_slot_to_ri_json_datum(estate, rel,
+ replica_index,
+ searchslot);
+ else
+ values[attno++] = tuple_table_slot_to_json_datum(searchslot);
}
+ else
+ nulls[attno++] = true;
- /*
- * Initialize ecxt_scantuple for potential use in FormIndexDatum when
- * index expressions are present.
- */
- GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+ if (!TupIsNull(localslot))
+ values[attno++] = tuple_table_slot_to_json_datum(localslot);
+ else
+ nulls[attno++] = true;
- /*
- * The values/nulls arrays passed to BuildIndexValueDescription should be
- * the results of FormIndexDatum, which are the "raw" input to the index
- * AM.
- */
- FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+ if (!TupIsNull(remoteslot))
+ values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+ else
+ nulls[attno++] = true;
- index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+ Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
- index_close(indexDesc, NoLock);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ tup = heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+ MemoryContextSwitchTo(oldctx);
- return index_value;
+ /* Store conflict_log_tuple into the worker slot for inserting it later. */
+ MyLogicalRepWorker->conflict_log_tuple = tup;
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index fdf1ccad462..2364146ca36 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
worker->oldest_nonremovable_xid = retain_dead_tuples
? MyReplicationSlot->data.xmin
: InvalidTransactionId;
+ worker->conflict_log_tuple = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 93970c6af29..fee757ee931 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId remote_xid = InvalidTransactionId;
+TimestampTz remote_commit_ts = 0;
/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
+ remote_commit_ts = begin_data.committime;
+ remote_xid = begin_data.xid;
maybe_start_skipping_changes(begin_data.final_lsn);
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
+ remote_xid = stream_xid;
+ remote_final_lsn = InvalidXLogRecPtr;
+ remote_commit_ts = 0;
+
if (!TransactionIdIsValid(stream_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,26 @@ start_apply(XLogRecPtr origin_startpos)
pgstat_report_subscription_error(MySubscription->oid,
MyLogicalRepWorker->type);
+ /*
+ * Insert any pending conflict log tuple under a new transaction.
+ */
+ if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+ {
+ Relation conflictlogrel;
+
+ StartTransactionCommand();
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /* Open conflict log table. */
+ conflictlogrel = GetConflictLogTableRel();
+ InsertConflictLogTuple(conflictlogrel);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+ table_close(conflictlogrel, AccessExclusiveLock);
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ }
+
PG_RE_THROW();
}
}
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index fa7cd7e06a7..12db6676406 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -3881,3 +3881,41 @@ get_subscription_name(Oid subid, bool missing_ok)
return subname;
}
+
+/*
+ * get_subscription_conflict_log_table
+ *
+ * Get conflict log table name and namespace id from subscription.
+ */
+char *
+get_subscription_conflict_log_table(Oid subid, Oid *nspid)
+{
+ HeapTuple tup;
+ Datum datum;
+ bool isnull;
+ char *relname = NULL;
+ Form_pg_subscription subform;
+
+ *nspid = InvalidOid;
+
+ tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ return NULL;
+
+ subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+ /* Get conflict log table name. */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subconflictlogtable,
+ &isnull);
+ if (!isnull)
+ {
+ *nspid = subform->subconflictlognspid;
+ relname = pstrdup(TextDatumGetCString(datum));
+ }
+
+ ReleaseSysCache(tup);
+ return relname;
+}
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 36f24502842..c18ec248e5d 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6806,7 +6806,7 @@ describeSubscriptions(const char *pattern, bool verbose)
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false, false, false, false,
- false, false, false, false};
+ false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6900,6 +6900,12 @@ describeSubscriptions(const char *pattern, bool verbose)
appendPQExpBuffer(&buf,
", subskiplsn AS \"%s\"\n",
gettext_noop("Skip LSN"));
+
+ /* Conflict log table is only supported in v19 and higher */
+ if (pset.sversion >= 190000)
+ appendPQExpBuffer(&buf,
+ ", subconflictlogtable AS \"%s\"\n",
+ gettext_noop("Conflict log table"));
}
/* Only display subscriptions in current database. */
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 51806597037..4057c0a22b4 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2344,8 +2344,8 @@ match_previous_words(int pattern_id,
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION <name> SET ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
- COMPLETE_WITH("binary", "disable_on_error", "failover",
- "max_retention_duration", "origin",
+ COMPLETE_WITH("binary", "conflict_log_table", "disable_on_error",
+ "failover", "max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
"synchronous_commit", "two_phase");
@@ -3814,8 +3814,8 @@ match_previous_words(int pattern_id,
COMPLETE_WITH("WITH (");
/* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
- COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
- "disable_on_error", "enabled", "failover",
+ COMPLETE_WITH("binary", "conflict_log_table", "connect", "copy_data",
+ "create_slot", "disable_on_error", "enabled", "failover",
"max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 55cb9b1eefa..f4526c15ec3 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -80,6 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subretaindeadtuples; /* True if dead tuples useful for
* conflict detection are retained */
+ Oid subconflictlognspid; /* Namespace Oid in which the conflict
+ * log table is created. */
int32 submaxretention; /* The maximum duration (in milliseconds)
* for which information useful for
@@ -105,6 +107,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
/* Only publish data originating from the specified origin */
text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
+
+ /* Conflict log table name if specified */
+ text subconflictlogtable;
#endif
} FormData_pg_subscription;
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index fb4e26a51a4..b5e9cbf8bfe 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -36,4 +36,6 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
bool retention_active,
bool max_retention_set);
+extern bool IsConflictLogRelid(Oid relid);
+
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index c8fbf9e51b8..ae752015281 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -9,9 +9,11 @@
#ifndef CONFLICT_H
#define CONFLICT_H
+#include "access/htup.h"
#include "access/xlogdefs.h"
#include "nodes/pg_list.h"
#include "utils/timestamp.h"
+#include "utils/relcache.h"
/* Avoid including execnodes.h here */
typedef struct EState EState;
@@ -89,4 +91,6 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogTableRel(void);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
#endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..0d07f5efe47 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
*/
TransactionId oldest_nonremovable_xid;
+ /* A conflict log tuple which is prepared but not yet inserted. */
+ HeapTuple conflict_log_tuple;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern PGDLLIMPORT List *table_states_not_ready;
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId remote_xid;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
Oid subid, Oid relid,
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 50fb149e9ac..3bebf04bf51 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -210,6 +210,7 @@ extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);
extern Oid get_subscription_oid(const char *subname, bool missing_ok);
extern char *get_subscription_name(Oid subid, bool missing_ok);
+extern char *get_subscription_conflict_log_table(Oid subid, Oid *nspid);
#define type_is_array(typid) (get_element_type(typid) != InvalidOid)
/* type_is_array_domain accepts both plain arrays and domains over arrays */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 327d1e7731f..f71ff58424e 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 |
(1 row)
-- ok - with lsn = NONE
@@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 |
(1 row)
BEGIN;
@@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 |
(1 row)
-- rename back to keep the rest simple
@@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
-- fail - publication already exists
@@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
-- fail - publication used more than once
@@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -433,10 +433,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -450,19 +450,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
-- ok
ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -517,7 +517,150 @@ COMMIT;
-- ok, owning it is enough for this stuff
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG TABLE TESTS
+--
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+-- fail - conflict_log_table specified when table already exists
+CREATE TABLE public.regress_conflict_log_temp (id int);
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp');
+ERROR: cannot create conflict log table "public.regress_conflict_log_temp" because a table with that name already exists
+HINT: Use a different name for the conflict log table or drop the existing table.
+DROP TABLE public.regress_conflict_log_temp;
+-- ok - conflict_log_table creation with CREATE SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+-- check metadata in pg_subscription
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+ subname | subconflictlogtable | is_public_schema
+------------------------+-----------------------+------------------
+ regress_conflict_test1 | regress_conflict_log1 | t
+(1 row)
+
+-- check if the table exists and has the correct schema (15 columns)
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0;
+ count
+-------
+ 14
+(1 row)
+
+-- check a specific column type (e.g., key_tuple should be JSON)
+SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple';
+ format_type
+-------------
+ json
+(1 row)
+
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-----------------------
+ regress_conflict_test1 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log1
+(1 row)
+
+-- ok - adding conflict_log_table with ALTER SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2');
+-- check metadata after ALTER
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subname | subconflictlogtable | is_public_schema
+------------------------+-----------------------+------------------
+ regress_conflict_test2 | regress_conflict_log2 | t
+(1 row)
+
+-- ok - change the conflict log table name for an existing subscription that already had one
+CREATE SCHEMA clt;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subname | subconflictlogtable | is_public_schema
+------------------------+-----------------------+------------------
+ regress_conflict_test2 | regress_conflict_log3 | f
+(1 row)
+
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table
+------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-----------------------
+ regress_conflict_test1 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log1
+ regress_conflict_test2 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log3
+(2 rows)
+
+-- check the new table was created and the old table was dropped
+SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2';
+ count
+-------
+ 0
+(1 row)
+
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0;
+ count
+-------
+ 14
+(1 row)
+
+-- ok (NOTICE) - set conflict_log_table to one already used by this subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+NOTICE: "clt.regress_conflict_log3" is already in use as the conflict log table for this subscription
+-- fail - try to publish the conflict_log_table
+CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3;
+ERROR: cannot add relation "regress_conflict_log3" to publication
+DETAIL: This operation is not supported for conflict log tables.
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+-- ok - conflict_log_table should not be published with ALL TABLE
+CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt;
+SELECT * FROM pg_publication_tables WHERE pubname = 'pub';
+ pubname | schemaname | tablename | attnames | rowfilter
+---------+------------+-----------+----------+-----------
+(0 rows)
+
+\dt+ clt.regress_conflict_log3
+ List of tables
+ Schema | Name | Type | Owner | Persistence | Size | Description
+--------+-----------------------+-------+---------------------------+-------------+------------+-------------
+ clt | regress_conflict_log3 | table | regress_subscription_user | permanent | 8192 bytes |
+(1 row)
+
+DROP PUBLICATION pub;
+-- fail - set conflict_log_table to one already used by a different subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
+ERROR: cannot create conflict log table "public.regress_conflict_log1" because a table with that name already exists
+HINT: Use a different name for the conflict log table or drop the existing table.
+-- ok - dropping subscription also drops the log table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+ to_regclass
+-------------
+
+(1 row)
+
+-- ok - dropping subscription when the log table was manually dropped first
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+DROP TABLE public.regress_conflict_log1;
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+-- should return NULL, meaning the subscription was dropped successfully
+SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+ subname
+---------
+(0 rows)
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
RESET SESSION AUTHORIZATION;
+DROP SCHEMA clt;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
DROP ROLE regress_subscription_user3;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index ef0c298d2df..49bfb683c57 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -365,7 +365,94 @@ COMMIT;
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG TABLE TESTS
+--
+
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+
+-- fail - conflict_log_table specified when table already exists
+CREATE TABLE public.regress_conflict_log_temp (id int);
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp');
+DROP TABLE public.regress_conflict_log_temp;
+
+-- ok - conflict_log_table creation with CREATE SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+
+-- check metadata in pg_subscription
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- check if the table exists and has the correct schema (15 columns)
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0;
+
+-- check a specific column type (e.g., key_tuple should be JSON)
+SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple';
+
+\dRs+
+
+-- ok - adding conflict_log_table with ALTER SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2');
+
+-- check metadata after ALTER
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- ok - change the conflict log table name for an existing subscription that already had one
+CREATE SCHEMA clt;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+\dRs+
+
+-- check the new table was created and the old table was dropped
+SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2';
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0;
+
+-- ok (NOTICE) - set conflict_log_table to one already used by this subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+
+-- fail - try to publish the conflict_log_table
+CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3;
+
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+
+-- ok - conflict_log_table should not be published with ALL TABLE
+CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt;
+SELECT * FROM pg_publication_tables WHERE pubname = 'pub';
+\dt+ clt.regress_conflict_log3
+DROP PUBLICATION pub;
+
+-- fail - set conflict_log_table to one already used by a different subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
+
+-- ok - dropping subscription also drops the log table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+
+-- ok - dropping subscription when the log table was manually dropped first
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+DROP TABLE public.regress_conflict_log1;
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the subscription was dropped successfully
+SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
+
RESET SESSION AUTHORIZATION;
+DROP SCHEMA clt;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
DROP ROLE regress_subscription_user3;
--
2.49.0