From 3dd7e00cb707f97be5095b6056314e1f06fb49f7 Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Thu, 12 Jun 2025 15:30:00 +0530 Subject: [PATCH v20250612 6/7] top up patch for error message improvment and list sort --- .../replication/logical/sequencesync.c | 82 +++++++++++++++---- 1 file changed, 67 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index f402f9aca2f..6fa5b7ebcc8 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -184,27 +184,69 @@ ProcessSyncingSequencesForApply(void) static void report_error_sequences(StringInfo missing_seqs, StringInfo mismatched_seqs) { - StringInfo combined_error_msg = makeStringInfo(); - - appendStringInfo(combined_error_msg, "logical replication sequence synchronization failed for subscription \"%s\":", - MySubscription->name); + StringInfo combined_error_detail = makeStringInfo(); + StringInfo combined_error_hint = makeStringInfo(); if (missing_seqs->len) - appendStringInfo(combined_error_msg, " sequences (%s) are missing on the publisher.", - missing_seqs->data); + { + appendStringInfo(combined_error_detail, "Sequences (%s) are missing on the publisher.", + missing_seqs->data); + appendStringInfoString(combined_error_hint, "Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION or use ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES."); + } if (mismatched_seqs->len) { /* Add a separator if both types of errors exist */ if (missing_seqs->len) - appendStringInfoString(combined_error_msg, " Additionally,"); - - appendStringInfo(combined_error_msg, " parameters differ for the remote and local sequences (%s)", - mismatched_seqs->data); + { + appendStringInfo(combined_error_detail, " Additionally, parameters differ for the remote and local sequences (%s).", + mismatched_seqs->data); + appendStringInfoString(combined_error_hint," Alter or re-create local sequences to have the same parameters as the remote sequences"); + } + else + { + appendStringInfo(combined_error_detail, "Parameters differ for the remote and local sequences (%s).", + mismatched_seqs->data); + appendStringInfoString(combined_error_hint, "Alter or re-create local sequences to have the same parameters as the remote sequences."); + } + } ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("%s", combined_error_msg->data)); + errmsg("logical replication sequence synchronization failed for subscription \"%s\"", MySubscription->name), + errdetail("%s", combined_error_detail->data), + errhint("%s", combined_error_hint->data)); +} + +/* + * sequence_comparator + * + * Comparator function for sorting LogicalRepSequenceInfo objects in a list. + * It compares sequences first by namespace name and then by sequence name. + */ +static int +sequence_comparator(const ListCell *s1, const ListCell *s2) +{ + int cmp; + LogicalRepSequenceInfo *seqinfo1 = (LogicalRepSequenceInfo *)(s1->ptr_value); + LogicalRepSequenceInfo *seqinfo2 = (LogicalRepSequenceInfo *)(s2->ptr_value); + + /* Compare by namespace name first */ + if (seqinfo1->nspname == NULL && seqinfo2->nspname == NULL) + return 0; + + if (seqinfo1->nspname == NULL) + return -1; + + if (seqinfo2->nspname == NULL) + return 1; + + cmp = strcmp(seqinfo1->nspname, seqinfo2->nspname); + if (cmp != 0) + return cmp; + + /* If namespace names are equal, compare by sequence name */ + return strcmp(seqinfo1->seqname, seqinfo2->seqname); } /* @@ -218,6 +260,7 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid) { int total_seqs = list_length(remotesequences); int current_index = 0; + int search_pos = 0; StringInfo mismatched_seqs = makeStringInfo(); StringInfo missing_seqs = makeStringInfo(); @@ -225,6 +268,9 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid) errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", MySubscription->name, total_seqs)); + /* Sort the list of sequences to optimize the search */ + list_sort(remotesequences, sequence_comparator); + /* * We batch synchronize multiple sequences per transaction, because the * alternative of synchronizing each sequence individually incurs overhead @@ -274,7 +320,8 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid) "JOIN LATERAL pg_sequence_state(s.schname, s.seqname) ps ON true\n" "JOIN pg_namespace n ON n.nspname = s.schname\n" "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" - "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n", + "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" + "ORDER BY s.schname, s.seqname\n", seqstr->data); res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); @@ -349,13 +396,18 @@ copy_sequences(WalReceiverConn *conn, List *remotesequences, Oid subid) Assert(col == REMOTE_SEQ_COL_COUNT); /* Retrieve the sequence object fetched from the publisher */ - for (int i = 0; i < batch_size; i++) + while (search_pos < total_seqs) { - LogicalRepSequenceInfo *sequence_info = lfirst(list_nth_cell(remotesequences, current_index + i)); + LogicalRepSequenceInfo *sequence_info = lfirst(list_nth_cell(remotesequences, search_pos)); if (!strcmp(sequence_info->nspname, nspname) && !strcmp(sequence_info->seqname, seqname)) - seqinfo = sequence_info; + { + seqinfo = sequence_info; + search_pos++; + break; + } + search_pos++; } Assert(seqinfo); -- 2.34.1