v3-0001-logical-replication-prefetch.patch
text/plain
Filename: v3-0001-logical-replication-prefetch.patch
Type: text/plain
Part: 0
Message:
Re: Logical replication prefetch
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42..3c50f17227 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -275,6 +275,47 @@ retry:
return found;
}
+/*
+ * RelationPrefetchIndex
+ *		Probe index 'idxoid' of relation 'rel' using the key values found in
+ *		'searchslot', fetching the first match (if any) into 'outslot'.
+ *
+ * The scan key is produced by build_replindex_scan_key() and the scan runs
+ * under a dirty snapshot.  At most one tuple is fetched; this function
+ * neither locks it nor compares it against 'searchslot'.
+ *
+ * Returns true if the scan produced a tuple.  Returns false if it did not,
+ * or if 'idxoid' is not a valid OID (nothing is opened in that case).
+ */
+bool
+RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+					  TupleTableSlot *outslot)
+{
+	ScanKeyData keys[INDEX_MAX_KEYS];
+	SnapshotData dirty_snapshot;
+	IndexScanDesc idxscan;
+	Relation	index_rel;
+	int			nkeys;
+	bool		have_tuple = false;
+
+	/* Without an index there is nothing to prefetch. */
+	if (OidIsValid(idxoid))
+	{
+		index_rel = index_open(idxoid, AccessShareLock);
+
+		InitDirtySnapshot(dirty_snapshot);
+
+		/* Derive the scan key from the search tuple. */
+		nkeys = build_replindex_scan_key(keys, rel, index_rel, searchslot);
+
+		/* Set up the index scan and position it on the key. */
+		idxscan = index_beginscan(rel, index_rel, &dirty_snapshot, NULL, nkeys, 0);
+		index_rescan(idxscan, keys, nkeys, NULL, 0);
+
+		/* A single fetch is all we do. */
+		have_tuple = index_getnext_slot(idxscan, ForwardScanDirection, outslot);
+
+		/* Release the scan and the index. */
+		index_endscan(idxscan);
+		index_close(index_rel, AccessShareLock);
+	}
+
+	return have_tuple;
+}
+
/*
* Compare the tuples in the slots by checking if they have equal values.
*/
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d351..a207a4acdc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -396,6 +396,57 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
return true;
}
+/*
+ * pa_launch_prefetch_worker
+ *		Set up and start one parallel prefetch worker.
+ *
+ * A zeroed ParallelApplyWorkerInfo is allocated in ApplyContext, its shared
+ * memory is prepared with pa_setup_dsm(), and logicalrep_worker_launch() is
+ * asked for a WORKERTYPE_PARALLEL_PREFETCH worker attached to that segment.
+ *
+ * Returns the worker info, with do_prefetch set, on success.  Returns NULL
+ * if either the shared memory setup or the launch failed; the worker info
+ * is released in both failure cases.
+ */
+ParallelApplyWorkerInfo *
+pa_launch_prefetch_worker(void)
+{
+	ParallelApplyWorkerInfo *info;
+	MemoryContext caller_ctx;
+
+	/*
+	 * The worker info can be used for the lifetime of the worker process, so
+	 * it has to live in a permanent context.
+	 */
+	caller_ctx = MemoryContextSwitchTo(ApplyContext);
+
+	info = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
+
+	/* Shared memory comes first; give up early if it cannot be set up. */
+	if (!pa_setup_dsm(info))
+	{
+		MemoryContextSwitchTo(caller_ctx);
+		pfree(info);
+		return NULL;
+	}
+
+	if (logicalrep_worker_launch(WORKERTYPE_PARALLEL_PREFETCH,
+								 MyLogicalRepWorker->dbid,
+								 MySubscription->oid,
+								 MySubscription->name,
+								 MyLogicalRepWorker->userid,
+								 InvalidOid,
+								 dsm_segment_handle(info->dsm_seg)))
+	{
+		/* Mark the info as belonging to a prefetch worker. */
+		info->do_prefetch = true;
+	}
+	else
+	{
+		/* Launch failed: release the info and report failure. */
+		pa_free_worker_info(info);
+		info = NULL;
+	}
+
+	MemoryContextSwitchTo(caller_ctx);
+
+	return info;
+}
+
/*
* Try to get a parallel apply worker from the pool. If none is available then
* start a new one.
@@ -943,20 +994,22 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializingApplyWorker = false;
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ if (am_parallel_apply_worker())
+ {
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
originname, sizeof(originname));
- originid = replorigin_by_name(originname, false);
-
- /*
- * The parallel apply worker doesn't need to monopolize this replication
- * origin which was already acquired by its leader process.
- */
- replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
- CommitTransactionCommand();
+ originid = replorigin_by_name(originname, false);
+ /*
+ * The parallel apply worker doesn't need to monopolize this replication
+ * origin which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();
+ }
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -1149,8 +1202,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
shm_mq_result result;
TimestampTz startTime = 0;
- Assert(!IsTransactionState());
- Assert(!winfo->serialize_changes);
+ if (!winfo->do_prefetch)
+ {
+ Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);
+ }
/*
* We don't try to send data to parallel worker for 'immediate' mode. This
@@ -1519,6 +1575,9 @@ pa_get_fileset_state(void)
{
PartialFileSetState fileset_state;
+ if (am_parallel_prefetch_worker())
+ return FS_EMPTY;
+
Assert(am_parallel_apply_worker());
SpinLockAcquire(&MyParallelShared->mutex);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfceb..ed6c057cec 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
+int max_parallel_prefetch_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@@ -257,7 +258,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
/* Skip parallel apply workers. */
- if (isParallelApplyWorker(w))
+ if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
continue;
if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -322,6 +323,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
TimestampTz now;
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+ bool is_parallel_prefetch_worker = (wtype == WORKERTYPE_PARALLEL_PREFETCH);
/*----------
* Sanity checks:
@@ -331,7 +333,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
*/
Assert(wtype != WORKERTYPE_UNKNOWN);
Assert(is_tablesync_worker == OidIsValid(relid));
- Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+ Assert((is_parallel_apply_worker|is_parallel_prefetch_worker) == (subworker_dsm != DSM_HANDLE_INVALID));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -452,8 +454,8 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
- worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
- worker->parallel_apply = is_parallel_apply_worker;
+ worker->leader_pid = (is_parallel_apply_worker|is_parallel_prefetch_worker) ? MyProcPid : InvalidPid;
+ worker->parallel_apply = is_parallel_apply_worker|is_parallel_prefetch_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -492,6 +494,16 @@ retry:
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
break;
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication parallel prefetch worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+ memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+ break;
+
case WORKERTYPE_TABLESYNC:
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -626,7 +638,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
if (worker)
{
- Assert(!isParallelApplyWorker(worker));
+ Assert(!isParallelApplyWorker(worker) && !isParallelPrefetchWorker(worker));
logicalrep_worker_stop_internal(worker, SIGTERM);
}
@@ -774,7 +786,7 @@ logicalrep_worker_detach(void)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
- if (isParallelApplyWorker(w))
+ if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
logicalrep_worker_stop_internal(w, SIGTERM);
}
@@ -1369,6 +1381,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
case WORKERTYPE_PARALLEL_APPLY:
values[9] = CStringGetTextDatum("parallel apply");
break;
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ values[9] = CStringGetTextDatum("parallel prefetch");
+ break;
case WORKERTYPE_TABLESYNC:
values[9] = CStringGetTextDatum("table synchronization");
break;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c90f23ee5b..f965317529 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -681,6 +681,9 @@ process_syncing_tables(XLogRecPtr current_lsn)
*/
break;
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ break;
+
case WORKERTYPE_TABLESYNC:
process_syncing_tables_for_sync(current_lsn);
break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44..db1f8bcebd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
/* Are we initializing an apply worker? */
bool InitializingApplyWorker = false;
+/*
+ * Initial size of the apply loop's prefetch buffer; the loop also stops
+ * batching incoming messages once this many bytes have been collected.
+ */
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+
+/* Prefetch workers launched by the apply loop, and how many there are. */
+static ParallelApplyWorkerInfo *prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int n_prefetch_workers = 0;
+
+/* Cursor used to hand out change messages to the workers round robin. */
+static int prefetch_worker_rr = 0;
+
+/* GUC: prefetch only the replica identity index instead of every index. */
+bool prefetch_replica_identity_only = false;
+
+/*
+ * Prefetch counters.  These are ordinary file-scope variables.  Hits and
+ * misses record the result of the replica identity lookup for updates and
+ * deletes; inserts counts prefetched inserts.
+ */
+size_t lr_prefetch_hits = 0;
+size_t lr_prefetch_misses = 0;
+size_t lr_prefetch_errors = 0;
+size_t lr_prefetch_inserts = 0;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -482,6 +494,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
(rel->state == SUBREL_STATE_SYNCDONE &&
rel->statelsn <= remote_final_lsn));
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ return true;
+
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
@@ -556,6 +571,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransApplyAction apply_action;
StringInfoData original_msg;
+ if (am_parallel_prefetch_worker())
+ {
+ return false;
+ }
+
apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
@@ -2487,13 +2507,36 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
!relinfo->ri_RelationDesc->rd_rel->relhasindex ||
RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
- /* Caller will not have done this bit. */
- Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
- InitConflictIndexes(relinfo);
+ if (am_parallel_prefetch_worker())
+ {
+ Relation localrel = relinfo->ri_RelationDesc;
+ TupleTableSlot *localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+ if (prefetch_replica_identity_only)
+ {
+ (void)RelationPrefetchIndex(localrel, relmapentry->localindexoid, remoteslot, localslot);
+ }
+ else
+ {
+ for (int i = 0; i < relinfo->ri_NumIndices; i++)
+ {
+ Oid sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+ (void)RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+ }
+ }
+ lr_prefetch_inserts += 1;
+ }
+ else
+ {
+ /* Caller will not have done this bit. */
+ Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+ InitConflictIndexes(relinfo);
- /* Do the insert. */
- TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
- ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ /* Do the insert. */
+ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+ ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ }
}
/*
@@ -2677,6 +2720,32 @@ apply_handle_update_internal(ApplyExecutionData *edata,
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
ExecOpenIndices(relinfo, false);
+ if (am_parallel_prefetch_worker())
+ {
+ /*
+ * While it may be reasonable to prefetch indexes for both old and new tuples,
+ * we do it only for one of them (old if it exists, new otherwise), assuming
+ * that the probability that the index key is changed is quite small.
+ */
+ localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ if (!prefetch_replica_identity_only)
+ {
+ for (int i = 0; i < relinfo->ri_NumIndices; i++)
+ {
+ Oid sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+ if (sec_index_oid != localindexoid)
+ {
+ (void)RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+ }
+ }
+ }
+ goto Cleanup;
+ }
found = FindReplTupleInLocalRel(edata, localrel,
&relmapentry->remoterel,
localindexoid,
@@ -2739,7 +2808,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
remoteslot, newslot, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
}
@@ -2864,6 +2933,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
!localrel->rd_rel->relhasindex ||
RelationGetIndexList(localrel) == NIL);
+ if (am_parallel_prefetch_worker())
+ {
+ localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ /* No need to prefetch other indexes because they are not touched during delete */
+ goto Cleanup;
+ }
found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
remoteslot, &localslot);
@@ -2900,7 +2980,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
remoteslot, NULL, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
EvalPlanQualEnd(&epqstate);
}
@@ -3567,6 +3647,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
+/*
+ * Offset of the logical replication message type byte inside a 'w' message:
+ * one byte for the message kind followed by three 8-byte header fields
+ * (start LSN, end LSN and send time).
+ */
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+/*
+ * lr_do_prefetch
+ *		Forward one received message to the prefetch workers.
+ *
+ * 'buf' points to a message of 'len' bytes.  Only 'w' messages long enough
+ * to carry a message type byte are considered:
+ *
+ * - INSERT, UPDATE and DELETE are sent to a single worker, chosen round
+ *   robin.
+ * - TYPE and RELATION are sent to every worker.
+ * - Everything else is ignored.
+ *
+ * NOTE(review): the result of pa_send_data() is not checked here.
+ */
+static void
+lr_do_prefetch(char *buf, int len)
+{
+	ParallelApplyWorkerInfo *winfo;
+
+	/* Nothing to do without workers; this also protects the modulo below. */
+	if (n_prefetch_workers <= 0)
+		return;
+
+	/* Check the length first so that neither read below is out of bounds. */
+	if (len <= MSG_CODE_OFFSET || buf[0] != 'w')
+		return;
+
+	switch (buf[MSG_CODE_OFFSET])
+	{
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+
+			/*
+			 * Round robin prefetch worker.  The cursor is kept within
+			 * [0, n_prefetch_workers) so that it can never overflow and
+			 * produce a negative array index.
+			 */
+			if (prefetch_worker_rr < 0 ||
+				prefetch_worker_rr >= n_prefetch_workers)
+				prefetch_worker_rr = 0;
+			winfo = prefetch_worker[prefetch_worker_rr];
+			prefetch_worker_rr = (prefetch_worker_rr + 1) % n_prefetch_workers;
+			pa_send_data(winfo, len, buf);
+			break;
+
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_RELATION:
+			/* broadcast to all prefetch workers */
+			for (int i = 0; i < n_prefetch_workers; i++)
+			{
+				winfo = prefetch_worker[i];
+				pa_send_data(winfo, len, buf);
+			}
+			break;
+
+		default:
+			/* Ignore other messages */
+			break;
+	}
+}
+
/*
* Apply main loop.
*/
@@ -3577,6 +3693,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ char* prefetch_buf = NULL;
+ size_t prefetch_buf_pos = 0;
+ size_t prefetch_buf_used = 0;
+ size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3714,23 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
+ if (max_parallel_prefetch_workers_per_subscription != 0)
+ {
+ int i;
+ for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+ {
+ prefetch_worker[i] = pa_launch_prefetch_worker();
+ if (!prefetch_worker[i])
+ {
+ elog(LOG, "Launch only %d prefetch workers from %d",
+ i, max_parallel_prefetch_workers_per_subscription);
+ break;
+ }
+ }
+ n_prefetch_workers = i;
+ prefetch_buf = palloc(prefetch_buf_size);
+ }
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -3611,9 +3748,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
- int len;
- char *buf = NULL;
+ int32 len;
+ char *buf = NULL;
bool endofstream = false;
+ bool no_more_data = false;
long wait_time;
CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3760,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
- if (len != 0)
+ /* Loop to process all available data (without blocking). */
+ for (;;)
{
- /* Loop to process all available data (without blocking). */
- for (;;)
- {
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- if (len == 0)
+ if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+ {
+ prefetch_buf_used = 0;
+ do
{
- break;
- }
- else if (len < 0)
+ if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+ {
+ /*
+ * A single doubling is not enough when the message is larger than
+ * the doubled buffer; keep doubling until the 4-byte length prefix
+ * and the message both fit, otherwise the memcpy below would write
+ * past the end of the buffer.
+ */
+ do
+ {
+ prefetch_buf_size *= 2;
+ } while (prefetch_buf_used + len + 4 > prefetch_buf_size);
+ elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+ prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+ }
+ memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+ memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+ prefetch_buf_used += 4 + len;
+ if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+ break;
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ } while (len > 0);
+
+ no_more_data = len <= 0;
+
+ for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
{
- ereport(LOG,
- (errmsg("data stream from publisher has ended")));
- endofstream = true;
- break;
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
}
- else
- {
- int c;
- StringInfoData s;
+ memcpy(&len, prefetch_buf, 4);
+ buf = &prefetch_buf[4];
+ prefetch_buf_pos = len + 4;
+ }
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
- /* Reset timeout. */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
- /* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyMessageContext);
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
- initReadOnlyStringInfo(&s, buf, len);
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
- c = pq_getmsgbyte(&s);
+ initReadOnlyStringInfo(&s, buf, len);
- if (c == 'w')
- {
- XLogRecPtr start_lsn;
- XLogRecPtr end_lsn;
- TimestampTz send_time;
+ c = pq_getmsgbyte(&s);
- start_lsn = pq_getmsgint64(&s);
- end_lsn = pq_getmsgint64(&s);
- send_time = pq_getmsgint64(&s);
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
- if (last_received < start_lsn)
- last_received = start_lsn;
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time = pq_getmsgint64(&s);
- if (last_received < end_lsn)
- last_received = end_lsn;
+ if (last_received < start_lsn)
+ last_received = start_lsn;
- UpdateWorkerStats(last_received, send_time, false);
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- apply_dispatch(&s);
- }
- else if (c == 'k')
- {
- XLogRecPtr end_lsn;
- TimestampTz timestamp;
- bool reply_requested;
+ UpdateWorkerStats(last_received, send_time, false);
- end_lsn = pq_getmsgint64(&s);
- timestamp = pq_getmsgint64(&s);
- reply_requested = pq_getmsgbyte(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr end_lsn;
+ TimestampTz timestamp;
+ bool reply_requested;
- if (last_received < end_lsn)
- last_received = end_lsn;
+ end_lsn = pq_getmsgint64(&s);
+ timestamp = pq_getmsgint64(&s);
+ reply_requested = pq_getmsgbyte(&s);
- send_feedback(last_received, reply_requested, false);
- UpdateWorkerStats(last_received, timestamp, true);
- }
- /* other message types are purposefully ignored */
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- MemoryContextReset(ApplyMessageContext);
+ send_feedback(last_received, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
}
+ /* other message types are purposefully ignored */
+ MemoryContextReset(ApplyMessageContext);
+ }
+ if (prefetch_buf_pos < prefetch_buf_used)
+ {
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ buf = &prefetch_buf[prefetch_buf_pos + 4];
+ prefetch_buf_pos += 4 + len;
+ }
+ else if (prefetch_buf_used != 0 && no_more_data)
+ {
+ break;
+ }
+ else
+ {
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
@@ -3926,6 +4104,10 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static void
apply_worker_exit(void)
{
+ /* Don't restart prefetch workers */
+ if (am_parallel_prefetch_worker())
+ return;
+
if (am_parallel_apply_worker())
{
/*
@@ -4729,6 +4911,10 @@ InitializeLogRepWorker(void)
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
MySubscription->name,
get_rel_name(MyLogicalRepWorker->relid))));
+ else if (am_parallel_prefetch_worker())
+ ereport(LOG,
+ (errmsg("logical replication prefetch worker for subscription \"%s\" has started",
+ MySubscription->name)));
else
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" has started",
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d51..b3812d7e0e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
+#include "replication/worker_internal.h"
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"prefetch_replica_identity_only",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Whether LR prefetch worker should prefetch only replica identity index or all other indexes too."),
+ NULL,
+ },
+ &prefetch_replica_identity_only,
+ false,
+ NULL, NULL, NULL
+ },
+
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"max_parallel_prefetch_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+ NULL,
+ },
+ &max_parallel_prefetch_workers_per_subscription,
+ 2, 0, MAX_LR_PREFETCH_WORKERS,
+ NULL, NULL, NULL
+ },
+
{
{"max_active_replication_origins",
PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544..3403128977 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -759,6 +759,8 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
TupleTableSlot *outslot);
extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
TupleTableSlot *searchslot, TupleTableSlot *outslot);
+extern bool RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+ TupleTableSlot *outslot);
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
EState *estate, TupleTableSlot *slot);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f330..19d1a8d466 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952..7f5b4fa51b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -32,6 +32,7 @@ typedef enum LogicalRepWorkerType
WORKERTYPE_TABLESYNC,
WORKERTYPE_APPLY,
WORKERTYPE_PARALLEL_APPLY,
+ WORKERTYPE_PARALLEL_PREFETCH,
} LogicalRepWorkerType;
typedef struct LogicalRepWorker
@@ -214,6 +215,12 @@ typedef struct ParallelApplyWorkerInfo
*/
bool in_use;
+
+ /*
+ * Performing prefetch of pages accessed by LR operations
+ */
+ bool do_prefetch;
+
ParallelApplyWorkerShared *shared;
} ParallelApplyWorkerInfo;
@@ -237,6 +244,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
+/* Upper bound on the number of prefetch workers per subscription. */
+#define MAX_LR_PREFETCH_WORKERS 128
+
+/* Prefetch counters, defined in worker.c. */
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+/* GUC variable; marked PGDLLIMPORT like the other exported variables here. */
+extern PGDLLIMPORT bool prefetch_replica_identity_only;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
@@ -326,10 +341,13 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isParallelPrefetchWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_PARALLEL_PREFETCH)
#define isTablesyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_prefetch_worker(void);
static inline bool
am_tablesync_worker(void)
@@ -351,4 +369,11 @@ am_parallel_apply_worker(void)
return isParallelApplyWorker(MyLogicalRepWorker);
}
+/* Report whether the current worker is a parallel prefetch worker. */
+static inline bool
+am_parallel_prefetch_worker(void)
+{
+	bool		is_prefetch;
+
+	/* Our own worker slot must be in use whenever this is asked. */
+	Assert(MyLogicalRepWorker->in_use);
+
+	is_prefetch = isParallelPrefetchWorker(MyLogicalRepWorker);
+
+	return is_prefetch;
+}
+
#endif /* WORKER_INTERNAL_H */