public inbox for pgsql-hackers@postgresql.org  
help / color / mirror / Atom feed
Re: Adding pg_dump flag for parallel export to pipes
3+ messages / 2 participants
[nested] [flat]

* Re: Adding pg_dump flag for parallel export to pipes
@ 2026-02-20 09:08 Nitin Motiani <nitinmotiani@google.com>
  2026-05-21 09:56 ` Re: Adding pg_dump flag for parallel export to pipes Nitin Motiani <nitinmotiani@google.com>
  0 siblings, 1 reply; 3+ messages in thread

From: Nitin Motiani @ 2026-02-20 09:08 UTC (permalink / raw)
  To: Dilip Kumar <dilipbalaut@gmail.com>; +Cc: Thomas Munro <thomas.munro@gmail.com>; Hannu Krosing <hannuk@google.com>; pgsql-hackers

Hi,

I fixed the failing tap test. Attaching rebased patches along with the
new test. The last patch is still WIP because I need to add more test
cases.

Thanks,
Nitin Motiani
Google


Attachments:

  [application/x-patch] v8-0003-Add-basic-tests-for-pipe-command.patch (2.6K, 2-v8-0003-Add-basic-tests-for-pipe-command.patch)
  download | inline diff:
From 6786ec50ae1d6494253b27ad624c562393821b93 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Sat, 15 Feb 2025 04:29:17 +0000
Subject: [PATCH v8 3/5] Add basic tests for pipe-command

* This currently only adds a few basic tests for pg_dump with --pipe-command.
* These tests include the invalid usages of --pipe-command with other flags.
* We are still working on adding other tests in pg_dump.pl. But
  we ran into some issues which might be related to setup.
---
 src/bin/pg_dump/t/001_basic.pl | 36 ++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index ab9310eb42b..9ef634df989 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -74,6 +74,42 @@ command_fails_like(
 	'pg_dump: options --statistics-only and --no-statistics cannot be used together'
 );
 
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe-command="cat"', '-f', 'testdir', 'test'],
+	qr/\Qpg_dump: hint: Only one of [--file, --pipe-command] allowed\E/,
+	'pg_dump: hint: Only one of [--file, --pipe-command] allowed'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', 'gzip', 'test'],
+	qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+	'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe-command="cat"', '--compress=lz4', 'test'],
+	qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+	'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', '1', 'test'],
+	qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+	'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fc', '--pipe-command="cat"', 'test'],
+	qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+	'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
+command_fails_like(
+	[ 'pg_dump', '--format=tar', '--pipe-command="cat"', 'test'],
+	qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+	'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
 command_fails_like(
 	[ 'pg_dump', '-j2', '--include-foreign-data=xxx' ],
 	qr/\Qpg_dump: error: option --include-foreign-data is not supported with parallel backup\E/,
-- 
2.53.0.345.g96ddfc5eaa-goog



  [application/x-patch] v8-0005-POC-Add-tests-to-pg_dump.pl.patch (1.2K, 3-v8-0005-POC-Add-tests-to-pg_dump.pl.patch)
  download | inline diff:
From c9e38bc87fad2399d3a99e661eeb8eabf0a81b8e Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Thu, 22 May 2025 10:20:15 +0000
Subject: [PATCH v8 5/5] [POC] Add tests to pg_dump.pl

* POC test in pg_dump.pl. More tests will be added and this patch
  merged with the other test patch.
---
 src/bin/pg_dump/t/002_pg_dump.pl | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index a8dcc2b5c75..2af28285760 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -223,6 +223,25 @@ my %pgdump_runs = (
 		],
 	},
 
+	# This test kept failing.
+	defaults_dir_format_pipe => {
+		test_key => 'defaults',
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			'--pipe-command' => "cat > $tempdir/defaults_dir_format/%f",
+			'--statistics',
+			'postgres',
+		],
+	restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_dir_format_pipe.sql",
+			'--statistics',
+			"$tempdir/defaults_dir_format",
+		],
+	},
+
 	# Do not use --no-sync to give test coverage for data sync.
 	defaults_parallel => {
 		test_key => 'defaults',
-- 
2.53.0.345.g96ddfc5eaa-goog



  [application/x-patch] v8-0004-Add-documentation-for-pipe-command-in-pg_dump-and.patch (7.5K, 4-v8-0004-Add-documentation-for-pipe-command-in-pg_dump-and.patch)
  download | inline diff:
From 9052f48a52f4ba63f2d59709c170993f0248c83d Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Fri, 4 Apr 2025 14:34:48 +0000
Subject: [PATCH v8 4/5] Add documentation for pipe-command in pg_dump and
 pg_restore

* Add the descriptions of the new flags and constraints
  regarding which mode and other flags they can't be used with.
* Explain the purpose of the flags.
* Add a few examples of the usage of the flags.
---
 doc/src/sgml/ref/pg_dump.sgml    | 56 ++++++++++++++++++++++++++
 doc/src/sgml/ref/pg_restore.sgml | 68 +++++++++++++++++++++++++++++++-
 2 files changed, 123 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 7f538e90194..51ce73f390f 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -297,6 +297,7 @@ PostgreSQL documentation
         specifies the target directory instead of a file. In this case the
         directory is created by <command>pg_dump</command> unless the directory
         exists and is empty.
+        This option and <option>--pipe-command</option> can't be used together.
        </para>
       </listitem>
      </varlistentry>
@@ -1224,6 +1225,32 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--pipe-command</option></term>
+      <listitem>
+       <para>
+        This option is only supported with the directory output
+        format. It can be used to write to multiple streams which
+        otherwise would not be possible with the directory mode.
+        For each stream, it starts a process which runs the
+        specified command and pipes the pg_dump output to this
+        process.
+        This option is not valid if <option>--file</option>
+        is also specified.
+       </para>
+       <para>
+        The pipe-command can be used to perform operations like compress
+        using a custom algorithm, filter, or write the output to a cloud
+        storage etc. The user would need a way to pipe the final output of
+        each stream to a file. To handle that, the pipe command supports a format
+        specifier %f. And all the instances of %f in the command string
+        will be replaced with the corresponding file name which
+        would have been used in the directory mode with <option>--file</option>.
+        See <xref linkend="pg-dump-examples"/> below.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--quote-all-identifiers</option></term>
       <listitem>
@@ -1791,6 +1818,35 @@ CREATE DATABASE foo WITH TEMPLATE template0;
 </screen>
   </para>
 
+  <para>
+   To use pipe-command to dump a database into a directory-format archive
+   (the directory <literal>dumpdir</literal> needs to exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pipe-command to dump a database into a directory-format archive
+   in parallel with 5 worker jobs (the directory <literal>dumpdir</literal> needs to exist
+   before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb -j 5 --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pipe-command to compress and dump a database into a
+   directory-format archive (the directory <literal>dumpdir</literal> needs to
+   exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="gzip > dumpdir/%f.gz"</userinput>
+</screen>
+  </para>
+
   <para>
    To reload an archive file into a (freshly created) database named
    <literal>newdb</literal>:
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index 420a308a7c7..be1ed51fe1b 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -96,7 +96,10 @@ PostgreSQL documentation
        <para>
        Specifies the location of the archive file (or directory, for a
        directory-format archive) to be restored.
-       If not specified, the standard input is used.
+       This option and <option>--pipe-command</option> can't be set
+       at the same time.
+       If neither this option nor <option>--pipe-command</option> is specified,
+       the standard input is used.
        </para>
       </listitem>
      </varlistentry>
@@ -820,6 +823,32 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--pipe-command</option></term>
+      <listitem>
+       <para>
+        This option is only supported with the directory output
+        format. It can be used to read from multiple streams which
+        otherwise would not be possible with the directory mode.
+        For each stream, it starts a process which runs the
+        specified command and pipes its output to the pg_restore process.
+        This option is not valid if <option>filename</option> is also specified.
+       </para>
+       <para>
+        The pipe-command can be used to perform operations like
+        decompress using a custom algorithm, filter, or read from
+        a cloud storage. When reading from the pg_dump output,
+        the user would need a way to read the correct file in each
+        stream. To handle that, the pipe command supports a format
+        specifier %f. And all the instances of %f in the command string
+        will be replaced with the corresponding file name which
+        would have been used in the directory mode with <option>filename</option>.
+        This is same as the <option>--pipe-command</option> of pg-dump.
+        See <xref linkend="app-pgrestore-examples"/> below.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
        <term><option>--section=<replaceable class="parameter">sectionname</replaceable></option></term>
        <listitem>
@@ -1234,6 +1263,43 @@ CREATE DATABASE foo WITH TEMPLATE template0;
 <prompt>$</prompt> <userinput>pg_restore -L db.list db.dump</userinput>
 </screen></para>
 
+  <para>
+   To use pg_restore with pipe-command to recreate from a dump in
+   directory-archive format. The database should not exist beforehand.
+   Assume in this example that the dump in directory-archive format is
+   stored in <literal>dumpdir</literal>.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pg_restore with pipe-command to first decompress and then
+   recreate from a dump in directory-archive format. The database
+   should not exist beforehand.
+   Assume in this example that the dump in directory-archive format is
+   stored in <literal>dumpdir</literal>. And all files are
+   <literal>gzip</literal> compressed.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f.gz | gunzip"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pipe-command along with <option>-L</option> to recreate only
+   selectd items from a dump in the directory-archive format.
+   The database should not exist beforehand.
+   Assume in this example that the dump in directory-archive format is
+   stored in dumpdir.
+   The <literal>db.list</literal> file is the same as one used in the previous example with <option>-L</option>
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f" -L db.list</userinput>
+</screen>
+  </para>
+
  </refsect1>
 
  <refsect1>
-- 
2.53.0.345.g96ddfc5eaa-goog



  [application/x-patch] v8-0002-Add-pipe-command-support-in-pg_restore.patch (6.0K, 5-v8-0002-Add-pipe-command-support-in-pg_restore.patch)
  download | inline diff:
From 11e6e9c64793413464cee936643854d65eec3d47 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Sat, 15 Feb 2025 08:05:25 +0000
Subject: [PATCH v8 2/5] Add pipe-command support in pg_restore

* This is same as the pg_dump change. We add support
  for --pipe-command in directory archive format. This can be used
  to read from multiple streams and do pre-processing (decompression
  with a custom algorithm, filtering etc) before restore.
  Currently that is not possible because the pg_dump output of
  directory format can't just be piped.
* Like pg_dump, here also either filename or --pipe-command can be
  set. If neither are set, the standard input is used as before.
* This is only supported with compression none and archive format
  directory.
* We reuse the inputFileSpec field for the pipe-command. And add
  a bool to specify if it is a pipe.
* The changes made for pg_dump to handle the pipe case with popen
  and pclose also work here.
* The logic of %f format specifier to read from the pg_dump output
  is the same too. Most of the code from the pg_dump commit works.
  We add similar logic to the function to read large objects.
* The --pipe command works -l and -L option.
---
 src/bin/pg_dump/compress_io.c         | 30 +++++++++++++++----------
 src/bin/pg_dump/pg_backup_directory.c | 16 +++++++++++++-
 src/bin/pg_dump/pg_restore.c          | 32 ++++++++++++++++++++++++---
 3 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index bc521dd274b..88488186b34 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -260,22 +260,28 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
 
 	fname = pg_strdup(path);
 
-	if (hasSuffix(fname, ".gz"))
-		compression_spec.algorithm = PG_COMPRESSION_GZIP;
-	else if (hasSuffix(fname, ".lz4"))
-		compression_spec.algorithm = PG_COMPRESSION_LZ4;
-	else if (hasSuffix(fname, ".zst"))
-		compression_spec.algorithm = PG_COMPRESSION_ZSTD;
-	else
+	/*
+	 * If the path is a pipe command, the compression algorithm is none.
+	 */
+	if (!path_is_pipe_command)
 	{
-		if (stat(path, &st) == 0)
-			compression_spec.algorithm = PG_COMPRESSION_NONE;
-		else if (check_compressed_file(path, &fname, "gz"))
+		if (hasSuffix(fname, ".gz"))
 			compression_spec.algorithm = PG_COMPRESSION_GZIP;
-		else if (check_compressed_file(path, &fname, "lz4"))
+		else if (hasSuffix(fname, ".lz4"))
 			compression_spec.algorithm = PG_COMPRESSION_LZ4;
-		else if (check_compressed_file(path, &fname, "zst"))
+		else if (hasSuffix(fname, ".zst"))
 			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+		else
+		{
+			if (stat(path, &st) == 0)
+				compression_spec.algorithm = PG_COMPRESSION_NONE;
+			else if (check_compressed_file(path, &fname, "gz"))
+				compression_spec.algorithm = PG_COMPRESSION_GZIP;
+			else if (check_compressed_file(path, &fname, "lz4"))
+				compression_spec.algorithm = PG_COMPRESSION_LZ4;
+			else if (check_compressed_file(path, &fname, "zst"))
+				compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+		}
 	}
 
 	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 74fc651f6f4..2b18c3c8270 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -439,7 +439,21 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 					 tocfname, line);
 
 		StartRestoreLO(AH, oid, AH->public.ropt->dropSchema);
-		snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+
+		/*
+		 * XXX : Create a helper function for blob files naming common to
+		 * _LoadLOs an _StartLO.
+		 */
+		if (AH->fSpecIsPipe)
+		{
+			pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname);
+			strcpy(path, pipe);
+			pfree(pipe);
+		}
+		else
+		{
+			snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+		}
 		_PrintFileData(AH, path);
 		EndRestoreLO(AH, oid);
 	}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 9495a37ffc1..46758f72d98 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -66,6 +66,7 @@ main(int argc, char **argv)
 	char	   *inputFileSpec;
 	bool		data_only = false;
 	bool		schema_only = false;
+	bool		filespec_is_pipe = false;
 	static int	disable_triggers = 0;
 	static int	enable_row_security = 0;
 	static int	if_exists = 0;
@@ -142,6 +143,7 @@ main(int argc, char **argv)
 		{"statistics-only", no_argument, &statistics_only, 1},
 		{"filter", required_argument, NULL, 4},
 		{"restrict-key", required_argument, NULL, 6},
+		{"pipe-command", required_argument, NULL, 7},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -321,6 +323,11 @@ main(int argc, char **argv)
 				opts->restrict_key = pg_strdup(optarg);
 				break;
 
+			case 7:				/* pipe-command */
+				inputFileSpec = pg_strdup(optarg);
+				filespec_is_pipe = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -328,11 +335,29 @@ main(int argc, char **argv)
 		}
 	}
 
-	/* Get file name from command line */
+	/*
+	 * Get file name from command line. Note that filename argument and
+	 * pipe-command can't both be set.
+	 */
 	if (optind < argc)
+	{
+		if (filespec_is_pipe)
+		{
+			pg_log_error_hint("Only one of [filespec, --pipe-command] allowed");
+			exit_nicely(1);
+		}
 		inputFileSpec = argv[optind++];
-	else
+	}
+
+	/*
+	 * Even if the file argument is not provided, if the pipe-command is
+	 * specified, we need to use that as the file arg and not fallback to
+	 * stdio.
+	 */
+	else if (!filespec_is_pipe)
+	{
 		inputFileSpec = NULL;
+	}
 
 	/* Complain if any arguments remain */
 	if (optind < argc)
@@ -485,7 +510,8 @@ main(int argc, char **argv)
 					 opts->formatName);
 	}
 
-	AH = OpenArchive(inputFileSpec, opts->format, false);
+
+	AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
 
 	SetArchiveOptions(AH, NULL, opts);
 
-- 
2.53.0.345.g96ddfc5eaa-goog



  [application/x-patch] v8-0001-Add-pipe-command-support-for-directory-mode-of-pg.patch (30.4K, 6-v8-0001-Add-pipe-command-support-for-directory-mode-of-pg.patch)
  download | inline diff:
From c5980161b782daaedee4cdde9d451ffba77fb4fb Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Tue, 11 Feb 2025 08:31:02 +0000
Subject: [PATCH v8 1/5] Add pipe-command support for directory mode of pg_dump

* We add a new flag --pipe-command which can be used in directory
  mode. This allows us to support multiple streams and we can
  do post processing like compression, filtering etc. This is
  currently not possible with directory-archive format.
* Currently this flag is only supported with compression none
  and archive format directory.
* This flag can't be used with the flag --file. Only one of the
  two flags can be used at a time.
* We reuse the filename field for the --pipe-command also. And add a
  bool to specify that the field will be used as a pipe command.
* Most of the code remains as it is. The core change is that
  in case of --pipe-command, instead of fopen we do popen.
* The user would need a way to store the post-processing output
  in files. For that we support the same format as the directory
  mode currently does with the flag --file. We allow the user
  to add a format specifier %f to the --pipe-command. And for each
  stream, the format specifier is replaced with the corresponding
  file name. This file name is the same as it would have been if
  the flag --file had been used.
* To enable the above, there are a few places in the code where
  we change the file name creation logic. Currently the file name
  is appended to the directory name which is provided with --file flag.
  In case of --pipe-command, we instead replace %f with the file name.
  This change is made for the common use case and separately for
  blob files.
* There is an open question on what mode to use in case of large objects
  TOC file. Currently the code uses "ab" but that won't work for popen.
  We have proposed a few options in the comments regarding this. For the
  time being we are using mode PG_BINARY_W for the pipe use case.
---
 src/bin/pg_dump/compress_gzip.c       |   9 ++-
 src/bin/pg_dump/compress_gzip.h       |   3 +-
 src/bin/pg_dump/compress_io.c         |  26 +++++--
 src/bin/pg_dump/compress_io.h         |  11 ++-
 src/bin/pg_dump/compress_lz4.c        |  11 ++-
 src/bin/pg_dump/compress_lz4.h        |   3 +-
 src/bin/pg_dump/compress_none.c       |  26 ++++++-
 src/bin/pg_dump/compress_none.h       |   3 +-
 src/bin/pg_dump/compress_zstd.c       |  10 ++-
 src/bin/pg_dump/compress_zstd.h       |   3 +-
 src/bin/pg_dump/pg_backup.h           |   5 +-
 src/bin/pg_dump/pg_backup_archiver.c  |  22 +++---
 src/bin/pg_dump/pg_backup_archiver.h  |   2 +
 src/bin/pg_dump/pg_backup_directory.c | 103 +++++++++++++++++++++-----
 src/bin/pg_dump/pg_dump.c             |  37 ++++++++-
 src/bin/pg_dump/pg_restore.c          |   2 +-
 16 files changed, 220 insertions(+), 56 deletions(-)

diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index c9ce8a53aaa..46ac5bddcd5 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -417,8 +417,12 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
+	if (path_is_pipe_command)
+		pg_fatal("cPipe command not supported for Gzip");
+
 	CFH->open_func = Gzip_open;
 	CFH->open_write_func = Gzip_open_write;
 	CFH->read_func = Gzip_read;
@@ -443,7 +447,8 @@ InitCompressorGzip(CompressorState *cs,
 
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "gzip");
 }
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
index af1a2a3445e..f77c5c86c56 100644
--- a/src/bin/pg_dump/compress_gzip.h
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -19,6 +19,7 @@
 extern void InitCompressorGzip(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleGzip(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 52652b0d979..bc521dd274b 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -191,20 +191,29 @@ free_keep_errno(void *p)
  * Initialize a compress file handle for the specified compression algorithm.
  */
 CompressFileHandle *
-InitCompressFileHandle(const pg_compress_specification compression_spec)
+InitCompressFileHandle(const pg_compress_specification compression_spec,
+					   bool path_is_pipe_command)
 {
 	CompressFileHandle *CFH;
 
 	CFH = pg_malloc0_object(CompressFileHandle);
 
-	if (compression_spec.algorithm == PG_COMPRESSION_NONE)
-		InitCompressFileHandleNone(CFH, compression_spec);
+	/*
+	 * Always set to non-compressed when path_is_pipe_command assuming that
+	 * external compressor as part of pipe is more efficient. Can review in
+	 * the future.
+	 */
+	if (path_is_pipe_command)
+		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+
+	else if (compression_spec.algorithm == PG_COMPRESSION_NONE)
+		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
-		InitCompressFileHandleGzip(CFH, compression_spec);
+		InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
-		InitCompressFileHandleLZ4(CFH, compression_spec);
+		InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
-		InitCompressFileHandleZstd(CFH, compression_spec);
+		InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command);
 
 	return CFH;
 }
@@ -237,7 +246,8 @@ check_compressed_file(const char *path, char **fname, char *ext)
  * On failure, return NULL with an error code in errno.
  */
 CompressFileHandle *
-InitDiscoverCompressFileHandle(const char *path, const char *mode)
+InitDiscoverCompressFileHandle(const char *path, const char *mode,
+							   bool path_is_pipe_command)
 {
 	CompressFileHandle *CFH = NULL;
 	struct stat st;
@@ -268,7 +278,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
 	}
 
-	CFH = InitCompressFileHandle(compression_spec);
+	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
 	errno = 0;
 	if (!CFH->open_func(fname, -1, mode, CFH))
 	{
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index ed7b14f0963..bd0fc2634dc 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -186,6 +186,11 @@ struct CompressFileHandle
 	 */
 	pg_compress_specification compression_spec;
 
+	/*
+	 * Compression specification for this file handle.
+	 */
+	bool		path_is_pipe_command;
+
 	/*
 	 * Private data to be used by the compressor.
 	 */
@@ -195,7 +200,8 @@ struct CompressFileHandle
 /*
  * Initialize a compress file handle with the requested compression.
  */
-extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec);
+extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec,
+												  bool path_is_pipe_command);
 
 /*
  * Initialize a compress file stream. Infer the compression algorithm
@@ -203,6 +209,7 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio
  * suffixes in 'path'.
  */
 extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path,
-														  const char *mode);
+														  const char *mode,
+														  bool path_is_pipe_command);
 extern bool EndCompressFileHandle(CompressFileHandle *CFH);
 #endif
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index b72bad130ad..f016627ceab 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -739,10 +739,14 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH
  */
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-						  const pg_compress_specification compression_spec)
+						  const pg_compress_specification compression_spec,
+						  bool path_is_pipe_command)
 {
 	LZ4State   *state;
 
+	if (path_is_pipe_command)
+		pg_fatal("Pipe command not supported for LZ4");
+
 	CFH->open_func = LZ4Stream_open;
 	CFH->open_write_func = LZ4Stream_open_write;
 	CFH->read_func = LZ4Stream_read;
@@ -758,6 +762,8 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 	if (CFH->compression_spec.level >= 0)
 		state->prefs.compressionLevel = CFH->compression_spec.level;
 
+	CFH->path_is_pipe_command = path_is_pipe_command;
+
 	CFH->private_data = state;
 }
 #else							/* USE_LZ4 */
@@ -770,7 +776,8 @@ InitCompressorLZ4(CompressorState *cs,
 
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-						  const pg_compress_specification compression_spec)
+						  const pg_compress_specification compression_spec,
+						  bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "LZ4");
 }
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
index 7360a469fc0..490141ee8a1 100644
--- a/src/bin/pg_dump/compress_lz4.h
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -19,6 +19,7 @@
 extern void InitCompressorLZ4(CompressorState *cs,
 							  const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-									  const pg_compress_specification compression_spec);
+									  const pg_compress_specification compression_spec,
+									  bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index d862d8ca6e9..bc63ccabdb6 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -211,7 +211,10 @@ close_none(CompressFileHandle *CFH)
 	if (fp)
 	{
 		errno = 0;
-		ret = fclose(fp);
+		if (CFH->path_is_pipe_command)
+			ret = pclose(fp);
+		else
+			ret = fclose(fp);
 		if (ret != 0)
 			pg_log_error("could not close file: %m");
 	}
@@ -233,7 +236,12 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
 	if (fd >= 0)
 		CFH->private_data = fdopen(dup(fd), mode);
 	else
-		CFH->private_data = fopen(path, mode);
+	{
+		if (CFH->path_is_pipe_command)
+			CFH->private_data = popen(path, mode);
+		else
+			CFH->private_data = fopen(path, mode);
+	}
 
 	if (CFH->private_data == NULL)
 		return false;
@@ -246,7 +254,14 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 {
 	Assert(CFH->private_data == NULL);
 
-	CFH->private_data = fopen(path, mode);
+	pg_log_debug("Opening %s, pipe is %s",
+				 path, CFH->path_is_pipe_command ? "true" : "false");
+
+	if (CFH->path_is_pipe_command)
+		CFH->private_data = popen(path, mode);
+	else
+		CFH->private_data = fopen(path, mode);
+
 	if (CFH->private_data == NULL)
 		return false;
 
@@ -259,7 +274,8 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleNone(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	CFH->open_func = open_none;
 	CFH->open_write_func = open_write_none;
@@ -271,5 +287,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH,
 	CFH->eof_func = eof_none;
 	CFH->get_error_func = get_error_none;
 
+	CFH->path_is_pipe_command = path_is_pipe_command;
+
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h
index 5134f012ee9..d898a2d411c 100644
--- a/src/bin/pg_dump/compress_none.h
+++ b/src/bin/pg_dump/compress_none.h
@@ -19,6 +19,7 @@
 extern void InitCompressorNone(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleNone(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_NONE_H_ */
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index cf2db2649ac..a2c50822566 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -27,7 +27,8 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress
 }
 
 void
-InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "ZSTD");
 }
@@ -558,8 +559,12 @@ Zstd_get_error(CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleZstd(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
+	if (path_is_pipe_command)
+		pg_fatal("Pipe command not supported for Zstd");
+
 	CFH->open_func = Zstd_open;
 	CFH->open_write_func = Zstd_open_write;
 	CFH->read_func = Zstd_read;
@@ -571,6 +576,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH,
 	CFH->get_error_func = Zstd_get_error;
 
 	CFH->compression_spec = compression_spec;
+	CFH->path_is_pipe_command = path_is_pipe_command;
 
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
index 1222d7107d9..1f23e7266bf 100644
--- a/src/bin/pg_dump/compress_zstd.h
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -20,6 +20,7 @@
 extern void InitCompressorZstd(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 2f8d9799c30..4fc0eb64fcb 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -316,14 +316,15 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX);
 extern void RestoreArchive(Archive *AHX);
 
 /* Open an existing archive */
-extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
+extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe);
 
 /* Create a new archive */
 extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 							  const pg_compress_specification compression_spec,
 							  bool dosync, ArchiveMode mode,
 							  SetupWorkerPtrType setupDumpWorker,
-							  DataDirSyncMethod sync_method);
+							  DataDirSyncMethod sync_method,
+							  bool FileSpecIsPipe);
 
 /* The --list option */
 extern void PrintTOCSummary(Archive *AHX);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 7afcc0859c8..09e14011d48 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -56,7 +56,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
 							   bool dosync, ArchiveMode mode,
 							   SetupWorkerPtrType setupWorkerPtr,
-							   DataDirSyncMethod sync_method);
+							   DataDirSyncMethod sync_method, bool FileSpecIsPipe);
 static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
 static void _doSetFixedOutputState(ArchiveHandle *AH);
@@ -232,11 +232,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 			  const pg_compress_specification compression_spec,
 			  bool dosync, ArchiveMode mode,
 			  SetupWorkerPtrType setupDumpWorker,
-			  DataDirSyncMethod sync_method)
+			  DataDirSyncMethod sync_method,
+			  bool FileSpecIsPipe)
 
 {
 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
-								 dosync, mode, setupDumpWorker, sync_method);
+								 dosync, mode, setupDumpWorker, sync_method, FileSpecIsPipe);
 
 	return (Archive *) AH;
 }
@@ -244,7 +245,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 /* Open an existing archive */
 /* Public */
 Archive *
-OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
+OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe)
 {
 	ArchiveHandle *AH;
 	pg_compress_specification compression_spec = {0};
@@ -252,7 +253,7 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
 	compression_spec.algorithm = PG_COMPRESSION_NONE;
 	AH = _allocAH(FileSpec, fmt, compression_spec, true,
 				  archModeRead, setupRestoreWorker,
-				  DATA_DIR_SYNC_METHOD_FSYNC);
+				  DATA_DIR_SYNC_METHOD_FSYNC, FileSpecIsPipe);
 
 	return (Archive *) AH;
 }
@@ -1720,7 +1721,7 @@ SetOutput(ArchiveHandle *AH, const char *filename,
 	else
 		mode = PG_BINARY_W;
 
-	CFH = InitCompressFileHandle(compression_spec);
+	CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
 
 	if (!CFH->open_func(filename, fn, mode, CFH))
 	{
@@ -2376,7 +2377,8 @@ static ArchiveHandle *
 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 		 const pg_compress_specification compression_spec,
 		 bool dosync, ArchiveMode mode,
-		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
+		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method,
+		 bool FileSpecIsPipe)
 {
 	ArchiveHandle *AH;
 	CompressFileHandle *CFH;
@@ -2417,6 +2419,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	else
 		AH->fSpec = NULL;
 
+	AH->fSpecIsPipe = FileSpecIsPipe;
+
 	AH->currUser = NULL;		/* unknown */
 	AH->currSchema = NULL;		/* ditto */
 	AH->currTablespace = NULL;	/* ditto */
@@ -2429,14 +2433,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 
 	AH->mode = mode;
 	AH->compression_spec = compression_spec;
-	AH->dosync = dosync;
+	AH->dosync = FileSpecIsPipe ? false : dosync;
 	AH->sync_method = sync_method;
 
 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
 
 	/* Open stdout with no compression for AH output handle */
 	out_compress_spec.algorithm = PG_COMPRESSION_NONE;
-	CFH = InitCompressFileHandle(out_compress_spec);
+	CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe);
 	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
 		pg_fatal("could not open stdout for appending: %m");
 	AH->OF = CFH;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 325b53fc9bd..a8b1ab79e82 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -301,6 +301,8 @@ struct _archiveHandle
 	int			loCount;		/* # of LOs restored */
 
 	char	   *fSpec;			/* Archive File Spec */
+	bool		fSpecIsPipe;	/* fSpec is a pipe command template requiring
+								 * replacing %f with file name */
 	FILE	   *FH;				/* General purpose file handle */
 	void	   *OF;				/* Output file */
 
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index d6a1428c67a..74fc651f6f4 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -39,7 +39,8 @@
 #include <dirent.h>
 #include <sys/stat.h>
 
-#include "common/file_utils.h"
+/* #include "common/file_utils.h" */
+#include "common/percentrepl.h"
 #include "compress_io.h"
 #include "dumputils.h"
 #include "parallel.h"
@@ -157,8 +158,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 	if (AH->mode == archModeWrite)
 	{
-		/* we accept an empty existing directory */
-		create_or_open_dir(ctx->directory);
+		if (!AH->fSpecIsPipe)	/* no checks for pipe */
+		{
+			/* we accept an empty existing directory */
+			create_or_open_dir(ctx->directory);
+		}
 	}
 	else
 	{							/* Read Mode */
@@ -167,7 +171,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R);
+		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe);
 		if (tocFH == NULL)
 			pg_fatal("could not open input file \"%s\": %m", fname);
 
@@ -295,7 +299,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 
 	setFilePath(AH, fname, tctx->filename);
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
 
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
@@ -353,7 +357,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R);
+	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe);
 	if (!CFH)
 		pg_fatal("could not open input file \"%s\": %m", filename);
 
@@ -416,7 +420,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	else
 		setFilePath(AH, tocfname, tctx->filename);
 
-	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
+	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe);
 
 	if (ctx->LOsTocFH == NULL)
 		pg_fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -427,6 +431,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	{
 		char		lofname[MAXPGPATH + 1];
 		char		path[MAXPGPATH];
+		char	   *pipe;
 
 		/* Can't overflow because line and lofname are the same length */
 		if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2)
@@ -545,7 +550,7 @@ _CloseArchive(ArchiveHandle *AH)
 
 		/* The TOC is always created uncompressed */
 		compression_spec.algorithm = PG_COMPRESSION_NONE;
-		tocFH = InitCompressFileHandle(compression_spec);
+		tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
 		if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH))
 			pg_fatal("could not open output file \"%s\": %m", fname);
 		ctx->dataFH = tocFH;
@@ -606,13 +611,46 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 	pg_compress_specification compression_spec = {0};
 	char		fname[MAXPGPATH];
+	const char *mode;
 
 	setFilePath(AH, fname, tctx->filename);
 
 	/* The LO TOC file is never compressed */
 	compression_spec.algorithm = PG_COMPRESSION_NONE;
-	ctx->LOsTocFH = InitCompressFileHandle(compression_spec);
-	if (!ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH))
+	ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+
+	/*
+	 * XXX: We can probably simplify this code by using the mode 'w' for all
+	 * cases. The current implementation is due to historical reason that the
+	 * mode for the LOs TOC file has been "ab" from the start. That is
+	 * something we can't do for pipe-command as popen only supports read and
+	 * write. So here a different mode is used for pipes.
+	 *
+	 * But in future we can evaluate using 'w' for everything.there is one
+	 * ToCEntry There is only one ToCEntry per blob group. And it is written
+	 * by @WriteDataChunksForToCEntry. This function calls _StartLOs once
+	 * before the dumper function and and _EndLOs once after the dumper. And
+	 * the dumper dumps all the LOs in the group. So a blob_NNN.toc is only
+	 * opened once and closed after all the entries are written. Therefore the
+	 * mode can be made 'w' for all the cases. We tested changing the mode to
+	 * PG_BINARY_W and the tests passed. But in case there are some missing
+	 * scenarios, we have not made that change here. Instead for now only
+	 * doing it for the pipe command.
+	 *
+	 * Another alternative is to keep the 'ab' mode for regular files and use
+	 * 'w' mode for pipe files but now also cache the pipe handle to keep it
+	 * open till all the LOs in the dump group are done. This is not needed
+	 * because of the same reason listed above that a file handle is only
+	 * opened once. In short there are 3 solutions : 1. Change the mode for
+	 * everything (preferred) 2. Change it only for pipe-command (current) 3.
+	 * Change it for pipe-command and then cache those handles and close them
+	 * in the end (not needed).
+	 */
+	if (AH->fSpecIsPipe)
+		mode = PG_BINARY_W;
+	else
+		mode = "ab";
+	if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
 
@@ -626,10 +664,22 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char		fname[MAXPGPATH];
+	char	   *pipe;
+	char		blob_name[MAXPGPATH];
 
-	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+	if (AH->fSpecIsPipe)
+	{
+		snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
+		pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name);
+		strcpy(fname, pipe);
+		pfree(pipe);
+	}
+	else
+	{
+		snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+	}
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
@@ -683,15 +733,27 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char	   *dname;
+	char	   *pipe;
 
 	dname = ctx->directory;
 
-	if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
-		pg_fatal("file name too long: \"%s\"", dname);
 
-	strcpy(buf, dname);
-	strcat(buf, "/");
-	strcat(buf, relativeFilename);
+	if (AH->fSpecIsPipe)
+	{
+		pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename);
+		strcpy(buf, pipe);
+		pfree(pipe);
+	}
+	else						/* replace all ocurrences of %f in dname with
+								 * relativeFilename */
+	{
+		if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
+			pg_fatal("file name too long: \"%s\"", dname);
+
+		strcpy(buf, dname);
+		strcat(buf, "/");
+		strcat(buf, relativeFilename);
+	}
 }
 
 /*
@@ -733,17 +795,24 @@ _PrepParallelRestore(ArchiveHandle *AH)
 		 * only need an approximate indicator of that.
 		 */
 		setFilePath(AH, fname, tctx->filename);
+		pg_log_error("filename: %s", fname);
 
 		if (stat(fname, &st) == 0)
 			te->dataLength = st.st_size;
 		else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
 		{
+			if (AH->fSpecIsPipe)
+				pg_log_error("pipe and compressed");
 			if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
 				strlcat(fname, ".gz", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
 				strlcat(fname, ".lz4", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+			{
+				pg_log_error("filename: %s", fname);
 				strlcat(fname, ".zst", sizeof(fname));
+				pg_log_error("filename: %s", fname);
+			}
 
 			if (stat(fname, &st) == 0)
 				te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 450cec285b3..6bfac4174fe 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -419,6 +419,7 @@ main(int argc, char **argv)
 {
 	int			c;
 	const char *filename = NULL;
+	bool		filename_is_pipe = false;
 	const char *format = "p";
 	TableInfo  *tblinfo;
 	int			numTables;
@@ -535,6 +536,7 @@ main(int argc, char **argv)
 		{"exclude-extension", required_argument, NULL, 17},
 		{"sequence-data", no_argument, &dopt.sequence_data, 1},
 		{"restrict-key", required_argument, NULL, 25},
+		{"pipe-command", required_argument, NULL, 26},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -606,7 +608,14 @@ main(int argc, char **argv)
 				break;
 
 			case 'f':
+				if (filename != NULL)
+				{
+					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+					exit_nicely(1);
+				}
 				filename = pg_strdup(optarg);
+				filename_is_pipe = false;	/* it already is, setting again
+											 * here just for clarity */
 				break;
 
 			case 'F':
@@ -799,6 +808,16 @@ main(int argc, char **argv)
 				dopt.restrict_key = pg_strdup(optarg);
 				break;
 
+			case 26:			/* pipe command */
+				if (filename != NULL)
+				{
+					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+					exit_nicely(1);
+				}
+				filename = pg_strdup(optarg);
+				filename_is_pipe = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -920,14 +939,26 @@ main(int argc, char **argv)
 	else if (dopt.restrict_key)
 		pg_fatal("option %s can only be used with %s",
 				 "--restrict-key", "--format=plain");
+	if (filename_is_pipe && archiveFormat != archDirectory)
+	{
+		pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+		exit_nicely(1);
+	}
+
+	if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0)
+	{
+		pg_log_error_hint("Option --pipe-command is not supported with any compression type.");
+		exit_nicely(1);
+	}
 
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
 	 * available, not the others.  If gzip is not available, no compression is
-	 * done by default.
+	 * done by default. If directory format is being used with pipe-command,
+	 * no compression is done.
 	 */
 	if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
-		!user_compression_defined)
+		!filename_is_pipe && !user_compression_defined)
 	{
 #ifdef HAVE_LIBZ
 		compression_algorithm_str = "gzip";
@@ -977,7 +1008,7 @@ main(int argc, char **argv)
 
 	/* Open the output file */
 	fout = CreateArchive(filename, archiveFormat, compression_spec,
-						 dosync, archiveMode, setupDumpWorker, sync_method);
+						 dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe);
 
 	/* Make dump options accessible right away */
 	SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 84b8d410c9e..9495a37ffc1 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -485,7 +485,7 @@ main(int argc, char **argv)
 					 opts->formatName);
 	}
 
-	AH = OpenArchive(inputFileSpec, opts->format);
+	AH = OpenArchive(inputFileSpec, opts->format, false);
 
 	SetArchiveOptions(AH, NULL, opts);
 
-- 
2.53.0.345.g96ddfc5eaa-goog



^ permalink  raw  reply  [nested|flat] 3+ messages in thread

* Re: Adding pg_dump flag for parallel export to pipes
  2026-02-20 09:08 Re: Adding pg_dump flag for parallel export to pipes Nitin Motiani <nitinmotiani@google.com>
@ 2026-05-21 09:56 ` Nitin Motiani <nitinmotiani@google.com>
  2026-05-22 10:34   ` Re: Adding pg_dump flag for parallel export to pipes solai v <solai.cdac@gmail.com>
  0 siblings, 1 reply; 3+ messages in thread

From: Nitin Motiani @ 2026-05-21 09:56 UTC (permalink / raw)
  To: Hannu Krosing <hannuk@google.com>; +Cc: Mahendra Singh Thalor <mahi6run@gmail.com>; Dilip Kumar <dilipbalaut@gmail.com>; Thomas Munro <thomas.munro@gmail.com>; pgsql-hackers

Changed how pipe commands are quoted in the Windows test. The latest
versions are attached.

Thanks

Nitin Motiani
Google


Attachments:

  [application/x-patch] v16-0002-Add-pipe-command-support-in-pg_restore.patch (9.5K, 2-v16-0002-Add-pipe-command-support-in-pg_restore.patch)
  download | inline diff:
From 9c85862782a6fe018c22dc469bd5abfa596cf1f9 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Sat, 15 Feb 2025 08:05:25 +0000
Subject: [PATCH v16 2/5] Add pipe-command support in pg_restore

* This is same as the pg_dump change. We add support
  for --pipe-command in directory archive format. This can be used
  to read from multiple streams and do pre-processing (decompression
  with a custom algorithm, filtering etc) before restore.
  Currently that is not possible because the pg_dump output of
  directory format can't just be piped.
* Like pg_dump, here also either filename or --pipe-command can be
  set. If neither are set, the standard input is used as before.
* This is only supported with compression none and archive format
  directory.
* We reuse the inputFileSpec field for the pipe-command. And add
  a bool to specify if it is a pipe.
* The changes made for pg_dump to handle the pipe case with popen
  and pclose also work here.
* The logic of %f format specifier to read from the pg_dump output
  is the same too. Most of the code from the pg_dump commit works.
  We add similar logic to the function to read large objects.
* The --pipe command works -l and -L option.
---
 src/bin/pg_dump/compress_io.c         | 30 +++++++++------
 src/bin/pg_dump/pg_backup_directory.c | 16 +++++++-
 src/bin/pg_dump/pg_restore.c          | 53 ++++++++++++++++++++-------
 3 files changed, 72 insertions(+), 27 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index bc521dd274b..88488186b34 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -260,22 +260,28 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
 
 	fname = pg_strdup(path);
 
-	if (hasSuffix(fname, ".gz"))
-		compression_spec.algorithm = PG_COMPRESSION_GZIP;
-	else if (hasSuffix(fname, ".lz4"))
-		compression_spec.algorithm = PG_COMPRESSION_LZ4;
-	else if (hasSuffix(fname, ".zst"))
-		compression_spec.algorithm = PG_COMPRESSION_ZSTD;
-	else
+	/*
+	 * If the path is a pipe command, the compression algorithm is none.
+	 */
+	if (!path_is_pipe_command)
 	{
-		if (stat(path, &st) == 0)
-			compression_spec.algorithm = PG_COMPRESSION_NONE;
-		else if (check_compressed_file(path, &fname, "gz"))
+		if (hasSuffix(fname, ".gz"))
 			compression_spec.algorithm = PG_COMPRESSION_GZIP;
-		else if (check_compressed_file(path, &fname, "lz4"))
+		else if (hasSuffix(fname, ".lz4"))
 			compression_spec.algorithm = PG_COMPRESSION_LZ4;
-		else if (check_compressed_file(path, &fname, "zst"))
+		else if (hasSuffix(fname, ".zst"))
 			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+		else
+		{
+			if (stat(path, &st) == 0)
+				compression_spec.algorithm = PG_COMPRESSION_NONE;
+			else if (check_compressed_file(path, &fname, "gz"))
+				compression_spec.algorithm = PG_COMPRESSION_GZIP;
+			else if (check_compressed_file(path, &fname, "lz4"))
+				compression_spec.algorithm = PG_COMPRESSION_LZ4;
+			else if (check_compressed_file(path, &fname, "zst"))
+				compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+		}
 	}
 
 	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 49a7ab91050..15ce45fb9e9 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -439,7 +439,21 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 					 tocfname, line);
 
 		StartRestoreLO(AH, oid, AH->public.ropt->dropSchema);
-		snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+
+		/*
+		 * XXX : Create a helper function for blob files naming common to
+		 * _LoadLOs an _StartLO.
+		 */
+		if (AH->fSpecIsPipe)
+		{
+			pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname);
+			strcpy(path, pipe);
+			pfree(pipe);
+		}
+		else
+		{
+			snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+		}
 		_PrintFileData(AH, path);
 		EndRestoreLO(AH, oid);
 	}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index c31d262e71a..c657149d658 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -60,11 +60,11 @@ static void usage(const char *progname);
 static void read_restore_filters(const char *filename, RestoreOptions *opts);
 static bool file_exists_in_directory(const char *dir, const char *filename);
 static int	restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
-								 int numWorkers, bool append_data);
-static int	restore_global_objects(const char *inputFileSpec, RestoreOptions *opts);
+								 int numWorkers, bool append_data, bool filespec_is_pipe);
+static int	restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe);
 
 static int	restore_all_databases(const char *inputFileSpec,
-								  SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers);
+								  SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers, bool filespec_is_pipe);
 static int	get_dbnames_list_to_restore(PGconn *conn,
 										SimplePtrList *dbname_oid_list,
 										SimpleStringList db_exclude_patterns);
@@ -93,6 +93,7 @@ main(int argc, char **argv)
 	int			n_errors = 0;
 	bool		globals_only = false;
 	SimpleStringList db_exclude_patterns = {NULL, NULL};
+	bool		filespec_is_pipe = false;
 	static int	disable_triggers = 0;
 	static int	enable_row_security = 0;
 	static int	if_exists = 0;
@@ -173,6 +174,7 @@ main(int argc, char **argv)
 		{"filter", required_argument, NULL, 4},
 		{"restrict-key", required_argument, NULL, 6},
 		{"exclude-database", required_argument, NULL, 7},
+		{"pipe-command", required_argument, NULL, 8},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -356,6 +358,11 @@ main(int argc, char **argv)
 				simple_string_list_append(&db_exclude_patterns, optarg);
 				break;
 
+			case 8:				/* pipe-command */
+				inputFileSpec = pg_strdup(optarg);
+				filespec_is_pipe = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -363,11 +370,29 @@ main(int argc, char **argv)
 		}
 	}
 
-	/* Get file name from command line */
+	/*
+	 * Get file name from command line. Note that filename argument and
+	 * pipe-command can't both be set.
+	 */
 	if (optind < argc)
+	{
+		if (filespec_is_pipe)
+		{
+			pg_log_error_hint("Only one of [filespec, --pipe-command] allowed");
+			exit_nicely(1);
+		}
 		inputFileSpec = argv[optind++];
-	else
+	}
+
+	/*
+	 * Even if the file argument is not provided, if the pipe-command is
+	 * specified, we need to use that as the file arg and not fallback to
+	 * stdio.
+	 */
+	else if (!filespec_is_pipe)
+	{
 		inputFileSpec = NULL;
+	}
 
 	/* Complain if any arguments remain */
 	if (optind < argc)
@@ -594,7 +619,7 @@ main(int argc, char **argv)
 		snprintf(global_path, MAXPGPATH, "%s/toc.glo", inputFileSpec);
 
 		if (!no_globals)
-			n_errors = restore_global_objects(global_path, tmpopts);
+			n_errors = restore_global_objects(global_path, tmpopts, filespec_is_pipe);
 		else
 			pg_log_info("skipping restore of global objects because %s was specified",
 						"--no-globals");
@@ -606,7 +631,7 @@ main(int argc, char **argv)
 		{
 			/* Now restore all the databases from map.dat */
 			n_errors = n_errors + restore_all_databases(inputFileSpec, db_exclude_patterns,
-														opts, numWorkers);
+														opts, numWorkers, filespec_is_pipe);
 		}
 
 		/* Free db pattern list. */
@@ -626,7 +651,7 @@ main(int argc, char **argv)
 					 "-g/--globals-only");
 
 		/* Process if toc.glo file does not exist. */
-		n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false);
+		n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, filespec_is_pipe);
 	}
 
 	/* Done, print a summary of ignored errors during restore. */
@@ -645,7 +670,7 @@ main(int argc, char **argv)
  * This restore all global objects.
  */
 static int
-restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
+restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe)
 {
 	Archive    *AH;
 	int			nerror = 0;
@@ -654,7 +679,7 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
 	opts->format = archCustom;
 	opts->txn_size = 0;
 
-	AH = OpenArchive(inputFileSpec, opts->format, false);
+	AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
 
 	SetArchiveOptions(AH, NULL, opts);
 
@@ -691,12 +716,12 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
  */
 static int
 restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
-					 int numWorkers, bool append_data)
+					 int numWorkers, bool append_data, bool filespec_is_pipe)
 {
 	Archive    *AH;
 	int			n_errors;
 
-	AH = OpenArchive(inputFileSpec, opts->format, false);
+	AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
 
 	SetArchiveOptions(AH, NULL, opts);
 
@@ -1145,7 +1170,7 @@ get_dbname_oid_list_from_mfile(const char *dumpdirpath,
 static int
 restore_all_databases(const char *inputFileSpec,
 					  SimpleStringList db_exclude_patterns, RestoreOptions *opts,
-					  int numWorkers)
+					  int numWorkers, bool filespec_is_pipe)
 {
 	SimplePtrList dbname_oid_list = {NULL, NULL};
 	int			num_db_restore = 0;
@@ -1309,7 +1334,7 @@ restore_all_databases(const char *inputFileSpec,
 		}
 
 		/* Restore the single database. */
-		n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true);
+		n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true, filespec_is_pipe);
 
 		n_errors_total += n_errors;
 
-- 
2.54.0.669.g59709faab0-goog



  [application/x-patch] v16-0004-Add-tests-for-pipe.patch (20.1K, 3-v16-0004-Add-tests-for-pipe.patch)
  download | inline diff:
From 76ee3b3c375c32055edff86f6348af007012a922 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Sat, 15 Feb 2025 04:29:17 +0000
Subject: [PATCH v16 4/5] Add tests for pipe

* These tests include the invalid usages of --pipe-command with other flags.

* Also test pg_dump and pg_restore with pipe command along with various other flags.
---
 src/bin/pg_dump/t/001_basic.pl              |  72 ++++-
 src/bin/pg_dump/t/002_pg_dump.pl            | 292 +++++++++++++++++++-
 src/bin/pg_dump/t/004_pg_dump_parallel.pl   |  43 +++
 src/bin/pg_dump/t/005_pg_dump_filterfile.pl |  18 ++
 4 files changed, 415 insertions(+), 10 deletions(-)

diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index 687e842cde9..92d47e4fd93 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -74,6 +74,48 @@ command_fails_like(
 	'pg_dump: options --statistics-only and --no-statistics cannot be used together'
 );
 
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe="cat"', '-f', 'testdir', 'test'],
+	qr/\Qpg_dump: error: options -f\/--file and --pipe cannot be used together\E/,
+	'pg_dump: options -f/--file and --pipe cannot be used together'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe="cat"', '-Z', 'gzip', 'test'],
+	qr/\Qpg_dump: error: option --pipe is not supported with any compression type\E/,
+	'pg_dump: option --pipe is not supported with any compression type'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe="cat"', '--compress=lz4', 'test'],
+	qr/\Qpg_dump: error: option --pipe is not supported with any compression type\E/,
+	'pg_dump: option --pipe is not supported with any compression type'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe="cat"', '--compress=gzip', 'test'],
+	qr/\Qpg_dump: error: option --pipe is not supported with any compression type\E/,
+	'pg_dump: option --pipe is not supported with any compression type'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe="cat"', '-Z', '1', 'test'],
+	qr/\Qpg_dump: error: option --pipe is not supported with any compression type\E/,
+	'pg_dump: option --pipe is not supported with any compression type'
+);
+
+command_fails_like(
+	[ 'pg_dump', '-Fc', '--pipe="cat"', 'test'],
+	qr/\Qpg_dump: error: option --pipe is only supported with directory format\E/,
+	'pg_dump: option --pipe is only supported with directory format'
+);
+
+command_fails_like(
+	[ 'pg_dump', '--format=tar', '--pipe="cat"', 'test'],
+	qr/\Qpg_dump: error: option --pipe is only supported with directory format\E/,
+	'pg_dump: option --pipe is only supported with directory format'
+);
+
 command_fails_like(
 	[ 'pg_dump', '-j2', '--include-foreign-data=xxx' ],
 	qr/\Qpg_dump: error: option --include-foreign-data is not supported with parallel backup\E/,
@@ -94,12 +136,38 @@ command_fails_like(
 command_fails_like(
 	[ 'pg_restore', '-d', 'xxx', '-f', 'xxx' ],
 	qr/\Qpg_restore: error: options -d\/--dbname and -f\/--file cannot be used together\E/,
-	'pg_restore: options -d/--dbname and -f/--file cannot be used together');
+	'pg_restore: options -d/--dbname and -f/--file cannot be used together'
+);
+
+command_fails_like(
+	[ 'pg_restore', '-f', '-', '--pipe="cat"', 'dumpdir' ],
+	qr/\Qpg_restore: error: cannot specify both an input file and --pipe\E/,
+	'pg_restore: cannot specify both an input file and --pipe'
+);
+
+command_fails_like(
+	[ 'pg_restore', '-Fd', '-f', '-', '--pipe="cat"', 'dumpdir' ],
+	qr/\Qpg_restore: error: cannot specify both an input file and --pipe\E/,
+	'pg_restore: cannot specify both an input file and --pipe'
+);
+
+command_fails_like(
+	[ 'pg_restore', '-Fc', '-f', '-', '--pipe="cat"' ],
+	qr/\Qpg_restore: error: option --pipe is only supported with directory format\E/,
+	'pg_restore: option --pipe is only supported with directory format'
+);
+
+command_fails_like(
+	[ 'pg_restore', '--format=tar', '-f', '-', '--pipe="cat"' ],
+	qr/\Qpg_restore: error: option --pipe is only supported with directory format\E/,
+	'pg_restore: option --pipe is only supported with directory format'
+);
 
 command_fails_like(
 	[ 'pg_dump', '-c', '-a' ],
 	qr/\Qpg_dump: error: options -c\/--clean and -a\/--data-only cannot be used together\E/,
-	'pg_dump: options -c/--clean and -a/--data-only cannot be used together');
+	'pg_dump: options -c/--clean and -a/--data-only cannot be used together'
+);
 
 command_fails_like(
 	[ 'pg_dumpall', '-c', '-a' ],
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 3ee9fda50e4..51582b3caf7 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -7,8 +7,10 @@ use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use File::Spec;
 
 my $tempdir = PostgreSQL::Test::Utils::tempdir;
+$tempdir =~ s!\\!/!g if $PostgreSQL::Test::Utils::windows_os;
 
 ###############################################################
 # Definition of the pg_dump runs to make.
@@ -46,6 +48,69 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir;
 my $supports_icu = ($ENV{with_icu} eq 'yes');
 my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1");
 
+# Use perl one-liner as a portable 'cat' replacement for Windows compatibility.
+# On Windows, perl opens file handles in text mode by default, which corrupts
+# binary archive data by translating newlines and interpreting EOF characters.
+# We use -Mopen=IO,:raw to force raw binary mode. We use -pe 1 instead of
+# -pe '' to avoid shell quoting issues with empty strings on Windows cmd.exe.
+my $perlbin = $^X;
+$perlbin =~ s!\\!/!g if $PostgreSQL::Test::Utils::windows_os;
+my $perl_cat = "\"$perlbin\" -Mopen=IO,:raw -pe 1";
+
+# Check for external gzip program for pipe tests.
+my $gzip_path = $ENV{GZIP_PROGRAM} || 'gzip';
+my $gzip_bin = "\"$gzip_path\"";
+my $has_gzip_bin =
+  (system("$gzip_bin --version >" . File::Spec->devnull() . " 2>&1") == 0);
+
+# Pre-calculate complex pipe commands to keep the test definitions readable
+# and ensure unified --pipe=... syntax for Windows stability.
+# On Windows, we use double-layer quoting: internal quotes for paths with
+# spaces, and an outer set of escaped quotes to protect shell operators
+# like | and >. On other platforms, we avoid the outer wrap to satisfy /bin/sh.
+my $is_win = $PostgreSQL::Test::Utils::windows_os;
+
+my $raw_pipe_defaults_dir = "$perl_cat > \"$tempdir/defaults_dir_format/%f\"";
+my $raw_pipe_defaults_res = "$perl_cat \"$tempdir/defaults_dir_format/%f\"";
+my $raw_pipe_cross_dump = "$perl_cat > \"$tempdir/pipe_cross_dump/%f\"";
+my $raw_pipe_cross_restore = ($supports_gzip && !$is_win)
+  ? "if [ -f \"$tempdir/pipe_cross_restore/%f.gz\" ]; then $gzip_bin -d -c \"$tempdir/pipe_cross_restore/%f.gz\"; else $perl_cat \"$tempdir/pipe_cross_restore/%f\"; fi"
+  : "$perl_cat \"$tempdir/pipe_cross_restore/%f\"";
+my $raw_pipe_parallel_out = "$perl_cat > \"$tempdir/pipe_out_dir_parallel/%f\"";
+my $raw_pipe_parallel_in = "$perl_cat \"$tempdir/pipe_out_dir_parallel/%f\"";
+my $raw_pipe_parallel_8_out = "$perl_cat > \"$tempdir/pipe_out_dir_parallel_8/%f\"";
+my $raw_pipe_parallel_8_in = "$perl_cat \"$tempdir/pipe_out_dir_parallel_8/%f\"";
+my $raw_pipe_complex_out = "$gzip_bin | $perl_cat > \"$tempdir/pipe_out_dir_complex/%f.gz\"";
+my $raw_pipe_complex_in = "$perl_cat \"$tempdir/pipe_out_dir_complex/%f.gz\" | $gzip_bin -d";
+my $raw_pipe_lo_out = "$perl_cat > \"$tempdir/pipe_out_dir_lo/%f\"";
+my $raw_pipe_lo_in = "$perl_cat \"$tempdir/pipe_out_dir_lo/%f\"";
+my $raw_pipe_schema_out = "$perl_cat > \"$tempdir/schema_only_pipe_dir/%f\"";
+my $raw_pipe_schema_in = "$perl_cat \"$tempdir/schema_only_pipe_dir/%f\"";
+
+my $pipe_defaults_dir = $is_win ? "\"$raw_pipe_defaults_dir\"" : $raw_pipe_defaults_dir;
+my $pipe_defaults_res = $is_win ? "\"$raw_pipe_defaults_res\"" : $raw_pipe_defaults_res;
+my $pipe_cross_dump = $is_win ? "\"$raw_pipe_cross_dump\"" : $raw_pipe_cross_dump;
+my $pipe_cross_restore = $is_win ? "\"$raw_pipe_cross_restore\"" : $raw_pipe_cross_restore;
+my $pipe_parallel_out = $is_win ? "\"$raw_pipe_parallel_out\"" : $raw_pipe_parallel_out;
+my $pipe_parallel_in = $is_win ? "\"$raw_pipe_parallel_in\"" : $raw_pipe_parallel_in;
+my $pipe_parallel_8_out = $is_win ? "\"$raw_pipe_parallel_8_out\"" : $raw_pipe_parallel_8_out;
+my $pipe_parallel_8_in = $is_win ? "\"$raw_pipe_parallel_8_in\"" : $raw_pipe_parallel_8_in;
+my $pipe_complex_out = $is_win ? "\"$raw_pipe_complex_out\"" : $raw_pipe_complex_out;
+my $pipe_complex_in = $is_win ? "\"$raw_pipe_complex_in\"" : $raw_pipe_complex_in;
+my $pipe_lo_out = $is_win ? "\"$raw_pipe_lo_out\"" : $raw_pipe_lo_out;
+my $pipe_lo_in = $is_win ? "\"$raw_pipe_lo_in\"" : $raw_pipe_lo_in;
+my $pipe_schema_out = $is_win ? "\"$raw_pipe_schema_out\"" : $raw_pipe_schema_out;
+my $pipe_schema_in = $is_win ? "\"$raw_pipe_schema_in\"" : $raw_pipe_schema_in;
+
+# Create output directories for pipe tests
+mkdir "$tempdir/pipe_out_dir_parallel";
+mkdir "$tempdir/pipe_out_dir_parallel_8";
+mkdir "$tempdir/pipe_out_dir_complex";
+mkdir "$tempdir/pipe_out_dir_lo";
+mkdir "$tempdir/pipe_cross_dump";
+mkdir "$tempdir/pipe_cross_restore";
+mkdir "$tempdir/schema_only_pipe_dir";
+
 my %pgdump_runs = (
 	binary_upgrade => {
 		dump_cmd => [
@@ -223,6 +288,139 @@ my %pgdump_runs = (
 		],
 	},
 
+	defaults_dir_format_pipe => {
+		test_key => 'defaults',
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			"--pipe=$pipe_defaults_dir",
+			'--statistics',
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_dir_format_pipe.sql",
+			"--pipe=$pipe_defaults_res",
+			'--statistics',
+		],
+	},
+
+	defaults_dir_format_pipe_dump_only => {
+		test_key => 'defaults',
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			"--pipe=$pipe_cross_dump",
+			'--statistics',
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_dir_format_pipe_dump_only.sql",
+			'--statistics',
+			"$tempdir/pipe_cross_dump",
+		],
+	},
+
+	defaults_dir_format_pipe_restore_only => {
+		test_key => 'defaults',
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			'--file' => "$tempdir/pipe_cross_restore",
+			'--statistics',
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_dir_format_pipe_restore_only.sql",
+			"--pipe=$pipe_cross_restore",
+			'--statistics',
+		],
+	},
+
+	defaults_parallel_pipe => {
+		test_key => 'defaults',
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			'--jobs' => 2,
+			"--pipe=$pipe_parallel_out",
+			'--statistics',
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_parallel_pipe.sql",
+			"--pipe=$pipe_parallel_in",
+			'--statistics',
+		],
+	},
+
+	defaults_parallel_8_pipe => {
+		test_key => 'defaults',
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			'--jobs' => 8,
+			"--pipe=$pipe_parallel_8_out",
+			'--statistics',
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_parallel_8_pipe.sql",
+			"--pipe=$pipe_parallel_8_in",
+			'--statistics',
+		],
+	},
+
+	defaults_complex_pipe => {
+		test_key => 'defaults',
+		skip_unless => \$has_gzip_bin,
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			"--pipe=$pipe_complex_out",
+			'--statistics',
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_complex_pipe.sql",
+			"--pipe=$pipe_complex_in",
+			'--statistics',
+		],
+	},
+
+	defaults_lo_pipe => {
+		test_key => 'defaults',
+		dump_cmd => [
+			'pg_dump',
+			'--format' => 'directory',
+			'--statistics',
+			"--pipe=$pipe_lo_out",
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/defaults_lo_pipe.sql",
+			'--statistics',
+			"--pipe=$pipe_lo_in",
+		],
+		glob_patterns => [
+			"$tempdir/pipe_out_dir_lo/toc.dat",
+			"$tempdir/pipe_out_dir_lo/blobs_*.toc",
+		],
+	},
+
 	# Do not use --no-sync to give test coverage for data sync.
 	defaults_parallel => {
 		test_key => 'defaults',
@@ -527,6 +725,22 @@ my %pgdump_runs = (
 			'postgres',
 		],
 	},
+	schema_only_pipe => {
+		test_key => 'schema_only',
+		dump_cmd => [
+			'pg_dump', '--no-sync',
+			'--format' => 'directory',
+			'--schema-only',
+			"--pipe=$pipe_schema_out",
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			'--format' => 'directory',
+			'--file' => "$tempdir/schema_only_pipe.sql",
+			"--pipe=$pipe_schema_in",
+		],
+	},
 	section_pre_data => {
 		dump_cmd => [
 			'pg_dump', '--no-sync',
@@ -5212,25 +5426,24 @@ command_fails_like(
 #########################################
 # Run all runs
 
+
 foreach my $run (sort keys %pgdump_runs)
 {
 	my $test_key = $run;
-	my $run_db = 'postgres';
+	my $run_db   = 'postgres';
 
 	$node->command_ok(\@{ $pgdump_runs{$run}->{dump_cmd} },
 		"$run: pg_dump runs");
 
 	if ($pgdump_runs{$run}->{glob_patterns})
 	{
-		my $glob_patterns = $pgdump_runs{$run}->{glob_patterns};
-		foreach my $glob_pattern (@{$glob_patterns})
+		foreach my $glob_pattern (@{ $pgdump_runs{$run}->{glob_patterns} })
 		{
-			my @glob_output = glob($glob_pattern);
 			my $ok = 0;
-			# certainly found some files if glob() returned multiple matches
-			$ok = 1 if (scalar(@glob_output) > 1);
-			# if just one match, we need to check if it's real
-			$ok = 1 if (scalar(@glob_output) == 1 && -f $glob_output[0]);
+			foreach my $file (glob("$glob_pattern"))
+			{
+				$ok = 1 if -e $file;
+			}
 			is($ok, 1, "$run: glob check for $glob_pattern");
 		}
 	}
@@ -5334,6 +5547,69 @@ foreach my $run (sort keys %pgdump_runs)
 	}
 }
 
+#########################################
+# Test error reporting for a failing pipe command.
+# We use a perl one-liner that exits with 1 after processing input.
+# This ensures we test the error handling in pclose() at the end of the dump,
+# verifying that the child's exit status is correctly captured and reported.
+my $failing_perl_cat = "\"$perlbin\" -Mopen=IO,:raw -pe \"END { exit 1 }\"";
+
+$node->command_fails_like(
+	[ 'pg_dump', '-Fd', $is_win ? "--pipe=\"$failing_perl_cat > \\\"%f\\\"\"" : "--pipe=$failing_perl_cat > \"%f\"", 'postgres' ],
+	qr/pipe command failed/,
+	'pg_dump pipe command error reporting'
+);
+
+$node->command_fails_like(
+	[ 'pg_restore', '-Fd', '-l', $is_win ? "--pipe=\"$failing_perl_cat \\\"$tempdir/pipe_cross_dump/%f\\\"\"" : "--pipe=$failing_perl_cat \"$tempdir/pipe_cross_dump/%f\"", ],
+	qr/pipe command failed/,
+	'pg_restore pipe command error reporting'
+);
+
+# Targeted Edge Case Tests
+$node->command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe=/nonexistent/binary', 'postgres' ],
+	qr/could not write to file: (?:Broken pipe|The pipe has been ended)|Permission denied/,
+	'pg_dump early pipe command execution failure'
+);
+
+$node->command_fails_like(
+	[ 'pg_dump', '-Fd', '--pipe=no_such_command_at_all', 'postgres' ],
+	qr/could not write to file: (?:Broken pipe|The pipe has been ended)|not found|not recognized/,
+	'pg_dump command not found error reporting'
+);
+
+$node->command_fails_like(
+	[ 'pg_dump', '-Fd', '-f', '-', $is_win ? "--pipe=\"$perl_cat > \\\"%f\\\"\"" : "--pipe=$perl_cat > \"%f\"", 'postgres' ],
+	qr/options -f\/--file and --pipe cannot be used together/,
+	'pg_dump options -f/--file and --pipe conflict check'
+);
+
+# Test that pg_restore rejects a positional argument when --pipe is used.
+# We create a dummy cluster archive (containing toc.glo) to verify that
+# even in cluster mode, the mutual exclusivity holds.
+mkdir "$tempdir/dummy_cluster_archive";
+open my $fh, '>', "$tempdir/dummy_cluster_archive/toc.glo";
+close $fh;
+
+$node->command_fails_like(
+	[ 'pg_restore', '-Fd', '-l', $is_win ? "--pipe=\"$perl_cat \\\"%f\\\"\"" : "--pipe=$perl_cat \"%f\"", "$tempdir/dummy_cluster_archive" ],
+	qr/cannot specify both an input file and --pipe/,
+	'pg_restore --pipe rejects positional argument even for cluster archive'
+);
+
+# Test that pg_dump --pipe bypasses local directory existence check.
+# We use a pipe command that writes to a subdirectory that hasn't been created.
+# The dump itself will fail when the pipe command tries to write to the
+# non-existent directory, but the error should come from the pipe command/write
+# failure, not from pg_dump's directory initialization.
+my $remote_dir = "$tempdir/non_existent_remote_dir";
+$node->command_fails_like(
+	[ 'pg_dump', '-Fd', $is_win ? "--pipe=\"$perl_cat > \\\"$remote_dir/%f\\\"\"" : "--pipe=$perl_cat > \"$remote_dir/%f\"", 'postgres' ],
+	qr/could not write to file: (?:Broken pipe|The pipe has been ended)|pipe command failed/,
+	'pg_dump --pipe bypasses local directory existence check'
+);
+
 #########################################
 # Stop the database instance, which will be removed at the end of the tests.
 
diff --git a/src/bin/pg_dump/t/004_pg_dump_parallel.pl b/src/bin/pg_dump/t/004_pg_dump_parallel.pl
index 738f34b1c1b..63cd3ba016d 100644
--- a/src/bin/pg_dump/t/004_pg_dump_parallel.pl
+++ b/src/bin/pg_dump/t/004_pg_dump_parallel.pl
@@ -8,19 +8,31 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+# Use perl one-liner as a portable 'cat' replacement for Windows compatibility.
+# On Windows, perl opens file handles in text mode by default, which corrupts
+# binary archive data by translating newlines and interpreting EOF characters.
+# We use -Mopen=IO,:raw to force raw binary mode. We use -pe 1 instead of
+# -pe '' to avoid shell quoting issues with empty strings on Windows cmd.exe.
+my $perlbin = $^X;
+$perlbin =~ s!\\!/!g if $PostgreSQL::Test::Utils::windows_os;
+my $perl_cat = "\"$perlbin\" -Mopen=IO,:raw -pe 1";
+
 my $dbname1 = 'regression_src';
 my $dbname2 = 'regression_dest1';
 my $dbname3 = 'regression_dest2';
+my $dbname4 = 'regression_dest3';
 
 my $node = PostgreSQL::Test::Cluster->new('main');
 $node->init;
 $node->start;
 
 my $backupdir = $node->backup_dir;
+$backupdir =~ s!\\!/!g if $PostgreSQL::Test::Utils::windows_os;
 
 $node->run_log([ 'createdb', $dbname1 ]);
 $node->run_log([ 'createdb', $dbname2 ]);
 $node->run_log([ 'createdb', $dbname3 ]);
+$node->run_log([ 'createdb', $dbname4 ]);
 
 $node->safe_psql(
 	$dbname1,
@@ -87,4 +99,35 @@ $node->command_ok(
 	],
 	'parallel restore as inserts');
 
+mkdir "$backupdir/dump_pipe";
+
+# Pre-calculate pipe commands for readability and unified syntax.
+# Use double-layer quoting only on Windows to protect shell operators.
+my $is_win = $PostgreSQL::Test::Utils::windows_os;
+my $raw_pipe_dump = "$perl_cat > \"$backupdir/dump_pipe/%f\"";
+my $raw_pipe_restore = "$perl_cat \"$backupdir/dump_pipe/%f\"";
+
+my $pipe_dump = $is_win ? "\"$raw_pipe_dump\"" : $raw_pipe_dump;
+my $pipe_restore = $is_win ? "\"$raw_pipe_restore\"" : $raw_pipe_restore;
+
+$node->command_ok(
+	[
+		'pg_dump',
+		'--format' => 'directory',
+		'--no-sync',
+		'--jobs' => 2,
+		"--pipe=$pipe_dump",
+		$node->connstr($dbname1),
+	],
+	'parallel dump with pipe');
+
+$node->command_ok(
+	[
+		'pg_restore', '--verbose',
+		'--dbname' => $node->connstr($dbname4),
+		'--format' => 'directory',
+		'--jobs' => 3,
+		"--pipe=$pipe_restore",
+	],
+	'parallel restore with pipe');
 done_testing();
diff --git a/src/bin/pg_dump/t/005_pg_dump_filterfile.pl b/src/bin/pg_dump/t/005_pg_dump_filterfile.pl
index cecf0442088..f4b1afd8ef6 100644
--- a/src/bin/pg_dump/t/005_pg_dump_filterfile.pl
+++ b/src/bin/pg_dump/t/005_pg_dump_filterfile.pl
@@ -8,6 +8,11 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+# Use perl one-liner as a portable 'cat' replacement for Windows compatibility.
+my $perlbin = $^X;
+$perlbin =~ s!\\!/!g if $PostgreSQL::Test::Utils::windows_os;
+my $perl_cat = "$perlbin -pe ''";
+
 my $tempdir = PostgreSQL::Test::Utils::tempdir;
 my $inputfile;
 
@@ -98,6 +103,19 @@ command_ok(
 	],
 	"filter file without patterns");
 
+mkdir "$backupdir/dump_pipe_filter";
+
+command_ok(
+	[
+		'pg_dump',
+		'--port' => $port,
+		'--format' => 'directory',
+		'--pipe' => "$perl_cat > $backupdir/dump_pipe_filter/%f",
+		'--filter' => "$tempdir/inputfile.txt",
+		'postgres'
+	],
+	"filter file without patterns with pipe");
+
 my $dump = slurp_file($plainfile);
 
 like($dump, qr/^CREATE TABLE public\.table_one/m, "table one dumped");
-- 
2.54.0.669.g59709faab0-goog



  [application/x-patch] v16-0001-Add-pipe-command-support-for-directory-mode-of-p.patch (31.6K, 4-v16-0001-Add-pipe-command-support-for-directory-mode-of-p.patch)
  download | inline diff:
From d17b8e90c1b17e5d9b6497c8034996370e02c3c0 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Tue, 11 Feb 2025 08:31:02 +0000
Subject: [PATCH v16 1/5] Add pipe-command support for directory mode of
 pg_dump

* We add a new flag --pipe-command which can be used in directory
  mode. This allows us to support multiple streams and we can
  do post processing like compression, filtering etc. This is
  currently not possible with directory-archive format.
* Currently this flag is only supported with compression none
  and archive format directory.
* This flag can't be used with the flag --file. Only one of the
  two flags can be used at a time.
* We reuse the filename field for the --pipe-command also. And add a
  bool to specify that the field will be used as a pipe command.
* Most of the code remains as it is. The core change is that
  in case of --pipe-command, instead of fopen we do popen.
* The user would need a way to store the post-processing output
  in files. For that we support the same format as the directory
  mode currently does with the flag --file. We allow the user
  to add a format specifier %f to the --pipe-command. And for each
  stream, the format specifier is replaced with the corresponding
  file name. This file name is the same as it would have been if
  the flag --file had been used.
* To enable the above, there are a few places in the code where
  we change the file name creation logic. Currently the file name
  is appended to the directory name which is provided with --file flag.
  In case of --pipe-command, we instead replace %f with the file name.
  This change is made for the common use case and separately for
  blob files.
* There is an open question on what mode to use in case of large objects
  TOC file. Currently the code uses "ab" but that won't work for popen.
  We have proposed a few options in the comments regarding this. For the
  time being we are using mode PG_BINARY_W for the pipe use case.
---
 src/bin/pg_dump/compress_gzip.c       |   9 ++-
 src/bin/pg_dump/compress_gzip.h       |   3 +-
 src/bin/pg_dump/compress_io.c         |  26 +++++--
 src/bin/pg_dump/compress_io.h         |  11 ++-
 src/bin/pg_dump/compress_lz4.c        |  11 ++-
 src/bin/pg_dump/compress_lz4.h        |   3 +-
 src/bin/pg_dump/compress_none.c       |  25 ++++++-
 src/bin/pg_dump/compress_none.h       |   3 +-
 src/bin/pg_dump/compress_zstd.c       |  10 ++-
 src/bin/pg_dump/compress_zstd.h       |   3 +-
 src/bin/pg_dump/pg_backup.h           |   5 +-
 src/bin/pg_dump/pg_backup_archiver.c  |  22 +++---
 src/bin/pg_dump/pg_backup_archiver.h  |   2 +
 src/bin/pg_dump/pg_backup_directory.c | 103 +++++++++++++++++++++-----
 src/bin/pg_dump/pg_dump.c             |  37 ++++++++-
 src/bin/pg_dump/pg_dumpall.c          |   2 +-
 src/bin/pg_dump/pg_restore.c          |   6 +-
 17 files changed, 222 insertions(+), 59 deletions(-)

diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 60c553ba25a..0ce15847d9a 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -429,8 +429,12 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
+	if (path_is_pipe_command)
+		pg_fatal("cPipe command not supported for Gzip");
+
 	CFH->open_func = Gzip_open;
 	CFH->open_write_func = Gzip_open_write;
 	CFH->read_func = Gzip_read;
@@ -455,7 +459,8 @@ InitCompressorGzip(CompressorState *cs,
 
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "gzip");
 }
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
index af1a2a3445e..f77c5c86c56 100644
--- a/src/bin/pg_dump/compress_gzip.h
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -19,6 +19,7 @@
 extern void InitCompressorGzip(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleGzip(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 52652b0d979..bc521dd274b 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -191,20 +191,29 @@ free_keep_errno(void *p)
  * Initialize a compress file handle for the specified compression algorithm.
  */
 CompressFileHandle *
-InitCompressFileHandle(const pg_compress_specification compression_spec)
+InitCompressFileHandle(const pg_compress_specification compression_spec,
+					   bool path_is_pipe_command)
 {
 	CompressFileHandle *CFH;
 
 	CFH = pg_malloc0_object(CompressFileHandle);
 
-	if (compression_spec.algorithm == PG_COMPRESSION_NONE)
-		InitCompressFileHandleNone(CFH, compression_spec);
+	/*
+	 * Always set to non-compressed when path_is_pipe_command assuming that
+	 * external compressor as part of pipe is more efficient. Can review in
+	 * the future.
+	 */
+	if (path_is_pipe_command)
+		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+
+	else if (compression_spec.algorithm == PG_COMPRESSION_NONE)
+		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
-		InitCompressFileHandleGzip(CFH, compression_spec);
+		InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
-		InitCompressFileHandleLZ4(CFH, compression_spec);
+		InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
-		InitCompressFileHandleZstd(CFH, compression_spec);
+		InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command);
 
 	return CFH;
 }
@@ -237,7 +246,8 @@ check_compressed_file(const char *path, char **fname, char *ext)
  * On failure, return NULL with an error code in errno.
  */
 CompressFileHandle *
-InitDiscoverCompressFileHandle(const char *path, const char *mode)
+InitDiscoverCompressFileHandle(const char *path, const char *mode,
+							   bool path_is_pipe_command)
 {
 	CompressFileHandle *CFH = NULL;
 	struct stat st;
@@ -268,7 +278,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
 	}
 
-	CFH = InitCompressFileHandle(compression_spec);
+	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
 	errno = 0;
 	if (!CFH->open_func(fname, -1, mode, CFH))
 	{
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index ed7b14f0963..bd0fc2634dc 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -186,6 +186,11 @@ struct CompressFileHandle
 	 */
 	pg_compress_specification compression_spec;
 
+	/*
+	 * Compression specification for this file handle.
+	 */
+	bool		path_is_pipe_command;
+
 	/*
 	 * Private data to be used by the compressor.
 	 */
@@ -195,7 +200,8 @@ struct CompressFileHandle
 /*
  * Initialize a compress file handle with the requested compression.
  */
-extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec);
+extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec,
+												  bool path_is_pipe_command);
 
 /*
  * Initialize a compress file stream. Infer the compression algorithm
@@ -203,6 +209,7 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio
  * suffixes in 'path'.
  */
 extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path,
-														  const char *mode);
+														  const char *mode,
+														  bool path_is_pipe_command);
 extern bool EndCompressFileHandle(CompressFileHandle *CFH);
 #endif
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 0a7872116e7..2bc4c37c5db 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -766,10 +766,14 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH
  */
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-						  const pg_compress_specification compression_spec)
+						  const pg_compress_specification compression_spec,
+						  bool path_is_pipe_command)
 {
 	LZ4State   *state;
 
+	if (path_is_pipe_command)
+		pg_fatal("Pipe command not supported for LZ4");
+
 	CFH->open_func = LZ4Stream_open;
 	CFH->open_write_func = LZ4Stream_open_write;
 	CFH->read_func = LZ4Stream_read;
@@ -785,6 +789,8 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 	if (CFH->compression_spec.level >= 0)
 		state->prefs.compressionLevel = CFH->compression_spec.level;
 
+	CFH->path_is_pipe_command = path_is_pipe_command;
+
 	CFH->private_data = state;
 }
 #else							/* USE_LZ4 */
@@ -797,7 +803,8 @@ InitCompressorLZ4(CompressorState *cs,
 
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-						  const pg_compress_specification compression_spec)
+						  const pg_compress_specification compression_spec,
+						  bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "LZ4");
 }
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
index 7360a469fc0..490141ee8a1 100644
--- a/src/bin/pg_dump/compress_lz4.h
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -19,6 +19,7 @@
 extern void InitCompressorLZ4(CompressorState *cs,
 							  const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-									  const pg_compress_specification compression_spec);
+									  const pg_compress_specification compression_spec,
+									  bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index 743e2ce94b5..4cf02843185 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -211,7 +211,10 @@ close_none(CompressFileHandle *CFH)
 	if (fp)
 	{
 		errno = 0;
-		ret = fclose(fp);
+		if (CFH->path_is_pipe_command)
+			ret = pclose(fp);
+		else
+			ret = fclose(fp);
 		if (ret != 0)
 			pg_log_error("could not close file: %m");
 	}
@@ -245,7 +248,11 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
 	}
 	else
 	{
-		CFH->private_data = fopen(path, mode);
+		if (CFH->path_is_pipe_command)
+			CFH->private_data = popen(path, mode);
+		else
+			CFH->private_data = fopen(path, mode);
+
 		if (CFH->private_data == NULL)
 			return false;
 	}
@@ -258,7 +265,14 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 {
 	Assert(CFH->private_data == NULL);
 
-	CFH->private_data = fopen(path, mode);
+	pg_log_debug("Opening %s, pipe is %s",
+				 path, CFH->path_is_pipe_command ? "true" : "false");
+
+	if (CFH->path_is_pipe_command)
+		CFH->private_data = popen(path, mode);
+	else
+		CFH->private_data = fopen(path, mode);
+
 	if (CFH->private_data == NULL)
 		return false;
 
@@ -271,7 +285,8 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleNone(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	CFH->open_func = open_none;
 	CFH->open_write_func = open_write_none;
@@ -283,5 +298,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH,
 	CFH->eof_func = eof_none;
 	CFH->get_error_func = get_error_none;
 
+	CFH->path_is_pipe_command = path_is_pipe_command;
+
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h
index 5134f012ee9..d898a2d411c 100644
--- a/src/bin/pg_dump/compress_none.h
+++ b/src/bin/pg_dump/compress_none.h
@@ -19,6 +19,7 @@
 extern void InitCompressorNone(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleNone(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_NONE_H_ */
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index 68f1d815917..e4830d35ec0 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -27,7 +27,8 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress
 }
 
 void
-InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "ZSTD");
 }
@@ -574,8 +575,12 @@ Zstd_get_error(CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleZstd(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
+	if (path_is_pipe_command)
+		pg_fatal("Pipe command not supported for Zstd");
+
 	CFH->open_func = Zstd_open;
 	CFH->open_write_func = Zstd_open_write;
 	CFH->read_func = Zstd_read;
@@ -587,6 +592,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH,
 	CFH->get_error_func = Zstd_get_error;
 
 	CFH->compression_spec = compression_spec;
+	CFH->path_is_pipe_command = path_is_pipe_command;
 
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
index 1222d7107d9..1f23e7266bf 100644
--- a/src/bin/pg_dump/compress_zstd.h
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -20,6 +20,7 @@
 extern void InitCompressorZstd(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 28e7ff6fa16..549703af622 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -316,14 +316,15 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX);
 extern void RestoreArchive(Archive *AHX, bool append_data);
 
 /* Open an existing archive */
-extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
+extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe);
 
 /* Create a new archive */
 extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 							  const pg_compress_specification compression_spec,
 							  bool dosync, ArchiveMode mode,
 							  SetupWorkerPtrType setupDumpWorker,
-							  DataDirSyncMethod sync_method);
+							  DataDirSyncMethod sync_method,
+							  bool FileSpecIsPipe);
 
 /* The --list option */
 extern void PrintTOCSummary(Archive *AHX);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 2fd773ad84f..4b6bb7b8a14 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -57,7 +57,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
 							   bool dosync, ArchiveMode mode,
 							   SetupWorkerPtrType setupWorkerPtr,
-							   DataDirSyncMethod sync_method);
+							   DataDirSyncMethod sync_method, bool FileSpecIsPipe);
 static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
 static void _doSetFixedOutputState(ArchiveHandle *AH);
@@ -233,11 +233,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 			  const pg_compress_specification compression_spec,
 			  bool dosync, ArchiveMode mode,
 			  SetupWorkerPtrType setupDumpWorker,
-			  DataDirSyncMethod sync_method)
+			  DataDirSyncMethod sync_method,
+			  bool FileSpecIsPipe)
 
 {
 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
-								 dosync, mode, setupDumpWorker, sync_method);
+								 dosync, mode, setupDumpWorker, sync_method, FileSpecIsPipe);
 
 	return (Archive *) AH;
 }
@@ -245,7 +246,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 /* Open an existing archive */
 /* Public */
 Archive *
-OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
+OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe)
 {
 	ArchiveHandle *AH;
 	pg_compress_specification compression_spec = {0};
@@ -253,7 +254,7 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
 	compression_spec.algorithm = PG_COMPRESSION_NONE;
 	AH = _allocAH(FileSpec, fmt, compression_spec, true,
 				  archModeRead, setupRestoreWorker,
-				  DATA_DIR_SYNC_METHOD_FSYNC);
+				  DATA_DIR_SYNC_METHOD_FSYNC, FileSpecIsPipe);
 
 	return (Archive *) AH;
 }
@@ -1743,7 +1744,7 @@ SetOutput(ArchiveHandle *AH, const char *filename,
 	else
 		mode = PG_BINARY_W;
 
-	CFH = InitCompressFileHandle(compression_spec);
+	CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
 
 	if (!CFH->open_func(filename, fn, mode, CFH))
 	{
@@ -2399,7 +2400,8 @@ static ArchiveHandle *
 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 		 const pg_compress_specification compression_spec,
 		 bool dosync, ArchiveMode mode,
-		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
+		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method,
+		 bool FileSpecIsPipe)
 {
 	ArchiveHandle *AH;
 	CompressFileHandle *CFH;
@@ -2440,6 +2442,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	else
 		AH->fSpec = NULL;
 
+	AH->fSpecIsPipe = FileSpecIsPipe;
+
 	AH->currUser = NULL;		/* unknown */
 	AH->currSchema = NULL;		/* ditto */
 	AH->currTablespace = NULL;	/* ditto */
@@ -2452,14 +2456,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 
 	AH->mode = mode;
 	AH->compression_spec = compression_spec;
-	AH->dosync = dosync;
+	AH->dosync = FileSpecIsPipe ? false : dosync;
 	AH->sync_method = sync_method;
 
 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
 
 	/* Open stdout with no compression for AH output handle */
 	out_compress_spec.algorithm = PG_COMPRESSION_NONE;
-	CFH = InitCompressFileHandle(out_compress_spec);
+	CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe);
 	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
 		pg_fatal("could not open stdout for appending: %m");
 	AH->OF = CFH;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 1218bf6a6a1..cdc12a54f5e 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -301,6 +301,8 @@ struct _archiveHandle
 	int			loCount;		/* # of LOs restored */
 
 	char	   *fSpec;			/* Archive File Spec */
+	bool		fSpecIsPipe;	/* fSpec is a pipe command template requiring
+								 * replacing %f with file name */
 	FILE	   *FH;				/* General purpose file handle */
 	void	   *OF;				/* Output file */
 
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 562868cd2ad..49a7ab91050 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -39,7 +39,8 @@
 #include <dirent.h>
 #include <sys/stat.h>
 
-#include "common/file_utils.h"
+/* #include "common/file_utils.h" */
+#include "common/percentrepl.h"
 #include "compress_io.h"
 #include "dumputils.h"
 #include "parallel.h"
@@ -157,8 +158,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 	if (AH->mode == archModeWrite)
 	{
-		/* we accept an empty existing directory */
-		create_or_open_dir(ctx->directory);
+		if (!AH->fSpecIsPipe)	/* no checks for pipe */
+		{
+			/* we accept an empty existing directory */
+			create_or_open_dir(ctx->directory);
+		}
 	}
 	else
 	{							/* Read Mode */
@@ -167,7 +171,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R);
+		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe);
 		if (tocFH == NULL)
 			pg_fatal("could not open input file \"%s\": %m", fname);
 
@@ -295,7 +299,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 
 	setFilePath(AH, fname, tctx->filename);
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
 
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
@@ -353,7 +357,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R);
+	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe);
 	if (!CFH)
 		pg_fatal("could not open input file \"%s\": %m", filename);
 
@@ -416,7 +420,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	else
 		setFilePath(AH, tocfname, tctx->filename);
 
-	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
+	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe);
 
 	if (ctx->LOsTocFH == NULL)
 		pg_fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -427,6 +431,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	{
 		char		lofname[MAXPGPATH + 1];
 		char		path[MAXPGPATH];
+		char	   *pipe;
 
 		/* Can't overflow because line and lofname are the same length */
 		if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2)
@@ -545,7 +550,7 @@ _CloseArchive(ArchiveHandle *AH)
 
 		/* The TOC is always created uncompressed */
 		compression_spec.algorithm = PG_COMPRESSION_NONE;
-		tocFH = InitCompressFileHandle(compression_spec);
+		tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
 		if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH))
 			pg_fatal("could not open output file \"%s\": %m", fname);
 		ctx->dataFH = tocFH;
@@ -606,13 +611,46 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 	pg_compress_specification compression_spec = {0};
 	char		fname[MAXPGPATH];
+	const char *mode;
 
 	setFilePath(AH, fname, tctx->filename);
 
 	/* The LO TOC file is never compressed */
 	compression_spec.algorithm = PG_COMPRESSION_NONE;
-	ctx->LOsTocFH = InitCompressFileHandle(compression_spec);
-	if (!ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH))
+	ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+
+	/*
+	 * XXX: We can probably simplify this code by using the mode 'w' for all
+	 * cases. The current implementation is due to historical reason that the
+	 * mode for the LOs TOC file has been "ab" from the start. That is
+	 * something we can't do for pipe-command as popen only supports read and
+	 * write. So here a different mode is used for pipes.
+	 *
+	 * But in future we can evaluate using 'w' for everything.there is one
+	 * ToCEntry There is only one ToCEntry per blob group. And it is written
+	 * by @WriteDataChunksForToCEntry. This function calls _StartLOs once
+	 * before the dumper function and and _EndLOs once after the dumper. And
+	 * the dumper dumps all the LOs in the group. So a blob_NNN.toc is only
+	 * opened once and closed after all the entries are written. Therefore the
+	 * mode can be made 'w' for all the cases. We tested changing the mode to
+	 * PG_BINARY_W and the tests passed. But in case there are some missing
+	 * scenarios, we have not made that change here. Instead for now only
+	 * doing it for the pipe command.
+	 *
+	 * Another alternative is to keep the 'ab' mode for regular files and use
+	 * 'w' mode for pipe files but now also cache the pipe handle to keep it
+	 * open till all the LOs in the dump group are done. This is not needed
+	 * because of the same reason listed above that a file handle is only
+	 * opened once. In short there are 3 solutions : 1. Change the mode for
+	 * everything (preferred) 2. Change it only for pipe-command (current) 3.
+	 * Change it for pipe-command and then cache those handles and close them
+	 * in the end (not needed).
+	 */
+	if (AH->fSpecIsPipe)
+		mode = PG_BINARY_W;
+	else
+		mode = "ab";
+	if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
 
@@ -626,10 +664,22 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char		fname[MAXPGPATH];
+	char	   *pipe;
+	char		blob_name[MAXPGPATH];
 
-	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+	if (AH->fSpecIsPipe)
+	{
+		snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
+		pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name);
+		strcpy(fname, pipe);
+		pfree(pipe);
+	}
+	else
+	{
+		snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+	}
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
@@ -683,15 +733,27 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char	   *dname;
+	char	   *pipe;
 
 	dname = ctx->directory;
 
-	if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
-		pg_fatal("file name too long: \"%s\"", dname);
 
-	strcpy(buf, dname);
-	strcat(buf, "/");
-	strcat(buf, relativeFilename);
+	if (AH->fSpecIsPipe)
+	{
+		pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename);
+		strcpy(buf, pipe);
+		pfree(pipe);
+	}
+	else						/* replace all ocurrences of %f in dname with
+								 * relativeFilename */
+	{
+		if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
+			pg_fatal("file name too long: \"%s\"", dname);
+
+		strcpy(buf, dname);
+		strcat(buf, "/");
+		strcat(buf, relativeFilename);
+	}
 }
 
 /*
@@ -733,17 +795,24 @@ _PrepParallelRestore(ArchiveHandle *AH)
 		 * only need an approximate indicator of that.
 		 */
 		setFilePath(AH, fname, tctx->filename);
+		pg_log_error("filename: %s", fname);
 
 		if (stat(fname, &st) == 0)
 			te->dataLength = st.st_size;
 		else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
 		{
+			if (AH->fSpecIsPipe)
+				pg_log_error("pipe and compressed");
 			if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
 				strlcat(fname, ".gz", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
 				strlcat(fname, ".lz4", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+			{
+				pg_log_error("filename: %s", fname);
 				strlcat(fname, ".zst", sizeof(fname));
+				pg_log_error("filename: %s", fname);
+			}
 
 			if (stat(fname, &st) == 0)
 				te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index d56dcc701ce..7345e6c7a4b 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -419,6 +419,7 @@ main(int argc, char **argv)
 {
 	int			c;
 	const char *filename = NULL;
+	bool		filename_is_pipe = false;
 	const char *format = "p";
 	TableInfo  *tblinfo;
 	int			numTables;
@@ -535,6 +536,7 @@ main(int argc, char **argv)
 		{"exclude-extension", required_argument, NULL, 17},
 		{"sequence-data", no_argument, &dopt.sequence_data, 1},
 		{"restrict-key", required_argument, NULL, 25},
+		{"pipe-command", required_argument, NULL, 26},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -606,7 +608,14 @@ main(int argc, char **argv)
 				break;
 
 			case 'f':
+				if (filename != NULL)
+				{
+					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+					exit_nicely(1);
+				}
 				filename = pg_strdup(optarg);
+				filename_is_pipe = false;	/* it already is, setting again
+											 * here just for clarity */
 				break;
 
 			case 'F':
@@ -799,6 +808,16 @@ main(int argc, char **argv)
 				dopt.restrict_key = pg_strdup(optarg);
 				break;
 
+			case 26:			/* pipe command */
+				if (filename != NULL)
+				{
+					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+					exit_nicely(1);
+				}
+				filename = pg_strdup(optarg);
+				filename_is_pipe = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -907,14 +926,26 @@ main(int argc, char **argv)
 	else if (dopt.restrict_key)
 		pg_fatal("option %s can only be used with %s",
 				 "--restrict-key", "--format=plain");
+	if (filename_is_pipe && archiveFormat != archDirectory)
+	{
+		pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+		exit_nicely(1);
+	}
+
+	if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0)
+	{
+		pg_log_error_hint("Option --pipe-command is not supported with any compression type.");
+		exit_nicely(1);
+	}
 
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
 	 * available, not the others.  If gzip is not available, no compression is
-	 * done by default.
+	 * done by default. If directory format is being used with pipe-command,
+	 * no compression is done.
 	 */
 	if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
-		!user_compression_defined)
+		!filename_is_pipe && !user_compression_defined)
 	{
 #ifdef HAVE_LIBZ
 		compression_algorithm_str = "gzip";
@@ -964,7 +995,7 @@ main(int argc, char **argv)
 
 	/* Open the output file */
 	fout = CreateArchive(filename, archiveFormat, compression_spec,
-						 dosync, archiveMode, setupDumpWorker, sync_method);
+						 dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe);
 
 	/* Make dump options accessible right away */
 	SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index c1f43113c53..2d551365180 100644
--- a/src/bin/pg_dump/pg_dumpall.c
+++ b/src/bin/pg_dump/pg_dumpall.c
@@ -678,7 +678,7 @@ main(int argc, char *argv[])
 
 		/* Open the output file */
 		fout = CreateArchive(global_path, archCustom, compression_spec,
-							 dosync, archModeWrite, NULL, DATA_DIR_SYNC_METHOD_FSYNC);
+							 dosync, archModeWrite, NULL, DATA_DIR_SYNC_METHOD_FSYNC, false);
 
 		/* Make dump options accessible right away */
 		SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 48fdcb0fae1..c31d262e71a 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -1,5 +1,5 @@
 /*-------------------------------------------------------------------------
- *
+*
  * pg_restore.c
  *	pg_restore is an utility extracting postgres database definitions
  *	from a backup archive created by pg_dump/pg_dumpall using the archiver
@@ -654,7 +654,7 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
 	opts->format = archCustom;
 	opts->txn_size = 0;
 
-	AH = OpenArchive(inputFileSpec, opts->format);
+	AH = OpenArchive(inputFileSpec, opts->format, false);
 
 	SetArchiveOptions(AH, NULL, opts);
 
@@ -696,7 +696,7 @@ restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
 	Archive    *AH;
 	int			n_errors;
 
-	AH = OpenArchive(inputFileSpec, opts->format);
+	AH = OpenArchive(inputFileSpec, opts->format, false);
 
 	SetArchiveOptions(AH, NULL, opts);
 
-- 
2.54.0.669.g59709faab0-goog



  [application/x-patch] v16-0005-Add-documentation-for-pipe-in-pg_dump-and-pg_res.patch (7.4K, 5-v16-0005-Add-documentation-for-pipe-in-pg_dump-and-pg_res.patch)
  download | inline diff:
From 680027b837f2d8b6f5ae195b5fe23abb1d4f1f71 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Thu, 21 May 2026 09:52:25 +0000
Subject: [PATCH v16 5/5] Add documentation for pipe in pg_dump and pg_restore

   * Add the descriptions of the new flags and constraints
     regarding which mode and other flags they can't be used with.
   * Explain the purpose of the flags.
   * Add a few examples of the usage of the flags.
---
 doc/src/sgml/ref/pg_dump.sgml    | 56 ++++++++++++++++++++++++++
 doc/src/sgml/ref/pg_restore.sgml | 68 +++++++++++++++++++++++++++++++-
 2 files changed, 123 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index ae1bc14d2f2..6458b032d25 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -297,6 +297,7 @@ PostgreSQL documentation
         specifies the target directory instead of a file. In this case the
         directory is created by <command>pg_dump</command> unless the directory
         exists and is empty.
+        This option and <option>--pipe</option> can't be used together.
        </para>
       </listitem>
      </varlistentry>
@@ -1224,6 +1225,32 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--pipe</option></term>
+      <listitem>
+       <para>
+        This option is only supported with the directory output
+        format. It can be used to write to multiple streams which
+        otherwise would not be possible with the directory mode.
+        For each stream, it starts a process which runs the
+        specified command and pipes the pg_dump output to this
+        process.
+        This option is not valid if <option>--file</option>
+        is also specified.
+       </para>
+       <para>
+        The pipe can be used to perform operations like compress
+        using a custom algorithm, filter, or write the output to a cloud
+        storage etc. The user would need a way to pipe the final output of
+        each stream to a file. To handle that, the pipe command supports a format
+        specifier %f. And all the instances of %f in the command string
+        will be replaced with the corresponding file name which
+        would have been used in the directory mode with <option>--file</option>.
+        See <xref linkend="pg-dump-examples"/> below.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--quote-all-identifiers</option></term>
       <listitem>
@@ -1803,6 +1830,35 @@ CREATE DATABASE foo WITH TEMPLATE template0;
 </screen>
   </para>
 
+  <para>
+   To use pipe to dump a database into a directory-format archive
+   (the directory <literal>dumpdir</literal> needs to exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe="cat > dumpdir/%f"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pipe to dump a database into a directory-format archive
+   in parallel with 5 worker jobs (the directory <literal>dumpdir</literal> needs to exist
+   before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb -j 5 --pipe="cat > dumpdir/%f"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pipe to compress and dump a database into a
+   directory-format archive (the directory <literal>dumpdir</literal> needs to
+   exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe="gzip > dumpdir/%f.gz"</userinput>
+</screen>
+  </para>
+
   <para>
    To reload an archive file into a (freshly created) database named
    <literal>newdb</literal>:
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index 5e77ddd556f..6db9cbc12af 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -118,7 +118,10 @@ PostgreSQL documentation
        <para>
        Specifies the location of the archive file (or directory, for a
        directory-format archive) to be restored.
-       If not specified, the standard input is used.
+       This option and <option>--pipe</option> can't be set
+       at the same time.
+       If neither this option nor <option>--pipe</option> is specified,
+       the standard input is used.
        </para>
       </listitem>
      </varlistentry>
@@ -919,6 +922,32 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--pipe</option></term>
+      <listitem>
+       <para>
+        This option is only supported with the directory output
+        format. It can be used to read from multiple streams which
+        otherwise would not be possible with the directory mode.
+        For each stream, it starts a process which runs the
+        specified command and pipes its output to the pg_restore process.
+        This option is not valid if <option>filename</option> is also specified.
+       </para>
+       <para>
+        The pipe can be used to perform operations like
+        decompress using a custom algorithm, filter, or read from
+        a cloud storage. When reading from the pg_dump output,
+        the user would need a way to read the correct file in each
+        stream. To handle that, the pipe command supports a format
+        specifier %f. And all the instances of %f in the command string
+        will be replaced with the corresponding file name which
+        would have been used in the directory mode with <option>filename</option>.
+        This is same as the <option>--pipe</option> of pg-dump.
+        See <xref linkend="app-pgrestore-examples"/> below.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
        <term><option>--section=<replaceable class="parameter">sectionname</replaceable></option></term>
        <listitem>
@@ -1364,6 +1393,43 @@ CREATE DATABASE foo WITH TEMPLATE template0;
 <prompt>$</prompt> <userinput>pg_restore -L db.list db.dump</userinput>
 </screen></para>
 
+  <para>
+   To use pg_restore with pipe to recreate from a dump in
+   directory-archive format. The database should not exist beforehand.
+   Assume in this example that the dump in directory-archive format is
+   stored in <literal>dumpdir</literal>.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe="cat dumpdir/%f"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pg_restore with pipe to first decompress and then
+   recreate from a dump in directory-archive format. The database
+   should not exist beforehand.
+   Assume in this example that the dump in directory-archive format is
+   stored in <literal>dumpdir</literal>. And all files are
+   <literal>gzip</literal> compressed.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe="cat dumpdir/%f.gz | gunzip"</userinput>
+</screen>
+  </para>
+
+  <para>
+   To use pipe along with <option>-L</option> to recreate only
+   selectd items from a dump in the directory-archive format.
+   The database should not exist beforehand.
+   Assume in this example that the dump in directory-archive format is
+   stored in dumpdir.
+   The <literal>db.list</literal> file is the same as one used in the previous example with <option>-L</option>
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe="cat dumpdir/%f" -L db.list</userinput>
+</screen>
+  </para>
+
  </refsect1>
 
  <refsect1>
-- 
2.54.0.669.g59709faab0-goog



  [application/x-patch] v16-0003-Fixes-and-refactors-in-pipe-command.patch (39.5K, 6-v16-0003-Fixes-and-refactors-in-pipe-command.patch)
  download | inline diff:
From 34ef34f70469972c7784395b037f55533a9396ef Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Sun, 3 May 2026 12:37:46 +0000
Subject: [PATCH v16 3/5] Fixes and refactors in pipe command

Fix pclose bug with fdopen case for stdout by ensuring fclose is called.

Add better error handling to pclose and show a clearer error message using wait_result_to_str()

Changed pipe-command flag to pipe as recommended in review.

Change the mode from 'ab' to 'w' for large object TOC.

Refactor and document the code.
---
 src/bin/pg_dump/compress_gzip.c       |   6 +-
 src/bin/pg_dump/compress_gzip.h       |   2 +-
 src/bin/pg_dump/compress_io.c         |  25 +++---
 src/bin/pg_dump/compress_io.h         |   6 +-
 src/bin/pg_dump/compress_lz4.c        |   8 +-
 src/bin/pg_dump/compress_lz4.h        |   2 +-
 src/bin/pg_dump/compress_none.c       |  60 ++++++++++----
 src/bin/pg_dump/compress_none.h       |   2 +-
 src/bin/pg_dump/compress_zstd.c       |   8 +-
 src/bin/pg_dump/compress_zstd.h       |   2 +-
 src/bin/pg_dump/pg_backup.h           |   4 +-
 src/bin/pg_dump/pg_backup_archiver.c  |  30 ++++++-
 src/bin/pg_dump/pg_backup_archiver.h  |   2 +-
 src/bin/pg_dump/pg_backup_directory.c | 111 ++++++++++----------------
 src/bin/pg_dump/pg_dump.c             |  53 +++++-------
 src/bin/pg_dump/pg_dumpall.c          |   9 +++
 src/bin/pg_dump/pg_restore.c          |  69 +++++++++-------
 17 files changed, 217 insertions(+), 182 deletions(-)

diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 0ce15847d9a..6a02f9b3907 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -430,9 +430,9 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
 						   const pg_compress_specification compression_spec,
-						   bool path_is_pipe_command)
+						   bool is_pipe)
 {
-	if (path_is_pipe_command)
+	if (is_pipe)
 		pg_fatal("cPipe command not supported for Gzip");
 
 	CFH->open_func = Gzip_open;
@@ -460,7 +460,7 @@ InitCompressorGzip(CompressorState *cs,
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
 						   const pg_compress_specification compression_spec,
-						   bool path_is_pipe_command)
+						   bool is_pipe)
 {
 	pg_fatal("this build does not support compression with %s", "gzip");
 }
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
index f77c5c86c56..952c9223836 100644
--- a/src/bin/pg_dump/compress_gzip.h
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -20,6 +20,6 @@ extern void InitCompressorGzip(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleGzip(CompressFileHandle *CFH,
 									   const pg_compress_specification compression_spec,
-									   bool path_is_pipe_command);
+									   bool is_pipe);
 
 #endif							/* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 88488186b34..b4d84ef17d1 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -192,28 +192,27 @@ free_keep_errno(void *p)
  */
 CompressFileHandle *
 InitCompressFileHandle(const pg_compress_specification compression_spec,
-					   bool path_is_pipe_command)
+					   bool is_pipe)
 {
 	CompressFileHandle *CFH;
 
 	CFH = pg_malloc0_object(CompressFileHandle);
 
 	/*
-	 * Always set to non-compressed when path_is_pipe_command assuming that
-	 * external compressor as part of pipe is more efficient. Can review in
-	 * the future.
+	 * Always set to non-compressed when is_pipe assuming that external
+	 * compressor as part of pipe is more efficient. Can review in the future.
 	 */
-	if (path_is_pipe_command)
-		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+	if (is_pipe)
+		InitCompressFileHandleNone(CFH, compression_spec, is_pipe);
 
 	else if (compression_spec.algorithm == PG_COMPRESSION_NONE)
-		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+		InitCompressFileHandleNone(CFH, compression_spec, is_pipe);
 	else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
-		InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command);
+		InitCompressFileHandleGzip(CFH, compression_spec, is_pipe);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
-		InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command);
+		InitCompressFileHandleLZ4(CFH, compression_spec, is_pipe);
 	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
-		InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command);
+		InitCompressFileHandleZstd(CFH, compression_spec, is_pipe);
 
 	return CFH;
 }
@@ -247,7 +246,7 @@ check_compressed_file(const char *path, char **fname, char *ext)
  */
 CompressFileHandle *
 InitDiscoverCompressFileHandle(const char *path, const char *mode,
-							   bool path_is_pipe_command)
+							   bool is_pipe)
 {
 	CompressFileHandle *CFH = NULL;
 	struct stat st;
@@ -263,7 +262,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
 	/*
 	 * If the path is a pipe command, the compression algorithm is none.
 	 */
-	if (!path_is_pipe_command)
+	if (!is_pipe)
 	{
 		if (hasSuffix(fname, ".gz"))
 			compression_spec.algorithm = PG_COMPRESSION_GZIP;
@@ -284,7 +283,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
 		}
 	}
 
-	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
+	CFH = InitCompressFileHandle(compression_spec, is_pipe);
 	errno = 0;
 	if (!CFH->open_func(fname, -1, mode, CFH))
 	{
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index bd0fc2634dc..3857eff2179 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -189,7 +189,7 @@ struct CompressFileHandle
 	/*
 	 * Compression specification for this file handle.
 	 */
-	bool		path_is_pipe_command;
+	bool		is_pipe;
 
 	/*
 	 * Private data to be used by the compressor.
@@ -201,7 +201,7 @@ struct CompressFileHandle
  * Initialize a compress file handle with the requested compression.
  */
 extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec,
-												  bool path_is_pipe_command);
+												  bool is_pipe);
 
 /*
  * Initialize a compress file stream. Infer the compression algorithm
@@ -210,6 +210,6 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio
  */
 extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path,
 														  const char *mode,
-														  bool path_is_pipe_command);
+														  bool is_pipe);
 extern bool EndCompressFileHandle(CompressFileHandle *CFH);
 #endif
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 2bc4c37c5db..79595556715 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -767,11 +767,11 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 						  const pg_compress_specification compression_spec,
-						  bool path_is_pipe_command)
+						  bool is_pipe)
 {
 	LZ4State   *state;
 
-	if (path_is_pipe_command)
+	if (is_pipe)
 		pg_fatal("Pipe command not supported for LZ4");
 
 	CFH->open_func = LZ4Stream_open;
@@ -789,7 +789,7 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 	if (CFH->compression_spec.level >= 0)
 		state->prefs.compressionLevel = CFH->compression_spec.level;
 
-	CFH->path_is_pipe_command = path_is_pipe_command;
+	CFH->is_pipe = is_pipe;
 
 	CFH->private_data = state;
 }
@@ -804,7 +804,7 @@ InitCompressorLZ4(CompressorState *cs,
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 						  const pg_compress_specification compression_spec,
-						  bool path_is_pipe_command)
+						  bool is_pipe)
 {
 	pg_fatal("this build does not support compression with %s", "LZ4");
 }
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
index 490141ee8a1..2c235cf3a50 100644
--- a/src/bin/pg_dump/compress_lz4.h
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -20,6 +20,6 @@ extern void InitCompressorLZ4(CompressorState *cs,
 							  const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 									  const pg_compress_specification compression_spec,
-									  bool path_is_pipe_command);
+									  bool is_pipe);
 
 #endif							/* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index 4cf02843185..2dae62aadd4 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -14,6 +14,7 @@
 #include "postgres_fe.h"
 #include <unistd.h>
 
+#include "port.h"
 #include "compress_none.h"
 #include "pg_backup_utils.h"
 
@@ -210,13 +211,31 @@ close_none(CompressFileHandle *CFH)
 
 	if (fp)
 	{
-		errno = 0;
-		if (CFH->path_is_pipe_command)
+		if (CFH->is_pipe)
+		{
 			ret = pclose(fp);
+			if (ret != 0)
+			{
+				/*
+				 * For pipe commands, pclose() returns the exit status of the
+				 * child process. If the shell command itself fails (e.g.
+				 * "command not found"), pclose() will return a non-zero exit
+				 * status, but errno will likely remain 0 (Success). We use
+				 * wait_result_to_str to decode the status and pg_fatal to
+				 * prevent the caller from logging a generic and misleading
+				 * "could not close file: Success" message.
+				 */
+				char	   *reason = wait_result_to_str(ret);
+
+				pg_fatal("pipe command failed: %s", reason);
+			}
+		}
 		else
+		{
 			ret = fclose(fp);
-		if (ret != 0)
-			pg_log_error("could not close file: %m");
+			if (ret != 0)
+				pg_fatal("could not close file: %m");
+		}
 	}
 
 	return ret == 0;
@@ -228,6 +247,23 @@ eof_none(CompressFileHandle *CFH)
 	return feof((FILE *) CFH->private_data) != 0;
 }
 
+static FILE *
+open_handle_none(const char *path, const char *mode, bool is_pipe)
+{
+	if (is_pipe)
+	{
+		/*
+		 * If the path is a pipe, we use popen(). Note that we do not track
+		 * the child PID for cleanup during fatal errors. We intentionally
+		 * rely on standard POSIX semantics: if pg_dump crashes, the OS will
+		 * close our end of the pipe, sending EOF to the child process, which
+		 * will then cleanly exit on its own.
+		 */
+		return popen(path, mode);
+	}
+	return fopen(path, mode);
+}
+
 static bool
 open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
 {
@@ -248,10 +284,7 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
 	}
 	else
 	{
-		if (CFH->path_is_pipe_command)
-			CFH->private_data = popen(path, mode);
-		else
-			CFH->private_data = fopen(path, mode);
+		CFH->private_data = open_handle_none(path, mode, CFH->is_pipe);
 
 		if (CFH->private_data == NULL)
 			return false;
@@ -266,12 +299,9 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 	Assert(CFH->private_data == NULL);
 
 	pg_log_debug("Opening %s, pipe is %s",
-				 path, CFH->path_is_pipe_command ? "true" : "false");
+				 path, CFH->is_pipe ? "true" : "false");
 
-	if (CFH->path_is_pipe_command)
-		CFH->private_data = popen(path, mode);
-	else
-		CFH->private_data = fopen(path, mode);
+	CFH->private_data = open_handle_none(path, mode, CFH->is_pipe);
 
 	if (CFH->private_data == NULL)
 		return false;
@@ -286,7 +316,7 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 void
 InitCompressFileHandleNone(CompressFileHandle *CFH,
 						   const pg_compress_specification compression_spec,
-						   bool path_is_pipe_command)
+						   bool is_pipe)
 {
 	CFH->open_func = open_none;
 	CFH->open_write_func = open_write_none;
@@ -298,7 +328,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH,
 	CFH->eof_func = eof_none;
 	CFH->get_error_func = get_error_none;
 
-	CFH->path_is_pipe_command = path_is_pipe_command;
+	CFH->is_pipe = is_pipe;
 
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h
index d898a2d411c..57943ceff7f 100644
--- a/src/bin/pg_dump/compress_none.h
+++ b/src/bin/pg_dump/compress_none.h
@@ -20,6 +20,6 @@ extern void InitCompressorNone(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleNone(CompressFileHandle *CFH,
 									   const pg_compress_specification compression_spec,
-									   bool path_is_pipe_command);
+									   bool is_pipe);
 
 #endif							/* _COMPRESS_NONE_H_ */
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index e4830d35ec0..57c4ad16500 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -28,7 +28,7 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress
 
 void
 InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec,
-						   bool path_is_pipe_command)
+						   bool is_pipe)
 {
 	pg_fatal("this build does not support compression with %s", "ZSTD");
 }
@@ -576,9 +576,9 @@ Zstd_get_error(CompressFileHandle *CFH)
 void
 InitCompressFileHandleZstd(CompressFileHandle *CFH,
 						   const pg_compress_specification compression_spec,
-						   bool path_is_pipe_command)
+						   bool is_pipe)
 {
-	if (path_is_pipe_command)
+	if (is_pipe)
 		pg_fatal("Pipe command not supported for Zstd");
 
 	CFH->open_func = Zstd_open;
@@ -592,7 +592,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH,
 	CFH->get_error_func = Zstd_get_error;
 
 	CFH->compression_spec = compression_spec;
-	CFH->path_is_pipe_command = path_is_pipe_command;
+	CFH->is_pipe = is_pipe;
 
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
index 1f23e7266bf..8b06657bc80 100644
--- a/src/bin/pg_dump/compress_zstd.h
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -21,6 +21,6 @@ extern void InitCompressorZstd(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
 									   const pg_compress_specification compression_spec,
-									   bool path_is_pipe_command);
+									   bool is_pipe);
 
 #endif							/* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 549703af622..c1148a66635 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -316,7 +316,7 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX);
 extern void RestoreArchive(Archive *AHX, bool append_data);
 
 /* Open an existing archive */
-extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe);
+extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool is_pipe);
 
 /* Create a new archive */
 extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
@@ -324,7 +324,7 @@ extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 							  bool dosync, ArchiveMode mode,
 							  SetupWorkerPtrType setupDumpWorker,
 							  DataDirSyncMethod sync_method,
-							  bool FileSpecIsPipe);
+							  bool is_pipe);
 
 /* The --list option */
 extern void PrintTOCSummary(Archive *AHX);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 4b6bb7b8a14..bb14a83b80b 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -1744,7 +1744,19 @@ SetOutput(ArchiveHandle *AH, const char *filename,
 	else
 		mode = PG_BINARY_W;
 
-	CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+	/*
+	 * The output handle (usually stdout) should never be a pipe command
+	 * managed by our popen logic, even if the archive itself is a pipe.  Our
+	 * pipe command implementation for directory mode is a template for the
+	 * data files, not for this primary output stream.
+	 *
+	 * Furthermore, marking this as a pipe command would cause it to be closed
+	 * with pclose() instead of fclose().  Since this handle is opened via
+	 * fdopen() (for stdout) or fopen() (for a regular file), using pclose()
+	 * on it is a bug that causes failures on BSD-based systems (like FreeBSD
+	 * or macOS).
+	 */
+	CFH = InitCompressFileHandle(compression_spec, false);
 
 	if (!CFH->open_func(filename, fn, mode, CFH))
 	{
@@ -2442,7 +2454,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	else
 		AH->fSpec = NULL;
 
-	AH->fSpecIsPipe = FileSpecIsPipe;
+	AH->is_pipe = FileSpecIsPipe;
 
 	AH->currUser = NULL;		/* unknown */
 	AH->currSchema = NULL;		/* ditto */
@@ -2463,7 +2475,19 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 
 	/* Open stdout with no compression for AH output handle */
 	out_compress_spec.algorithm = PG_COMPRESSION_NONE;
-	CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe);
+
+	/*
+	 * The output handle (usually stdout) should never be a pipe command
+	 * managed by our popen logic, even if the archive itself is a pipe.  Our
+	 * pipe command implementation for directory mode is a template for the
+	 * data files, not for this primary output stream.
+	 *
+	 * Furthermore, marking this as a pipe command would cause it to be closed
+	 * with pclose() instead of fclose().  Since this handle is opened via
+	 * fdopen() (for stdout), using pclose() on it is a bug that causes
+	 * failures on BSD-based systems (like FreeBSD or macOS).
+	 */
+	CFH = InitCompressFileHandle(out_compress_spec, false);
 	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
 		pg_fatal("could not open stdout for appending: %m");
 	AH->OF = CFH;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index cdc12a54f5e..9555d44ae29 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -301,7 +301,7 @@ struct _archiveHandle
 	int			loCount;		/* # of LOs restored */
 
 	char	   *fSpec;			/* Archive File Spec */
-	bool		fSpecIsPipe;	/* fSpec is a pipe command template requiring
+	bool		is_pipe;		/* fSpec is a pipe command template requiring
 								 * replacing %f with file name */
 	FILE	   *FH;				/* General purpose file handle */
 	void	   *OF;				/* Output file */
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 15ce45fb9e9..3a6f47d5483 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -158,7 +158,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 	if (AH->mode == archModeWrite)
 	{
-		if (!AH->fSpecIsPipe)	/* no checks for pipe */
+		if (!AH->is_pipe)		/* no checks for pipe */
 		{
 			/* we accept an empty existing directory */
 			create_or_open_dir(ctx->directory);
@@ -171,7 +171,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe);
+		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->is_pipe);
 		if (tocFH == NULL)
 			pg_fatal("could not open input file \"%s\": %m", fname);
 
@@ -299,7 +299,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 
 	setFilePath(AH, fname, tctx->filename);
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->is_pipe);
 
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
@@ -357,7 +357,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe);
+	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->is_pipe);
 	if (!CFH)
 		pg_fatal("could not open input file \"%s\": %m", filename);
 
@@ -420,7 +420,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	else
 		setFilePath(AH, tocfname, tctx->filename);
 
-	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe);
+	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->is_pipe);
 
 	if (ctx->LOsTocFH == NULL)
 		pg_fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -431,7 +431,6 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	{
 		char		lofname[MAXPGPATH + 1];
 		char		path[MAXPGPATH];
-		char	   *pipe;
 
 		/* Can't overflow because line and lofname are the same length */
 		if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2)
@@ -440,20 +439,8 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 
 		StartRestoreLO(AH, oid, AH->public.ropt->dropSchema);
 
-		/*
-		 * XXX : Create a helper function for blob files naming common to
-		 * _LoadLOs an _StartLO.
-		 */
-		if (AH->fSpecIsPipe)
-		{
-			pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname);
-			strcpy(path, pipe);
-			pfree(pipe);
-		}
-		else
-		{
-			snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
-		}
+		setFilePath(AH, path, lofname);
+
 		_PrintFileData(AH, path);
 		EndRestoreLO(AH, oid);
 	}
@@ -564,7 +551,7 @@ _CloseArchive(ArchiveHandle *AH)
 
 		/* The TOC is always created uncompressed */
 		compression_spec.algorithm = PG_COMPRESSION_NONE;
-		tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+		tocFH = InitCompressFileHandle(compression_spec, AH->is_pipe);
 		if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH))
 			pg_fatal("could not open output file \"%s\": %m", fname);
 		ctx->dataFH = tocFH;
@@ -631,39 +618,27 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
 
 	/* The LO TOC file is never compressed */
 	compression_spec.algorithm = PG_COMPRESSION_NONE;
-	ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+	ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->is_pipe);
 
 	/*
-	 * XXX: We can probably simplify this code by using the mode 'w' for all
-	 * cases. The current implementation is due to historical reason that the
-	 * mode for the LOs TOC file has been "ab" from the start. That is
-	 * something we can't do for pipe-command as popen only supports read and
-	 * write. So here a different mode is used for pipes.
+	 * We use 'w' (PG_BINARY_W) mode for the LOs TOC file in all cases.
+	 * Historically, the mode for this file was "ab". However, append mode is
+	 * entirely redundant due to how large objects are partitioned.
 	 *
-	 * But in future we can evaluate using 'w' for everything.there is one
-	 * ToCEntry There is only one ToCEntry per blob group. And it is written
-	 * by @WriteDataChunksForToCEntry. This function calls _StartLOs once
-	 * before the dumper function and and _EndLOs once after the dumper. And
-	 * the dumper dumps all the LOs in the group. So a blob_NNN.toc is only
-	 * opened once and closed after all the entries are written. Therefore the
-	 * mode can be made 'w' for all the cases. We tested changing the mode to
-	 * PG_BINARY_W and the tests passed. But in case there are some missing
-	 * scenarios, we have not made that change here. Instead for now only
-	 * doing it for the pipe command.
+	 * pg_dump splits large objects into chunks of up to 1000 blobs per
+	 * archive entry. Each chunk receives a completely unique dumpId, and the
+	 * TOC file is named using that ID (e.g., blobs_123.toc). Furthermore,
+	 * WriteDataChunksForTocEntry ensures a strict sequential lifecycle for
+	 * each entry: it calls _StartLOs (opens the file), then the dumper
+	 * function (writes the chunk), and finally _EndLOs (closes the file).
 	 *
-	 * Another alternative is to keep the 'ab' mode for regular files and use
-	 * 'w' mode for pipe files but now also cache the pipe handle to keep it
-	 * open till all the LOs in the dump group are done. This is not needed
-	 * because of the same reason listed above that a file handle is only
-	 * opened once. In short there are 3 solutions : 1. Change the mode for
-	 * everything (preferred) 2. Change it only for pipe-command (current) 3.
-	 * Change it for pipe-command and then cache those handles and close them
-	 * in the end (not needed).
+	 * Because a blobs_NNN.toc file is guaranteed to be unique and is only
+	 * opened exactly once, written to sequentially, and then closed forever,
+	 * there is no scenario where "ab" is required. This change to "w" is
+	 * necessary because popen() for pipe-commands only supports "r" and "w".
 	 */
-	if (AH->fSpecIsPipe)
-		mode = PG_BINARY_W;
-	else
-		mode = "ab";
+	mode = PG_BINARY_W;
+
 	if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
@@ -678,22 +653,12 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char		fname[MAXPGPATH];
-	char	   *pipe;
 	char		blob_name[MAXPGPATH];
 
-	if (AH->fSpecIsPipe)
-	{
-		snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
-		pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name);
-		strcpy(fname, pipe);
-		pfree(pipe);
-	}
-	else
-	{
-		snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
-	}
+	snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
+	setFilePath(AH, fname, blob_name);
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->is_pipe);
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
@@ -752,10 +717,23 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
 	dname = ctx->directory;
 
 
-	if (AH->fSpecIsPipe)
+	if (AH->is_pipe)
 	{
-		pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename);
+		/*
+		 * Unlike commands synthesized by the backend, this is a user-provided
+		 * template running client-side. We perform literal substitution
+		 * rather than using appendShellString() to avoid interfering with the
+		 * user's intentional shell quoting (e.g., for Windows vs Unix
+		 * differences). Since this is a client-side execution, there are no
+		 * privilege escalation concerns.
+		 */
+		pipe = replace_percent_placeholders(dname, "pipe", "f", relativeFilename);
+
+		if (strlen(pipe) >= MAXPGPATH)
+			pg_fatal("pipe command too long: \"%s\"", pipe);
+
 		strcpy(buf, pipe);
+
 		pfree(pipe);
 	}
 	else						/* replace all ocurrences of %f in dname with
@@ -809,23 +787,18 @@ _PrepParallelRestore(ArchiveHandle *AH)
 		 * only need an approximate indicator of that.
 		 */
 		setFilePath(AH, fname, tctx->filename);
-		pg_log_error("filename: %s", fname);
 
 		if (stat(fname, &st) == 0)
 			te->dataLength = st.st_size;
 		else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
 		{
-			if (AH->fSpecIsPipe)
-				pg_log_error("pipe and compressed");
 			if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
 				strlcat(fname, ".gz", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
 				strlcat(fname, ".lz4", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
 			{
-				pg_log_error("filename: %s", fname);
 				strlcat(fname, ".zst", sizeof(fname));
-				pg_log_error("filename: %s", fname);
 			}
 
 			if (stat(fname, &st) == 0)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7345e6c7a4b..21157c568b8 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -419,7 +419,8 @@ main(int argc, char **argv)
 {
 	int			c;
 	const char *filename = NULL;
-	bool		filename_is_pipe = false;
+	char	   *pipe_command = NULL;
+	bool		is_pipe = false;
 	const char *format = "p";
 	TableInfo  *tblinfo;
 	int			numTables;
@@ -536,7 +537,7 @@ main(int argc, char **argv)
 		{"exclude-extension", required_argument, NULL, 17},
 		{"sequence-data", no_argument, &dopt.sequence_data, 1},
 		{"restrict-key", required_argument, NULL, 25},
-		{"pipe-command", required_argument, NULL, 26},
+		{"pipe", required_argument, NULL, 26},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -608,14 +609,9 @@ main(int argc, char **argv)
 				break;
 
 			case 'f':
-				if (filename != NULL)
-				{
-					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
-					exit_nicely(1);
-				}
 				filename = pg_strdup(optarg);
-				filename_is_pipe = false;	/* it already is, setting again
-											 * here just for clarity */
+				is_pipe = false;	/* it already is, setting again here just
+									 * for clarity */
 				break;
 
 			case 'F':
@@ -809,13 +805,8 @@ main(int argc, char **argv)
 				break;
 
 			case 26:			/* pipe command */
-				if (filename != NULL)
-				{
-					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
-					exit_nicely(1);
-				}
-				filename = pg_strdup(optarg);
-				filename_is_pipe = true;
+				pipe_command = pg_strdup(optarg);
+				is_pipe = true;
 				break;
 
 			default:
@@ -825,6 +816,10 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (filename && pipe_command)
+		pg_fatal("options %s and %s cannot be used together",
+				 "-f/--file", "--pipe");
+
 	/*
 	 * Non-option argument specifies database name as long as it wasn't
 	 * already specified with -d / --dbname
@@ -926,26 +921,20 @@ main(int argc, char **argv)
 	else if (dopt.restrict_key)
 		pg_fatal("option %s can only be used with %s",
 				 "--restrict-key", "--format=plain");
-	if (filename_is_pipe && archiveFormat != archDirectory)
-	{
-		pg_log_error_hint("Option --pipe-command is only supported with directory format.");
-		exit_nicely(1);
-	}
+	if (is_pipe && archiveFormat != archDirectory)
+		pg_fatal("option --pipe is only supported with directory format");
 
-	if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0)
-	{
-		pg_log_error_hint("Option --pipe-command is not supported with any compression type.");
-		exit_nicely(1);
-	}
+	if (is_pipe && strcmp(compression_algorithm_str, "none") != 0)
+		pg_fatal("option --pipe is not supported with any compression type");
 
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
 	 * available, not the others.  If gzip is not available, no compression is
-	 * done by default. If directory format is being used with pipe-command,
-	 * no compression is done.
+	 * done by default. If directory format is being used with pipe, no
+	 * compression is done.
 	 */
 	if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
-		!filename_is_pipe && !user_compression_defined)
+		!is_pipe && !user_compression_defined)
 	{
 #ifdef HAVE_LIBZ
 		compression_algorithm_str = "gzip";
@@ -994,8 +983,8 @@ main(int argc, char **argv)
 		pg_fatal("parallel backup only supported by the directory format");
 
 	/* Open the output file */
-	fout = CreateArchive(filename, archiveFormat, compression_spec,
-						 dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe);
+	fout = CreateArchive(is_pipe ? pipe_command : filename, archiveFormat, compression_spec,
+						 dosync, archiveMode, setupDumpWorker, sync_method, is_pipe);
 
 	/* Make dump options accessible right away */
 	SetArchiveOptions(fout, &dopt, NULL);
@@ -1327,6 +1316,8 @@ help(const char *progname)
 
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -f, --file=FILENAME          output file or directory name\n"));
+	printf(_("  --pipe=COMMAND               execute command for each output file and\n"
+			 "                               write data to it via pipe\n"));
 	printf(_("  -F, --format=c|d|t|p         output file format (custom, directory, tar,\n"
 			 "                               plain text (default))\n"));
 	printf(_("  -j, --jobs=NUM               use this many parallel jobs to dump\n"));
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index 2d551365180..bf69a44fa23 100644
--- a/src/bin/pg_dump/pg_dumpall.c
+++ b/src/bin/pg_dump/pg_dumpall.c
@@ -298,6 +298,15 @@ main(int argc, char *argv[])
 			case 'F':
 				format_name = pg_strdup(optarg);
 				break;
+
+				/*
+				 * Note: support for --pipe is currently skipped for
+				 * pg_dumpall due to the complexity of avoiding path
+				 * collisions between multiple databases and coordinating
+				 * nested directory structures. This could be considered as a
+				 * future enhancement.
+				 */
+
 			case 'g':
 				globals_only = true;
 				break;
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index c657149d658..35dc5b492bb 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -60,11 +60,11 @@ static void usage(const char *progname);
 static void read_restore_filters(const char *filename, RestoreOptions *opts);
 static bool file_exists_in_directory(const char *dir, const char *filename);
 static int	restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
-								 int numWorkers, bool append_data, bool filespec_is_pipe);
-static int	restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe);
+								 int numWorkers, bool append_data, bool is_pipe);
+static int	restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool is_pipe);
 
 static int	restore_all_databases(const char *inputFileSpec,
-								  SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers, bool filespec_is_pipe);
+								  SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers, bool is_pipe);
 static int	get_dbnames_list_to_restore(PGconn *conn,
 										SimplePtrList *dbname_oid_list,
 										SimpleStringList db_exclude_patterns);
@@ -87,13 +87,14 @@ main(int argc, char **argv)
 	RestoreOptions *opts;
 	int			c;
 	int			numWorkers = 1;
-	char	   *inputFileSpec;
+	char	   *inputFileSpec = NULL;
+	char	   *pipe_command = NULL;
 	bool		data_only = false;
 	bool		schema_only = false;
 	int			n_errors = 0;
 	bool		globals_only = false;
 	SimpleStringList db_exclude_patterns = {NULL, NULL};
-	bool		filespec_is_pipe = false;
+	bool		is_pipe = false;
 	static int	disable_triggers = 0;
 	static int	enable_row_security = 0;
 	static int	if_exists = 0;
@@ -174,7 +175,7 @@ main(int argc, char **argv)
 		{"filter", required_argument, NULL, 4},
 		{"restrict-key", required_argument, NULL, 6},
 		{"exclude-database", required_argument, NULL, 7},
-		{"pipe-command", required_argument, NULL, 8},
+		{"pipe", required_argument, NULL, 8},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -358,9 +359,9 @@ main(int argc, char **argv)
 				simple_string_list_append(&db_exclude_patterns, optarg);
 				break;
 
-			case 8:				/* pipe-command */
-				inputFileSpec = pg_strdup(optarg);
-				filespec_is_pipe = true;
+			case 8:				/* pipe */
+				pipe_command = pg_strdup(optarg);
+				is_pipe = true;
 				break;
 
 			default:
@@ -371,25 +372,21 @@ main(int argc, char **argv)
 	}
 
 	/*
-	 * Get file name from command line. Note that filename argument and
-	 * pipe-command can't both be set.
+	 * Get file name from command line. Note that filename argument and pipe
+	 * can't both be set.
 	 */
 	if (optind < argc)
 	{
-		if (filespec_is_pipe)
-		{
-			pg_log_error_hint("Only one of [filespec, --pipe-command] allowed");
-			exit_nicely(1);
-		}
+		if (is_pipe)
+			pg_fatal("cannot specify both an input file and --pipe");
 		inputFileSpec = argv[optind++];
 	}
 
 	/*
-	 * Even if the file argument is not provided, if the pipe-command is
-	 * specified, we need to use that as the file arg and not fallback to
-	 * stdio.
+	 * Even if the file argument is not provided, if the pipe is specified, we
+	 * need to use that as the file arg and not fallback to stdio.
 	 */
-	else if (!filespec_is_pipe)
+	else if (!is_pipe)
 	{
 		inputFileSpec = NULL;
 	}
@@ -539,10 +536,20 @@ main(int argc, char **argv)
 			pg_fatal("unrecognized archive format \"%s\"; please specify \"c\", \"d\", or \"t\"",
 					 opts->formatName);
 	}
+	else
+		opts->format = archUnknown;
+
+	if (is_pipe && opts->format != archDirectory)
+		pg_fatal("option --pipe is only supported with directory format");
 
 	/*
 	 * If toc.glo file is present, then restore all the databases from
 	 * map.dat, but skip restoring those matching --exclude-database patterns.
+	 *
+	 * Note: support for --pipe is currently skipped for cluster archives
+	 * (archives containing toc.glo) due to the added complexity of handling
+	 * nested directory paths and multiple databases. This could be considered
+	 * as a future enhancement.
 	 */
 	if (inputFileSpec != NULL &&
 		(file_exists_in_directory(inputFileSpec, "toc.glo")))
@@ -619,7 +626,7 @@ main(int argc, char **argv)
 		snprintf(global_path, MAXPGPATH, "%s/toc.glo", inputFileSpec);
 
 		if (!no_globals)
-			n_errors = restore_global_objects(global_path, tmpopts, filespec_is_pipe);
+			n_errors = restore_global_objects(global_path, tmpopts, is_pipe);
 		else
 			pg_log_info("skipping restore of global objects because %s was specified",
 						"--no-globals");
@@ -630,8 +637,8 @@ main(int argc, char **argv)
 		else
 		{
 			/* Now restore all the databases from map.dat */
-			n_errors = n_errors + restore_all_databases(inputFileSpec, db_exclude_patterns,
-														opts, numWorkers, filespec_is_pipe);
+			n_errors = n_errors + restore_all_databases(is_pipe ? pipe_command : inputFileSpec, db_exclude_patterns,
+														opts, numWorkers, is_pipe);
 		}
 
 		/* Free db pattern list. */
@@ -651,7 +658,7 @@ main(int argc, char **argv)
 					 "-g/--globals-only");
 
 		/* Process if toc.glo file does not exist. */
-		n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, filespec_is_pipe);
+		n_errors = restore_one_database(is_pipe ? pipe_command : inputFileSpec, opts, numWorkers, false, is_pipe);
 	}
 
 	/* Done, print a summary of ignored errors during restore. */
@@ -670,7 +677,7 @@ main(int argc, char **argv)
  * This restore all global objects.
  */
 static int
-restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe)
+restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool is_pipe)
 {
 	Archive    *AH;
 	int			nerror = 0;
@@ -679,7 +686,7 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool fil
 	opts->format = archCustom;
 	opts->txn_size = 0;
 
-	AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
+	AH = OpenArchive(inputFileSpec, opts->format, is_pipe);
 
 	SetArchiveOptions(AH, NULL, opts);
 
@@ -716,12 +723,12 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool fil
  */
 static int
 restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
-					 int numWorkers, bool append_data, bool filespec_is_pipe)
+					 int numWorkers, bool append_data, bool is_pipe)
 {
 	Archive    *AH;
 	int			n_errors;
 
-	AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
+	AH = OpenArchive(inputFileSpec, opts->format, is_pipe);
 
 	SetArchiveOptions(AH, NULL, opts);
 
@@ -777,6 +784,8 @@ usage(const char *progname)
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -d, --dbname=NAME        connect to database name\n"));
 	printf(_("  -f, --file=FILENAME      output file name (- for stdout)\n"));
+	printf(_("  --pipe=COMMAND           execute command for each input file and\n"
+			 "                           read data from it via pipe\n"));
 	printf(_("  -F, --format=c|d|t       backup file format (should be automatic)\n"));
 	printf(_("  -l, --list               print summarized TOC of the archive\n"));
 	printf(_("  -v, --verbose            verbose mode\n"));
@@ -1170,7 +1179,7 @@ get_dbname_oid_list_from_mfile(const char *dumpdirpath,
 static int
 restore_all_databases(const char *inputFileSpec,
 					  SimpleStringList db_exclude_patterns, RestoreOptions *opts,
-					  int numWorkers, bool filespec_is_pipe)
+					  int numWorkers, bool is_pipe)
 {
 	SimplePtrList dbname_oid_list = {NULL, NULL};
 	int			num_db_restore = 0;
@@ -1334,7 +1343,7 @@ restore_all_databases(const char *inputFileSpec,
 		}
 
 		/* Restore the single database. */
-		n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true, filespec_is_pipe);
+		n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true, is_pipe);
 
 		n_errors_total += n_errors;
 
-- 
2.54.0.669.g59709faab0-goog



^ permalink  raw  reply  [nested|flat] 3+ messages in thread

* Re: Adding pg_dump flag for parallel export to pipes
  2026-02-20 09:08 Re: Adding pg_dump flag for parallel export to pipes Nitin Motiani <nitinmotiani@google.com>
  2026-05-21 09:56 ` Re: Adding pg_dump flag for parallel export to pipes Nitin Motiani <nitinmotiani@google.com>
