0001-Check-for-interrupts-during-archive_command.patch
text/x-patch
Filename: 0001-Check-for-interrupts-during-archive_command.patch
Type: text/x-patch
Part: 0
Message:
Re: Non-blocking archiver process
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0001
Subject: Check for interrupts during archive_command
| File | + | − |
|---|---|---|
| src/backend/archive/shell_archive.c | 94 | 2 |
From a2963908d4a6fc6cea9d0d9ae2413f6779f205bc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patrick=20St=C3=A4hlin?= <me@packi.ch>
Date: Sat, 26 Jul 2025 10:11:58 +0200
Subject: [PATCH] Check for interrupts during archive_command
Otherwise, a running archive_command stalls operations that wait for
ProcSignalBarriers, which may cause pile-ups. Since WAL archiving can
take a while, this helps in cases like DROP DATABASE where we require
all backends to release their file handles.
Note that the Windows version should still behave about the same as
before, but my Windows days are long over, so take that code with a
grain of salt. It did pass CI though.
Suggested-by: Ronan Dunklau <ronan.dunklau@aiven.io>
---
src/backend/archive/shell_archive.c | 96 ++++++++++++++++++++++++++++-
1 file changed, 94 insertions(+), 2 deletions(-)
diff --git a/src/backend/archive/shell_archive.c b/src/backend/archive/shell_archive.c
index 828723afe47..14ef365b147 100644
--- a/src/backend/archive/shell_archive.c
+++ b/src/backend/archive/shell_archive.c
@@ -53,12 +53,27 @@ shell_archive_configured(ArchiveModuleState *state)
return false;
}
+#define POLL_TIMEOUT_MSEC 10
+
static bool
shell_archive_file(ArchiveModuleState *state, const char *file,
const char *path)
{
char *xlogarchcmd;
char *nativePath = NULL;
+#ifndef WIN32
+ FILE *archiveFd = NULL;
+ int archiveFileno;
+ char buf[1024];
+ ssize_t bytesRead;
+#else
+ size_t cmdPrefixLen;
+ size_t cmdlen;
+ char *win32cmd = NULL;
+ STARTUPINFO si;
+ PROCESS_INFORMATION pi;
+ DWORD dwRc;
+#endif
int rc;
if (path)
@@ -77,14 +92,91 @@ shell_archive_file(ArchiveModuleState *state, const char *file,
fflush(NULL);
pgstat_report_wait_start(WAIT_EVENT_ARCHIVE_COMMAND);
- rc = system(xlogarchcmd);
+
+ /*
+ * Start the command and read until it completes, while continuing to check
+ * for interrupts so that pending events are processed.
+ */
+#ifndef WIN32
+ archiveFd = popen(xlogarchcmd, "r");
+ if (archiveFd != NULL)
+ {
+ archiveFileno = fileno(archiveFd);
+ if (fcntl(archiveFileno, F_SETFL, O_NONBLOCK) == -1)
+ ereport(FATAL,
+ (errmsg("could not set handle to nonblocking mode: %m")));
+
+ while (true)
+ {
+ CHECK_FOR_INTERRUPTS();
+ bytesRead = read(archiveFileno, &buf, sizeof(buf));
+ if ((bytesRead > 0) || (bytesRead == -1 && errno == EAGAIN))
+ pg_usleep(POLL_TIMEOUT_MSEC * 1000);
+ else
+ break;
+ }
+ rc = pclose(archiveFd);
+ }
+ else
+ rc = -1;
+#else
+ /*
+ * Create a malloc'd copy of the command string. We need to prefix it with
+ * "cmd /c" because the commandLine argument to CreateProcess still expects
+ * .exe files.
+ */
+ cmdlen = strlen(xlogarchcmd);
+#define CMD_PREFIX "cmd /c \""
+ cmdPrefixLen = strlen(CMD_PREFIX);
+ win32cmd = malloc(cmdPrefixLen + cmdlen + 1 + 1);
+ if (win32cmd == NULL)
+ {
+ ereport(FATAL,
+ (errmsg_internal("Failed to malloc win32cmd %m")));
+ return false;
+ }
+ memcpy(win32cmd, CMD_PREFIX, cmdPrefixLen);
+ memcpy(&win32cmd[cmdPrefixLen], xlogarchcmd, cmdlen);
+ win32cmd[cmdPrefixLen + cmdlen] = '"';
+ win32cmd[cmdPrefixLen + cmdlen + 1] = '\0';
+ ereport(DEBUG4,
+ (errmsg_internal("WIN32: executing modified archive command \"%s\"",
+ win32cmd)));
+
+ memset(&pi, 0, sizeof(pi));
+ memset(&si, 0, sizeof(si));
+ si.cb = sizeof(si);
+
+ if (!CreateProcess(NULL, win32cmd, NULL, NULL, FALSE, 0,
+ NULL, NULL, &si, &pi))
+ {
+ ereport(FATAL,
+ (errmsg("CreateProcess() call failed: %m (error code %lu)",
+ GetLastError())));
+ free(win32cmd);
+ return false;
+ }
+ free(win32cmd);
+
+ while (true)
+ {
+ CHECK_FOR_INTERRUPTS();
+ if (WaitForSingleObject(pi.hProcess, POLL_TIMEOUT_MSEC) == WAIT_OBJECT_0)
+ break;
+ }
+
+ GetExitCodeProcess(pi.hProcess, &dwRc);
+ CloseHandle(pi.hProcess);
+ CloseHandle(pi.hThread);
+ rc = dwRc;
+#endif
pgstat_report_wait_end();
if (rc != 0)
{
/*
* If either the shell itself, or a called command, died on a signal,
- * abort the archiver. We do this because system() ignores SIGINT and
+ * abort the archiver. We do this because pclose() ignores SIGINT and
* SIGQUIT while waiting; so a signal is very likely something that
* should have interrupted us too. Also die if the shell got a hard
* "command not found" type of error. If we overreact it's no big
--
2.43.0