public inbox for pgsql-hackers@postgresql.org  
help / color / mirror / Atom feed
From: Sami Imseih <samimseih@gmail.com>
To: pgsql-hackers <pgsql-hackers@postgresql.org>
Subject: Re: Improve pg_stat_statements scalability
Date: Wed, 20 May 2026 17:59:23 -0500
Message-ID: <CAA5RZ0vjpyU-RJzUYCkr0-9jqJtbS95cRFnWTiOEENE7iYNgcA@mail.gmail.com> (raw)
In-Reply-To: <CAA5RZ0u-Y4HHTdBpJMn5if6DnOnj-Bn9MmPKJ=28Y5dkpHhCoA@mail.gmail.com>
References: <CAA5RZ0vZwR_dSK6fo0P2-EnskUVN0NjLHnGnJMFDPC8-kEW3sQ@mail.gmail.com>
	<CAA5RZ0sxGs7k7vUCd_wurwrreqC3367eC6QuEQX5f4Oeu6WkDQ@mail.gmail.com>
	<CAA5RZ0u-Y4HHTdBpJMn5if6DnOnj-Bn9MmPKJ=28Y5dkpHhCoA@mail.gmail.com>

There was a failure on FreeBSD [1]. The test uses
debug_parallel_query=regress which forces parallel plans. What
happens is the parallel worker calls pg_stat_statements() (marked
PARALLEL SAFE), tries to flush pending stats, but the leader is the one
that actually accumulated those stats.

I fixed this by:

1. Setting max_parallel_workers_per_gather = 0 in
pg_stat_statements.conf, and only enabling it during
parallel.sql when we actually want to track a parallel query.

2. Bumping pg_s_s to version 1.14 and marking pg_stat_statements()
and pg_stat_statements_reset() as PARALLEL RESTRICTED to ensure
these functions only execute in the leader, which is the process
that accumulates the pending stats. These could also be marked UNSAFE,
but RESTRICTED seems better since it doesn't completely prevent
parallel plans if these functions are used with other tables; although
it's hard to imagine a real-world case where this would matter.

This also means old versions would have this issue with
debug_parallel_query, but I don't think we should change
function definitions for older versions, in case a user
downgrades pg_s_s versions. Maybe others have a
different opinion?

[1] https://cirrus-ci.com/task/5948422076760064

--
Sami


Attachments:

  [application/octet-stream] v2-0004-pg_stat_statements-extend-pg_stat_statements_info.patch (5.0K, 2-v2-0004-pg_stat_statements-extend-pg_stat_statements_info.patch)
  download | inline diff:
From 32e69e07ad6ef3f0fc9b1c8208161da3155282b3 Mon Sep 17 00:00:00 2001
From: Sami Imseih <samimseih@gmail.com>
Date: Thu, 14 May 2026 12:39:23 -0500
Subject: [PATCH v2 4/4] pg_stat_statements: extend pg_stat_statements_info
 view

Add new columns to the pg_stat_statements_info view:

  - query_text_memory_bytes: total DSA memory used for query texts
  - query_text_file_bytes: size of the external query text overflow file
  - num_entries: current number of tracked statement entries
  - last_eviction_time: timestamp of the most recent eviction cycle
---
 .../pg_stat_statements--1.12--1.13.sql        | 23 ++++++++++++
 .../pg_stat_statements/pg_stat_statements.c   | 17 ++++++++-
 doc/src/sgml/pgstatstatements.sgml            | 36 +++++++++++++++++++
 3 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.12--1.13.sql b/contrib/pg_stat_statements/pg_stat_statements--1.12--1.13.sql
index 2f0eaf14ec3..7d1cbfcc701 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.12--1.13.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.12--1.13.sql
@@ -76,3 +76,26 @@ CREATE VIEW pg_stat_statements AS
   SELECT * FROM pg_stat_statements(true);
 
 GRANT SELECT ON pg_stat_statements TO PUBLIC;
+
+ALTER EXTENSION pg_stat_statements DROP VIEW pg_stat_statements_info;
+ALTER EXTENSION pg_stat_statements DROP FUNCTION pg_stat_statements_info();
+
+DROP VIEW pg_stat_statements_info;
+DROP FUNCTION pg_stat_statements_info();
+
+CREATE FUNCTION pg_stat_statements_info(
+    OUT dealloc bigint,
+    OUT stats_reset timestamp with time zone,
+    OUT query_text_memory_bytes bigint,
+    OUT query_text_file_bytes bigint,
+    OUT num_entries bigint,
+    OUT last_eviction_time timestamp with time zone
+)
+RETURNS record
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
+
+CREATE VIEW pg_stat_statements_info AS
+  SELECT * FROM pg_stat_statements_info();
+
+GRANT SELECT ON pg_stat_statements_info TO PUBLIC;
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index af7637b25b9..6c8dd695ed6 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -1976,7 +1976,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 }
 
 /* Number of output arguments (columns) for pg_stat_statements_info */
-#define PG_STAT_STATEMENTS_INFO_COLS	2
+#define PG_STAT_STATEMENTS_INFO_COLS	6
 
 /*
  * Return statistics of pg_stat_statements.
@@ -2006,6 +2006,21 @@ pg_stat_statements_info(PG_FUNCTION_ARGS)
 
 	values[i++] = Int64GetDatum(stats.dealloc);
 	values[i++] = TimestampTzGetDatum(stats.stats_reset);
+	/* query_text_memory_bytes: DSA memory used for query texts */
+	pgss_qtext_dsa_attach();
+	if (pgss_qtext_dsa)
+		values[i++] = Int64GetDatum(dsa_get_total_size(pgss_qtext_dsa));
+	else
+		nulls[i++] = true;
+	/* query_text_file_bytes: overflow file size */
+	values[i++] = Int64GetDatum(pgss->extent);
+	/* num_entries: current entry count */
+	values[i++] = Int64GetDatum(pgstat_get_entry_count(PGSTAT_KIND_PGSS));
+	/* last eviction time */
+	if (pgss->last_eviction_time != 0)
+		values[i++] = TimestampTzGetDatum(pgss->last_eviction_time);
+	else
+		nulls[i++] = true;
 
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
diff --git a/doc/src/sgml/pgstatstatements.sgml b/doc/src/sgml/pgstatstatements.sgml
index 0f4ab34965b..2221212574d 100644
--- a/doc/src/sgml/pgstatstatements.sgml
+++ b/doc/src/sgml/pgstatstatements.sgml
@@ -809,6 +809,42 @@ calls | 2
        <structname>pg_stat_statements</structname> view were last reset.
       </para></entry>
      </row>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>query_text_memory_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Total bytes of dynamic shared memory used for query text storage.
+      </para></entry>
+     </row>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>query_text_file_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Size in bytes of the external query text file used for overflow
+       storage.
+      </para></entry>
+     </row>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>num_entries</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Current number of entries tracked by
+       <structname>pg_stat_statements</structname>.
+      </para></entry>
+     </row>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_eviction_time</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time of the most recent entry eviction cycle, or null if no eviction
+       has occurred since server start.
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
-- 
2.50.1 (Apple Git-155)



  [application/octet-stream] v2-0001-pgstat-Introduce-pg_stat_report_anytime-for-mid-t.patch (28.5K, 3-v2-0001-pgstat-Introduce-pg_stat_report_anytime-for-mid-t.patch)
  download | inline diff:
From de04e151abff07dab378d5a3d006e76ea56359f6 Mon Sep 17 00:00:00 2001
From: Sami Imseih <samimseih@gmail.com>
Date: Sun, 10 May 2026 07:06:04 -0500
Subject: [PATCH v2 1/4] pgstat: Introduce pg_stat_report_anytime() for
 mid-transaction stats flush

Add an API to flush pending stats that are safe to report inside a
transaction without waiting for transaction end. Relation write
counters (tuples inserted, updated, deleted) for tables modified in
the current transaction are excluded, since their final values depend
on commit/abort outcome.

The SQL function pg_stat_report_anytime(pid) flushes the target
backend's pending stats: if the PID matches the caller's own backend
it flushes immediately, otherwise it signals the target to flush at
its next CHECK_FOR_INTERRUPTS (for regular backends) or main-loop
iteration (for auxiliary processes). The C function
pgstat_report_anytime_stat() flushes pending stats in the calling
backend only.
---
 doc/src/sgml/monitoring.sgml                 |  26 ++++
 src/backend/postmaster/autovacuum.c          |   3 +
 src/backend/postmaster/checkpointer.c        |   3 +
 src/backend/postmaster/interrupt.c           |   4 +
 src/backend/postmaster/pgarch.c              |   3 +
 src/backend/postmaster/startup.c             |   4 +
 src/backend/postmaster/walsummarizer.c       |   3 +
 src/backend/storage/ipc/procsignal.c         |   3 +
 src/backend/tcop/postgres.c                  |   3 +
 src/backend/utils/activity/pgstat.c          |  61 +++++++++-
 src/backend/utils/activity/pgstat_relation.c |  97 +++++++++------
 src/backend/utils/adt/pgstatfuncs.c          |  40 ++++++
 src/backend/utils/init/globals.c             |   1 +
 src/include/catalog/pg_proc.dat              |   6 +
 src/include/miscadmin.h                      |   1 +
 src/include/pgstat.h                         |   3 +
 src/include/storage/procsignal.h             |   2 +
 src/test/regress/expected/stats.out          | 122 +++++++++++++++++++
 src/test/regress/sql/stats.sql               |  81 ++++++++++++
 19 files changed, 431 insertions(+), 35 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 08d5b824552..bb6c928e3e7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -5607,6 +5607,32 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_stat_report_anytime</primary>
+        </indexterm>
+        <function>pg_stat_report_anytime</function> ( <type>integer</type> )
+        <returnvalue>boolean</returnvalue>
+       </para>
+       <para>
+        Force flush of pending statistics to shared memory for the backend
+        with the specified process ID. Unlike normal statistics reporting,
+        this can be called from within a transaction. For relations modified
+        by <command>INSERT</command>, <command>UPDATE</command>, or
+        <command>DELETE</command> in the current transaction, only read
+        counters (scans, tuples fetched, blocks hit) are flushed
+        immediately; write counters (tuples inserted, updated, deleted)
+        are deferred until the transaction ends.
+        Returns <literal>true</literal> if the flush was successfully
+        triggered, <literal>false</literal> otherwise.
+       </para>
+       <para>
+        This function is restricted to superusers by default, but other users
+        can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <indexterm>
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index a5a8db2ff88..c5fddf75dab 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -825,6 +825,9 @@ ProcessAutoVacLauncherInterrupts(void)
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
 
+	if (ReportAnytimeStatsPending)
+		ProcessReportAnytimeStatsInterrupt();
+
 	/* Process sinval catchup interrupts that happened while sleeping */
 	ProcessCatchupInterrupt();
 }
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 087120db090..874cceb3970 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -694,6 +694,9 @@ ProcessCheckpointerInterrupts(void)
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
+
+	if (ReportAnytimeStatsPending)
+		ProcessReportAnytimeStatsInterrupt();
 }
 
 /*
diff --git a/src/backend/postmaster/interrupt.c b/src/backend/postmaster/interrupt.c
index a2c0ff012c5..4e09e93f8da 100644
--- a/src/backend/postmaster/interrupt.c
+++ b/src/backend/postmaster/interrupt.c
@@ -17,6 +17,7 @@
 #include <unistd.h>
 
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "postmaster/interrupt.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
@@ -48,6 +49,9 @@ ProcessMainLoopInterrupts(void)
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
+
+	if (ReportAnytimeStatsPending)
+		ProcessReportAnytimeStatsInterrupt();
 }
 
 /*
diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c
index 0f207ac0356..d83a5fda862 100644
--- a/src/backend/postmaster/pgarch.c
+++ b/src/backend/postmaster/pgarch.c
@@ -870,6 +870,9 @@ ProcessPgArchInterrupts(void)
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
 
+	if (ReportAnytimeStatsPending)
+		ProcessReportAnytimeStatsInterrupt();
+
 	if (ConfigReloadPending)
 	{
 		char	   *archiveLib = pstrdup(XLogArchiveLibrary);
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index b46bac681fe..4a5534a8f9b 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/startup.h"
 #include "storage/ipc.h"
@@ -192,6 +193,9 @@ ProcessStartupProcInterrupts(void)
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
+
+	if (ReportAnytimeStatsPending)
+		ProcessReportAnytimeStatsInterrupt();
 }
 
 
diff --git a/src/backend/postmaster/walsummarizer.c b/src/backend/postmaster/walsummarizer.c
index 4f12eaf2c85..b1239cbb07f 100644
--- a/src/backend/postmaster/walsummarizer.c
+++ b/src/backend/postmaster/walsummarizer.c
@@ -876,6 +876,9 @@ ProcessWalSummarizerInterrupts(void)
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
+
+	if (ReportAnytimeStatsPending)
+		ProcessReportAnytimeStatsInterrupt();
 }
 
 /*
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 264e4c22ca6..40023ac9888 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -711,6 +711,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_REPACK_MESSAGE))
 		HandleRepackMessageInterrupt();
 
+	if (CheckProcSignal(PROCSIG_REPORT_ANYTIME_STATS))
+		HandleReportAnytimeStatsInterrupt();
+
 	if (CheckProcSignal(PROCSIG_SLOTSYNC_MESSAGE))
 		HandleSlotSyncMessageInterrupt();
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index dbef734a93f..dbca372a3f1 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3609,6 +3609,9 @@ ProcessInterrupts(void)
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
 
+	if (ReportAnytimeStatsPending)
+		ProcessReportAnytimeStatsInterrupt();
+
 	if (ParallelApplyMessagePending)
 		ProcessParallelApplyMessages();
 
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index b67da88c7dc..9b5d9bf09cb 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -106,6 +106,7 @@
 
 #include "access/xact.h"
 #include "lib/dshash.h"
+#include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -845,6 +846,57 @@ pgstat_force_next_flush(void)
 	pgStatForceNextFlush = true;
 }
 
+/*
+ * Immediately flush all pending statistics entries to shared memory.
+ *
+ * Unlike pgstat_report_stat(), this can be called anytime, including
+ * within a transaction.
+ */
+void
+pgstat_report_anytime_stat(void)
+{
+	pgstat_flush_pending_entries(false);
+
+	for (PgStat_Kind kind = PGSTAT_KIND_MIN; kind <= PGSTAT_KIND_MAX; kind++)
+	{
+		const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+
+		if (!kind_info || !kind_info->flush_static_cb)
+			continue;
+
+		kind_info->flush_static_cb(false);
+	}
+}
+
+/*
+ * HandleReportAnytimeStatsInterrupt
+ *		Handle receipt of an interrupt requesting an anytime stats report.
+ *
+ * All the actual work is deferred to ProcessReportAnytimeStatsInterrupt(),
+ * because we cannot safely acquire locks inside the signal handler.
+ */
+void
+HandleReportAnytimeStatsInterrupt(void)
+{
+	InterruptPending = true;
+	ReportAnytimeStatsPending = true;
+	/* latch will be set by procsignal_sigusr1_handler */
+}
+
+/*
+ * ProcessReportAnytimeStatsInterrupt
+ *		Report all pending statistics to shared memory.
+ *
+ * Called from ProcessInterrupts() when ReportAnytimeStatsPending is set.
+ */
+void
+ProcessReportAnytimeStatsInterrupt(void)
+{
+	ReportAnytimeStatsPending = false;
+
+	pgstat_report_anytime_stat();
+}
+
 /*
  * Only for use by pgstat_reset_counters()
  */
@@ -1414,7 +1466,14 @@ pgstat_flush_pending_entries(bool nowait)
 		/* flush the stats, if possible */
 		did_flush = kind_info->flush_pending_cb(entry_ref, nowait);
 
-		Assert(did_flush || nowait);
+		/*
+		 * When nowait is false we block for the lock, so the only reason a
+		 * flush_pending_cb can legitimately return false is that the entry
+		 * has active transaction state that must not be freed yet (e.g.
+		 * relation stats with trans != NULL).  That situation only arises
+		 * mid-transaction, hence the IsTransactionOrTransactionBlock() check.
+		 */
+		Assert(did_flush || nowait || IsTransactionOrTransactionBlock());
 
 		/* determine next entry, before deleting the pending entry */
 		if (dlist_has_next(&pgStatPending, cur))
diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c
index b2ca28f83ba..848687a9f7e 100644
--- a/src/backend/utils/activity/pgstat_relation.c
+++ b/src/backend/utils/activity/pgstat_relation.c
@@ -828,64 +828,76 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 
 	/*
 	 * Ignore entries that didn't accumulate any actual counts, such as
-	 * indexes that were opened by the planner but not used.
+	 * indexes that were opened by the planner but not used.  The entry cannot
+	 * be freed if there is active transaction state, since
+	 * AtEOXact_PgStat_Relations will still merge counters into it.
 	 */
 	if (pg_memory_is_all_zeros(&lstats->counts,
 							   sizeof(struct PgStat_TableCounts)))
-		return true;
+		return (lstats->trans == NULL);
 
 	if (!pgstat_lock_entry(entry_ref, nowait))
 		return false;
 
-	/* add the values to the shared entry. */
+	/* Update counters that are always safe to flush. */
 	tabentry = &shtabstats->stats;
 
 	tabentry->numscans += lstats->counts.numscans;
 	if (lstats->counts.numscans)
 	{
-		TimestampTz t = GetCurrentTransactionStopTimestamp();
+		TimestampTz t = IsTransactionOrTransactionBlock() ?
+			GetCurrentStatementStartTimestamp() :
+			GetCurrentTransactionStopTimestamp();
 
 		if (t > tabentry->lastscan)
 			tabentry->lastscan = t;
 	}
 	tabentry->tuples_returned += lstats->counts.tuples_returned;
 	tabentry->tuples_fetched += lstats->counts.tuples_fetched;
