preserve_commit_order.patch

text/plain

Filename: preserve_commit_order.patch
Type: text/plain
Part: 0
Message: Re: Parallel Apply
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 31a92d1a24a..13e5fc218d8 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -254,6 +298,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
 /* A list to maintain subtransactions, if any. */
 static List *subxactlist = NIL;
 
+/* GUC: if true, parallel apply workers wait their turn before committing. */
+bool preserve_commit_order = true;
+
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
@@ -2117,2 +2120,76 @@ write_internal_relation(StringInfo s, LogicalRepRelation *rel)
 	}
 }
+
+#include "postmaster/bgworker_internals.h"
+
+typedef struct					/* commit-order queue kept in shared memory */
+{
+	ConditionVariable cv;		/* broadcast whenever head or tail advances */
+	slock_t		mutex;			/* protects head, tail and ring[] */
+	size_t		head;			/* number of xids enqueued by pa_commit() */
+	size_t		tail;			/* number of pa_after_apply_commit() calls */
+	TransactionId ring[MAX_PARALLEL_WORKER_LIMIT];	/* indexed modulo its size */
+} ParallelApplyShmem;
+
+static ParallelApplyShmem *pa_shmem;	/* set by ParallelApplyShmemInit() */
+
+/* Append xid to the shared commit-order ring and wake every waiter. */
+void
+pa_commit(TransactionId xid)
+{
+	SpinLockAcquire(&pa_shmem->mutex);
+	pa_shmem->ring[pa_shmem->head++ % MAX_PARALLEL_WORKER_LIMIT] = xid;	/* NOTE(review): no ring-full check */
+	SpinLockRelease(&pa_shmem->mutex);
+	ConditionVariableBroadcast(&pa_shmem->cv);
+}
+
+/* Block until this worker's xid is the oldest entry in the commit-order ring. */
+void
+pa_before_apply_commit(void)
+{
+	TransactionId xid = MyParallelShared->xid;
+	bool		my_turn;
+
+	if (!preserve_commit_order)
+		return;					/* ordering disabled: commit right away */
+
+	for (;;)
+	{
+		SpinLockAcquire(&pa_shmem->mutex);
+		my_turn = (pa_shmem->head > pa_shmem->tail && pa_shmem->ring[pa_shmem->tail % MAX_PARALLEL_WORKER_LIMIT] == xid);
+		SpinLockRelease(&pa_shmem->mutex);
+		if (my_turn)
+			break;
+		ConditionVariableSleep(&pa_shmem->cv, WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
+	}
+	ConditionVariableCancelSleep();
+}
+
+void
+pa_after_apply_commit(void)
+{
+	SpinLockAcquire(&pa_shmem->mutex);
+	pa_shmem->tail++;			/* retire the entry at the ring's tail */
+	SpinLockRelease(&pa_shmem->mutex);
+	ConditionVariableBroadcast(&pa_shmem->cv);	/* let waiters re-check the tail */
+}
+
+Size
+ParallelApplyShmemSize(void)
+{
+	return sizeof(ParallelApplyShmem);	/* one fixed-size struct */
+}
+
+/* Create or attach to the shared commit-order ring; initialize it if new. */
+void
+ParallelApplyShmemInit(void)
+{
+	bool		found;
+
+	pa_shmem = (ParallelApplyShmem *) ShmemInitStruct("Parallel worker shmem", sizeof(ParallelApplyShmem), &found);
+	if (found)
+		return;					/* struct already existed */
+	pa_shmem->head = pa_shmem->tail = 0;
+	ConditionVariableInit(&pa_shmem->cv);
+	SpinLockInit(&pa_shmem->mutex);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..bf8bfcbdd3b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1911,40 +1911,52 @@ apply_handle_commit(StringInfo s)
 			if (pa_send_data(winfo, s->len, s->data))
 			{
+				/* Queue the xid so the worker can find its commit turn. */
+				pa_commit(winfo->shared->xid);
+
 				/* Finish processing the transaction. */
 				pa_xact_finish(winfo, commit_data.end_lsn);
 				break;
 			}

 			/*
 			 * Switch to serialize mode when we are not able to send the
 			 * change to parallel apply worker.
 			 */
 			pa_switch_to_partial_serialize(winfo, true);

 			/* fall through */
 		case TRANS_LEADER_PARTIAL_SERIALIZE:
 			Assert(winfo);

 			stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
 										 &original_msg);

 			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);

+			/*
+			 * The worker calls pa_before_apply_commit() when it applies the
+			 * spooled commit as well, so the xid must be queued on this path.
+			 */
+			pa_commit(winfo->shared->xid);
+
 			/* Finish processing the transaction. */
 			pa_xact_finish(winfo, commit_data.end_lsn);
 			break;

 		case TRANS_PARALLEL_APPLY:

 			/*
 			 * If the parallel apply worker is applying spooled messages then
 			 * close the file before committing.
 			 */
 			if (stream_fd)
 				stream_close_file();

+			/* Take our turn in the commit-order queue around the commit. */
+			pa_before_apply_commit();
 			apply_handle_commit_internal(&commit_data);
+			pa_after_apply_commit();

 			MyParallelShared->last_commit_end = XactLastCommitEnd;

 			pa_commit_transaction();
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..71b8abc4337 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -150,6 +150,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, ParallelApplyShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -332,6 +333,7 @@ CreateOrAttachShmemStructs(void)
 	PgArchShmemInit();
 	ApplyLauncherShmemInit();
 	SlotSyncShmemInit();
+	ParallelApplyShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f137129209f..5b60a4c6655 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
 #include "postmaster/walsummarizer.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
@@ -976,6 +977,17 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	/* Affects logical-replication apply only, so it is not flagged GUC_EXPLAIN. */
+	{
+		{"preserve_commit_order", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Commit LR transactions at subscriber in the same order as at publisher."),
+			NULL
+		},
+		&preserve_commit_order,
+		true,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of parallel hash plans."),
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index b29453e8e4f..2efeff720f2 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -34,4 +34,7 @@ extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
+extern Size ParallelApplyShmemSize(void);
+extern void ParallelApplyShmemInit(void);
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 88912606e4d..cb030eea402 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
 
 #include <signal.h>
 
+extern PGDLLIMPORT bool preserve_commit_order;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7c0204dd6f4..e48a2219131 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -363,4 +385,10 @@ am_parallel_apply_worker(void)
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
+extern void pa_before_apply_commit(void);	/* wait for our commit turn */
+
+extern void pa_after_apply_commit(void);	/* release our commit turn */
+
+extern void pa_commit(TransactionId xid);	/* enqueue xid in commit order */
+
 #endif							/* WORKER_INTERNAL_H */