copy-heap-multi-insert-1.patch
text/x-diff
Filename: copy-heap-multi-insert-1.patch
Type: text/x-diff
Part: 0
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9f1bcf1..0594332 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -24,6 +24,7 @@
* heap_getnext - retrieve next tuple in scan
* heap_fetch - retrieve tuple with given tid
* heap_insert - insert tuple into a relation
+ * heap_multi_insert - insert multiple tuples into a relation
* heap_delete - delete a tuple from a relation
* heap_update - replace a tuple in a relation with another tuple
* heap_markpos - mark scan position
@@ -1860,11 +1861,39 @@ Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate)
{
+ HeapTuple heaptup;
+
+ heaptup = heap_prepare_insert(relation, tup, cid, options, bistate);
+
+ heap_multi_insert(relation, &heaptup, 1, options, bistate);
+
+ /*
+ * If heaptup is a private copy, release it. Don't forget to copy t_self
+ * back to the caller's image, too.
+ */
+ if (heaptup != tup)
+ {
+ tup->t_self = heaptup->t_self;
+ heap_freetuple(heaptup);
+ }
+
+ return HeapTupleGetOid(tup);
+}
+
+/*
+ * Prepare a tuple for insertion with heap_multi_insert. This sets the
+ * tuple header fields, assigns an OID, and toasts the tuple if necessary.
+ * Returns a toasted version of the tuple if it was toasted, or the original
+ * tuple if not.
+ *
+ * This needs to be called for each tuple before calling heap_multi_insert().
+ */
+HeapTuple
+heap_prepare_insert(Relation relation, HeapTuple tup, CommandId cid,
+ int options, BulkInsertState bistate)
+{
TransactionId xid = GetCurrentTransactionId();
- HeapTuple heaptup;
- Buffer buffer;
- Buffer vmbuffer = InvalidBuffer;
- bool all_visible_cleared = false;
+ HeapTuple heaptup;
if (relation->rd_rel->relhasoids)
{
@@ -1916,6 +1945,39 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
else
heaptup = tup;
+ return heaptup;
+}
+
+/*
+ * Insert tuples into a relation. This always inserts at least one tuple, and
+ * opportunistically more if the chosen target page happens to have room, up
+ * to ntuples. Returns the number of tuples inserted.
+ */
+int
+heap_multi_insert(Relation relation, HeapTuple *heaptuples, int ntuples,
+ int options, BulkInsertState bistate)
+{
+ HeapTuple heaptup = heaptuples[0];
+ Buffer buffer;
+ Buffer vmbuffer = InvalidBuffer;
+ bool all_visible_cleared = false;
+ int i;
+ int ndone;
+ char *scratch = NULL;
+ int scratchused = 0;
+ Page page;
+
+ /*
+ * Allocate scratch space for constructing the multi-insert WAL record.
+ * palloc() is not safe within a critical section, so do it beforehand.
+ * A single-tuple insert builds its record on the stack and needs none.
+ */
+ if (ntuples > 1 && !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
+ scratch = palloc(BLCKSZ);
+
+ if (IsSystemRelation(relation))
+ Assert(ntuples == 1);
+
/*
* Find buffer to insert this tuple into. If the page is all visible,
* this will also pin the requisite visibility map page.
@@ -1923,6 +1985,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
InvalidBuffer, options, bistate,
&vmbuffer, NULL);
+ page = BufferGetPage(buffer);
/*
* We're about to do the actual insert -- check for conflict at the
@@ -1931,20 +1994,31 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
CheckForSerializableConflictIn(relation, NULL, buffer);
- /* NO EREPORT(ERROR) from here till changes are logged */
- START_CRIT_SECTION();
-
- RelationPutHeapTuple(relation, buffer, heaptup);
-
- if (PageIsAllVisible(BufferGetPage(buffer)))
+ if (PageIsAllVisible(page))
{
all_visible_cleared = true;
- PageClearAllVisible(BufferGetPage(buffer));
+ PageClearAllVisible(page);
+ /* t_self is not assigned until the tuple is put on the page below */
visibilitymap_clear(relation,
- ItemPointerGetBlockNumber(&(heaptup->t_self)),
+ BufferGetBlockNumber(buffer),
vmbuffer);
}
+ /* NO EREPORT(ERROR) from here till changes are logged */
+ START_CRIT_SECTION();
+
+ /*
+ * Put tuples on the page for as long as the next one still fits. The
+ * free-space test must use the size of the tuple about to be added, not
+ * the one just placed.
+ */
+ ndone = 0;
+ do
+ {
+ heaptup = heaptuples[ndone++];
+ RelationPutHeapTuple(relation, buffer, heaptup);
+ } while (ndone < ntuples && PageGetHeapFreeSpace(page) >= MAXALIGN(heaptuples[ndone]->t_len));
+
/*
* XXX Should we set PageSetPrunable on this page ?
*
@@ -1961,11 +2029,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
/* XLOG stuff */
if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
{
+ XLogRecPtr recptr;
+
+ if (ntuples == 1)
+ {
xl_heap_insert xlrec;
xl_heap_header xlhdr;
- XLogRecPtr recptr;
XLogRecData rdata[3];
- Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
xlrec.all_visible_cleared = all_visible_cleared;
@@ -2007,10 +2077,80 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+ }
+
+ /* XLOG_HEAP_INSERT belongs to the heap resource manager, not heap2 */
+ recptr = XLogInsert(RM_HEAP_ID, info, rdata);
}
+ else
+ {
+ xl_heap_multi_insert *xlrec;
+ XLogRecData rdata[2];
+ uint8 info = XLOG_HEAP2_MULTI_INSERT;
+ char *tupledata;
+ int totaldatalen;
+
+ xlrec = (xl_heap_multi_insert *) scratch;
+ scratchused += SizeOfHeapMultiInsert(ndone);
+
+ xlrec->all_visible_cleared = all_visible_cleared;
+ xlrec->node = relation->rd_node;
+ xlrec->blkno = BufferGetBlockNumber(buffer);
+ xlrec->ntuples = ndone;
+
+ tupledata = &scratch[scratchused];
+ totaldatalen = 0;
+
+ for (i = 0; i < ndone; i++)
+ {
+ int tuplen;
+
+ heaptup = heaptuples[i];
+ xlrec->tuphdrs[i].offset = ItemPointerGetOffsetNumber(&heaptup->t_self);
+ xlrec->tuphdrs[i].xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
+ xlrec->tuphdrs[i].xlhdr.t_infomask = heaptup->t_data->t_infomask;
+ xlrec->tuphdrs[i].xlhdr.t_hoff = heaptup->t_data->t_hoff;
+
+ /* write bitmap [+ padding] [+ oid] + data */
+ tuplen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ memcpy(&tupledata[totaldatalen],
+ (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ tuplen);
+ totaldatalen += tuplen;
+ }
+ scratchused += totaldatalen;
+
+ rdata[0].data = (char *) xlrec; /* the record itself, not the pointer variable */
+ rdata[0].len = SizeOfHeapMultiInsert(ndone);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
- recptr = XLogInsert(RM_HEAP_ID, info, rdata);
+ /*
+ * Mark rdata[1] (the tuple data) as belonging to buffer, so that it
+ * can be omitted if XLogInsert decides to write the whole page to the
+ * xlog. XXX: the per-tuple headers in rdata[0] are logged regardless.
+ */
+ rdata[1].data = tupledata;
+ rdata[1].len = totaldatalen;
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
+ rdata[1].next = NULL;
+
+ /*
+ * If these tuples are the only ones on the page, we can reinit the
+ * page instead of restoring the whole thing. Set flag, and hide
+ * buffer references from XLogInsert.
+ */
+ if (ItemPointerGetOffsetNumber(&(heaptuples[0]->t_self)) == FirstOffsetNumber &&
+ PageGetMaxOffsetNumber(page) == FirstOffsetNumber + ndone - 1)
+ {
+ info |= XLOG_HEAP_INIT_PAGE;
+ rdata[1].buffer = InvalidBuffer;
+ }
+
+ recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+ }
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
@@ -2030,19 +2169,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
CacheInvalidateHeapTuple(relation, heaptup);
- pgstat_count_heap_insert(relation);
+ for (i = 0; i < ndone; i++)
+ pgstat_count_heap_insert(relation);
- /*
- * If heaptup is a private copy, release it. Don't forget to copy t_self
- * back to the caller's image, too.
- */
- if (heaptup != tup)
- {
- tup->t_self = heaptup->t_self;
- heap_freetuple(heaptup);
- }
+ if (scratch)
+ pfree(scratch);
- return HeapTupleGetOid(tup);
+ return ndone;
}
/*
@@ -4729,6 +4862,18 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
}
+/*
+ * Redo routine for XLOG_HEAP2_MULTI_INSERT records.
+ *
+ * TODO: not implemented yet. Fail loudly rather than silently skipping the
+ * record, which would lose the inserted tuples during recovery.
+ */
+static void
+heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+{
+ elog(PANIC, "heap_xlog_multi_insert: redo is not implemented yet");
+}
+
/*
* Handles UPDATE and HOT_UPDATE
*/
@@ -5118,6 +5257,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_HEAP2_VISIBLE:
heap_xlog_visible(lsn, record);
break;
+ case XLOG_HEAP2_MULTI_INSERT:
+ heap_xlog_multi_insert(lsn, record);
+ break;
default:
elog(PANIC, "heap2_redo: unknown op code %u", info);
}
@@ -5255,6 +5397,10 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->block);
}
+ else if (info == XLOG_HEAP2_MULTI_INSERT)
+ {
+ /* TODO */
+ }
else
appendStringInfo(buf, "UNKNOWN");
}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 528a3a1..bc0a2fe 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1842,11 +1842,16 @@ CopyFrom(CopyState cstate)
ExprContext *econtext;
TupleTableSlot *myslot;
MemoryContext oldcontext = CurrentMemoryContext;
+
ErrorContextCallback errcontext;
CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */
BulkInsertState bistate;
uint64 processed = 0;
+ bool useHeapMultiInsert = false;
+ int nBufferedTuples = 0;
+#define TUPLE_BUFFER_SIZE 100
+ HeapTuple bufferedTuples[TUPLE_BUFFER_SIZE];
Assert(cstate->rel);
@@ -1941,6 +1946,22 @@ CopyFrom(CopyState cstate)
/* Triggers might need a slot as well */
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+ /*
+ * If there aren't any triggers on the table, we can buffer the constructed
+ * tuples and insert them in bigger chunks using heap_multi_insert(). It's
+ * not clear if this would be safe with triggers. A trigger could look at
+ * the rows already inserted and act differently based on them, for
+ * example, and if we inserted them in chunks, an AFTER ROW trigger would
+ * see the whole chunk as inserted (or as not inserted, for a BEFORE ROW
+ * trigger), even though the triggers for the other tuples had not been
+ * run yet.
+ */
+ if (resultRelInfo->ri_TrigDesc == NULL)
+ {
+ useHeapMultiInsert = true;
+ nBufferedTuples = 0;
+ }
+
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -1972,8 +1993,11 @@ CopyFrom(CopyState cstate)
CHECK_FOR_INTERRUPTS();
- /* Reset the per-tuple exprcontext */
- ResetPerTupleExprContext(estate);
+ if (nBufferedTuples == 0)
+ {
+ /* Reset the per-tuple exprcontext */
+ ResetPerTupleExprContext(estate);
+ }
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -2016,18 +2040,73 @@ CopyFrom(CopyState cstate)
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
- /* OK, store the tuple and create index entries for it */
- heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+ if (useHeapMultiInsert)
+ {
+ /* Insert this tuple to the tuple buffer */
+ bufferedTuples[nBufferedTuples++] =
+ heap_prepare_insert(cstate->rel, tuple, mycid,
+ hi_options, bistate);
+
+ /* If the buffer filled up, flush it */
+ if (nBufferedTuples == TUPLE_BUFFER_SIZE)
+ {
+ int ninserted;
+ int nremaining;
+ HeapTuple *remainingTuples;
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
- estate);
+ /*
+ * Call heap_multi_insert() until all the tuples have been
+ * inserted. We must flush it fully, so that we can reset
+ * the per-tuple memory context (which is now a bit of a
+ * misnomer).
+ */
+ remainingTuples = bufferedTuples;
+ nremaining = nBufferedTuples;
+ while (nremaining > 0)
+ {
+ ninserted = heap_multi_insert(cstate->rel,
+ remainingTuples,
+ nremaining,
+ hi_options,
+ bistate);
+ nremaining -= ninserted;
+ remainingTuples = remainingTuples + ninserted;
+ }
+
+ /*
+ * If there are any indexes, update the indexes for all the
+ * inserted tuples.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ int i;
+ for (i = 0; i < nBufferedTuples; i++)
+ {
+ ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+ recheckIndexes = ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+ estate);
+ list_free(recheckIndexes);
+ }
+ }
- /* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(estate, resultRelInfo, tuple,
- recheckIndexes);
+ nBufferedTuples = 0;
+ }
+ }
+ else
+ {
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+ estate);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ recheckIndexes);
- list_free(recheckIndexes);
+ list_free(recheckIndexes);
+ }
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
@@ -2038,6 +2117,48 @@ CopyFrom(CopyState cstate)
}
}
+ /* Flush any remaining buffered tuples */
+ if (nBufferedTuples > 0)
+ {
+ int ninserted;
+ int nremaining;
+ HeapTuple *remainingTuples;
+
+ /*
+ * Call heap_multi_insert() until all the tuples have been
+ * inserted.
+ */
+ remainingTuples = bufferedTuples;
+ nremaining = nBufferedTuples;
+ while(nremaining > 0)
+ {
+ ninserted = heap_multi_insert(cstate->rel,
+ remainingTuples,
+ nremaining,
+ hi_options,
+ bistate);
+ nremaining -= ninserted;
+ remainingTuples = remainingTuples + ninserted;
+ }
+
+ /*
+ * If there are any indexes, update the indexes for all the
+ * inserted tuples.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ int i;
+ for (i = 0; i < nBufferedTuples; i++)
+ {
+ List *recheckIndexes;
+ ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+ recheckIndexes = ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+ estate);
+ list_free(recheckIndexes);
+ }
+ }
+ }
+
/* Done, clean up */
error_context_stack = errcontext.previous;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 56036a8..e2a36f7 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -97,8 +97,12 @@ extern void setLastTid(const ItemPointer tid);
extern BulkInsertState GetBulkInsertState(void);
extern void FreeBulkInsertState(BulkInsertState);
+extern HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, CommandId cid,
+ int options, BulkInsertState bistate);
extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate);
+extern int heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+ int options, BulkInsertState bistate);
extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
ItemPointer ctid, TransactionId *update_xmax,
CommandId cid, Snapshot crosscheck, bool wait);
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index ba5d9b2..f978dd0 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -607,6 +607,7 @@ typedef HeapTupleData *HeapTuple;
/* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
#define XLOG_HEAP2_CLEANUP_INFO 0x30
#define XLOG_HEAP2_VISIBLE 0x40
+#define XLOG_HEAP2_MULTI_INSERT 0x50
/*
* All what we need to find changed tuple
@@ -660,6 +661,25 @@ typedef struct xl_heap_insert
#define SizeOfHeapInsert (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
+typedef struct xl_multi_header
+{
+ OffsetNumber offset;
+ xl_heap_header xlhdr;
+} xl_multi_header;
+
+/* This is what we need to know about multi-insert */
+typedef struct xl_heap_multi_insert
+{
+ RelFileNode node;
+ BlockNumber blkno;
+ bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
+ uint16 ntuples;
+ xl_multi_header tuphdrs[1]; /* var length */
+ /* TUPLE DATA FOLLOW AT END OF STRUCT */
+} xl_heap_multi_insert;
+
+#define SizeOfHeapMultiInsert(n) (offsetof(xl_heap_multi_insert, tuphdrs) + sizeof(xl_multi_header) * (n))
+
/* This is what we need to know about update|hot_update */
typedef struct xl_heap_update
{