-	tabentry->tuples_inserted += lstats->counts.tuples_inserted;
-	tabentry->tuples_updated += lstats->counts.tuples_updated;
-	tabentry->tuples_deleted += lstats->counts.tuples_deleted;
 	tabentry->tuples_hot_updated += lstats->counts.tuples_hot_updated;
 	tabentry->tuples_newpage_updated += lstats->counts.tuples_newpage_updated;
+	tabentry->blocks_fetched += lstats->counts.blocks_fetched;
+	tabentry->blocks_hit += lstats->counts.blocks_hit;
 
 	/*
-	 * If table was truncated/dropped, first reset the live/dead counters.
+	 * Update counters that are only safe to flush outside of a transaction
+	 * that has modified this relation.
 	 */
-	if (lstats->counts.truncdropped)
+	if (lstats->trans == NULL)
 	{
-		tabentry->live_tuples = 0;
-		tabentry->dead_tuples = 0;
-		tabentry->ins_since_vacuum = 0;
-	}
+		tabentry->tuples_inserted += lstats->counts.tuples_inserted;
+		tabentry->tuples_updated += lstats->counts.tuples_updated;
+		tabentry->tuples_deleted += lstats->counts.tuples_deleted;
 
-	tabentry->live_tuples += lstats->counts.delta_live_tuples;
-	tabentry->dead_tuples += lstats->counts.delta_dead_tuples;
-	tabentry->mod_since_analyze += lstats->counts.changed_tuples;
+		/*
+		 * If table was truncated/dropped, first reset the live/dead counters.
+		 */
+		if (lstats->counts.truncdropped)
+		{
+			tabentry->live_tuples = 0;
+			tabentry->dead_tuples = 0;
+			tabentry->ins_since_vacuum = 0;
+		}
 
-	/*
-	 * Using tuples_inserted to update ins_since_vacuum does mean that we'll
-	 * track aborted inserts too.  This isn't ideal, but otherwise probably
-	 * not worth adding an extra field for.  It may just amount to autovacuums
-	 * triggering for inserts more often than they maybe should, which is
-	 * probably not going to be common enough to be too concerned about here.
-	 */
-	tabentry->ins_since_vacuum += lstats->counts.tuples_inserted;
+		tabentry->live_tuples += lstats->counts.delta_live_tuples;
+		tabentry->dead_tuples += lstats->counts.delta_dead_tuples;
+		tabentry->mod_since_analyze += lstats->counts.changed_tuples;
 
-	tabentry->blocks_fetched += lstats->counts.blocks_fetched;
-	tabentry->blocks_hit += lstats->counts.blocks_hit;
+		/*
+		 * Using tuples_inserted to update ins_since_vacuum does mean that
+		 * we'll track aborted inserts too.  This isn't ideal, but otherwise
+		 * probably not worth adding an extra field for.  It may just amount
+		 * to autovacuums triggering for inserts more often than they maybe
+		 * should, which is probably not going to be common enough to be too
+		 * concerned about here.
+		 */
+		tabentry->ins_since_vacuum += lstats->counts.tuples_inserted;
 
-	/* Clamp live_tuples in case of negative delta_live_tuples */
-	tabentry->live_tuples = Max(tabentry->live_tuples, 0);
-	/* Likewise for dead_tuples */
-	tabentry->dead_tuples = Max(tabentry->dead_tuples, 0);
+		/* Clamp live_tuples in case of negative delta_live_tuples */
+		tabentry->live_tuples = Max(tabentry->live_tuples, 0);
+		/* Likewise for dead_tuples */
+		tabentry->dead_tuples = Max(tabentry->dead_tuples, 0);
+	}
 
 	pgstat_unlock_entry(entry_ref);
 
@@ -893,13 +905,30 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	dbentry = pgstat_prep_database_pending(dboid);
 	dbentry->tuples_returned += lstats->counts.tuples_returned;
 	dbentry->tuples_fetched += lstats->counts.tuples_fetched;
-	dbentry->tuples_inserted += lstats->counts.tuples_inserted;
-	dbentry->tuples_updated += lstats->counts.tuples_updated;
-	dbentry->tuples_deleted += lstats->counts.tuples_deleted;
 	dbentry->blocks_fetched += lstats->counts.blocks_fetched;
 	dbentry->blocks_hit += lstats->counts.blocks_hit;
 
-	return true;
+	if (lstats->trans == NULL)
+	{
+		dbentry->tuples_inserted += lstats->counts.tuples_inserted;
+		dbentry->tuples_updated += lstats->counts.tuples_updated;
+		dbentry->tuples_deleted += lstats->counts.tuples_deleted;
+		return true;
+	}
+
+	/*
+	 * This is a partial, in-transaction flush.  Zero out the counters we
+	 * already flushed so they aren't double-counted on the next flush.
+	 */
+	lstats->counts.numscans = 0;
+	lstats->counts.tuples_returned = 0;
+	lstats->counts.tuples_fetched = 0;
+	lstats->counts.tuples_hot_updated = 0;
+	lstats->counts.tuples_newpage_updated = 0;
+	lstats->counts.blocks_fetched = 0;
+	lstats->counts.blocks_hit = 0;
+
+	return false;
 }
 
 void
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6f9c9c72de5..eb22490dc2c 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -28,6 +28,7 @@
 #include "replication/logicallauncher.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/procsignal.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/timestamp.h"
@@ -1929,6 +1930,45 @@ pg_stat_force_next_flush(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * Signal a backend to report all its pending statistics to shared memory.
+ * If the target is the current backend, the report happens immediately.
+ */
+Datum
+pg_stat_report_anytime(PG_FUNCTION_ARGS)
+{
+	int			pid = PG_GETARG_INT32(0);
+	PGPROC	   *proc;
+	ProcNumber	procNumber = INVALID_PROC_NUMBER;
+
+	if (pid == MyProcPid)
+	{
+		pgstat_report_anytime_stat();
+		PG_RETURN_BOOL(true);
+	}
+
+	proc = BackendPidGetProc(pid);
+	if (proc == NULL)
+		proc = AuxiliaryPidGetProc(pid);
+
+	if (proc == NULL)
+	{
+		ereport(WARNING,
+				(errmsg("PID %d is not a PostgreSQL server process", pid)));
+		PG_RETURN_BOOL(false);
+	}
+
+	procNumber = GetNumberFromPGProc(proc);
+	if (SendProcSignal(pid, PROCSIG_REPORT_ANYTIME_STATS, procNumber) < 0)
+	{
+		ereport(WARNING,
+				(errmsg("could not send signal to process %d: %m", pid)));
+		PG_RETURN_BOOL(false);
+	}
+
+	PG_RETURN_BOOL(true);
+}
+
 
 /* Reset all counters for the current database */
 Datum
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index bbd28d14d99..1b5b3d59c3c 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -39,6 +39,7 @@ volatile sig_atomic_t TransactionTimeoutPending = false;
 volatile sig_atomic_t IdleSessionTimeoutPending = false;
 volatile sig_atomic_t ProcSignalBarrierPending = false;
 volatile sig_atomic_t LogMemoryContextPending = false;
+volatile sig_atomic_t ReportAnytimeStatsPending = false;
 volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index be157a5fbe9..406628025b1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6209,6 +6209,12 @@
   proname => 'pg_stat_force_next_flush', proisstrict => 'f', provolatile => 'v',
   proparallel => 'r', prorettype => 'void', proargtypes => '',
   prosrc => 'pg_stat_force_next_flush' },
+{ oid => '9953',
+  descr => 'statistics: force flush of pending stats to shared memory, including from within a transaction',
+  proname => 'pg_stat_report_anytime', provolatile => 'v',
+  prorettype => 'bool', proargtypes => 'int4',
+  prosrc => 'pg_stat_report_anytime',
+  proacl => '{POSTGRES=X}' },
 { oid => '2274',
   descr => 'statistics: reset collected statistics for current database',
   proname => 'pg_stat_reset', proisstrict => 'f', provolatile => 'v',
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 8ccdf61246b..7f8b38cb9d7 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -97,6 +97,7 @@ extern PGDLLIMPORT volatile sig_atomic_t TransactionTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
 extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
+extern PGDLLIMPORT volatile sig_atomic_t ReportAnytimeStatsPending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
 
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index dfa2e837638..87def3b08e2 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -552,6 +552,9 @@ extern void pgstat_initialize(void);
 /* Functions called from backends */
 extern long pgstat_report_stat(bool force);
 extern void pgstat_force_next_flush(void);
+extern void pgstat_report_anytime_stat(void);
+extern void HandleReportAnytimeStatsInterrupt(void);
+extern void ProcessReportAnytimeStatsInterrupt(void);
 
 extern void pgstat_reset_counters(void);
 extern void pgstat_reset(PgStat_Kind kind, Oid dboid, uint64 objid);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index aaa158bfd66..a184d449eba 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -38,6 +38,8 @@ typedef enum
 	PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
 	PROCSIG_SLOTSYNC_MESSAGE,	/* ask slot synchronization to stop */
 	PROCSIG_REPACK_MESSAGE,		/* Message from repack worker */
+	PROCSIG_REPORT_ANYTIME_STATS,	/* ask backend to report anytime
+									 * statistics */
 	PROCSIG_RECOVERY_CONFLICT,	/* backend is blocking recovery, check
 								 * PGPROC->pendingRecoveryConflicts for the
 								 * reason */
diff --git a/src/test/regress/expected/stats.out b/src/test/regress/expected/stats.out
index c551abb1178..66b683965a6 100644
--- a/src/test/regress/expected/stats.out
+++ b/src/test/regress/expected/stats.out
@@ -2040,4 +2040,126 @@ SELECT fastpath_exceeded > :fastpath_exceeded_before FROM pg_stat_lock WHERE loc
 (1 row)
 
 DROP TABLE part_test;
+--
+-- Test pg_stat_report_anytime
+--
+CREATE TABLE partial_flush(id int);
+INSERT INTO partial_flush VALUES (1), (2), (3);
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush 
+--------------------------
+ 
+(1 row)
+
+-- Record counters before the explicit transaction
+SELECT seq_scan AS seq_scan_before,
+       seq_tup_read AS seq_tup_read_before,
+       n_tup_ins AS n_tup_ins_before,
+       n_tup_upd AS n_tup_upd_before
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush' \gset
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+-- Generate both transaction-safe and transaction-unsafe counters.
+SELECT count(*) FROM partial_flush;
+ count 
+-------
+     3
+(1 row)
+
+INSERT INTO partial_flush VALUES (4), (5);
+UPDATE partial_flush SET id = id WHERE id = 1;
+-- Flush mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+ pg_stat_report_anytime 
+------------------------
+ t
+(1 row)
+
+-- Transaction-safe counters should be visible mid-transaction.
+-- Transaction-unsafe counters (ins, upd) should NOT be flushed yet,
+-- since their final values depend on whether the transaction commits.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+       seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+       n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+       n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+ seq_scan_delta | seq_tup_read_delta | n_tup_ins_delta | n_tup_upd_delta 
+----------------+--------------------+-----------------+-----------------
+              2 |                  8 |               0 |               0
+(1 row)
+
+-- Generate more transaction-safe activity to verify no double counting.
+SELECT count(*) FROM partial_flush;
+ count 
+-------
+     5
+(1 row)
+
+-- Flush again mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+ pg_stat_report_anytime 
+------------------------
+ t
+(1 row)
+
+-- Should show cumulative totals, not double-counted.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+       seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+       n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+       n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+ seq_scan_delta | seq_tup_read_delta | n_tup_ins_delta | n_tup_upd_delta 
+----------------+--------------------+-----------------+-----------------
+              3 |                 13 |               0 |               0
+(1 row)
+
+COMMIT;
+-- After commit, all counters should be flushed.
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush 
+--------------------------
+ 
+(1 row)
+
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+       seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+       n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+       n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+ seq_scan_delta | seq_tup_read_delta | n_tup_ins_delta | n_tup_upd_delta 
+----------------+--------------------+-----------------+-----------------
+              3 |                 13 |               2 |               1
+(1 row)
+
+DROP TABLE partial_flush;
+-- Test that pg_stat_report_anytime also flushes non-relation stats.
+CREATE TABLE wal_flush_test(id int);
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush 
+--------------------------
+ 
+(1 row)
+
+SELECT wal_records AS wal_records_before
+  FROM pg_stat_get_backend_wal(pg_backend_pid()) \gset
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+-- Generate WAL inside the transaction.
+INSERT INTO wal_flush_test SELECT generate_series(1, 10);
+-- Flush mid-transaction; WAL stats should become visible immediately.
+SELECT pg_stat_report_anytime(pg_backend_pid());
+ pg_stat_report_anytime 
+------------------------
+ t
+(1 row)
+
+SELECT wal_records > :wal_records_before AS wal_flushed
+  FROM pg_stat_get_backend_wal(pg_backend_pid());
+ wal_flushed 
+-------------
+ t
+(1 row)
+
+COMMIT;
+DROP TABLE wal_flush_test;
 -- End of Stats Test
diff --git a/src/test/regress/sql/stats.sql b/src/test/regress/sql/stats.sql
index 610fd21fae4..c8bc0f22f27 100644
--- a/src/test/regress/sql/stats.sql
+++ b/src/test/regress/sql/stats.sql
@@ -1008,4 +1008,85 @@ SELECT fastpath_exceeded > :fastpath_exceeded_before FROM pg_stat_lock WHERE loc
 
 DROP TABLE part_test;
 
+--
+-- Test pg_stat_report_anytime
+--
+CREATE TABLE partial_flush(id int);
+INSERT INTO partial_flush VALUES (1), (2), (3);
+SELECT pg_stat_force_next_flush();
+
+-- Record counters before the explicit transaction
+SELECT seq_scan AS seq_scan_before,
+       seq_tup_read AS seq_tup_read_before,
+       n_tup_ins AS n_tup_ins_before,
+       n_tup_upd AS n_tup_upd_before
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush' \gset
+
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+
+-- Generate both transaction-safe and transaction-unsafe counters.
+SELECT count(*) FROM partial_flush;
+INSERT INTO partial_flush VALUES (4), (5);
+UPDATE partial_flush SET id = id WHERE id = 1;
+
+-- Flush mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+
+-- Transaction-safe counters should be visible mid-transaction.
+-- Transaction-unsafe counters (ins, upd) should NOT be flushed yet,
+-- since their final values depend on whether the transaction commits.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+       seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+       n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+       n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+
+-- Generate more transaction-safe activity to verify no double counting.
+SELECT count(*) FROM partial_flush;
+
+-- Flush again mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+
+-- Should show cumulative totals, not double-counted.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+       seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+       n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+       n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+
+COMMIT;
+
+-- After commit, all counters should be flushed.
+SELECT pg_stat_force_next_flush();
+
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+       seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+       n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+       n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+  FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+
+DROP TABLE partial_flush;
+
+-- Test that pg_stat_report_anytime also flushes non-relation stats.
+CREATE TABLE wal_flush_test(id int);
+SELECT pg_stat_force_next_flush();
+SELECT wal_records AS wal_records_before
+  FROM pg_stat_get_backend_wal(pg_backend_pid()) \gset
+
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+
+-- Generate WAL inside the transaction.
+INSERT INTO wal_flush_test SELECT generate_series(1, 10);
+
+-- Flush mid-transaction; WAL stats should become visible immediately.
+SELECT pg_stat_report_anytime(pg_backend_pid());
+
+SELECT wal_records > :wal_records_before AS wal_flushed
+  FROM pg_stat_get_backend_wal(pg_backend_pid());
+
+COMMIT;
+DROP TABLE wal_flush_test;
+
 -- End of Stats Test
-- 
2.50.1 (Apple Git-155)



  [application/octet-stream] v2-0003-pg_stat_statements-add-DSA-based-query-text-stora.patch (18.4K, 4-v2-0003-pg_stat_statements-add-DSA-based-query-text-stora.patch)
  download | inline diff:
From 52e18de566596815daba73eea5b8224dd686769b Mon Sep 17 00:00:00 2001
From: Sami Imseih <samimseih@gmail.com>
Date: Thu, 14 May 2026 12:38:29 -0500
Subject: [PATCH v2 3/4] pg_stat_statements: add DSA-based query text storage

Add optional DSA (dynamic shared area) storage for query texts.
When enabled via pg_stat_statements.query_text_memory (default 64MB),
query texts are allocated from a shared DSA area, avoiding the overhead
of file I/O for text storage.  When the DSA limit is reached, new texts
fall back to the existing external file (PGSS_TEXT_FILE).

Setting query_text_memory to 0 disables DSA storage entirely,
reverting to the file-only behavior.
---
 .../pg_stat_statements/pg_stat_statements.c   | 285 +++++++++++++++---
 doc/src/sgml/pgstatstatements.sgml            |  33 +-
 2 files changed, 278 insertions(+), 40 deletions(-)

diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 0e6e65e3e51..af7637b25b9 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -18,8 +18,9 @@
  * strings in which constants are replaced with parameter symbols ($n), to
  * make it clearer what a normalized entry can represent.
  *
- * Each shared pgstat entry carries its own query text, stored in an
- * external file (PGSS_TEXT_FILE).
+ * Each shared pgstat entry carries its own query text.  When DSA storage is
+ * enabled, text is allocated from a shared DSA area.  If DSA is disabled or
+ * exhausted, texts fall back to an external file (PGSS_TEXT_FILE).
  *
  * Eviction of least-used entries is throttled to run at most once every
  * EVICTION_INTERVAL_MS milliseconds.  When eviction is needed, a single
@@ -60,6 +61,8 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "storage/dsm_registry.h"
+#include "utils/dsa.h"
 #include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -234,6 +237,7 @@ struct pgssSharedState
 	Size		extent;			/* current extent of query file */
 	int			n_writers;		/* number of active writers to query file */
 	int			gc_count;		/* query file garbage collection cycle count */
+	bool		dsa_created;	/* true once query text DSA has been created */
 	TimestampTz last_eviction_time; /* throttle: last time entry_dealloc ran */
 	pgssGlobalStats stats;		/* global statistics for pgss */
 };