@ 2026-05-22 10:34   ` solai v <solai.cdac@gmail.com>
  0 siblings, 0 replies; 3+ messages in thread

From: solai v @ 2026-05-22 10:34 UTC (permalink / raw)
  To: Nitin Motiani <nitinmotiani@google.com>; +Cc: Hannu Krosing <hannuk@google.com>; Mahendra Singh Thalor <mahi6run@gmail.com>; Dilip Kumar <dilipbalaut@gmail.com>; Thomas Munro <thomas.munro@gmail.com>; pgsql-hackers

Hi all,

Thank you for the updated patch.

On Fri, May 22, 2026 at 1:03 PM Nitin Motiani <nitinmotiani@google.com> wrote:
>
> Changed how pipe commands are quoted in the Windows test. The latest
> versions are attached.

I worked on reproducing the current limitation around parallel dumps
and then tested the latest v16 patch adding --pipe support for
pg_dump. To begin with, I verified the existing behavior.
For example:
pg_dump postgres | gzip > dump.sql.gz works, but does not support parallelism,
whereas:
pg_dump -Fd -j 4 -f dumpdir postgres
du -sh dumpdir
21M dumpdir
requires intermediate disk storage. This demonstrates the current
limitation where users must choose between parallelism and streaming
pipelines.
I then tested the patch introducing --pipe support. The feature is
quite useful for modern workflows where users want to stream dump
output directly to compression or upload pipelines without relying on
intermediate storage. Basic functionality worked as expected.
For example:
pg_dump -p 55432 -Fd -j 4 --pipe="cat > dump.out" postgres, produced a
~38MB output file,
and:
pg_dump -p 55432 -Fd -j 4 --pipe="gzip > dump.gz" postgres produced, a
compressed file (~11MB).
The initial contents appeared valid:
gunzip -c dump.gz | head
1
2
3
...
Also, no intermediate directory was created, confirming that the patch
enables streaming without filesystem-backed staging. Error handling
also behaved correctly.
For example:
--pipe="invalid_cmd"
resulted in:
pg_dump: error: pipe command failed: command not found
and:
--pipe="gzip | false"
resulted in:
pg_dump: error: pipe command failed: child process exited with exit code 1
However, I observed an important issue when using the feature with
multiple parallel workers. Since the pipe command is executed per
output file, using: --pipe="gzip > dump.gz", it results in multiple
workers invoking independent gzip processes that all write to the same
output file. This leads to corrupted or truncated output.
In my testing:
gunzip -c dump.gz > dump.sql
failed with:
gzip: dump.gz: unexpected end of file
This suggests that concurrent writes to a shared output target are not
coordinated and can result in invalid dumps. It would be helpful to
clarify expected usage patterns here. For example: whether users are
expected to generate distinct outputs per worker, or whether
safeguards should be implemented to prevent multiple workers from
writing to the same destination. Additionally, during failure
scenarios I observed backend logs such as:
FATAL: connection to client lost
Broken pipe
While this is expected when the pipe terminates prematurely, it may be
worth considering whether error messaging or cleanup behavior can be
made clearer from the user perspective.
Overall, the feature is valuable and aligns well with modern backup
workflows. However, behavior in multi-worker scenarios with shared
pipe targets may need further clarification or safeguards to avoid
data corruption. Looking forward to more feedback.


Regards.
Solai






^ permalink  raw  reply  [nested|flat] 3+ messages in thread


end of thread, other threads:[~2026-05-22 10:34 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2026-02-20 09:08 Re: Adding pg_dump flag for parallel export to pipes Nitin Motiani <nitinmotiani@google.com>
2026-05-21 09:56 ` Nitin Motiani <nitinmotiani@google.com>
2026-05-22 10:34   ` solai v <solai.cdac@gmail.com>

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox