v1-0001-logical-replication-prefetch.patch

text/plain

Filename: v1-0001-logical-replication-prefetch.patch
Type: text/plain
Part: 0
Message: Logical replication prefetch
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..c9c4223b22e 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,11 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
  * invoking table_tuple_lock.
+ *
+ * With LockTupleTryExclusive the caller only wants a lock that is available
+ * immediately, so a concurrent update or delete is not retried and the tuple
+ * is simply left unlocked.
  */
 static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
 {
 	bool		refetch = false;
 
@@ -141,22 +145,32 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
 			break;
 		case TM_Updated:
 			/* XXX: Improve handling here */
-			if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-			else
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("concurrent update, retrying")));
-			refetch = true;
+			if (lockmode != LockTupleTryExclusive)
+			{
+				if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+					ereport(LOG,
+							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+				else
+					ereport(LOG,
+							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+							 errmsg("concurrent update, retrying")));
+				refetch = true;
+			}
 			break;
 		case TM_Deleted:
-			/* XXX: Improve handling here */
-			ereport(LOG,
-					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-					 errmsg("concurrent delete, retrying")));
-			refetch = true;
+			if (lockmode != LockTupleTryExclusive)
+			{
+				/* XXX: Improve handling here */
+				ereport(LOG,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("concurrent delete, retrying")));
+				refetch = true;
+			}
 			break;
+		case TM_WouldBlock:
+			/* Only LockTupleTryExclusive locks with LockWaitSkip: give up */
+			Assert(lockmode == LockTupleTryExclusive);
+			break;
 		case TM_Invisible:
 			elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +250,17 @@ retry:
 		 */
 		if (TransactionIdIsValid(xwait))
 		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
+			if (lockmode == LockTupleTryExclusive)
+			{
+				/* Don't wait for the concurrent locker; report "not found". */
+				found = false;
+				break;
+			}
+			else if (lockmode != LockTupleNoLock)
+			{
+				XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+				goto retry;
+			}
 		}
 
 		/* Found our tuple and it's not locked */
@@ -246,7 +269,7 @@ retry:
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
-	if (found)
+	if (found && lockmode != LockTupleNoLock)
 	{
 		TM_FailureData tmfd;
 		TM_Result	res;
@@ -256,14 +279,14 @@ retry:
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
-							   lockmode,
-							   LockWaitBlock,
+							   lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
+							   lockmode == LockTupleTryExclusive ? LockWaitSkip : LockWaitBlock,
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
 		PopActiveSnapshot();
 
-		if (should_refetch_tuple(res, &tmfd))
+		if (should_refetch_tuple(res, &tmfd, lockmode))
 			goto retry;
 	}
 
@@ -395,16 +418,25 @@ retry:
 		 */
 		if (TransactionIdIsValid(xwait))
 		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
+			if (lockmode == LockTupleTryExclusive)
+			{
+				/* Don't wait for the concurrent locker; report "not found". */
+				found = false;
+				break;
+			}
+			else if (lockmode != LockTupleNoLock)
+			{
+				XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+				goto retry;
+			}
 		}
 
 		/* Found our tuple and it's not locked */
 		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
-	if (found)
+	if (found && lockmode != LockTupleNoLock)
 	{
 		TM_FailureData tmfd;
 		TM_Result	res;
@@ -414,14 +446,14 @@ retry:
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
-							   lockmode,
-							   LockWaitBlock,
+							   lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
+							   lockmode == LockTupleTryExclusive ? LockWaitSkip : LockWaitBlock,
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
 		PopActiveSnapshot();
 
-		if (should_refetch_tuple(res, &tmfd))
+		if (should_refetch_tuple(res, &tmfd, lockmode))
 			goto retry;
 	}
 
@@ -508,7 +540,7 @@ retry:
 
 	PopActiveSnapshot();
 
