Filename: v2-0002-pg_stat_statements-modernize-entry-storage-with-p.patch
Type: application/octet-stream
Part: 3
Message: Re: Improve pg_stat_statements scalability
From 2c73cbe2091594561a52903ecbf7ff29805a0b2c Mon Sep 17 00:00:00 2001
From: Sami Imseih <samimseih@gmail.com>
Date: Thu, 14 May 2026 12:27:43 -0500
Subject: [PATCH v2 2/4] pg_stat_statements: modernize entry storage with
 pgstat kind

Replace the private shared-memory hash table with the pgstat subsystem's
dshash, and move counter updates to backend-local pending entries that are
flushed periodically.  Introduce admission control with timestamp-throttled
inline eviction: when the entry count reaches pgss_max, a backend attempts
eviction using a conditional lock and a shared timestamp that allows at
most one eviction cycle per 10 seconds.  Other backends skip entry
creation without blocking.

Variance/stddev computation uses Welford's online algorithm in per-backend
pending accumulation, merged into shared memory via Chan's parallel variance
algorithm during flush.
See <http://www.johndcook.com/blog/standard_deviation/>

pg_stat_statements.max is now PGC_SIGHUP (changeable with a configuration
reload; no restart required).
---
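Note on the eviction throttle described in the commit message: the
following is a minimal standalone sketch of the general pattern (a
conditional lock plus a shared timestamp), not the code in this patch.
EvictState, maybe_evict() and the use of C11 atomics are assumptions made
for illustration; the patch itself uses PostgreSQL's own locking
primitives and TimestampTz.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define EVICTION_INTERVAL_MS 10000  /* same value as the patch's constant */

/* Hypothetical stand-in for the shared state; not the patch's struct. */
typedef struct EvictState
{
    atomic_flag      busy;              /* conditional lock: set while evicting */
    _Atomic int64_t  last_eviction_ms;  /* time of the last eviction cycle */
    int              evictions;         /* cycles actually run (illustration) */
} EvictState;

/*
 * Try to run one eviction cycle at time now_ms.  Returns true if this
 * caller performed the eviction, false if it skipped without blocking.
 */
static bool
maybe_evict(EvictState *st, int64_t now_ms)
{
    bool ran = false;

    /* Cheap unlocked check: was a cycle run too recently? */
    if (now_ms - atomic_load(&st->last_eviction_ms) < EVICTION_INTERVAL_MS)
        return false;

    /* Conditional acquire: if another caller is evicting, just give up. */
    if (atomic_flag_test_and_set(&st->busy))
        return false;

    /* Recheck under the lock; another caller may have just finished. */
    if (now_ms - atomic_load(&st->last_eviction_ms) >= EVICTION_INTERVAL_MS)
    {
        st->evictions++;    /* real code would drop least-used entries here */
        atomic_store(&st->last_eviction_ms, now_ms);
        ran = true;
    }

    atomic_flag_clear(&st->busy);
    return ran;
}
```

A caller that gets false back simply skips creating its entry, which is
the non-blocking behavior the commit message describes for other
backends.
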
 contrib/pg_stat_statements/Makefile           |    1 +
 .../pg_stat_statements/expected/parallel.out  |    1 +
 contrib/pg_stat_statements/meson.build        |    1 +
 .../pg_stat_statements--1.13--1.14.sql        |    7 +
 .../pg_stat_statements/pg_stat_statements.c   | 1794 +++++++++--------
 .../pg_stat_statements.conf                   |    1 +
 .../pg_stat_statements.control                |    2 +-
 contrib/pg_stat_statements/sql/parallel.sql   |    1 +
 doc/src/sgml/pgstatstatements.sgml            |   18 +-
 src/tools/pgindent/typedefs.list              |    1 +
 10 files changed, 1011 insertions(+), 816 deletions(-)
 create mode 100644 contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
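
Note on the variance handling described in the commit message: the
sketch below shows the textbook form of Welford's online update and
Chan's pairwise merge, assuming each accumulator keeps a count, a mean,
and a sum of squared deviations.  RunningStats and the stats_* functions
are hypothetical names, not identifiers from this patch.  The standard
deviation is the square root of the variance returned here.

```c
/* Running statistics: count, mean, and sum of squared deviations (M2). */
typedef struct RunningStats
{
    long long n;
    double    mean;
    double    m2;
} RunningStats;

/* Welford's online update: fold one observation into the accumulator. */
static void
stats_add(RunningStats *s, double x)
{
    double delta = x - s->mean;

    s->n += 1;
    s->mean += delta / (double) s->n;
    s->m2 += delta * (x - s->mean);
}

/* Chan's parallel merge: fold accumulator src into accumulator dst. */
static void
stats_merge(RunningStats *dst, const RunningStats *src)
{
    long long n;
    double    delta;

    if (src->n == 0)
        return;                 /* nothing to merge */
    if (dst->n == 0)
    {
        *dst = *src;            /* first data for this accumulator */
        return;
    }

    n = dst->n + src->n;
    delta = src->mean - dst->mean;
    dst->m2 += src->m2 +
        delta * delta * ((double) dst->n * (double) src->n / (double) n);
    dst->mean += delta * (double) src->n / (double) n;
    dst->n = n;
}

/* Population variance, M2 / n; zero when fewer than two observations. */
static double
stats_variance(const RunningStats *s)
{
    return s->n > 1 ? s->m2 / (double) s->n : 0.0;
}
```

In this model a backend would call stats_add() for each execution on
its local accumulator and stats_merge() into the shared accumulator at
flush time; merging gives the same result as feeding every observation
to a single accumulator, up to floating-point rounding.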

diff --git a/contrib/pg_stat_statements/Makefile b/contrib/pg_stat_statements/Makefile
index c27e9529bb6..d7142f71cf7 100644
--- a/contrib/pg_stat_statements/Makefile
+++ b/contrib/pg_stat_statements/Makefile
@@ -7,6 +7,7 @@ OBJS = \
 
 EXTENSION = pg_stat_statements
 DATA = pg_stat_statements--1.4.sql \
+	pg_stat_statements--1.13--1.14.sql \
 	pg_stat_statements--1.12--1.13.sql \
 	pg_stat_statements--1.11--1.12.sql pg_stat_statements--1.10--1.11.sql \
 	pg_stat_statements--1.9--1.10.sql pg_stat_statements--1.8--1.9.sql \
diff --git a/contrib/pg_stat_statements/expected/parallel.out b/contrib/pg_stat_statements/expected/parallel.out
index 8af3bd2c915..bff0da7634b 100644
--- a/contrib/pg_stat_statements/expected/parallel.out
+++ b/contrib/pg_stat_statements/expected/parallel.out
@@ -20,6 +20,7 @@ SELECT count(*) FROM pgss_parallel_tab;
      0
 (1 row)
 
+RESET max_parallel_workers_per_gather;
 SELECT query,
   parallel_workers_to_launch > 0 AS has_workers_to_launch,
   parallel_workers_launched > 0 AS has_workers_launched
diff --git a/contrib/pg_stat_statements/meson.build b/contrib/pg_stat_statements/meson.build
index 9d78cb88b7d..77148949c0d 100644
--- a/contrib/pg_stat_statements/meson.build
+++ b/contrib/pg_stat_statements/meson.build
@@ -21,6 +21,7 @@ contrib_targets += pg_stat_statements
 install_data(
   'pg_stat_statements.control',
   'pg_stat_statements--1.4.sql',
+  'pg_stat_statements--1.13--1.14.sql',
   'pg_stat_statements--1.12--1.13.sql',
   'pg_stat_statements--1.11--1.12.sql',
   'pg_stat_statements--1.10--1.11.sql',
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
new file mode 100644
index 00000000000..eb528a0d9ca
--- /dev/null
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
@@ -0,0 +1,7 @@
+/* contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION pg_stat_statements UPDATE TO '1.14'" to load this file. \quit
+
+ALTER FUNCTION pg_stat_statements(boolean) PARALLEL RESTRICTED;
+ALTER FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) PARALLEL RESTRICTED;
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 92315627916..0e6e65e3e51 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -5,8 +5,10 @@
  *		usage across a whole database cluster.
  *
  * Execution costs are totaled for each distinct source query, and kept in
- * a shared hashtable.  (We track only as many distinct queries as will fit
- * in the designated amount of shared memory.)
+ * a dshash table managed by the pgstat subsystem (custom stats kind
+ * PGSTAT_KIND_PGSS).  Counter updates accumulate in backend-local pending
+ * entries and are flushed to shared memory periodically or on demand via
+ * pgstat_report_anytime_stat().
  *
  * Starting in Postgres 9.2, this module normalized query entries.  As of
  * Postgres 14, the normalization is done by the core if compute_query_id is
@@ -14,24 +16,15 @@
  *
  * To facilitate presenting entries to users, we create "representative" query
  * strings in which constants are replaced with parameter symbols ($n), to
- * make it clearer what a normalized entry can represent.  To save on shared
- * memory, and to avoid having to truncate oversized query strings, we store
- * these strings in a temporary external query-texts file.  Offsets into this
- * file are kept in shared memory.
+ * make it clearer what a normalized entry can represent.
  *
- * Note about locking issues: to create or delete an entry in the shared
- * hashtable, one must hold pgss->lock exclusively.  Modifying any field
- * in an entry except the counters requires the same.  To look up an entry,
- * one must hold the lock shared.  To read or update the counters within
- * an entry, one must hold the lock shared or exclusive (so the entry doesn't
- * disappear!) and also take the entry's mutex spinlock.
- * The shared state variable pgss->extent (the next free spot in the external
- * query-text file) should be accessed only while holding either the
- * pgss->mutex spinlock, or exclusive lock on pgss->lock.  We use the mutex to
- * allow reserving file space while holding only shared lock on pgss->lock.
- * Rewriting the entire external query-text file, eg for garbage collection,
- * requires holding pgss->lock exclusively; this allows individual entries
- * in the file to be read or written while holding only shared lock.
+ * Each shared pgstat entry records the offset and length of its query text,
+ * which is stored in an external file (PGSS_TEXT_FILE).
+ *
+ * Eviction of least-used entries is throttled to run at most once every
+ * EVICTION_INTERVAL_MS milliseconds.  When eviction is needed, a single
+ * backend performs it inline using a conditional lock; other backends simply
+ * skip entry creation until space is freed.
  *
  *
  * Copyright (c) 2008-2026, PostgreSQL Global Development Group
@@ -49,6 +42,7 @@
 
 #include "access/htup_details.h"
 #include "access/parallel.h"
+#include "access/xact.h"
 #include "catalog/pg_authid.h"
 #include "executor/instrument.h"
 #include "funcapi.h"
@@ -58,7 +52,9 @@
 #include "nodes/queryjumble.h"
 #include "optimizer/planner.h"
 #include "parser/analyze.h"
+#include "common/hashfn.h"
 #include "pgstat.h"
+#include "utils/pgstat_internal.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -76,9 +72,6 @@ PG_MODULE_MAGIC_EXT(
 					.version = PG_VERSION
 );
 
-/* Location of permanent stats file (valid when database is shut down) */
-#define PGSS_DUMP_FILE	PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_statements.stat"
-
 /*
  * Location of external query text file.
  */
@@ -87,18 +80,16 @@ PG_MODULE_MAGIC_EXT(
 /* Magic number identifying the stats file format */
 static const uint32 PGSS_FILE_HEADER = 0x20250731;
 
-/* PostgreSQL major version number, changes in which invalidate all entries */
-static const uint32 PGSS_PG_MAJOR_VERSION = PG_VERSION_NUM / 100;
+/* Custom pgstat kind ID for pg_stat_statements entries */
+#define PGSTAT_KIND_PGSS	PGSTAT_KIND_EXPERIMENTAL
 
 /* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? */
 #define USAGE_EXEC(duration)	(1.0)
 #define USAGE_INIT				(1.0)	/* including initial planning */
-#define ASSUMED_MEDIAN_INIT		(10.0)	/* initial assumed median usage */
 #define ASSUMED_LENGTH_INIT		1024	/* initial assumed mean query length */
 #define USAGE_DECREASE_FACTOR	(0.99)	/* decreased every entry_dealloc */
-#define STICKY_DECREASE_FACTOR	(0.50)	/* factor for sticky entries */
 #define USAGE_DEALLOC_PERCENT	5	/* free this % of entries at once */
-#define IS_STICKY(c)	((c.calls[PGSS_PLAN] + c.calls[PGSS_EXEC]) == 0)
+#define EVICTION_INTERVAL_MS	10000	/* min ms between eviction cycles */
 
 /*
  * Extension version number, for supporting older extension versions' objects
@@ -140,18 +131,27 @@ typedef enum pgssStoreKind
  * zero the padding bytes.  Otherwise, things will break, because pgss_hash is
  * created using HASH_BLOBS, and thus tag_hash is used to hash this.
  */
-typedef struct pgssHashKey
+typedef struct pgssHashKey pgssHashKey;
+typedef struct Counters Counters;
+typedef struct pgssGlobalStats pgssGlobalStats;
+typedef struct pgssSharedState pgssSharedState;
+typedef struct PgStatShared_Pgss PgStatShared_Pgss;
+typedef struct PgStat_PendingPgss PgStat_PendingPgss;
+typedef struct UsageEntry UsageEntry;
+typedef struct PendingDrop PendingDrop;
+
+struct pgssHashKey
 {
 	Oid			userid;			/* user OID */
 	Oid			dbid;			/* database OID */
 	int64		queryid;		/* query identifier */
 	bool		toplevel;		/* query executed at top level */
-} pgssHashKey;
+};
 
 /*
  * The actual stats counters kept within pgssEntry.
  */
-typedef struct Counters
+struct Counters
 {
 	int64		calls[PGSS_NUMKIND];	/* # of times planned/executed */
 	double		total_time[PGSS_NUMKIND];	/* total planning/execution time,
@@ -212,54 +212,76 @@ typedef struct Counters
 											 * launched */
 	int64		generic_plan_calls; /* number of calls using a generic plan */
 	int64		custom_plan_calls;	/* number of calls using a custom plan */
-} Counters;
+};
 
 /*
  * Global statistics for pg_stat_statements
  */
-typedef struct pgssGlobalStats
+struct pgssGlobalStats
 {
 	int64		dealloc;		/* # of times entries were deallocated */
 	TimestampTz stats_reset;	/* timestamp with all stats reset */
-} pgssGlobalStats;
-
-/*
- * Statistics per statement
- *
- * Note: in event of a failure in garbage collection of the query text file,
- * we reset query_offset to zero and query_len to -1.  This will be seen as
- * an invalid state by qtext_fetch().
- */
-typedef struct pgssEntry
-{
-	pgssHashKey key;			/* hash key of entry - MUST BE FIRST */
-	Counters	counters;		/* the statistics for this query */
-	Size		query_offset;	/* query text offset in external file */
-	int			query_len;		/* # of valid bytes in query string, or -1 */
-	int			encoding;		/* query text encoding */
-	TimestampTz stats_since;	/* timestamp of entry allocation */
-	TimestampTz minmax_stats_since; /* timestamp of last min/max values reset */
-	slock_t		mutex;			/* protects the counters only */
-} pgssEntry;
+};
 
 /*
  * Global shared state
  */
-typedef struct pgssSharedState
+struct pgssSharedState
 {
-	LWLockPadded lock;			/* protects hashtable search/modification */
-	double		cur_median_usage;	/* current median usage in hashtable */
+	LWLockPadded lock;			/* protects query text file operations */
 	Size		mean_query_len; /* current mean entry text length */
 	slock_t		mutex;			/* protects following fields only: */
 	Size		extent;			/* current extent of query file */
 	int			n_writers;		/* number of active writers to query file */
 	int			gc_count;		/* query file garbage collection cycle count */
+	TimestampTz last_eviction_time; /* throttle: last time entry_dealloc ran */
 	pgssGlobalStats stats;		/* global statistics for pgss */
-} pgssSharedState;
+};
+
+/*
+ * Shared memory entry for pgstat custom kind.
+ * This is what lives in the pgstat shared hash table.
+ */
+struct PgStatShared_Pgss
+{
+	PgStatShared_Common header;
+	pgssHashKey key;			/* full original key for reconstruction */
+	Counters	counters;		/* the statistics */
+	TimestampTz stats_since;	/* timestamp of entry allocation */
+	TimestampTz minmax_stats_since; /* timestamp of last min/max reset */
+	int			query_len;		/* length of query text, or -1 if invalid */
+	int			encoding;		/* encoding of query text */
+
+	Size		query_offset;	/* offset in external query text file */
+};
+
+/*
+ * Pending (backend-local) stats entry, accumulated before flush.
+ */
+struct PgStat_PendingPgss
+{
+	Counters	counters;
+};
+
+/*
+ * Used during entry reset to collect keys for deferred drop.
+ */
+struct PendingDrop
+{
+	Oid			dbid;
+	uint64		objid;
+};
 
 /* Links to shared memory state */
 static pgssSharedState *pgss;
-static HTAB *pgss_hash;
+
+/* Buffer used during serialization to avoid reloading text file per entry */
+static char *pgss_qtext_write_buffer = NULL;
+static Size pgss_qtext_write_buffer_size = 0;
+
+/* File handle used during deserialization to rebuild query text file */
+static FILE *pgss_qtext_rebuild_file = NULL;
+static Size pgss_qtext_rebuild_extent = 0;
 
 static void pgss_shmem_request(void *arg);
 static void pgss_shmem_init(void *arg);
@@ -269,6 +291,32 @@ static const ShmemCallbacks pgss_shmem_callbacks = {
 	.init_fn = pgss_shmem_init,
 };
 
+/* pgstat custom kind callbacks */
+static bool pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait);
+static void pgss_to_serialized_data(const PgStat_HashKey *key,
+									const PgStatShared_Common *header,
+									FILE *statfile);
+static bool pgss_from_serialized_data(const PgStat_HashKey *key,
+									  PgStatShared_Common *header,
+									  FILE *statfile);
+static void pgss_finish(PgStat_StatsFileOp status);
+
+static const PgStat_KindInfo pgss_kind_info = {
+	.name = "pg_stat_statements",
+	.fixed_amount = false,
+	.write_to_file = true,
+	.track_entry_count = true,
+	.accessed_across_databases = true,
+	.shared_size = sizeof(PgStatShared_Pgss),
+	.shared_data_off = offsetof(PgStatShared_Pgss, key),
+	.shared_data_len = sizeof(PgStatShared_Pgss) - offsetof(PgStatShared_Pgss, key),
+	.pending_size = sizeof(PgStat_PendingPgss),
+	.flush_pending_cb = pgss_flush_pending_cb,
+	.to_serialized_data = pgss_to_serialized_data,
+	.from_serialized_data = pgss_from_serialized_data,
+	.finish = pgss_finish,
+};
+
 /*---- Local variables ----*/
 
 /* Current nesting depth of planner/ExecutorRun/ProcessUtility calls */
@@ -306,7 +354,6 @@ static bool pgss_track_utility = true;	/* whether to track utility commands */
 static bool pgss_track_planning = false;	/* whether to track planning
 											 * duration */
 static bool pgss_save = true;	/* whether to save stats across shutdown */
-
 #define pgss_enabled(level) \
 	(!IsParallelWorker() && \
 	(pgss_track == PGSS_TRACK_ALL || \
@@ -335,7 +382,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_1_13);
 PG_FUNCTION_INFO_V1(pg_stat_statements);
 PG_FUNCTION_INFO_V1(pg_stat_statements_info);
 
-static void pgss_shmem_shutdown(int code, Datum arg);
 static void pgss_post_parse_analyze(ParseState *pstate, Query *query,
 									const JumbleState *jstate);
 static PlannedStmt *pgss_planner(Query *parse,
@@ -368,8 +414,6 @@ static void pgss_store(const char *query, int64 queryId,
 static void pg_stat_statements_internal(FunctionCallInfo fcinfo,
 										pgssVersion api_version,
 										bool showtext);
-static pgssEntry *entry_alloc(pgssHashKey *key, Size query_offset, int query_len,
-							  int encoding, bool sticky);
 static void entry_dealloc(void);
 static bool qtext_store(const char *query, int query_len,
 						Size *query_offset, int *gc_count);
@@ -382,6 +426,35 @@ static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_
 static char *generate_normalized_query(const JumbleState *jstate,
 									   const char *query,
 									   int query_loc, int *query_len_p);
+static void pgss_entry_init(PgStatShared_Pgss *shared_entry,
+							const pgssHashKey *key, int encoding);
+static void pgss_store_query_text(PgStatShared_Pgss *shared_entry,
+								  const char *query, int query_len,
+								  int encoding);
+
+struct UsageEntry
+{
+	pgssHashKey key;
+	double		usage;
+};
+
+static int	entry_cmp(const void *a, const void *b);
+static void pgss_maybe_evict(void);
+
+/*
+ * Compute a uint64 objid from a pgssHashKey for use in PgStat_HashKey.
+ * We hash (userid, queryid, toplevel) together since dbid goes into dboid.
+ */
+static inline uint64
+pgss_objid(const pgssHashKey *key)
+{
+	uint64		hashval;
+
+	hashval = murmurhash64((uint64) key->userid);
+	hashval = hash_combine64(hashval, murmurhash64((uint64) key->queryid));
+	hashval = hash_combine64(hashval, murmurhash64((uint64) key->toplevel));
+	return hashval;
+}
 
 /*
  * Module load callback
@@ -416,7 +489,7 @@ _PG_init(void)
 							5000,
 							100,
 							INT_MAX / 2,
-							PGC_POSTMASTER,
+							PGC_SIGHUP,
 							0,
 							NULL,
 							NULL,
@@ -474,6 +547,11 @@ _PG_init(void)
 	 */
 	RegisterShmemCallbacks(&pgss_shmem_callbacks);
 
+	/*
+	 * Register custom statistics kind for pg_stat_statements entries.
+	 */
+	pgstat_register_kind(PGSTAT_KIND_PGSS, &pgss_kind_info);
+
 	/*
 	 * Install hooks.
 	 */
@@ -505,13 +583,6 @@ _PG_init(void)
 static void
 pgss_shmem_request(void *arg)
 {
-	ShmemRequestHash(.name = "pg_stat_statements hash",
-					 .nelems = pgss_max,
-					 .hash_info.keysize = sizeof(pgssHashKey),
-					 .hash_info.entrysize = sizeof(pgssEntry),
-					 .hash_flags = HASH_ELEM | HASH_BLOBS,
-					 .ptr = &pgss_hash,
-		);
 	ShmemRequestStruct(.name = "pg_stat_statements",
 					   .size = sizeof(pgssSharedState),
 					   .ptr = (void **) &pgss,
@@ -530,14 +601,7 @@ static void
 pgss_shmem_init(void *arg)
 {
 	int			tranche_id;
-	FILE	   *file = NULL;
 	FILE	   *qfile = NULL;
-	uint32		header;
-	int32		num;
-	int32		pgver;
-	int32		i;
-	int			buffer_size;
-	char	   *buffer = NULL;
 
 	/*
 	 * We already checked that we're loaded from shared_preload_libraries in
@@ -546,285 +610,33 @@ pgss_shmem_init(void *arg)
 	Assert(!IsUnderPostmaster);
 
 	/*
-	 * Initialize the shmem area with no statistics.
+	 * Initialize the shmem area.
 	 */
 	tranche_id = LWLockNewTrancheId("pg_stat_statements");
 	LWLockInitialize(&pgss->lock.lock, tranche_id);
-	pgss->cur_median_usage = ASSUMED_MEDIAN_INIT;
 	pgss->mean_query_len = ASSUMED_LENGTH_INIT;
 	SpinLockInit(&pgss->mutex);
 	pgss->extent = 0;
 	pgss->n_writers = 0;
 	pgss->gc_count = 0;
+	pgss->last_eviction_time = 0;
 	pgss->stats.dealloc = 0;
 	pgss->stats.stats_reset = GetCurrentTimestamp();
 
-	/* The hash table must've also been initialized by now */
-	Assert(pgss_hash != NULL);
-
-	/*
-	 * Set up a shmem exit hook to dump the statistics to disk on postmaster
-	 * (or standalone backend) exit.
-	 */
-	on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
-
-	/*
-	 * Load any pre-existing statistics from file.
-	 *
-	 * Note: we don't bother with locks here, because there should be no other
-	 * processes running when this code is reached.
-	 */
-
 	/* Unlink query text file possibly left over from crash */
 	unlink(PGSS_TEXT_FILE);
 
 	/* Allocate new query text temp file */
 	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
 	if (qfile == NULL)
-		goto write_error;
-
-	/*
-	 * If we were told not to load old statistics, we're done.  (Note we do
-	 * not try to unlink any old dump file in this case.  This seems a bit
-	 * questionable but it's the historical behavior.)
-	 */
-	if (!pgss_save)
-	{
-		FreeFile(qfile);
-		return;
-	}
-
-	/*
-	 * Attempt to load old statistics from the dump file.
-	 */
-	file = AllocateFile(PGSS_DUMP_FILE, PG_BINARY_R);
-	if (file == NULL)
 	{
-		if (errno != ENOENT)
-			goto read_error;
-		/* No existing persisted stats file, so we're done */
-		FreeFile(qfile);
+		ereport(LOG,
+				(errcode_for_file_access(),
+				 errmsg("could not write file \"%s\": %m",
+						PGSS_TEXT_FILE)));
 		return;
 	}
-
-	buffer_size = 2048;
-	buffer = (char *) palloc(buffer_size);
-
-	if (fread(&header, sizeof(uint32), 1, file) != 1 ||
-		fread(&pgver, sizeof(uint32), 1, file) != 1 ||
-		fread(&num, sizeof(int32), 1, file) != 1)
-		goto read_error;
-
-	if (header != PGSS_FILE_HEADER ||
-		pgver != PGSS_PG_MAJOR_VERSION)
-		goto data_error;
-
-	for (i = 0; i < num; i++)
-	{
-		pgssEntry	temp;
-		pgssEntry  *entry;
-		Size		query_offset;
-
-		if (fread(&temp, sizeof(pgssEntry), 1, file) != 1)
-			goto read_error;
-
-		/* Encoding is the only field we can easily sanity-check */
-		if (!PG_VALID_BE_ENCODING(temp.encoding))
-			goto data_error;
-
-		/* Resize buffer as needed */
-		if (temp.query_len >= buffer_size)
-		{
-			buffer_size = Max(buffer_size * 2, temp.query_len + 1);
-			buffer = repalloc(buffer, buffer_size);
-		}
-
-		if (fread(buffer, 1, temp.query_len + 1, file) != temp.query_len + 1)
-			goto read_error;
-
-		/* Should have a trailing null, but let's make sure */
-		buffer[temp.query_len] = '\0';
-
-		/* Skip loading "sticky" entries */
-		if (IS_STICKY(temp.counters))
-			continue;
-
-		/* Store the query text */
-		query_offset = pgss->extent;
-		if (fwrite(buffer, 1, temp.query_len + 1, qfile) != temp.query_len + 1)
-			goto write_error;
-		pgss->extent += temp.query_len + 1;
-
-		/* make the hashtable entry (discards old entries if too many) */
-		entry = entry_alloc(&temp.key, query_offset, temp.query_len,
-							temp.encoding,
-							false);
-
-		/* copy in the actual stats */
-		entry->counters = temp.counters;
-		entry->stats_since = temp.stats_since;
-		entry->minmax_stats_since = temp.minmax_stats_since;
-	}
-
-	/* Read global statistics for pg_stat_statements */
-	if (fread(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
-		goto read_error;
-
-	pfree(buffer);
-	FreeFile(file);
 	FreeFile(qfile);
-
-	/*
-	 * Remove the persisted stats file so it's not included in
-	 * backups/replication standbys, etc.  A new file will be written on next
-	 * shutdown.
-	 *
-	 * Note: it's okay if the PGSS_TEXT_FILE is included in a basebackup,
-	 * because we remove that file on startup; it acts inversely to
-	 * PGSS_DUMP_FILE, in that it is only supposed to be around when the
-	 * server is running, whereas PGSS_DUMP_FILE is only supposed to be around
-	 * when the server is not running.  Leaving the file creates no danger of
-	 * a newly restored database having a spurious record of execution costs,
-	 * which is what we're really concerned about here.
-	 */
-	unlink(PGSS_DUMP_FILE);
-
-	return;
-
-read_error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not read file \"%s\": %m",
-					PGSS_DUMP_FILE)));
-	goto fail;
-data_error:
-	ereport(LOG,
-			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-			 errmsg("ignoring invalid data in file \"%s\"",
-					PGSS_DUMP_FILE)));
-	goto fail;
-write_error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not write file \"%s\": %m",
-					PGSS_TEXT_FILE)));
-fail:
-	if (buffer)
-		pfree(buffer);
-	if (file)
-		FreeFile(file);
-	if (qfile)
-		FreeFile(qfile);
-	/* If possible, throw away the bogus file; ignore any error */
-	unlink(PGSS_DUMP_FILE);
-
-	/*
-	 * Don't unlink PGSS_TEXT_FILE here; it should always be around while the
-	 * server is running with pg_stat_statements enabled
-	 */
-}
-
-/*
- * shmem_shutdown hook: Dump statistics into file.
- *
- * Note: we don't bother with acquiring lock, because there should be no
- * other processes running when this is called.
- */
-static void
-pgss_shmem_shutdown(int code, Datum arg)
-{
-	FILE	   *file;
-	char	   *qbuffer = NULL;
-	Size		qbuffer_size = 0;
-	HASH_SEQ_STATUS hash_seq;
-	int32		num_entries;
-	pgssEntry  *entry;
-
-	/* Don't try to dump during a crash. */
-	if (code)
-		return;
-
-	/* Safety check ... shouldn't get here unless shmem is set up. */
-	if (!pgss || !pgss_hash)
-		return;
-
-	/* Don't dump if told not to. */
-	if (!pgss_save)
-		return;
-
-	file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
-	if (file == NULL)
-		goto error;
-
-	if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
-		goto error;
-	if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
-		goto error;
-	num_entries = hash_get_num_entries(pgss_hash);
-	if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
-		goto error;
-
-	qbuffer = qtext_load_file(&qbuffer_size);
-	if (qbuffer == NULL)
-		goto error;
-
-	/*
-	 * When serializing to disk, we store query texts immediately after their
-	 * entry data.  Any orphaned query texts are thereby excluded.
-	 */
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		int			len = entry->query_len;
-		char	   *qstr = qtext_fetch(entry->query_offset, len,
-									   qbuffer, qbuffer_size);
-
-		if (qstr == NULL)
-			continue;			/* Ignore any entries with bogus texts */
-
-		if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
-			fwrite(qstr, 1, len + 1, file) != len + 1)
-		{
-			/* note: we assume hash_seq_term won't change errno */
-			hash_seq_term(&hash_seq);
-			goto error;
-		}
-	}
-
-	/* Dump global statistics for pg_stat_statements */
-	if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
-		goto error;
-
-	pfree(qbuffer);
-	qbuffer = NULL;
-
-	if (FreeFile(file))
-	{
-		file = NULL;
-		goto error;
-	}
-
-	/*
-	 * Rename file into place, so we atomically replace any old one.
-	 */
-	(void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
-
-	/* Unlink query-texts file; it's not needed while shutdown */
-	unlink(PGSS_TEXT_FILE);
-
-	return;
-
-error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not write file \"%s\": %m",
-					PGSS_DUMP_FILE ".tmp")));
-	if (qbuffer)
-		pfree(qbuffer);
-	if (file)
-		FreeFile(file);
-	unlink(PGSS_DUMP_FILE ".tmp");
-	unlink(PGSS_TEXT_FILE);
 }
 
 /*
@@ -837,7 +649,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query, const JumbleState *jst
 		prev_post_parse_analyze_hook(pstate, query, jstate);
 
 	/* Safety check... */
-	if (!pgss || !pgss_hash || !pgss_enabled(nesting_level))
+	if (!pgss || !pgss_enabled(nesting_level))
 		return;
 
 	/*
@@ -1254,9 +1066,62 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
 	}
 }
 
+/*
+ * Initialize a freshly-created shared entry.
+ *
+ * Caller must hold the entry lock.  The entry is considered "new" when
+ * key.queryid is still zero (as left by pgstat entry creation).
+ */
+static void
+pgss_entry_init(PgStatShared_Pgss *shared_entry,
+				const pgssHashKey *key, int encoding)
+{
+	if (shared_entry->key.queryid != INT64CONST(0))
+		return;
+
+	shared_entry->key = *key;
+	memset(&shared_entry->counters, 0, sizeof(Counters));
+	shared_entry->counters.usage = USAGE_INIT;
+	shared_entry->stats_since = GetCurrentTimestamp();
+	shared_entry->minmax_stats_since = shared_entry->stats_since;
+	shared_entry->query_len = -1;
+	shared_entry->encoding = encoding;
+	shared_entry->query_offset = 0;
+}
+
+/*
+ * Store query text into a shared entry via the external text file.
+ *
+ * Caller must hold the entry lock.  Does nothing if text is already present.
+ */
+static void
+pgss_store_query_text(PgStatShared_Pgss *shared_entry,
+					  const char *query, int query_len, int encoding)
+{
+	Size		query_offset;
+	int			gc_count;
+
+	if (shared_entry->query_len >= 0)
+		return;
+
+	LWLockAcquire(&pgss->lock.lock, LW_SHARED);
+	if (qtext_store(query, query_len, &query_offset, &gc_count))
+	{
+		shared_entry->query_offset = query_offset;
+		shared_entry->query_len = query_len;
+		shared_entry->encoding = encoding;
+	}
+	LWLockRelease(&pgss->lock.lock);
+}
+
 /*
  * Store some statistics for a statement.
  *
+ * Shared entries are created, and their query text is stored, directly
+ * rather than via pending state, making entries immediately visible to
+ * other backends.  Counters accumulate in backend-local pending entries,
+ * flushed periodically by pgss_flush_pending_cb.
+ *
  * If jstate is not NULL then we're trying to create an entry for which
  * we have no statistics as yet; we just want to record the normalized
  * query string.  total_time, rows, bufusage and walusage are ignored in this
@@ -1279,14 +1144,16 @@ pgss_store(const char *query, int64 queryId,
 		   PlannedStmtOrigin planOrigin)
 {
 	pgssHashKey key;
-	pgssEntry  *entry;
+	PgStat_EntryRef *entry_ref;
+	PgStatShared_Pgss *shared_entry;
+	PgStat_PendingPgss *pending;
 	char	   *norm_query = NULL;
 	int			encoding = GetDatabaseEncoding();
 
 	Assert(query != NULL);
 
 	/* Safety check... */
-	if (!pgss || !pgss_hash)
+	if (!pgss)
 		return;
 
 	/*
@@ -1313,192 +1180,171 @@ pgss_store(const char *query, int64 queryId,
 	key.queryid = queryId;
 	key.toplevel = (nesting_level == 0);
 
-	/* Lookup the hash table entry with shared lock. */
-	LWLockAcquire(&pgss->lock.lock, LW_SHARED);
-
-	entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
-	/* Create new entry, if not present */
-	if (!entry)
+	/*
+	 * If jstate is set, create the shared entry and store normalized query
+	 * text.  Don't increment counters; entries with zero calls are not
+	 * returned by pg_stat_statements_internal().
+	 */
+	if (jstate)
 	{
-		Size		query_offset;
-		int			gc_count;
-		bool		stored;
-		bool		do_gc;
+		const char *store_query;
 
-		/*
-		 * Create a new, normalized query string if caller asked.  We don't
-		 * need to hold the lock while doing this work.  (Note: in any case,
-		 * it's possible that someone else creates a duplicate hashtable entry
-		 * in the interval where we don't hold the lock below.  That case is
-		 * handled by entry_alloc.)
-		 */
-		if (jstate)
+		if (pgstat_get_entry_count(PGSTAT_KIND_PGSS) >= pgss_max)
 		{
-			LWLockRelease(&pgss->lock.lock);
-			norm_query = generate_normalized_query(jstate, query,
-												   query_location,
-												   &query_len);
-			LWLockAcquire(&pgss->lock.lock, LW_SHARED);
+			pgss_maybe_evict();
+			return;
 		}
 
-		/* Append new query text to file with only shared lock held */
-		stored = qtext_store(norm_query ? norm_query : query, query_len,
-							 &query_offset, &gc_count);
-
-		/*
-		 * Determine whether we need to garbage collect external query texts
-		 * while the shared lock is still held.  This micro-optimization
-		 * avoids taking the time to decide this while holding exclusive lock.
-		 */
-		do_gc = need_gc_qtexts();
-
-		/* Need exclusive lock to make a new hashtable entry - promote */
-		LWLockRelease(&pgss->lock.lock);
-		LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE);
+		norm_query = generate_normalized_query(jstate, query,
+											   query_location,
+											   &query_len);
+		store_query = norm_query ? norm_query : query;
 
-		/*
-		 * A garbage collection may have occurred while we weren't holding the
-		 * lock.  In the unlikely event that this happens, the query text we
-		 * stored above will have been garbage collected, so write it again.
-		 * This should be infrequent enough that doing it while holding
-		 * exclusive lock isn't a performance problem.
-		 */
-		if (!stored || pgss->gc_count != gc_count)
-			stored = qtext_store(norm_query ? norm_query : query, query_len,
-								 &query_offset, NULL);
+		entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_PGSS,
+												key.dbid,
+												pgss_objid(&key),
+												true);
+		if (!entry_ref)
+		{
+			if (norm_query)
+				pfree(norm_query);
+			return;
+		}
 
-		/* If we failed to write to the text file, give up */
-		if (!stored)
-			goto done;
+		shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+		pgss_entry_init(shared_entry, &key, encoding);
+		pgss_store_query_text(shared_entry, store_query, query_len, encoding);
 
-		/* OK to create a new hashtable entry */
-		entry = entry_alloc(&key, query_offset, query_len, encoding,
-							jstate != NULL);
+		pgstat_unlock_entry(entry_ref);
 
-		/* If needed, perform garbage collection while exclusive lock held */
-		if (do_gc)
-			gc_qtexts();
+		if (norm_query)
+			pfree(norm_query);
+		return;
 	}
 
-	/* Increment the counts, except when jstate is not NULL */
-	if (!jstate)
+	/*
+	 * Normal case: accumulate stats in a pending entry.  The pending entry
+	 * will be flushed to shared memory by pgss_flush_pending_cb.
+	 *
+	 * But first, ensure the shared entry exists with query text.
+	 */
+	entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid,
+									 pgss_objid(&key), false, NULL);
+	if (!entry_ref)
 	{
-		Assert(kind == PGSS_PLAN || kind == PGSS_EXEC);
-
 		/*
-		 * Grab the spinlock while updating the counters (see comment about
-		 * locking rules at the head of the file)
+		 * Entry doesn't exist yet.  Don't create a new one if we've already
+		 * hit the configured maximum; eviction will free space eventually.
 		 */
-		SpinLockAcquire(&entry->mutex);
+		if (pgstat_get_entry_count(PGSTAT_KIND_PGSS) >= pgss_max)
+		{
+			pgss_maybe_evict();
+			return;
+		}
 
-		/* "Unstick" entry if it was previously sticky */
-		if (IS_STICKY(entry->counters))
-			entry->counters.usage = USAGE_INIT;
+		entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_PGSS,
+												key.dbid,
+												pgss_objid(&key),
+												true);
+		if (!entry_ref)
+			return;
 
-		entry->counters.calls[kind] += 1;
-		entry->counters.total_time[kind] += total_time;
+		shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+		pgss_entry_init(shared_entry, &key, encoding);
+		pgss_store_query_text(shared_entry, query, query_len, encoding);
 
-		if (entry->counters.calls[kind] == 1)
-		{
-			entry->counters.min_time[kind] = total_time;
-			entry->counters.max_time[kind] = total_time;
-			entry->counters.mean_time[kind] = total_time;
-		}
-		else
-		{
-			/*
-			 * Welford's method for accurately computing variance. See
-			 * <http://www.johndcook.com/blog/standard_deviation/>
-			 */
-			double		old_mean = entry->counters.mean_time[kind];
+		pgstat_unlock_entry(entry_ref);
+	}
 
-			entry->counters.mean_time[kind] +=
-				(total_time - old_mean) / entry->counters.calls[kind];
-			entry->counters.sum_var_time[kind] +=
-				(total_time - old_mean) * (total_time - entry->counters.mean_time[kind]);
+	/*
+	 * Now accumulate stats in the pending entry.
+	 */
+	Assert(kind == PGSS_PLAN || kind == PGSS_EXEC);
 
-			/*
-			 * Calculate min and max time. min = 0 and max = 0 means that the
-			 * min/max statistics were reset
-			 */
-			if (entry->counters.min_time[kind] == 0
-				&& entry->counters.max_time[kind] == 0)
-			{
-				entry->counters.min_time[kind] = total_time;
-				entry->counters.max_time[kind] = total_time;
-			}
-			else
-			{
-				if (entry->counters.min_time[kind] > total_time)
-					entry->counters.min_time[kind] = total_time;
-				if (entry->counters.max_time[kind] < total_time)
-					entry->counters.max_time[kind] = total_time;
-			}
-		}
-		entry->counters.rows += rows;
-		entry->counters.shared_blks_hit += bufusage->shared_blks_hit;
-		entry->counters.shared_blks_read += bufusage->shared_blks_read;
-		entry->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
-		entry->counters.shared_blks_written += bufusage->shared_blks_written;
-		entry->counters.local_blks_hit += bufusage->local_blks_hit;
-		entry->counters.local_blks_read += bufusage->local_blks_read;
-		entry->counters.local_blks_dirtied += bufusage->local_blks_dirtied;
-		entry->counters.local_blks_written += bufusage->local_blks_written;
-		entry->counters.temp_blks_read += bufusage->temp_blks_read;
-		entry->counters.temp_blks_written += bufusage->temp_blks_written;
-		entry->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time);
-		entry->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time);
-		entry->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time);
-		entry->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time);
-		entry->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time);
-		entry->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time);
-		entry->counters.usage += USAGE_EXEC(total_time);
-		entry->counters.wal_records += walusage->wal_records;
-		entry->counters.wal_fpi += walusage->wal_fpi;
-		entry->counters.wal_bytes += walusage->wal_bytes;
-		entry->counters.wal_buffers_full += walusage->wal_buffers_full;
-		if (jitusage)
-		{
-			entry->counters.jit_functions += jitusage->created_functions;
-			entry->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter);
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_PGSS, key.dbid,
+										  pgss_objid(&key), NULL);
+	pending = (PgStat_PendingPgss *) entry_ref->pending;
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter))
-				entry->counters.jit_deform_count++;
-			entry->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter);
+	pending->counters.calls[kind] += 1;
+	pending->counters.total_time[kind] += total_time;
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter))
-				entry->counters.jit_inlining_count++;
-			entry->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter);
+	/*
+	 * Compute mean and sum of squared deviations using Welford's online
+	 * algorithm.  These per-backend values are later merged into shared
+	 * memory using Chan's parallel variance algorithm in the flush callback.
+	 * See <http://www.johndcook.com/blog/standard_deviation/>
+	 */
+	if (pending->counters.calls[kind] == 1)
+	{
+		pending->counters.min_time[kind] = total_time;
+		pending->counters.max_time[kind] = total_time;
+		pending->counters.mean_time[kind] = total_time;
+	}
+	else
+	{
+		double		old_mean = pending->counters.mean_time[kind];
+
+		if (pending->counters.min_time[kind] > total_time)
+			pending->counters.min_time[kind] = total_time;
+		if (pending->counters.max_time[kind] < total_time)
+			pending->counters.max_time[kind] = total_time;
+		pending->counters.mean_time[kind] +=
+			(total_time - old_mean) / pending->counters.calls[kind];
+		pending->counters.sum_var_time[kind] +=
+			(total_time - old_mean) * (total_time - pending->counters.mean_time[kind]);
+	}
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter))
-				entry->counters.jit_optimization_count++;
-			entry->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter);
+	pending->counters.rows += rows;
+	pending->counters.shared_blks_hit += bufusage->shared_blks_hit;
+	pending->counters.shared_blks_read += bufusage->shared_blks_read;
+	pending->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
+	pending->counters.shared_blks_written += bufusage->shared_blks_written;
+	pending->counters.local_blks_hit += bufusage->local_blks_hit;
+	pending->counters.local_blks_read += bufusage->local_blks_read;
+	pending->counters.local_blks_dirtied += bufusage->local_blks_dirtied;
+	pending->counters.local_blks_written += bufusage->local_blks_written;
+	pending->counters.temp_blks_read += bufusage->temp_blks_read;
+	pending->counters.temp_blks_written += bufusage->temp_blks_written;
+	pending->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time);
+	pending->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time);
+	pending->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time);
+	pending->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time);
+	pending->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time);
+	pending->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time);
+	pending->counters.usage += USAGE_EXEC(total_time);
+	pending->counters.wal_records += walusage->wal_records;
+	pending->counters.wal_fpi += walusage->wal_fpi;
+	pending->counters.wal_bytes += walusage->wal_bytes;
+	pending->counters.wal_buffers_full += walusage->wal_buffers_full;
+	if (jitusage)
+	{
+		pending->counters.jit_functions += jitusage->created_functions;
+		pending->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter);
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter))
-				entry->counters.jit_emission_count++;
-			entry->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter);
-		}
+		if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter))
+			pending->counters.jit_deform_count++;
+		pending->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter);
 
-		/* parallel worker counters */
-		entry->counters.parallel_workers_to_launch += parallel_workers_to_launch;
-		entry->counters.parallel_workers_launched += parallel_workers_launched;
+		if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter))
+			pending->counters.jit_inlining_count++;
+		pending->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter);
 
-		/* plan cache counters */
-		if (planOrigin == PLAN_STMT_CACHE_GENERIC)
-			entry->counters.generic_plan_calls++;
-		else if (planOrigin == PLAN_STMT_CACHE_CUSTOM)
-			entry->counters.custom_plan_calls++;
+		if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter))
+			pending->counters.jit_optimization_count++;
+		pending->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter);
 
-		SpinLockRelease(&entry->mutex);
+		if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter))
+			pending->counters.jit_emission_count++;
+		pending->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter);
 	}
 
-done:
-	LWLockRelease(&pgss->lock.lock);
+	pending->counters.parallel_workers_to_launch += parallel_workers_to_launch;
+	pending->counters.parallel_workers_launched += parallel_workers_launched;
 
-	/* We postpone this clean-up until we're out of the lock */
-	if (norm_query)
-		pfree(norm_query);
+	if (planOrigin == PLAN_STMT_CACHE_GENERIC)
+		pending->counters.generic_plan_calls++;
+	else if (planOrigin == PLAN_STMT_CACHE_CUSTOM)
+		pending->counters.custom_plan_calls++;
 }
 
 /*
@@ -1676,8 +1522,8 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 	Size		qbuffer_size = 0;
 	Size		extent = 0;
 	int			gc_count = 0;
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry  *entry;
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
 
 	/*
 	 * Superusers or roles with the privileges of pg_read_all_stats members
@@ -1685,8 +1531,8 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 	 */
 	is_allowed_role = has_privs_of_role(userid, ROLE_PG_READ_ALL_STATS);
 
-	/* hash table must exist already */
-	if (!pgss || !pgss_hash)
+	/* shared state must exist already */
+	if (!pgss)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
@@ -1773,30 +1619,13 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 	}
 
 	/*
-	 * Get shared lock, load or reload the query text file if we must, and
-	 * iterate over the hashtable entries.
-	 *
-	 * With a large hash table, we might be holding the lock rather longer
-	 * than one could wish.  However, this only blocks creation of new hash
-	 * table entries, and the larger the hash table the less likely that is to
-	 * be needed.  So we can hope this is okay.  Perhaps someday we'll decide
-	 * we need to partition the hash table to limit the time spent holding any
-	 * one lock.
+	 * Get shared lock on the query text file, load or reload if needed, and
+	 * iterate over the pgstat shared hash entries.
 	 */
 	LWLockAcquire(&pgss->lock.lock, LW_SHARED);
 
 	if (showtext)
 	{
-		/*
-		 * Here it is safe to examine extent and gc_count without taking the
-		 * mutex.  Note that although other processes might change
-		 * pgss->extent just after we look at it, the strings they then write
-		 * into the file cannot yet be referenced in the hashtable, so we
-		 * don't care whether we see them or not.
-		 *
-		 * If qtext_load_file fails, we just press on; we'll return NULL for
-		 * every query text.
-		 */
 		if (qbuffer == NULL ||
 			pgss->extent != extent ||
 			pgss->gc_count != gc_count)
@@ -1807,45 +1636,75 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 		}
 	}
 
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	/* Flush this backend's pending stats so the scan below sees them */
+	pgstat_report_anytime_stat();
+
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
 	{
 		Datum		values[PG_STAT_STATEMENTS_COLS];
 		bool		nulls[PG_STAT_STATEMENTS_COLS];
 		int			i = 0;
 		Counters	tmp;
 		double		stddev;
-		int64		queryid = entry->key.queryid;
+		PgStatShared_Pgss *shared_entry;
+		int64		queryid;
 		TimestampTz stats_since;
 		TimestampTz minmax_stats_since;
 
-		memset(values, 0, sizeof(values));
-		memset(nulls, 0, sizeof(nulls));
+		/* Only process live (non-dropped) entries of our kind */
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+		Assert(shared_entry);
+
+		/* Read entry data under the entry's LWLock */
+		LWLockAcquire(&shared_entry->header.lock, LW_SHARED);
+		tmp = shared_entry->counters;
+		queryid = shared_entry->key.queryid;
+		stats_since = shared_entry->stats_since;
+		minmax_stats_since = shared_entry->minmax_stats_since;
+		LWLockRelease(&shared_entry->header.lock);
+
+		/* Skip entries created at parse time but never executed */
+		if (tmp.calls[PGSS_PLAN] + tmp.calls[PGSS_EXEC] == 0)
+			continue;
 
-		values[i++] = ObjectIdGetDatum(entry->key.userid);
-		values[i++] = ObjectIdGetDatum(entry->key.dbid);
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		values[i++] = ObjectIdGetDatum(shared_entry->key.userid);
+		values[i++] = ObjectIdGetDatum(shared_entry->key.dbid);
 		if (api_version >= PGSS_V1_9)
-			values[i++] = BoolGetDatum(entry->key.toplevel);
+			values[i++] = BoolGetDatum(shared_entry->key.toplevel);
 
-		if (is_allowed_role || entry->key.userid == userid)
+		if (is_allowed_role || shared_entry->key.userid == userid)
 		{
 			if (api_version >= PGSS_V1_2)
 				values[i++] = Int64GetDatumFast(queryid);
 
 			if (showtext)
 			{
-				char	   *qstr = qtext_fetch(entry->query_offset,
-											   entry->query_len,
-											   qbuffer,
-											   qbuffer_size);
+				char	   *qstr = NULL;
+
+				if (shared_entry->query_len >= 0)
+				{
+					qstr = qtext_fetch(shared_entry->query_offset,
+									   shared_entry->query_len,
+									   qbuffer,
+									   qbuffer_size);
+				}
 
 				if (qstr)
 				{
 					char	   *enc;
 
 					enc = pg_any_to_server(qstr,
-										   entry->query_len,
-										   entry->encoding);
+										   shared_entry->query_len,
+										   shared_entry->encoding);
 
 					values[i++] = CStringGetTextDatum(enc);
 
@@ -1880,22 +1739,6 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 				nulls[i++] = true;
 		}
 
-		/* copy counters to a local variable to keep locking time short */
-		SpinLockAcquire(&entry->mutex);
-		tmp = entry->counters;
-		SpinLockRelease(&entry->mutex);
-
-		/*
-		 * The spinlock is not required when reading these two as they are
-		 * always updated when holding pgss->lock exclusively.
-		 */
-		stats_since = entry->stats_since;
-		minmax_stats_since = entry->minmax_stats_since;
-
-		/* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
-		if (IS_STICKY(tmp))
-			continue;
-
 		/* Note that we rely on PGSS_PLAN being 0 and PGSS_EXEC being 1. */
 		for (int kind = 0; kind < PGSS_NUMKIND; kind++)
 		{
@@ -2020,6 +1863,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 	}
+	dshash_seq_term(&hstat);
 
 	LWLockRelease(&pgss->lock.lock);
 
@@ -2040,8 +1884,9 @@ pg_stat_statements_info(PG_FUNCTION_ARGS)
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_STATEMENTS_INFO_COLS] = {0};
 	bool		nulls[PG_STAT_STATEMENTS_INFO_COLS] = {0};
+	int			i = 0;
 
-	if (!pgss || !pgss_hash)
+	if (!pgss)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
@@ -2055,162 +1900,12 @@ pg_stat_statements_info(PG_FUNCTION_ARGS)
 	stats = pgss->stats;
 	SpinLockRelease(&pgss->mutex);
 
-	values[0] = Int64GetDatum(stats.dealloc);
-	values[1] = TimestampTzGetDatum(stats.stats_reset);
+	values[i++] = Int64GetDatum(stats.dealloc);
+	values[i++] = TimestampTzGetDatum(stats.stats_reset);
 
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
 
-/*
- * Allocate a new hashtable entry.
- * caller must hold an exclusive lock on pgss->lock
- *
- * "query" need not be null-terminated; we rely on query_len instead
- *
- * If "sticky" is true, make the new entry artificially sticky so that it will
- * probably still be there when the query finishes execution.  We do this by
- * giving it a median usage value rather than the normal value.  (Strictly
- * speaking, query strings are normalized on a best effort basis, though it
- * would be difficult to demonstrate this even under artificial conditions.)
- *
- * Note: despite needing exclusive lock, it's not an error for the target
- * entry to already exist.  This is because pgss_store releases and
- * reacquires lock after failing to find a match; so someone else could
- * have made the entry while we waited to get exclusive lock.
- */
-static pgssEntry *
-entry_alloc(pgssHashKey *key, Size query_offset, int query_len, int encoding,
-			bool sticky)
-{
-	pgssEntry  *entry;
-	bool		found;
-
-	/* Make space if needed */
-	while (hash_get_num_entries(pgss_hash) >= pgss_max)
-		entry_dealloc();
-
-	/* Find or create an entry with desired hash code */
-	entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found);
-
-	if (!found)
-	{
-		/* New entry, initialize it */
-
-		/* reset the statistics */
-		memset(&entry->counters, 0, sizeof(Counters));
-		/* set the appropriate initial usage count */
-		entry->counters.usage = sticky ? pgss->cur_median_usage : USAGE_INIT;
-		/* re-initialize the mutex each time ... we assume no one using it */
-		SpinLockInit(&entry->mutex);
-		/* ... and don't forget the query text metadata */
-		Assert(query_len >= 0);
-		entry->query_offset = query_offset;
-		entry->query_len = query_len;
-		entry->encoding = encoding;
-		entry->stats_since = GetCurrentTimestamp();
-		entry->minmax_stats_since = entry->stats_since;
-	}
-
-	return entry;
-}
-
-/*
- * qsort comparator for sorting into increasing usage order
- */
-static int
-entry_cmp(const void *lhs, const void *rhs)
-{
-	double		l_usage = (*(pgssEntry *const *) lhs)->counters.usage;
-	double		r_usage = (*(pgssEntry *const *) rhs)->counters.usage;
-
-	if (l_usage < r_usage)
-		return -1;
-	else if (l_usage > r_usage)
-		return +1;
-	else
-		return 0;
-}
-
-/*
- * Deallocate least-used entries.
- *
- * Caller must hold an exclusive lock on pgss->lock.
- */
-static void
-entry_dealloc(void)
-{
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry **entries;
-	pgssEntry  *entry;
-	int			nvictims;
-	int			i;
-	Size		tottextlen;
-	int			nvalidtexts;
-
-	/*
-	 * Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them.
-	 * While we're scanning the table, apply the decay factor to the usage
-	 * values, and update the mean query length.
-	 *
-	 * Note that the mean query length is almost immediately obsolete, since
-	 * we compute it before not after discarding the least-used entries.
-	 * Hopefully, that doesn't affect the mean too much; it doesn't seem worth
-	 * making two passes to get a more current result.  Likewise, the new
-	 * cur_median_usage includes the entries we're about to zap.
-	 */
-
-	entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *));
-
-	i = 0;
-	tottextlen = 0;
-	nvalidtexts = 0;
-
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		entries[i++] = entry;
-		/* "Sticky" entries get a different usage decay rate. */
-		if (IS_STICKY(entry->counters))
-			entry->counters.usage *= STICKY_DECREASE_FACTOR;
-		else
-			entry->counters.usage *= USAGE_DECREASE_FACTOR;
-		/* In the mean length computation, ignore dropped texts. */
-		if (entry->query_len >= 0)
-		{
-			tottextlen += entry->query_len + 1;
-			nvalidtexts++;
-		}
-	}
-
-	/* Sort into increasing order by usage */
-	qsort(entries, i, sizeof(pgssEntry *), entry_cmp);
-
-	/* Record the (approximate) median usage */
-	if (i > 0)
-		pgss->cur_median_usage = entries[i / 2]->counters.usage;
-	/* Record the mean query length */
-	if (nvalidtexts > 0)
-		pgss->mean_query_len = tottextlen / nvalidtexts;
-	else
-		pgss->mean_query_len = ASSUMED_LENGTH_INIT;
-
-	/* Now zap an appropriate fraction of lowest-usage entries */
-	nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100);
-	nvictims = Min(nvictims, i);
-
-	for (i = 0; i < nvictims; i++)
-	{
-		hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL);
-	}
-
-	pfree(entries);
-
-	/* Increment the number of times entries are deallocated */
-	SpinLockAcquire(&pgss->mutex);
-	pgss->stats.dealloc += 1;
-	SpinLockRelease(&pgss->mutex);
-}
-
 /*
  * Given a query string (not necessarily null-terminated), allocate a new
  * entry in the external query text file and store the string there.
@@ -2234,6 +1929,8 @@ qtext_store(const char *query, int query_len,
 	Size		off;
 	int			fd;
 
+	*query_offset = 0;
+
 	/*
 	 * We use a spinlock to protect extent/n_writers/gc_count, so that
 	 * multiple processes may execute this function concurrently.
@@ -2366,8 +2063,8 @@ qtext_load_file(Size *buffer_size)
 		/*
 		 * If we get a short read and errno doesn't get set, the reason is
 		 * probably that garbage collection truncated the file since we did
-		 * the fstat(), so we don't log a complaint --- but we don't return
-		 * the data, either, since it's most likely corrupt due to concurrent
+		 * the fstat(), so we don't log a complaint; but we don't return the
+		 * data, either, since it's most likely corrupt due to concurrent
 		 * writes from garbage collection.
 		 */
 		errno = 0;
@@ -2420,8 +2117,6 @@ qtext_fetch(Size query_offset, int query_len,
 
 /*
  * Do we need to garbage-collect the external query text file?
- *
- * Caller should hold at least a shared lock on pgss->lock.
  */
 static bool
 need_gc_qtexts(void)
@@ -2478,13 +2173,13 @@ gc_qtexts(void)
 	char	   *qbuffer;
 	Size		qbuffer_size;
 	FILE	   *qfile = NULL;
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry  *entry;
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
 	Size		extent;
 	int			nentries;
 
 	/*
-	 * When called from pgss_store, some other session might have proceeded
+	 * When called from the bg worker, some other session might have proceeded
 	 * with garbage collection in the no-lock-held interim of lock strength
 	 * escalation.  Check once more that this is actually necessary.
 	 */
@@ -2493,21 +2188,12 @@ gc_qtexts(void)
 
 	/*
 	 * Load the old texts file.  If we fail (out of memory, for instance),
-	 * invalidate query texts.  Hopefully this is rare.  It might seem better
-	 * to leave things alone on an OOM failure, but the problem is that the
-	 * file is only going to get bigger; hoping for a future non-OOM result is
-	 * risky and can easily lead to complete denial of service.
+	 * invalidate query texts.  Hopefully this is rare.
 	 */
 	qbuffer = qtext_load_file(&qbuffer_size);
 	if (qbuffer == NULL)
 		goto gc_fail;
 
-	/*
-	 * We overwrite the query texts file in place, so as to reduce the risk of
-	 * an out-of-disk-space failure.  Since the file is guaranteed not to get
-	 * larger, this should always work on traditional filesystems; though we
-	 * could still lose on copy-on-write filesystems.
-	 */
 	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
 	if (qfile == NULL)
 	{
@@ -2521,21 +2207,32 @@ gc_qtexts(void)
 	extent = 0;
 	nentries = 0;
 
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
 	{
-		int			query_len = entry->query_len;
-		char	   *qry = qtext_fetch(entry->query_offset,
-									  query_len,
-									  qbuffer,
-									  qbuffer_size);
+		PgStatShared_Pgss *shared_entry;
+		int			query_len;
+		char	   *qry;
+
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+
+		query_len = shared_entry->query_len;
+		if (query_len < 0)
+			continue;
+
+		qry = qtext_fetch(shared_entry->query_offset, query_len,
+						  qbuffer, qbuffer_size);
 
 		if (qry == NULL)
 		{
-			/* Trouble ... drop the text */
-			entry->query_offset = 0;
-			entry->query_len = -1;
-			/* entry will not be counted in mean query length computation */
+			/* Trouble ... mark text invalid */
+			shared_entry->query_offset = 0;
+			shared_entry->query_len = -1;
 			continue;
 		}
 
@@ -2545,19 +2242,16 @@ gc_qtexts(void)
 					(errcode_for_file_access(),
 					 errmsg("could not write file \"%s\": %m",
 							PGSS_TEXT_FILE)));
-			hash_seq_term(&hash_seq);
+			dshash_seq_term(&hstat);
 			goto gc_fail;
 		}
 
-		entry->query_offset = extent;
+		shared_entry->query_offset = extent;
 		extent += query_len + 1;
 		nentries++;
 	}
+	dshash_seq_term(&hstat);
 
-	/*
-	 * Truncate away any now-unused space.  If this fails for some odd reason,
-	 * we log it, but there's no need to fail.
-	 */
 	if (ftruncate(fileno(qfile), extent) != 0)
 		ereport(LOG,
 				(errcode_for_file_access(),
@@ -2577,13 +2271,8 @@ gc_qtexts(void)
 	elog(DEBUG1, "pgss gc of queries file shrunk size from %zu to %zu",
 		 pgss->extent, extent);
 
-	/* Reset the shared extent pointer */
 	pgss->extent = extent;
 
-	/*
-	 * Also update the mean query length, to be sure that need_gc_qtexts()
-	 * won't still think we have a problem.
-	 */
 	if (nentries > 0)
 		pgss->mean_query_len = extent / nentries;
 	else
@@ -2591,19 +2280,11 @@ gc_qtexts(void)
 
 	pfree(qbuffer);
 
-	/*
-	 * OK, count a garbage collection cycle.  (Note: even though we have
-	 * exclusive lock on pgss->lock, we must take pgss->mutex for this, since
-	 * other processes may examine gc_count while holding only the mutex.
-	 * Also, we have to advance the count *after* we've rewritten the file,
-	 * else other processes might not realize they read a stale file.)
-	 */
 	record_gc_qtexts();
 
 	return;
 
 gc_fail:
-	/* clean up resources */
 	if (qfile)
 		FreeFile(qfile);
 	if (qbuffer)
@@ -2611,18 +2292,24 @@ gc_fail:
 
 	/*
 	 * Since the contents of the external file are now uncertain, mark all
-	 * hashtable entries as having invalid texts.
+	 * entries as having invalid texts.
 	 */
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
 	{
-		entry->query_offset = 0;
-		entry->query_len = -1;
+		PgStatShared_Pgss *shared_entry;
+
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+		shared_entry->query_offset = 0;
+		shared_entry->query_len = -1;
 	}
+	dshash_seq_term(&hstat);
 
-	/*
-	 * Destroy the query text file and create a new, empty one
-	 */
 	(void) unlink(PGSS_TEXT_FILE);
 	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
 	if (qfile == NULL)
@@ -2633,41 +2320,27 @@ gc_fail:
 	else
 		FreeFile(qfile);
 
-	/* Reset the shared extent pointer */
 	pgss->extent = 0;
-
-	/* Reset mean_query_len to match the new state */
 	pgss->mean_query_len = ASSUMED_LENGTH_INIT;
 
-	/*
-	 * Bump the GC count even though we failed.
-	 *
-	 * This is needed to make concurrent readers of file without any lock on
-	 * pgss->lock notice existence of new version of file.  Once readers
-	 * subsequently observe a change in GC count with pgss->lock held, that
-	 * forces a safe reopen of file.  Writers also require that we bump here,
-	 * of course.  (As required by locking protocol, readers and writers don't
-	 * trust earlier file contents until gc_count is found unchanged after
-	 * pgss->lock acquired in shared or exclusive mode respectively.)
-	 */
 	record_gc_qtexts();
 }
 
-#define SINGLE_ENTRY_RESET(e) \
-if (e) { \
+#define SINGLE_ENTRY_RESET(shared, key_ptr, minmax_only, stats_reset, num_remove) \
+if (shared) { \
 	if (minmax_only) { \
-		/* When requested reset only min/max statistics of an entry */ \
 		for (int kind = 0; kind < PGSS_NUMKIND; kind++) \
 		{ \
-			e->counters.max_time[kind] = 0; \
-			e->counters.min_time[kind] = 0; \
+			(shared)->counters.max_time[kind] = 0; \
+			(shared)->counters.min_time[kind] = 0; \
 		} \
-		e->minmax_stats_since = stats_reset; \
+		(shared)->minmax_stats_since = stats_reset; \
 	} \
 	else \
 	{ \
-		/* Remove the key otherwise  */ \
-		hash_search(pgss_hash, &e->key, HASH_REMOVE, NULL); \
+		(shared)->query_len = -1; \
+		pgstat_drop_entry(PGSTAT_KIND_PGSS, (key_ptr)->dbid, \
+						  pgss_objid(key_ptr)); \
 		num_remove++; \
 	} \
 }
@@ -2678,69 +2351,122 @@ if (e) { \
 static TimestampTz
 entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
 {
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry  *entry;
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
 	FILE	   *qfile;
 	int64		num_entries;
 	int64		num_remove = 0;
-	pgssHashKey key;
 	TimestampTz stats_reset;
 
-	if (!pgss || !pgss_hash)
+	if (!pgss)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
 
 	LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE);
-	num_entries = hash_get_num_entries(pgss_hash);
+	num_entries = pgstat_get_entry_count(PGSTAT_KIND_PGSS);
 
 	stats_reset = GetCurrentTimestamp();
 
 	if (userid != 0 && dbid != 0 && queryid != INT64CONST(0))
 	{
 		/* If all the parameters are available, use the fast path. */
+		pgssHashKey key;
+		PgStat_EntryRef *entry_ref;
+		PgStatShared_Pgss *shared_entry;
+
 		memset(&key, 0, sizeof(pgssHashKey));
 		key.userid = userid;
 		key.dbid = dbid;
 		key.queryid = queryid;
 
-		/*
-		 * Reset the entry if it exists, starting with the non-top-level
-		 * entry.
-		 */
+		/* Reset non-top-level entry */
 		key.toplevel = false;
-		entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
-		SINGLE_ENTRY_RESET(entry);
+		entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid,
+										 pgss_objid(&key), false, NULL);
+		if (entry_ref)
+		{
+			shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+			SINGLE_ENTRY_RESET(shared_entry, &key, minmax_only, stats_reset, num_remove);
+		}
 
-		/* Also reset the top-level entry if it exists. */
+		/* Reset top-level entry */
 		key.toplevel = true;
-		entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
-		SINGLE_ENTRY_RESET(entry);
+		entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid,
+										 pgss_objid(&key), false, NULL);
+		if (entry_ref)
+		{
+			shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+			SINGLE_ENTRY_RESET(shared_entry, &key, minmax_only, stats_reset, num_remove);
+		}
 	}
-	else if (userid != 0 || dbid != 0 || queryid != INT64CONST(0))
+	else
 	{
-		/* Reset entries corresponding to valid parameters. */
-		hash_seq_init(&hash_seq, pgss_hash);
-		while ((entry = hash_seq_search(&hash_seq)) != NULL)
+		/*
+		 * Iterate all entries and reset matching ones.  We cannot call
+		 * pgstat_drop_entry() while iterating the dshash (it internally
+		 * acquires partition locks), so collect keys to drop and do it after.
+		 */
+		PendingDrop *to_drop = NULL;
+		int			num_to_drop = 0;
+		int			max_to_drop = 0;
+
+		dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+		while ((p = dshash_seq_next(&hstat)) != NULL)
 		{
-			if ((!userid || entry->key.userid == userid) &&
-				(!dbid || entry->key.dbid == dbid) &&
-				(!queryid || entry->key.queryid == queryid))
+			PgStatShared_Pgss *shared_entry;
+
+			if (p->key.kind != PGSTAT_KIND_PGSS)
+				continue;
+			if (p->dropped)
+				continue;
+
+			shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+
+			if ((!userid || shared_entry->key.userid == userid) &&
+				(!dbid || shared_entry->key.dbid == dbid) &&
+				(!queryid || shared_entry->key.queryid == queryid))
 			{
-				SINGLE_ENTRY_RESET(entry);
+				if (minmax_only)
+				{
+					for (int kind = 0; kind < PGSS_NUMKIND; kind++)
+					{
+						shared_entry->counters.max_time[kind] = 0;
+						shared_entry->counters.min_time[kind] = 0;
+					}
+					shared_entry->minmax_stats_since = stats_reset;
+				}
+				else
+				{
+					shared_entry->query_len = -1;
+
+					/* Collect for deferred drop */
+					if (num_to_drop >= max_to_drop)
+					{
+						max_to_drop = Max(max_to_drop * 2, 128);
+						if (to_drop == NULL)
+							to_drop = palloc_array(PendingDrop, max_to_drop);
+						else
+							to_drop = repalloc_array(to_drop, PendingDrop, max_to_drop);
+					}
+					to_drop[num_to_drop].dbid = shared_entry->key.dbid;
+					to_drop[num_to_drop].objid = pgss_objid(&shared_entry->key);
+					num_to_drop++;
+				}
 			}
 		}
-	}
-	else
-	{
-		/* Reset all entries. */
-		hash_seq_init(&hash_seq, pgss_hash);
-		while ((entry = hash_seq_search(&hash_seq)) != NULL)
+		dshash_seq_term(&hstat);
+
+		/* Now drop entries outside the iteration */
+		for (int i = 0; i < num_to_drop; i++)
 		{
-			SINGLE_ENTRY_RESET(entry);
+			pgstat_drop_entry(PGSTAT_KIND_PGSS,
+							  to_drop[i].dbid, to_drop[i].objid);
 		}
+		num_remove = num_to_drop;
+
+		if (to_drop)
+			pfree(to_drop);
 	}
 
 	/* All entries are removed? */
@@ -2790,6 +2516,456 @@ release_lock:
 	return stats_reset;
 }
 
+/*
+ * pgstat flush callback: merge pending stats into shared memory.
+ *
+ * This is called by the pgstat infrastructure to flush accumulated
+ * backend-local statistics into the shared entry.
+ */
+static bool
+pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait)
+{
+	PgStat_PendingPgss *pending;
+	PgStatShared_Pgss *shared_entry;
+	Counters   *shared;
+	Counters   *p;
+
+	pending = (PgStat_PendingPgss *) entry_ref->pending;
+	shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+
+	if (!pgstat_lock_entry(entry_ref, nowait))
+		return false;
+
+	shared = &shared_entry->counters;
+	p = &pending->counters;
+
+	for (int kind = 0; kind < PGSS_NUMKIND; kind++)
+	{
+		if (p->calls[kind] == 0)
+			continue;
+
+		/*
+		 * Merge variance using Chan's parallel variance algorithm to combine
+		 * per-backend sum_var_time (computed via Welford's method) with the
+		 * shared aggregate.  This must be done before updating calls/totals.
+		 * See
+		 * <https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm>
+		 */
+		if (shared->calls[kind] > 0)
+		{
+			double		old_mean_a = shared->mean_time[kind];
+			double		old_mean_b = p->mean_time[kind];
+			double		delta = old_mean_a - old_mean_b;
+			double		n_a = shared->calls[kind];
+			double		n_b = p->calls[kind];
+
+			shared->sum_var_time[kind] += p->sum_var_time[kind] +
+				delta * delta * n_a * n_b / (n_a + n_b);
+		}
+		else
+		{
+			shared->sum_var_time[kind] = p->sum_var_time[kind];
+		}
+
+		shared->calls[kind] += p->calls[kind];
+		shared->total_time[kind] += p->total_time[kind];
+
+		/*
+		 * Update min/max.  If both are 0 in shared, the entry is new or was
+		 * reset, so treat the pending values as the new baseline.
+		 */
+		if (shared->min_time[kind] == 0 && shared->max_time[kind] == 0)
+		{
+			shared->min_time[kind] = p->min_time[kind];
+			shared->max_time[kind] = p->max_time[kind];
+		}
+		else
+		{
+			if (shared->min_time[kind] > p->min_time[kind])
+				shared->min_time[kind] = p->min_time[kind];
+			if (shared->max_time[kind] < p->max_time[kind])
+				shared->max_time[kind] = p->max_time[kind];
+		}
+
+		/* Recompute mean from totals */
+		shared->mean_time[kind] =
+			shared->total_time[kind] / shared->calls[kind];
+	}
+
+	shared->rows += p->rows;
+	shared->shared_blks_hit += p->shared_blks_hit;
+	shared->shared_blks_read += p->shared_blks_read;
+	shared->shared_blks_dirtied += p->shared_blks_dirtied;
+	shared->shared_blks_written += p->shared_blks_written;
+	shared->local_blks_hit += p->local_blks_hit;
+	shared->local_blks_read += p->local_blks_read;
+	shared->local_blks_dirtied += p->local_blks_dirtied;
+	shared->local_blks_written += p->local_blks_written;
+	shared->temp_blks_read += p->temp_blks_read;
+	shared->temp_blks_written += p->temp_blks_written;
+	shared->shared_blk_read_time += p->shared_blk_read_time;
+	shared->shared_blk_write_time += p->shared_blk_write_time;
+	shared->local_blk_read_time += p->local_blk_read_time;
+	shared->local_blk_write_time += p->local_blk_write_time;
+	shared->temp_blk_read_time += p->temp_blk_read_time;
+	shared->temp_blk_write_time += p->temp_blk_write_time;
+	shared->usage += p->usage;
+	shared->wal_records += p->wal_records;
+	shared->wal_fpi += p->wal_fpi;
+	shared->wal_bytes += p->wal_bytes;
+	shared->wal_buffers_full += p->wal_buffers_full;
+	shared->jit_functions += p->jit_functions;
+	shared->jit_generation_time += p->jit_generation_time;
+	shared->jit_deform_count += p->jit_deform_count;
+	shared->jit_deform_time += p->jit_deform_time;
+	shared->jit_inlining_count += p->jit_inlining_count;
+	shared->jit_inlining_time += p->jit_inlining_time;
+	shared->jit_optimization_count += p->jit_optimization_count;
+	shared->jit_optimization_time += p->jit_optimization_time;
+	shared->jit_emission_count += p->jit_emission_count;
+	shared->jit_emission_time += p->jit_emission_time;
+	shared->parallel_workers_to_launch += p->parallel_workers_to_launch;
+	shared->parallel_workers_launched += p->parallel_workers_launched;
+	shared->generic_plan_calls += p->generic_plan_calls;
+	shared->custom_plan_calls += p->custom_plan_calls;
+
+	pgstat_unlock_entry(entry_ref);
+
+	return true;
+}
+
+/*
+ * pgstat serialization callback: write query text info for an entry.
+ *
+ * We write a magic number, the full pgssHashKey, the query length, encoding,
+ * both stats_since timestamps, and the query text itself to the stats file
+ */
+static void
+pgss_to_serialized_data(const PgStat_HashKey *key,
+						const PgStatShared_Common *header,
+						FILE *statfile)
+{
+	const PgStatShared_Pgss *entry = (const PgStatShared_Pgss *) header;
+	uint32		magic = PGSS_FILE_HEADER;
+	pgssHashKey hkey = entry->key;
+	TimestampTz stats_since = entry->stats_since;
+	TimestampTz minmax_stats_since = entry->minmax_stats_since;
+	int			query_len = entry->query_len;
+	int			encoding = entry->encoding;
+
+	pgstat_write_chunk_s(statfile, &magic);
+	pgstat_write_chunk_s(statfile, &hkey);
+	pgstat_write_chunk_s(statfile, &query_len);
+	pgstat_write_chunk_s(statfile, &encoding);
+	pgstat_write_chunk_s(statfile, &stats_since);
+	pgstat_write_chunk_s(statfile, &minmax_stats_since);
+
+	/*
+	 * Write the query text itself into the stats file so it survives restarts
+	 * (PGSS_TEXT_FILE lives in a temporary directory that gets wiped).
+	 */
+	if (query_len >= 0)
+	{
+		char	   *qstr = NULL;
+
+		if (!pgss_qtext_write_buffer && pgss)
+			pgss_qtext_write_buffer = qtext_load_file(&pgss_qtext_write_buffer_size);
+
+		if (pgss_qtext_write_buffer)
+			qstr = qtext_fetch(entry->query_offset, query_len,
+							   pgss_qtext_write_buffer,
+							   pgss_qtext_write_buffer_size);
+
+		if (qstr)
+			pgstat_write_chunk(statfile, qstr, query_len + 1);
+		else
+		{
+			char		nul = '\0';
+
+			pgstat_write_chunk(statfile, &nul, 1);
+		}
+	}
+}
+
+/*
+ * pgstat deserialization callback: read query text info for an entry.
+ */
+static bool
+pgss_from_serialized_data(const PgStat_HashKey *key,
+						  PgStatShared_Common *header,
+						  FILE *statfile)
+{
+	PgStatShared_Pgss *entry = (PgStatShared_Pgss *) header;
+	int			query_len;
+	int			encoding;
+	uint32		magic;
+
+	if (!pgstat_read_chunk_s(statfile, &magic))
+		return false;
+	if (magic != PGSS_FILE_HEADER)
+	{
+		elog(WARNING, "pg_stat_statements: discarding stats with mismatched format (got 0x%08X, expected 0x%08X)",
+			 magic, PGSS_FILE_HEADER);
+		return false;
+	}
+
+	if (!pgstat_read_chunk_s(statfile, &entry->key))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &query_len))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &encoding))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &entry->stats_since))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &entry->minmax_stats_since))
+		return false;
+
+	/* Initialize text fields */
+	entry->query_len = -1;
+	entry->encoding = encoding;
+	entry->query_offset = 0;
+
+	/*
+	 * Read the query text and store it in the external file.
+	 */
+	if (query_len >= 0)
+	{
+		char	   *buf = palloc(query_len + 1);
+
+		if (!pgstat_read_chunk(statfile, buf, query_len + 1))
+		{
+			pfree(buf);
+			return false;
+		}
+
+		if (!pgss_qtext_rebuild_file)
+		{
+			pgss_qtext_rebuild_file = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
+			if (!pgss_qtext_rebuild_file)
+			{
+				pfree(buf);
+				return false;
+			}
+			pgss_qtext_rebuild_extent = 0;
+		}
+
+		entry->query_offset = pgss_qtext_rebuild_extent;
+
+		if (fwrite(buf, 1, query_len + 1, pgss_qtext_rebuild_file) != (size_t) (query_len + 1))
+		{
+			pfree(buf);
+			return false;
+		}
+		pgss_qtext_rebuild_extent += query_len + 1;
+
+		entry->query_len = query_len;
+		entry->encoding = encoding;
+		pfree(buf);
+	}
+
+	return true;
+}
+
+/*
+ * pgstat finish callback: handle end of stats file operations.
+ *
+ * For pg_stat_statements, we manage the query text file lifecycle here.
+ */
+static void
+pgss_finish(PgStat_StatsFileOp status)
+{
+	switch (status)
+	{
+		case STATS_WRITE:
+			/* Free the cached query text buffer used during serialization */
+			if (pgss_qtext_write_buffer)
+			{
+				pfree(pgss_qtext_write_buffer);
+				pgss_qtext_write_buffer = NULL;
+				pgss_qtext_write_buffer_size = 0;
+			}
+			break;
+
+		case STATS_READ:
+			/* Close the rebuild file and update shared extent */
+			if (pgss_qtext_rebuild_file)
+			{
+				FreeFile(pgss_qtext_rebuild_file);
+				pgss_qtext_rebuild_file = NULL;
+				if (pgss)
+				{
+					pgss->extent = pgss_qtext_rebuild_extent;
+				}
+				pgss_qtext_rebuild_extent = 0;
+			}
+
+			/*
+			 * If pg_stat_statements.save is disabled, discard all entries
+			 * that were just loaded from the stats file.
+			 */
+			if (!pgss_save)
+			{
+				entry_reset(0, 0, 0, false);
+			}
+			break;
+
+		case STATS_DISCARD:
+			unlink(PGSS_TEXT_FILE);
+			break;
+	}
+}
+
+/*
+ * Evict least-used entries when the entry count exceeds pgss_max.
+ *
+ * Collects usage of called entries while applying a decay factor, sorts
+ * them by usage, then drops the bottom USAGE_DEALLOC_PERCENT (at least 10).
+ */
+static void
+entry_dealloc(void)
+{
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
+	UsageEntry *entries;
+	int			nentries = 0;
+	int			allocated = 1024;
+	int			nvictims;
+	int			i;
+	Size		tottextlen = 0;
+	int			nvalidtexts = 0;
+
+	entries = palloc(allocated * sizeof(UsageEntry));
+
+	/* Scan all entries, collect usage info and apply decay */
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
+	{
+		PgStatShared_Pgss *shared_entry;
+
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+
+		/* Skip never-called entries to protect parse-time entries. */
+		if (shared_entry->counters.calls[PGSS_PLAN] +
+			shared_entry->counters.calls[PGSS_EXEC] == 0)
+			continue;
+
+		if (nentries >= allocated)
+		{
+			allocated *= 2;
+			entries = repalloc(entries, allocated * sizeof(UsageEntry));
+		}
+
+		entries[nentries].key = shared_entry->key;
+		entries[nentries].usage = shared_entry->counters.usage;
+		nentries++;
+
+		/* Apply decay */
+		shared_entry->counters.usage *= USAGE_DECREASE_FACTOR;
+
+		if (shared_entry->query_len >= 0)
+		{
+			tottextlen += shared_entry->query_len + 1;
+			nvalidtexts++;
+		}
+	}
+	dshash_seq_term(&hstat);
+
+	/* Sort by usage ascending */
+	qsort(entries, nentries, sizeof(UsageEntry),
+		  entry_cmp);
+
+	/* Update mean query length */
+	if (nvalidtexts > 0)
+		pgss->mean_query_len = tottextlen / nvalidtexts;
+	else
+		pgss->mean_query_len = ASSUMED_LENGTH_INIT;
+
+	/* Drop the bottom fraction */
+	nvictims = Max(10, nentries * USAGE_DEALLOC_PERCENT / 100);
+	nvictims = Min(nvictims, nentries);
+
+	for (i = 0; i < nvictims; i++)
+	{
+		pgstat_drop_entry(PGSTAT_KIND_PGSS,
+						  entries[i].key.dbid,
+						  pgss_objid(&entries[i].key));
+	}
+
+	pfree(entries);
+
+	/*
+	 * Signal other backends to invalidate their cached references to the
+	 * dropped entries.  Without this, backends keep stale refs and never
+	 * re-create evicted entries.
+	 */
+	pgstat_request_entry_refs_gc();
+
+	/* Increment dealloc counter */
+	SpinLockAcquire(&pgss->mutex);
+	pgss->stats.dealloc += 1;
+	SpinLockRelease(&pgss->mutex);
+}
+
+/*
+ * qsort comparator for eviction: sort by usage ascending.
+ */
+static int
+entry_cmp(const void *a, const void *b)
+{
+	double		l = ((const UsageEntry *) a)->usage;
+	double		r = ((const UsageEntry *) b)->usage;
+
+	if (l < r)
+		return -1;
+	else if (l > r)
+		return +1;
+	else
+		return 0;
+}
+
+/*
+ * Attempt eviction if enough time has passed since the last cycle.
+ *
+ * Uses a conditional lock so that at most one backend performs eviction at a
+ * time; others simply return without blocking.  The time check ensures we
+ * evict at most once every EVICTION_INTERVAL_MS milliseconds.
+ */
+static void
+pgss_maybe_evict(void)
+{
+	/*
+	 * Use the statement start timestamp since this is always called from
+	 * pgss_store() at the start of query execution.
+	 */
+	TimestampTz now = GetCurrentStatementStartTimestamp();
+
+	if (!TimestampDifferenceExceeds(pgss->last_eviction_time, now,
+									EVICTION_INTERVAL_MS))
+		return;
+
+	if (!LWLockConditionalAcquire(&pgss->lock.lock, LW_EXCLUSIVE))
+		return;
+
+	/* Re-check after acquiring lock */
+	if (TimestampDifferenceExceeds(pgss->last_eviction_time, now,
+								   EVICTION_INTERVAL_MS))
+	{
+		entry_dealloc();
+		pgss->last_eviction_time = now;
+
+		/* Also handle query text GC while we hold the lock */
+		if (need_gc_qtexts())
+			gc_qtexts();
+	}
+
+	LWLockRelease(&pgss->lock.lock);
+}
+
 /*
  * Generate a normalized version of the query string that will be used to
  * represent all similar queries.
diff --git a/contrib/pg_stat_statements/pg_stat_statements.conf b/contrib/pg_stat_statements/pg_stat_statements.conf
index 0e900d7119b..21a10c41d09 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.conf
+++ b/contrib/pg_stat_statements/pg_stat_statements.conf
@@ -1,2 +1,3 @@
 shared_preload_libraries = 'pg_stat_statements'
 max_prepared_transactions = 5
+max_parallel_workers_per_gather = 0
diff --git a/contrib/pg_stat_statements/pg_stat_statements.control b/contrib/pg_stat_statements/pg_stat_statements.control
index 2eee0ceffa8..61ae41efc14 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.control
+++ b/contrib/pg_stat_statements/pg_stat_statements.control
@@ -1,5 +1,5 @@
 # pg_stat_statements extension
 comment = 'track planning and execution statistics of all SQL statements executed'
-default_version = '1.13'
+default_version = '1.14'
 module_pathname = '$libdir/pg_stat_statements'
 relocatable = true
diff --git a/contrib/pg_stat_statements/sql/parallel.sql b/contrib/pg_stat_statements/sql/parallel.sql
index 4ce1573d132..05a3e2524d5 100644
--- a/contrib/pg_stat_statements/sql/parallel.sql
+++ b/contrib/pg_stat_statements/sql/parallel.sql
@@ -16,6 +16,7 @@ SELECT pg_stat_statements_reset() IS NOT NULL AS t;
 
 SELECT count(*) FROM pgss_parallel_tab;
 
+RESET max_parallel_workers_per_gather;
 SELECT query,
   parallel_workers_to_launch > 0 AS has_workers_to_launch,
   parallel_workers_launched > 0 AS has_workers_launched
diff --git a/doc/src/sgml/pgstatstatements.sgml b/doc/src/sgml/pgstatstatements.sgml
index d753de5836e..19b1dab74c7 100644
--- a/doc/src/sgml/pgstatstatements.sgml
+++ b/doc/src/sgml/pgstatstatements.sgml
@@ -809,7 +809,6 @@ calls | 2
        <structname>pg_stat_statements</structname> view were last reset.
       </para></entry>
      </row>
-
     </tbody>
    </tgroup>
   </table>
@@ -911,11 +910,16 @@ calls | 2
       statements tracked by the module (i.e., the maximum number of rows
       in the <structname>pg_stat_statements</structname> view).  If more distinct
       statements than that are observed, information about the least-executed
-      statements is discarded.  The number of times such information was
+      statements is discarded.  Eviction is throttled to occur at most once
+      every 10 seconds; between eviction cycles, once the limit is reached,
+      new entries are not created, while existing entries continue to
+      accumulate statistics normally.
+      The number of times such information was
       discarded can be seen in the
       <structname>pg_stat_statements_info</structname> view.
       The default value is 5000.
-      This parameter can only be set at server start.
+      This parameter can be changed at any time by reloading the server
+      configuration.
      </para>
     </listitem>
    </varlistentry>
@@ -1008,13 +1012,15 @@ calls | 2
 
   <para>
    The module requires additional shared memory proportional to
-   <varname>pg_stat_statements.max</varname>.  Note that this
-   memory is consumed whenever the module is loaded, even if
+   <varname>pg_stat_statements.max</varname>.  Note that this memory is
+   consumed whenever the module is loaded, even if
    <varname>pg_stat_statements.track</varname> is set to <literal>none</literal>.
   </para>
 
   <para>
-   These parameters must be set in <filename>postgresql.conf</filename>.
+   These parameters are typically set in <filename>postgresql.conf</filename>.
+   Note that <varname>pg_stat_statements.max</varname> can be changed
+   without a server restart by reloading the configuration.
    Typical usage might be:
 
 <programlisting>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8cf40c87043..62351ab09cd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2322,6 +2322,7 @@ PgStatShared_Function
 PgStatShared_HashEntry
 PgStatShared_IO
 PgStatShared_Lock
+PgStatShared_Pgss
 PgStatShared_Relation
 PgStatShared_ReplSlot
 PgStatShared_SLRU
-- 
2.50.1 (Apple Git-155)