pgss_001.v1.patch
text/x-patch
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: unified
Series: patch v1
| File | + | − |
|---|---|---|
| contrib/pg_stat_statements/expected/wal.out | 1 | 1 |
| contrib/pg_stat_statements/pg_stat_statements.c | 346 | 110 |
| contrib/pg_stat_statements/t/020_crash.pl | 80 | 0 |
diff --git a/contrib/pg_stat_statements/expected/wal.out b/contrib/pg_stat_statements/expected/wal.out
index 34a2bf5b03..4b2220a96b 100644
--- a/contrib/pg_stat_statements/expected/wal.out
+++ b/contrib/pg_stat_statements/expected/wal.out
@@ -17,7 +17,7 @@ FROM pg_stat_statements ORDER BY query COLLATE "C";
--------------------------------------------------------------+-------+------+---------------------+-----------------------+---------------------
DELETE FROM pgss_wal_tab WHERE a > $1 | 1 | 1 | t | t | t
INSERT INTO pgss_wal_tab VALUES(generate_series($1, $2), $3) | 1 | 10 | t | t | t
- SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | f | f | f
+ SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | t | t | t
SET pg_stat_statements.track_utility = FALSE | 1 | 0 | f | f | t
UPDATE pgss_wal_tab SET b = $1 WHERE a > $2 | 1 | 3 | t | t | t
(5 rows)
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 67cec865ba..d0220fd9eb 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -74,6 +74,10 @@
#include "utils/memutils.h"
#include "utils/timestamp.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
+
PG_MODULE_MAGIC;
/* Location of permanent stats file (valid when database is shut down) */
@@ -323,7 +327,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_info);
static void pgss_shmem_request(void);
static void pgss_shmem_startup(void);
-static void pgss_shmem_shutdown(int code, Datum arg);
static void pgss_post_parse_analyze(ParseState *pstate, Query *query,
JumbleState *jstate);
static PlannedStmt *pgss_planner(Query *parse,
@@ -370,6 +373,50 @@ static void fill_in_constant_lengths(JumbleState *jstate, const char *query,
int query_loc);
static int comp_location(const void *a, const void *b);
+/* RMGR API */
+#define CUSTOMRMGR_ID RM_EXPERIMENTAL_ID
+#define CUSTOMRMGR_NAME "pgss_rmgr"
+
+static void rmgr_redo(XLogReaderState *record);
+static void rmgr_desc(StringInfo buf, XLogReaderState *record);
+static const char *rmgr_identify(uint8 info);
+static void rmgr_checkpoint(int flags);
+
+/* WAL record definitions */
+#define PGSS_XLOG_INSERT 0x00
+#define PGSS_XLOG_RESET 0x10
+
+/*
+ * Payload of a PGSS_XLOG_INSERT record: the necessary fields from pgssEntry.
+ *
+ * pgss_store() registers everything before qtext as one data chunk and,
+ * when it stored a new query text in the same call, the text itself as a
+ * second chunk.  rmgr_redo() reads that text back through qtext.
+ */
+typedef struct pgssXLogInsert
+{
+ uint32 header; /* set to PGSS_FILE_HEADER; verified on replay */
+ pgssHashKey key; /* copy of the entry's hash key */
+ Counters counters; /* copy of the entry's counters */
+ int encoding; /* copy of the entry's encoding */
+ TimestampTz stats_since; /* copy of the entry's stats_since */
+ TimestampTz minmax_stats_since; /* copy of the entry's minmax_stats_since */
+ int query_len; /* entry's query_len if text is attached, else 0 */
+ char qtext[FLEXIBLE_ARRAY_MEMBER]; /* query text, present if query_len > 0 */
+} pgssXLogInsert;
+
+/*
+ * Payload of a PGSS_XLOG_RESET record: the params of entry_reset() function,
+ * logged so that rmgr_redo() can repeat the same call during replay.
+ */
+typedef struct pgssXLogReset
+{
+ uint32 header; /* set to PGSS_FILE_HEADER; verified on replay */
+ Oid userid; /* userid argument of entry_reset() */
+ uint64 queryid; /* queryid argument of entry_reset() */
+ Oid dbid; /* dbid argument of entry_reset() */
+ bool minmax_only; /* minmax_only argument of entry_reset() */
+} pgssXLogReset;
+
+/*
+ * RMGR data: callback table of the custom resource manager.
+ *
+ * It is registered under CUSTOMRMGR_ID in _PG_init().  Within this patch
+ * nothing outside this file refers to it, so it gets internal linkage.
+ */
+static const RmgrData pgss_rmgr = {
+	.rm_name = CUSTOMRMGR_NAME,
+	.rm_redo = rmgr_redo,
+	.rm_desc = rmgr_desc,
+	.rm_identify = rmgr_identify,
+	.rm_checkpoint = rmgr_checkpoint
+};
/*
* Module load callback
@@ -457,6 +504,8 @@ _PG_init(void)
MarkGUCPrefixReserved("pg_stat_statements");
+ RegisterCustomRmgr(CUSTOMRMGR_ID, &pgss_rmgr);
+
/*
* Install hooks.
*/
@@ -556,9 +605,12 @@ pgss_shmem_startup(void)
/*
* If we're in the postmaster (or a standalone backend...), set up a shmem
* exit hook to dump the statistics to disk.
+ *
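+ * That hook is no longer installed here: the statistics are now dumped
+ * at CHECKPOINT by rmgr_checkpoint(), the checkpoint callback of the
+ * custom resource manager registered in _PG_init(), so the old
+ * on_shmem_exit(pgss_shmem_shutdown) call is gone.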
*/
- if (!IsUnderPostmaster)
- on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
/*
* Done if some other process already completed our initialization.
@@ -720,108 +772,6 @@ fail:
*/
}
-/*
- * shmem_shutdown hook: Dump statistics into file.
- *
- * Note: we don't bother with acquiring lock, because there should be no
- * other processes running when this is called.
- */
-static void
-pgss_shmem_shutdown(int code, Datum arg)
-{
- FILE *file;
- char *qbuffer = NULL;
- Size qbuffer_size = 0;
- HASH_SEQ_STATUS hash_seq;
- int32 num_entries;
- pgssEntry *entry;
-
- /* Don't try to dump during a crash. */
- if (code)
- return;
-
- /* Safety check ... shouldn't get here unless shmem is set up. */
- if (!pgss || !pgss_hash)
- return;
-
- /* Don't dump if told not to. */
- if (!pgss_save)
- return;
-
- file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
- if (file == NULL)
- goto error;
-
- if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
- goto error;
- if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
- goto error;
- num_entries = hash_get_num_entries(pgss_hash);
- if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
- goto error;
-
- qbuffer = qtext_load_file(&qbuffer_size);
- if (qbuffer == NULL)
- goto error;
-
- /*
- * When serializing to disk, we store query texts immediately after their
- * entry data. Any orphaned query texts are thereby excluded.
- */
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
- {
- int len = entry->query_len;
- char *qstr = qtext_fetch(entry->query_offset, len,
- qbuffer, qbuffer_size);
-
- if (qstr == NULL)
- continue; /* Ignore any entries with bogus texts */
-
- if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
- fwrite(qstr, 1, len + 1, file) != len + 1)
- {
- /* note: we assume hash_seq_term won't change errno */
- hash_seq_term(&hash_seq);
- goto error;
- }
- }
-
- /* Dump global statistics for pg_stat_statements */
- if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
- goto error;
-
- free(qbuffer);
- qbuffer = NULL;
-
- if (FreeFile(file))
- {
- file = NULL;
- goto error;
- }
-
- /*
- * Rename file into place, so we atomically replace any old one.
- */
- (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
-
- /* Unlink query-texts file; it's not needed while shutdown */
- unlink(PGSS_TEXT_FILE);
-
- return;
-
-error:
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_DUMP_FILE ".tmp")));
- free(qbuffer);
- if (file)
- FreeFile(file);
- unlink(PGSS_DUMP_FILE ".tmp");
- unlink(PGSS_TEXT_FILE);
-}
-
/*
* Post-parse-analysis hook: mark query with a queryId
*/
@@ -1284,6 +1234,7 @@ pgss_store(const char *query, uint64 queryId,
pgssEntry *entry;
char *norm_query = NULL;
int encoding = GetDatabaseEncoding();
+ bool qtext_stored = false;
Assert(query != NULL);
@@ -1325,7 +1276,6 @@ pgss_store(const char *query, uint64 queryId,
{
Size query_offset;
int gc_count;
- bool stored;
bool do_gc;
/*
@@ -1345,7 +1295,7 @@ pgss_store(const char *query, uint64 queryId,
}
/* Append new query text to file with only shared lock held */
- stored = qtext_store(norm_query ? norm_query : query, query_len,
+ qtext_stored = qtext_store(norm_query ? norm_query : query, query_len,
&query_offset, &gc_count);
/*
@@ -1366,12 +1316,12 @@ pgss_store(const char *query, uint64 queryId,
* This should be infrequent enough that doing it while holding
* exclusive lock isn't a performance problem.
*/
- if (!stored || pgss->gc_count != gc_count)
- stored = qtext_store(norm_query ? norm_query : query, query_len,
+ if (!qtext_stored || pgss->gc_count != gc_count)
+ qtext_stored = qtext_store(norm_query ? norm_query : query, query_len,
&query_offset, NULL);
/* If we failed to write to the text file, give up */
- if (!stored)
+ if (!qtext_stored)
goto done;
/* OK to create a new hashtable entry */
@@ -1486,6 +1436,38 @@ pgss_store(const char *query, uint64 queryId,
SpinLockRelease(&e->mutex);
}
+ /* Write entry to XLOG */
+ if (pgss_save && !RecoveryInProgress())
+ {
+ pgssXLogInsert *xlog_entry;
+ XLogRecPtr ptr;
+
+ xlog_entry = palloc(sizeof(pgssXLogInsert));
+ xlog_entry->header = PGSS_FILE_HEADER;
+ xlog_entry->key = entry->key;
+ xlog_entry->counters = entry->counters;
+ xlog_entry->encoding = entry->encoding;
+ xlog_entry->stats_since = entry->stats_since;
+ xlog_entry->minmax_stats_since = entry->minmax_stats_since;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) xlog_entry, offsetof(pgssXLogInsert, qtext));
+
+ /* Attach the query text only if this call stored it for a new entry */
+ if (qtext_stored)
+ {
+ xlog_entry->query_len = entry->query_len;
+ XLogRegisterData(norm_query ? norm_query : (char *) query, query_len);
+ }
+ else
+ xlog_entry->query_len = 0;
+
+ XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+ ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_INSERT);
+ XLogFlush(ptr);
+ pfree(xlog_entry);
+ }
+
done:
LWLockRelease(pgss->lock);
@@ -2676,6 +2658,27 @@ entry_reset(Oid userid, Oid dbid, uint64 queryid, bool minmax_only)
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
num_entries = hash_get_num_entries(pgss_hash);
+ /* Write entry to XLOG */
+ if (pgss_save && !RecoveryInProgress())
+ {
+ pgssXLogReset *xlrec;
+ XLogRecPtr ptr;
+
+ xlrec = palloc0(sizeof(pgssXLogReset));
+ xlrec->header = PGSS_FILE_HEADER;
+ xlrec->userid = userid;
+ xlrec->dbid = dbid;
+ xlrec->queryid = queryid;
+ xlrec->minmax_only = minmax_only;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) xlrec, sizeof(pgssXLogReset));
+ XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+ ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_RESET);
+ XLogFlush(ptr);
+ pfree(xlrec);
+ }
+
stats_reset = GetCurrentTimestamp();
if (userid != 0 && dbid != 0 && queryid != UINT64CONST(0))
@@ -3010,3 +3013,236 @@ comp_location(const void *a, const void *b)
return pg_cmp_s32(l, r);
}
+
+/*
+ * rmgr_redo
+ *		Replay one pg_stat_statements WAL record.
+ *
+ * A PGSS_XLOG_INSERT record overwrites the counters, encoding and timestamps
+ * of the matching hashtable entry.  If the entry does not exist it is created
+ * first, which requires the record to carry the query text.  A
+ * PGSS_XLOG_RESET record repeats the logged entry_reset() call.
+ *
+ * Records that fail validation are skipped with a WARNING.
+ */
+static void
+rmgr_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	/*
+	 * Because we did not restore records from storage, we also do not restore
+	 * records from WAL.
+	 */
+	if (!pgss_save)
+		return;
+
+	if (info == PGSS_XLOG_INSERT)
+	{
+		pgssXLogInsert *xlrec = (pgssXLogInsert *) XLogRecGetData(record);
+		Size		datalen = XLogRecGetDataLen(record);
+		Size		fixedlen = offsetof(pgssXLogInsert, qtext);
+		pgssEntry  *entry;
+
+		/*
+		 * Validate the record before trusting its contents: it must be long
+		 * enough for the fixed-size part, carry our magic number, and hold
+		 * as many text bytes as query_len claims.  The length test comes
+		 * first so that no field is read from a truncated record.
+		 */
+		if (datalen < fixedlen ||
+			xlrec->header != PGSS_FILE_HEADER ||
+			xlrec->query_len < 0 ||
+			(Size) xlrec->query_len > datalen - fixedlen)
+		{
+			elog(WARNING, "Skip the inconsistent WAL record");
+			return;
+		}
+
+		/* Safety check... */
+		if (!pgss || !pgss_hash)
+			return;
+
+		LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+
+		entry = (pgssEntry *) hash_search(pgss_hash, &xlrec->key, HASH_FIND, NULL);
+
+		/* Create new entry, if not present */
+		if (!entry)
+		{
+			Size		query_offset;
+
+			/*
+			 * pgss_store() attaches the query text only to the record
+			 * written when it created the entry.  A text-less record can
+			 * therefore only update an existing entry; if the entry is not
+			 * in the hashtable there is nothing to build it from, so skip
+			 * the record rather than create an entry with an empty text.
+			 */
+			if (xlrec->query_len == 0)
+			{
+				LWLockRelease(pgss->lock);
+				return;
+			}
+
+			/* Append new query text to file; give up if that fails */
+			if (!qtext_store(xlrec->qtext, xlrec->query_len,
+							 &query_offset, NULL))
+			{
+				LWLockRelease(pgss->lock);
+				return;
+			}
+
+			/* OK to create a new hashtable entry */
+			entry = entry_alloc(&xlrec->key, query_offset, xlrec->query_len,
+								xlrec->encoding, false);
+
+			/* If needed, perform garbage collection */
+			gc_qtexts();
+		}
+
+		/*
+		 * Copy the necessary data from XLog record.  The record holds
+		 * complete values, not increments, so this is a plain overwrite.
+		 */
+		entry->counters = xlrec->counters;
+		entry->encoding = xlrec->encoding;
+		entry->stats_since = xlrec->stats_since;
+		entry->minmax_stats_since = xlrec->minmax_stats_since;
+
+		LWLockRelease(pgss->lock);
+	}
+	else if (info == PGSS_XLOG_RESET)
+	{
+		pgssXLogReset *xlrec = (pgssXLogReset *) XLogRecGetData(record);
+
+		/* Same validation as above: full payload and matching magic number */
+		if (XLogRecGetDataLen(record) < sizeof(pgssXLogReset) ||
+			xlrec->header != PGSS_FILE_HEADER)
+		{
+			elog(WARNING, "Skip the inconsistent WAL record");
+			return;
+		}
+
+		entry_reset(xlrec->userid, xlrec->dbid, xlrec->queryid, xlrec->minmax_only);
+	}
+}
+
+/*
+ * rmgr_desc
+ *		Append a human-readable description of a pg_stat_statements WAL
+ *		record to buf.
+ *
+ * A record whose header does not match PGSS_FILE_HEADER is described as
+ * inconsistent.  No message is raised through elog(): a description routine
+ * only produces text, and it may be called while an error is already being
+ * reported.  Unknown record types append nothing.
+ */
+static void
+rmgr_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == PGSS_XLOG_INSERT)
+	{
+		pgssXLogInsert *xlrec = (pgssXLogInsert *) rec;
+
+		if (xlrec->header != PGSS_FILE_HEADER)
+		{
+			appendStringInfoString(buf, "inconsistent record");
+			return;
+		}
+
+		appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT
+						 ", toplevel: %d",
+						 xlrec->key.userid, xlrec->key.dbid, xlrec->key.queryid,
+						 (int) xlrec->key.toplevel);
+	}
+	else if (info == PGSS_XLOG_RESET)
+	{
+		pgssXLogReset *xlrec = (pgssXLogReset *) rec;
+
+		if (xlrec->header != PGSS_FILE_HEADER)
+		{
+			appendStringInfoString(buf, "inconsistent record");
+			return;
+		}
+
+		appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT
+						 ", minmax_only: %d",
+						 xlrec->userid, xlrec->dbid, xlrec->queryid,
+						 (int) xlrec->minmax_only);
+	}
+}
+
+/*
+ * rmgr_identify
+ *		Map the record-type bits of info to a record name.
+ *
+ * Returns "INSERT" or "RESET" for our two record types, and NULL for any
+ * other value.
+ */
+static const char *
+rmgr_identify(uint8 info)
+{
+	const char *name = NULL;
+
+	/* Only the bits outside XLR_INFO_MASK select the record type */
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case PGSS_XLOG_INSERT:
+			name = "INSERT";
+			break;
+		case PGSS_XLOG_RESET:
+			name = "RESET";
+			break;
+		default:
+			break;
+	}
+
+	return name;
+}
+
+/*
+ * rmgr_checkpoint
+ *		Checkpoint callback: dump statistics into file.
+ *
+ * Writes the file header, the PostgreSQL major version and the number of
+ * hashtable entries, then every entry followed by its query text, then the
+ * global statistics.  Everything goes to PGSS_DUMP_FILE ".tmp", which is
+ * renamed over PGSS_DUMP_FILE once it has been written completely.
+ *
+ * pgss->lock is held exclusively for the whole dump.  If flags contains
+ * CHECKPOINT_IS_SHUTDOWN, the query-texts file is removed afterwards, on
+ * success and on failure alike.  Failures are reported at LOG level and the
+ * temporary file is removed.
+ */
+static void
+rmgr_checkpoint(int flags)
+{
+ FILE *file;
+ char *qbuffer = NULL;
+ Size qbuffer_size = 0;
+ HASH_SEQ_STATUS hash_seq;
+ int32 num_entries;
+ pgssEntry *entry;
+
+ /* Safety check ... shouldn't get here unless shmem is set up. */
+ if (!pgss || !pgss_hash)
+ return;
+
+ /* Don't dump if told not to. */
+ if (!pgss_save)
+ return;
+
+ /* XXX: Can there be concurrent CHECKPOINTs? */
+ LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+
+ file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
+ if (file == NULL)
+ goto error;
+
+ if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
+ goto error;
+ if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
+ goto error;
+ /*
+ * NOTE(review): this count is written before the loop below, which skips
+ * entries whose text cannot be fetched, so it can exceed the number of
+ * entries actually written.
+ */
+ num_entries = hash_get_num_entries(pgss_hash);
+ if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
+ goto error;
+
+ /* Load the query-texts file so each entry's text can be looked up */
+ qbuffer = qtext_load_file(&qbuffer_size);
+ if (qbuffer == NULL)
+ goto error;
+
+ /*
+ * When serializing to disk, we store query texts immediately after their
+ * entry data. Any orphaned query texts are thereby excluded.
+ */
+ hash_seq_init(&hash_seq, pgss_hash);
+ while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ {
+ int len = entry->query_len;
+ char *qstr = qtext_fetch(entry->query_offset, len,
+ qbuffer, qbuffer_size);
+
+ if (qstr == NULL)
+ continue; /* Ignore any entries with bogus texts */
+
+ /* len + 1 bytes of text are written after the entry itself */
+ if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
+ fwrite(qstr, 1, len + 1, file) != len + 1)
+ {
+ /* note: we assume hash_seq_term won't change errno */
+ hash_seq_term(&hash_seq);
+ goto error;
+ }
+ }
+
+ /* Dump global statistics for pg_stat_statements */
+ if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
+ goto error;
+
+ free(qbuffer);
+ qbuffer = NULL;
+
+ if (FreeFile(file))
+ {
+ /* Clear file so the error path does not call FreeFile() again */
+ file = NULL;
+ goto error;
+ }
+
+ /*
+ * Rename file into place, so we atomically replace any old one.
+ */
+ (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
+
+ /* Unlink query-texts file; it's not needed once the server is shut down */
+ if (flags & CHECKPOINT_IS_SHUTDOWN)
+ unlink(PGSS_TEXT_FILE);
+
+ LWLockRelease(pgss->lock);
+ return;
+
+error:
+ /* Common failure path: report, then release everything acquired above */
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ PGSS_DUMP_FILE ".tmp")));
+ free(qbuffer);
+ if (file)
+ FreeFile(file);
+ unlink(PGSS_DUMP_FILE ".tmp");
+
+ /* As on success, the query-texts file is only removed at shutdown */
+ if (flags & CHECKPOINT_IS_SHUTDOWN)
+ unlink(PGSS_TEXT_FILE);
+
+ LWLockRelease(pgss->lock);
+}
diff --git a/contrib/pg_stat_statements/t/020_crash.pl b/contrib/pg_stat_statements/t/020_crash.pl
new file mode 100644
index 0000000000..f33bb43d7d
--- /dev/null
+++ b/contrib/pg_stat_statements/t/020_crash.pl
@@ -0,0 +1,80 @@
+# Copyright (c) 2023-2024, PostgreSQL Global Development Group
+
+# Test that pg_stat_statements contents are preserved across a server
+# crash: a backend is killed with SIGQUIT and the server restarts itself.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Start a node with pg_stat_statements preloaded and crash restart enabled.
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->append_conf('postgresql.conf',
+ q[
+ shared_preload_libraries = 'pg_stat_statements'
+ restart_after_crash = 1
+ ]);
+$node->start;
+
+$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_statements');
+
+# Without the CHECKPOINT hook, we won't see this query in pg_stat_statements
+# after a server crash.
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a int)');
+
+# Run one statement before and one after an explicit checkpoint.
+$node->safe_psql('postgres', 'CHECKPOINT');
+$node->safe_psql('postgres', 'SELECT a FROM t1');
+
+# Sanity check: all three statements are tracked before the crash.
+is( $node->safe_psql(
+ 'postgres',
+ "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query"
+ ),
+ "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1",
+ 'pg_stat_statements populated');
+
+
+# Crash the server by sending SIGQUIT to a backend.
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+# Open an interactive psql session; its backend is the one to be killed.
+my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', '');
+my $killme = IPC::Run::start(
+ [
+ 'psql', '-X', '-qAt', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d',
+ $node->connstr('postgres')
+ ],
+ '<',
+ \$killme_stdin,
+ '>',
+ \$killme_stdout,
+ '2>',
+ \$killme_stderr,
+ $psql_timeout);
+
+# Ask the session for its backend PID and wait until psql prints it.
+$killme_stdin .= "SELECT pg_backend_pid();\n";
+ok( pump_until(
+ $killme, $psql_timeout, \$killme_stdout, qr/[[:digit:]]+[\r\n]$/m),
+ 'acquired pid for SIGQUIT');
+my $pid = $killme_stdout;
+chomp($pid);
+
+my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'QUIT', $pid);
+is($ret, 0, "killed process with SIGQUIT");
+
+$killme->finish;
+
+# Wait until the server has restarted and accepts queries again
+is($node->poll_query_until('postgres', undef, ''),
+ "1", "reconnected after SIGQUIT");
+
+# The earlier statements, plus the pg_backend_pid() query issued just before
+# the kill, must all still be present.
+is( $node->safe_psql(
+ 'postgres',
+ "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query"
+ ),
+ "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1\nSELECT pg_backend_pid()",
+ 'pg_stat_statements data kept across the server crash');
+
+$node->stop;
+
+done_testing();