@@ -252,7 +256,12 @@ struct PgStatShared_Pgss
 	int			query_len;		/* length of query text, or -1 if invalid */
 	int			encoding;		/* encoding of query text */
 
-	Size		query_offset;	/* offset in external query text file */
+	/*
+	 * Query text is stored in exactly one location: DSA memory if
+	 * query_text_dp is valid, otherwise the overflow file at query_offset.
+	 */
+	dsa_pointer query_text_dp;	/* DSA pointer to text, or InvalidDsaPointer */
+	Size		query_offset;	/* offset in overflow file (when dp invalid) */
 };
 
 /*
@@ -274,6 +283,8 @@ struct PendingDrop
 
 /* Links to shared memory state */
 static pgssSharedState *pgss;
+static dsa_area *pgss_qtext_dsa = NULL; /* backend-local handle to query text
+										 * DSA */
 
 /* Buffer used during serialization to avoid reloading text file per entry */
 static char *pgss_qtext_write_buffer = NULL;
@@ -354,6 +365,7 @@ static bool pgss_track_utility = true;	/* whether to track utility commands */
 static bool pgss_track_planning = false;	/* whether to track planning
 											 * duration */
 static bool pgss_save = true;	/* whether to save stats across shutdown */
+static int	pgss_query_text_memory = 64;	/* MB of DSA for query texts */
 #define pgss_enabled(level) \
 	(!IsParallelWorker() && \
 	(pgss_track == PGSS_TRACK_ALL || \
@@ -416,7 +428,8 @@ static void pg_stat_statements_internal(FunctionCallInfo fcinfo,
 										bool showtext);
 static void entry_dealloc(void);
 static bool qtext_store(const char *query, int query_len,
-						Size *query_offset, int *gc_count);
+						Size *query_offset, dsa_pointer *query_dp,
+						int *gc_count);
 static char *qtext_load_file(Size *buffer_size);
 static char *qtext_fetch(Size query_offset, int query_len,
 						 char *buffer, Size buffer_size);
@@ -426,6 +439,8 @@ static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_
 static char *generate_normalized_query(const JumbleState *jstate,
 									   const char *query,
 									   int query_loc, int *query_len_p);
+static void pgss_qtext_dsa_attach(void);
+static void pgss_query_text_memory_assign(int newval, void *extra);
 static void pgss_entry_init(PgStatShared_Pgss *shared_entry,
 							const pgssHashKey *key, int encoding);
 static void pgss_store_query_text(PgStatShared_Pgss *shared_entry,
@@ -540,6 +555,19 @@ _PG_init(void)
 							 NULL,
 							 NULL);
 
+	DefineCustomIntVariable("pg_stat_statements.query_text_memory",
+							"Amount of shared memory (MB) for storing query texts.",
+							NULL,
+							&pgss_query_text_memory,
+							64,
+							0,
+							MaxAllocSize / (1024 * 1024),
+							PGC_SIGHUP,
+							GUC_UNIT_MB,
+							NULL,
+							pgss_query_text_memory_assign,
+							NULL);
+
 	MarkGUCPrefixReserved("pg_stat_statements");
 
 	/*
@@ -639,6 +667,44 @@ pgss_shmem_init(void *arg)
 	FreeFile(qfile);
 }
 
+/*
+ * Ensure the backend-local DSA pointer for query texts is set up.
+ * Creates the DSA on first use (under exclusive lock) or attaches to it.
+ */
+static void
+pgss_qtext_dsa_attach(void)
+{
+	bool		found;
+
+	if (pgss_qtext_dsa != NULL)
+		return;
+
+	/* DSA never created and memory disabled, nothing to do */
+	if (!pgss->dsa_created && pgss_query_text_memory == 0)
+		return;
+
+	pgss_qtext_dsa = GetNamedDSA("pg_stat_statements_qtext", &found);
+	if (!found)
+	{
+		dsa_set_size_limit(pgss_qtext_dsa,
+						   (size_t) pgss_query_text_memory * 1024 * 1024);
+		pgss->dsa_created = true;
+	}
+}
+
+/*
+ * GUC assign_hook for pg_stat_statements.query_text_memory.
+ * Update the DSA size limit when the setting changes at runtime.
+ */
+static void
+pgss_query_text_memory_assign(int newval, void *extra)
+{
+	if (pgss_qtext_dsa)
+		dsa_set_size_limit(pgss_qtext_dsa,
+						   (size_t) newval * 1024 * 1024);
+}
+
+
 /*
  * Post-parse-analysis hook: mark query with a queryId
  */
@@ -1086,11 +1152,13 @@ pgss_entry_init(PgStatShared_Pgss *shared_entry,
 	shared_entry->minmax_stats_since = shared_entry->stats_since;
 	shared_entry->query_len = -1;
 	shared_entry->encoding = encoding;
+	shared_entry->query_text_dp = InvalidDsaPointer;
 	shared_entry->query_offset = 0;
 }
 
 /*
- * Store query text into a shared entry via the external text file.
+ * Store query text into a shared entry, trying DSA first and falling back
+ * to the external text file.
  *
  * Caller must hold the entry lock.  Does nothing if text is already present.
  */
@@ -1098,20 +1166,43 @@ static void
 pgss_store_query_text(PgStatShared_Pgss *shared_entry,
 					  const char *query, int query_len, int encoding)
 {
-	Size		query_offset;
-	int			gc_count;
+	dsa_pointer dp;
 
 	if (shared_entry->query_len >= 0)
 		return;
 
-	LWLockAcquire(&pgss->lock.lock, LW_SHARED);
-	if (qtext_store(query, query_len, &query_offset, &gc_count))
+	pgss_qtext_dsa_attach();
+	dp = InvalidDsaPointer;
+	if (pgss_qtext_dsa && pgss_query_text_memory > 0)
+		dp = dsa_allocate_extended(pgss_qtext_dsa, query_len + 1,
+								   DSA_ALLOC_NO_OOM);
+
+	if (DsaPointerIsValid(dp))
 	{
-		shared_entry->query_offset = query_offset;
+		char	   *dst = dsa_get_address(pgss_qtext_dsa, dp);
+
+		memcpy(dst, query, query_len);
+		dst[query_len] = '\0';
+		shared_entry->query_text_dp = dp;
 		shared_entry->query_len = query_len;
 		shared_entry->encoding = encoding;
 	}
-	LWLockRelease(&pgss->lock.lock);
+	else
+	{
+		Size		query_offset;
+		dsa_pointer query_dp;
+		int			gc_count;
+
+		LWLockAcquire(&pgss->lock.lock, LW_SHARED);
+		if (qtext_store(query, query_len, &query_offset,
+						&query_dp, &gc_count))
+		{
+			shared_entry->query_offset = query_offset;
+			shared_entry->query_len = query_len;
+			shared_entry->encoding = encoding;
+		}
+		LWLockRelease(&pgss->lock.lock);
+	}
 }
 
 /*
@@ -1692,10 +1783,23 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 
 				if (shared_entry->query_len >= 0)
 				{
-					qstr = qtext_fetch(shared_entry->query_offset,
-									   shared_entry->query_len,
-									   qbuffer,
-									   qbuffer_size);
+					if (DsaPointerIsValid(shared_entry->query_text_dp))
+					{
+						pgss_qtext_dsa_attach();
+						/*
+						 * XXX: should we add a warning if we have a valid DSA
+						 * pointer but can't attach to the DSA? It should be
+						 * rare, if ever.
+						 */
+						if (pgss_qtext_dsa)
+							qstr = (char *) dsa_get_address(pgss_qtext_dsa,
+															shared_entry->query_text_dp);
+					}
+					else
+						qstr = qtext_fetch(shared_entry->query_offset,
+										   shared_entry->query_len,
+										   qbuffer,
+										   qbuffer_size);
 				}
 
 				if (qstr)
@@ -1924,14 +2028,43 @@ pg_stat_statements_info(PG_FUNCTION_ARGS)
  */
 static bool
 qtext_store(const char *query, int query_len,
-			Size *query_offset, int *gc_count)
+			Size *query_offset, dsa_pointer *query_dp,
+			int *gc_count)
 {
 	Size		off;
 	int			fd;
 
 	*query_offset = 0;
+	*query_dp = InvalidDsaPointer;
 
 	/*
+	 * Try to store the query text in DSA memory first, otherwise write to
+	 * disk.
+	 *
+	 * Caller must have called pgss_qtext_dsa_attach() before acquiring
+	 * pgss->lock to avoid deadlock on first-time DSA creation.
+	 */
+	if (pgss_qtext_dsa && pgss_query_text_memory > 0)
+	{
+		dsa_pointer dp;
+
+		dp = dsa_allocate_extended(pgss_qtext_dsa, query_len + 1,
+								   DSA_ALLOC_NO_OOM);
+		if (DsaPointerIsValid(dp))
+		{
+			char	   *dst = dsa_get_address(pgss_qtext_dsa, dp);
+
+			memcpy(dst, query, query_len);
+			dst[query_len] = '\0';
+			*query_dp = dp;
+			return true;
+		}
+	}
+
+	/*
+	 * DSA allocation failed (memory limit reached).  Fall back to the
+	 * external query text file.
+	 *
 	 * We use a spinlock to protect extent/n_writers/gc_count, so that
 	 * multiple processes may execute this function concurrently.
 	 */
@@ -2221,6 +2354,10 @@ gc_qtexts(void)
 
 		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
 
+		/* Skip entries whose text lives in DSA, not in the file */
+		if (DsaPointerIsValid(shared_entry->query_text_dp))
+			continue;
+
 		query_len = shared_entry->query_len;
 		if (query_len < 0)
 			continue;
@@ -2292,7 +2429,8 @@ gc_fail:
 
 	/*
 	 * Since the contents of the external file are now uncertain, mark all
-	 * entries as having invalid texts.
+	 * file-based text entries as having invalid texts.  DSA entries are
+	 * unaffected.
 	 */
 	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
 	while ((p = dshash_seq_next(&hstat)) != NULL)
@@ -2305,6 +2443,8 @@ gc_fail:
 			continue;
 
 		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+		if (DsaPointerIsValid(shared_entry->query_text_dp))
+			continue;
 		shared_entry->query_offset = 0;
 		shared_entry->query_len = -1;
 	}
@@ -2338,6 +2478,13 @@ if (shared) { \
 	} \
 	else \
 	{ \
+		if (DsaPointerIsValid((shared)->query_text_dp)) \
+		{ \
+			pgss_qtext_dsa_attach(); \
+			if (pgss_qtext_dsa) \
+				dsa_free(pgss_qtext_dsa, (shared)->query_text_dp); \
+			(shared)->query_text_dp = InvalidDsaPointer; \
+		} \
 		(shared)->query_len = -1; \
 		pgstat_drop_entry(PGSTAT_KIND_PGSS, (key_ptr)->dbid, \
 						  pgss_objid(key_ptr)); \
@@ -2438,6 +2585,13 @@ entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
 				}
 				else
 				{
+					if (DsaPointerIsValid(shared_entry->query_text_dp))
+					{
+						pgss_qtext_dsa_attach();
+						if (pgss_qtext_dsa)
+							dsa_free(pgss_qtext_dsa, shared_entry->query_text_dp);
+						shared_entry->query_text_dp = InvalidDsaPointer;
+					}
 					shared_entry->query_len = -1;
 
 					/* Collect for deferred drop */
@@ -2668,13 +2822,23 @@ pgss_to_serialized_data(const PgStat_HashKey *key,
 	{
 		char	   *qstr = NULL;
 
-		if (!pgss_qtext_write_buffer && pgss)
-			pgss_qtext_write_buffer = qtext_load_file(&pgss_qtext_write_buffer_size);
+		if (DsaPointerIsValid(entry->query_text_dp))
+		{
+			pgss_qtext_dsa_attach();
+			if (pgss_qtext_dsa)
+				qstr = (char *) dsa_get_address(pgss_qtext_dsa,
+												entry->query_text_dp);
+		}
+		else
+		{
+			if (!pgss_qtext_write_buffer && pgss)
+				pgss_qtext_write_buffer = qtext_load_file(&pgss_qtext_write_buffer_size);
 
-		if (pgss_qtext_write_buffer)
-			qstr = qtext_fetch(entry->query_offset, query_len,
-							   pgss_qtext_write_buffer,
-							   pgss_qtext_write_buffer_size);
+			if (pgss_qtext_write_buffer)
+				qstr = qtext_fetch(entry->query_offset, query_len,
+								   pgss_qtext_write_buffer,
+								   pgss_qtext_write_buffer_size);
+		}
 
 		if (qstr)
 			pgstat_write_chunk(statfile, qstr, query_len + 1);
@@ -2723,14 +2887,16 @@ pgss_from_serialized_data(const PgStat_HashKey *key,
 	/* Initialize text fields */
 	entry->query_len = -1;
 	entry->encoding = encoding;
+	entry->query_text_dp = InvalidDsaPointer;
 	entry->query_offset = 0;
 
 	/*
-	 * Read the query text and store it in the external file.
+	 * Read the query text and store it directly in the entry.
 	 */
 	if (query_len >= 0)
 	{
 		char	   *buf = palloc(query_len + 1);
+		dsa_pointer dp = InvalidDsaPointer;
 
 		if (!pgstat_read_chunk(statfile, buf, query_len + 1))
 		{
@@ -2738,25 +2904,49 @@ pgss_from_serialized_data(const PgStat_HashKey *key,
 			return false;
 		}
 
-		if (!pgss_qtext_rebuild_file)
+		/* Try DSA allocation first */
+		pgss_qtext_dsa_attach();
+		if (pgss_qtext_dsa && pgss_query_text_memory > 0)
 		{
-			pgss_qtext_rebuild_file = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
-			if (!pgss_qtext_rebuild_file)
+			dp = dsa_allocate_extended(pgss_qtext_dsa, query_len + 1,
+									   DSA_ALLOC_NO_OOM);
+			if (DsaPointerIsValid(dp))
 			{
-				pfree(buf);
-				return false;
+				char	   *dst = dsa_get_address(pgss_qtext_dsa, dp);
+
+				memcpy(dst, buf, query_len + 1);
 			}
-			pgss_qtext_rebuild_extent = 0;
 		}
 
-		entry->query_offset = pgss_qtext_rebuild_extent;
-
-		if (fwrite(buf, 1, query_len + 1, pgss_qtext_rebuild_file) != (size_t) (query_len + 1))
+		if (DsaPointerIsValid(dp))
 		{
-			pfree(buf);
-			return false;
+			entry->query_text_dp = dp;
+			entry->query_offset = 0;
+		}
+		else
+		{
+			/* DSA unavailable or full; fall back to file */
+			if (!pgss_qtext_rebuild_file)
+			{
+				pgss_qtext_rebuild_file = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
+				if (!pgss_qtext_rebuild_file)
+				{
+					pfree(buf);
+					return false;
+				}
+				pgss_qtext_rebuild_extent = 0;
+			}
+
+			entry->query_text_dp = InvalidDsaPointer;
+			entry->query_offset = pgss_qtext_rebuild_extent;
+
+			if (fwrite(buf, 1, query_len + 1, pgss_qtext_rebuild_file) != (size_t) (query_len + 1))
+			{
+				pfree(buf);
+				return false;
+			}
+			pgss_qtext_rebuild_extent += query_len + 1;
 		}
-		pgss_qtext_rebuild_extent += query_len + 1;
 
 		entry->query_len = query_len;
 		entry->encoding = encoding;
@@ -2885,12 +3075,33 @@ entry_dealloc(void)
 	else
 		pgss->mean_query_len = ASSUMED_LENGTH_INIT;
 
-	/* Drop the bottom fraction */
+	/* Drop the bottom fraction, freeing DSA text if applicable */
 	nvictims = Max(10, nentries * USAGE_DEALLOC_PERCENT / 100);
 	nvictims = Min(nvictims, nentries);
 
+	pgss_qtext_dsa_attach();
 	for (i = 0; i < nvictims; i++)
 	{
+		PgStat_EntryRef *victim_ref;
+
+		/* Free DSA text from the entry before dropping it */
+		victim_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS,
+										  entries[i].key.dbid,
+										  pgss_objid(&entries[i].key),
+										  false, NULL);
+		if (victim_ref)
+		{
+			PgStatShared_Pgss *victim = (PgStatShared_Pgss *) victim_ref->shared_stats;
+
+			if (DsaPointerIsValid(victim->query_text_dp))
+			{
+				if (pgss_qtext_dsa)
+					dsa_free(pgss_qtext_dsa, victim->query_text_dp);
+				victim->query_text_dp = InvalidDsaPointer;
+			}
+			victim->query_len = -1;
+		}
+
 		pgstat_drop_entry(PGSTAT_KIND_PGSS,
 						  entries[i].key.dbid,
 						  pgss_objid(&entries[i].key));
diff --git a/doc/src/sgml/pgstatstatements.sgml b/doc/src/sgml/pgstatstatements.sgml
index 19b1dab74c7..0f4ab34965b 100644
--- a/doc/src/sgml/pgstatstatements.sgml
+++ b/doc/src/sgml/pgstatstatements.sgml
@@ -924,6 +924,30 @@ calls | 2
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term>
+     <varname>pg_stat_statements.query_text_memory</varname> (<type>integer</type>)
+     <indexterm>
+      <primary><varname>pg_stat_statements.query_text_memory</varname> configuration parameter</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+      <varname>pg_stat_statements.query_text_memory</varname> specifies the
+      amount of shared memory used for storing query texts in
+      DSA memory.  When this limit is reached, new query texts overflow to an
+      on-disk file.  Setting this to <literal>0</literal> disables DSA storage
+      entirely, causing all query texts to be written to disk.
+      If this value is specified without units, it is taken as megabytes.
+      The default value is <literal>64MB</literal>.
+      This parameter can be changed at any time by reloading the server
+      configuration.  Lowering the value does not immediately free existing
+      DSA allocations; they are released gradually as entries are evicted.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term>
      <varname>pg_stat_statements.track</varname> (<type>enum</type>)
@@ -1012,14 +1036,16 @@ calls | 2
 
   <para>
    The module requires additional shared memory proportional to
-   <varname>pg_stat_statements.max</varname>.  Note that this memory is
-   consumed whenever the module is loaded, even if
+   <varname>pg_stat_statements.max</varname> and
+   <varname>pg_stat_statements.query_text_memory</varname>.  Note that this
+   memory is consumed whenever the module is loaded, even if
    <varname>pg_stat_statements.track</varname> is set to <literal>none</literal>.
   </para>
 
   <para>
    These parameters are typically set in <filename>postgresql.conf</filename>.
-   Note that <varname>pg_stat_statements.max</varname> can be changed
+   Note that <varname>pg_stat_statements.max</varname> and
+   <varname>pg_stat_statements.query_text_memory</varname> can be changed
    without a server restart by reloading the configuration.
    Typical usage might be:
 
@@ -1030,6 +1056,7 @@ shared_preload_libraries = 'pg_stat_statements'
 compute_query_id = on
 pg_stat_statements.max = 10000
 pg_stat_statements.track = all
+pg_stat_statements.query_text_memory = 64
 </programlisting>
   </para>
  </sect2>
-- 
2.50.1 (Apple Git-155)



  [application/octet-stream] v2-0002-pg_stat_statements-modernize-entry-storage-with-p.patch (81.0K, 5-v2-0002-pg_stat_statements-modernize-entry-storage-with-p.patch)
  download | inline diff:
From 2c73cbe2091594561a52903ecbf7ff29805a0b2c Mon Sep 17 00:00:00 2001
From: Sami Imseih <samimseih@gmail.com>
Date: Thu, 14 May 2026 12:27:43 -0500
Subject: [PATCH v2 2/4] pg_stat_statements: modernize entry storage with
 pgstat kind

Replace the private shared-memory hash table with the pgstat subsystem's
dshash, move counter updates to backend-local pending entries that flush
periodically, and introduce admission control with timestamp-throttled
inline eviction: when entry count reaches pgss_max, a backend attempts
eviction using a conditional lock and a shared timestamp that ensures at
most one eviction cycle per 10 seconds.  Other backends skip entry
creation without blocking.

Variance/stddev computation uses Welford's online algorithm in per-backend
pending accumulation, merged into shared memory via Chan's parallel variance
algorithm during flush.
See <http://www.johndcook.com/blog/standard_deviation/>

pg_stat_statements.max is now PGC_SIGHUP (changeable without restart).
---
 contrib/pg_stat_statements/Makefile           |    1 +
 .../pg_stat_statements/expected/parallel.out  |    1 +
 contrib/pg_stat_statements/meson.build        |    1 +
 .../pg_stat_statements--1.13--1.14.sql        |    7 +
 .../pg_stat_statements/pg_stat_statements.c   | 1794 +++++++++--------
 .../pg_stat_statements.conf                   |    1 +
 .../pg_stat_statements.control                |    2 +-
 contrib/pg_stat_statements/sql/parallel.sql   |    1 +
 doc/src/sgml/pgstatstatements.sgml            |   18 +-
 src/tools/pgindent/typedefs.list              |    1 +
 10 files changed, 1011 insertions(+), 816 deletions(-)
 create mode 100644 contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql

diff --git a/contrib/pg_stat_statements/Makefile b/contrib/pg_stat_statements/Makefile
index c27e9529bb6..d7142f71cf7 100644
--- a/contrib/pg_stat_statements/Makefile
+++ b/contrib/pg_stat_statements/Makefile
@@ -7,6 +7,7 @@ OBJS = \
 
 EXTENSION = pg_stat_statements
 DATA = pg_stat_statements--1.4.sql \
+	pg_stat_statements--1.13--1.14.sql \
 	pg_stat_statements--1.12--1.13.sql \
 	pg_stat_statements--1.11--1.12.sql pg_stat_statements--1.10--1.11.sql \
 	pg_stat_statements--1.9--1.10.sql pg_stat_statements--1.8--1.9.sql \
diff --git a/contrib/pg_stat_statements/expected/parallel.out b/contrib/pg_stat_statements/expected/parallel.out
index 8af3bd2c915..bff0da7634b 100644
--- a/contrib/pg_stat_statements/expected/parallel.out
+++ b/contrib/pg_stat_statements/expected/parallel.out
@@ -20,6 +20,7 @@ SELECT count(*) FROM pgss_parallel_tab;
      0
 (1 row)
 
+RESET max_parallel_workers_per_gather;
 SELECT query,
   parallel_workers_to_launch > 0 AS has_workers_to_launch,
   parallel_workers_launched > 0 AS has_workers_launched
diff --git a/contrib/pg_stat_statements/meson.build b/contrib/pg_stat_statements/meson.build
index 9d78cb88b7d..77148949c0d 100644
--- a/contrib/pg_stat_statements/meson.build
+++ b/contrib/pg_stat_statements/meson.build
@@ -21,6 +21,7 @@ contrib_targets += pg_stat_statements
 install_data(
   'pg_stat_statements.control',
   'pg_stat_statements--1.4.sql',
+  'pg_stat_statements--1.13--1.14.sql',
   'pg_stat_statements--1.12--1.13.sql',
   'pg_stat_statements--1.11--1.12.sql',
   'pg_stat_statements--1.10--1.11.sql',
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
new file mode 100644
index 00000000000..eb528a0d9ca
--- /dev/null
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
@@ -0,0 +1,7 @@
+/* contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION pg_stat_statements UPDATE TO '1.14'" to load this file. \quit
+
+ALTER FUNCTION pg_stat_statements(boolean) PARALLEL RESTRICTED;
+ALTER FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) PARALLEL RESTRICTED;
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 92315627916..0e6e65e3e51 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -5,8 +5,10 @@
  *		usage across a whole database cluster.
  *
  * Execution costs are totaled for each distinct source query, and kept in
- * a shared hashtable.  (We track only as many distinct queries as will fit
- * in the designated amount of shared memory.)
+ * a dshash table managed by the pgstat subsystem (custom stats kind
+ * PGSTAT_KIND_PGSS).  Counter updates accumulate in backend-local pending
+ * entries and are flushed to shared memory periodically or on demand via
+ * pgstat_report_anytime_stat().
  *
  * Starting in Postgres 9.2, this module normalized query entries.  As of
  * Postgres 14, the normalization is done by the core if compute_query_id is
@@ -14,24 +16,15 @@
  *
  * To facilitate presenting entries to users, we create "representative" query
  * strings in which constants are replaced with parameter symbols ($n), to
- * make it clearer what a normalized entry can represent.  To save on shared
- * memory, and to avoid having to truncate oversized query strings, we store
- * these strings in a temporary external query-texts file.  Offsets into this
- * file are kept in shared memory.
+ * make it clearer what a normalized entry can represent.
  *
- * Note about locking issues: to create or delete an entry in the shared
- * hashtable, one must hold pgss->lock exclusively.  Modifying any field
- * in an entry except the counters requires the same.  To look up an entry,
- * one must hold the lock shared.  To read or update the counters within
- * an entry, one must hold the lock shared or exclusive (so the entry doesn't
- * disappear!) and also take the entry's mutex spinlock.
- * The shared state variable pgss->extent (the next free spot in the external
- * query-text file) should be accessed only while holding either the
- * pgss->mutex spinlock, or exclusive lock on pgss->lock.  We use the mutex to
- * allow reserving file space while holding only shared lock on pgss->lock.
- * Rewriting the entire external query-text file, eg for garbage collection,
- * requires holding pgss->lock exclusively; this allows individual entries
- * in the file to be read or written while holding only shared lock.
+ * Each shared pgstat entry carries its own query text, stored in an
+ * external file (PGSS_TEXT_FILE).
+ *
+ * Eviction of least-used entries is throttled to run at most once every
+ * EVICTION_INTERVAL_MS milliseconds.  When eviction is needed, a single
+ * backend performs it inline using a conditional lock; other backends simply
+ * skip entry creation until space is freed.
  *
  *
  * Copyright (c) 2008-2026, PostgreSQL Global Development Group
@@ -49,6 +42,7 @@
 
 #include "access/htup_details.h"
 #include "access/parallel.h"
+#include "access/xact.h"
 #include "catalog/pg_authid.h"
 #include "executor/instrument.h"
 #include "funcapi.h"
@@ -58,7 +52,9 @@
 #include "nodes/queryjumble.h"
 #include "optimizer/planner.h"
 #include "parser/analyze.h"
+#include "common/hashfn.h"
 #include "pgstat.h"
+#include "utils/pgstat_internal.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -76,9 +72,6 @@ PG_MODULE_MAGIC_EXT(
 					.version = PG_VERSION
 );
 
-/* Location of permanent stats file (valid when database is shut down) */
-#define PGSS_DUMP_FILE	PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_statements.stat"
-
 /*
  * Location of external query text file.
  */
@@ -87,18 +80,16 @@ PG_MODULE_MAGIC_EXT(
 /* Magic number identifying the stats file format */
 static const uint32 PGSS_FILE_HEADER = 0x20250731;
 
-/* PostgreSQL major version number, changes in which invalidate all entries */
-static const uint32 PGSS_PG_MAJOR_VERSION = PG_VERSION_NUM / 100;
+/* Custom pgstat kind ID for pg_stat_statements entries */
+#define PGSTAT_KIND_PGSS	PGSTAT_KIND_EXPERIMENTAL
 
 /* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? */
 #define USAGE_EXEC(duration)	(1.0)
 #define USAGE_INIT				(1.0)	/* including initial planning */
-#define ASSUMED_MEDIAN_INIT		(10.0)	/* initial assumed median usage */
 #define ASSUMED_LENGTH_INIT		1024	/* initial assumed mean query length */
 #define USAGE_DECREASE_FACTOR	(0.99)	/* decreased every entry_dealloc */
-#define STICKY_DECREASE_FACTOR	(0.50)	/* factor for sticky entries */
 #define USAGE_DEALLOC_PERCENT	5	/* free this % of entries at once */
-#define IS_STICKY(c)	((c.calls[PGSS_PLAN] + c.calls[PGSS_EXEC]) == 0)
+#define EVICTION_INTERVAL_MS	10000	/* min ms between eviction cycles */
 
 /*
  * Extension version number, for supporting older extension versions' objects
@@ -140,18 +131,27 @@ typedef enum pgssStoreKind
  * zero the padding bytes.  Otherwise, things will break, because pgss_hash is
  * created using HASH_BLOBS, and thus tag_hash is used to hash this.
  */
-typedef struct pgssHashKey
+typedef struct pgssHashKey pgssHashKey;
+typedef struct Counters Counters;
+typedef struct pgssGlobalStats pgssGlobalStats;
+typedef struct pgssSharedState pgssSharedState;
+typedef struct PgStatShared_Pgss PgStatShared_Pgss;
+typedef struct PgStat_PendingPgss PgStat_PendingPgss;
+typedef struct UsageEntry UsageEntry;
+typedef struct PendingDrop PendingDrop;
+
+struct pgssHashKey
 {
 	Oid			userid;			/* user OID */
 	Oid			dbid;			/* database OID */
 	int64		queryid;		/* query identifier */
 	bool		toplevel;		/* query executed at top level */
-} pgssHashKey;
+};
 
 /*
  * The actual stats counters kept within pgssEntry.
  */
-typedef struct Counters
+struct Counters
 {
 	int64		calls[PGSS_NUMKIND];	/* # of times planned/executed */
 	double		total_time[PGSS_NUMKIND];	/* total planning/execution time,
@@ -212,54 +212,76 @@ typedef struct Counters
 											 * launched */
 	int64		generic_plan_calls; /* number of calls using a generic plan */
 	int64		custom_plan_calls;	/* number of calls using a custom plan */
-} Counters;
+};
 
 /*
  * Global statistics for pg_stat_statements
  */
-typedef struct pgssGlobalStats
+struct pgssGlobalStats
 {
 	int64		dealloc;		/* # of times entries were deallocated */
 	TimestampTz stats_reset;	/* timestamp with all stats reset */
-} pgssGlobalStats;
-
-/*
- * Statistics per statement
- *
- * Note: in event of a failure in garbage collection of the query text file,
- * we reset query_offset to zero and query_len to -1.  This will be seen as
- * an invalid state by qtext_fetch().
- */
-typedef struct pgssEntry
-{
-	pgssHashKey key;			/* hash key of entry - MUST BE FIRST */
-	Counters	counters;		/* the statistics for this query */
-	Size		query_offset;	/* query text offset in external file */
-	int			query_len;		/* # of valid bytes in query string, or -1 */
-	int			encoding;		/* query text encoding */
-	TimestampTz stats_since;	/* timestamp of entry allocation */
-	TimestampTz minmax_stats_since; /* timestamp of last min/max values reset */
-	slock_t		mutex;			/* protects the counters only */
-} pgssEntry;
+};
 
 /*
  * Global shared state
  */
-typedef struct pgssSharedState
+struct pgssSharedState
 {
-	LWLockPadded lock;			/* protects hashtable search/modification */
-	double		cur_median_usage;	/* current median usage in hashtable */
+	LWLockPadded lock;			/* protects query text file operations */
 	Size		mean_query_len; /* current mean entry text length */
 	slock_t		mutex;			/* protects following fields only: */
 	Size		extent;			/* current extent of query file */
 	int			n_writers;		/* number of active writers to query file */
 	int			gc_count;		/* query file garbage collection cycle count */
+	TimestampTz last_eviction_time; /* throttle: last time entry_dealloc ran */
 	pgssGlobalStats stats;		/* global statistics for pgss */
-} pgssSharedState;
+};
+
+/*
+ * Shared memory entry for pgstat custom kind.
+ * This is what lives in the pgstat shared hash table.
+ */
+struct PgStatShared_Pgss
+{
+	PgStatShared_Common header;
+	pgssHashKey key;			/* full original key for reconstruction */
+	Counters	counters;		/* the statistics */
+	TimestampTz stats_since;	/* timestamp of entry allocation */
+	TimestampTz minmax_stats_since; /* timestamp of last min/max reset */
+	int			query_len;		/* length of query text, or -1 if invalid */
+	int			encoding;		/* encoding of query text */
+
+	Size		query_offset;	/* offset in external query text file */
+};
+
+/*
+ * Pending (backend-local) stats entry, accumulated before flush.
+ */
+struct PgStat_PendingPgss
+{
+	Counters	counters;
+};
+
+/*
+ * Used during entry reset to collect keys for deferred drop.
+ */
+struct PendingDrop
+{
+	Oid			dbid;
+	uint64		objid;
+};
 
 /* Links to shared memory state */
 static pgssSharedState *pgss;
-static HTAB *pgss_hash;
+
+/* Buffer used during serialization to avoid reloading text file per entry */
+static char *pgss_qtext_write_buffer = NULL;
+static Size pgss_qtext_write_buffer_size = 0;
+
+/* File handle used during deserialization to rebuild query text file */
+static FILE *pgss_qtext_rebuild_file = NULL;
+static Size pgss_qtext_rebuild_extent = 0;
 
 static void pgss_shmem_request(void *arg);
 static void pgss_shmem_init(void *arg);
@@ -269,6 +291,32 @@ static const ShmemCallbacks pgss_shmem_callbacks = {
 	.init_fn = pgss_shmem_init,
 };
 
+/* pgstat custom kind callbacks */
+static bool pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait);
+static void pgss_to_serialized_data(const PgStat_HashKey *key,
+									const PgStatShared_Common *header,
+									FILE *statfile);
+static bool pgss_from_serialized_data(const PgStat_HashKey *key,
+									  PgStatShared_Common *header,
+									  FILE *statfile);
+static void pgss_finish(PgStat_StatsFileOp status);
+
+static const PgStat_KindInfo pgss_kind_info = {
+	.name = "pg_stat_statements",
+	.fixed_amount = false,
+	.write_to_file = true,
+	.track_entry_count = true,
+	.accessed_across_databases = true,
+	.shared_size = sizeof(PgStatShared_Pgss),
+	.shared_data_off = offsetof(PgStatShared_Pgss, key),
+	.shared_data_len = sizeof(PgStatShared_Pgss) - offsetof(PgStatShared_Pgss, key),
+	.pending_size = sizeof(PgStat_PendingPgss),
+	.flush_pending_cb = pgss_flush_pending_cb,
+	.to_serialized_data = pgss_to_serialized_data,
+	.from_serialized_data = pgss_from_serialized_data,
+	.finish = pgss_finish,
+};
+
 /*---- Local variables ----*/
 
 /* Current nesting depth of planner/ExecutorRun/ProcessUtility calls */
@@ -306,7 +354,6 @@ static bool pgss_track_utility = true;	/* whether to track utility commands */
 static bool pgss_track_planning = false;	/* whether to track planning
 											 * duration */
 static bool pgss_save = true;	/* whether to save stats across shutdown */
-
 #define pgss_enabled(level) \
 	(!IsParallelWorker() && \
 	(pgss_track == PGSS_TRACK_ALL || \
@@ -335,7 +382,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_1_13);
 PG_FUNCTION_INFO_V1(pg_stat_statements);
 PG_FUNCTION_INFO_V1(pg_stat_statements_info);
 
-static void pgss_shmem_shutdown(int code, Datum arg);
 static void pgss_post_parse_analyze(ParseState *pstate, Query *query,
 									const JumbleState *jstate);
 static PlannedStmt *pgss_planner(Query *parse,
@@ -368,8 +414,6 @@ static void pgss_store(const char *query, int64 queryId,
 static void pg_stat_statements_internal(FunctionCallInfo fcinfo,
 										pgssVersion api_version,
 										bool showtext);
-static pgssEntry *entry_alloc(pgssHashKey *key, Size query_offset, int query_len,
-							  int encoding, bool sticky);
 static void entry_dealloc(void);
 static bool qtext_store(const char *query, int query_len,
 						Size *query_offset, int *gc_count);
@@ -382,6 +426,35 @@ static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_
 static char *generate_normalized_query(const JumbleState *jstate,
 									   const char *query,
 									   int query_loc, int *query_len_p);
+static void pgss_entry_init(PgStatShared_Pgss *shared_entry,
+							const pgssHashKey *key, int encoding);
+static void pgss_store_query_text(PgStatShared_Pgss *shared_entry,
+								  const char *query, int query_len,
+								  int encoding);
+
+struct UsageEntry
+{
+	pgssHashKey key;
+	double		usage;
+};
+
+static int	entry_cmp(const void *a, const void *b);
+static void pgss_maybe_evict(void);
+
+/*
+ * Compute a uint64 objid from a pgssHashKey for use in PgStat_HashKey.
+ * We hash (userid, queryid, toplevel) together since dbid goes into dboid.
+ */
+static inline uint64
+pgss_objid(const pgssHashKey *key)
+{
+	uint64		hashval;
+
+	hashval = murmurhash64((uint64) key->userid);
+	hashval = hash_combine64(hashval, murmurhash64((uint64) key->queryid));
+	hashval = hash_combine64(hashval, murmurhash64((uint64) key->toplevel));
+	return hashval;
+}
 
 /*
  * Module load callback
@@ -416,7 +489,7 @@ _PG_init(void)
 							5000,
 							100,
 							INT_MAX / 2,
-							PGC_POSTMASTER,
+							PGC_SIGHUP,
 							0,
 							NULL,
 							NULL,
@@ -474,6 +547,11 @@ _PG_init(void)
 	 */
 	RegisterShmemCallbacks(&pgss_shmem_callbacks);
 
+	/*
+	 * Register custom statistics kind for pg_stat_statements entries.
+	 */
+	pgstat_register_kind(PGSTAT_KIND_PGSS, &pgss_kind_info);
+
 	/*
 	 * Install hooks.
 	 */
@@ -505,13 +583,6 @@ _PG_init(void)
 static void
 pgss_shmem_request(void *arg)
 {
-	ShmemRequestHash(.name = "pg_stat_statements hash",
-					 .nelems = pgss_max,
-					 .hash_info.keysize = sizeof(pgssHashKey),
-					 .hash_info.entrysize = sizeof(pgssEntry),
-					 .hash_flags = HASH_ELEM | HASH_BLOBS,
-					 .ptr = &pgss_hash,
-		);
 	ShmemRequestStruct(.name = "pg_stat_statements",
 					   .size = sizeof(pgssSharedState),
 					   .ptr = (void **) &pgss,
@@ -530,14 +601,7 @@ static void
 pgss_shmem_init(void *arg)
 {
 	int			tranche_id;
-	FILE	   *file = NULL;
 	FILE	   *qfile = NULL;
-	uint32		header;
-	int32		num;
-	int32		pgver;
-	int32		i;
-	int			buffer_size;
-	char	   *buffer = NULL;
 
 	/*
 	 * We already checked that we're loaded from shared_preload_libraries in
@@ -546,285 +610,33 @@ pgss_shmem_init(void *arg)
 	Assert(!IsUnderPostmaster);
 
 	/*
-	 * Initialize the shmem area with no statistics.
+	 * Initialize the shmem area.
 	 */
 	tranche_id = LWLockNewTrancheId("pg_stat_statements");
 	LWLockInitialize(&pgss->lock.lock, tranche_id);
-	pgss->cur_median_usage = ASSUMED_MEDIAN_INIT;
 	pgss->mean_query_len = ASSUMED_LENGTH_INIT;
 	SpinLockInit(&pgss->mutex);
 	pgss->extent = 0;
 	pgss->n_writers = 0;
 	pgss->gc_count = 0;
+	pgss->last_eviction_time = 0;
 	pgss->stats.dealloc = 0;
 	pgss->stats.stats_reset = GetCurrentTimestamp();
 
-	/* The hash table must've also been initialized by now */
-	Assert(pgss_hash != NULL);
-
-	/*
-	 * Set up a shmem exit hook to dump the statistics to disk on postmaster
-	 * (or standalone backend) exit.
-	 */
-	on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
-
-	/*
-	 * Load any pre-existing statistics from file.
-	 *
-	 * Note: we don't bother with locks here, because there should be no other
-	 * processes running when this code is reached.
-	 */
-
 	/* Unlink query text file possibly left over from crash */
 	unlink(PGSS_TEXT_FILE);
 
 	/* Allocate new query text temp file */
 	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
 	if (qfile == NULL)
-		goto write_error;
-
-	/*
-	 * If we were told not to load old statistics, we're done.  (Note we do
-	 * not try to unlink any old dump file in this case.  This seems a bit
-	 * questionable but it's the historical behavior.)
-	 */
-	if (!pgss_save)
-	{
-		FreeFile(qfile);
-		return;
-	}
-
-	/*
-	 * Attempt to load old statistics from the dump file.
-	 */
-	file = AllocateFile(PGSS_DUMP_FILE, PG_BINARY_R);
-	if (file == NULL)
 	{
-		if (errno != ENOENT)
-			goto read_error;
-		/* No existing persisted stats file, so we're done */
-		FreeFile(qfile);
+		ereport(LOG,
+				(errcode_for_file_access(),
+				 errmsg("could not write file \"%s\": %m",
+						PGSS_TEXT_FILE)));
 		return;
 	}
-
-	buffer_size = 2048;
-	buffer = (char *) palloc(buffer_size);
-
-	if (fread(&header, sizeof(uint32), 1, file) != 1 ||
-		fread(&pgver, sizeof(uint32), 1, file) != 1 ||
-		fread(&num, sizeof(int32), 1, file) != 1)
-		goto read_error;
-
-	if (header != PGSS_FILE_HEADER ||
-		pgver != PGSS_PG_MAJOR_VERSION)
-		goto data_error;
-
-	for (i = 0; i < num; i++)
-	{
-		pgssEntry	temp;
-		pgssEntry  *entry;
-		Size		query_offset;
-
-		if (fread(&temp, sizeof(pgssEntry), 1, file) != 1)
-			goto read_error;
-
-		/* Encoding is the only field we can easily sanity-check */
-		if (!PG_VALID_BE_ENCODING(temp.encoding))
-			goto data_error;
-
-		/* Resize buffer as needed */
-		if (temp.query_len >= buffer_size)
-		{
-			buffer_size = Max(buffer_size * 2, temp.query_len + 1);
-			buffer = repalloc(buffer, buffer_size);
-		}
-
-		if (fread(buffer, 1, temp.query_len + 1, file) != temp.query_len + 1)
-			goto read_error;
-
-		/* Should have a trailing null, but let's make sure */
-		buffer[temp.query_len] = '\0';
-
-		/* Skip loading "sticky" entries */
-		if (IS_STICKY(temp.counters))
-			continue;
-
-		/* Store the query text */
-		query_offset = pgss->extent;
-		if (fwrite(buffer, 1, temp.query_len + 1, qfile) != temp.query_len + 1)
-			goto write_error;
-		pgss->extent += temp.query_len + 1;
-
-		/* make the hashtable entry (discards old entries if too many) */
-		entry = entry_alloc(&temp.key, query_offset, temp.query_len,
-							temp.encoding,
-							false);
-
-		/* copy in the actual stats */
-		entry->counters = temp.counters;
-		entry->stats_since = temp.stats_since;
-		entry->minmax_stats_since = temp.minmax_stats_since;
-	}
-
-	/* Read global statistics for pg_stat_statements */
-	if (fread(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
-		goto read_error;
-
-	pfree(buffer);
-	FreeFile(file);
 	FreeFile(qfile);
-
-	/*
-	 * Remove the persisted stats file so it's not included in
-	 * backups/replication standbys, etc.  A new file will be written on next
-	 * shutdown.
-	 *
-	 * Note: it's okay if the PGSS_TEXT_FILE is included in a basebackup,
-	 * because we remove that file on startup; it acts inversely to
-	 * PGSS_DUMP_FILE, in that it is only supposed to be around when the
-	 * server is running, whereas PGSS_DUMP_FILE is only supposed to be around
-	 * when the server is not running.  Leaving the file creates no danger of
-	 * a newly restored database having a spurious record of execution costs,
-	 * which is what we're really concerned about here.
-	 */
-	unlink(PGSS_DUMP_FILE);
-
-	return;
-
-read_error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not read file \"%s\": %m",
-					PGSS_DUMP_FILE)));
-	goto fail;
-data_error:
-	ereport(LOG,
-			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-			 errmsg("ignoring invalid data in file \"%s\"",
-					PGSS_DUMP_FILE)));
-	goto fail;
-write_error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not write file \"%s\": %m",
-					PGSS_TEXT_FILE)));
-fail:
-	if (buffer)
-		pfree(buffer);
-	if (file)
-		FreeFile(file);
-	if (qfile)
-		FreeFile(qfile);
-	/* If possible, throw away the bogus file; ignore any error */
-	unlink(PGSS_DUMP_FILE);
-
-	/*
-	 * Don't unlink PGSS_TEXT_FILE here; it should always be around while the
-	 * server is running with pg_stat_statements enabled
-	 */
-}
-
-/*
- * shmem_shutdown hook: Dump statistics into file.
- *
- * Note: we don't bother with acquiring lock, because there should be no
- * other processes running when this is called.
- */
-static void
-pgss_shmem_shutdown(int code, Datum arg)
-{
-	FILE	   *file;
-	char	   *qbuffer = NULL;
-	Size		qbuffer_size = 0;
-	HASH_SEQ_STATUS hash_seq;
-	int32		num_entries;
-	pgssEntry  *entry;
-
-	/* Don't try to dump during a crash. */
-	if (code)
-		return;
-
-	/* Safety check ... shouldn't get here unless shmem is set up. */
-	if (!pgss || !pgss_hash)
-		return;
-
-	/* Don't dump if told not to. */
-	if (!pgss_save)
-		return;
-
-	file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
-	if (file == NULL)
-		goto error;
-
-	if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
-		goto error;
-	if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
-		goto error;
-	num_entries = hash_get_num_entries(pgss_hash);
-	if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
-		goto error;
-
-	qbuffer = qtext_load_file(&qbuffer_size);
-	if (qbuffer == NULL)
-		goto error;
-
-	/*
-	 * When serializing to disk, we store query texts immediately after their
-	 * entry data.  Any orphaned query texts are thereby excluded.
-	 */
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		int			len = entry->query_len;
-		char	   *qstr = qtext_fetch(entry->query_offset, len,
-									   qbuffer, qbuffer_size);
-
-		if (qstr == NULL)
-			continue;			/* Ignore any entries with bogus texts */
-
-		if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
-			fwrite(qstr, 1, len + 1, file) != len + 1)
-		{
-			/* note: we assume hash_seq_term won't change errno */
-			hash_seq_term(&hash_seq);
-			goto error;
-		}
-	}
-
-	/* Dump global statistics for pg_stat_statements */
-	if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
-		goto error;
-
-	pfree(qbuffer);
-	qbuffer = NULL;
-
-	if (FreeFile(file))
-	{
-		file = NULL;
-		goto error;
-	}
-
-	/*
-	 * Rename file into place, so we atomically replace any old one.
-	 */
-	(void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
-
-	/* Unlink query-texts file; it's not needed while shutdown */
-	unlink(PGSS_TEXT_FILE);
-
-	return;
-
-error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not write file \"%s\": %m",
-					PGSS_DUMP_FILE ".tmp")));
-	if (qbuffer)
-		pfree(qbuffer);
-	if (file)
-		FreeFile(file);
-	unlink(PGSS_DUMP_FILE ".tmp");
-	unlink(PGSS_TEXT_FILE);
 }
 
 /*
@@ -837,7 +649,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query, const JumbleState *jst
 		prev_post_parse_analyze_hook(pstate, query, jstate);
 
 	/* Safety check... */
