v1-0001-Release-replication-slot-on-error-in-slot-SQL-functions.patch
application/octet-stream
Filename: v1-0001-Release-replication-slot-on-error-in-slot-SQL-functions.patch
Type: application/octet-stream
Part: 0
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 16fbd38373..3ba84a16ea 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -148,38 +148,47 @@ create_logical_replication_slot(char *name, char *plugin,
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
false, failover, false);
- /*
- * Ensure the logical decoding is enabled before initializing the logical
- * decoding context.
- */
- EnsureLogicalDecodingEnabled();
- Assert(IsLogicalDecodingEnabled());
+ PG_TRY();
+ {
+ /*
+ * Ensure the logical decoding is enabled before initializing the logical
+ * decoding context.
+ */
+ EnsureLogicalDecodingEnabled();
+ Assert(IsLogicalDecodingEnabled());
- /*
- * Create logical decoding context to find start point or, if we don't
- * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
- *
- * Note: when !find_startpoint this is still important, because it's at
- * this point that the output plugin is validated.
- */
- ctx = CreateInitDecodingContext(plugin, NIL,
- false, /* just catalogs is OK */
- false, /* not repack */
- restart_lsn,
- XL_ROUTINE(.page_read = read_local_xlog_page,
- .segment_open = wal_segment_open,
- .segment_close = wal_segment_close),
- NULL, NULL, NULL);
+ /*
+ * Create logical decoding context to find start point or, if we don't
+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
+ *
+ * Note: when !find_startpoint this is still important, because it's at
+ * this point that the output plugin is validated.
+ */
+ ctx = CreateInitDecodingContext(plugin, NIL,
+ false, /* just catalogs is OK */
+ false, /* not repack */
+ restart_lsn,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
- /*
- * If caller needs us to determine the decoding start point, do so now.
- * This might take a while.
- */
- if (find_startpoint)
- DecodingContextFindStartpoint(ctx);
+ /*
+ * If caller needs us to determine the decoding start point, do so now.
+ * This might take a while.
+ */
+ if (find_startpoint)
+ DecodingContextFindStartpoint(ctx);
- /* don't need the decoding context anymore */
- FreeDecodingContext(ctx);
+ /* don't need the decoding context anymore */
+ FreeDecodingContext(ctx);
+ }
+ PG_CATCH();
+ {
+ ReplicationSlotRelease();
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
}
/*
@@ -569,46 +578,55 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true, true);
- /* A slot whose restart_lsn has never been reserved cannot be advanced */
- if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" cannot be advanced",
- NameStr(*slotname)),
- errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
+ PG_TRY();
+ {
+ /* A slot whose restart_lsn has never been reserved cannot be advanced */
+ if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" cannot be advanced",
+ NameStr(*slotname)),
+ errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
- /*
- * Check if the slot is not moving backwards. Physical slots rely simply
- * on restart_lsn as a minimum point, while logical slots have confirmed
- * consumption up to confirmed_flush, meaning that in both cases data
- * older than that is not available anymore.
- */
- if (OidIsValid(MyReplicationSlot->data.database))
- minlsn = MyReplicationSlot->data.confirmed_flush;
- else
- minlsn = MyReplicationSlot->data.restart_lsn;
+ /*
+ * Check if the slot is not moving backwards. Physical slots rely simply
+ * on restart_lsn as a minimum point, while logical slots have confirmed
+ * consumption up to confirmed_flush, meaning that in both cases data
+ * older than that is not available anymore.
+ */
+ if (OidIsValid(MyReplicationSlot->data.database))
+ minlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ minlsn = MyReplicationSlot->data.restart_lsn;
- if (moveto < minlsn)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
- LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
+ if (moveto < minlsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
+ LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
- /* Do the actual slot update, depending on the slot type */
- if (OidIsValid(MyReplicationSlot->data.database))
- endlsn = pg_logical_replication_slot_advance(moveto);
- else
- endlsn = pg_physical_replication_slot_advance(moveto);
+ /* Do the actual slot update, depending on the slot type */
+ if (OidIsValid(MyReplicationSlot->data.database))
+ endlsn = pg_logical_replication_slot_advance(moveto);
+ else
+ endlsn = pg_physical_replication_slot_advance(moveto);
- values[0] = NameGetDatum(&MyReplicationSlot->data.name);
- nulls[0] = false;
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ nulls[0] = false;
- /*
- * Recompute the minimum LSN and xmin across all slots to adjust with the
- * advancing potentially done.
- */
- ReplicationSlotsComputeRequiredXmin(false);
- ReplicationSlotsComputeRequiredLSN();
+ /*
+ * Recompute the minimum LSN and xmin across all slots to adjust with the
+ * advancing potentially done.
+ */
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+ }
+ PG_CATCH();
+ {
+ ReplicationSlotRelease();
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
ReplicationSlotRelease();
@@ -763,7 +781,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
/*
* Update the destination slot to current values of the source slot;
* recheck that the source slot is still the one we saw previously.
+ *
+ * Use PG_TRY to ensure we release (and, for ephemeral slots, drop)
+ * the destination slot if any validation error occurs. Without this,
+ * an error caught by a PL/pgSQL EXCEPTION handler would leave
+ * MyReplicationSlot set, crashing on the next slot operation.
*/
+ PG_TRY();
{
TransactionId copy_effective_xmin;
TransactionId copy_effective_catalog_xmin;
@@ -797,9 +821,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
* or the restart_lsn either is invalid or has gone backward. (The
* restart_lsn could go backwards if the source slot is dropped and
* copied from an older slot during installation.)
- *
- * Since erroring out will release and drop the destination slot we
- * don't need to release it here.
*/
if (copy_restart_lsn < src_restart_lsn ||
src_islogical != copy_islogical ||
@@ -857,6 +878,12 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
}
#endif
}
+ PG_CATCH();
+ {
+ ReplicationSlotRelease();
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
/* target slot fully created, mark as persistent if needed */
if (logical_slot && !temporary)