ptrack-1.patch
text/x-patch
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
Returns the parsed metadata as JSON — format, series position, per-file stats — but never the raw diff bytes.
API reference →
Format: unified
| File | + | − |
|---|---|---|
| src/backend/storage/smgr/md.c | 324 | 1 |
| src/backend/storage/sync/sync.c | 3 | 0 |
| src/backend/utils/misc/guc.c | 18 | 0 |
| src/include/catalog/pg_proc.dat | 9 | 0 |
| src/include/catalog/pg_proc.h | 3 | 0 |
| src/include/storage/fd.h | 9 | 0 |
| src/include/storage/md.h | 1 | 0 |
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 61a8f11..f4b8506 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -25,9 +25,20 @@
#include <fcntl.h>
#include <sys/file.h>
+#include <sys/stat.h>
+#ifndef WIN32
+#include "sys/mman.h"
+#endif
+
#include "miscadmin.h"
+#include "funcapi.h"
+#include "access/hash.h"
+#include "access/table.h"
#include "access/xlogutils.h"
#include "access/xlog.h"
+#include "access/htup_details.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_tablespace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "storage/fd.h"
@@ -36,6 +47,7 @@
#include "storage/relfilenode.h"
#include "storage/smgr.h"
#include "storage/sync.h"
+#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "pg_trace.h"
@@ -116,6 +128,18 @@ static MemoryContext MdCxt; /* context for all MdfdVec objects */
*/
#define EXTENSION_DONT_CHECK_SIZE (1 << 4)
+/*
+ * Size of ptrack map (number of entries); 0 disables ptrack.
+ */
+int			ptrack_map_size;
+
+/*
+ * Logarithm of ptrack block size (amount of pages)
+ */
+int			ptrack_block_log;
+
+/* Descriptor of the ptrack map file, or -1 while this process has not opened it */
+static int	ptrack_fd = -1;
+
+/* This process's mapping of the ptrack map, or NULL while not yet mapped */
+static pg_atomic_uint64 *ptrack_map = NULL;
/* local routines */
static void mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum,
@@ -138,7 +162,7 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
BlockNumber blkno, bool skipFsync, int behavior);
static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
MdfdVec *seg);
-
+static void ptrack_mark_block(SMgrRelation reln, ForkNumber forknum,
+							  BlockNumber blocknum);
+
/*
* mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -422,6 +446,8 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
register_dirty_segment(reln, forknum, v);
Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
+
+ ptrack_mark_block(reln, forknum, blocknum);
}
/*
@@ -575,6 +601,8 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum,
nblocks -= nflush;
blocknum += nflush;
}
+
+ ptrack_sync();
}
/*
@@ -700,6 +728,8 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (!skipFsync && !SmgrIsTemp(reln))
register_dirty_segment(reln, forknum, v);
+
+ ptrack_mark_block(reln, forknum, blocknum);
}
/*
@@ -886,6 +916,7 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
FilePathName(v->mdfd_vfd))));
segno--;
}
+ ptrack_sync();
}
/*
@@ -918,6 +949,7 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
errmsg("could not fsync file \"%s\": %m",
FilePathName(seg->mdfd_vfd))));
}
+ ptrack_sync();
}
/*
@@ -1315,3 +1347,294 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate)
*/
return ftag->rnode.dbNode == candidate->rnode.dbNode;
}
+
+/*
+ * ---------------------------------------------------------
+ * PTrack functions
+ */
+
+#define PTRACK_MAP_PATH "global/ptrack.map"
+
+/*
+ * Identity of a tracked chunk on disk.  blocknum holds a chunk number
+ * (block number >> ptrack_block_log), not a raw block number.
+ *
+ * BID_HASH_FUNC hashes sizeof(PtBlockId) raw bytes, so the fields must fully
+ * determine those bytes.  NOTE(review): this assumes the struct has no
+ * padding (Oid x3, ForkNumber, BlockNumber) — confirm if fields are added.
+ */
+typedef struct PtBlockId
+{
+	RelFileNode relnode;
+	ForkNumber	forknum;
+	BlockNumber blocknum;
+} PtBlockId;
+
+/*
+ * Map a PtBlockId lvalue to a ptrack map slot in [0, ptrack_map_size).
+ * The argument's address is taken, so it must be an lvalue.  Callers must
+ * ensure ptrack_map_size != 0.
+ */
+#if PG_VERSION_NUM >= 110000
+#define BID_HASH_FUNC(bid) \
+	((size_t) (DatumGetUInt64(hash_any_extended((unsigned char *) &(bid), sizeof(bid), 0)) % ptrack_map_size))
+#else
+#define BID_HASH_FUNC(bid) \
+	(DatumGetUInt32(hash_any((unsigned char *) &(bid), sizeof(bid))) % ptrack_map_size)
+#endif
+
+
+static void ptrack_open_failed(int fd, const char *what) pg_attribute_noreturn();
+
+/*
+ * Abort ptrack map setup: close the partially opened file, keeping errno
+ * intact for %m, and raise an ERROR reading "<what>: <system error>".
+ */
+static void
+ptrack_open_failed(int fd, const char *what)
+{
+	int			save_errno = errno;
+
+	close(fd);
+	errno = save_errno;
+	elog(ERROR, "%s: %m", what);
+}
+
+/*
+ * Open (creating if necessary) the ptrack map file and map it into memory.
+ *
+ * The mapping covers ptrack_map_size 64-bit entries, rounded up to a multiple
+ * of BLCKSZ.  A non-empty existing file must already have exactly that size,
+ * otherwise the backend exits with FATAL.
+ *
+ * ptrack_fd and ptrack_map are assigned only after everything succeeded.  On
+ * any ERROR they keep their "not opened" values and the descriptor is closed,
+ * so a later call retries instead of using an invalid mapping (previously a
+ * failed mmap left ptrack_map == MAP_FAILED, which is non-NULL).
+ */
+static void
+open_ptrack_file(void)
+{
+	/* Align map size on BLCKSZ boundary */
+	size_t		size = (ptrack_map_size * sizeof(pg_atomic_uint64) + BLCKSZ - 1) & ~((size_t) BLCKSZ - 1);
+	off_t		file_size;
+	int			fd;
+	void	   *map;
+
+#if PG_VERSION_NUM >= 110000
+	fd = BasicOpenFile(PTRACK_MAP_PATH, O_RDWR | O_CREAT | PG_BINARY);
+#else
+	fd = BasicOpenFile(PTRACK_MAP_PATH, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+#endif
+	if (fd < 0)
+		elog(ERROR, "Failed to open ptrack map file: %m");
+
+	file_size = lseek(fd, 0, SEEK_END);
+	if (file_size < 0)
+		ptrack_open_failed(fd, "Failed to determine size of ptrack map file");
+	if (file_size != 0 && (size_t) file_size != size)
+	{
+		close(fd);
+		elog(FATAL, "Specified ptrack map size %ld doesn't match with actual file size %ld",
+			 (long) size, (long) file_size);
+	}
+
+#ifdef WIN32
+	{
+		HANDLE		mh = CreateFileMapping((HANDLE) _get_osfhandle(fd), NULL,
+										   PAGE_READWRITE, 0, (DWORD) size, NULL);
+
+		if (mh == NULL)
+		{
+			/* Win32 reports errors via GetLastError(); translate for %m */
+			_dosmaperr(GetLastError());
+			ptrack_open_failed(fd, "Failed to create file mapping");
+		}
+		map = MapViewOfFile(mh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
+		if (map == NULL)
+			_dosmaperr(GetLastError());
+		/* The view (if any) keeps the mapping alive; the handle is not needed */
+		CloseHandle(mh);
+		if (map == NULL)
+			ptrack_open_failed(fd, "Failed to mmap ptrack file");
+	}
+#else
+	/* Extends a freshly created (empty) file to the full map size */
+	if (ftruncate(fd, size) < 0)
+		ptrack_open_failed(fd, "Failed to truncate ptrack file");
+	map = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+	if (map == MAP_FAILED)
+		ptrack_open_failed(fd, "Failed to mmap ptrack file");
+#endif
+
+	ptrack_fd = fd;
+	ptrack_map = (pg_atomic_uint64 *) map;
+}
+
+/*
+ * Record that a block of a permanent relation has been modified.
+ *
+ * Blocks are grouped into chunks of 2^ptrack_block_log pages.  The chunk
+ * identity (relation, fork, chunk number) is hashed to a ptrack map slot,
+ * and the slot is raised to the current WAL insert position.  A slot is
+ * never lowered, even by concurrent callers.  Different chunks may share a
+ * slot, which can only cause false positives in pg_ptrack_get_changeset.
+ *
+ * Does nothing when ptrack is disabled (ptrack_map_size == 0) or the
+ * relation is temporary (backend-local).
+ */
+static void
+ptrack_mark_block(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
+{
+	PtBlockId	bid;
+	size_t		hash;
+	uint64		old_lsn;
+	uint64		new_lsn;
+
+	/* ptrack disabled, or temporary relation: nothing to track */
+	if (ptrack_map_size == 0 || reln->smgr_rnode.backend != InvalidBackendId)
+		return;
+
+	if (ptrack_map == NULL)
+		open_ptrack_file();
+
+	bid.relnode = reln->smgr_rnode.node;
+	bid.forknum = forknum;
+	bid.blocknum = blocknum >> ptrack_block_log;
+	hash = BID_HASH_FUNC(bid);
+
+	new_lsn = GetXLogInsertRecPtr();
+	old_lsn = pg_atomic_read_u64(&ptrack_map[hash]);
+
+	/* hash < ptrack_map_size <= INT_MAX, so it fits in unsigned long */
+	elog(DEBUG2, "map[%lu]=" UINT64_FORMAT " <- " UINT64_FORMAT,
+		 (unsigned long) hash, old_lsn, new_lsn);
+
+	/*
+	 * Atomically raise the slot to new_lsn unless it already holds a newer
+	 * value.  When the exchange fails, old_lsn is reloaded with the slot's
+	 * current contents, so the loop re-tests against what another backend
+	 * just stored.
+	 */
+	while (old_lsn < new_lsn &&
+		   !pg_atomic_compare_exchange_u64(&ptrack_map[hash], &old_lsn, new_lsn))
+	{
+		/* retry with the refreshed old_lsn */
+	}
+}
+
+/*
+ * Ask the OS to write back this process's ptrack map mapping.
+ *
+ * Uses msync(MS_ASYNC), or FlushViewOfFile on Windows.  The file is
+ * deliberately not fsync'ed.  After a crash, modified pages are restored
+ * from WAL, so a synchronous flush is not needed here.  Failures are only
+ * logged.  Does nothing if this process has not mapped the file yet.
+ */
+void
+ptrack_sync(void)
+{
+	size_t		size;
+	bool		failed;
+
+	if (ptrack_map == NULL)
+		return;
+
+	/* Same BLCKSZ-aligned length that open_ptrack_file() maps */
+	size = (ptrack_map_size * sizeof(pg_atomic_uint64) + BLCKSZ - 1) & ~((size_t) BLCKSZ - 1);
+
+#ifdef WIN32
+	failed = !FlushViewOfFile(ptrack_map, size);
+	if (failed)
+		_dosmaperr(GetLastError());	/* make %m meaningful */
+#else
+	failed = msync(ptrack_map, size, MS_ASYNC) < 0;
+#endif
+	if (failed)
+		elog(LOG, "Failed to flush ptrack map: %m");
+}
+
+/*
+ * Scan state of the pg_ptrack_get_changeset set-returning function.  It is
+ * allocated in the SRF's multi-call memory context and survives across calls.
+ */
+typedef struct PtScanCtx
+{
+	SysScanDesc scan;			/* heap scan over pg_class (no index) */
+	Relation	rel;			/* pg_class, held with AccessShareLock */
+	Form_pg_class meta;			/* current pg_class row; NULL = fetch next */
+	XLogRecPtr	lsn;			/* report chunks whose map LSN >= this */
+	PtBlockId	bid;			/* current relation / fork / chunk number */
+	Oid			oid;			/* pg_class OID of the current relation */
+	uint32		relsize;		/* size estimate in blocks bounding the walk */
+	TupleDesc	tupdesc;		/* descriptor of the result rows */
+} PtScanCtx;
+
+/*
+ * Check whether the segment file containing chunk ctx->bid exists.
+ *
+ * ctx->bid.blocknum is a chunk number.  It is converted to a block number
+ * to pick the segment (RELSEG_SIZE blocks per file).  If the segment
+ * exists, ctx->relsize is raised (never lowered) to the size implied by
+ * the files on disk:
+ *   - segno * RELSEG_SIZE + the segment's length in whole blocks; or,
+ *   - if the segment is short but segment segno+1 exists, the whole segment.
+ *     Some blocks of a relation may never have been written, so a non-last
+ *     segment can be short.
+ * Returns false if the segment file cannot be stat'ed.
+ */
+static bool
+segment_exists(PtScanCtx *ctx)
+{
+	char		pathname[MAXPGPATH];
+	uint32		segno = (ctx->bid.blocknum << ptrack_block_log) / RELSEG_SIZE;
+	struct stat fst;
+	uint32		cursize;
+	char	   *relpath = relpathperm(ctx->bid.relnode, ctx->bid.forknum);
+
+	if (segno == 0)
+		snprintf(pathname, MAXPGPATH, "%s", relpath);
+	else
+		snprintf(pathname, MAXPGPATH, "%s.%u", relpath, segno);
+
+	if (stat(pathname, &fst) != 0)
+	{
+		pfree(relpath);
+		return false;
+	}
+
+	cursize = segno * RELSEG_SIZE + fst.st_size / BLCKSZ;
+
+	/* Full segment size computed in off_t so large BLCKSZ/RELSEG_SIZE cannot overflow int */
+	if (fst.st_size < (off_t) BLCKSZ * RELSEG_SIZE)
+	{
+		/* A following segment means every block of this one must be considered */
+		snprintf(pathname, MAXPGPATH, "%s.%u", relpath, segno + 1);
+		if (stat(pathname, &fst) == 0)
+			cursize = (segno + 1) * RELSEG_SIZE;
+	}
+
+	/* relpathperm() pallocs; free it, since the caller may loop over many relations per call */
+	pfree(relpath);
+
+	/* Keep whatever larger estimate the caller already established */
+	if (cursize > ctx->relsize)
+		ctx->relsize = cursize;
+	return true;
+}
+
+PG_FUNCTION_INFO_V1(pg_ptrack_get_changeset);
+
+/*
+ * Return set of database blocks which were changed since specified LSN.
+ * This function may return false positives (blocks which were not really updated).
+ *
+ * Scans pg_class (skipping temporary relations) and walks every fork of each
+ * relation chunk by chunk.  Consecutive chunks whose ptrack slots hold the
+ * same LSN are merged into one row.  A row is emitted when that LSN is
+ * >= startlsn.  Rows never cross a segment-file boundary, so "path" names the
+ * file holding blocks [blocknum, blocknum + segsize).  The last chunk of a
+ * fork may extend past the current end of the file.
+ *
+ * Raises an ERROR if ptrack is disabled (ptrack_map_size = 0).
+ */
+Datum
+pg_ptrack_get_changeset(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	PtScanCtx  *ctx;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+
+		/* Without a map there is nothing to report, and BID_HASH_FUNC would divide by zero */
+		if (ptrack_map_size == 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("ptrack is disabled"),
+					 errhint("Set ptrack_map_size to a non-zero value and restart the server.")));
+
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		if (ptrack_map == NULL)
+			open_ptrack_file();
+
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+		ctx = (PtScanCtx *) palloc0(sizeof(PtScanCtx));
+
+		if (get_call_result_type(fcinfo, NULL, &ctx->tupdesc) != TYPEFUNC_COMPOSITE)
+			elog(ERROR, "return type must be a row type");
+		/* Rows are of type record: register the descriptor so they can be decoded */
+		ctx->tupdesc = BlessTupleDesc(ctx->tupdesc);
+
+		ctx->rel = heap_open(RelationRelationId, AccessShareLock);
+		ctx->scan = systable_beginscan(ctx->rel, InvalidOid, false,
+									   NULL, 0, NULL);
+		ctx->lsn = PG_GETARG_INT64(0);	/* Type of argument is actually LSN */
+		funcctx->user_fctx = ctx;
+		MemoryContextSwitchTo(oldcontext);
+	}
+	funcctx = SRF_PERCALL_SETUP();
+	ctx = (PtScanCtx *) funcctx->user_fctx;
+
+	for (;;)
+	{
+		BlockNumber blocknum;
+		BlockNumber n_blocks;
+		XLogRecPtr	update_lsn;
+
+		if (ctx->meta == NULL)
+		{
+			/* Get next non-temporary relation */
+			do
+			{
+				HeapTuple	tuple = systable_getnext(ctx->scan);
+
+				if (!HeapTupleIsValid(tuple))
+				{
+					/* Done: all database relations traversed */
+					systable_endscan(ctx->scan);
+					heap_close(ctx->rel, AccessShareLock);
+					SRF_RETURN_DONE(funcctx);
+				}
+				ctx->meta = (Form_pg_class) GETSTRUCT(tuple);
+				ctx->oid = ctx->meta->oid;
+			} while (ctx->meta->relpersistence == RELPERSISTENCE_TEMP);
+
+			ctx->bid.relnode.spcNode = ctx->meta->reltablespace ? ctx->meta->reltablespace : DEFAULTTABLESPACE_OID;
+			ctx->bid.relnode.dbNode = ctx->meta->relisshared ? InvalidOid : MyDatabaseId;
+			/*
+			 * NOTE(review): relfilenode = 0 (mapped catalogs) falls back to the
+			 * OID, which no longer matches the file after VACUUM FULL; the
+			 * relation mapper would be authoritative.
+			 */
+			ctx->bid.relnode.relNode = ctx->meta->relfilenode ? ctx->meta->relfilenode : ctx->oid;
+			ctx->bid.forknum = MAIN_FORKNUM;
+			ctx->bid.blocknum = 0;
+			/* relpages may be stale: only a lower bound, raised by segment_exists() */
+			ctx->relsize = ctx->meta->relpages;
+		}
+
+		/*
+		 * At each segment boundary check that the segment file exists.  This
+		 * also refreshes relsize from disk, so it must run before the size
+		 * test.  A chunk starting at or past relsize lies beyond the fork.
+		 * NOTE(review): assumes 2^ptrack_block_log divides RELSEG_SIZE.
+		 */
+		blocknum = ctx->bid.blocknum << ptrack_block_log;
+		if ((blocknum % RELSEG_SIZE == 0 && !segment_exists(ctx)) ||
+			blocknum >= ctx->relsize)
+		{
+			/* No more data in this fork: go to the next fork or relation */
+			if (++ctx->bid.forknum > MAX_FORKNUM)
+				ctx->meta = NULL;
+			else
+			{
+				ctx->bid.blocknum = 0;
+				/* relpages describes the main fork only; size other forks from disk */
+				ctx->relsize = 0;
+			}
+			continue;
+		}
+
+		/* Merge following chunks with the same LSN, without leaving this segment */
+		update_lsn = pg_atomic_read_u64(&ptrack_map[BID_HASH_FUNC(ctx->bid)]);
+		n_blocks = 0;
+		do
+		{
+			ctx->bid.blocknum += 1;
+			n_blocks += (BlockNumber) 1 << ptrack_block_log;
+		} while (blocknum + n_blocks < ctx->relsize &&
+				 (blocknum + n_blocks) % RELSEG_SIZE != 0 &&
+				 pg_atomic_read_u64(&ptrack_map[BID_HASH_FUNC(ctx->bid)]) == update_lsn);
+
+		if (update_lsn >= ctx->lsn)	/* block was changed since specified LSN */
+		{
+			Datum		values[8];
+			bool		nulls[8] = {false};
+			char		pathname[MAXPGPATH];
+			BlockNumber segno = blocknum / RELSEG_SIZE;
+			char	   *relpath = relpathperm(ctx->bid.relnode, ctx->bid.forknum);
+
+			if (segno == 0)
+				snprintf(pathname, MAXPGPATH, "%s", relpath);
+			else
+				snprintf(pathname, MAXPGPATH, "%s.%u", relpath, segno);
+			pfree(relpath);
+
+			values[0] = ObjectIdGetDatum(ctx->oid);
+			values[1] = ObjectIdGetDatum(ctx->bid.relnode.relNode);
+			values[2] = ObjectIdGetDatum(ctx->bid.relnode.spcNode);
+			values[3] = Int32GetDatum(ctx->bid.forknum);
+			values[4] = Int32GetDatum((int32) blocknum);
+			values[5] = Int32GetDatum((int32) n_blocks);
+			values[6] = Int64GetDatum(update_lsn);	/* pg_lsn column */
+			values[7] = CStringGetTextDatum(pathname);
+
+			SRF_RETURN_NEXT(funcctx,
+							HeapTupleGetDatum(heap_form_tuple(ctx->tupdesc, values, nulls)));
+		}
+	}
+}
+
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index f77519d..3a7ceee 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -418,6 +418,9 @@ ProcessSyncRequests(void)
CheckpointStats.ckpt_longest_sync = longest;
CheckpointStats.ckpt_agg_sync_time = total_elapsed;
+ /* Flush ptrack files */
+ ptrack_sync();
+
/* Flag successful completion of ProcessSyncRequests */
sync_in_progress = false;
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f7f726b..545700b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1963,6 +1963,24 @@ static struct config_bool ConfigureNamesBool[] =
static struct config_int ConfigureNamesInt[] =
{
{
+ {"ptrack_map_size", PGC_POSTMASTER, RESOURCES_DISK,
+ gettext_noop("Size of ptrack map (number of elements) used for incremental backup: 0 disabled."),
+ NULL
+ },
+ &ptrack_map_size,
+ 0, 0, INT_MAX / 2,
+ NULL, NULL, NULL
+ },
+ {
+ {"ptrack_block_log", PGC_POSTMASTER, RESOURCES_DISK,
+ gettext_noop("Logarithm of ptrack block size (amount of pages)."),
+ NULL
+ },
+ &ptrack_block_log,
+ 0, 0, INT_MAX / 2,
+ NULL, NULL, NULL
+ },
+ {
{"archive_timeout", PGC_SIGHUP, WAL_ARCHIVING,
gettext_noop("Forces a switch to the next WAL file if a "
"new file has not been started within N seconds."),
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ad4519e..add45a2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10672,4 +10672,13 @@
proname => 'pg_partition_root', prorettype => 'regclass',
proargtypes => 'regclass', prosrc => 'pg_partition_root' },
+# Ptrack changeset traversal
+{ oid => '5050', descr => 'Return set of database blocks which were changed since specified LSN',
+ proname => 'pg_ptrack_get_changeset', prorows => '1000000', proretset => 't',
+ provolatile => 'v', prorettype => 'record', proargtypes => 'pg_lsn',
+ proallargtypes => '{pg_lsn,oid,oid,oid,int4,int4,int4,pg_lsn,text}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o}',
+ proargnames => '{startlsn,relid,relfilenode,reltablespace,forknum,blocknum,segsize,updlsn,path}',
+ prosrc => 'pg_ptrack_get_changeset' },
+
]
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e5270d2..a0326d9 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -164,6 +164,9 @@ typedef FormData_pg_proc *Form_pg_proc;
#define PROPARALLEL_RESTRICTED 'r' /* can run in parallel master only */
#define PROPARALLEL_UNSAFE 'u' /* banned while in parallel mode */
+/* Ptrack changeset traversal */
+DATA(insert OID = 5050 ( pg_ptrack_get_changeset PGNSP PGUID 12 1 0 0 0 f f f f t t v s 1 0 2249 "3220" "{3220,26,26,26,23,23,23,3220,25}" "{i,o,o,o,o,o,o,o,o}" "{startlsn,relid,relfilenode,reltablespace,forknum,blocknum,segsize,updlsn,path}" _null_ _null_ pg_ptrack_get_changeset _null_ _null_ _null_ ));
+
/*
* Symbolic values for proargmodes column. Note that these must agree with
* the FunctionParameterMode enum in parsenodes.h; we declare them here to
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index a03b4d1..cab7791 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -44,6 +44,15 @@
typedef int File;
+/*
+ * Size of ptrack map (number of entries); 0 disables ptrack.
+ * GUC parameter, defined in md.c.
+ */
+extern PGDLLIMPORT int ptrack_map_size;
+
+/*
+ * Logarithm of ptrack block size (amount of pages).
+ * GUC parameter, defined in md.c.
+ */
+extern PGDLLIMPORT int ptrack_block_log;
+
/* GUC parameter */
extern PGDLLIMPORT int max_files_per_process;
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index a6758a1..2f94070 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -42,6 +42,7 @@ extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void ForgetDatabaseSyncRequests(Oid dbid);
extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
+/* Asynchronously flush the ptrack map to the OS (the file is not fsync'ed) */
+extern void ptrack_sync(void);
/* md sync callbacks */
extern int mdsyncfiletag(const FileTag *ftag, char *path);