-	if (!pgss || !pgss_hash || !pgss_enabled(nesting_level))
+	if (!pgss || !pgss_enabled(nesting_level))
 		return;
 
 	/*
@@ -1254,9 +1066,62 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
 	}
 }
 
+/*
+ * Initialize a freshly-created shared entry.
+ *
+ * Caller must hold the entry lock.  The entry is considered "new" when
+ * key.queryid is still zero (as left by pgstat entry creation).
+ */
+static void
+pgss_entry_init(PgStatShared_Pgss *shared_entry,
+				const pgssHashKey *key, int encoding)
+{
+	if (shared_entry->key.queryid != INT64CONST(0))
+		return;
+
+	shared_entry->key = *key;
+	memset(&shared_entry->counters, 0, sizeof(Counters));
+	shared_entry->counters.usage = USAGE_INIT;
+	shared_entry->stats_since = GetCurrentTimestamp();
+	shared_entry->minmax_stats_since = shared_entry->stats_since;
+	shared_entry->query_len = -1;
+	shared_entry->encoding = encoding;
+	shared_entry->query_offset = 0;
+}
+
+/*
+ * Store query text into a shared entry via the external text file.
+ *
+ * Caller must hold the entry lock.  Does nothing if text is already present.
+ */
+static void
+pgss_store_query_text(PgStatShared_Pgss *shared_entry,
+					  const char *query, int query_len, int encoding)
+{
+	Size		query_offset;
+	int			gc_count;
+
+	if (shared_entry->query_len >= 0)
+		return;
+
+	LWLockAcquire(&pgss->lock.lock, LW_SHARED);
+	if (qtext_store(query, query_len, &query_offset, &gc_count))
+	{
+		shared_entry->query_offset = query_offset;
+		shared_entry->query_len = query_len;
+		shared_entry->encoding = encoding;
+	}
+	LWLockRelease(&pgss->lock.lock);
+}
+
 /*
  * Store some statistics for a statement.
  *
+ * Shared entry creation and query text storage are written directly to
+ * shared memory, making entries immediately visible to other backends.
+ * Counter accumulation is done in backend-local pending entries, flushed
+ * periodically by pgss_flush_pending_cb.
+ *
  * If jstate is not NULL then we're trying to create an entry for which
  * we have no statistics as yet; we just want to record the normalized
  * query string.  total_time, rows, bufusage and walusage are ignored in this
@@ -1279,14 +1144,16 @@ pgss_store(const char *query, int64 queryId,
 		   PlannedStmtOrigin planOrigin)
 {
 	pgssHashKey key;
-	pgssEntry  *entry;
+	PgStat_EntryRef *entry_ref;
+	PgStatShared_Pgss *shared_entry;
+	PgStat_PendingPgss *pending;
 	char	   *norm_query = NULL;
 	int			encoding = GetDatabaseEncoding();
 
 	Assert(query != NULL);
 
 	/* Safety check... */
-	if (!pgss || !pgss_hash)
+	if (!pgss)
 		return;
 
 	/*
@@ -1313,192 +1180,171 @@ pgss_store(const char *query, int64 queryId,
 	key.queryid = queryId;
 	key.toplevel = (nesting_level == 0);
 
-	/* Lookup the hash table entry with shared lock. */
-	LWLockAcquire(&pgss->lock.lock, LW_SHARED);
-
-	entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
-	/* Create new entry, if not present */
-	if (!entry)
+	/*
+	 * If jstate is set, create the shared entry and store normalized query
+	 * text.  Don't increment counters; entries with zero calls are not
+	 * returned by pg_stat_statements_internal().
+	 */
+	if (jstate)
 	{
-		Size		query_offset;
-		int			gc_count;
-		bool		stored;
-		bool		do_gc;
+		const char *store_query;
 
-		/*
-		 * Create a new, normalized query string if caller asked.  We don't
-		 * need to hold the lock while doing this work.  (Note: in any case,
-		 * it's possible that someone else creates a duplicate hashtable entry
-		 * in the interval where we don't hold the lock below.  That case is
-		 * handled by entry_alloc.)
-		 */
-		if (jstate)
+		if (pgstat_get_entry_count(PGSTAT_KIND_PGSS) >= pgss_max)
 		{
-			LWLockRelease(&pgss->lock.lock);
-			norm_query = generate_normalized_query(jstate, query,
-												   query_location,
-												   &query_len);
-			LWLockAcquire(&pgss->lock.lock, LW_SHARED);
+			pgss_maybe_evict();
+			return;
 		}
 
-		/* Append new query text to file with only shared lock held */
-		stored = qtext_store(norm_query ? norm_query : query, query_len,
-							 &query_offset, &gc_count);
-
-		/*
-		 * Determine whether we need to garbage collect external query texts
-		 * while the shared lock is still held.  This micro-optimization
-		 * avoids taking the time to decide this while holding exclusive lock.
-		 */
-		do_gc = need_gc_qtexts();
-
-		/* Need exclusive lock to make a new hashtable entry - promote */
-		LWLockRelease(&pgss->lock.lock);
-		LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE);
+		norm_query = generate_normalized_query(jstate, query,
+											   query_location,
+											   &query_len);
+		store_query = norm_query ? norm_query : query;
 
-		/*
-		 * A garbage collection may have occurred while we weren't holding the
-		 * lock.  In the unlikely event that this happens, the query text we
-		 * stored above will have been garbage collected, so write it again.
-		 * This should be infrequent enough that doing it while holding
-		 * exclusive lock isn't a performance problem.
-		 */
-		if (!stored || pgss->gc_count != gc_count)
-			stored = qtext_store(norm_query ? norm_query : query, query_len,
-								 &query_offset, NULL);
+		entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_PGSS,
+												key.dbid,
+												pgss_objid(&key),
+												true);
+		if (!entry_ref)
+		{
+			if (norm_query)
+				pfree(norm_query);
+			return;
+		}
 
-		/* If we failed to write to the text file, give up */
-		if (!stored)
-			goto done;
+		shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+		pgss_entry_init(shared_entry, &key, encoding);
+		pgss_store_query_text(shared_entry, store_query, query_len, encoding);
 
-		/* OK to create a new hashtable entry */
-		entry = entry_alloc(&key, query_offset, query_len, encoding,
-							jstate != NULL);
+		pgstat_unlock_entry(entry_ref);
 
-		/* If needed, perform garbage collection while exclusive lock held */
-		if (do_gc)
-			gc_qtexts();
+		if (norm_query)
+			pfree(norm_query);
+		return;
 	}
 
-	/* Increment the counts, except when jstate is not NULL */
-	if (!jstate)
+	/*
+	 * Normal case: accumulate stats in a pending entry.  The pending entry
+	 * will be flushed to shared memory by pgss_flush_pending_cb.
+	 *
+	 * But first, ensure the shared entry exists with query text.
+	 */
+	entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid,
+									 pgss_objid(&key), false, NULL);
+	if (!entry_ref)
 	{
-		Assert(kind == PGSS_PLAN || kind == PGSS_EXEC);
-
 		/*
-		 * Grab the spinlock while updating the counters (see comment about
-		 * locking rules at the head of the file)
+		 * Entry doesn't exist yet.  Don't create a new one if we've already
+		 * hit the configured maximum; eviction will free space eventually.
 		 */
-		SpinLockAcquire(&entry->mutex);
+		if (pgstat_get_entry_count(PGSTAT_KIND_PGSS) >= pgss_max)
+		{
+			pgss_maybe_evict();
+			return;
+		}
 
-		/* "Unstick" entry if it was previously sticky */
-		if (IS_STICKY(entry->counters))
-			entry->counters.usage = USAGE_INIT;
+		entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_PGSS,
+												key.dbid,
+												pgss_objid(&key),
+												true);
+		if (!entry_ref)
+			return;
 
-		entry->counters.calls[kind] += 1;
-		entry->counters.total_time[kind] += total_time;
+		shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+		pgss_entry_init(shared_entry, &key, encoding);
+		pgss_store_query_text(shared_entry, query, query_len, encoding);
 
-		if (entry->counters.calls[kind] == 1)
-		{
-			entry->counters.min_time[kind] = total_time;
-			entry->counters.max_time[kind] = total_time;
-			entry->counters.mean_time[kind] = total_time;
-		}
-		else
-		{
-			/*
-			 * Welford's method for accurately computing variance. See
-			 * <http://www.johndcook.com/blog/standard_deviation/>
-			 */
-			double		old_mean = entry->counters.mean_time[kind];
+		pgstat_unlock_entry(entry_ref);
+	}
 
-			entry->counters.mean_time[kind] +=
-				(total_time - old_mean) / entry->counters.calls[kind];
-			entry->counters.sum_var_time[kind] +=
-				(total_time - old_mean) * (total_time - entry->counters.mean_time[kind]);
+	/*
+	 * Now accumulate stats in the pending entry.
+	 */
+	Assert(kind == PGSS_PLAN || kind == PGSS_EXEC);
 
-			/*
-			 * Calculate min and max time. min = 0 and max = 0 means that the
-			 * min/max statistics were reset
-			 */
-			if (entry->counters.min_time[kind] == 0
-				&& entry->counters.max_time[kind] == 0)
-			{
-				entry->counters.min_time[kind] = total_time;
-				entry->counters.max_time[kind] = total_time;
-			}
-			else
-			{
-				if (entry->counters.min_time[kind] > total_time)
-					entry->counters.min_time[kind] = total_time;
-				if (entry->counters.max_time[kind] < total_time)
-					entry->counters.max_time[kind] = total_time;
-			}
-		}
-		entry->counters.rows += rows;
-		entry->counters.shared_blks_hit += bufusage->shared_blks_hit;
-		entry->counters.shared_blks_read += bufusage->shared_blks_read;
-		entry->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
-		entry->counters.shared_blks_written += bufusage->shared_blks_written;
-		entry->counters.local_blks_hit += bufusage->local_blks_hit;
-		entry->counters.local_blks_read += bufusage->local_blks_read;
-		entry->counters.local_blks_dirtied += bufusage->local_blks_dirtied;
-		entry->counters.local_blks_written += bufusage->local_blks_written;
-		entry->counters.temp_blks_read += bufusage->temp_blks_read;
-		entry->counters.temp_blks_written += bufusage->temp_blks_written;
-		entry->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time);
-		entry->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time);
-		entry->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time);
-		entry->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time);
-		entry->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time);
-		entry->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time);
-		entry->counters.usage += USAGE_EXEC(total_time);
-		entry->counters.wal_records += walusage->wal_records;
-		entry->counters.wal_fpi += walusage->wal_fpi;
-		entry->counters.wal_bytes += walusage->wal_bytes;
-		entry->counters.wal_buffers_full += walusage->wal_buffers_full;
-		if (jitusage)
-		{
-			entry->counters.jit_functions += jitusage->created_functions;
-			entry->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter);
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_PGSS, key.dbid,
+										  pgss_objid(&key), NULL);
+	pending = (PgStat_PendingPgss *) entry_ref->pending;
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter))
-				entry->counters.jit_deform_count++;
-			entry->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter);
+	pending->counters.calls[kind] += 1;
+	pending->counters.total_time[kind] += total_time;
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter))
-				entry->counters.jit_inlining_count++;
-			entry->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter);
+	/*
+	 * Compute mean and sum of squared deviations using Welford's online
+	 * algorithm.  These per-backend values are later merged into shared
+	 * memory using Chan's parallel variance algorithm in the flush callback.
+	 * See <http://www.johndcook.com/blog/standard_deviation/>
+	 */
+	if (pending->counters.calls[kind] == 1)
+	{
+		pending->counters.min_time[kind] = total_time;
+		pending->counters.max_time[kind] = total_time;
+		pending->counters.mean_time[kind] = total_time;
+	}
+	else
+	{
+		double		old_mean = pending->counters.mean_time[kind];
+
+		if (pending->counters.min_time[kind] > total_time)
+			pending->counters.min_time[kind] = total_time;
+		if (pending->counters.max_time[kind] < total_time)
+			pending->counters.max_time[kind] = total_time;
+		pending->counters.mean_time[kind] +=
+			(total_time - old_mean) / pending->counters.calls[kind];
+		pending->counters.sum_var_time[kind] +=
+			(total_time - old_mean) * (total_time - pending->counters.mean_time[kind]);
+	}
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter))
-				entry->counters.jit_optimization_count++;
-			entry->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter);
+	pending->counters.rows += rows;
+	pending->counters.shared_blks_hit += bufusage->shared_blks_hit;
+	pending->counters.shared_blks_read += bufusage->shared_blks_read;
+	pending->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
+	pending->counters.shared_blks_written += bufusage->shared_blks_written;
+	pending->counters.local_blks_hit += bufusage->local_blks_hit;
+	pending->counters.local_blks_read += bufusage->local_blks_read;
+	pending->counters.local_blks_dirtied += bufusage->local_blks_dirtied;
+	pending->counters.local_blks_written += bufusage->local_blks_written;
+	pending->counters.temp_blks_read += bufusage->temp_blks_read;
+	pending->counters.temp_blks_written += bufusage->temp_blks_written;
+	pending->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time);
+	pending->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time);
+	pending->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time);
+	pending->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time);
+	pending->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time);
+	pending->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time);
+	pending->counters.usage += USAGE_EXEC(total_time);
+	pending->counters.wal_records += walusage->wal_records;
+	pending->counters.wal_fpi += walusage->wal_fpi;
+	pending->counters.wal_bytes += walusage->wal_bytes;
+	pending->counters.wal_buffers_full += walusage->wal_buffers_full;
+	if (jitusage)
+	{
+		pending->counters.jit_functions += jitusage->created_functions;
+		pending->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter);
 
-			if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter))
-				entry->counters.jit_emission_count++;
-			entry->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter);
-		}
+		if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter))
+			pending->counters.jit_deform_count++;
+		pending->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter);
 
-		/* parallel worker counters */
-		entry->counters.parallel_workers_to_launch += parallel_workers_to_launch;
-		entry->counters.parallel_workers_launched += parallel_workers_launched;
+		if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter))
+			pending->counters.jit_inlining_count++;
+		pending->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter);
 
-		/* plan cache counters */
-		if (planOrigin == PLAN_STMT_CACHE_GENERIC)
-			entry->counters.generic_plan_calls++;
-		else if (planOrigin == PLAN_STMT_CACHE_CUSTOM)
-			entry->counters.custom_plan_calls++;
+		if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter))
+			pending->counters.jit_optimization_count++;
+		pending->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter);
 
-		SpinLockRelease(&entry->mutex);
+		if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter))
+			pending->counters.jit_emission_count++;
+		pending->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter);
 	}
 
-done:
-	LWLockRelease(&pgss->lock.lock);
+	pending->counters.parallel_workers_to_launch += parallel_workers_to_launch;
+	pending->counters.parallel_workers_launched += parallel_workers_launched;
 
-	/* We postpone this clean-up until we're out of the lock */
-	if (norm_query)
-		pfree(norm_query);
+	if (planOrigin == PLAN_STMT_CACHE_GENERIC)
+		pending->counters.generic_plan_calls++;
+	else if (planOrigin == PLAN_STMT_CACHE_CUSTOM)
+		pending->counters.custom_plan_calls++;
 }
 
 /*
@@ -1676,8 +1522,8 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 	Size		qbuffer_size = 0;
 	Size		extent = 0;
 	int			gc_count = 0;
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry  *entry;
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
 
 	/*
 	 * Superusers or roles with the privileges of pg_read_all_stats members
@@ -1685,8 +1531,8 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 	 */
 	is_allowed_role = has_privs_of_role(userid, ROLE_PG_READ_ALL_STATS);
 
-	/* hash table must exist already */
-	if (!pgss || !pgss_hash)
+	/* shared state must exist already */
+	if (!pgss)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
@@ -1773,30 +1619,13 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 	}
 
 	/*
-	 * Get shared lock, load or reload the query text file if we must, and
-	 * iterate over the hashtable entries.
-	 *
-	 * With a large hash table, we might be holding the lock rather longer
-	 * than one could wish.  However, this only blocks creation of new hash
-	 * table entries, and the larger the hash table the less likely that is to
-	 * be needed.  So we can hope this is okay.  Perhaps someday we'll decide
-	 * we need to partition the hash table to limit the time spent holding any
-	 * one lock.
+	 * Get shared lock on the query text file, load or reload if needed, and
+	 * iterate over the pgstat shared hash entries.
 	 */
 	LWLockAcquire(&pgss->lock.lock, LW_SHARED);
 
 	if (showtext)
 	{
-		/*
-		 * Here it is safe to examine extent and gc_count without taking the
-		 * mutex.  Note that although other processes might change
-		 * pgss->extent just after we look at it, the strings they then write
-		 * into the file cannot yet be referenced in the hashtable, so we
-		 * don't care whether we see them or not.
-		 *
-		 * If qtext_load_file fails, we just press on; we'll return NULL for
-		 * every query text.
-		 */
 		if (qbuffer == NULL ||
 			pgss->extent != extent ||
 			pgss->gc_count != gc_count)
@@ -1807,45 +1636,75 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 		}
 	}
 
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	/* Flush any pending stats for this backend so they're visible */
+	pgstat_report_anytime_stat();
+
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
 	{
 		Datum		values[PG_STAT_STATEMENTS_COLS];
 		bool		nulls[PG_STAT_STATEMENTS_COLS];
 		int			i = 0;
 		Counters	tmp;
 		double		stddev;
-		int64		queryid = entry->key.queryid;
+		PgStatShared_Pgss *shared_entry;
+		int64		queryid;
 		TimestampTz stats_since;
 		TimestampTz minmax_stats_since;
 
-		memset(values, 0, sizeof(values));
-		memset(nulls, 0, sizeof(nulls));
+		/* Only process our kind */
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+		Assert(shared_entry);
+
+		/* Read entry data under the entry's LWLock */
+		LWLockAcquire(&shared_entry->header.lock, LW_SHARED);
+		tmp = shared_entry->counters;
+		queryid = shared_entry->key.queryid;
+		stats_since = shared_entry->stats_since;
+		minmax_stats_since = shared_entry->minmax_stats_since;
+		LWLockRelease(&shared_entry->header.lock);
+
+		/* Skip entries created at parse time but never executed */
+		if (tmp.calls[PGSS_PLAN] + tmp.calls[PGSS_EXEC] == 0)
+			continue;
 
-		values[i++] = ObjectIdGetDatum(entry->key.userid);
-		values[i++] = ObjectIdGetDatum(entry->key.dbid);
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		values[i++] = ObjectIdGetDatum(shared_entry->key.userid);
+		values[i++] = ObjectIdGetDatum(shared_entry->key.dbid);
 		if (api_version >= PGSS_V1_9)
-			values[i++] = BoolGetDatum(entry->key.toplevel);
+			values[i++] = BoolGetDatum(shared_entry->key.toplevel);
 
-		if (is_allowed_role || entry->key.userid == userid)
+		if (is_allowed_role || shared_entry->key.userid == userid)
 		{
 			if (api_version >= PGSS_V1_2)
 				values[i++] = Int64GetDatumFast(queryid);
 
 			if (showtext)
 			{
-				char	   *qstr = qtext_fetch(entry->query_offset,
-											   entry->query_len,
-											   qbuffer,
-											   qbuffer_size);
+				char	   *qstr = NULL;
+
+				if (shared_entry->query_len >= 0)
+				{
+					qstr = qtext_fetch(shared_entry->query_offset,
+									   shared_entry->query_len,
+									   qbuffer,
+									   qbuffer_size);
+				}
 
 				if (qstr)
 				{
 					char	   *enc;
 
 					enc = pg_any_to_server(qstr,
-										   entry->query_len,
-										   entry->encoding);
+										   shared_entry->query_len,
+										   shared_entry->encoding);
 
 					values[i++] = CStringGetTextDatum(enc);
 
@@ -1880,22 +1739,6 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 				nulls[i++] = true;
 		}
 
-		/* copy counters to a local variable to keep locking time short */
-		SpinLockAcquire(&entry->mutex);
-		tmp = entry->counters;
-		SpinLockRelease(&entry->mutex);
-
-		/*
-		 * The spinlock is not required when reading these two as they are
-		 * always updated when holding pgss->lock exclusively.
-		 */
-		stats_since = entry->stats_since;
-		minmax_stats_since = entry->minmax_stats_since;
-
-		/* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
-		if (IS_STICKY(tmp))
-			continue;
-
 		/* Note that we rely on PGSS_PLAN being 0 and PGSS_EXEC being 1. */
 		for (int kind = 0; kind < PGSS_NUMKIND; kind++)
 		{
@@ -2020,6 +1863,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 	}
+	dshash_seq_term(&hstat);
 
 	LWLockRelease(&pgss->lock.lock);
 
@@ -2040,8 +1884,9 @@ pg_stat_statements_info(PG_FUNCTION_ARGS)
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_STATEMENTS_INFO_COLS] = {0};
 	bool		nulls[PG_STAT_STATEMENTS_INFO_COLS] = {0};
+	int			i = 0;
 
-	if (!pgss || !pgss_hash)
+	if (!pgss)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
@@ -2055,162 +1900,12 @@ pg_stat_statements_info(PG_FUNCTION_ARGS)
 	stats = pgss->stats;
 	SpinLockRelease(&pgss->mutex);
 
-	values[0] = Int64GetDatum(stats.dealloc);
-	values[1] = TimestampTzGetDatum(stats.stats_reset);
+	values[i++] = Int64GetDatum(stats.dealloc);
+	values[i++] = TimestampTzGetDatum(stats.stats_reset);
 
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
 
-/*
- * Allocate a new hashtable entry.
- * caller must hold an exclusive lock on pgss->lock
- *
- * "query" need not be null-terminated; we rely on query_len instead
- *
- * If "sticky" is true, make the new entry artificially sticky so that it will
- * probably still be there when the query finishes execution.  We do this by
- * giving it a median usage value rather than the normal value.  (Strictly
- * speaking, query strings are normalized on a best effort basis, though it
- * would be difficult to demonstrate this even under artificial conditions.)
- *
- * Note: despite needing exclusive lock, it's not an error for the target
- * entry to already exist.  This is because pgss_store releases and
- * reacquires lock after failing to find a match; so someone else could
- * have made the entry while we waited to get exclusive lock.
- */
-static pgssEntry *
-entry_alloc(pgssHashKey *key, Size query_offset, int query_len, int encoding,
-			bool sticky)
-{
-	pgssEntry  *entry;
-	bool		found;
-
-	/* Make space if needed */
-	while (hash_get_num_entries(pgss_hash) >= pgss_max)
-		entry_dealloc();
-
-	/* Find or create an entry with desired hash code */
-	entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found);
-
-	if (!found)
-	{
-		/* New entry, initialize it */
-
-		/* reset the statistics */
-		memset(&entry->counters, 0, sizeof(Counters));
-		/* set the appropriate initial usage count */
-		entry->counters.usage = sticky ? pgss->cur_median_usage : USAGE_INIT;
-		/* re-initialize the mutex each time ... we assume no one using it */
-		SpinLockInit(&entry->mutex);
-		/* ... and don't forget the query text metadata */
-		Assert(query_len >= 0);
-		entry->query_offset = query_offset;
-		entry->query_len = query_len;
-		entry->encoding = encoding;
-		entry->stats_since = GetCurrentTimestamp();
-		entry->minmax_stats_since = entry->stats_since;
-	}
-
-	return entry;
-}
-
-/*
- * qsort comparator for sorting into increasing usage order
- */
-static int
-entry_cmp(const void *lhs, const void *rhs)
-{
-	double		l_usage = (*(pgssEntry *const *) lhs)->counters.usage;
-	double		r_usage = (*(pgssEntry *const *) rhs)->counters.usage;
-
-	if (l_usage < r_usage)
-		return -1;
-	else if (l_usage > r_usage)
-		return +1;
-	else
-		return 0;
-}
-
-/*
- * Deallocate least-used entries.
- *
- * Caller must hold an exclusive lock on pgss->lock.
- */
-static void
-entry_dealloc(void)
-{
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry **entries;
-	pgssEntry  *entry;
-	int			nvictims;
-	int			i;
-	Size		tottextlen;
-	int			nvalidtexts;
-
-	/*
-	 * Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them.
-	 * While we're scanning the table, apply the decay factor to the usage
-	 * values, and update the mean query length.
-	 *
-	 * Note that the mean query length is almost immediately obsolete, since
-	 * we compute it before not after discarding the least-used entries.
-	 * Hopefully, that doesn't affect the mean too much; it doesn't seem worth
-	 * making two passes to get a more current result.  Likewise, the new
-	 * cur_median_usage includes the entries we're about to zap.
-	 */
-
-	entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *));
-
-	i = 0;
-	tottextlen = 0;
-	nvalidtexts = 0;
-
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		entries[i++] = entry;
-		/* "Sticky" entries get a different usage decay rate. */
-		if (IS_STICKY(entry->counters))
-			entry->counters.usage *= STICKY_DECREASE_FACTOR;
-		else
-			entry->counters.usage *= USAGE_DECREASE_FACTOR;
-		/* In the mean length computation, ignore dropped texts. */
-		if (entry->query_len >= 0)
-		{
-			tottextlen += entry->query_len + 1;
-			nvalidtexts++;
-		}
-	}
-
-	/* Sort into increasing order by usage */
-	qsort(entries, i, sizeof(pgssEntry *), entry_cmp);
-
-	/* Record the (approximate) median usage */
-	if (i > 0)
-		pgss->cur_median_usage = entries[i / 2]->counters.usage;
-	/* Record the mean query length */
-	if (nvalidtexts > 0)
-		pgss->mean_query_len = tottextlen / nvalidtexts;
-	else
-		pgss->mean_query_len = ASSUMED_LENGTH_INIT;
-
-	/* Now zap an appropriate fraction of lowest-usage entries */
-	nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100);
-	nvictims = Min(nvictims, i);
-
-	for (i = 0; i < nvictims; i++)
-	{
-		hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL);
-	}
-
-	pfree(entries);
-
-	/* Increment the number of times entries are deallocated */
-	SpinLockAcquire(&pgss->mutex);
-	pgss->stats.dealloc += 1;
-	SpinLockRelease(&pgss->mutex);
-}
-
 /*
  * Given a query string (not necessarily null-terminated), allocate a new
  * entry in the external query text file and store the string there.
@@ -2234,6 +1929,8 @@ qtext_store(const char *query, int query_len,
 	Size		off;
 	int			fd;
 
+	*query_offset = 0;
+
 	/*
 	 * We use a spinlock to protect extent/n_writers/gc_count, so that
 	 * multiple processes may execute this function concurrently.
@@ -2366,8 +2063,8 @@ qtext_load_file(Size *buffer_size)
 		/*
 		 * If we get a short read and errno doesn't get set, the reason is
 		 * probably that garbage collection truncated the file since we did
-		 * the fstat(), so we don't log a complaint --- but we don't return
-		 * the data, either, since it's most likely corrupt due to concurrent
+		 * the fstat(), so we don't log a complaint; but we don't return the
+		 * data, either, since it's most likely corrupt due to concurrent
 		 * writes from garbage collection.
 		 */
 		errno = 0;
@@ -2420,8 +2117,6 @@ qtext_fetch(Size query_offset, int query_len,
 
 /*
  * Do we need to garbage-collect the external query text file?
- *
- * Caller should hold at least a shared lock on pgss->lock.
  */
 static bool
 need_gc_qtexts(void)
@@ -2478,13 +2173,13 @@ gc_qtexts(void)
 	char	   *qbuffer;
 	Size		qbuffer_size;
 	FILE	   *qfile = NULL;
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry  *entry;
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
 	Size		extent;
 	int			nentries;
 
 	/*
-	 * When called from pgss_store, some other session might have proceeded
+	 * When called from the bg worker, some other session might have proceeded
 	 * with garbage collection in the no-lock-held interim of lock strength
 	 * escalation.  Check once more that this is actually necessary.
 	 */
@@ -2493,21 +2188,12 @@ gc_qtexts(void)
 
 	/*
 	 * Load the old texts file.  If we fail (out of memory, for instance),
-	 * invalidate query texts.  Hopefully this is rare.  It might seem better
-	 * to leave things alone on an OOM failure, but the problem is that the
-	 * file is only going to get bigger; hoping for a future non-OOM result is
-	 * risky and can easily lead to complete denial of service.
+	 * invalidate query texts.  Hopefully this is rare.
 	 */
 	qbuffer = qtext_load_file(&qbuffer_size);
 	if (qbuffer == NULL)
 		goto gc_fail;
 
-	/*
-	 * We overwrite the query texts file in place, so as to reduce the risk of
-	 * an out-of-disk-space failure.  Since the file is guaranteed not to get
-	 * larger, this should always work on traditional filesystems; though we
-	 * could still lose on copy-on-write filesystems.
-	 */
 	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
 	if (qfile == NULL)
 	{
@@ -2521,21 +2207,32 @@ gc_qtexts(void)
 	extent = 0;
 	nentries = 0;
 
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
 	{
-		int			query_len = entry->query_len;
-		char	   *qry = qtext_fetch(entry->query_offset,
-									  query_len,
-									  qbuffer,
-									  qbuffer_size);
+		PgStatShared_Pgss *shared_entry;
+		int			query_len;
+		char	   *qry;
+
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+
+		query_len = shared_entry->query_len;
+		if (query_len < 0)
+			continue;
+
+		qry = qtext_fetch(shared_entry->query_offset, query_len,
+						  qbuffer, qbuffer_size);
 
 		if (qry == NULL)
 		{
-			/* Trouble ... drop the text */
-			entry->query_offset = 0;
-			entry->query_len = -1;
-			/* entry will not be counted in mean query length computation */
+			/* Trouble ... mark text invalid */
+			shared_entry->query_offset = 0;
+			shared_entry->query_len = -1;
 			continue;
 		}
 
@@ -2545,19 +2242,16 @@ gc_qtexts(void)
 					(errcode_for_file_access(),
 					 errmsg("could not write file \"%s\": %m",
 							PGSS_TEXT_FILE)));
-			hash_seq_term(&hash_seq);
+			dshash_seq_term(&hstat);
 			goto gc_fail;
 		}
 
-		entry->query_offset = extent;
+		shared_entry->query_offset = extent;
 		extent += query_len + 1;
 		nentries++;
 	}
+	dshash_seq_term(&hstat);
 
-	/*
-	 * Truncate away any now-unused space.  If this fails for some odd reason,
-	 * we log it, but there's no need to fail.
-	 */
 	if (ftruncate(fileno(qfile), extent) != 0)
 		ereport(LOG,
 				(errcode_for_file_access(),
@@ -2577,13 +2271,8 @@ gc_qtexts(void)
 	elog(DEBUG1, "pgss gc of queries file shrunk size from %zu to %zu",
 		 pgss->extent, extent);
 
-	/* Reset the shared extent pointer */
 	pgss->extent = extent;
 
-	/*
-	 * Also update the mean query length, to be sure that need_gc_qtexts()
-	 * won't still think we have a problem.
-	 */
 	if (nentries > 0)
 		pgss->mean_query_len = extent / nentries;
 	else
@@ -2591,19 +2280,11 @@ gc_qtexts(void)
 
 	pfree(qbuffer);
 
-	/*
-	 * OK, count a garbage collection cycle.  (Note: even though we have
-	 * exclusive lock on pgss->lock, we must take pgss->mutex for this, since
-	 * other processes may examine gc_count while holding only the mutex.
-	 * Also, we have to advance the count *after* we've rewritten the file,
-	 * else other processes might not realize they read a stale file.)
-	 */
 	record_gc_qtexts();
 
 	return;
 
 gc_fail:
-	/* clean up resources */
 	if (qfile)
 		FreeFile(qfile);
 	if (qbuffer)
@@ -2611,18 +2292,24 @@ gc_fail:
 
 	/*
 	 * Since the contents of the external file are now uncertain, mark all
-	 * hashtable entries as having invalid texts.
+	 * entries as having invalid texts.
 	 */
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
 	{
-		entry->query_offset = 0;
-		entry->query_len = -1;
+		PgStatShared_Pgss *shared_entry;
+
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+		shared_entry->query_offset = 0;
+		shared_entry->query_len = -1;
 	}
+	dshash_seq_term(&hstat);
 
-	/*
-	 * Destroy the query text file and create a new, empty one
-	 */
 	(void) unlink(PGSS_TEXT_FILE);
 	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
 	if (qfile == NULL)
@@ -2633,41 +2320,27 @@ gc_fail:
 	else
 		FreeFile(qfile);
 
-	/* Reset the shared extent pointer */
 	pgss->extent = 0;
-
-	/* Reset mean_query_len to match the new state */
 	pgss->mean_query_len = ASSUMED_LENGTH_INIT;
 
-	/*
-	 * Bump the GC count even though we failed.
-	 *
-	 * This is needed to make concurrent readers of file without any lock on
-	 * pgss->lock notice existence of new version of file.  Once readers
-	 * subsequently observe a change in GC count with pgss->lock held, that
-	 * forces a safe reopen of file.  Writers also require that we bump here,
-	 * of course.  (As required by locking protocol, readers and writers don't
-	 * trust earlier file contents until gc_count is found unchanged after
-	 * pgss->lock acquired in shared or exclusive mode respectively.)
-	 */
 	record_gc_qtexts();
 }
 
-#define SINGLE_ENTRY_RESET(e) \
-if (e) { \
+#define SINGLE_ENTRY_RESET(shared, key_ptr, minmax_only, stats_reset, num_remove) \
+if (shared) { \
 	if (minmax_only) { \
-		/* When requested reset only min/max statistics of an entry */ \
 		for (int kind = 0; kind < PGSS_NUMKIND; kind++) \
 		{ \
-			e->counters.max_time[kind] = 0; \
-			e->counters.min_time[kind] = 0; \
+			(shared)->counters.max_time[kind] = 0; \
+			(shared)->counters.min_time[kind] = 0; \
 		} \
-		e->minmax_stats_since = stats_reset; \
+		(shared)->minmax_stats_since = stats_reset; \
 	} \
 	else \
 	{ \
-		/* Remove the key otherwise  */ \
-		hash_search(pgss_hash, &e->key, HASH_REMOVE, NULL); \
+		(shared)->query_len = -1; \
+		pgstat_drop_entry(PGSTAT_KIND_PGSS, (key_ptr)->dbid, \
+						  pgss_objid(key_ptr)); \
 		num_remove++; \
 	} \
 }
@@ -2678,69 +2351,122 @@ if (e) { \
 static TimestampTz
 entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
 {
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry  *entry;
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
 	FILE	   *qfile;
 	int64		num_entries;
 	int64		num_remove = 0;
-	pgssHashKey key;
 	TimestampTz stats_reset;
 
-	if (!pgss || !pgss_hash)
+	if (!pgss)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
 
 	LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE);
-	num_entries = hash_get_num_entries(pgss_hash);
+	num_entries = pgstat_get_entry_count(PGSTAT_KIND_PGSS);
 
 	stats_reset = GetCurrentTimestamp();
 
 	if (userid != 0 && dbid != 0 && queryid != INT64CONST(0))
 	{
 		/* If all the parameters are available, use the fast path. */
+		pgssHashKey key;
+		PgStat_EntryRef *entry_ref;
+		PgStatShared_Pgss *shared_entry;
+
 		memset(&key, 0, sizeof(pgssHashKey));
 		key.userid = userid;
 		key.dbid = dbid;
 		key.queryid = queryid;
 
-		/*
-		 * Reset the entry if it exists, starting with the non-top-level
-		 * entry.
-		 */
+		/* Reset non-top-level entry */
 		key.toplevel = false;
-		entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
-		SINGLE_ENTRY_RESET(entry);
+		entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid,
+										 pgss_objid(&key), false, NULL);
+		if (entry_ref)
+		{
+			shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+			SINGLE_ENTRY_RESET(shared_entry, &key, minmax_only, stats_reset, num_remove);
+		}
 
-		/* Also reset the top-level entry if it exists. */
+		/* Reset top-level entry */
 		key.toplevel = true;
-		entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
-		SINGLE_ENTRY_RESET(entry);
+		entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid,
+										 pgss_objid(&key), false, NULL);
+		if (entry_ref)
+		{
+			shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+			SINGLE_ENTRY_RESET(shared_entry, &key, minmax_only, stats_reset, num_remove);
+		}
 	}
-	else if (userid != 0 || dbid != 0 || queryid != INT64CONST(0))
+	else
 	{
-		/* Reset entries corresponding to valid parameters. */
-		hash_seq_init(&hash_seq, pgss_hash);
-		while ((entry = hash_seq_search(&hash_seq)) != NULL)
+		/*
+		 * Iterate all entries and reset matching ones.  We cannot call
+		 * pgstat_drop_entry() while iterating the dshash (it internally
+		 * acquires partition locks), so collect keys to drop and do it after.
+		 */
+		PendingDrop *to_drop = NULL;
+		int			num_to_drop = 0;
+		int			max_to_drop = 0;
+
+		dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+		while ((p = dshash_seq_next(&hstat)) != NULL)
 		{
-			if ((!userid || entry->key.userid == userid) &&
-				(!dbid || entry->key.dbid == dbid) &&
-				(!queryid || entry->key.queryid == queryid))
+			PgStatShared_Pgss *shared_entry;
+
+			if (p->key.kind != PGSTAT_KIND_PGSS)
+				continue;
+			if (p->dropped)
+				continue;
+
+			shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+
+			if ((!userid || shared_entry->key.userid == userid) &&
+				(!dbid || shared_entry->key.dbid == dbid) &&
+				(!queryid || shared_entry->key.queryid == queryid))
 			{
-				SINGLE_ENTRY_RESET(entry);
+				if (minmax_only)
+				{
+					for (int kind = 0; kind < PGSS_NUMKIND; kind++)
+					{
+						shared_entry->counters.max_time[kind] = 0;
+						shared_entry->counters.min_time[kind] = 0;
+					}
+					shared_entry->minmax_stats_since = stats_reset;
+				}
+				else
+				{
+					shared_entry->query_len = -1;
+
+					/* Collect for deferred drop */
+					if (num_to_drop >= max_to_drop)
+					{
+						max_to_drop = Max(max_to_drop * 2, 128);
+						if (to_drop == NULL)
+							to_drop = palloc_array(PendingDrop, max_to_drop);
+						else
+							to_drop = repalloc_array(to_drop, PendingDrop, max_to_drop);
+					}
+					to_drop[num_to_drop].dbid = shared_entry->key.dbid;
+					to_drop[num_to_drop].objid = pgss_objid(&shared_entry->key);
+					num_to_drop++;
+				}
 			}
 		}
-	}
-	else
-	{
-		/* Reset all entries. */
-		hash_seq_init(&hash_seq, pgss_hash);
-		while ((entry = hash_seq_search(&hash_seq)) != NULL)
+		dshash_seq_term(&hstat);
+
+		/* Now drop entries outside the iteration */
+		for (int i = 0; i < num_to_drop; i++)
 		{
-			SINGLE_ENTRY_RESET(entry);
+			pgstat_drop_entry(PGSTAT_KIND_PGSS,
+							  to_drop[i].dbid, to_drop[i].objid);
 		}
+		num_remove = num_to_drop;
+
+		if (to_drop)
+			pfree(to_drop);
 	}
 
 	/* All entries are removed? */
@@ -2790,6 +2516,456 @@ release_lock:
 	return stats_reset;
 }
 
