gist-split-hotstandby-90.patch
text/x-diff
Filename: gist-split-hotstandby-90.patch
Type: text/x-diff
Part: 0
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 02c4ec3..60fc173 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -151,7 +151,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
*/
GistPageSetLeaf(page);
- GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
@@ -222,16 +221,28 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
Page page;
int i;
bool isrootsplit = false;
+ Buffer *buffers;
+ /*
+ * If this split inserted a downlink for a child at lower level, we can
+ * now set the NSN and clear the follow-right flag on that child. It's
+ * OK to do this before locking the parent page. If a concurrent scan
+ * reads this parent page after we've already cleared the follow-right
+ * flag on the child, it'll still follow the rightlink because of the
+ * NSN.
+ */
if (BlockNumberIsValid(xldata->leftchild))
gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
decodePageSplitRecord(&xlrec, record);
- /* loop around all pages */
+ /*
+ * Lock all the pages involved in the split first, so that any concurrent
+ * scans in hot standby mode will see the split as an atomic operation.
+ */
+ buffers = palloc(xlrec.data->npage * sizeof(Buffer));
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
- int flags;
if (newpage->header->blkno == GIST_ROOT_BLKNO)
{
@@ -239,8 +250,19 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
isrootsplit = true;
}
- buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
- Assert(BufferIsValid(buffer));
+ buffers[i] = XLogReadBuffer(xlrec.data->node,
+ newpage->header->blkno,
+ true);
+ Assert(BufferIsValid(buffers[i]));
+ }
+
+ /* Write out all the pages */
+ for (i = 0; i < xlrec.data->npage; i++)
+ {
+ NewPage *newpage = xlrec.page + i;
+ int flags;
+
+ buffer = buffers[i];
page = (Page) BufferGetPage(buffer);
/* ok, clear buffer */
@@ -277,6 +299,8 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
+
+ pfree(buffers);
}
static void