0001-refactor-shell_archive.c-use-OpenPipeStream-improve-.patch
text/x-patch
Filename: 0001-refactor-shell_archive.c-use-OpenPipeStream-improve-.patch
Type: text/x-patch
Part: 0
Message:
Re: Non-blocking archiver process
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0001
Subject: refactor: shell_archive.c - use OpenPipeStream, improve naming, remove dead code
| File | + | − |
|---|---|---|
| src/backend/archive/shell_archive.c | 146 | 5 |
From 8d2ed12aa4056ea20b14cf8678a6e24588bfc843 Mon Sep 17 00:00:00 2001
From: Lakshmi <bharatdbpg@gmail.com>
Date: Mon, 10 Nov 2025 15:29:31 +0530
Subject: [PATCH] refactor: shell_archive.c - use OpenPipeStream, improve
naming, remove dead code
---
src/backend/archive/shell_archive.c | 151 +++++++++++++++++++++++++++-
1 file changed, 146 insertions(+), 5 deletions(-)
diff --git a/src/backend/archive/shell_archive.c b/src/backend/archive/shell_archive.c
index 828723afe47..f64d5f9591b 100644
--- a/src/backend/archive/shell_archive.c
+++ b/src/backend/archive/shell_archive.c
@@ -16,13 +16,21 @@
#include "postgres.h"
#include <sys/wait.h>
-
+#include "latch.h" /* For WaitLatchOrSocket */
+#include "miscadmin.h" /* For MyLatch */
+#ifdef WIN32
+#include <windows.h> /* For WaitForSingleObject, DWORD, etc. */
+#endif
#include "access/xlog.h"
#include "archive/archive_module.h"
#include "archive/shell_archive.h"
#include "common/percentrepl.h"
#include "pgstat.h"
-
+#include "utils/elog.h" /* For elog logging */
+#include "postgres.h" /* already there */
+#include "utils/palloc.h" /* add this line */
+#include "libpq/pqformat.h" /* for OpenPipeStream */
+#include "storage/latch.h" /* for WaitLatchOrSocket */
static bool shell_archive_configured(ArchiveModuleState *state);
static bool shell_archive_file(ArchiveModuleState *state,
const char *file,
@@ -53,12 +61,34 @@ shell_archive_configured(ArchiveModuleState *state)
return false;
}
+#define WAIT_INTERVAL_MS 1000 /* 1s for efficient latch waiting */
+
static bool
shell_archive_file(ArchiveModuleState *state, const char *file,
const char *path)
{
char *xlogarchcmd;
char *nativePath = NULL;
+#ifndef WIN32
+ FILE *archiveFd = NULL;
+ int archiveFileno;
+ char buf[1024];
+ ssize_t nread;
+
+#else
+ size_t cmdPrefixLen;
+ size_t cmdlen;
+ char *win32cmd = palloc(strlen(xlogarchcmd) + 30); /* cmd.exe /c "..." + null */
+ if (win32cmd == NULL)
+{
+ ereport(FATAL,
+ (errmsg_internal("Failed to palloc win32cmd: %m")));
+ return false;
+}
+ STARTUPINFO si;
+ PROCESS_INFORMATION pi;
+ int exit_code = 0;
+#endif
int rc;
if (path)
@@ -77,14 +107,125 @@ shell_archive_file(ArchiveModuleState *state, const char *file,
fflush(NULL);
pgstat_report_wait_start(WAIT_EVENT_ARCHIVE_COMMAND);
- rc = system(xlogarchcmd);
+
+ /*
+ * Start the command and read until it completes, while continuing to check
+ * for interrupts so that pending events are processed.
+ */
+#ifndef WIN32
+ archiveFile = OpenPipeStream(xlogarchcmd, PG_BINARY_R);
+if (archiveFile == NULL)
+{
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not open archive command pipe: %m")));
+}
+ while (true)
+ {
+ CHECK_FOR_INTERRUPTS();
+ nread = read(archiveFd, &buf, sizeof(buf));
+ if ((nread > 0) || (nread == -1 && errno == EAGAIN))
+ if (nread > 0)
+{
+ buf[nread] = '\0'; /* Null-terminate for string */
+ elog(LOG, "Archive command stdout: %s", buf);
+}
+ else
+ break;
+ }
+ rc = pclose(archiveFd);
+ }
+ else
+ rc = -1;
+#else
+ /*
+ * Create a palloc'd copy of the command string. We need to prefix it with
+ * "cmd /c" because the commandLine argument to CreateProcess still expects
+ * .exe files.
+ */
+ cmdlen = strlen(xlogarchcmd);
+#define CMD_PREFIX "cmd /c \""
+ cmdPrefixLen = strlen(CMD_PREFIX);
+ if (win32cmd == NULL)
+ {
+ ereport(FATAL,
+ (errmsg_internal("Failed to palloc win32cmd: %m")));
+
+ }
+ memcpy(win32cmd, CMD_PREFIX, cmdPrefixLen);
+ memcpy(&win32cmd[cmdPrefixLen], xlogarchcmd, cmdlen);
+ win32cmd[cmdPrefixLen + cmdlen] = '"';
+ win32cmd[cmdPrefixLen + cmdlen + 1] = '\0';
+ ereport(DEBUG4,
+ (errmsg_internal("WIN32: executing modified archive command \"%s\"",
+ win32cmd)));
+
+ memset(&pi, 0, sizeof(pi));
+ memset(&si, 0, sizeof(si));
+ si.cb = sizeof(si);
+
+ archiveFile = OpenPipeStream(xlogarchcmd, PG_BINARY_R);
+if (archiveFile == NULL)
+{
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not open archive command pipe: %m")));
+}
+
+
+ DWORD result;
+ResetLatch(MyLatch);
+ while (true)
+ {
+ CHECK_FOR_INTERRUPTS();
+ int latch_rc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ PGINVALID_SOCKET,
+ WAIT_INTERVAL_MS,
+ WAIT_EVENT_ARCHIVER_WAIT_CHILD); /* Or WAIT_EVENT_ARCHIVER_MAIN if undefined */
+if (latch_rc & WL_LATCH_SET)
+{
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+}
+DWORD result = WaitForSingleObject(pi.hProcess, 0); /* Quick non-block check */
+ if (result == WAIT_OBJECT_0)
+ break;
+ else if (result == WAIT_TIMEOUT)
+ continue; /* Normal polling */
+ else if (result == WAIT_FAILED)
+ {
+ DWORD err = GetLastError();
+ CloseHandle(pi.hProcess);
+ CloseHandle(pi.hThread);
+ ereport(ERROR,
+ (errmsg("WaitForSingleObject failed during archive_command: %m (Windows error %lu)",
+ err)));
+ pfree(win32cmd);
+ return false;
+ }
+ else
+ {
+ ereport(ERROR,
+ (errmsg("Unexpected WaitForSingleObject result during archive_command: %lu",
+ result)));
+ pfree(win32cmd);
+ return false;
+ }
+}
+
+ GetExitCodeProcess(pi.hProcess, &exit_code);
+ CloseHandle(pi.hProcess);
+ CloseHandle(pi.hThread);
+ rc = exit_code;
+#endif
pgstat_report_wait_end();
if (rc != 0)
{
/*
* If either the shell itself, or a called command, died on a signal,
- * abort the archiver. We do this because system() ignores SIGINT and
+ * abort the archiver. We do this because pclose() ignores SIGINT and
* SIGQUIT while waiting; so a signal is very likely something that
* should have interrupted us too. Also die if the shell got a hard
* "command not found" type of error. If we overreact it's no big
@@ -126,7 +267,7 @@ shell_archive_file(ArchiveModuleState *state, const char *file,
xlogarchcmd)));
}
pfree(xlogarchcmd);
-
+ pfree(win32cmd);
return false;
}
pfree(xlogarchcmd);
--
2.39.5