-	if (should_refetch_tuple(res, &tmfd))
+	if (should_refetch_tuple(res, &tmfd, LockTupleShare))
 		goto retry;
 
 	return true;
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
  */
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
 pa_launch_parallel_worker(void)
 {
 	MemoryContext oldcontext;
@@ -729,6 +729,49 @@ ProcessParallelApplyInterrupts(void)
 	}
 }
 
+/*
+ * Dispatch one message received from the leader apply worker.
+ *
+ * A prefetch worker executes changes only to bring the pages they touch into
+ * shared buffers, so errors are counted and otherwise ignored.  It never
+ * receives COMMIT or ABORT messages (the leader forwards only INSERT, UPDATE,
+ * DELETE, TYPE and RELATION), so any transaction started by the handler is
+ * aborted here: after an error it is unusable, after an insert its effects
+ * must be undone, and otherwise it would stay open indefinitely together with
+ * whatever locks it acquired.
+ */
+static void
+pa_apply_dispatch(StringInfo s)
+{
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	if (!MyParallelShared->do_prefetch)
+	{
+		apply_dispatch(s);
+		return;
+	}
+
+	PG_TRY();
+	{
+		apply_dispatch(s);
+	}
+	PG_CATCH();
+	{
+		/* Leave ErrorContext, which FlushErrorState() is about to reset */
+		MemoryContextSwitchTo(oldcontext);
+
+		HOLD_INTERRUPTS();
+		elog(DEBUG1, "Failed to prefetch LR operation");
+		FlushErrorState();
+		RESUME_INTERRUPTS();
+
+		lr_prefetch_errors += 1;
+	}
+	PG_END_TRY();
+
+	AbortCurrentTransaction();
+}
+
 /* Parallel apply worker main loop. */
 static void
 LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +837,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 			 */
 			s.cursor += SIZE_STATS_MESSAGE;
 
-			apply_dispatch(&s);
+			pa_apply_dispatch(&s);
 		}
 		else if (shmq_res == SHM_MQ_WOULD_BLOCK)
 		{
@@ -943,20 +980,35 @@ ParallelApplyWorkerMain(Datum main_arg)
 
 	InitializingApplyWorker = false;
 
-	/* Setup replication origin tracking. */
-	StartTransactionCommand();
-	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
-									   originname, sizeof(originname));
-	originid = replorigin_by_name(originname, false);
-
-	/*
-	 * The parallel apply worker doesn't need to monopolize this replication
-	 * origin which was already acquired by its leader process.
-	 */
-	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
-	CommitTransactionCommand();
+	if (!MyParallelShared->do_prefetch)
+	{
+		/* Setup replication origin tracking. */
+		StartTransactionCommand();
+		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+										   originname, sizeof(originname));
+		originid = replorigin_by_name(originname, false);
+
+		/*
+		 * The parallel apply worker doesn't need to monopolize this replication
+		 * origin which was already acquired by its leader process.
+		 */
+		replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+		replorigin_session_origin = originid;
+		CommitTransactionCommand();
+	}
+	else
+	{
+		/*
+		 * A prefetch worker never commits anything, so it does not need
+		 * replication origin tracking.
+		 *
+		 * NOTE(review): the intent is to avoid WAL for the throw-away changes
+		 * it makes, but lowering wal_level in this backend alone may not
+		 * suppress WAL for existing permanent relations -- confirm.
+		 */
+		wal_level = WAL_LEVEL_MINIMAL;
+	}
 
 	/*
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
@@ -1149,8 +1193,9 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	shm_mq_result result;
 	TimestampTz startTime = 0;
 
-	Assert(!IsTransactionState());
-	Assert(!winfo->serialize_changes);
+	/* These invariants only hold for regular (non-prefetch) workers. */
+	Assert(winfo->shared->do_prefetch || !IsTransactionState());
+	Assert(winfo->shared->do_prefetch || !winfo->serialize_changes);
 
 	/*
 	 * We don't try to send data to parallel worker for 'immediate' mode. This
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
+int			max_parallel_prefetch_workers_per_subscription = 2;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8ff0076dad3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,31 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool		InitializingApplyWorker = false;
 
+/*
+ * Initial size of the leader's prefetch buffer.  It is also roughly the
+ * amount of data the leader collects before forwarding a batch.
+ */
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+
+/* Parallel apply workers launched by the leader in prefetch mode */
+static ParallelApplyWorkerInfo *prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int	n_prefetch_workers;
+
+/*
+ * Round-robin cursor into prefetch_worker[].  Unsigned, so that incrementing
+ * it forever wraps around instead of overflowing.
+ */
+static uint32 prefetch_worker_rr = 0;
+
+/* GUC: restrict prefetching to lookups through the replica identity index */
+bool		prefetch_replica_identity_only = true;
+
+/* Prefetch statistics, counted separately by each prefetch worker */
+size_t		lr_prefetch_hits;
+size_t		lr_prefetch_misses;
+size_t		lr_prefetch_errors;
+size_t		lr_prefetch_inserts;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool		InitializingApplyWorker = false;
 static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 
+/*
+ * True when running in a parallel apply worker that was launched to prefetch.
+ */
+#define is_prefetching()	(am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -556,6 +573,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	TransApplyAction apply_action;
 	StringInfoData original_msg;
 
+	/* A prefetch worker always handles the change directly, never as a stream */
+	if (is_prefetching())
+		return false;
+
 	apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
 	/* not in streaming mode */
@@ -2380,6 +2402,47 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
+/*
+ * Execute "call".  In a prefetch worker any error it raises is counted in
+ * lr_prefetch_errors and otherwise ignored, because prefetching is only an
+ * optimization and must not terminate the worker.
+ *
+ * The error machinery may leave CurrentMemoryContext pointing at
+ * ErrorContext, which FlushErrorState() resets, so the caller's context is
+ * restored before execution continues.
+ *
+ * NOTE(review): execution continues in the same transaction without a
+ * (sub)transaction abort, so resources held at the point of the error are
+ * not released here -- confirm the enclosing transaction is aborted soon.
+ *
+ * Wrapped in do { } while (0) so that it is a single statement, e.g. after
+ * an unbraced "else".
+ */
+#define SAFE_APPLY(call) \
+	do \
+	{ \
+		if (is_prefetching()) \
+		{ \
+			MemoryContext safe_apply_cxt = CurrentMemoryContext; \
+			PG_TRY(); \
+			{ \
+				call; \
+			} \
+			PG_CATCH(); \
+			{ \
+				MemoryContextSwitchTo(safe_apply_cxt); \
+				HOLD_INTERRUPTS(); \
+				elog(DEBUG1, "Failed to prefetch LR operation"); \
+				FlushErrorState(); \
+				RESUME_INTERRUPTS(); \
+				lr_prefetch_errors += 1; \
+			} \
+			PG_END_TRY(); \
+		} \
+		else \
+			call; \
+	} while (0)
+
 /*
  * Handle INSERT message.
  */
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
 		ResultRelInfo *relinfo = edata->targetRelInfo;
 
 		ExecOpenIndices(relinfo, false);
-		apply_handle_insert_internal(edata, relinfo, remoteslot);
+		SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
 		ExecCloseIndices(relinfo);
 	}
 
@@ -2487,13 +2530,42 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
 		   !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
 		   RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
 
-	/* Caller will not have done this bit. */
-	Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
-	InitConflictIndexes(relinfo);
+	if (is_prefetching() && prefetch_replica_identity_only)
+	{
+		LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+		/*
+		 * Look the new tuple up through the replica identity index: the row
+		 * is normally absent, but the index descent presumably loads the
+		 * pages the leader's insert will need.  Skip the lookup when there is
+		 * no such index (it would be a sequential scan of the whole table for
+		 * every row) and when the tuple was routed to a partition (relmapentry
+		 * then describes the table named in the message, not relinfo's).
+		 */
+		if (relinfo == edata->targetRelInfo &&
+			OidIsValid(relmapentry->localindexoid))
+		{
+			TupleTableSlot *localslot = NULL;
+
+			(void) FindReplTupleInLocalRel(edata, relinfo->ri_RelationDesc,
+										   &relmapentry->remoterel,
+										   relmapentry->localindexoid,
+										   remoteslot, &localslot);
+		}
+	}
+	else
+	{
+		/* Caller will not have done this bit. */
+		Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+		InitConflictIndexes(relinfo);
 
-	/* Do the insert. */
-	TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
-	ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+		/* Do the insert. */
+		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+		ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+	}
+
+	if (is_prefetching())
+		lr_prefetch_inserts += 1;
 }
 
 /*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
 		apply_handle_tuple_routing(edata,
 								   remoteslot, &newtup, CMD_UPDATE);
 	else
-		apply_handle_update_internal(edata, edata->targetRelInfo,
-									 remoteslot, &newtup, rel->localindexoid);
+		SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
+												remoteslot, &newtup, rel->localindexoid));
 
 	finish_edata(edata);
 
@@ -2682,6 +2746,24 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 									localindexoid,
 									remoteslot, &localslot);
 
+	if (is_prefetching())
+	{
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+
+		/*
+		 * In replica-identity-only mode the lookup above is all the prefetch
+		 * that is wanted.  Otherwise the update itself is performed so that
+		 * the other indexes are read too -- but only if the tuple exists: the
+		 * not-found path merely reports a conflict, which the leader reports
+		 * itself when it applies this change.
+		 */
+		if (prefetch_replica_identity_only || !found)
+			goto Cleanup;
+	}
+
 	/*
 	 * Tuple found.
 	 *
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 							remoteslot, newslot, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+  Cleanup:
 	ExecCloseIndices(relinfo);
 	EvalPlanQualEnd(&epqstate);
 }
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
 		ResultRelInfo *relinfo = edata->targetRelInfo;
 
 		ExecOpenIndices(relinfo, false);
-		apply_handle_delete_internal(edata, relinfo,
-									 remoteslot, rel->localindexoid);
+		SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
+												remoteslot, rel->localindexoid));
 		ExecCloseIndices(relinfo);
 	}
 
@@ -2867,6 +2941,16 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
 									remoteslot, &localslot);
 
+	/* A prefetch worker only performs the lookup above; it never deletes. */
+	if (is_prefetching())
+	{
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+		goto Cleanup;
+	}
+
 	/* If found delete it. */
 	if (found)
 	{
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 							remoteslot, NULL, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+  Cleanup:
 	EvalPlanQualEnd(&epqstate);
 }
 
@@ -2921,6 +3004,20 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 	EState	   *estate = edata->estate;
 	bool		found;
 
+	LockTupleMode lockmode;
+
+	/*
+	 * Normally the tuple is locked exclusively.  A prefetch worker either
+	 * does not lock it at all (replica-identity-only mode) or merely tries
+	 * to lock it, giving up on conflict.
+	 */
+	if (!is_prefetching())
+		lockmode = LockTupleExclusive;
+	else if (prefetch_replica_identity_only)
+		lockmode = LockTupleNoLock;
+	else
+		lockmode = LockTupleTryExclusive;
+
 	/*
 	 * Regardless of the top-level operation, we're performing a read here, so
 	 * check for SELECT privileges.
@@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 #endif
 
 		found = RelationFindReplTupleByIndex(localrel, localidxoid,
-											 LockTupleExclusive,
+											 lockmode,
 											 remoteslot, *localslot);
 	}
 	else
-		found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+		found = RelationFindReplTupleSeq(localrel, lockmode,
 										 remoteslot, *localslot);
 
 	return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	switch (operation)
 	{
 		case CMD_INSERT:
-			apply_handle_insert_internal(edata, partrelinfo,
-										 remoteslot_part);
+			SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
+													remoteslot_part));
 			break;
 
 		case CMD_DELETE:
-			apply_handle_delete_internal(edata, partrelinfo,
-										 remoteslot_part,
-										 part_entry->localindexoid);
+			SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
+													remoteslot_part,
+													part_entry->localindexoid));
 			break;
 
 		case CMD_UPDATE:
@@ -3076,6 +3161,17 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				{
 					TupleTableSlot *newslot = localslot;
 
+					/*
+					 * A prefetch worker has nothing to report here.
+					 *
+					 * NOTE(review): only these reporting paths check
+					 * is_prefetching(); when the tuple is found, a prefetch
+					 * worker goes on to perform the partition update even in
+					 * replica-identity-only mode -- confirm this is intended.
+					 */
 					if (is_prefetching())
 						return;
 
@@ -3101,6 +3189,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				{
 					TupleTableSlot *newslot;
 
+					/* A prefetch worker has nothing to report here. */
 					if (is_prefetching())
 						return;
 
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 						slot_getallattrs(remoteslot);
 					}
 					MemoryContextSwitchTo(oldctx);
-					apply_handle_insert_internal(edata, partrelinfo_new,
-												 remoteslot_part);
+					SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
+															remoteslot_part));
 				}
 
 				EvalPlanQualEnd(&epqstate);
@@ -3567,6 +3658,61 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 	}
 }
 
+/*
+ * Offset of the logical replication message type within a 'w' (XLogData)
+ * message: the 'w' byte is followed by three int64 fields (start LSN, end
+ * LSN and send time), as parsed by LogicalRepApplyLoop().
+ */
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+/*
+ * Forward one raw message received from the publisher to the prefetch
+ * workers.
+ *
+ * Data changes go to a single worker chosen round-robin, while RELATION and
+ * TYPE messages are broadcast to every worker (presumably because each one
+ * needs them to decode the changes it is sent).  Everything else, including
+ * messages too short to carry a message type, is ignored.  Callers must
+ * ensure n_prefetch_workers > 0.
+ *
+ * NOTE(review): changes inside a streamed transaction carry an extra xid;
+ * confirm that prefetch workers, which never see STREAM_START, parse them.
+ */
+static void
+lr_do_prefetch(char *buf, int len)
+{
+	ParallelApplyWorkerInfo *winfo;
+
+	if (len <= MSG_CODE_OFFSET || buf[0] != 'w')
+		return;
+
+	switch (buf[MSG_CODE_OFFSET])
+	{
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			/* Round robin; keep the cursor in range so it cannot overflow */
+			winfo = prefetch_worker[prefetch_worker_rr % n_prefetch_workers];
+			prefetch_worker_rr = (prefetch_worker_rr + 1) % n_prefetch_workers;
+			pa_send_data(winfo, len, buf);
+			break;
+
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_RELATION:
+			/* broadcast to all prefetch workers */
+			for (int i = 0; i < n_prefetch_workers; i++)
+			{
+				winfo = prefetch_worker[i];
+				pa_send_data(winfo, len, buf);
+			}
+			break;
+
+		default:
+			/* Ignore other messages */
+			break;
+	}
+}
+
 /*
  * Apply main loop.
  */
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	char	   *prefetch_buf = NULL;
+	size_t		prefetch_buf_pos = 0;
+	size_t		prefetch_buf_used = 0;
+	size_t		prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,35 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 													"LogicalStreamingContext",
 													ALLOCSET_DEFAULT_SIZES);
 
+	/*
+	 * Launch the prefetch workers.  When at least one is running, received
+	 * messages are collected in prefetch_buf and each batch is forwarded to
+	 * them before the leader applies it.
+	 *
+	 * NOTE(review): do_prefetch is set only after pa_launch_parallel_worker()
+	 * returns, while the worker tests it during startup; confirm the worker
+	 * cannot read it before it is set.
+	 */
+	if (max_parallel_prefetch_workers_per_subscription != 0)
+	{
+		int			i;
+
+		for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+		{
+			prefetch_worker[i] = pa_launch_parallel_worker();
+			if (!prefetch_worker[i])
+			{
+				elog(LOG, "launched only %d of %d prefetch workers",
+					 i, max_parallel_prefetch_workers_per_subscription);
+				break;
+			}
+			prefetch_worker[i]->in_use = true;
+			prefetch_worker[i]->shared->do_prefetch = true;
+		}
+		n_prefetch_workers = i;
+		prefetch_buf = palloc(prefetch_buf_size);
+	}
+
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
 		int			rc;
-		int			len;
+		int32		len;
 		char	   *buf = NULL;
 		bool		endofstream = false;
+		bool		no_more_data = false;
 		long		wait_time;
 
 		CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3772,146 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
-		if (len != 0)
+		/* Loop to process all available data (without blocking). */
+		for (;;)
 		{
-			/* Loop to process all available data (without blocking). */
-			for (;;)
-			{
-				CHECK_FOR_INTERRUPTS();
+			CHECK_FOR_INTERRUPTS();
 
-				if (len == 0)
+			/*
+			 * With prefetch workers, first collect everything that can be
+			 * received without blocking (up to about INIT_PREFETCH_BUF_SIZE
+			 * bytes) into prefetch_buf, each message prefixed by its 4-byte
+			 * length, and forward the whole batch to the prefetch workers.
+			 * The buffered messages are then applied one by one below.
+			 */
+			if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+			{
+				prefetch_buf_used = 0;
+				do
 				{
-					break;
-				}
-				else if (len < 0)
+					if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+					{
+						/* A single doubling may not fit a large message */
+						while (prefetch_buf_used + len + 4 > prefetch_buf_size)
+							prefetch_buf_size *= 2;
+						elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+						prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+					}
+					memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+					memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+					prefetch_buf_used += 4 + len;
+					if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+						break;
+					len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				} while (len > 0);
+
+				no_more_data = len <= 0;
+
+				/*
+				 * Remember that the stream has ended; it is reported once the
+				 * buffered messages have been applied.
+				 */
+				if (len < 0)
+					endofstream = true;
+
+				for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
 				{
-					ereport(LOG,
-							(errmsg("data stream from publisher has ended")));
-					endofstream = true;
-					break;
+					memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+					lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
 				}
-				else
-				{
-					int			c;
-					StringInfoData s;
+				memcpy(&len, prefetch_buf, 4);
+				buf = &prefetch_buf[4];
+				prefetch_buf_pos = len + 4;
+			}
 
-					if (ConfigReloadPending)
-					{
-						ConfigReloadPending = false;
-						ProcessConfigFile(PGC_SIGHUP);
-					}
+			if (len == 0)
+			{
+				break;
+			}
+			else if (len < 0)
+			{
+				ereport(LOG,
+						(errmsg("data stream from publisher has ended")));
+				endofstream = true;
+				break;
+			}
+			else
+			{
+				int			c;
+				StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+				if (ConfigReloadPending)
+				{
+					ConfigReloadPending = false;
+					ProcessConfigFile(PGC_SIGHUP);
+				}
 
-					/* Ensure we are reading the data into our memory context. */
-					MemoryContextSwitchTo(ApplyMessageContext);
+				/* Reset timeout. */
+				last_recv_timestamp = GetCurrentTimestamp();
+				ping_sent = false;
 
-					initReadOnlyStringInfo(&s, buf, len);
+				/* Ensure we are reading the data into our memory context. */
+				MemoryContextSwitchTo(ApplyMessageContext);
 
-					c = pq_getmsgbyte(&s);
+				initReadOnlyStringInfo(&s, buf, len);
 
-					if (c == 'w')
-					{
-						XLogRecPtr	start_lsn;
-						XLogRecPtr	end_lsn;
-						TimestampTz send_time;
+				c = pq_getmsgbyte(&s);
 
-						start_lsn = pq_getmsgint64(&s);
-						end_lsn = pq_getmsgint64(&s);
-						send_time = pq_getmsgint64(&s);
+				if (c == 'w')
+				{
+					XLogRecPtr	start_lsn;
+					XLogRecPtr	end_lsn;
+					TimestampTz send_time;
 
-						if (last_received < start_lsn)
-							last_received = start_lsn;
+					start_lsn = pq_getmsgint64(&s);
+					end_lsn = pq_getmsgint64(&s);
+					send_time = pq_getmsgint64(&s);
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					if (last_received < start_lsn)
+						last_received = start_lsn;
 
-						UpdateWorkerStats(last_received, send_time, false);
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-						apply_dispatch(&s);
-					}
-					else if (c == 'k')
-					{
-						XLogRecPtr	end_lsn;
-						TimestampTz timestamp;
-						bool		reply_requested;
+					UpdateWorkerStats(last_received, send_time, false);
 
-						end_lsn = pq_getmsgint64(&s);
-						timestamp = pq_getmsgint64(&s);
-						reply_requested = pq_getmsgbyte(&s);
+					apply_dispatch(&s);
+				}
+				else if (c == 'k')
+				{
+					XLogRecPtr	end_lsn;
+					TimestampTz timestamp;
+					bool		reply_requested;
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					end_lsn = pq_getmsgint64(&s);
+					timestamp = pq_getmsgint64(&s);
+					reply_requested = pq_getmsgbyte(&s);
 
-						send_feedback(last_received, reply_requested, false);
-						UpdateWorkerStats(last_received, timestamp, true);
-					}
-					/* other message types are purposefully ignored */
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-					MemoryContextReset(ApplyMessageContext);
+					send_feedback(last_received, reply_requested, false);
+					UpdateWorkerStats(last_received, timestamp, true);
 				}
+				/* other message types are purposefully ignored */
 
+				MemoryContextReset(ApplyMessageContext);
+			}
+			if (prefetch_buf_pos < prefetch_buf_used)
+			{
+				memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+				buf = &prefetch_buf[prefetch_buf_pos + 4];
+				prefetch_buf_pos += 4 + len;
+			}
+			else if (prefetch_buf_used != 0 && no_more_data)
+			{
+				if (endofstream)
+					ereport(LOG,
+							(errmsg("data stream from publisher has ended")));
+				break;
+			}
+			else
+			{
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"prefetch_replica_identity_only",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Restricts logical replication prefetch workers to lookups through the replica identity index."),
+			gettext_noop("When disabled, prefetch workers apply each change and then roll it back, so that the pages of all indexes are read as well."),
+		},
+		&prefetch_replica_identity_only,
+		true,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_parallel_prefetch_workers_per_subscription",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+			gettext_noop("The value is read when the apply worker starts replicating."),
+		},
+		&max_parallel_prefetch_workers_per_subscription,
+		2, 0, MAX_LR_PREFETCH_WORKERS,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_active_replication_origins",
 			PGC_POSTMASTER,
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,10 @@ typedef enum LockTupleMode
 	LockTupleNoKeyExclusive,
 	/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
 	LockTupleExclusive,
+	/* Pseudo-mode for execReplication.c: find the tuple, do not lock it */
+	LockTupleNoLock,
+	/* Pseudo-mode: try an exclusive lock, silently give up on conflict */
+	LockTupleTryExclusive,
 } LockTupleMode;
 
 #endif							/* LOCKOPTIONS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,7 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
 	 */
 	PartialFileSetState fileset_state;
 	FileSet		fileset;
+
+	/*
+	 * True if this worker was launched only to prefetch data for the leader.
+	 */
+	bool		do_prefetch;
 } ParallelApplyWorkerShared;
 
 /*
@@ -237,6 +242,16 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+/* Size of the leader's prefetch worker array; max of the GUC */
+#define MAX_LR_PREFETCH_WORKERS 128
+
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern PGDLLIMPORT bool prefetch_replica_identity_only;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
@@ -326,10 +341,11 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
+extern ParallelApplyWorkerInfo *pa_launch_parallel_worker(void);
 
 #define isParallelApplyWorker(worker) ((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
 								   (worker)->type == WORKERTYPE_TABLESYNC)
 
 static inline bool
 am_tablesync_worker(void)