v3-0001-logical-replication-prefetch.patch

text/plain

Filename: v3-0001-logical-replication-prefetch.patch
Type: text/plain
Part: 0
Message: Re: Logical replication prefetch
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42..3c50f17227 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -275,6 +275,47 @@ retry:
 	return found;
 }
 
+/*
+ * Prefetch helper: probe index 'idxoid' of 'rel' with the key in 'searchslot'.
+ * Returns true if a tuple was fetched into 'outslot', false if none or no index.
+ */
+bool
+RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+					  TupleTableSlot *outslot)
+{
+	ScanKeyData skey[INDEX_MAX_KEYS];
+	int			skey_attoff;
+	IndexScanDesc scan;
+	SnapshotData snap;
+	Relation	idxrel;
+	bool		found;
+
+	/* Nothing to probe when the caller has no index to offer. */
+	if (!OidIsValid(idxoid))
+		return false;
+
+	/* Open the index; the lock is released again before returning. */
+	idxrel = index_open(idxoid, AccessShareLock);
+
+	InitDirtySnapshot(snap);	/* presumably so uncommitted tuples are visited too - confirm */
+
+	/* Build the scan key from the column values in 'searchslot'. */
+	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+	/* Start an index scan with skey_attoff keys and no ordering operators. */
+	scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+	index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+	/* Fetch only the first match; nothing here locks or modifies the tuple. */
+	found = index_getnext_slot(scan, ForwardScanDirection, outslot);
+
+	/* Cleanup: end the scan, then close the index and drop its lock. */
+	index_endscan(scan);
+	index_close(idxrel, AccessShareLock);
+
+	return found;
+}
+
 /*
  * Compare the tuples in the slots by checking if they have equal values.
  */
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d351..a207a4acdc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -396,6 +396,57 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	return true;
 }
 
+/*
+ * Launch a parallel prefetch worker; returns its info, or NULL if that fails.
+ */
+ParallelApplyWorkerInfo *
+pa_launch_prefetch_worker(void)
+{
+	MemoryContext oldcontext;
+	bool		launched;
+	ParallelApplyWorkerInfo *winfo;
+
+	/*
+	 * Start a new parallel prefetch worker.
+	 *
+	 * The worker info can be used for the lifetime of the worker process, so
+	 * create it in a permanent context.
+	 */
+	oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+	winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
+
+	/* Setup shared memory; without it there is nothing to hand to a worker. */
+	if (!pa_setup_dsm(winfo))
+	{
+		MemoryContextSwitchTo(oldcontext);
+		pfree(winfo);
+		return NULL;
+	}
+
+	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_PREFETCH,
+										MyLogicalRepWorker->dbid,
+										MySubscription->oid,
+										MySubscription->name,
+										MyLogicalRepWorker->userid,
+										InvalidOid,
+										dsm_segment_handle(winfo->dsm_seg));
+
+	if (launched)
+	{
+		winfo->do_prefetch = true;	/* pa_send_data() skips its asserts for such workers */
+	}
+	else
+	{
+		pa_free_worker_info(winfo);
+		winfo = NULL;
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return winfo;
+}
+
 /*
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
@@ -943,20 +994,22 @@ ParallelApplyWorkerMain(Datum main_arg)
 
 	InitializingApplyWorker = false;
 
-	/* Setup replication origin tracking. */
-	StartTransactionCommand();
-	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+	if (am_parallel_apply_worker())
+	{
+		/* Setup replication origin tracking. */
+		StartTransactionCommand();
+		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
 									   originname, sizeof(originname));
-	originid = replorigin_by_name(originname, false);
-
-	/*
-	 * The parallel apply worker doesn't need to monopolize this replication
-	 * origin which was already acquired by its leader process.
-	 */
-	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
-	CommitTransactionCommand();
+		originid = replorigin_by_name(originname, false);
 
+		/*
+		 * The parallel apply worker doesn't need to monopolize this replication
+		 * origin which was already acquired by its leader process.
+		 */
+		replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+		replorigin_session_origin = originid;
+		CommitTransactionCommand();
+	}
 	/*
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
@@ -1149,8 +1202,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	shm_mq_result result;
 	TimestampTz startTime = 0;
 
-	Assert(!IsTransactionState());
-	Assert(!winfo->serialize_changes);
+	if (!winfo->do_prefetch)
+	{
+		Assert(!IsTransactionState());
+		Assert(!winfo->serialize_changes);
+	}
 
 	/*
 	 * We don't try to send data to parallel worker for 'immediate' mode. This
@@ -1519,6 +1575,9 @@ pa_get_fileset_state(void)
 {
 	PartialFileSetState fileset_state;
 
+	if (am_parallel_prefetch_worker())
+		return FS_EMPTY;
+
 	Assert(am_parallel_apply_worker());
 
 	SpinLockAcquire(&MyParallelShared->mutex);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfceb..ed6c057cec 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
+int			max_parallel_prefetch_workers_per_subscription = 2;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
@@ -257,7 +258,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
 		/* Skip parallel apply workers. */
-		if (isParallelApplyWorker(w))
+		if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
 			continue;
 
 		if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -322,6 +323,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	TimestampTz now;
 	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
 	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+	bool		is_parallel_prefetch_worker = (wtype == WORKERTYPE_PARALLEL_PREFETCH);
 
 	/*----------
 	 * Sanity checks:
@@ -331,7 +333,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	 */
 	Assert(wtype != WORKERTYPE_UNKNOWN);
 	Assert(is_tablesync_worker == OidIsValid(relid));
-	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+	Assert((is_parallel_apply_worker|is_parallel_prefetch_worker) == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -452,8 +454,8 @@ retry:
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
 	worker->stream_fileset = NULL;
-	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
-	worker->parallel_apply = is_parallel_apply_worker;
+	worker->leader_pid = (is_parallel_apply_worker|is_parallel_prefetch_worker) ? MyProcPid : InvalidPid;
+	worker->parallel_apply = is_parallel_apply_worker|is_parallel_prefetch_worker;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -492,6 +494,16 @@ retry:
 			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
 			break;
 
+		case WORKERTYPE_PARALLEL_PREFETCH:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel prefetch worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+			break;
+
 		case WORKERTYPE_TABLESYNC:
 			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 			snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -626,7 +638,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 	if (worker)
 	{
-		Assert(!isParallelApplyWorker(worker));
+		Assert(!isParallelApplyWorker(worker) && !isParallelPrefetchWorker(worker));
 		logicalrep_worker_stop_internal(worker, SIGTERM);
 	}
 
@@ -774,7 +786,7 @@ logicalrep_worker_detach(void)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-			if (isParallelApplyWorker(w))
+			if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
 				logicalrep_worker_stop_internal(w, SIGTERM);
 		}
 
@@ -1369,6 +1381,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			case WORKERTYPE_PARALLEL_APPLY:
 				values[9] = CStringGetTextDatum("parallel apply");
 				break;
+			case WORKERTYPE_PARALLEL_PREFETCH:
+				values[9] = CStringGetTextDatum("parallel prefetch");
+				break;
 			case WORKERTYPE_TABLESYNC:
 				values[9] = CStringGetTextDatum("table synchronization");
 				break;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c90f23ee5b..f965317529 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -681,6 +681,9 @@ process_syncing_tables(XLogRecPtr current_lsn)
 			 */
 			break;
 
+		case WORKERTYPE_PARALLEL_PREFETCH:
+			break;
+
 		case WORKERTYPE_TABLESYNC:
 			process_syncing_tables_for_sync(current_lsn);
 			break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44..db1f8bcebd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool		InitializingApplyWorker = false;
 
+#define INIT_PREFETCH_BUF_SIZE (128*1024)	/* initial size and batch threshold of the apply loop's prefetch buffer */
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];	/* launched prefetch workers */
+static int prefetch_worker_rr = 0;	/* round-robin cursor used by lr_do_prefetch() */
+static int n_prefetch_workers;		/* number of valid entries in prefetch_worker[] */
+
+bool prefetch_replica_identity_only = false;	/* GUC: skip the indexes other than localindexoid */
+
+size_t lr_prefetch_hits;		/* UPDATE/DELETE index probes that found a tuple */
+size_t lr_prefetch_misses;		/* UPDATE/DELETE index probes that found nothing */
+size_t lr_prefetch_errors;		/* NOTE(review): not updated in the visible hunks - confirm */
+size_t lr_prefetch_inserts;		/* INSERT changes handled by a prefetch worker */
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -482,6 +494,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 					(rel->state == SUBREL_STATE_SYNCDONE &&
 					 rel->statelsn <= remote_final_lsn));
 
+		case WORKERTYPE_PARALLEL_PREFETCH:
+			return true;
+
 		case WORKERTYPE_UNKNOWN:
 			/* Should never happen. */
 			elog(ERROR, "Unknown worker type");
@@ -556,6 +571,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	TransApplyAction apply_action;
 	StringInfoData original_msg;
 
+	if (am_parallel_prefetch_worker())
+	{
+		return false;
+	}
+
 	apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
 	/* not in streaming mode */
@@ -2487,13 +2507,36 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
 		   !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
 		   RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
 
-	/* Caller will not have done this bit. */
-	Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
-	InitConflictIndexes(relinfo);
+	if (am_parallel_prefetch_worker())
+	{
+		Relation localrel = relinfo->ri_RelationDesc;
+		TupleTableSlot *localslot = table_slot_create(localrel, &estate->es_tupleTable);
+		LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+		if (prefetch_replica_identity_only)
+		{
+			(void)RelationPrefetchIndex(localrel, relmapentry->localindexoid, remoteslot, localslot);
+		}
+		else
+		{
+			for (int i = 0; i < relinfo->ri_NumIndices; i++)
+			{
+				Oid sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+				(void)RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+			}
+		}
+		lr_prefetch_inserts += 1;
+	}
+	else
+	{
+		/* Caller will not have done this bit. */
+		Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+		InitConflictIndexes(relinfo);
 
-	/* Do the insert. */
-	TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
-	ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+		/* Do the insert. */
+		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+		ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+	}
 }
 
 /*
@@ -2677,6 +2720,32 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
 	ExecOpenIndices(relinfo, false);
 
+	if (am_parallel_prefetch_worker())
+	{
+		/*
+		 * While it may be reasonable to prefetch indexes for both old and new tuples,
+		 * we do it only for one of them (old if it exists, new otherwise), assuming
+		 * that the probability of the index key being changed is quite small.
+		 */
+		localslot = table_slot_create(localrel, &estate->es_tupleTable);
+		found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+		if (!prefetch_replica_identity_only)
+		{
+			for (int i = 0; i < relinfo->ri_NumIndices; i++)
+			{
+				Oid sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+				if (sec_index_oid != localindexoid)
+				{
+					(void)RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+				}
+			}
+		}
+		goto Cleanup;
+	}
 	found = FindReplTupleInLocalRel(edata, localrel,
 									&relmapentry->remoterel,
 									localindexoid,
@@ -2739,7 +2808,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 							remoteslot, newslot, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+  Cleanup:
 	ExecCloseIndices(relinfo);
 	EvalPlanQualEnd(&epqstate);
 }
@@ -2864,6 +2933,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		   !localrel->rd_rel->relhasindex ||
 		   RelationGetIndexList(localrel) == NIL);
 
+	if (am_parallel_prefetch_worker())
+	{
+		localslot = table_slot_create(localrel, &estate->es_tupleTable);
+		found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+		/* No need to prefetch other indexes because they are not touched during delete */
+		goto Cleanup;
+	}
 	found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
 									remoteslot, &localslot);
 
@@ -2900,7 +2980,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 							remoteslot, NULL, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+  Cleanup:
 	EvalPlanQualEnd(&epqstate);
 }
 
@@ -3567,6 +3647,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 	}
 }
 
+#define MSG_CODE_OFFSET (1 + 8*3)	/* 'w' byte + start LSN, end LSN, send time */
+
+/*
+ * Forward one received message to the prefetch workers: DML changes go to one
+ * worker (round robin), RELATION/TYPE to all of them; the rest is ignored.
+ */
+static void
+lr_do_prefetch(char* buf, int len)
+{
+	ParallelApplyWorkerInfo* winfo;
+
+	/* Need a worker and a 'w' message long enough to carry a message code. */
+	if (n_prefetch_workers <= 0 || len <= MSG_CODE_OFFSET || buf[0] != 'w')
+		return;
+
+	switch (buf[MSG_CODE_OFFSET])
+	{
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			/* Wrap the cursor here so the int can never overflow. */
+			winfo = prefetch_worker[prefetch_worker_rr % n_prefetch_workers];
+			prefetch_worker_rr = (prefetch_worker_rr + 1) % n_prefetch_workers;
+			pa_send_data(winfo, len, buf);
+			break;
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_RELATION:
+			/* Broadcast to all prefetch workers. */
+			for (int i = 0; i < n_prefetch_workers; i++)
+				pa_send_data(prefetch_worker[i], len, buf);
+			break;
+		default:
+			break;				/* ignore other messages */
+	}
+}
+
 /*
  * Apply main loop.
  */
@@ -3577,6 +3693,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	char* prefetch_buf = NULL;
+	size_t prefetch_buf_pos = 0;
+	size_t prefetch_buf_used = 0;
+	size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3714,23 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 													"LogicalStreamingContext",
 													ALLOCSET_DEFAULT_SIZES);
 
