gist-split-hotstandby-90.patch
text/x-diff
Filename: gist-split-hotstandby-90.patch
Type: text/x-diff
Part: 0
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 82ba726..71c145d 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -377,9 +377,18 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
state->ituplen++;
}
- /* saves old rightlink */
+ /* save old rightlink and NSN */
if (state->stack->blkno != GIST_ROOT_BLKNO)
+ {
rrlink = GistPageGetOpaque(dist->page)->rightlink;
+ oldnsn = GistPageGetOpaque(dist->page)->nsn;
+ }
+ else
+ {
+ /* if root split we should put initial value */
+ rrlink = InvalidBlockNumber;
+ oldnsn = PageGetLSN(dist->page);
+ }
START_CRIT_SECTION();
@@ -407,7 +416,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
XLogRecData *rdata;
rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
- is_leaf, &(state->key), dist);
+ is_leaf, &(state->key), dist,
+ rrlink, &oldnsn);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
@@ -425,12 +435,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
}
}
- /* set up NSN */
- oldnsn = GistPageGetOpaque(dist->page)->nsn;
- if (state->stack->blkno == GIST_ROOT_BLKNO)
- /* if root split we should put initial value */
- oldnsn = PageGetLSN(dist->page);
-
for (ptr = dist; ptr; ptr = ptr->next)
{
/* only for last set oldnsn */
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 7f5dd99..cdd8aaf 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -39,6 +39,8 @@ typedef struct
{
gistxlogPageSplit *data;
NewPage *page;
+ BlockNumber origrlink;
+ XLogRecPtr orignsn;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
@@ -250,7 +252,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
*/
GistPageSetLeaf(page);
- GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
@@ -310,6 +311,26 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
j++;
}
}
+
+ /*
+ * Starting with 9.0.5, the original NSN and rightlink on the split page
+ * are stored here. It would've been more logical to add them to the
+ * gistxlogPageSplit struct, but that would've broken compatibility with
+ * the pre-9.0.5 WAL format.
+ */
+ if (ptr - begin < record->xl_len)
+ {
+ memcpy(&decoded->origrlink, ptr, sizeof(BlockNumber));
+ ptr += sizeof(BlockNumber);
+ memcpy(&decoded->orignsn, ptr, sizeof(XLogRecPtr));
+ }
+ else
+ {
+ /* pre-9.0.5 format, no rightlink/NSN information */
+ decoded->origrlink = InvalidBlockNumber;
+ decoded->orignsn.xlogid = 0;
+ decoded->orignsn.xrecoff = 0;
+ }
}
static void
@@ -320,17 +341,32 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
Page page;
int i;
int flags;
+ Buffer *buffers;
decodePageSplitRecord(&xlrec, record);
flags = xlrec.data->origleaf ? F_LEAF : 0;
- /* loop around all pages */
+ /*
+ * Lock all the pages involved in the split first, so that any concurrent
+ * scans in hot standby mode will see the split as an atomic operation.
+ */
+ buffers = palloc(xlrec.data->npage * sizeof(Buffer));
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
- buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
- Assert(BufferIsValid(buffer));
+ buffers[i] = XLogReadBuffer(xlrec.data->node,
+ newpage->header->blkno,
+ true);
+ Assert(BufferIsValid(buffers[i]));
+ }
+
+ /* Write out all the pages */
+ for (i = 0; i < xlrec.data->npage; i++)
+ {
+ NewPage *newpage = xlrec.page + i;
+
+ buffer = buffers[i];
page = (Page) BufferGetPage(buffer);
/* ok, clear buffer */
@@ -339,6 +375,18 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
/* and fill it */
gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
+ /* Set NSN and rightlink, needed for concurrent scans in hot standby */
+ if (i == xlrec.data->npage - 1)
+ {
+ GistPageGetOpaque(page)->nsn = xlrec.orignsn;
+ GistPageGetOpaque(page)->rightlink = xlrec.origrlink;
+ }
+ else
+ {
+ GistPageGetOpaque(page)->nsn = lsn;
+ GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
@@ -350,6 +398,8 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0,
&xlrec);
+
+ pfree(buffers);
}
static void
@@ -655,6 +705,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
XLogRecPtr recptr;
Buffer tempbuffer = InvalidBuffer;
int ntodelete = 0;
+ BlockNumber rrlink;
+ XLogRecPtr oldnsn;
numbuffer = 1;
buffers[0] = ReadBuffer(index, insert->path[i]);
@@ -691,6 +743,10 @@ gistContinueInsert(gistIncompleteInsert *insert)
if (ntodelete == 0)
elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
+ /* Remember old rightlink and NSN */
+ rrlink = GistPageGetOpaque(pages[0])->rightlink;
+ oldnsn = GistPageGetOpaque(pages[0])->nsn;
+
/*
* we check space with subtraction only first tuple to delete,
* hope, that wiil be enough space....
@@ -742,7 +798,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
xlinfo = XLOG_GIST_PAGE_SPLIT;
rdata = formSplitRdata(index->rd_node, insert->path[i],
false, &(insert->key),
- gistMakePageLayout(buffers, numbuffer));
+ gistMakePageLayout(buffers, numbuffer),
+ rrlink, &oldnsn);
}
else
@@ -849,7 +906,8 @@ gist_safe_restartpoint(void)
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
- ItemPointer key, SplitedPageLayout *dist)
+ ItemPointer key, SplitedPageLayout *dist,
+ BlockNumber origrlink, XLogRecPtr *orignsn)
{
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
@@ -864,7 +922,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
ptr = ptr->next;
}
- rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
+ rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 4));
xlrec->node = node;
xlrec->origblkno = blkno;
@@ -893,11 +951,33 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
rdata[cur].data = (char *) (ptr->list);
rdata[cur].len = ptr->lenlist;
rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].next = NULL;
cur++;
ptr = ptr->next;
}
+ /*
+ * Append original rightlink and NSN.
+ *
+ * origrlink is a by-value parameter, so its address stops being valid
+ * when this function returns, but the caller passes rdata to XLogInsert()
+ * only after that. Copy the value into palloc'd memory instead. orignsn
+ * points to storage owned by the caller, which must keep it alive until
+ * the record has been inserted.
+ */
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char *) palloc(sizeof(BlockNumber));
+ memcpy(rdata[cur].data, &origrlink, sizeof(BlockNumber));
+ rdata[cur].len = sizeof(BlockNumber);
+ rdata[cur - 1].next = &(rdata[cur]);
+ cur++;
+
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char *) orignsn;
+ rdata[cur].len = sizeof(XLogRecPtr);
+ rdata[cur - 1].next = &(rdata[cur]);
+
+ rdata[cur].next = NULL;
+
return rdata;
}
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 4df5fed..d4c8f04 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -260,7 +260,8 @@ extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
extern XLogRecData *formSplitRdata(RelFileNode node,
BlockNumber blkno, bool page_is_leaf,
- ItemPointer key, SplitedPageLayout *dist);
+ ItemPointer key, SplitedPageLayout *dist,
+ BlockNumber origrlink, XLogRecPtr *orignsn);
extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);