v1-0001-logical-replication-prefetch.patch
text/plain
Filename: v1-0001-logical-replication-prefetch.patch
Type: text/plain
Part: 0
Message:
Logical replication prefetch
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..c9c4223b22e 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
* invoking table_tuple_lock.
*/
static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
{
bool refetch = false;
@@ -141,22 +141,33 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
break;
case TM_Updated:
/* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ }
break;
case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ }
break;
+ case TM_WouldBlock:
+ /* Only LockTupleTryExclusive locks with LockWaitSkip: give up silently. */
+ if (lockmode != LockTupleTryExclusive)
+ elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+ break;
case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +247,16 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
/* Found our tuple and it's not locked */
@@ -246,7 +265,7 @@ retry:
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -256,14 +275,18 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
- LockWaitBlock,
+ lockmode == LockTupleTryExclusive ? LockWaitSkip : LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
+
+ /* A failed try-lock leaves the tuple unlocked: report it as not found. */
+ if (lockmode == LockTupleTryExclusive && res != TM_Ok)
+ found = false;
}
@@ -395,16 +418,23 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
-
/* Found our tuple and it's not locked */
break;
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -414,14 +444,18 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
- LockWaitBlock,
+ lockmode == LockTupleTryExclusive ? LockWaitSkip : LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
+
+ /* A failed try-lock leaves the tuple unlocked: report it as not found. */
+ if (lockmode == LockTupleTryExclusive && res != TM_Ok)
+ found = false;
}
@@ -508,7 +542,7 @@ retry:
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, LockTupleShare))
goto retry;
return true;
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
* Try to get a parallel apply worker from the pool. If none is available then
* start a new one.
*/
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
pa_launch_parallel_worker(void)
{
MemoryContext oldcontext;
@@ -729,6 +729,49 @@ ProcessParallelApplyInterrupts(void)
}
}
+
+/*
+ * Dispatch a message received from the leader apply worker.
+ *
+ * A prefetch worker applies the change only to read in the pages it needs.
+ * Errors are counted in lr_prefetch_errors and otherwise ignored, and the
+ * transaction is always aborted afterwards: this undoes the change made
+ * outside replica-identity-only mode, cleans up after an operation that
+ * was interrupted by an error, and ensures that no transaction (nor the
+ * locks it holds) stays open between messages.
+ */
+static void
+pa_apply_dispatch(StringInfo s)
+{
+ if (!MyParallelShared->do_prefetch)
+ {
+ apply_dispatch(s);
+ return;
+ }
+
+ PG_TRY();
+ {
+ apply_dispatch(s);
+ }
+ PG_CATCH();
+ {
+ HOLD_INTERRUPTS();
+
+ elog(DEBUG1, "Failed to prefetch LR operation");
+
+ /* TODO: should we somehow dump the error or just silently ignore it? */
+ /* EmitErrorReport(); */
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ lr_prefetch_errors += 1;
+ }
+ PG_END_TRY();
+
+ AbortCurrentTransaction();
+}
+
/* Parallel apply worker main loop. */
static void
LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +837,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
*/
s.cursor += SIZE_STATS_MESSAGE;
- apply_dispatch(&s);
+ pa_apply_dispatch(&s);
}
else if (shmq_res == SHM_MQ_WOULD_BLOCK)
{
@@ -943,20 +986,27 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializingApplyWorker = false;
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ if (!MyParallelShared->do_prefetch)
+ {
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
originname, sizeof(originname));
- originid = replorigin_by_name(originname, false);
-
- /*
- * The parallel apply worker doesn't need to monopolize this replication
- * origin which was already acquired by its leader process.
- */
- replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
- CommitTransactionCommand();
+ originid = replorigin_by_name(originname, false);
+ /*
+ * The parallel apply worker doesn't need to monopolize this replication
+ * origin which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();
+ }
+ else
+ {
+ /* NOTE(review): this only lowers the WAL detail level; changes are still WAL-logged */
+ wal_level = WAL_LEVEL_MINIMAL;
+ }
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -1149,8 +1199,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
shm_mq_result result;
TimestampTz startTime = 0;
- Assert(!IsTransactionState());
- Assert(!winfo->serialize_changes);
+ if (!winfo->shared->do_prefetch)
+ {
+ Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);
+ }
/*
* We don't try to send data to parallel worker for 'immediate' mode. This
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
+int max_parallel_prefetch_workers_per_subscription = 2; /* prefetch workers each apply worker tries to start; 0 disables prefetching */
LogicalRepWorker *MyLogicalRepWorker = NULL;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8ff0076dad3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
/* Are we initializing an apply worker? */
bool InitializingApplyWorker = false;
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo *prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static uint32 prefetch_worker_rr = 0; /* round-robin cursor; unsigned so that ++ may wrap */
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = true;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool InitializingApplyWorker = false;
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+/*
+ * True when running in a parallel apply worker that only prefetches.
+ */
+#define is_prefetching() (am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -556,6 +573,16 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransApplyAction apply_action;
StringInfoData original_msg;
+ if (is_prefetching())
+ {
+ /*
+ * Prefetch workers apply every change directly.  NOTE(review): changes
+ * of a streamed transaction start with the transaction's XID, which is
+ * not consumed here; confirm such messages are never forwarded.
+ */
+ return false;
+ }
+
apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
@@ -2380,6 +2407,18 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
RelationGetRelationName(rel))));
}
+/*
+ * Apply a change.  Errors are deliberately not caught here: in a prefetch
+ * worker they propagate to pa_apply_dispatch(), which counts and discards
+ * them.  Catching them at this level would let the rest of the message
+ * handler run after a failure without the rethrow or (sub)transaction abort
+ * that PG_TRY error recovery requires (see elog.h).
+ */
+#define SAFE_APPLY(call) \
+ do { \
+ call; \
+ } while (0)
+
/*
* Handle INSERT message.
*/
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_insert_internal(edata, relinfo, remoteslot);
+ SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
ExecCloseIndices(relinfo);
}
@@ -2487,13 +2530,40 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
!relinfo->ri_RelationDesc->rd_rel->relhasindex ||
RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
- /* Caller will not have done this bit. */
- Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
- InitConflictIndexes(relinfo);
+ if (is_prefetching() && prefetch_replica_identity_only)
+ {
+ TupleTableSlot *localslot = NULL;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ Relation localrel = relinfo->ri_RelationDesc;
- /* Do the insert. */
- TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
- ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ /*
+ * Replica-identity-only prefetch: instead of inserting, look the tuple
+ * up through the replica identity so that the pages this lookup needs
+ * are read in; whether it is found does not matter.  Skip partitions
+ * reached through tuple routing: edata->targetRel then describes the
+ * partitioned root, whose localindexoid does not belong to this
+ * partition.
+ */
+ if (relinfo == edata->targetRelInfo)
+ (void) FindReplTupleInLocalRel(edata, localrel,
+ &relmapentry->remoterel,
+ relmapentry->localindexoid,
+ remoteslot, &localslot);
+ }
+ else
+ {
+ /* Caller will not have done this bit. */
+ Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+ InitConflictIndexes(relinfo);
+
+ /* Do the insert. */
+ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+ ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ }
+ if (is_prefetching())
+ {
+ lr_prefetch_inserts += 1;
+ }
}
/*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
apply_handle_tuple_routing(edata,
remoteslot, &newtup, CMD_UPDATE);
else
- apply_handle_update_internal(edata, edata->targetRelInfo,
- remoteslot, &newtup, rel->localindexoid);
+ SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup, rel->localindexoid));
finish_edata(edata);
@@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata,
localindexoid,
remoteslot, &localslot);
+ if (is_prefetching()) /* prefetch worker: record whether the lookup hit */
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ if (prefetch_replica_identity_only) /* lookup only: do not apply the update */
+ goto Cleanup;
+ }
+
/*
* Tuple found.
*
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
remoteslot, newslot, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup: /* common exit, also taken by replica-identity-only prefetch */
ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
}
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_delete_internal(edata, relinfo,
- remoteslot, rel->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
+ remoteslot, rel->localindexoid));
ExecCloseIndices(relinfo);
}
@@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
remoteslot, &localslot);
+ if (is_prefetching()) /* prefetch worker: record the hit/miss, never delete (whatever prefetch_replica_identity_only says) */
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ goto Cleanup;
+ }
+
/* If found delete it. */
if (found)
{
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
remoteslot, NULL, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup: /* common exit, also taken by every prefetch lookup */
EvalPlanQualEnd(&epqstate);
}
@@ -2921,6 +3004,20 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
EState *estate = edata->estate;
bool found;
+ LockTupleMode lockmode;
+
+ /*
+ * The apply worker locks the tuple it is about to change.  A prefetch
+ * worker takes no lock when it only looks tuples up through the replica
+ * identity, and otherwise only try-locks the tuple.
+ */
+ if (!is_prefetching())
+ lockmode = LockTupleExclusive;
+ else if (prefetch_replica_identity_only)
+ lockmode = LockTupleNoLock;
+ else
+ lockmode = LockTupleTryExclusive;
+
/*
* Regardless of the top-level operation, we're performing a read here, so
* check for SELECT privileges.
@@ -2946,11 +3043,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
#endif
found = RelationFindReplTupleByIndex(localrel, localidxoid,
- LockTupleExclusive,
+ lockmode,
remoteslot, *localslot);
}
else
- found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+ found = RelationFindReplTupleSeq(localrel, lockmode,
remoteslot, *localslot);
return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
switch (operation)
{
case CMD_INSERT:
- apply_handle_insert_internal(edata, partrelinfo,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
+ remoteslot_part));
break;
case CMD_DELETE:
- apply_handle_delete_internal(edata, partrelinfo,
- remoteslot_part,
- part_entry->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
+ remoteslot_part,
+ part_entry->localindexoid));
break;
case CMD_UPDATE:
@@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
+ if (is_prefetching()) /* prefetch worker: skip conflict reporting */
+ return;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
@@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot;
+ if (is_prefetching()) /* NOTE(review): confirm prefetch workers cannot reach the partition UPDATE below (no tuple lock is taken in replica-identity-only mode) */
+ return;
+
/* Store the new tuple for conflict reporting */
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
slot_getallattrs(remoteslot);
}
MemoryContextSwitchTo(oldctx);
- apply_handle_insert_internal(edata, partrelinfo_new,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
+ remoteslot_part));
}
EvalPlanQualEnd(&epqstate);
@@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
MemoryContextSwitchTo(ApplyMessageContext);
}
-
/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
@@ -3567,6 +3657,57 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
+/*
+ * Offset of the logical replication message type within a 'w' (XLogData)
+ * message: the 'w' byte is followed by three int64 fields (start LSN, end
+ * LSN and send time), see LogicalRepApplyLoop().
+ */
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+/*
+ * Forward one raw message received from the publisher to the prefetch workers.
+ *
+ * Only XLogData ('w') messages are considered.  INSERT, UPDATE and DELETE
+ * changes go to a single prefetch worker chosen round-robin; TYPE and
+ * RELATION messages are broadcast so that every worker knows the types and
+ * relations that later changes refer to.  All other messages are ignored.
+ * Prefetching is best-effort: delivery is not checked.
+ */
+static void
+lr_do_prefetch(char *buf, int len)
+{
+ ParallelApplyWorkerInfo *winfo;
+
+ /* Ignore messages too short to carry a message type, and non-XLogData ones. */
+ if (len <= MSG_CODE_OFFSET || buf[0] != 'w')
+ return;
+
+ switch (buf[MSG_CODE_OFFSET])
+ {
+ case LOGICAL_REP_MSG_INSERT:
+ case LOGICAL_REP_MSG_UPDATE:
+ case LOGICAL_REP_MSG_DELETE:
+ /* Round robin prefetch worker */
+ winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+ pa_send_data(winfo, len, buf);
+ break;
+
+ case LOGICAL_REP_MSG_TYPE:
+ case LOGICAL_REP_MSG_RELATION:
+ /* broadcast to all prefetch workers */
+ for (int i = 0; i < n_prefetch_workers; i++)
+ {
+ winfo = prefetch_worker[i];
+ pa_send_data(winfo, len, buf);
+ }
+ break;
+
+ default:
+ /* Ignore other messages */
+ break;
+ }
+}
+
/*
* Apply main loop.
*/
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ char *prefetch_buf = NULL;
+ size_t prefetch_buf_pos = 0;
+ size_t prefetch_buf_used = 0;
+ size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,37 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
+ /*
+ * Start the prefetch workers: parallel apply workers flagged with
+ * do_prefetch.  If fewer can be launched than configured, continue with
+ * the ones we got (prefetching stays disabled if there are none).
+ *
+ * NOTE(review): do_prefetch is set only after the worker has been
+ * launched, so ParallelApplyWorkerMain() may already have tested it;
+ * consider passing it to pa_launch_parallel_worker() instead.
+ */
+ if (max_parallel_prefetch_workers_per_subscription != 0)
+ {
+ int i;
+
+ for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+ {
+ prefetch_worker[i] = pa_launch_parallel_worker();
+ if (!prefetch_worker[i])
+ {
+ elog(LOG, "launched only %d of %d prefetch workers",
+ i, max_parallel_prefetch_workers_per_subscription);
+ break;
+ }
+ prefetch_worker[i]->in_use = true;
+ prefetch_worker[i]->shared->do_prefetch = true;
+ }
+ n_prefetch_workers = i;
+ /* In ApplyContext so it survives the per-message ApplyMessageContext resets. */
+ prefetch_buf = MemoryContextAlloc(ApplyContext, prefetch_buf_size);
+ }
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
- int len;
+ int32 len;
char *buf = NULL;
bool endofstream = false;
+ int32 buffered_stop_len = 1; /* receive result that ended the last buffering pass */
long wait_time;
CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3772,146 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
- if (len != 0)
+ /* Loop to process all available data (without blocking). */
+ for (;;)
{
- /* Loop to process all available data (without blocking). */
- for (;;)
- {
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- if (len == 0)
+ /*
+ * With prefetch workers running, first read everything that is
+ * available without blocking (stopping once about
+ * INIT_PREFETCH_BUF_SIZE bytes are buffered) into prefetch_buf as
+ * <int32 length><message> records, pass the whole batch to the
+ * prefetch workers, and only then apply the buffered messages one
+ * by one.
+ */
+ if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+ {
+ prefetch_buf_used = 0;
+ do
{
- break;
- }
- else if (len < 0)
+ if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+ {
+ /* A large message may need more than one doubling. */
+ while (prefetch_buf_used + len + 4 > prefetch_buf_size)
+ prefetch_buf_size *= 2;
+ elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+ prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+ }
+ memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+ memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+ prefetch_buf_used += 4 + len;
+ if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+ break;
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ } while (len > 0);
+
+ /*
+ * Remember why buffering stopped: a positive value means the buffer
+ * filled up; otherwise it is the receive result (0: no data, <0: end
+ * of stream) to act upon once the buffered messages are applied.
+ */
+ buffered_stop_len = len;
+
+ for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
{
- ereport(LOG,
- (errmsg("data stream from publisher has ended")));
- endofstream = true;
- break;
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
}
- else
- {
- int c;
- StringInfoData s;
+ memcpy(&len, prefetch_buf, 4);
+ buf = &prefetch_buf[4];
+ prefetch_buf_pos = len + 4;
+ }
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
- /* Reset timeout. */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
- /* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyMessageContext);
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
- initReadOnlyStringInfo(&s, buf, len);
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
- c = pq_getmsgbyte(&s);
+ initReadOnlyStringInfo(&s, buf, len);
- if (c == 'w')
- {
- XLogRecPtr start_lsn;
- XLogRecPtr end_lsn;
- TimestampTz send_time;
+ c = pq_getmsgbyte(&s);
- start_lsn = pq_getmsgint64(&s);
- end_lsn = pq_getmsgint64(&s);
- send_time = pq_getmsgint64(&s);
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
- if (last_received < start_lsn)
- last_received = start_lsn;
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time = pq_getmsgint64(&s);
- if (last_received < end_lsn)
- last_received = end_lsn;
+ if (last_received < start_lsn)
+ last_received = start_lsn;
- UpdateWorkerStats(last_received, send_time, false);
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- apply_dispatch(&s);
- }
- else if (c == 'k')
- {
- XLogRecPtr end_lsn;
- TimestampTz timestamp;
- bool reply_requested;
+ UpdateWorkerStats(last_received, send_time, false);
- end_lsn = pq_getmsgint64(&s);
- timestamp = pq_getmsgint64(&s);
- reply_requested = pq_getmsgbyte(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr end_lsn;
+ TimestampTz timestamp;
+ bool reply_requested;
- if (last_received < end_lsn)
- last_received = end_lsn;
+ end_lsn = pq_getmsgint64(&s);
+ timestamp = pq_getmsgint64(&s);
+ reply_requested = pq_getmsgbyte(&s);
- send_feedback(last_received, reply_requested, false);
- UpdateWorkerStats(last_received, timestamp, true);
- }
- /* other message types are purposefully ignored */
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- MemoryContextReset(ApplyMessageContext);
+ send_feedback(last_received, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
}
+ /* other message types are purposefully ignored */
+ MemoryContextReset(ApplyMessageContext);
+ }
+ if (prefetch_buf_pos < prefetch_buf_used)
+ {
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ buf = &prefetch_buf[prefetch_buf_pos + 4];
+ prefetch_buf_pos += 4 + len;
+ }
+ else if (prefetch_buf_used != 0 && buffered_stop_len <= 0)
+ {
+ /*
+ * Buffer drained: act on the receive result that ended buffering
+ * (no data or end of stream) as if it had just been returned.
+ */
+ len = buffered_stop_len;
+ }
+ else
+ {
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
+#include "replication/worker_internal.h"
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"prefetch_replica_identity_only",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Whether logical replication prefetch workers prefetch only the replica identity index, or all other indexes too."),
+ NULL,
+ },
+ &prefetch_replica_identity_only,
+ true,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"max_parallel_prefetch_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+ NULL,
+ },
+ &max_parallel_prefetch_workers_per_subscription,
+ 2, 0, MAX_LR_PREFETCH_WORKERS,
+ NULL, NULL, NULL
+ },
+
{
{"max_active_replication_origins",
PGC_POSTMASTER,
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,14 @@ typedef enum LockTupleMode
LockTupleNoKeyExclusive,
/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
LockTupleExclusive,
+ /*
+ * Pseudo-modes used by the logical replication tuple lookup
+ * (RelationFindReplTupleByIndex/Seq); never pass them to table_tuple_lock().
+ */
+ /* Do not lock tuple */
+ LockTupleNoLock,
+ /* Try exclusive lock, silently give up in case of conflict */
+ LockTupleTryExclusive,
} LockTupleMode;
#endif /* LOCKOPTIONS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,7 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
*/
PartialFileSetState fileset_state;
FileSet fileset;
+
+ /*
+ * True if this worker only prefetches data for the leader (see pa_apply_dispatch).
+ */
+ bool do_prefetch;
} ParallelApplyWorkerShared;
/*
@@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
+#define MAX_LR_PREFETCH_WORKERS 128 /* upper limit of max_parallel_prefetch_workers_per_subscription */
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern PGDLLIMPORT bool prefetch_replica_identity_only;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
@@ -326,10 +339,11 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo *pa_launch_parallel_worker(void);
static inline bool
am_tablesync_worker(void)