v1-0001-Release-replication-slot-on-error-in-slot-SQL-functions.patch

application/octet-stream

Filename: v1-0001-Release-replication-slot-on-error-in-slot-SQL-functions.patch
Type: application/octet-stream
Part: 0
Message: [PATCH] Release replication slot on error in SQL-callable slot functions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 16fbd38373..3ba84a16ea 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -148,38 +148,47 @@ create_logical_replication_slot(char *name, char *plugin,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
 						  false, failover, false);
 
-	/*
-	 * Ensure the logical decoding is enabled before initializing the logical
-	 * decoding context.
-	 */
-	EnsureLogicalDecodingEnabled();
-	Assert(IsLogicalDecodingEnabled());
+	PG_TRY();
+	{
+		/*
+		 * Ensure that logical decoding is enabled before initializing the
+		 * logical decoding context.
+		 */
+		EnsureLogicalDecodingEnabled();
+		Assert(IsLogicalDecodingEnabled());
 
-	/*
-	 * Create logical decoding context to find start point or, if we don't
-	 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
-	 *
-	 * Note: when !find_startpoint this is still important, because it's at
-	 * this point that the output plugin is validated.
-	 */
-	ctx = CreateInitDecodingContext(plugin, NIL,
-									false,	/* just catalogs is OK */
-									false,	/* not repack */
-									restart_lsn,
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+		/*
+		 * Create logical decoding context to find start point or, if we don't
+		 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
+		 *
+		 * Note: when !find_startpoint this is still important, because it's at
+		 * this point that the output plugin is validated.
+		 */
+		ctx = CreateInitDecodingContext(plugin, NIL,
+										false,	/* just catalogs is OK */
+										false,	/* not repack */
+										restart_lsn,
+										XL_ROUTINE(.page_read = read_local_xlog_page,
+												   .segment_open = wal_segment_open,
+												   .segment_close = wal_segment_close),
+										NULL, NULL, NULL);
 
-	/*
-	 * If caller needs us to determine the decoding start point, do so now.
-	 * This might take a while.
-	 */
-	if (find_startpoint)
-		DecodingContextFindStartpoint(ctx);
+		/*
+		 * If caller needs us to determine the decoding start point, do so now.
+		 * This might take a while.
+		 */
+		if (find_startpoint)
+			DecodingContextFindStartpoint(ctx);
 
-	/* don't need the decoding context anymore */
-	FreeDecodingContext(ctx);
+		/* don't need the decoding context anymore */
+		FreeDecodingContext(ctx);
+	}
+	PG_CATCH();
+	{
+		ReplicationSlotRelease();
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 }
 
 /*
@@ -569,46 +578,55 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
-	/* A slot whose restart_lsn has never been reserved cannot be advanced */
-	if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("replication slot \"%s\" cannot be advanced",
-						NameStr(*slotname)),
-				 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
+	PG_TRY();
+	{
+		/* A slot whose restart_lsn has never been reserved cannot be advanced */
+		if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("replication slot \"%s\" cannot be advanced",
+							NameStr(*slotname)),
+					 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
 
-	/*
-	 * Check if the slot is not moving backwards.  Physical slots rely simply
-	 * on restart_lsn as a minimum point, while logical slots have confirmed
-	 * consumption up to confirmed_flush, meaning that in both cases data
-	 * older than that is not available anymore.
-	 */
-	if (OidIsValid(MyReplicationSlot->data.database))
-		minlsn = MyReplicationSlot->data.confirmed_flush;
-	else
-		minlsn = MyReplicationSlot->data.restart_lsn;
+		/*
+		 * Check that the slot is not moving backwards.  Physical slots rely simply
+		 * on restart_lsn as a minimum point, while logical slots have confirmed
+		 * consumption up to confirmed_flush, meaning that in both cases data
+		 * older than that is not available anymore.
+		 */
+		if (OidIsValid(MyReplicationSlot->data.database))
+			minlsn = MyReplicationSlot->data.confirmed_flush;
+		else
+			minlsn = MyReplicationSlot->data.restart_lsn;
 
-	if (moveto < minlsn)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
-						LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
+		if (moveto < minlsn)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
+							LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
 
-	/* Do the actual slot update, depending on the slot type */
-	if (OidIsValid(MyReplicationSlot->data.database))
-		endlsn = pg_logical_replication_slot_advance(moveto);
-	else
-		endlsn = pg_physical_replication_slot_advance(moveto);
+		/* Do the actual slot update, depending on the slot type */
+		if (OidIsValid(MyReplicationSlot->data.database))
+			endlsn = pg_logical_replication_slot_advance(moveto);
+		else
+			endlsn = pg_physical_replication_slot_advance(moveto);
 
-	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-	nulls[0] = false;
+		values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+		nulls[0] = false;
 
-	/*
-	 * Recompute the minimum LSN and xmin across all slots to adjust with the
-	 * advancing potentially done.
-	 */
-	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+		/*
+		 * Recompute the minimum LSN and xmin across all slots to account for
+		 * any advance that may have been performed above.
+		 */
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
+	PG_CATCH();
+	{
+		ReplicationSlotRelease();
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	ReplicationSlotRelease();
 
@@ -763,7 +781,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	/*
 	 * Update the destination slot to current values of the source slot;
 	 * recheck that the source slot is still the one we saw previously.
+	 *
+	 * Use PG_TRY to ensure we release (and, for ephemeral slots, drop)
+	 * the destination slot if any validation error occurs.  Without this,
+	 * an error caught by a PL/pgSQL EXCEPTION handler would leave
+	 * MyReplicationSlot set, crashing on the next slot operation.
 	 */
+	PG_TRY();
 	{
 		TransactionId copy_effective_xmin;
 		TransactionId copy_effective_catalog_xmin;
@@ -797,9 +821,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		 * or the restart_lsn either is invalid or has gone backward. (The
 		 * restart_lsn could go backwards if the source slot is dropped and
 		 * copied from an older slot during installation.)
-		 *
-		 * Since erroring out will release and drop the destination slot we
-		 * don't need to release it here.
 		 */
 		if (copy_restart_lsn < src_restart_lsn ||
 			src_islogical != copy_islogical ||
@@ -857,6 +878,12 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		}
 #endif
 	}
+	PG_CATCH();
+	{
+		ReplicationSlotRelease();
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	/* target slot fully created, mark as persistent if needed */
 	if (logical_slot && !temporary)