preserve_commit_order.patch
text/plain
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 31a92d1a24a..13e5fc218d8 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -254,6 +298,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
/* A list to maintain subtransactions, if any. */
static List *subxactlist = NIL;
+/*
+ * GUC: preserve_commit_order.  When true, pa_before_apply_commit() makes a
+ * parallel apply worker wait until its transaction is the oldest unconsumed
+ * entry of the shared commit-order queue before committing.
+ */
+bool preserve_commit_order = true;
+
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
static PartialFileSetState pa_get_fileset_state(void);
@@ -2117,2 +2120,76 @@ write_internal_relation(StringInfo s, LogicalRepRelation *rel)
}
}
+
+/*
+ * NOTE(review): presumably included for MAX_PARALLEL_WORKER_LIMIT.  Per
+ * project convention this #include belongs with the others at the top of
+ * the file.
+ */
+#include "postmaster/bgworker_internals.h"
+
+/*
+ * Shared-memory commit-order queue.
+ *
+ * ring[] holds transaction ids in the order in which pa_commit() registered
+ * them.  head and tail only ever increase; they are reduced modulo
+ * MAX_PARALLEL_WORKER_LIMIT to index ring[].  Entries in [tail, head) have
+ * not yet been consumed by pa_after_apply_commit().  mutex protects head,
+ * tail and ring; cv is broadcast whenever head or tail changes.
+ *
+ * NOTE(review): the capacity MAX_PARALLEL_WORKER_LIMIT is not tied to any
+ * logical replication worker limit -- confirm it is a sufficient bound.
+ */
+typedef struct
+{
+ ConditionVariable cv; /* signalled after every head/tail change */
+ slock_t mutex; /* protects head, tail and ring */
+ size_t head; /* number of entries ever enqueued */
+ size_t tail; /* number of entries ever consumed */
+ TransactionId ring[MAX_PARALLEL_WORKER_LIMIT];
+} ParallelApplyShmem;
+
+/* Instance-wide queue, attached by ParallelApplyShmemInit(). */
+static ParallelApplyShmem* pa_shmem;
+
+/*
+ * pa_commit
+ *		Append xid to the shared commit-order queue.
+ *
+ * Called by the leader apply worker for each transaction whose commit is
+ * handed to a parallel apply worker.  The queue order is the order in which
+ * pa_before_apply_commit() lets parallel apply workers commit.
+ *
+ * If the ring is full, wait for a parallel apply worker to consume an entry
+ * rather than overwriting a slot that has not been committed yet, which
+ * would corrupt the commit order.  The full-ring test first checks
+ * head > tail: with preserve_commit_order off, a worker may consume its
+ * entry before the leader has enqueued it.  In that case tail can briefly
+ * exceed head, and head - tail would underflow.
+ */
+void
+pa_commit(TransactionId xid)
+{
+	SpinLockAcquire(&pa_shmem->mutex);
+	while (pa_shmem->head > pa_shmem->tail &&
+		   pa_shmem->head - pa_shmem->tail >= MAX_PARALLEL_WORKER_LIMIT)
+	{
+		/* Never sleep while holding a spinlock. */
+		SpinLockRelease(&pa_shmem->mutex);
+		ConditionVariableSleep(&pa_shmem->cv,
+							   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
+		SpinLockAcquire(&pa_shmem->mutex);
+	}
+	pa_shmem->ring[pa_shmem->head++ % MAX_PARALLEL_WORKER_LIMIT] = xid;
+	SpinLockRelease(&pa_shmem->mutex);
+
+	/* No-op unless we slept above. */
+	ConditionVariableCancelSleep();
+
+	/* Wake any parallel apply worker waiting for its turn to commit. */
+	ConditionVariableBroadcast(&pa_shmem->cv);
+}
+
+
+/*
+ * pa_before_apply_commit
+ *		Wait until it is this parallel apply worker's turn to commit.
+ *
+ * Returns immediately when preserve_commit_order is off.  Otherwise sleeps
+ * on the queue's condition variable until the oldest unconsumed entry of the
+ * commit-order queue equals this worker's transaction id.
+ *
+ * NOTE(review): the queue is one instance-wide structure, so transactions of
+ * different subscriptions are serialized against each other.  Entries are
+ * matched by xid alone; this is presumably the publisher's xid, which need
+ * not be unique across publishers -- confirm this is intended.  It also
+ * appears possible that, after the GUC is switched on while workers were
+ * committing out of order with it off, the head entry belongs to an
+ * already-committed transaction, and this loop never terminates.
+ */
+void
+pa_before_apply_commit(void)
+{
+	ParallelApplyShmem *q;
+	TransactionId my_xid = MyParallelShared->xid;
+
+	if (!preserve_commit_order)
+		return;
+
+	q = pa_shmem;
+	for (;;)
+	{
+		bool		my_turn;
+
+		SpinLockAcquire(&q->mutex);
+		my_turn = q->tail < q->head &&
+			q->ring[q->tail % MAX_PARALLEL_WORKER_LIMIT] == my_xid;
+		SpinLockRelease(&q->mutex);
+
+		if (my_turn)
+			break;
+
+		/* The first call only registers us on the CV; the loop rechecks. */
+		ConditionVariableSleep(&q->cv, WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
+	}
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * pa_after_apply_commit
+ *		Consume the oldest commit-order queue entry and wake up waiters.
+ *
+ * Called by a parallel apply worker right after committing.  The entry is
+ * consumed unconditionally, even when preserve_commit_order is off.
+ *
+ * NOTE(review): if the commit between pa_before_apply_commit() and this call
+ * raises an error, the entry is never consumed and every later waiter
+ * blocks -- consider error cleanup.  With preserve_commit_order off, a
+ * worker can apparently reach this point before the leader's pa_commit()
+ * for its transaction, so tail may temporarily run ahead of head.
+ */
+void
+pa_after_apply_commit(void)
+{
+	ParallelApplyShmem *q = pa_shmem;
+
+	SpinLockAcquire(&q->mutex);
+	q->tail++;
+	SpinLockRelease(&q->mutex);
+
+	ConditionVariableBroadcast(&q->cv);
+}
+
+/*
+ * ParallelApplyShmemSize
+ *		Report the shared memory needed by the commit-order queue.
+ */
+Size
+ParallelApplyShmemSize(void)
+{
+	Size		size = sizeof(ParallelApplyShmem);
+
+	return size;
+}
+
+/*
+ * ParallelApplyShmemInit
+ *		Allocate, or attach to, the shared commit-order queue.
+ *
+ * Only the process that creates the structure initializes it.
+ */
+void
+ParallelApplyShmemInit(void)
+{
+	bool		found;
+
+	pa_shmem = (ParallelApplyShmem *)
+		ShmemInitStruct("Parallel worker shmem",
+						sizeof(ParallelApplyShmem),
+						&found);
+
+	/* Already initialized; we merely attached. */
+	if (found)
+		return;
+
+	pa_shmem->tail = 0;
+	pa_shmem->head = pa_shmem->tail;
+	ConditionVariableInit(&pa_shmem->cv);
+	SpinLockInit(&pa_shmem->mutex);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..bf8bfcbdd3b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1911,40 +1911,49 @@ apply_handle_commit(StringInfo s)
if (pa_send_data(winfo, s->len, s->data))
{
+ pa_commit(winfo->shared->xid);
/* Finish processing the transaction. */
pa_xact_finish(winfo, commit_data.end_lsn);
break;
}
/*
* Switch to serialize mode when we are not able to send the
* change to parallel apply worker.
*/
pa_switch_to_partial_serialize(winfo, true);
/* fall through */
case TRANS_LEADER_PARTIAL_SERIALIZE:
Assert(winfo);
stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
&original_msg);
pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+ /*
+ * Register the commit as the send path above does; otherwise, with
+ * preserve_commit_order on, the parallel apply worker would wait
+ * forever in pa_before_apply_commit() while we wait in pa_xact_finish().
+ */
+ pa_commit(winfo->shared->xid);
/* Finish processing the transaction. */
pa_xact_finish(winfo, commit_data.end_lsn);
break;
case TRANS_PARALLEL_APPLY:
/*
* If the parallel apply worker is applying spooled messages then
* close the file before committing.
*/
if (stream_fd)
stream_close_file();
+ pa_before_apply_commit();
apply_handle_commit_internal(&commit_data);
+ pa_after_apply_commit();
MyParallelShared->last_commit_end = XactLastCommitEnd;
pa_commit_transaction();
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..71b8abc4337 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -150,6 +150,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
size = add_size(size, AioShmemSize());
+ size = add_size(size, ParallelApplyShmemSize());
/* include additional requested shmem from preload libraries */
size = add_size(size, total_addin_request);
@@ -332,6 +333,7 @@ CreateOrAttachShmemStructs(void)
PgArchShmemInit();
ApplyLauncherShmemInit();
SlotSyncShmemInit();
+ ParallelApplyShmemInit();
/*
* Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f137129209f..5b60a4c6655 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
#include "postmaster/walsummarizer.h"
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
@@ -976,6 +977,17 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+	/*
+	 * Not GUC_EXPLAIN: that flag is for planner-related settings reported by
+	 * EXPLAIN (SETTINGS), and this setting only affects logical replication
+	 * apply.
+	 */
+	{
+		{"preserve_commit_order", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Makes parallel apply workers commit transactions in the same order as on the publisher."),
+			NULL,
+			0
+		},
+		&preserve_commit_order,
+		true,
+		NULL, NULL, NULL
+	},
+
{
{"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of parallel hash plans."),
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index b29453e8e4f..2efeff720f2 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -34,4 +34,7 @@ extern bool IsLogicalLauncher(void);
extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
+/* Shared commit-order queue for parallel apply (applyparallelworker.c). */
+extern Size ParallelApplyShmemSize(void);
+extern void ParallelApplyShmemInit(void);
+
#endif /* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 88912606e4d..cb030eea402 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
#include <signal.h>
+/* GUC variable, defined in applyparallelworker.c. */
+extern PGDLLIMPORT bool preserve_commit_order;
+
extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7c0204dd6f4..e48a2219131 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -363,4 +385,10 @@ am_parallel_apply_worker(void)
return isParallelApplyWorker(MyLogicalRepWorker);
}
+/*
+ * Commit-order queue (applyparallelworker.c).  The leader registers each
+ * transaction handed to a parallel apply worker with pa_commit().  The
+ * worker brackets its commit with pa_before_apply_commit() and
+ * pa_after_apply_commit().
+ */
+extern void pa_before_apply_commit(void);
+
+extern void pa_after_apply_commit(void);
+
+extern void pa_commit(TransactionId xid);
+
#endif /* WORKER_INTERNAL_H */