0001-refactor-shell_archive.c-use-OpenPipeStream-improve-.patch

text/x-patch

Filename: 0001-refactor-shell_archive.c-use-OpenPipeStream-improve-.patch
Type: text/x-patch
Part: 0
Message: Re: Non-blocking archiver process

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch 0001
Subject: refactor: shell_archive.c - use OpenPipeStream, improve naming, remove dead code
File+
src/backend/archive/shell_archive.c 146 5
From 8d2ed12aa4056ea20b14cf8678a6e24588bfc843 Mon Sep 17 00:00:00 2001
From: Lakshmi <bharatdbpg@gmail.com>
Date: Mon, 10 Nov 2025 15:29:31 +0530
Subject: [PATCH] refactor: shell_archive.c - use OpenPipeStream, improve
 naming, remove dead code

---
 src/backend/archive/shell_archive.c | 151 +++++++++++++++++++++++++++-
 1 file changed, 146 insertions(+), 5 deletions(-)

diff --git a/src/backend/archive/shell_archive.c b/src/backend/archive/shell_archive.c
index 828723afe47..f64d5f9591b 100644
--- a/src/backend/archive/shell_archive.c
+++ b/src/backend/archive/shell_archive.c
@@ -16,13 +16,21 @@
 #include "postgres.h"
 
 #include <sys/wait.h>
-
+#include "latch.h"  /* For WaitLatchOrSocket */
+#include "miscadmin.h"  /* For MyLatch */
+#ifdef WIN32
+#include <windows.h>  /* For WaitForSingleObject, DWORD, etc. */
+#endif
 #include "access/xlog.h"
 #include "archive/archive_module.h"
 #include "archive/shell_archive.h"
 #include "common/percentrepl.h"
 #include "pgstat.h"
-
+#include "utils/elog.h"  /* For elog logging */
+#include "postgres.h"      /* already there */
+#include "utils/palloc.h"  /* add this line */
+#include "libpq/pqformat.h"    /* for OpenPipeStream */
+#include "storage/latch.h"     /* for WaitLatchOrSocket */
 static bool shell_archive_configured(ArchiveModuleState *state);
 static bool shell_archive_file(ArchiveModuleState *state,
 							   const char *file,
@@ -53,12 +61,34 @@ shell_archive_configured(ArchiveModuleState *state)
 	return false;
 }
 
