cascade_replication_v1.patch

text/x-patch

Filename: cascade_replication_v1.patch
Type: text/x-patch
Part: 0
Message: Cascade replication

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch returns the parsed metadata as JSON (format, series position, per-file stats) but never the diff bytes. See the API reference.
Format: context
Series: patch v1
File+
doc/src/sgml/config.sgml 3 0
doc/src/sgml/high-availability.sgml 38 0
doc/src/sgml/protocol.sgml 12 0
src/backend/access/transam/xlog.c 171 0
src/backend/postmaster/postmaster.c 10 0
src/backend/replication/basebackup.c 5 0
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 20 0
src/backend/replication/syncrep.c 7 0
src/backend/replication/walreceiver.c 17 0
src/backend/replication/walreceiverfuncs.c 17 0
src/backend/replication/walsender.c 37 0
src/include/access/xlog.h 6 0
src/include/postmaster/postmaster.h 2 0
src/include/replication/walreceiver.h 5 0
src/include/replication/walsender.h 1 0
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1998,2004 **** SET ENABLE_SEQSCAN TO OFF;
          doesn't keep any extra segments for standby purposes, and the number
          of old WAL segments available to standby servers is a function of
          the location of the previous checkpoint and status of WAL
!         archiving.  This parameter has no effect on restartpoints.
          This parameter can only be set in the
          <filename>postgresql.conf</> file or on the server command line.
         </para>
--- 1998,2004 ----
          doesn't keep any extra segments for standby purposes, and the number
          of old WAL segments available to standby servers is a function of
          the location of the previous checkpoint and status of WAL
!         archiving.
          This parameter can only be set in the
          <filename>postgresql.conf</> file or on the server command line.
         </para>
***************
*** 2105,2111 **** SET ENABLE_SEQSCAN TO OFF;
          synchronous replication is enabled, individual transactions can be
          configured not to wait for replication by setting the
          <xref linkend="guc-synchronous-commit"> parameter to
!         <literal>local</> or <literal>off</>.
         </para>
         <para>
          This parameter can only be set in the <filename>postgresql.conf</>
--- 2105,2112 ----
          synchronous replication is enabled, individual transactions can be
          configured not to wait for replication by setting the
          <xref linkend="guc-synchronous-commit"> parameter to
!         <literal>local</> or <literal>off</>. This parameter has no effect on
!         cascade replication.
         </para>
         <para>
          This parameter can only be set in the <filename>postgresql.conf</>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 877,884 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 877,922 ----
       network delay, or that the standby is under heavy load.
      </para>
     </sect3>
+   </sect2>
+ 
+   <sect2 id="cascade-replication">
+    <title>Cascade Replication</title>
  
+    <indexterm zone="high-availability">
+     <primary>Cascade Replication</primary>
+    </indexterm>
+    <para>
+     The cascade replication feature allows a standby to accept replication
+     connections and stream WAL records to other standbys. This is useful
+     for reducing the number of standbys connecting directly to the master,
+     and thus the overhead on the master, when you have many standbys.
+    </para>
+    <para>
+     A cascading standby sends not only WAL records received from the
+     master but also those restored from the archive, so cascade replication
+     can continue even if the upstream replication connection is terminated.
+    </para>
+    <para>
+     Cascade replication is asynchronous. Note that synchronous replication
+     (see <xref linkend="synchronous-replication">) has no effect on cascade
+     replication.
+    </para>
+    <para>
+     Promoting a cascading standby terminates all of its cascade replication
+     connections, because promotion switches it to a new timeline and the
+     existing connections cannot continue across the timeline change.
+    </para>
+    <para>
+     To use cascade replication, set up the cascading standby so that it can
+     accept replication connections, i.e., set <varname>max_wal_senders</>,
+     <varname>hot_standby</> and the authentication options (see
+     <xref linkend="streaming-replication"> and <xref linkend="hot-standby">).
+     Also set <varname>primary_conninfo</> in the cascaded standby to point
+     to the cascading standby. Note that a standby cannot be configured to
+     replicate from itself.
+    </para>
    </sect2>
+ 
    <sect2 id="synchronous-replication">
     <title>Synchronous Replication</title>
  
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1359,1364 **** The commands accepted in walsender mode are:
--- 1359,1376 ----
        </listitem>
        </varlistentry>
  
+       <varlistentry>
+       <term>
+        identificationkey
+       </term>
+       <listitem>
+       <para>
+        Identification key. A standby uses it to check that it is not
+        connecting to itself.
+       </para>
+       </listitem>
+       </varlistentry>
+ 
        </variablelist>
       </para>
      </listitem>
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 446,451 **** typedef struct XLogCtlData
--- 446,453 ----
  	XLogRecPtr	recoveryLastRecPtr;
  	/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
  	TimestampTz recoveryLastXTime;
+ 	/* end of the last record restored from the archive */
+ 	XLogRecPtr	restoreLastRecPtr;
  	/* Are we requested to pause recovery? */
  	bool		recoveryPause;
  
***************
*** 618,625 **** static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  static bool AdvanceXLInsertBuffer(bool new_segment);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
! static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
! 					   bool find_free, int *max_advance,
  					   bool use_lock);
  static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
  			 int source, bool notexistOk);
--- 620,629 ----
  static bool AdvanceXLInsertBuffer(bool new_segment);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
! static void XLogFileCopy(TimeLineID tli, uint32 log, uint32 seg, char *srcpath,
! 						 uint32 offset);
! static bool InstallXLogFileSegment(TimeLineID tli, uint32 *log, uint32 *seg,
! 					   char *tmppath, bool find_free, int *max_advance,
  					   bool use_lock);
  static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
  			 int source, bool notexistOk);
***************
*** 1742,1748 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  
  			/* create/use new log file */
  			use_existent = true;
! 			openLogFile = XLogFileInit(openLogId, openLogSeg,
  									   &use_existent, true);
  			openLogOff = 0;
  		}
--- 1746,1752 ----
  
  			/* create/use new log file */
  			use_existent = true;
! 			openLogFile = XLogFileInit(ThisTimeLineID, openLogId, openLogSeg,
  									   &use_existent, true);
  			openLogOff = 0;
  		}
***************
*** 2304,2310 **** XLogNeedsFlush(XLogRecPtr record)
  /*
   * Create a new XLOG file segment, or open a pre-existing one.
   *
!  * log, seg: identify segment to be created/opened.
   *
   * *use_existent: if TRUE, OK to use a pre-existing file (else, any
   * pre-existing file will be deleted).	On return, TRUE if a pre-existing
--- 2308,2314 ----
  /*
   * Create a new XLOG file segment, or open a pre-existing one.
   *
!  * tli, log, seg: identify segment to be created/opened.
   *
   * *use_existent: if TRUE, OK to use a pre-existing file (else, any
   * pre-existing file will be deleted).	On return, TRUE if a pre-existing
***************
*** 2322,2328 **** XLogNeedsFlush(XLogRecPtr record)
   * in a critical section.
   */
  int
! XLogFileInit(uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock)
  {
  	char		path[MAXPGPATH];
--- 2326,2332 ----
   * in a critical section.
   */
  int
! XLogFileInit(TimeLineID tli, uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock)
  {
  	char		path[MAXPGPATH];
***************
*** 2334,2340 **** XLogFileInit(uint32 log, uint32 seg,
  	int			fd;
  	int			nbytes;
  
! 	XLogFilePath(path, ThisTimeLineID, log, seg);
  
  	/*
  	 * Try to use existent file (checkpoint maker may have created it already)
--- 2338,2344 ----
  	int			fd;
  	int			nbytes;
  
! 	XLogFilePath(path, tli, log, seg);
  
  	/*
  	 * Try to use existent file (checkpoint maker may have created it already)
***************
*** 2431,2437 **** XLogFileInit(uint32 log, uint32 seg,
  	installed_log = log;
  	installed_seg = seg;
  	max_advance = XLOGfileslop;
! 	if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
  								*use_existent, &max_advance,
  								use_lock))
  	{
--- 2435,2441 ----
  	installed_log = log;
  	installed_seg = seg;
  	max_advance = XLOGfileslop;
! 	if (!InstallXLogFileSegment(tli, &installed_log, &installed_seg, tmppath,
  								*use_existent, &max_advance,
  								use_lock))
  	{
***************
*** 2463,2564 **** XLogFileInit(uint32 log, uint32 seg,
  /*
   * Create a new XLOG file segment by copying a pre-existing one.
   *
!  * log, seg: identify segment to be created.
   *
!  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
!  *		a different timeline)
   *
!  * Currently this is only used during recovery, and so there are no locking
!  * considerations.	But we should be just as tense as XLogFileInit to avoid
!  * emplacing a bogus file.
   */
  static void
! XLogFileCopy(uint32 log, uint32 seg,
! 			 TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
  {
- 	char		path[MAXPGPATH];
- 	char		tmppath[MAXPGPATH];
  	char		buffer[XLOG_BLCKSZ];
! 	int			srcfd;
! 	int			fd;
! 	int			nbytes;
  
  	/*
  	 * Open the source file
  	 */
! 	XLogFilePath(path, srcTLI, srclog, srcseg);
! 	srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
  	if (srcfd < 0)
  		ereport(ERROR,
  				(errcode_for_file_access(),
! 				 errmsg("could not open file \"%s\": %m", path)));
  
! 	/*
! 	 * Copy into a temp file name.
! 	 */
! 	snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
  
! 	unlink(tmppath);
  
! 	/* do not use get_sync_bit() here --- want to fsync only at end of fill */
! 	fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
! 					   S_IRUSR | S_IWUSR);
! 	if (fd < 0)
! 		ereport(ERROR,
! 				(errcode_for_file_access(),
! 				 errmsg("could not create file \"%s\": %m", tmppath)));
  
  	/*
  	 * Do the data copying.
  	 */
! 	for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
  	{
! 		errno = 0;
! 		if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
! 		{
! 			if (errno != 0)
! 				ereport(ERROR,
! 						(errcode_for_file_access(),
! 						 errmsg("could not read file \"%s\": %m", path)));
! 			else
! 				ereport(ERROR,
! 						(errmsg("not enough data in file \"%s\"", path)));
! 		}
! 		errno = 0;
! 		if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
! 		{
! 			int			save_errno = errno;
  
! 			/*
! 			 * If we fail to make the file, delete it to release disk space
! 			 */
! 			unlink(tmppath);
! 			/* if write didn't set errno, assume problem is no disk space */
! 			errno = save_errno ? save_errno : ENOSPC;
  
  			ereport(ERROR,
  					(errcode_for_file_access(),
! 					 errmsg("could not write to file \"%s\": %m", tmppath)));
  		}
  	}
  
! 	if (pg_fsync(fd) != 0)
! 		ereport(ERROR,
! 				(errcode_for_file_access(),
! 				 errmsg("could not fsync file \"%s\": %m", tmppath)));
  
  	if (close(fd))
  		ereport(ERROR,
  				(errcode_for_file_access(),
! 				 errmsg("could not close file \"%s\": %m", tmppath)));
  
  	close(srcfd);
- 
- 	/*
- 	 * Now move the segment into place with its final name.
- 	 */
- 	if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
- 		elog(ERROR, "InstallXLogFileSegment should not have failed");
  }
  
  /*
--- 2467,2557 ----
  /*
   * Create a new XLOG file segment by copying a pre-existing one.
   *
!  * tli, log, seg: identify segment to be created.
   *
!  * srcpath: identify segment to be copied.
   *
!  * offset: identify offset to start copying from.
   */
  static void
! XLogFileCopy(TimeLineID tli, uint32 log, uint32 seg, char *srcpath, uint32 offset)
  {
  	char		buffer[XLOG_BLCKSZ];
! 	int		srcfd;
! 	int		fd;
! 	bool		use_existent;
  
  	/*
  	 * Open the source file
  	 */
! 	srcfd = BasicOpenFile(srcpath, O_RDONLY | PG_BINARY, 0);
  	if (srcfd < 0)
  		ereport(ERROR,
  				(errcode_for_file_access(),
! 				 errmsg("could not open file \"%s\": %m", srcpath)));
  
! 	/* Create/use new log file */
! 	use_existent = true;
! 	fd = XLogFileInit(tli, log, seg, &use_existent, true);
  
! 	/* Need to seek in the file? */
! 	if (offset != 0)
! 	{
! 		if (lseek(srcfd, (off_t) offset, SEEK_SET) < 0)
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not seek in log file \"%s\": %m",
! 							srcpath)));
  
! 		if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! 							log, seg, offset)));
! 	}
  
  	/*
  	 * Do the data copying.
  	 */
! 	while (offset < XLogSegSize)
  	{
! 		int		nbytes;
  
! 		nbytes = read(srcfd, buffer, sizeof(buffer));
! 		if (nbytes <= 0)
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not read from log file \"%s\": %m",
! 							srcpath)));
  
+ 		/* OK to write the logs */
+ 		errno = 0;
+ 		if (write(fd, buffer, nbytes) != nbytes)
+ 		{
+ 			/* if write didn't set errno, assume no disk space */
+ 			if (errno == 0)
+ 				errno = ENOSPC;
  			ereport(ERROR,
  					(errcode_for_file_access(),
! 					 errmsg("could not write to log file %u, segment %u "
! 							"at offset %u, length %lu: %m",
! 							log, seg, offset, (unsigned long) nbytes)));
  		}
+ 
+ 		/* Update state for copy */
+ 		offset += nbytes;
  	}
  
! 	/* Issue appropriate kind of fsync */
! 	issue_xlog_fsync(fd, log, seg);
  
  	if (close(fd))
  		ereport(ERROR,
  				(errcode_for_file_access(),
! 				 errmsg("could not close log file %u, segment %u: %m",
! 						log, seg)));
  
  	close(srcfd);
  }
  
  /*
***************
*** 2567,2573 **** XLogFileCopy(uint32 log, uint32 seg,
   * This is used both to install a newly-created segment (which has a temp
   * filename while it's being created) and to recycle an old segment.
   *
!  * *log, *seg: identify segment to install as (or first possible target).
   * When find_free is TRUE, these are modified on return to indicate the
   * actual installation location or last segment searched.
   *
--- 2560,2566 ----
   * This is used both to install a newly-created segment (which has a temp
   * filename while it's being created) and to recycle an old segment.
   *
!  * tli, *log, *seg: identify segment to install as (or first possible target).
   * When find_free is TRUE, these are modified on return to indicate the
   * actual installation location or last segment searched.
   *
***************
*** 2591,2604 **** XLogFileCopy(uint32 log, uint32 seg,
   * file into place.
   */
  static bool
! InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock)
  {
  	char		path[MAXPGPATH];
  	struct stat stat_buf;
  
! 	XLogFilePath(path, ThisTimeLineID, *log, *seg);
  
  	/*
  	 * We want to be sure that only one process does this at a time.
--- 2584,2597 ----
   * file into place.
   */
  static bool
! InstallXLogFileSegment(TimeLineID tli, uint32 *log, uint32 *seg, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock)
  {
  	char		path[MAXPGPATH];
  	struct stat stat_buf;
  
! 	XLogFilePath(path, tli, *log, *seg);
  
  	/*
  	 * We want to be sure that only one process does this at a time.
***************
*** 2625,2631 **** InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
  			}
  			NextLogSeg(*log, *seg);
  			(*max_advance)--;
! 			XLogFilePath(path, ThisTimeLineID, *log, *seg);
  		}
  	}
  
--- 2618,2624 ----
  			}
  			NextLogSeg(*log, *seg);
  			(*max_advance)--;
! 			XLogFilePath(path, tli, *log, *seg);
  		}
  	}
  
***************
*** 2729,2734 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
--- 2722,2757 ----
  			elog(ERROR, "invalid XLogFileRead source %d", source);
  	}
  
+ 	/*
+ 	 * If cascade replication is allowed, and we've just restored an archived
+ 	 * WAL file to temporary file, we copy it to the WAL file with correct
+ 	 * name, so that cascading walsenders can treat it.
+ 	 */
+ 	if (source == XLOG_FROM_ARCHIVE && AllowCascadeReplication())
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile XLogCtlData *xlogctl = XLogCtl;
+ 		XLogRecPtr		endptr;
+ 
+ 		XLogFileCopy(curFileTLI, log, seg, path, readOff);
+ 
+ 		/*
+ 		 * Calculate the end location of the restored WAL file and save it in
+ 		 * shmem. It's used as current standby flush position, and cascading
+ 		 * walsenders try to send WAL records up to this location.
+ 		 */
+ 		endptr.xlogid = log;
+ 		endptr.xrecoff = seg * XLogSegSize;
+ 		XLByteAdvance(endptr, XLogSegSize);
+ 
+ 		SpinLockAcquire(&xlogctl->info_lck);
+ 		xlogctl->restoreLastRecPtr = endptr;
+ 		SpinLockRelease(&xlogctl->info_lck);
+ 
+ 		/* Signal walsender that new WAL has arrived */
+ 		WalSndWakeup();
+ 	}
+ 
  	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
  	if (fd >= 0)
  	{
***************
*** 3255,3261 **** PreallocXlogFiles(XLogRecPtr endptr)
  	{
  		NextLogSeg(_logId, _logSeg);
  		use_existent = true;
! 		lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
  		close(lf);
  		if (!use_existent)
  			CheckpointStats.ckpt_segs_added++;
--- 3278,3284 ----
  	{
  		NextLogSeg(_logId, _logSeg);
  		use_existent = true;
! 		lf = XLogFileInit(ThisTimeLineID, _logId, _logSeg, &use_existent, true);
  		close(lf);
  		if (!use_existent)
  			CheckpointStats.ckpt_segs_added++;
***************
*** 3386,3392 **** RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
  				 * separate archive directory.
  				 */
  				if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
! 					InstallXLogFileSegment(&endlogId, &endlogSeg, path,
  										   true, &max_advance, true))
  				{
  					ereport(DEBUG2,
--- 3409,3415 ----
  				 * separate archive directory.
  				 */
  				if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
! 					InstallXLogFileSegment(ThisTimeLineID, &endlogId, &endlogSeg, path,
  										   true, &max_advance, true))
  				{
  					ereport(DEBUG2,
***************
*** 5153,5159 **** BootStrapXLOG(void)
  
  	/* Create first XLOG segment file */
  	use_existent = false;
! 	openLogFile = XLogFileInit(0, 1, &use_existent, false);
  
  	/* Write the first page with the initial record */
  	errno = 0;
--- 5176,5182 ----
  
  	/* Create first XLOG segment file */
  	use_existent = false;
! 	openLogFile = XLogFileInit(ThisTimeLineID, 0, 1, &use_existent, false);
  
  	/* Write the first page with the initial record */
  	errno = 0;
***************
*** 5533,5540 **** exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
  		 */
  		if (endTLI != ThisTimeLineID)
  		{
! 			XLogFileCopy(endLogId, endLogSeg,
! 						 endTLI, endLogId, endLogSeg);
  
  			if (XLogArchivingActive())
  			{
--- 5556,5564 ----
  		 */
  		if (endTLI != ThisTimeLineID)
  		{
! 			XLogFilePath(xlogpath, endTLI, endLogId, endLogSeg);
! 			XLogFileCopy(ThisTimeLineID, endLogId, endLogSeg,
! 						 xlogpath, 0);
  
  			if (XLogArchivingActive())
  			{
***************
*** 8158,8163 **** CreateRestartPoint(int flags)
--- 8182,8227 ----
  		/* Get the current (or recent) end of xlog */
  		endptr = GetWalRcvWriteRecPtr(NULL);
  
+ 		/*
+ 		 * Calculate the last segment that we need to retain because of
+ 		 * wal_keep_segments, by subtracting wal_keep_segments from
+ 		 * current end of xlog.
+ 		 */
+ 		if (wal_keep_segments > 0)
+ 		{
+ 			uint32		log;
+ 			uint32		seg;
+ 			int			d_log;
+ 			int			d_seg;
+ 
+ 			XLByteToSeg(endptr, log, seg);
+ 
+ 			d_seg = wal_keep_segments % XLogSegsPerFile;
+ 			d_log = wal_keep_segments / XLogSegsPerFile;
+ 			if (seg < d_seg)
+ 			{
+ 				d_log += 1;
+ 				seg = seg - d_seg + XLogSegsPerFile;
+ 			}
+ 			else
+ 				seg = seg - d_seg;
+ 			/* avoid underflow, don't go below (0,1) */
+ 			if (log < d_log || (log == d_log && seg == 0))
+ 			{
+ 				log = 0;
+ 				seg = 1;
+ 			}
+ 			else
+ 				log = log - d_log;
+ 
+ 			/* don't delete WAL segments newer than the calculated segment */
+ 			if (log < _logId || (log == _logId && seg < _logSeg))
+ 			{
+ 				_logId = log;
+ 				_logSeg = seg;
+ 			}
+ 		}
+ 
  		PrevLogSeg(_logId, _logSeg);
  		RemoveOldXlogFiles(_logId, _logSeg, endptr);
  
***************
*** 9545,9554 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
  /*
   * Get latest redo apply position.
   *
   * Exported to allow WALReceiver to read the pointer directly.
   */
  XLogRecPtr
! GetXLogReplayRecPtr(void)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
--- 9609,9622 ----
  /*
   * Get latest redo apply position.
   *
+  * Optionally, returns the end byte position of the last restored
+  * WAL segment. Callers not interested in that value may pass
+  * NULL for restoreLastRecPtr.
+  *
   * Exported to allow WALReceiver to read the pointer directly.
   */
  XLogRecPtr
! GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
***************
*** 9556,9567 **** GetXLogReplayRecPtr(void)
--- 9624,9657 ----
  
  	SpinLockAcquire(&xlogctl->info_lck);
  	recptr = xlogctl->recoveryLastRecPtr;
+ 	if (restoreLastRecPtr)
+ 		*restoreLastRecPtr = xlogctl->restoreLastRecPtr;
  	SpinLockRelease(&xlogctl->info_lck);
  
  	return recptr;
  }
  
  /*
+  * Get current standby flush position, ie, the last WAL position
+  * known to be fsync'd to disk in standby.
+  */
+ XLogRecPtr
+ GetStandbyFlushRecPtr(void)
+ {
+ 	XLogRecPtr	receivePtr;
+ 	XLogRecPtr	replayPtr;
+ 	XLogRecPtr	restorePtr;
+ 
+ 	receivePtr = GetWalRcvWriteRecPtr(NULL);
+ 	replayPtr = GetXLogReplayRecPtr(&restorePtr);
+ 
+ 	if (XLByteLT(receivePtr, replayPtr))
+ 		return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+ 	else
+ 		return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+ }
+ 
+ /*
   * Report the last WAL replay location (same format as pg_start_backup etc)
   *
   * This is useful for determining how much of WAL is visible to read-only
***************
*** 9573,9579 **** pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
  	XLogRecPtr	recptr;
  	char		location[MAXFNAMELEN];
  
! 	recptr = GetXLogReplayRecPtr();
  
  	if (recptr.xlogid == 0 && recptr.xrecoff == 0)
  		PG_RETURN_NULL();
--- 9663,9669 ----
  	XLogRecPtr	recptr;
  	char		location[MAXFNAMELEN];
  
! 	recptr = GetXLogReplayRecPtr(NULL);
  
  	if (recptr.xlogid == 0 && recptr.xrecoff == 0)
  		PG_RETURN_NULL();
***************
*** 10062,10067 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 10152,10158 ----
  	}
  
  	XLByteToSeg(*RecPtr, readId, readSeg);
+ 	readOff = RecPtr->xrecoff % XLogSegSize;
  
  retry:
  	/* See if we need to retrieve more data */
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 351,357 **** static void processCancelRequest(Port *port, void *pkt);
  static int	initMasks(fd_set *rmask);
  static void report_fork_failure_to_client(Port *port, int errnum);
  static CAC_state canAcceptConnections(void);
- static long PostmasterRandom(void);
  static void RandomSalt(char *md5Salt);
  static void signal_child(pid_t pid, int signal);
  static bool SignalSomeChildren(int signal, int targets);
--- 351,356 ----
***************
*** 2410,2415 **** reaper(SIGNAL_ARGS)
--- 2409,2423 ----
  			pmState = PM_RUN;
  
  			/*
+ 			 * Kill the cascading walsender to urge the cascaded standby to
+ 			 * reread the timeline history file, adjust its timeline and
+ 			 * establish replication connection again. This is required
+ 			 * because the timeline of cascading standby is not consistent
+ 			 * with that of cascaded one just after failover.
+ 			 */
+ 			SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
+ 
+ 			/*
  			 * Crank up the background writer, if we didn't do that already
  			 * when we entered consistent recovery state.  It doesn't matter
  			 * if this fails, we'll just try again later.
***************
*** 4369,4375 **** RandomSalt(char *md5Salt)
  /*
   * PostmasterRandom
   */
! static long
  PostmasterRandom(void)
  {
  	/*
--- 4377,4383 ----
  /*
   * PostmasterRandom
   */
! long
  PostmasterRandom(void)
  {
  	/*
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 339,344 **** SendBaseBackup(BaseBackupCmd *cmd)
--- 339,349 ----
  	MemoryContext old_context;
  	basebackup_options opt;
  
+ 	if (cascading_walsender)
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
+ 				 errmsg("recovery is still in progress, can't accept WAL streaming connections for backup")));
+ 
  	parse_basebackup_options(cmd->options, &opt);
  
  	backup_context = AllocSetContextCreate(CurrentMemoryContext,
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 83,88 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
--- 83,90 ----
  	char		standby_sysid[32];
  	TimeLineID	primary_tli;
  	TimeLineID	standby_tli;
+ 	long		primary_key;
+ 	long		standby_key;
  	PGresult   *res;
  	char		cmd[64];
  
***************
*** 110,120 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	{
  		PQclear(res);
  		ereport(ERROR,
! 				(errmsg("could not receive database system identifier and timeline ID from "
! 						"the primary server: %s",
  						PQerrorMessage(streamConn))));
  	}
! 	if (PQnfields(res) != 3 || PQntuples(res) != 1)
  	{
  		int			ntuples = PQntuples(res);
  		int			nfields = PQnfields(res);
--- 112,122 ----
  	{
  		PQclear(res);
  		ereport(ERROR,
! 				(errmsg("could not receive database system identifier, timeline ID and "
! 						"identification key from the primary server: %s",
  						PQerrorMessage(streamConn))));
  	}
! 	if (PQnfields(res) != 4 || PQntuples(res) != 1)
  	{
  		int			ntuples = PQntuples(res);
  		int			nfields = PQnfields(res);
***************
*** 122,132 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  		PQclear(res);
  		ereport(ERROR,
  				(errmsg("invalid response from primary server"),
! 				 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
  						   ntuples, nfields)));
  	}
  	primary_sysid = PQgetvalue(res, 0, 0);
  	primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
  
  	/*
  	 * Confirm that the system identifier of the primary is the same as ours.
--- 124,135 ----
  		PQclear(res);
  		ereport(ERROR,
  				(errmsg("invalid response from primary server"),
! 				 errdetail("Expected 1 tuple with 4 fields, got %d tuples with %d fields.",
  						   ntuples, nfields)));
  	}
  	primary_sysid = PQgetvalue(res, 0, 0);
  	primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+ 	primary_key = (long) pg_atoi(PQgetvalue(res, 0, 3), 4, 0);
  
  	/*
  	 * Confirm that the system identifier of the primary is the same as ours.
***************
*** 147,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	 * recovery target timeline.
  	 */
  	standby_tli = GetRecoveryTargetTLI();
- 	PQclear(res);
  	if (primary_tli != standby_tli)
  		ereport(ERROR,
  				(errmsg("timeline %u of the primary does not match recovery target timeline %u",
  						primary_tli, standby_tli)));
  	ThisTimeLineID = primary_tli;
  
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
--- 150,174 ----
  	 * recovery target timeline.
  	 */
  	standby_tli = GetRecoveryTargetTLI();
  	if (primary_tli != standby_tli)
+ 	{
+ 		PQclear(res);
  		ereport(ERROR,
  				(errmsg("timeline %u of the primary does not match recovery target timeline %u",
  						primary_tli, standby_tli)));
+ 	}
  	ThisTimeLineID = primary_tli;
  
+ 	/*
+ 	 * Confirm that the walreceiver is not connecting to its own standby.
+ 	 */
+ 	standby_key = GetWalRcvKey();
+ 	PQclear(res);
+ 	if (primary_key == standby_key)
+ 		ereport(ERROR,
+ 				(errmsg("the standby is just connecting to that standby itself"),
+ 				 errhint("The standby must connect to the primary or another standby.")));
+ 
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 469,474 **** SyncRepGetStandbyPriority(void)
--- 469,481 ----
  	int			priority = 0;
  	bool		found = false;
  
+ 	/*
+ 	 * Since synchronous cascade replication is not allowed, we always
+ 	 * set the priority of cascading walsender to zero.
+ 	 */
+ 	if (cascading_walsender)
+ 		return 0;
+ 
  	/* Need a modifiable copy of string */
  	rawstring = pstrdup(SyncRepStandbyNames);
  
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 42,49 ****
--- 42,51 ----
  #include "access/xlog_internal.h"
  #include "libpq/pqsignal.h"
  #include "miscadmin.h"
+ #include "postmaster/postmaster.h"
  #include "replication/walprotocol.h"
  #include "replication/walreceiver.h"
+ #include "replication/walsender.h"
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
  #include "storage/procarray.h"
***************
*** 171,176 **** WalReceiverMain(void)
--- 173,179 ----
  {
  	char		conninfo[MAXCONNINFO];
  	XLogRecPtr	startpoint;
+ 	long		walRcvKey;
  
  	/* use volatile pointer to prevent code rearrangement */
  	volatile WalRcvData *walrcv = WalRcv;
***************
*** 184,189 **** WalReceiverMain(void)
--- 187,199 ----
  	Assert(walrcv != NULL);
  
  	/*
+ 	 * Compute the identification key that will be assigned to this walreceiver.
+ 	 * This key is used to check that this walreceiver is not connecting to its
+ 	 * standby itself.
+ 	 */
+ 	walRcvKey = PostmasterRandom();
+ 
+ 	/*
  	 * Mark walreceiver as running in shared memory.
  	 *
  	 * Do this as early as possible, so that if we fail later on, we'll set
***************
*** 215,220 **** WalReceiverMain(void)
--- 225,231 ----
  	/* Advertise our PID so that the startup process can kill us */
  	walrcv->pid = MyProcPid;
  	walrcv->walRcvState = WALRCV_RUNNING;
+ 	walrcv->walRcvKey = walRcvKey;
  
  	/* Fetch information required to start streaming */
  	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
***************
*** 354,359 **** WalRcvDie(int code, Datum arg)
--- 365,371 ----
  	Assert(walrcv->walRcvState == WALRCV_RUNNING ||
  		   walrcv->walRcvState == WALRCV_STOPPING);
  	walrcv->walRcvState = WALRCV_STOPPED;
+ 	walrcv->walRcvKey = 0;
  	walrcv->pid = 0;
  	SpinLockRelease(&walrcv->mutex);
  
***************
*** 485,491 **** XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
  			/* Create/use new log file */
  			XLByteToSeg(recptr, recvId, recvSeg);
  			use_existent = true;
! 			recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
  			recvOff = 0;
  		}
  
--- 497,503 ----
  			/* Create/use new log file */
  			XLByteToSeg(recptr, recvId, recvSeg);
  			use_existent = true;
! 			recvFile = XLogFileInit(ThisTimeLineID, recvId, recvSeg, &use_existent, true);
  			recvOff = 0;
  		}
  
***************
*** 564,571 **** XLogWalRcvFlush(bool dying)
  		}
  		SpinLockRelease(&walrcv->mutex);
  
! 		/* Signal the startup process that new WAL has arrived */
  		WakeupRecovery();
  
  		/* Report XLOG streaming progress in PS display */
  		if (update_process_title)
--- 576,585 ----
  		}
  		SpinLockRelease(&walrcv->mutex);
  
! 		/* Signal the startup process and walsender that new WAL has arrived */
  		WakeupRecovery();
+ 		if (AllowCascadeReplication())
+ 			WalSndWakeup();
  
  		/* Report XLOG streaming progress in PS display */
  		if (update_process_title)
***************
*** 625,631 **** XLogWalRcvSendReply(void)
  	/* Construct a new message */
  	reply_message.write = LogstreamResult.Write;
  	reply_message.flush = LogstreamResult.Flush;
! 	reply_message.apply = GetXLogReplayRecPtr();
  	reply_message.sendTime = now;
  
  	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
--- 639,645 ----
  	/* Construct a new message */
  	reply_message.write = LogstreamResult.Write;
  	reply_message.flush = LogstreamResult.Flush;
! 	reply_message.apply = GetXLogReplayRecPtr(NULL);
  	reply_message.sendTime = now;
  
  	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
*** a/src/backend/replication/walreceiverfuncs.c
--- b/src/backend/replication/walreceiverfuncs.c
***************
*** 238,240 **** GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
--- 238,257 ----
  
  	return recptr;
  }
+ 
+ /*
+  * Returns the walreceiver's key (set at startup, reset to 0 by WalRcvDie).
+  */
+ long
+ GetWalRcvKey(void)
+ {
+ 	/* use volatile pointer so the read is not moved outside the spinlock */
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 	long	walRcvKey;
+ 
+ 	SpinLockAcquire(&walrcv->mutex);
+ 	walRcvKey = walrcv->walRcvKey;
+ 	SpinLockRelease(&walrcv->mutex);
+ 
+ 	return walRcvKey;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 48,53 ****
--- 48,54 ----
  #include "replication/basebackup.h"
  #include "replication/replnodes.h"
  #include "replication/walprotocol.h"
+ #include "replication/walreceiver.h"
  #include "replication/walsender.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
***************
*** 70,75 **** WalSnd	   *MyWalSnd = NULL;
--- 71,77 ----
  
  /* Global state */
  bool		am_walsender = false;		/* Am I a walsender process ? */
+ bool		cascading_walsender = false;	/* Am I cascading WAL to another standby ? */
  
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
***************
*** 135,144 **** WalSenderMain(void)
  {
  	MemoryContext walsnd_context;
  
! 	if (RecoveryInProgress())
! 		ereport(FATAL,
! 				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
! 				 errmsg("recovery is still in progress, can't accept WAL streaming connections")));
  
  	/* Create a per-walsender data structure in shared memory */
  	InitWalSnd();
--- 137,143 ----
  {
  	MemoryContext walsnd_context;
  
! 	cascading_walsender = RecoveryInProgress();
  
  	/* Create a per-walsender data structure in shared memory */
  	InitWalSnd();
***************
*** 165,170 **** WalSenderMain(void)
--- 164,175 ----
  	/* Unblock signals (they were blocked when the postmaster forked us) */
  	PG_SETMASK(&UnBlockSig);
  
+ 	/*
+ 	 * Use the recovery target timeline ID during recovery
+ 	 */
+ 	if (cascading_walsender)
+ 		ThisTimeLineID = GetRecoveryTargetTLI();
+ 
  	/* Tell the standby that walsender is ready for receiving commands */
  	ReadyForQuery(DestRemote);
  
***************
*** 279,303 **** IdentifySystem(void)
  	char		sysid[32];
  	char		tli[11];
  	char		xpos[MAXFNAMELEN];
  	XLogRecPtr	logptr;
  
  	/*
! 	 * Reply with a result set with one row, three columns. First col is
! 	 * system ID, second is timeline ID, and third is current xlog location.
  	 */
  
  	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
  			 GetSystemIdentifier());
  	snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
  
! 	logptr = GetInsertRecPtr();
  
  	snprintf(xpos, sizeof(xpos), "%X/%X",
  			 logptr.xlogid, logptr.xrecoff);
  
  	/* Send a RowDescription message */
  	pq_beginmessage(&buf, 'T');
! 	pq_sendint(&buf, 3, 2);		/* 3 fields */
  
  	/* first field */
  	pq_sendstring(&buf, "systemid");	/* col name */
--- 284,312 ----
  	char		sysid[32];
  	char		tli[11];
  	char		xpos[MAXFNAMELEN];
+ 	char		key[32];
  	XLogRecPtr	logptr;
  
  	/*
! 	 * Reply with a result set with one row, four columns. First col is
! 	 * system ID, second is timeline ID, third is current xlog location,
! 	 * and fourth is identification key.
  	 */
  
  	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
  			 GetSystemIdentifier());
  	snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
  
! 	logptr = cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
  
  	snprintf(xpos, sizeof(xpos), "%X/%X",
  			 logptr.xlogid, logptr.xrecoff);
  
+ 	snprintf(key, sizeof(key), "%d", (int32) GetWalRcvKey());
+ 
  	/* Send a RowDescription message */
  	pq_beginmessage(&buf, 'T');
! 	pq_sendint(&buf, 4, 2);		/* 4 fields */
  
  	/* first field */
  	pq_sendstring(&buf, "systemid");	/* col name */
***************
*** 325,341 **** IdentifySystem(void)
  	pq_sendint(&buf, -1, 2);
  	pq_sendint(&buf, 0, 4);
  	pq_sendint(&buf, 0, 2);
  	pq_endmessage(&buf);
  
  	/* Send a DataRow message */
  	pq_beginmessage(&buf, 'D');
! 	pq_sendint(&buf, 3, 2);		/* # of columns */
  	pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
  	pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
  	pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
  	pq_sendbytes(&buf, (char *) tli, strlen(tli));
  	pq_sendint(&buf, strlen(xpos), 4);	/* col3 len */
  	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
  
  	pq_endmessage(&buf);
  
--- 334,361 ----
  	pq_sendint(&buf, -1, 2);
  	pq_sendint(&buf, 0, 4);
  	pq_sendint(&buf, 0, 2);
+ 
+ 	/* fourth field */
+ 	pq_sendstring(&buf, "identificationkey");
+ 	pq_sendint(&buf, 0, 4);
+ 	pq_sendint(&buf, 0, 2);
+ 	pq_sendint(&buf, INT4OID, 4);
+ 	pq_sendint(&buf, 4, 2);
+ 	pq_sendint(&buf, 0, 4);
+ 	pq_sendint(&buf, 0, 2);
  	pq_endmessage(&buf);
  
  	/* Send a DataRow message */
  	pq_beginmessage(&buf, 'D');
! 	pq_sendint(&buf, 4, 2);		/* # of columns */
  	pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
  	pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
  	pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
  	pq_sendbytes(&buf, (char *) tli, strlen(tli));
  	pq_sendint(&buf, strlen(xpos), 4);	/* col3 len */
  	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+ 	pq_sendint(&buf, strlen(key), 4);	/* col4 len */
+ 	pq_sendbytes(&buf, (char *) key, strlen(key));
  
  	pq_endmessage(&buf);
  
***************
*** 372,379 **** StartReplication(StartReplicationCmd *cmd)
  	 * directory that was created with 'minimal'. So this is not bulletproof,
  	 * the purpose is just to give a user-friendly error message that hints
  	 * how to configure the system correctly.
  	 */
! 	if (wal_level == WAL_LEVEL_MINIMAL)
  		ereport(FATAL,
  				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  		errmsg("standby connections not allowed because wal_level=minimal")));
--- 392,403 ----
  	 * directory that was created with 'minimal'. So this is not bulletproof,
  	 * the purpose is just to give a user-friendly error message that hints
  	 * how to configure the system correctly.
+ 	 *
+ 	 * NOTE: A cascading walsender can only exist if wal_level is set to
+ 	 * hot_standby on the master, so there is no need to check wal_level
+ 	 * during recovery.
  	 */
! 	if (!cascading_walsender && wal_level == WAL_LEVEL_MINIMAL)
  		ereport(FATAL,
  				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  		errmsg("standby connections not allowed because wal_level=minimal")));
***************
*** 601,607 **** ProcessStandbyReplyMessage(void)
  		SpinLockRelease(&walsnd->mutex);
  	}
  
! 	SyncRepReleaseWaiters();
  }
  
  /*
--- 625,632 ----
  		SpinLockRelease(&walsnd->mutex);
  	}
  
! 	if (!cascading_walsender)
! 		SyncRepReleaseWaiters();
  }
  
  /*
***************
*** 1079,1085 **** XLogSend(char *msgbuf, bool *caughtup)
  	 * subsequently crashes and restarts, slaves must not have applied any WAL
  	 * that gets lost on the master.
  	 */
! 	SendRqstPtr = GetFlushRecPtr();
  
  	/* Quick exit if nothing to do */
  	if (XLByteLE(SendRqstPtr, sentPtr))
--- 1104,1110 ----
  	 * subsequently crashes and restarts, slaves must not have applied any WAL
  	 * that gets lost on the master.
  	 */
! 	SendRqstPtr = cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
  
  	/* Quick exit if nothing to do */
  	if (XLByteLE(SendRqstPtr, sentPtr))
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 221,226 **** extern int	wal_level;
--- 221,229 ----
  /* Do we need to WAL-log information required only for Hot Standby? */
  #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
  
+ /* Can a standby accept replication connections from another standby? */
+ #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
+ 
  #ifdef WAL_DEBUG
  extern bool XLOG_DEBUG;
  #endif
***************
*** 273,279 **** extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
  extern void XLogFlush(XLogRecPtr RecPtr);
  extern void XLogBackgroundFlush(void);
  extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
! extern int XLogFileInit(uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock);
  extern int	XLogFileOpen(uint32 log, uint32 seg);
  
--- 276,282 ----
  extern void XLogFlush(XLogRecPtr RecPtr);
  extern void XLogBackgroundFlush(void);
  extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
! extern int XLogFileInit(TimeLineID tli, uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock);
  extern int	XLogFileOpen(uint32 log, uint32 seg);
  
***************
*** 292,298 **** extern bool RecoveryInProgress(void);
  extern bool HotStandbyActive(void);
  extern bool XLogInsertAllowed(void);
  extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
! extern XLogRecPtr GetXLogReplayRecPtr(void);
  
  extern void UpdateControlFile(void);
  extern uint64 GetSystemIdentifier(void);
--- 295,302 ----
  extern bool HotStandbyActive(void);
  extern bool XLogInsertAllowed(void);
  extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
! extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
! extern XLogRecPtr GetStandbyFlushRecPtr(void);
  
  extern void UpdateControlFile(void);
  extern uint64 GetSystemIdentifier(void);
*** a/src/include/postmaster/postmaster.h
--- b/src/include/postmaster/postmaster.h
***************
*** 42,47 **** extern void ClosePostmasterPorts(bool am_syslogger);
--- 42,49 ----
  
  extern int	MaxLivePostmasterChildren(void);
  
+ extern long PostmasterRandom(void);
+ 
  #ifdef EXEC_BACKEND
  extern pid_t postmaster_forkexec(int argc, char *argv[]);
  extern int	SubPostmasterMain(int argc, char *argv[]);
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 43,55 **** typedef enum
  typedef struct
  {
  	/*
! 	 * PID of currently active walreceiver process, its current state and
  	 * start time (actually, the time at which it was requested to be
! 	 * started).
  	 */
  	pid_t		pid;
  	WalRcvState walRcvState;
  	pg_time_t	startTime;
  
  	/*
  	 * receiveStart is the first byte position that will be received. When
--- 43,56 ----
  typedef struct
  {
  	/*
! 	 * PID of currently active walreceiver process, its current state,
  	 * start time (actually, the time at which it was requested to be
! 	 * started) and identification key for this walreceiver.
  	 */
  	pid_t		pid;
  	WalRcvState walRcvState;
  	pg_time_t	startTime;
+ 	long		walRcvKey;
  
  	/*
  	 * receiveStart is the first byte position that will be received. When
***************
*** 107,112 **** extern void WalRcvShmemInit(void);
--- 108,115 ----
  extern void ShutdownWalRcv(void);
  extern bool WalRcvInProgress(void);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
+ extern void SetWalRcvWriteRecPtr(XLogRecPtr recptr);
  extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+ extern long GetWalRcvKey(void);
  
  #endif   /* _WALRECEIVER_H */
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 92,97 **** extern WalSndCtlData *WalSndCtl;
--- 92,98 ----
  
  /* global state */
  extern bool am_walsender;
+ extern bool cascading_walsender;
  extern volatile sig_atomic_t walsender_shutdown_requested;
  extern volatile sig_atomic_t walsender_ready_to_stop;