vnisha_005-top-up-patch-for-error-message-improvment.patch
application/octet-stream
Filename: vnisha_005-top-up-patch-for-error-message-improvment.patch
Type: application/octet-stream
Part: 0
Message:
Re: Logical Replication of sequences
From 3dd7e00cb707f97be5095b6056314e1f06fb49f7 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Thu, 12 Jun 2025 15:30:00 +0530
Subject: [PATCH v20250612 6/7] top up patch for error message improvment and
list sort
---
.../replication/logical/sequencesync.c | 82 +++++++++++++++----
1 file changed, 67 insertions(+), 15 deletions(-)
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index f402f9aca2f..6fa5b7ebcc8 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -184,27 +184,69 @@ ProcessSyncingSequencesForApply(void)
static void
report_error_sequences(StringInfo missing_seqs, StringInfo mismatched_seqs)
{
- StringInfo combined_error_msg = makeStringInfo();
-
- appendStringInfo(combined_error_msg, "logical replication sequence synchronization failed for subscription \"%s\":",
- MySubscription->name);
+ StringInfo combined_error_detail = makeStringInfo();
+ StringInfo combined_error_hint = makeStringInfo();
if (missing_seqs->len)
- appendStringInfo(combined_error_msg, " sequences (%s) are missing on the publisher.",
- missing_seqs->data);
+ {
+ appendStringInfo(combined_error_detail, "Sequences (%s) are missing on the publisher.",
+ missing_seqs->data);
+ appendStringInfoString(combined_error_hint, "Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION or use ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES.");
+ }
if (mismatched_seqs->len)
{
/* Add a separator if both types of errors exist */
if (missing_seqs->len)
- appendStringInfoString(combined_error_msg, " Additionally,");
-
- appendStringInfo(combined_error_msg, " parameters differ for the remote and local sequences (%s)",
- mismatched_seqs->data);
+ {
+ appendStringInfo(combined_error_detail, " Additionally, parameters differ for the remote and local sequences (%s).",
+ mismatched_seqs->data);
+ appendStringInfoString(combined_error_hint," Alter or re-create local sequences to have the same parameters as the remote sequences");
+ }
+ else
+ {
+ appendStringInfo(combined_error_detail, "Parameters differ for the remote and local sequences (%s).",
+ mismatched_seqs->data);
+ appendStringInfoString(combined_error_hint, "Alter or re-create local sequences to have the same parameters as the remote sequences.");
+ }
+
}
ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("%s", combined_error_msg->data));
+ errmsg("logical replication sequence synchronization failed for subscription \"%s\"", MySubscription->name),
+ errdetail("%s", combined_error_detail->data),
+ errhint("%s", combined_error_hint->data));
+}
+
+/*
+ * sequence_comparator
+ *
+ * Comparator function for sorting LogicalRepSequenceInfo objects in a list.
+ * It compares sequences first by namespace name and then by sequence name.
+ */
+static int
+sequence_comparator(const ListCell *s1, const ListCell *s2)
+{
+ int cmp;
+ LogicalRepSequenceInfo *seqinfo1 = (LogicalRepSequenceInfo *)(s1->ptr_value);
+ LogicalRepSequenceInfo *seqinfo2 = (LogicalRepSequenceInfo *)(s2->ptr_value);
+
+ /* Compare by namespace name first */
+ if (seqinfo1->nspname == NULL && seqinfo2->nspname == NULL)
+ return 0;
+
+ if (seqinfo1->nspname == NULL)
+ return -1;
+
+ if (seqinfo2->nspname == NULL)
+ return 1;
+
+ cmp = strcmp(seqinfo1->nspname, seqinfo2->nspname);
+ if (cmp != 0)
+ return cmp;
+
+ /* If namespace names are equal, compare by sequence name */
+ return strcmp(seqinfo1->seqname, seqinfo2->seqname);
}
/*
@@ -218,6 +260,7 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid)
{
int total_seqs = list_length(remotesequences);
int current_index = 0;
+ int search_pos = 0;
StringInfo mismatched_seqs = makeStringInfo();
StringInfo missing_seqs = makeStringInfo();
@@ -225,6 +268,9 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid)
errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
MySubscription->name, total_seqs));
+ /* Sort the list of sequences to optimize the search */
+ list_sort(remotesequences, sequence_comparator);
+
/*
* We batch synchronize multiple sequences per transaction, because the
* alternative of synchronizing each sequence individually incurs overhead
@@ -274,7 +320,8 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid)
"JOIN LATERAL pg_sequence_state(s.schname, s.seqname) ps ON true\n"
"JOIN pg_namespace n ON n.nspname = s.schname\n"
"JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
- "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n",
+ "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
+ "ORDER BY s.schname, s.seqname\n",
seqstr->data);
res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
@@ -349,13 +396,18 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid)
Assert(col == REMOTE_SEQ_COL_COUNT);
/* Retrieve the sequence object fetched from the publisher */
- for (int i = 0; i < batch_size; i++)
+ while (search_pos < total_seqs)
{
- LogicalRepSequenceInfo *sequence_info = lfirst(list_nth_cell(remotesequences, current_index + i));
+ LogicalRepSequenceInfo *sequence_info = lfirst(list_nth_cell(remotesequences, search_pos));
if (!strcmp(sequence_info->nspname, nspname) &&
!strcmp(sequence_info->seqname, seqname))
- seqinfo = sequence_info;
+ {
+ seqinfo = sequence_info;
+ search_pos++;
+ break;
+ }
+ search_pos++;
}
Assert(seqinfo);
--
2.34.1