+#define WAIT_INTERVAL_MS 1000  /* 1s for efficient latch waiting */
+
 static bool
 shell_archive_file(ArchiveModuleState *state, const char *file,
 				   const char *path)
 {
 	char	   *xlogarchcmd;
 	char	   *nativePath = NULL;
+#ifndef WIN32
+	FILE	   *archiveFd = NULL;
+	int			archiveFileno;
+	char		buf[1024];
+	ssize_t nread;
+
+#else
+	size_t		cmdPrefixLen;
+	size_t		cmdlen;
+	char *win32cmd = palloc(strlen(xlogarchcmd) + 30);  /* cmd.exe /c "..." + null */
+        if (win32cmd == NULL)
+{
+    ereport(FATAL,
+        (errmsg_internal("Failed to palloc win32cmd: %m")));
+    return false;
+}
+	STARTUPINFO si;
+	PROCESS_INFORMATION pi;
+	int exit_code = 0;
+#endif
 	int			rc;
 
 	if (path)
@@ -77,14 +107,125 @@ shell_archive_file(ArchiveModuleState *state, const char *file,
 
 	fflush(NULL);
 	pgstat_report_wait_start(WAIT_EVENT_ARCHIVE_COMMAND);
-	rc = system(xlogarchcmd);
+
+	/*
+	 * Start the command and read until it completes, while keep checking for
+	 * interrupts to process pending events.
+	 */
+#ifndef WIN32
+	archiveFile = OpenPipeStream(xlogarchcmd, PG_BINARY_R);
+if (archiveFile == NULL)
+{
+    ereport(FATAL,
+            (errcode_for_file_access(),
+             errmsg("could not open archive command pipe: %m")));
+}
+		while (true)
+		{
+			CHECK_FOR_INTERRUPTS();
+			nread = read(archiveFd, &buf, sizeof(buf));
+			if ((nread > 0) || (nread == -1 && errno == EAGAIN))
+				if (nread > 0)
+{
+    buf[nread] = '\0';  /* Null-terminate for string *
+    elog(LOG, "Archive command stdout: %s", buf);
+}
+			else
+				break;
+		}
+		rc = pclose(archiveFd);
+	}
+	else
+		rc = -1;
+#else
+	/*
+	 * * Create a palloc'd copy of the command string, we need to prefix it with
+	 * cmd /c as the commandLine argument to CreateProcess still expects .exe
+	 * files.
+	 */
+	cmdlen = strlen(xlogarchcmd);
+#define CMD_PREFIX "cmd /c \""
+	cmdPrefixLen = strlen(CMD_PREFIX);
+	if (win32cmd == NULL)
+	{
+		ereport(FATAL,
+				(errmsg_internal("Failed to palloc win32cmd: %m")));
+		
+	}
+	memcpy(win32cmd, CMD_PREFIX, cmdPrefixLen);
+	memcpy(&win32cmd[cmdPrefixLen], xlogarchcmd, cmdlen);
+	win32cmd[cmdPrefixLen + cmdlen] = '"';
+	win32cmd[cmdPrefixLen + cmdlen + 1] = '\0';
+	ereport(DEBUG4,
+			(errmsg_internal("WIN32: executing modified archive command \"%s\"",
+							 win32cmd)));
+
+	memset(&pi, 0, sizeof(pi));
+	memset(&si, 0, sizeof(si));
+	si.cb = sizeof(si);
+
+	archiveFile = OpenPipeStream(xlogarchcmd, PG_BINARY_R);
+if (archiveFile == NULL)
+{
+    ereport(FATAL,
+            (errcode_for_file_access(),
+             errmsg("could not open archive command pipe: %m")));
+}
+	
+
+	DWORD result;
+ResetLatch(MyLatch);
+     while (true)
+    { 
+     CHECK_FOR_INTERRUPTS();
+    int latch_rc = WaitLatchOrSocket(MyLatch,
+                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+                                PGINVALID_SOCKET,
+                                WAIT_INTERVAL_MS,
+                                WAIT_EVENT_ARCHIVER_WAIT_CHILD);  /* Or WAIT_EVENT_ARCHIVER_MAIN if undefined */
+if (latch_rc & WL_LATCH_SET)
+{
+    ResetLatch(MyLatch);
+    CHECK_FOR_INTERRUPTS();
+}
+DWORD result = WaitForSingleObject(pi.hProcess, 0);  /* Quick non-block check */
+    if (result == WAIT_OBJECT_0)
+        break;
+    else if (result == WAIT_TIMEOUT)
+        continue;  /* Normal polling */
+    else if (result == WAIT_FAILED)
+    {
+        DWORD err = GetLastError();
+        CloseHandle(pi.hProcess);
+        CloseHandle(pi.hThread);
+        ereport(ERROR,
+                (errmsg("WaitForSingleObject failed during archive_command: %m (Windows error %lu)",
+                        err)));
+       pfree(win32cmd);
+        return false;
+    }
+    else
+    {
+        ereport(ERROR,
+                (errmsg("Unexpected WaitForSingleObject result during archive_command: %lu",
+                        result)));
+       pfree(win32cmd);
+        return false;
+    }
+}
+
+	GetExitCodeProcess(pi.hProcess, &exit_code);
+	CloseHandle(pi.hProcess);
+	CloseHandle(pi.hThread);
+	rc = exit_code;
+#endif
 	pgstat_report_wait_end();
 
 	if (rc != 0)
 	{
 		/*
 		 * If either the shell itself, or a called command, died on a signal,
-		 * abort the archiver.  We do this because system() ignores SIGINT and
+		 * abort the archiver.  We do this because pclose() ignores SIGINT and
 		 * SIGQUIT while waiting; so a signal is very likely something that
 		 * should have interrupted us too.  Also die if the shell got a hard
 		 * "command not found" type of error.  If we overreact it's no big
@@ -126,7 +267,7 @@ shell_archive_file(ArchiveModuleState *state, const char *file,
 							   xlogarchcmd)));
 		}
 		pfree(xlogarchcmd);
-
+               pfree(win32cmd);
 		return false;
 	}
 	pfree(xlogarchcmd);
-- 
2.39.5