commitlog_lessbytes_v2.patch
application/octet-stream
Filename: commitlog_lessbytes_v2.patch
Type: application/octet-stream
Part: 0
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
new file mode 100644
index 2ca1c14..727b86b
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
*************** RecordTransactionCommit(void)
*** 962,987 ****
/*
* Begin commit critical section and insert the commit XLOG record.
*/
- XLogRecData rdata[4];
- int lastrdata = 0;
- xl_xact_commit xlrec;
-
/* Tell bufmgr and smgr to prepare for commit */
BufmgrCommit();
/*
- * Set flags required for recovery processing of commits.
- */
- xlrec.xinfo = 0;
- if (RelcacheInitFileInval)
- xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
- if (forceSyncCommit)
- xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
- xlrec.dbId = MyDatabaseId;
- xlrec.tsId = MyDatabaseTableSpace;
-
- /*
* Mark ourselves as within our "commit critical section". This
* forces any concurrent checkpoint to wait until we've updated
* pg_clog. Without this, it is possible for the checkpoint to set
--- 962,971 ----
*************** RecordTransactionCommit(void)
*** 1002,1044 ****
MyProc->inCommit = true;
SetCurrentTransactionStopTimestamp();
! xlrec.xact_time = xactStopTimestamp;
! xlrec.nrels = nrels;
! xlrec.nsubxacts = nchildren;
! xlrec.nmsgs = nmsgs;
! rdata[0].data = (char *) (&xlrec);
! rdata[0].len = MinSizeOfXactCommit;
! rdata[0].buffer = InvalidBuffer;
! /* dump rels to delete */
! if (nrels > 0)
! {
! rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rels;
! rdata[1].len = nrels * sizeof(RelFileNode);
! rdata[1].buffer = InvalidBuffer;
! lastrdata = 1;
! }
! /* dump committed child Xids */
! if (nchildren > 0)
{
! rdata[lastrdata].next = &(rdata[2]);
! rdata[2].data = (char *) children;
! rdata[2].len = nchildren * sizeof(TransactionId);
! rdata[2].buffer = InvalidBuffer;
! lastrdata = 2;
}
! /* dump shared cache invalidation messages */
! if (nmsgs > 0)
{
! rdata[lastrdata].next = &(rdata[3]);
! rdata[3].data = (char *) invalMessages;
! rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
! rdata[3].buffer = InvalidBuffer;
! lastrdata = 3;
! }
! rdata[lastrdata].next = NULL;
! (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
}
/*
--- 986,1071 ----
MyProc->inCommit = true;
SetCurrentTransactionStopTimestamp();
!
! if (nrels == 0 && nmsgs == 0)
{
! XLogRecData rdata[2];
! int lastrdata = 0;
! xl_xact_commit xlrec;
! xlrec.xact_time = xactStopTimestamp;
! xlrec.nsubxacts = nchildren;
! rdata[0].data = (char *) (&xlrec);
! rdata[0].len = MinSizeOfXactCommit;
! rdata[0].buffer = InvalidBuffer;
! /* dump committed child Xids */
! if (nchildren > 0)
! {
! rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) children;
! rdata[1].len = nchildren * sizeof(TransactionId);
! rdata[1].buffer = InvalidBuffer;
! lastrdata = 1;
! }
! rdata[lastrdata].next = NULL;
!
! (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
!
}
! else
{
! XLogRecData rdata[4];
! int lastrdata = 0;
! xl_xact_commit_with_info xlrec;
! /*
! * Set flags required for recovery processing of commits.
! */
! xlrec.xinfo = 0;
! if (RelcacheInitFileInval)
! xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
! if (forceSyncCommit)
! xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
! xlrec.dbId = MyDatabaseId;
! xlrec.tsId = MyDatabaseTableSpace;
!
! xlrec.xact_time = xactStopTimestamp;
! xlrec.nrels = nrels;
! xlrec.nsubxacts = nchildren;
! xlrec.nmsgs = nmsgs;
! rdata[0].data = (char *) (&xlrec);
! rdata[0].len = MinSizeOfXactCommit;
! rdata[0].buffer = InvalidBuffer;
! /* dump rels to delete */
! if (nrels > 0)
! {
! rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rels;
! rdata[1].len = nrels * sizeof(RelFileNode);
! rdata[1].buffer = InvalidBuffer;
! lastrdata = 1;
! }
! /* dump committed child Xids */
! if (nchildren > 0)
! {
! rdata[lastrdata].next = &(rdata[2]);
! rdata[2].data = (char *) children;
! rdata[2].len = nchildren * sizeof(TransactionId);
! rdata[2].buffer = InvalidBuffer;
! lastrdata = 2;
! }
! /* dump shared cache invalidation messages */
! if (nmsgs > 0)
! {
! rdata[lastrdata].next = &(rdata[3]);
! rdata[3].data = (char *) invalMessages;
! rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
! rdata[3].buffer = InvalidBuffer;
! lastrdata = 3;
! }
! rdata[lastrdata].next = NULL;
!
! (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_WITH_INFO, rdata);
! }
}
/*
*************** xactGetCommittedChildren(TransactionId *
*** 4441,4459 ****
* actions for which the order of execution is critical.
*/
static void
! xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
{
- TransactionId *sub_xids;
- SharedInvalidationMessage *inval_msgs;
TransactionId max_xid;
int i;
! /* subxid array follows relfilenodes */
! sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! /* invalidation messages array follows subxids */
! inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
!
! max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
--- 4468,4483 ----
* actions for which the order of execution is critical.
*/
static void
! xact_redo_commit(RelFileNode *xnodes, int nrels,
! TransactionId *sub_xids, int nsubxacts,
! SharedInvalidationMessage *inval_msgs, int nmsgs,
! Oid dbId, Oid tsId, uint32 xinfo,
! TransactionId xid, XLogRecPtr lsn)
{
TransactionId max_xid;
int i;
! max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
*************** xact_redo_commit(xl_xact_commit *xlrec,
*** 4476,4482 ****
/*
* Mark the transaction committed in pg_clog.
*/
! TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
}
else
{
--- 4500,4506 ----
/*
* Mark the transaction committed in pg_clog.
*/
! TransactionIdCommitTree(xid, nsubxacts, sub_xids);
}
else
{
*************** xact_redo_commit(xl_xact_commit *xlrec,
*** 4500,4540 ****
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
! TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
! ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
! XactCompletionRelcacheInitFileInval(xlrec),
! xlrec->dbId, xlrec->tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
* phase transactions. In effect we are ignoring the prepare phase and
* just going straight to lock release.
*/
! StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
}
/* Make sure files supposed to be dropped are dropped */
! for (i = 0; i < xlrec->nrels; i++)
{
! SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
{
if (smgrexists(srel, fork))
{
! XLogDropRelation(xlrec->xnodes[i], fork);
smgrdounlink(srel, fork, true);
}
}
--- 4524,4564 ----
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
! TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
! ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
! ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
! XactCompletionRelcacheInitFileInval(xinfo),
! dbId, tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
* phase transactions. In effect we are ignoring the prepare phase and
* just going straight to lock release.
*/
! StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
}
/* Make sure files supposed to be dropped are dropped */
! for (i = 0; i < nrels; i++)
{
! SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
{
if (smgrexists(srel, fork))
{
! XLogDropRelation(xnodes[i], fork);
smgrdounlink(srel, fork, true);
}
}
*************** xact_redo_commit(xl_xact_commit *xlrec,
*** 4553,4560 ****
* to reduce that problem window, for any user that requested
* ForceSyncCommit().
*/
! if (XactCompletionForceSyncCommit(xlrec))
XLogFlush(lsn);
}
/*
--- 4577,4616 ----
* to reduce that problem window, for any user that requested
* ForceSyncCommit().
*/
! if (XactCompletionForceSyncCommit(xinfo))
XLogFlush(lsn);
+
+ }
+ /*
+ * utility function to call xact_redo_commit with parameters found in xlrec
+ */
+ static void
+ xact_redo_commit_with_info(xl_xact_commit_with_info *xlrec,
+ TransactionId xid, XLogRecPtr lsn)
+ {
+ TransactionId *sub_xids;
+ SharedInvalidationMessage *inval_msgs;
+
+ /* subxid array follows relfilenodes */
+ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ /* invalidation messages array follows subxids */
+ inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
+
+ xact_redo_commit(xlrec->xnodes, xlrec->nrels, sub_xids, xlrec->nsubxacts,
+ inval_msgs, xlrec->nmsgs, xlrec->dbId, xlrec->tsId,
+ xlrec->xinfo, xid, lsn);
+ }
+
+ /*
+ * utility function to call xact_redo_commit with proper parameters:
+ * parameters not available in xl_xact_commit are defaulted to 0.
+ */
+ static void
+ xact_redo_commit_without_info(xl_xact_commit *xlrec,
+ TransactionId xid, XLogRecPtr lsn)
+ {
+ xact_redo_commit(NULL, 0, xlrec->children, xlrec->nsubxacts,
+ NULL, 0, InvalidOid, InvalidOid, 0, xid, lsn);
}
/*
*************** xact_redo(XLogRecPtr lsn, XLogRecord *re
*** 4659,4665 ****
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
! xact_redo_commit(xlrec, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_ABORT)
{
--- 4715,4727 ----
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
! xact_redo_commit_without_info(xlrec, record->xl_xid, lsn);
! }
! else if (info == XLOG_XACT_COMMIT_WITH_INFO)
! {
! xl_xact_commit_with_info *xlrec = (xl_xact_commit_with_info *) XLogRecGetData(record);
!
! xact_redo_commit_with_info(xlrec, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_ABORT)
{
*************** xact_redo(XLogRecPtr lsn, XLogRecord *re
*** 4677,4683 ****
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
! xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
--- 4739,4745 ----
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
! xact_redo_commit_with_info(&xlrec->crec, xlrec->xid, lsn);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
*************** xact_redo(XLogRecPtr lsn, XLogRecord *re
*** 4700,4706 ****
}
static void
! xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{
int i;
TransactionId *xacts;
--- 4762,4768 ----
}
static void
! xact_desc_commit_with_info(StringInfo buf, xl_xact_commit_with_info *xlrec)
{
int i;
TransactionId *xacts;
*************** xact_desc_commit(StringInfo buf, xl_xact
*** 4732,4738 ****
msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
! if (XactCompletionRelcacheInitFileInval(xlrec))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
xlrec->dbId, xlrec->tsId);
--- 4794,4800 ----
msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
! if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
xlrec->dbId, xlrec->tsId);
*************** xact_desc_commit(StringInfo buf, xl_xact
*** 4759,4764 ****
--- 4821,4841 ----
}
static void
+ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
+ {
+ int i;
+
+ appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+
+ if (xlrec->nsubxacts > 0)
+ {
+ appendStringInfo(buf, "; subxacts:");
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ appendStringInfo(buf, " %u", xlrec->children[i]);
+ }
+ }
+
+ static void
xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
{
int i;
*************** xact_desc(StringInfo buf, uint8 xl_info,
*** 4809,4814 ****
--- 4886,4898 ----
appendStringInfo(buf, "commit: ");
xact_desc_commit(buf, xlrec);
}
+ else if (info == XLOG_XACT_COMMIT_WITH_INFO)
+ {
+ xl_xact_commit_with_info *xlrec = (xl_xact_commit_with_info *) rec;
+
+ appendStringInfo(buf, "commit: ");
+ xact_desc_commit_with_info(buf, xlrec);
+ }
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
*************** xact_desc(StringInfo buf, uint8 xl_info,
*** 4825,4831 ****
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
appendStringInfo(buf, "commit prepared %u: ", xlrec->xid);
! xact_desc_commit(buf, &xlrec->crec);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
--- 4909,4915 ----
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
appendStringInfo(buf, "commit prepared %u: ", xlrec->xid);
! xact_desc_commit_with_info(buf, &xlrec->crec);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
new file mode 100644
index cb440d4..a75be3f
*** a/src/include/access/xact.h
--- b/src/include/access/xact.h
*************** typedef void (*SubXactCallback) (SubXact
*** 106,111 ****
--- 106,112 ----
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
+ #define XLOG_XACT_COMMIT_WITH_INFO 0x60
typedef struct xl_xact_assignment
{
*************** typedef struct xl_xact_assignment
*** 119,124 ****
--- 120,136 ----
typedef struct xl_xact_commit
{
TimestampTz xact_time; /* time of commit */
+ int nsubxacts; /* number of subtransaction XIDs */
+ /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
+ TransactionId children[1]; /* VARIABLE LENGTH ARRAY */
+ } xl_xact_commit;
+
+ #define MinSizeOfXactCommit offsetof(xl_xact_commit, children)
+
+
+ typedef struct xl_xact_commit_with_info
+ {
+ TimestampTz xact_time; /* time of commit */
uint32 xinfo; /* info flags */
int nrels; /* number of RelFileNodes */
int nsubxacts; /* number of subtransaction XIDs */
*************** typedef struct xl_xact_commit
*** 129,137 ****
RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
! } xl_xact_commit;
! #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
/*
* These flags are set in the xinfo fields of WAL commit records,
--- 141,149 ----
RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
! } xl_xact_commit_with_info;
! #define MinSizeOfXactCommitWithInfo offsetof(xl_xact_commit_with_info, xnodes)
/*
* These flags are set in the xinfo fields of WAL commit records,
*************** typedef struct xl_xact_commit
*** 145,152 ****
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
/* Access macros for above flags */
! #define XactCompletionRelcacheInitFileInval(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
! #define XactCompletionForceSyncCommit(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
typedef struct xl_xact_abort
{
--- 157,164 ----
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
/* Access macros for above flags */
! #define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
! #define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
typedef struct xl_xact_abort
{
*************** typedef struct xl_xact_abort
*** 172,178 ****
typedef struct xl_xact_commit_prepared
{
TransactionId xid; /* XID of prepared xact */
! xl_xact_commit crec; /* COMMIT record */
/* MORE DATA FOLLOWS AT END OF STRUCT */
} xl_xact_commit_prepared;
--- 184,190 ----
typedef struct xl_xact_commit_prepared
{
TransactionId xid; /* XID of prepared xact */
! xl_xact_commit_with_info crec; /* COMMIT record */
/* MORE DATA FOLLOWS AT END OF STRUCT */
} xl_xact_commit_prepared;