+	if (max_parallel_prefetch_workers_per_subscription != 0)
+	{
+		int i;
+		for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+		{
+			prefetch_worker[i] = pa_launch_prefetch_worker();
+			if (!prefetch_worker[i])
+			{
+				elog(LOG, "Launch only %d prefetch workers from %d",
+					 i, max_parallel_prefetch_workers_per_subscription);
+				break;
+			}
+		}
+		n_prefetch_workers = i;
+		prefetch_buf = palloc(prefetch_buf_size);
+	}
+
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -3611,9 +3748,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
 		int			rc;
-		int			len;
-		char	   *buf = NULL;
+		int32		len;
+		char		*buf = NULL;
 		bool		endofstream = false;
+		bool		no_more_data = false;
 		long		wait_time;
 
 		CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3760,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
-		if (len != 0)
+		/* Loop to process all available data (without blocking). */
+		for (;;)
 		{
-			/* Loop to process all available data (without blocking). */
-			for (;;)
-			{
-				CHECK_FOR_INTERRUPTS();
+			CHECK_FOR_INTERRUPTS();
 
-				if (len == 0)
+			if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+			{
+				prefetch_buf_used = 0;
+				do
 				{
-					break;
-				}
-				else if (len < 0)
+					while (prefetch_buf_used + len + 4 > prefetch_buf_size)
+					{
+						prefetch_buf_size *= 2;	/* repeat: one doubling may not fit a large message */
+						elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+						prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+					}
+					memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+					memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+					prefetch_buf_used += 4 + len;
+					if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+						break;
+					len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				} while (len > 0);
+
+				no_more_data = len <= 0;
+
+				for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
 				{
-					ereport(LOG,
-							(errmsg("data stream from publisher has ended")));
-					endofstream = true;
-					break;
+					memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+					lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
 				}
-				else
-				{
-					int			c;
-					StringInfoData s;
+				memcpy(&len, prefetch_buf, 4);
+				buf = &prefetch_buf[4];
+				prefetch_buf_pos = len + 4;
+			}
 
-					if (ConfigReloadPending)
-					{
-						ConfigReloadPending = false;
-						ProcessConfigFile(PGC_SIGHUP);
-					}
+			if (len == 0)
+			{
+				break;
+			}
+			else if (len < 0)
+			{
+				ereport(LOG,
+						(errmsg("data stream from publisher has ended")));
+				endofstream = true;
+				break;
+			}
+			else
+			{
+				int			c;
+				StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+				if (ConfigReloadPending)
+				{
+					ConfigReloadPending = false;
+					ProcessConfigFile(PGC_SIGHUP);
+				}
 
-					/* Ensure we are reading the data into our memory context. */
-					MemoryContextSwitchTo(ApplyMessageContext);
+				/* Reset timeout. */
+				last_recv_timestamp = GetCurrentTimestamp();
+				ping_sent = false;
 
-					initReadOnlyStringInfo(&s, buf, len);
+				/* Ensure we are reading the data into our memory context. */
+				MemoryContextSwitchTo(ApplyMessageContext);
 
-					c = pq_getmsgbyte(&s);
+				initReadOnlyStringInfo(&s, buf, len);
 
-					if (c == 'w')
-					{
-						XLogRecPtr	start_lsn;
-						XLogRecPtr	end_lsn;
-						TimestampTz send_time;
+				c = pq_getmsgbyte(&s);
 
-						start_lsn = pq_getmsgint64(&s);
-						end_lsn = pq_getmsgint64(&s);
-						send_time = pq_getmsgint64(&s);
+				if (c == 'w')
+				{
+					XLogRecPtr	start_lsn;
+					XLogRecPtr	end_lsn;
+					TimestampTz send_time;
 
-						if (last_received < start_lsn)
-							last_received = start_lsn;
+					start_lsn = pq_getmsgint64(&s);
+					end_lsn = pq_getmsgint64(&s);
+					send_time = pq_getmsgint64(&s);
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					if (last_received < start_lsn)
+						last_received = start_lsn;
 
-						UpdateWorkerStats(last_received, send_time, false);
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-						apply_dispatch(&s);
-					}
-					else if (c == 'k')
-					{
-						XLogRecPtr	end_lsn;
-						TimestampTz timestamp;
-						bool		reply_requested;
+					UpdateWorkerStats(last_received, send_time, false);
 
-						end_lsn = pq_getmsgint64(&s);
-						timestamp = pq_getmsgint64(&s);
-						reply_requested = pq_getmsgbyte(&s);
+					apply_dispatch(&s);
+				}
+				else if (c == 'k')
+				{
+					XLogRecPtr	end_lsn;
+					TimestampTz timestamp;
+					bool		reply_requested;
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					end_lsn = pq_getmsgint64(&s);
+					timestamp = pq_getmsgint64(&s);
+					reply_requested = pq_getmsgbyte(&s);
 
-						send_feedback(last_received, reply_requested, false);
-						UpdateWorkerStats(last_received, timestamp, true);
-					}
-					/* other message types are purposefully ignored */
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-					MemoryContextReset(ApplyMessageContext);
+					send_feedback(last_received, reply_requested, false);
+					UpdateWorkerStats(last_received, timestamp, true);
 				}
+				/* other message types are purposefully ignored */
 
+				MemoryContextReset(ApplyMessageContext);
+			}
+			if (prefetch_buf_pos < prefetch_buf_used)
+			{
+				memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+				buf = &prefetch_buf[prefetch_buf_pos + 4];
+				prefetch_buf_pos += 4 + len;
+			}
+			else if (prefetch_buf_used != 0 && no_more_data)
+			{
+				break;
+			}
+			else
+			{
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
@@ -3926,6 +4104,10 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 static void
 apply_worker_exit(void)
 {
+	/* Don't restart prefetch workers */
+	if (am_parallel_prefetch_worker())
+		return;
+
 	if (am_parallel_apply_worker())
 	{
 		/*
@@ -4729,6 +4911,10 @@ InitializeLogRepWorker(void)
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
 						get_rel_name(MyLogicalRepWorker->relid))));
+	else if (am_parallel_prefetch_worker())
+		ereport(LOG,
+				(errmsg("logical replication prefetch worker for subscription \"%s\" has started",
+						MySubscription->name)));
 	else
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" has started",
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d51..b3812d7e0e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"prefetch_replica_identity_only",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Whether LR prefetch work should prefetch only replica identity index or all other indexes too."),
+			NULL,
+		},
+		&prefetch_replica_identity_only,
+		false,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_parallel_prefetch_workers_per_subscription",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+			NULL,
+		},
+		&max_parallel_prefetch_workers_per_subscription,
+		2, 0, MAX_LR_PREFETCH_WORKERS,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_active_replication_origins",
 			PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544..3403128977 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -759,6 +759,8 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 										 TupleTableSlot *outslot);
 extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 									 TupleTableSlot *searchslot, TupleTableSlot *outslot);
+extern bool RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+								  TupleTableSlot *outslot);
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f330..19d1a8d466 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+/* GUC: upper bound on prefetch workers launched by each apply worker. */
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952..7f5b4fa51b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -32,6 +32,7 @@ typedef enum LogicalRepWorkerType
 	WORKERTYPE_TABLESYNC,
 	WORKERTYPE_APPLY,
 	WORKERTYPE_PARALLEL_APPLY,
+	WORKERTYPE_PARALLEL_PREFETCH,
 } LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
@@ -214,6 +215,12 @@ typedef struct ParallelApplyWorkerInfo
 	 */
 	bool		in_use;
 
+
+	/*
+	 * Performing prefetch of pages accessed by LR operations
+	 */
+	bool		do_prefetch;
+
 	ParallelApplyWorkerShared *shared;
 } ParallelApplyWorkerInfo;
 
@@ -237,6 +244,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+#define MAX_LR_PREFETCH_WORKERS 128	/* size of prefetch_worker[] and GUC maximum */
+extern PGDLLIMPORT size_t lr_prefetch_hits;	/* UPDATE/DELETE index probes that found a tuple */
+extern PGDLLIMPORT size_t lr_prefetch_misses;	/* UPDATE/DELETE index probes that found nothing */
+extern PGDLLIMPORT size_t lr_prefetch_errors;	/* NOTE(review): not updated in the visible hunks - confirm */
+extern PGDLLIMPORT size_t lr_prefetch_inserts;	/* INSERT changes handled by a prefetch worker */
+
+extern PGDLLIMPORT bool prefetch_replica_identity_only;	/* GUC, marked like the other globals */
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
@@ -326,10 +341,13 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+#define isParallelApplyWorker(worker) ((worker)->in_use &&			\
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isParallelPrefetchWorker(worker) ((worker)->in_use &&			\
+										  (worker)->type == WORKERTYPE_PARALLEL_PREFETCH)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
 								   (worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_prefetch_worker(void);
 
 static inline bool
 am_tablesync_worker(void)
@@ -351,4 +369,11 @@ am_parallel_apply_worker(void)
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
+static inline bool
+am_parallel_prefetch_worker(void)
+{
+	Assert(MyLogicalRepWorker->in_use);	/* this process's worker slot must be marked in use */
+	return isParallelPrefetchWorker(MyLogicalRepWorker);	/* true iff type is WORKERTYPE_PARALLEL_PREFETCH */
+}
+
 #endif							/* WORKER_INTERNAL_H */