+/*
+ * pgstat flush callback: merge pending stats into shared memory.
+ *
+ * This is called by the pgstat infrastructure to flush accumulated
+ * backend-local statistics into the shared entry.
+ */
+static bool
+pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait)
+{
+	PgStat_PendingPgss *pending;
+	PgStatShared_Pgss *shared_entry;
+	Counters   *shared;
+	Counters   *p;
+
+	pending = (PgStat_PendingPgss *) entry_ref->pending;
+	shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats;
+
+	if (!pgstat_lock_entry(entry_ref, nowait))
+		return false;
+
+	shared = &shared_entry->counters;
+	p = &pending->counters;
+
+	for (int kind = 0; kind < PGSS_NUMKIND; kind++)
+	{
+		if (p->calls[kind] == 0)
+			continue;
+
+		/*
+		 * Merge variance using Chan's parallel variance algorithm to combine
+		 * per-backend sum_var_time (computed via Welford's method) with the
+		 * shared aggregate.  This must be done before updating calls/totals.
+		 * See
+		 * <https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm>
+		 */
+		if (shared->calls[kind] > 0)
+		{
+			double		old_mean_a = shared->mean_time[kind];
+			double		old_mean_b = p->mean_time[kind];
+			double		delta = old_mean_a - old_mean_b;
+			double		n_a = shared->calls[kind];
+			double		n_b = p->calls[kind];
+
+			shared->sum_var_time[kind] += p->sum_var_time[kind] +
+				delta * delta * n_a * n_b / (n_a + n_b);
+		}
+		else
+		{
+			shared->sum_var_time[kind] = p->sum_var_time[kind];
+		}
+
+		shared->calls[kind] += p->calls[kind];
+		shared->total_time[kind] += p->total_time[kind];
+
+		/*
+		 * Update min/max.  If both are 0 in shared, it means a reset
+		 * happened, so treat the pending values as the new baseline.
+		 */
+		if (shared->min_time[kind] == 0 && shared->max_time[kind] == 0)
+		{
+			shared->min_time[kind] = p->min_time[kind];
+			shared->max_time[kind] = p->max_time[kind];
+		}
+		else
+		{
+			if (shared->min_time[kind] > p->min_time[kind])
+				shared->min_time[kind] = p->min_time[kind];
+			if (shared->max_time[kind] < p->max_time[kind])
+				shared->max_time[kind] = p->max_time[kind];
+		}
+
+		/* Recompute mean from totals */
+		shared->mean_time[kind] =
+			shared->total_time[kind] / shared->calls[kind];
+	}
+
+	shared->rows += p->rows;
+	shared->shared_blks_hit += p->shared_blks_hit;
+	shared->shared_blks_read += p->shared_blks_read;
+	shared->shared_blks_dirtied += p->shared_blks_dirtied;
+	shared->shared_blks_written += p->shared_blks_written;
+	shared->local_blks_hit += p->local_blks_hit;
+	shared->local_blks_read += p->local_blks_read;
+	shared->local_blks_dirtied += p->local_blks_dirtied;
+	shared->local_blks_written += p->local_blks_written;
+	shared->temp_blks_read += p->temp_blks_read;
+	shared->temp_blks_written += p->temp_blks_written;
+	shared->shared_blk_read_time += p->shared_blk_read_time;
+	shared->shared_blk_write_time += p->shared_blk_write_time;
+	shared->local_blk_read_time += p->local_blk_read_time;
+	shared->local_blk_write_time += p->local_blk_write_time;
+	shared->temp_blk_read_time += p->temp_blk_read_time;
+	shared->temp_blk_write_time += p->temp_blk_write_time;
+	shared->usage += p->usage;
+	shared->wal_records += p->wal_records;
+	shared->wal_fpi += p->wal_fpi;
+	shared->wal_bytes += p->wal_bytes;
+	shared->wal_buffers_full += p->wal_buffers_full;
+	shared->jit_functions += p->jit_functions;
+	shared->jit_generation_time += p->jit_generation_time;
+	shared->jit_deform_count += p->jit_deform_count;
+	shared->jit_deform_time += p->jit_deform_time;
+	shared->jit_inlining_count += p->jit_inlining_count;
+	shared->jit_inlining_time += p->jit_inlining_time;
+	shared->jit_optimization_count += p->jit_optimization_count;
+	shared->jit_optimization_time += p->jit_optimization_time;
+	shared->jit_emission_count += p->jit_emission_count;
+	shared->jit_emission_time += p->jit_emission_time;
+	shared->parallel_workers_to_launch += p->parallel_workers_to_launch;
+	shared->parallel_workers_launched += p->parallel_workers_launched;
+	shared->generic_plan_calls += p->generic_plan_calls;
+	shared->custom_plan_calls += p->custom_plan_calls;
+
+	pgstat_unlock_entry(entry_ref);
+
+	return true;
+}
+
+/*
+ * pgstat serialization callback: write query text info for an entry.
+ *
+ * We write the query text offset, length, encoding, and the full pgssHashKey
+ * to the stats file so we can reconstruct the entry on reload.
+ */
+static void
+pgss_to_serialized_data(const PgStat_HashKey *key,
+						const PgStatShared_Common *header,
+						FILE *statfile)
+{
+	const PgStatShared_Pgss *entry = (const PgStatShared_Pgss *) header;
+	uint32		magic = PGSS_FILE_HEADER;
+	pgssHashKey hkey = entry->key;
+	TimestampTz stats_since = entry->stats_since;
+	TimestampTz minmax_stats_since = entry->minmax_stats_since;
+	int			query_len = entry->query_len;
+	int			encoding = entry->encoding;
+
+	pgstat_write_chunk_s(statfile, &magic);
+	pgstat_write_chunk_s(statfile, &hkey);
+	pgstat_write_chunk_s(statfile, &query_len);
+	pgstat_write_chunk_s(statfile, &encoding);
+	pgstat_write_chunk_s(statfile, &stats_since);
+	pgstat_write_chunk_s(statfile, &minmax_stats_since);
+
+	/*
+	 * Write the query text itself into the stats file so it survives restarts
+	 * (PGSS_TEXT_FILE lives in a tmpfs that gets wiped).
+	 */
+	if (query_len >= 0)
+	{
+		char	   *qstr = NULL;
+
+		if (!pgss_qtext_write_buffer && pgss)
+			pgss_qtext_write_buffer = qtext_load_file(&pgss_qtext_write_buffer_size);
+
+		if (pgss_qtext_write_buffer)
+			qstr = qtext_fetch(entry->query_offset, query_len,
+							   pgss_qtext_write_buffer,
+							   pgss_qtext_write_buffer_size);
+
+		if (qstr)
+			pgstat_write_chunk(statfile, qstr, query_len + 1);
+		else
+		{
+			char		nul = '\0';
+
+			pgstat_write_chunk(statfile, &nul, 1);
+		}
+	}
+}
+
+/*
+ * pgstat deserialization callback: read query text info for an entry.
+ */
+static bool
+pgss_from_serialized_data(const PgStat_HashKey *key,
+						  PgStatShared_Common *header,
+						  FILE *statfile)
+{
+	PgStatShared_Pgss *entry = (PgStatShared_Pgss *) header;
+	int			query_len;
+	int			encoding;
+	uint32		magic;
+
+	if (!pgstat_read_chunk_s(statfile, &magic))
+		return false;
+	if (magic != PGSS_FILE_HEADER)
+	{
+		elog(WARNING, "pg_stat_statements: discarding stats with mismatched format (got 0x%08X, expected 0x%08X)",
+			 magic, PGSS_FILE_HEADER);
+		return false;
+	}
+
+	if (!pgstat_read_chunk_s(statfile, &entry->key))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &query_len))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &encoding))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &entry->stats_since))
+		return false;
+	if (!pgstat_read_chunk_s(statfile, &entry->minmax_stats_since))
+		return false;
+
+	/* Initialize text fields */
+	entry->query_len = -1;
+	entry->encoding = encoding;
+	entry->query_offset = 0;
+
+	/*
+	 * Read the query text and store it in the external file.
+	 */
+	if (query_len >= 0)
+	{
+		char	   *buf = palloc(query_len + 1);
+
+		if (!pgstat_read_chunk(statfile, buf, query_len + 1))
+		{
+			pfree(buf);
+			return false;
+		}
+
+		if (!pgss_qtext_rebuild_file)
+		{
+			pgss_qtext_rebuild_file = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
+			if (!pgss_qtext_rebuild_file)
+			{
+				pfree(buf);
+				return false;
+			}
+			pgss_qtext_rebuild_extent = 0;
+		}
+
+		entry->query_offset = pgss_qtext_rebuild_extent;
+
+		if (fwrite(buf, 1, query_len + 1, pgss_qtext_rebuild_file) != (size_t) (query_len + 1))
+		{
+			pfree(buf);
+			return false;
+		}
+		pgss_qtext_rebuild_extent += query_len + 1;
+
+		entry->query_len = query_len;
+		entry->encoding = encoding;
+		pfree(buf);
+	}
+
+	return true;
+}
+
+/*
+ * pgstat finish callback: handle end of stats file operations.
+ *
+ * For pg_stat_statements, we manage the query text file lifecycle here.
+ */
+static void
+pgss_finish(PgStat_StatsFileOp status)
+{
+	switch (status)
+	{
+		case STATS_WRITE:
+			/* Free the cached query text buffer used during serialization */
+			if (pgss_qtext_write_buffer)
+			{
+				pfree(pgss_qtext_write_buffer);
+				pgss_qtext_write_buffer = NULL;
+				pgss_qtext_write_buffer_size = 0;
+			}
+			break;
+
+		case STATS_READ:
+			/* Close the rebuild file and update shared extent */
+			if (pgss_qtext_rebuild_file)
+			{
+				FreeFile(pgss_qtext_rebuild_file);
+				pgss_qtext_rebuild_file = NULL;
+				if (pgss)
+				{
+					pgss->extent = pgss_qtext_rebuild_extent;
+				}
+				pgss_qtext_rebuild_extent = 0;
+			}
+
+			/*
+			 * If pg_stat_statements.save is disabled, discard all entries
+			 * that were just loaded from the stats file.
+			 */
+			if (!pgss_save)
+			{
+				entry_reset(0, 0, 0, false);
+			}
+			break;
+
+		case STATS_DISCARD:
+			unlink(PGSS_TEXT_FILE);
+			break;
+	}
+}
+
+/*
+ * Evict least-used entries when the entry count exceeds pgss_max.
+ *
+ * Sorts all entries by usage, applies a decay factor, then drops the
+ * bottom USAGE_DEALLOC_PERCENT of entries.
+ */
+static void
+entry_dealloc(void)
+{
+	dshash_seq_status hstat;
+	PgStatShared_HashEntry *p;
+	UsageEntry *entries;
+	int			nentries = 0;
+	int			allocated = 1024;
+	int			nvictims;
+	int			i;
+	Size		tottextlen = 0;
+	int			nvalidtexts = 0;
+
+	entries = palloc(allocated * sizeof(UsageEntry));
+
+	/* Scan all entries, collect usage info and apply decay */
+	dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+	while ((p = dshash_seq_next(&hstat)) != NULL)
+	{
+		PgStatShared_Pgss *shared_entry;
+
+		if (p->key.kind != PGSTAT_KIND_PGSS)
+			continue;
+		if (p->dropped)
+			continue;
+
+		shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body);
+
+		/* Skip entries not yet executed; protect parse-time entries. */
+		if (shared_entry->counters.calls[PGSS_PLAN] +
+			shared_entry->counters.calls[PGSS_EXEC] == 0)
+			continue;
+
+		if (nentries >= allocated)
+		{
+			allocated *= 2;
+			entries = repalloc(entries, allocated * sizeof(UsageEntry));
+		}
+
+		entries[nentries].key = shared_entry->key;
+		entries[nentries].usage = shared_entry->counters.usage;
+		nentries++;
+
+		/* Apply decay */
+		shared_entry->counters.usage *= USAGE_DECREASE_FACTOR;
+
+		if (shared_entry->query_len >= 0)
+		{
+			tottextlen += shared_entry->query_len + 1;
+			nvalidtexts++;
+		}
+	}
+	dshash_seq_term(&hstat);
+
+	/* Sort by usage ascending */
+	qsort(entries, nentries, sizeof(UsageEntry),
+		  entry_cmp);
+
+	/* Update mean query length */
+	if (nvalidtexts > 0)
+		pgss->mean_query_len = tottextlen / nvalidtexts;
+	else
+		pgss->mean_query_len = ASSUMED_LENGTH_INIT;
+
+	/* Drop the bottom fraction */
+	nvictims = Max(10, nentries * USAGE_DEALLOC_PERCENT / 100);
+	nvictims = Min(nvictims, nentries);
+
+	for (i = 0; i < nvictims; i++)
+	{
+		pgstat_drop_entry(PGSTAT_KIND_PGSS,
+						  entries[i].key.dbid,
+						  pgss_objid(&entries[i].key));
+	}
+
+	pfree(entries);
+
+	/*
+	 * Signal other backends to invalidate their cached references to the
+	 * dropped entries.  Without this, backends keep stale refs and never
+	 * re-create evicted entries.
+	 */
+	pgstat_request_entry_refs_gc();
+
+	/* Increment dealloc counter */
+	SpinLockAcquire(&pgss->mutex);
+	pgss->stats.dealloc += 1;
+	SpinLockRelease(&pgss->mutex);
+}
+
+/*
+ * qsort comparator for eviction: sort by usage ascending.
+ */
+static int
+entry_cmp(const void *a, const void *b)
+{
+	double		l = ((const UsageEntry *) a)->usage;
+	double		r = ((const UsageEntry *) b)->usage;
+
+	if (l < r)
+		return -1;
+	else if (l > r)
+		return +1;
+	else
+		return 0;
+}
+
+/*
+ * Attempt eviction if enough time has passed since the last cycle.
+ *
+ * Uses a conditional lock so that at most one backend performs eviction at a
+ * time; others simply return without blocking.  The time check ensures we
+ * don't evict more often than EVICTION_INTERVAL_MS milliseconds.
+ */
+static void
+pgss_maybe_evict(void)
+{
+	/*
+	 * Use the statement start timestamp since this is always called from
+	 * pgss_store() at the start of query execution.
+	 */
+	TimestampTz now = GetCurrentStatementStartTimestamp();
+
+	if (!TimestampDifferenceExceeds(pgss->last_eviction_time, now,
+									EVICTION_INTERVAL_MS))
+		return;
+
+	if (!LWLockConditionalAcquire(&pgss->lock.lock, LW_EXCLUSIVE))
+		return;
+
+	/* Re-check after acquiring lock */
+	if (TimestampDifferenceExceeds(pgss->last_eviction_time, now,
+								   EVICTION_INTERVAL_MS))
+	{
+		entry_dealloc();
+		pgss->last_eviction_time = now;
+
+		/* Also handle query text GC while we hold the lock */
+		if (need_gc_qtexts())
+			gc_qtexts();
+	}
+
+	LWLockRelease(&pgss->lock.lock);
+}
+
 /*
  * Generate a normalized version of the query string that will be used to
  * represent all similar queries.
diff --git a/contrib/pg_stat_statements/pg_stat_statements.conf b/contrib/pg_stat_statements/pg_stat_statements.conf
index 0e900d7119b..21a10c41d09 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.conf
+++ b/contrib/pg_stat_statements/pg_stat_statements.conf
@@ -1,2 +1,3 @@
 shared_preload_libraries = 'pg_stat_statements'
 max_prepared_transactions = 5
+max_parallel_workers_per_gather = 0
diff --git a/contrib/pg_stat_statements/pg_stat_statements.control b/contrib/pg_stat_statements/pg_stat_statements.control
index 2eee0ceffa8..61ae41efc14 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.control
+++ b/contrib/pg_stat_statements/pg_stat_statements.control
@@ -1,5 +1,5 @@
 # pg_stat_statements extension
 comment = 'track planning and execution statistics of all SQL statements executed'
-default_version = '1.13'
+default_version = '1.14'
 module_pathname = '$libdir/pg_stat_statements'
 relocatable = true
diff --git a/contrib/pg_stat_statements/sql/parallel.sql b/contrib/pg_stat_statements/sql/parallel.sql
index 4ce1573d132..05a3e2524d5 100644
--- a/contrib/pg_stat_statements/sql/parallel.sql
+++ b/contrib/pg_stat_statements/sql/parallel.sql
@@ -16,6 +16,7 @@ SELECT pg_stat_statements_reset() IS NOT NULL AS t;
 
 SELECT count(*) FROM pgss_parallel_tab;
 
+RESET max_parallel_workers_per_gather;
 SELECT query,
   parallel_workers_to_launch > 0 AS has_workers_to_launch,
   parallel_workers_launched > 0 AS has_workers_launched
diff --git a/doc/src/sgml/pgstatstatements.sgml b/doc/src/sgml/pgstatstatements.sgml
index d753de5836e..19b1dab74c7 100644
--- a/doc/src/sgml/pgstatstatements.sgml
+++ b/doc/src/sgml/pgstatstatements.sgml
@@ -809,7 +809,6 @@ calls | 2
        <structname>pg_stat_statements</structname> view were last reset.
       </para></entry>
      </row>
-
     </tbody>
    </tgroup>
   </table>
@@ -911,11 +910,16 @@ calls | 2
       statements tracked by the module (i.e., the maximum number of rows
       in the <structname>pg_stat_statements</structname> view).  If more distinct
       statements than that are observed, information about the least-executed
-      statements is discarded.  The number of times such information was
+      statements is discarded.  Eviction is throttled to occur at most once
+      every 10 seconds; until then, new entries are simply not created once
+      the limit is reached while existing entries continue to accumulate
+      statistics normally.
+      The number of times such information was
       discarded can be seen in the
       <structname>pg_stat_statements_info</structname> view.
       The default value is 5000.
-      This parameter can only be set at server start.
+      This parameter can be changed at any time by reloading the server
+      configuration.
      </para>
     </listitem>
    </varlistentry>
@@ -1008,13 +1012,15 @@ calls | 2
 
   <para>
    The module requires additional shared memory proportional to
-   <varname>pg_stat_statements.max</varname>.  Note that this
-   memory is consumed whenever the module is loaded, even if
+   <varname>pg_stat_statements.max</varname>.  Note that this memory is
+   consumed whenever the module is loaded, even if
    <varname>pg_stat_statements.track</varname> is set to <literal>none</literal>.
   </para>
 
   <para>
-   These parameters must be set in <filename>postgresql.conf</filename>.
+   These parameters are typically set in <filename>postgresql.conf</filename>.
+   Note that <varname>pg_stat_statements.max</varname> can be changed
+   without a server restart by reloading the configuration.
    Typical usage might be:
 
 <programlisting>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8cf40c87043..62351ab09cd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2322,6 +2322,7 @@ PgStatShared_Function
 PgStatShared_HashEntry
 PgStatShared_IO
 PgStatShared_Lock
+PgStatShared_Pgss
 PgStatShared_Relation
 PgStatShared_ReplSlot
 PgStatShared_SLRU
-- 
2.50.1 (Apple Git-155)



reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: pgsql-hackers@postgresql.org
  Cc: samimseih@gmail.com
  Subject: Re: Improve pg_stat_statements scalability
  In-Reply-To: <CAA5RZ0vjpyU-RJzUYCkr0-9jqJtbS95cRFnWTiOEENE7iYNgcA@mail.gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox