v7-0001-Parallel-index-autovacuum-with-bgworkers.patch
text/x-patch
Filename: v7-0001-Parallel-index-autovacuum-with-bgworkers.patch
Type: text/x-patch
Part: 1
From 55b76f15bbc3991b7457de6c1d6998d39b16292c Mon Sep 17 00:00:00 2001
From: Daniil Davidov <d.davydov@postgrespro.ru>
Date: Fri, 16 May 2025 11:58:40 +0700
Subject: [PATCH v7 1/2] Parallel index autovacuum with bgworkers
---
src/backend/access/common/reloptions.c | 12 ++
src/backend/access/heap/vacuumlazy.c | 6 +-
src/backend/commands/vacuumparallel.c | 57 ++++++--
src/backend/postmaster/autovacuum.c | 135 +++++++++++++++++-
src/backend/utils/init/globals.c | 1 +
src/backend/utils/misc/guc_tables.c | 10 ++
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/miscadmin.h | 1 +
src/include/postmaster/autovacuum.h | 4 +
src/include/utils/guc_hooks.h | 2 +
src/include/utils/rel.h | 2 +
11 files changed, 220 insertions(+), 11 deletions(-)
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 50747c16396..e36d59f632b 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -222,6 +222,16 @@ static relopt_int intRelOpts[] =
},
SPGIST_DEFAULT_FILLFACTOR, SPGIST_MIN_FILLFACTOR, 100
},
+ {
+ {
+ "parallel_autovacuum_workers",
+ "Maximum number of parallel autovacuum workers that can be taken from bgworkers pool for processing this table. "
+ "If value is 0 then parallel degree will computed based on number of indexes.",
+ RELOPT_KIND_HEAP,
+ ShareUpdateExclusiveLock
+ },
+ -1, -1, 1024
+ },
{
{
"autovacuum_vacuum_threshold",
@@ -1872,6 +1882,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
{"fillfactor", RELOPT_TYPE_INT, offsetof(StdRdOptions, fillfactor)},
{"autovacuum_enabled", RELOPT_TYPE_BOOL,
offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, enabled)},
+ {"parallel_autovacuum_workers", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, parallel_autovacuum_workers)},
{"autovacuum_vacuum_threshold", RELOPT_TYPE_INT,
offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_threshold)},
{"autovacuum_vacuum_max_threshold", RELOPT_TYPE_INT,
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 14036c27e87..7e0ae0184aa 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -3477,6 +3477,10 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
autovacuum_work_mem != -1 ?
autovacuum_work_mem : maintenance_work_mem;
+ int elevel = AmAutoVacuumWorkerProcess() ||
+ vacrel->verbose ?
+ INFO : DEBUG2;
+
/*
* Initialize state for a parallel vacuum. As of now, only one worker can
* be used for an index, so we invoke parallelism only if there are at
@@ -3503,7 +3507,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
vacrel->nindexes, nworkers,
vac_work_mem,
- vacrel->verbose ? INFO : DEBUG2,
+ elevel,
vacrel->bstrategy);
/*
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 0feea1d30ec..6ec610e29e4 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -1,7 +1,9 @@
/*-------------------------------------------------------------------------
*
* vacuumparallel.c
- * Support routines for parallel vacuum execution.
+ * Support routines for parallel vacuum and autovacuum execution. In the
+ * future comments, the word "vacuum" will refer to both vacuum and
+ * autovacuum.
*
* This file contains routines that are intended to support setting up, using,
* and tearing down a ParallelVacuumState.
@@ -34,6 +36,7 @@
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
+#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
@@ -371,10 +374,12 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
shared->relid = RelationGetRelid(rel);
shared->elevel = elevel;
shared->queryid = pgstat_get_my_query_id();
+
shared->maintenance_work_mem_worker =
(nindexes_mwm > 0) ?
- maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
- maintenance_work_mem;
+ vac_work_mem / Min(parallel_workers, nindexes_mwm) :
+ vac_work_mem;
+
shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
/* Prepare DSA space for dead items */
@@ -435,6 +440,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
void
parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
{
+ int nlaunched_workers;
+
Assert(!IsParallelWorker());
/* Copy the updated statistics */
@@ -453,7 +460,13 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
TidStoreDestroy(pvs->dead_items);
+ nlaunched_workers = pvs->pcxt->nworkers_launched; /* remember this value */
DestroyParallelContext(pvs->pcxt);
+
+ /* Release all launched (i.e. reserved) parallel autovacuum workers. */
+ if (AmAutoVacuumWorkerProcess())
+ AutoVacuumReleaseParallelWorkers(nlaunched_workers);
+
ExitParallelMode();
pfree(pvs->will_parallel_vacuum);
@@ -553,12 +566,17 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
int nindexes_parallel_bulkdel = 0;
int nindexes_parallel_cleanup = 0;
int parallel_workers;
+ int max_parallel_workers;
+
+ max_parallel_workers = AmAutoVacuumWorkerProcess() ?
+ autovacuum_max_parallel_workers :
+ max_parallel_maintenance_workers;
/*
* We don't allow performing parallel operation in standalone backend or
* when parallelism is disabled.
*/
- if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+ if (!IsUnderPostmaster || max_parallel_workers == 0)
return 0;
/*
@@ -597,8 +615,8 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
parallel_workers = (nrequested > 0) ?
Min(nrequested, nindexes_parallel) : nindexes_parallel;
- /* Cap by max_parallel_maintenance_workers */
- parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+ /* Cap by GUC variable */
+ parallel_workers = Min(parallel_workers, max_parallel_workers);
return parallel_workers;
}
@@ -646,6 +664,13 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
*/
nworkers = Min(nworkers, pvs->pcxt->nworkers);
+ /*
+ * Also reserve workers in autovacuum global state. Note, that we may be
+ * given fewer workers than we requested.
+ */
+ if (AmAutoVacuumWorkerProcess() && nworkers > 0)
+ nworkers = AutoVacuumReserveParallelWorkers(nworkers);
+
/*
* Set index vacuum status and mark whether parallel vacuum worker can
* process it.
@@ -690,6 +715,16 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
LaunchParallelWorkers(pvs->pcxt);
+ if (AmAutoVacuumWorkerProcess() &&
+ pvs->pcxt->nworkers_launched < nworkers)
+ {
+ /*
+ * Tell autovacuum that we could not launch all the previously
+ * reserved workers.
+ */
+ AutoVacuumReleaseParallelWorkers(nworkers - pvs->pcxt->nworkers_launched);
+ }
+
if (pvs->pcxt->nworkers_launched > 0)
{
/*
@@ -709,13 +744,19 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
"launched %d parallel vacuum workers for index vacuuming (planned: %d)",
pvs->pcxt->nworkers_launched),
- pvs->pcxt->nworkers_launched, nworkers)));
+ pvs->pcxt->nworkers_launched, nworkers),
+ AmAutoVacuumWorkerProcess() ?
+ errhint("workers were launched for parallel autovacuum") :
+ errhint("workers were launched for parallel vacuum")));
else
ereport(pvs->shared->elevel,
(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
"launched %d parallel vacuum workers for index cleanup (planned: %d)",
pvs->pcxt->nworkers_launched),
- pvs->pcxt->nworkers_launched, nworkers)));
+ pvs->pcxt->nworkers_launched, nworkers),
+ AmAutoVacuumWorkerProcess() ?
+ errhint("workers were launched for parallel autovacuum") :
+ errhint("workers were launched for parallel vacuum")));
}
/* Vacuum the indexes that can be processed by only leader process */
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 9474095f271..98609ac8f8f 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -285,6 +285,7 @@ typedef struct AutoVacuumWorkItem
* av_workItems work item array
* av_nworkersForBalance the number of autovacuum workers to use when
* calculating the per worker cost limit
+ * av_freeParallelWorkers the number of free parallel autovacuum workers
*
* This struct is protected by AutovacuumLock, except for av_signal and parts
* of the worker list (see above).
@@ -299,6 +300,7 @@ typedef struct
WorkerInfo av_startingWorker;
AutoVacuumWorkItem av_workItems[NUM_WORKITEMS];
pg_atomic_uint32 av_nworkersForBalance;
+ uint32 av_freeParallelWorkers;
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
@@ -354,6 +356,7 @@ static void autovac_report_workitem(AutoVacuumWorkItem *workitem,
static void avl_sigusr2_handler(SIGNAL_ARGS);
static bool av_worker_available(void);
static void check_av_worker_gucs(void);
+static void check_parallel_av_gucs(int prev_max_parallel_workers);
@@ -753,7 +756,9 @@ ProcessAutoVacLauncherInterrupts(void)
if (ConfigReloadPending)
{
int autovacuum_max_workers_prev = autovacuum_max_workers;
+ int autovacuum_max_parallel_workers_prev;
+ autovacuum_max_parallel_workers_prev = autovacuum_max_parallel_workers;
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
@@ -769,6 +774,14 @@ ProcessAutoVacLauncherInterrupts(void)
if (autovacuum_max_workers_prev != autovacuum_max_workers)
check_av_worker_gucs();
+ /*
+ * If autovacuum_max_parallel_workers changed, we must take care of
+ * the correct value of available parallel autovacuum workers in shmem.
+ */
+ if (autovacuum_max_parallel_workers_prev !=
+ autovacuum_max_parallel_workers)
+ check_parallel_av_gucs(autovacuum_max_parallel_workers_prev);
+
/* rebuild the list in case the naptime changed */
rebuild_database_list(InvalidOid);
}
@@ -2847,8 +2860,12 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
*/
tab->at_params.index_cleanup = VACOPTVALUE_UNSPECIFIED;
tab->at_params.truncate = VACOPTVALUE_UNSPECIFIED;
- /* As of now, we don't support parallel vacuum for autovacuum */
- tab->at_params.nworkers = -1;
+
+ /* Decide whether we need to process indexes of table in parallel. */
+ tab->at_params.nworkers = avopts
+ ? avopts->parallel_autovacuum_workers
+ : -1;
+
tab->at_params.freeze_min_age = freeze_min_age;
tab->at_params.freeze_table_age = freeze_table_age;
tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
@@ -3329,6 +3346,64 @@ AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId,
return result;
}
+/*
+ * In order to meet the 'autovacuum_max_parallel_workers' limit, leader worker
+ * must call this function. It returns the number of parallel workers that
+ * actually can be launched and reserves (if any) these workers in global
+ * autovacuum state.
+ *
+ * NOTE: We will try to provide as many workers as requested, even if caller
+ * will occupy all available workers.
+ */
+int
+AutoVacuumReserveParallelWorkers(int nworkers)
+{
+ int can_launch;
+
+ /* Only leader worker can call this function. */
+ Assert(AmAutoVacuumWorkerProcess() && !IsParallelWorker());
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ /* Provide as many workers as we can. */
+ can_launch = Min(AutoVacuumShmem->av_freeParallelWorkers, nworkers);
+ AutoVacuumShmem->av_freeParallelWorkers -= nworkers;
+
+ LWLockRelease(AutovacuumLock);
+ return can_launch;
+}
+
+/*
+ * When parallel autovacuum worker die, leader worker must call this function
+ * in order to refresh global autovacuum state. Thus, other leaders will be able
+ * to use these workers.
+ *
+ * 'nworkers' - how many workers caller wants to release.
+ */
+void
+AutoVacuumReleaseParallelWorkers(int nworkers)
+{
+ /* Only leader worker can call this function. */
+ Assert(AmAutoVacuumWorkerProcess() && !IsParallelWorker());
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ AutoVacuumShmem->av_freeParallelWorkers += nworkers;
+
+ /*
+ * If autovacuum_max_parallel_workers variable was reduced during parallel
+ * autovacuum execution, we must cap available workers number by its new
+ * value.
+ */
+ if (AutoVacuumShmem->av_freeParallelWorkers >
+ autovacuum_max_parallel_workers)
+ {
+ AutoVacuumShmem->av_freeParallelWorkers =
+ autovacuum_max_parallel_workers;
+ }
+
+ LWLockRelease(AutovacuumLock);
+}
+
/*
* autovac_init
* This is called at postmaster initialization.
@@ -3389,6 +3464,8 @@ AutoVacuumShmemInit(void)
Assert(!found);
AutoVacuumShmem->av_launcherpid = 0;
+ AutoVacuumShmem->av_freeParallelWorkers =
+ autovacuum_max_parallel_workers;
dclist_init(&AutoVacuumShmem->av_freeWorkers);
dlist_init(&AutoVacuumShmem->av_runningWorkers);
AutoVacuumShmem->av_startingWorker = NULL;
@@ -3439,6 +3516,15 @@ check_autovacuum_work_mem(int *newval, void **extra, GucSource source)
return true;
}
+bool
+check_autovacuum_max_parallel_workers(int *newval, void **extra,
+ GucSource source)
+{
+ if (*newval >= max_worker_processes)
+ return false;
+ return true;
+}
+
/*
* Returns whether there is a free autovacuum worker slot available.
*/
@@ -3470,3 +3556,48 @@ check_av_worker_gucs(void)
errdetail("The server will only start up to \"autovacuum_worker_slots\" (%d) autovacuum workers at a given time.",
autovacuum_worker_slots)));
}
+
+/*
+ * Make sure that number of available parallel workers corresponds to the
+ * autovacuum_max_parallel_workers parameter (after it was changed).
+ */
+static void
+check_parallel_av_gucs(int prev_max_parallel_workers)
+{
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ if (AutoVacuumShmem->av_freeParallelWorkers >
+ autovacuum_max_parallel_workers)
+ {
+ Assert(prev_max_parallel_workers > autovacuum_max_parallel_workers);
+
+ /*
+ * Number of available workers must not exceed limit.
+ *
+ * Note, that if some parallel autovacuum workers are running at this
+ * moment, available workers number will not exceed limit after
+ * releasing them (see ParallelAutoVacuumReleaseWorkers).
+ */
+ AutoVacuumShmem->av_freeParallelWorkers =
+ autovacuum_max_parallel_workers;
+ }
+ else if ((AutoVacuumShmem->av_freeParallelWorkers <
+ autovacuum_max_parallel_workers) &&
+ (autovacuum_max_parallel_workers > prev_max_parallel_workers))
+ {
+ /*
+ * If user wants to increase number of parallel autovacuum workers, we
+ * must increase number of available workers in shmem.
+ */
+ AutoVacuumShmem->av_freeParallelWorkers +=
+ (autovacuum_max_parallel_workers - prev_max_parallel_workers);
+
+ /*
+ * Nothing to do when autovacuum_max_parallel_workers <
+ * prev_max_parallel_workers. Available workers number will be capped
+ * inside ParallelAutoVacuumReleaseWorkers.
+ */
+ }
+
+ LWLockRelease(AutovacuumLock);
+}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index d31cb45a058..977644978c1 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -143,6 +143,7 @@ int NBuffers = 16384;
int MaxConnections = 100;
int max_worker_processes = 8;
int max_parallel_workers = 8;
+int autovacuum_max_parallel_workers = 0;
int MaxBackends = 0;
/* GUC parameters for vacuum */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..b6a192af8f8 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3604,6 +3604,16 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"autovacuum_max_parallel_workers", PGC_SIGHUP, VACUUM_AUTOVACUUM,
+ gettext_noop("Maximum number of parallel autovacuum workers, that can be taken from bgworkers pool."),
+ gettext_noop("This parameter is capped by \"max_worker_processes\" (not by \"autovacuum_max_workers\"!)."),
+ },
+ &autovacuum_max_parallel_workers,
+ 0, 0, MAX_BACKENDS,
+ check_autovacuum_max_parallel_workers, NULL, NULL
+ },
+
{
{"max_parallel_maintenance_workers", PGC_USERSET, RESOURCES_WORKER_PROCESSES,
gettext_noop("Sets the maximum number of parallel processes per maintenance operation."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..bbf5307000f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -683,6 +683,7 @@
autovacuum_worker_slots = 16 # autovacuum worker slots to allocate
# (change requires restart)
#autovacuum_max_workers = 3 # max number of autovacuum subprocesses
+#autovacuum_max_parallel_workers = 0 # disabled by default and limited by max_worker_processes
#autovacuum_naptime = 1min # time between autovacuum runs
#autovacuum_vacuum_threshold = 50 # min number of row updates before
# vacuum
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 1bef98471c3..85926415657 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -177,6 +177,7 @@ extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
extern PGDLLIMPORT int max_worker_processes;
extern PGDLLIMPORT int max_parallel_workers;
+extern PGDLLIMPORT int autovacuum_max_parallel_workers;
extern PGDLLIMPORT int commit_timestamp_buffers;
extern PGDLLIMPORT int multixact_member_buffers;
diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h
index e8135f41a1c..863d206f2bd 100644
--- a/src/include/postmaster/autovacuum.h
+++ b/src/include/postmaster/autovacuum.h
@@ -64,6 +64,10 @@ pg_noreturn extern void AutoVacWorkerMain(const void *startup_data, size_t start
extern bool AutoVacuumRequestWork(AutoVacuumWorkItemType type,
Oid relationId, BlockNumber blkno);
+/* parallel autovacuum stuff */
+extern int AutoVacuumReserveParallelWorkers(int nworkers);
+extern void AutoVacuumReleaseParallelWorkers(int nworkers);
+
/* shared memory stuff */
extern Size AutoVacuumShmemSize(void);
extern void AutoVacuumShmemInit(void);
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 82ac8646a8d..b45023a90b2 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -31,6 +31,8 @@ extern void assign_application_name(const char *newval, void *extra);
extern const char *show_archive_command(void);
extern bool check_autovacuum_work_mem(int *newval, void **extra,
GucSource source);
+extern bool check_autovacuum_max_parallel_workers(int *newval, void **extra,
+ GucSource source);
extern bool check_vacuum_buffer_usage_limit(int *newval, void **extra,
GucSource source);
extern bool check_backtrace_functions(char **newval, void **extra,
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index b552359915f..29c32f75780 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -311,6 +311,8 @@ typedef struct ForeignKeyCacheInfo
typedef struct AutoVacOpts
{
bool enabled;
+ int parallel_autovacuum_workers; /* max number of parallel
+ autovacuum workers */
int vacuum_threshold;
int vacuum_max_threshold;
int vacuum_ins_threshold;
--
2.43.0