v20251223-0005-VCI-main-part4.patch
application/octet-stream
Filename: v20251223-0005-VCI-main-part4.patch
Type: application/octet-stream
Part: 4
From f094ab8e03889d0ac0c72cca7f7e1386e1941239 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 23 Dec 2025 15:40:30 +1100
Subject: [PATCH v20251223] VCI - main - part4
---
contrib/vci/storage/Makefile | 16 +-
contrib/vci/storage/meson.build | 16 +-
contrib/vci/storage/vci_chunk.c | 614 ++++++++
contrib/vci/storage/vci_columns.c | 1150 +++++++++++++++
contrib/vci/storage/vci_columns_data.c | 229 +++
contrib/vci/storage/vci_fetch.c | 2459 ++++++++++++++++++++++++++++++++
contrib/vci/storage/vci_freelist.c | 474 ++++++
contrib/vci/storage/vci_low_utils.c | 89 ++
contrib/vci/storage/vci_memory_entry.c | 907 ++++++++++++
contrib/vci/storage/vci_xact.c | 146 ++
10 files changed, 6084 insertions(+), 16 deletions(-)
create mode 100644 contrib/vci/storage/vci_chunk.c
create mode 100644 contrib/vci/storage/vci_columns.c
create mode 100644 contrib/vci/storage/vci_columns_data.c
create mode 100644 contrib/vci/storage/vci_fetch.c
create mode 100644 contrib/vci/storage/vci_freelist.c
create mode 100644 contrib/vci/storage/vci_low_utils.c
create mode 100644 contrib/vci/storage/vci_memory_entry.c
create mode 100644 contrib/vci/storage/vci_xact.c
diff --git a/contrib/vci/storage/Makefile b/contrib/vci/storage/Makefile
index 364a944..332e9a5 100644
--- a/contrib/vci/storage/Makefile
+++ b/contrib/vci/storage/Makefile
@@ -1,21 +1,21 @@
# contrib/vci/storage/Makefile
SUBOBJS = \
-# vci_chunk.o \
-# vci_columns.o \
-# vci_columns_data.o \
-# vci_fetch.o \
-# vci_freelist.o \
+ vci_chunk.o \
+ vci_columns.o \
+ vci_columns_data.o \
+ vci_fetch.o \
+ vci_freelist.o \
vci_index.o \
vci_internal_view.o \
-# vci_low_utils.o \
-# vci_memory_entry.o \
+ vci_low_utils.o \
+ vci_memory_entry.o \
vci_ros.o \
vci_ros_command.o \
vci_ros_daemon.o \
vci_tidcrid.o \
vci_wos.o \
-# vci_xact.o
+ vci_xact.o
EXTRA_CLEAN = SUBSYS.o $(SUBOBJS)
diff --git a/contrib/vci/storage/meson.build b/contrib/vci/storage/meson.build
index 87fa17a..247acb0 100644
--- a/contrib/vci/storage/meson.build
+++ b/contrib/vci/storage/meson.build
@@ -1,19 +1,19 @@
# Copyright (c) 2025, PostgreSQL Global Development Group
vci_storage_sources = files(
-# 'vci_chunk.c',
-# 'vci_columns.c',
-# 'vci_columns_data.c',
-# 'vci_fetch.c',
-# 'vci_freelist.c',
+ 'vci_chunk.c',
+ 'vci_columns.c',
+ 'vci_columns_data.c',
+ 'vci_fetch.c',
+ 'vci_freelist.c',
'vci_index.c',
'vci_internal_view.c',
-# 'vci_low_utils.c',
-# 'vci_memory_entry.c',
+ 'vci_low_utils.c',
+ 'vci_memory_entry.c',
'vci_ros.c',
'vci_ros_command.c',
'vci_ros_daemon.c',
'vci_tidcrid.c',
'vci_wos.c',
-# 'vci_xact.c',
+ 'vci_xact.c',
)
diff --git a/contrib/vci/storage/vci_chunk.c b/contrib/vci/storage/vci_chunk.c
new file mode 100644
index 0000000..994c81d
--- /dev/null
+++ b/contrib/vci/storage/vci_chunk.c
@@ -0,0 +1,614 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_chunk.c
+ * Buffering mechanism used for WOS->ROS conversion
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_chunk.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+#include "utils/uuid.h"
+
+#include "vci.h"
+#include "vci_chunk.h"
+#include "vci_columns.h"
+#include "vci_columns_data.h"
+#include "vci_ros.h"
+
+/*
+ * Allocate and lay out the working memory of a RosChunkBuffer.
+ *
+ * rosChunkBuffer->numColumns and rosChunkBuffer->numNullableColumns are read
+ * here, so the caller must have set them before calling.
+ *
+ * columnIdList, when not NULL, maps each buffer column to the column ID
+ * handed to vci_GetMColumn(); when NULL the buffer column index is used as
+ * is.  columnSizeList gives the per-row size used to reserve space for each
+ * column.  tidData / deleteData are allocated only when useTid /
+ * useDeleteVector are true.
+ *
+ * The offset arrays and the values of all columns share one allocation
+ * (dataAllocPtr): first the offset arrays of the variable-length columns
+ * (numRowsAtOnce + 1 entries each), then the per-column data areas.
+ */
+static void
+InitOneRosChunkBufferCore(RosChunkBuffer *rosChunkBuffer,
+ int numRowsAtOnce,
+ int16 *columnIdList,
+ int16 *columnSizeList,
+ bool useTid,
+ bool useDeleteVector,
+ vci_MainRelHeaderInfo *info)
+{
+ int16 nullBitId = 0;
+ char *bufferIndex;
+ char *bufferData;
+ int sizeIndexArray;
+ Size sizeTuple = 0;
+
+ const int16 numColumns = rosChunkBuffer->numColumns;
+ const int16 numNullableColumns = rosChunkBuffer->numNullableColumns;
+
+ rosChunkBuffer->numColumnsWithIndex = 0;
+ rosChunkBuffer->nullWidthInByte = (numNullableColumns + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
+ rosChunkBuffer->numFilled = 0;
+ rosChunkBuffer->compType = palloc_array(vcis_compression_type_t, numColumns);
+ rosChunkBuffer->nullBitId = palloc_array(int16, numColumns);
+ rosChunkBuffer->columnSizeList = palloc_array(int16, numColumns);
+ /* data[] has 2 * numColumns slots; the upper half doubles as dataOffset[] */
+ rosChunkBuffer->data = palloc0_array(char *, numColumns * 2);
+ rosChunkBuffer->dataOffset = (vci_offset_in_extent_t **)
+ &(rosChunkBuffer->data[numColumns]);
+ MemCpy(rosChunkBuffer->columnSizeList,
+ columnSizeList,
+ sizeof(int16) * numColumns);
+ rosChunkBuffer->nullData = palloc_array(char, rosChunkBuffer->nullWidthInByte *
+ numRowsAtOnce);
+ rosChunkBuffer->tidData = useTid ? palloc(sizeof(ItemPointerData) * numRowsAtOnce)
+ : NULL;
+ rosChunkBuffer->deleteData = useDeleteVector
+ ? palloc(vci_RoundUpValue(numRowsAtOnce, 8))
+ : NULL;
+
+ /*
+ * First pass: record each column's compression type, count the columns
+ * that need an offset array, and sum the per-row sizes.
+ */
+ for (int16 colId = VCI_FIRST_NORMALCOLUMN_ID; colId < numColumns; ++colId)
+ {
+ vcis_compression_type_t compType;
+
+ compType = vci_GetMColumn(info, columnIdList ? columnIdList[colId] : colId)
+ ->comp_type;
+ rosChunkBuffer->compType[colId] = compType;
+ switch (compType)
+ {
+ case vcis_compression_type_fixed_raw:
+ rosChunkBuffer->dataOffset[colId] = NULL;
+ break;
+ case vcis_compression_type_variable_raw:
+
+ /*
+ * Temporarily store the non-NULL marker value 1 in
+ * dataOffset[colId].  The second loop below replaces it with
+ * the real pointer to this column's offset array.
+ */
+ rosChunkBuffer->dataOffset[colId] = (vci_offset_in_extent_t *) 1;
+ ++(rosChunkBuffer->numColumnsWithIndex);
+ break;
+ default:
+ elog(ERROR, "unsupported compression type"); /* FIXME */
+ }
+ sizeTuple += columnSizeList[colId];
+
+ /*
+ * NOTE(review): every column gets a null bit as soon as at least one
+ * column is nullable, so the Assert after this loop only holds when
+ * numNullableColumns is 0 or equals the number of columns iterated
+ * here -- confirm against vci_GetNumberOfNullableColumn().
+ */
+ if (0 < numNullableColumns)
+ rosChunkBuffer->nullBitId[colId] = nullBitId++;
+ }
+ Assert(nullBitId == numNullableColumns);
+
+ /*
+ * One allocation for everything: the offset arrays, the row data at the
+ * sizes given by columnSizeList, and alignment slack for each column.
+ */
+ sizeIndexArray = sizeof(vci_offset_in_extent_t) *
+ rosChunkBuffer->numColumnsWithIndex;
+ rosChunkBuffer->dataAllocPtr = palloc((sizeIndexArray *
+ (numRowsAtOnce + 1)) +
+ (sizeTuple * numRowsAtOnce) +
+ (VCI_DATA_ALIGNMENT_IN_STORAGE
+ * numColumns));
+ bufferIndex = rosChunkBuffer->dataAllocPtr;
+ bufferData = &(bufferIndex[sizeIndexArray * (numRowsAtOnce + 1)]);
+ /* Second pass: hand out the data area and, if needed, the offset array. */
+ for (int16 colId = VCI_FIRST_NORMALCOLUMN_ID; colId < numColumns; ++colId)
+ {
+ int colSize = vci_RoundUpValue(columnSizeList[colId] * numRowsAtOnce,
+ VCI_DATA_ALIGNMENT_IN_STORAGE);
+
+ rosChunkBuffer->data[colId] = bufferData;
+ bufferData += colSize;
+
+ /*
+ * A non-NULL dataOffset[colId] (the marker set in the first pass)
+ * means this column needs an offset array; carve it out of the index
+ * area and start it at offset 0.
+ */
+ if (rosChunkBuffer->dataOffset[colId])
+ {
+ rosChunkBuffer->dataOffset[colId] =
+ (vci_offset_in_extent_t *) bufferIndex;
+ bufferIndex += sizeof(vci_offset_in_extent_t) *
+ (numRowsAtOnce + 1);
+ rosChunkBuffer->dataOffset[colId][0] = 0;
+ }
+ }
+}
+
+/**
+ * @brief Initialize a buffer to keep a chunk for ROS.
+ *
+ * The buffer initialized by this function must be destroyed by
+ * vci_DestroyOneRosChunkBuffer().
+ *
+ * The TID area is always allocated; no column ID mapping is used, so the
+ * buffer column index is passed directly to vci_GetMColumn().
+ *
+ * @param[out] rosChunkBuffer data in rosChunkBuffer is initialized.
+ * @param[in] numRowsAtOnce number of rows to be stored in a chunk.
+ * @param[in] columnSizeList worst-case column sizes.
+ * @param[in] numColumns number of columns.
+ * @param[in] useDeleteVector true to also allocate the delete vector area.
+ * @param[in] info VCI main relation header information.
+ */
+void
+vci_InitOneRosChunkBuffer(RosChunkBuffer *rosChunkBuffer,
+ int numRowsAtOnce,
+ int16 *columnSizeList,
+ int numColumns,
+ bool useDeleteVector,
+ vci_MainRelHeaderInfo *info)
+{
+ /* The core routine reads these two fields, so set them first. */
+ rosChunkBuffer->numColumns = numColumns;
+ rosChunkBuffer->numNullableColumns = vci_GetNumberOfNullableColumn(
+ vci_GetTupleDescr(info));
+ InitOneRosChunkBufferCore(rosChunkBuffer,
+ numRowsAtOnce,
+ NULL,
+ columnSizeList,
+ true,
+ useDeleteVector,
+ info);
+}
+
+/**
+ * @brief Release the memory owned by a chunk buffer.
+ *
+ * A buffer whose compType is NULL is treated as already destroyed and is
+ * left untouched.  dataOffset is not freed on its own: in buffers set up in
+ * this file it points into the array that is freed through data.
+ *
+ * @param[in] rosChunkBuffer target to destroy.
+ */
+void
+vci_DestroyOneRosChunkBuffer(RosChunkBuffer *rosChunkBuffer)
+{
+    Assert(rosChunkBuffer);
+
+    if (rosChunkBuffer->compType != NULL)
+    {
+        /* per-column metadata */
+        vci_PfreeAndNull(&(rosChunkBuffer->compType));
+        vci_PfreeAndNull(&(rosChunkBuffer->nullBitId));
+        vci_PfreeAndNull(&(rosChunkBuffer->columnSizeList));
+
+        /* pointer table, then the per-row side arrays */
+        vci_PfreeAndNull(&(rosChunkBuffer->data));
+        vci_PfreeAndNull(&(rosChunkBuffer->nullData));
+        vci_PfreeAndNull(&(rosChunkBuffer->tidData));
+        vci_PfreeAndNull(&(rosChunkBuffer->deleteData));
+
+        /* the shared offset/value area */
+        vci_PfreeAndNull(&(rosChunkBuffer->dataAllocPtr));
+
+        rosChunkBuffer->numColumns = 0;
+    }
+}
+
+/**
+ * @brief Initialize a RosChunkStorage, which holds multiple
+ * RosChunkBuffer.
+ *
+ * The slot array is sized to hold one extent's worth of rows
+ * (VCI_NUM_ROWS_IN_EXTENT, rounded up to whole chunks); the number of slots
+ * is doubled when forAppending is true.  All slots start out NULL.
+ *
+ * @param[out] rosChunkStorage pointer to the target RosChunkStorage that
+ * is initialized.
+ * @param[in] numRowsAtOnce number of rows to be stored in a chunk
+ * @param[in] forAppending false for normal ROS creation.
+ * make true only for collect-deleted-rows with appending new data.
+ *
+ * @note The instance should be destroyed by vci_DestroyRosChunkStorage().
+ */
+void
+vci_InitRosChunkStorage(RosChunkStorage *rosChunkStorage,
+                        int numRowsAtOnce,
+                        bool forAppending)
+{
+    Assert(rosChunkStorage);
+
+    /* number of chunks needed for one extent, rounded up */
+    rosChunkStorage->numChunks =
+        (VCI_NUM_ROWS_IN_EXTENT + numRowsAtOnce - 1) / numRowsAtOnce;
+
+    rosChunkStorage->forAppending = forAppending;
+    if (rosChunkStorage->forAppending)
+        rosChunkStorage->numChunks *= 2;
+
+    rosChunkStorage->numTotalRows = 0;
+    rosChunkStorage->numFilled = 0;
+    rosChunkStorage->chunk = palloc0_array(RosChunkBuffer *,
+                                           rosChunkStorage->numChunks);
+}
+
+/**
+ * @brief Reset RosChunkStorage to reuse it for new extent creation.
+ *
+ * Every registered RosChunkBuffer is destroyed and its slot cleared; the
+ * slot array itself is kept.
+ *
+ * NOTE(review): vci_DestroyOneRosChunkBuffer() frees the buffer's members
+ * only.  The RosChunkBuffer struct allocated by vci_CopyRosChunkBuffer() is
+ * not pfree'd here -- confirm that the memory context reclaims it.
+ *
+ * @param[in] rosChunkStorage the target to be reset.
+ */
+void
+vci_ResetRosChunkStorage(RosChunkStorage *rosChunkStorage)
+{
+    for (int slot = 0; slot < rosChunkStorage->numFilled; ++slot)
+    {
+        RosChunkBuffer *registered = rosChunkStorage->chunk[slot];
+
+        vci_DestroyOneRosChunkBuffer(registered);
+        rosChunkStorage->chunk[slot] = NULL;
+    }
+
+    rosChunkStorage->numTotalRows = 0;
+    rosChunkStorage->numFilled = 0;
+}
+
+/**
+ * @brief Destroy RosChunkStorage.
+ *
+ * The registered RosChunkBuffer's are destroyed and the slot array is
+ * freed.  A storage whose chunk pointer is already NULL is left untouched,
+ * so calling this twice is harmless.
+ *
+ * @param[in] rosChunkStorage the target to be destroyed.
+ */
+void
+vci_DestroyRosChunkStorage(RosChunkStorage *rosChunkStorage)
+{
+    Assert(rosChunkStorage);
+
+    if (rosChunkStorage->chunk != NULL)
+    {
+        for (int slot = 0; slot < rosChunkStorage->numFilled; ++slot)
+            vci_DestroyOneRosChunkBuffer(rosChunkStorage->chunk[slot]);
+
+        pfree(rosChunkStorage->chunk);
+        rosChunkStorage->chunk = NULL;
+        rosChunkStorage->numChunks = 0;
+        rosChunkStorage->numFilled = 0;
+    }
+}
+
+/**
+ * @brief Fill one tuple in a RosChunkBuffer.
+ *
+ * The row is stored at index numFilled, which is then incremented.  No
+ * capacity check is made here; presumably the caller stops at the
+ * numRowsAtOnce the buffer was initialized with (TODO confirm).
+ *
+ * A NULL attribute sets the column's bit in the row's null bitmap and still
+ * occupies a slot in the column data: a fixed-length column repeats the
+ * previous row's bytes (zeros for the first row), a variable-length column
+ * stores a 1-byte short varlena header.
+ *
+ * @param[in] rosChunkBuffer the buffer where the tuple is stored into.
+ * @param[in] info VCI main relation header information.
+ * @param[in] tid the tid to be stored.
+ * @param[in] tuple the tuple to be stored.
+ * @param[in] dstColumnIdList the target column IDs in the VCI.
+ * @param[in] heapAttrNumList attribute numbers of the target columns
+ * in the original heap tuple.
+ * @param[in] tupleDesc the tuple descriptor of the original heap
+ * relation.
+ */
+void
+vci_FillOneRowInRosChunkBuffer(RosChunkBuffer *rosChunkBuffer,
+ vci_MainRelHeaderInfo *info,
+ ItemPointer tid,
+ HeapTuple tuple,
+ int16 *dstColumnIdList,
+ AttrNumber *heapAttrNumList,
+ TupleDesc tupleDesc)
+{
+ int16 colId;
+ int offset = (rosChunkBuffer->numFilled)++;
+ int nullWidthInByte = rosChunkBuffer->nullWidthInByte;
+ char *nullData = (NULL == rosChunkBuffer->nullData) ? NULL :
+ &(rosChunkBuffer->nullData[nullWidthInByte * offset]);
+
+ /* Start with an all-zero (no NULLs) bitmap for this row. */
+ if (nullData)
+ MemSet(nullData, 0, nullWidthInByte);
+
+ if (rosChunkBuffer->tidData)
+ MemCpy(&(rosChunkBuffer->tidData[sizeof(ItemPointerData) * offset]), tid, sizeof(ItemPointerData));
+
+ for (colId = VCI_FIRST_NORMALCOLUMN_ID; colId < rosChunkBuffer->numColumns; ++colId)
+ {
+ bool isnull;
+ Datum datum = heap_getattr(tuple,
+ heapAttrNumList[colId],
+ tupleDesc,
+ &isnull);
+
+ if (isnull)
+ {
+ Assert((VCI_FIRST_NORMALCOLUMN_ID <= dstColumnIdList[colId]) &&
+ (dstColumnIdList[colId] <
+ vci_GetMainRelVar(info, vcimrv_num_columns, 0)));
+
+ /*
+ * NOTE(review): nullBitId[] is indexed by the VCI column ID here,
+ * while compType[], data[] and dataOffset[] below are indexed by
+ * colId -- confirm the two always coincide.
+ */
+ if (nullData)
+ vci_SetBit(nullData,
+ rosChunkBuffer->nullBitId[dstColumnIdList[colId]]);
+
+ switch (rosChunkBuffer->compType[colId])
+ {
+ case vcis_compression_type_fixed_raw:
+ {
+ int size = rosChunkBuffer->columnSizeList[colId];
+ char *ptr;
+
+ /* Filler value: zeros for row 0, else copy the previous row. */
+ ptr = &(rosChunkBuffer->data[colId][offset * size]);
+ if (0 == offset)
+ MemSet(ptr, 0, size);
+ else
+ MemCpy(ptr, &(ptr[-size]), size);
+ }
+ break;
+ case vcis_compression_type_variable_raw:
+ {
+ static struct varlena datumNull;
+ static vci_offset_in_extent_t size = 0;
+ vci_offset_in_extent_t curOffset;
+
+ if (size == 0)
+ {
+ /* One-time initialization */
+
+ MemSet(&datumNull, 0, sizeof(datumNull));
+
+ /*
+ * A short varlena header with length zero denotes an
+ * external (1B_E) datum, so normal varlena data must
+ * be given a length of 1 or larger.
+ */
+ SET_VARSIZE_SHORT(&datumNull, 1);
+ size = 1;
+ }
+ /* Append the 1-byte placeholder and record the next offset. */
+ curOffset = rosChunkBuffer->dataOffset[colId][offset];
+ rosChunkBuffer->dataOffset[colId][offset + 1] =
+ curOffset + size;
+ MemCpy(&(rosChunkBuffer->data[colId][curOffset]),
+ &datumNull,
+ size);
+ }
+ break;
+ default:
+ elog(ERROR, "unsupported compression type"); /* FIXME */
+
+ }
+ }
+ else
+ {
+ switch (rosChunkBuffer->compType[colId])
+ {
+ case vcis_compression_type_fixed_raw:
+ {
+ int size = rosChunkBuffer->columnSizeList[colId];
+ char *ptr;
+
+ ptr = &(rosChunkBuffer->data[colId][offset * size]);
+ /* Sizes up to sizeof(Datum) are taken from the Datum itself. */
+ if (size <= sizeof(Datum))
+ {
+ switch (size)
+ {
+ case 1:
+ *ptr = DatumGetUInt8(datum);
+ break;
+ case 2:
+ {
+ uint16 val = DatumGetUInt16(datum);
+
+ MemCpy(ptr, &val, sizeof(uint16));
+ }
+ break;
+ case 4:
+ {
+ uint32 val = DatumGetUInt32(datum);
+
+ MemCpy(ptr, &val, sizeof(uint32));
+ }
+ break;
+ case 8:
+ {
+ uint64 val = DatumGetInt64(datum);
+
+ MemCpy(ptr, &val, sizeof(uint64));
+ }
+ break;
+ default:
+ elog(ERROR, "unsupported fixed length");
+ }
+ }
+ else
+ {
+ /* Larger sizes: the Datum is a pointer to the value. */
+ size = rosChunkBuffer->columnSizeList[colId];
+
+ /* FIXME */
+
+ /*
+ * sizeof(TimeTzADT) is 16, of which 4 bytes are
+ * padding, so (sizeof(TimeTzADT) == size) cannot be
+ * used.  (12U == size) is used instead.  Is there a
+ * better way?
+ */
+ Assert((12U == size) ||
+ (sizeof(Interval) == size) ||
+ (UUID_LEN == size) ||
+ (NAMEDATALEN == size));
+ MemCpy(ptr, DatumGetPointer(datum), size);
+ }
+ }
+ break;
+
+ /* FIXME */
+
+ /*
+ * We need to fill variable length data into fixed length
+ * area in order to reduce the space for the offsets and
+ * headers.
+ */
+ case vcis_compression_type_variable_raw:
+ {
+ vci_offset_in_extent_t curOffset;
+ vci_offset_in_extent_t size = VARSIZE_ANY(DatumGetPointer(datum));
+
+ /* Check worst size. */
+ Assert(size <= rosChunkBuffer->columnSizeList[colId]);
+
+ /* Append the whole varlena (header included) after the previous row. */
+ curOffset = rosChunkBuffer->dataOffset[colId][offset];
+ rosChunkBuffer->dataOffset[colId][offset + 1] =
+ curOffset + size;
+ MemCpy(&(rosChunkBuffer->data[colId][curOffset]),
+ DatumGetPointer(datum),
+ size);
+ }
+ break;
+ default:
+ elog(ERROR, "unsupported compression type"); /* FIXME */
+ }
+ }
+ }
+}
+
+/**
+ * @brief Reset counter of a RosChunkBuffer.
+ *
+ * The fill count goes back to zero and the first entry of every existing
+ * offset array is set to 0.  No memory is released.
+ *
+ * @param[in] buffer the target RosChunkBuffer.
+ */
+void
+vci_ResetRosChunkBufferCounter(RosChunkBuffer *buffer)
+{
+    for (int col = VCI_FIRST_NORMALCOLUMN_ID; col < buffer->numColumns; ++col)
+    {
+        /* columns without an offset array have nothing to reset */
+        if (buffer->dataOffset[col] == NULL)
+            continue;
+
+        buffer->dataOffset[col][0] = 0;
+    }
+
+    buffer->numFilled = 0;
+}
+
+/**
+ * @brief Create a copy of a RosChunkBuffer.
+ *
+ * Memory is allocated only for the rows actually filled in src, not for
+ * the capacity src was created with.
+ *
+ * @param[in] src the original RosChunkBuffer.
+ * @return pointer to the copy.
+ *
+ * @note The created RosChunkBuffer should be destroyed by
+ * vci_DestroyOneRosChunkBuffer().
+ */
+static RosChunkBuffer *
+vci_CopyRosChunkBuffer(RosChunkBuffer *src)
+{
+ Size totalSize;
+ int16 colId;
+ char *bufferIndex;
+ char *bufferData;
+ int sizeIndexArray;
+ int16 numColumns = src->numColumns;
+ int numFilled = src->numFilled;
+ RosChunkBuffer *dst;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Copy the struct first; every pointer member is replaced below. */
+ dst = vci_AllocateAndCopy(src, sizeof(RosChunkBuffer));
+
+ dst->compType = vci_AllocateAndCopy(src->compType,
+ sizeof(vcis_compression_type_t) * numColumns);
+ dst->nullBitId = vci_AllocateAndCopy(src->nullBitId,
+ sizeof(int16) * numColumns);
+ dst->columnSizeList = vci_AllocateAndCopy(src->columnSizeList,
+ sizeof(int16) * numColumns);
+ dst->nullData = vci_AllocateAndCopy(src->nullData,
+ src->nullWidthInByte * numFilled);
+ /*
+ * NOTE(review): unlike deleteData below, tidData is copied without a NULL
+ * check -- confirm that it is always set here or that
+ * vci_AllocateAndCopy() tolerates a NULL source.
+ */
+ dst->tidData = vci_AllocateAndCopy(src->tidData,
+ sizeof(ItemPointerData) * numFilled);
+ dst->deleteData = NULL;
+ if (src->deleteData)
+ dst->deleteData = vci_AllocateAndCopy(src->deleteData,
+ vci_RoundUpValue(numFilled, 8));
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* As in the source buffer, dataOffset[] is the upper half of data[]. */
+ dst->data = palloc_array(char *, numColumns * 2);
+ dst->dataOffset = (vci_offset_in_extent_t **) &(dst->data[numColumns]);
+
+ /*
+ * Size of the single allocation: the offset arrays (numFilled + 1 entries
+ * per column that has one) followed by the bytes in use in each column.
+ */
+ sizeIndexArray = sizeof(vci_offset_in_extent_t) * src->numColumnsWithIndex;
+ totalSize = sizeIndexArray * (numFilled + 1); /* pgr0062 */
+ for (colId = VCI_FIRST_NORMALCOLUMN_ID; colId < numColumns; ++colId)
+ {
+ if (NULL == src->dataOffset[colId])
+ totalSize += src->columnSizeList[colId] * numFilled;
+ else
+ totalSize += src->dataOffset[colId][numFilled] -
+ src->dataOffset[colId][0];
+ }
+ dst->dataAllocPtr = palloc(totalSize);
+ bufferIndex = dst->dataAllocPtr;
+ bufferData = &(bufferIndex[sizeIndexArray * (numFilled + 1)]);
+ for (colId = VCI_FIRST_NORMALCOLUMN_ID; colId < numColumns; ++colId)
+ {
+ /* Allow cancellation once every four columns. */
+ if (0 == (colId & 3))
+ CHECK_FOR_INTERRUPTS();
+
+ if (NULL == src->dataOffset[colId])
+ {
+ /* No offset array: numFilled entries of columnSizeList bytes. */
+ Size copySize = src->columnSizeList[colId] * numFilled;
+
+ dst->data[colId] = bufferData;
+ MemCpy(bufferData, src->data[colId], copySize);
+ bufferData += copySize;
+ dst->dataOffset[colId] = NULL;
+ }
+ else
+ {
+ /* With offset array: copy the used bytes, then the offsets. */
+ Size copySize = src->dataOffset[colId][numFilled] -
+ src->dataOffset[colId][0];
+
+ dst->data[colId] = bufferData;
+ MemCpy(bufferData, src->data[colId], copySize);
+ bufferData += copySize;
+ copySize = sizeof(vci_offset_in_extent_t) * (numFilled + 1);
+ dst->dataOffset[colId] = (vci_offset_in_extent_t *) bufferIndex;
+ MemCpy(bufferIndex, src->dataOffset[colId], copySize);
+ bufferIndex += copySize;
+ }
+ }
+
+ return dst;
+}
+
+/**
+ * @brief Register a RosChunkBuffer to a RosChunkStorage.
+ *
+ * The storage keeps a copy made by vci_CopyRosChunkBuffer(); src itself is
+ * not retained and can be reused by the caller.
+ *
+ * @param[in] rosChunkStorage the holder of RosChunkBuffer.
+ * @param[in] src the RosChunkBuffer to be registered.
+ */
+void
+vci_RegisterChunkBuffer(RosChunkStorage *rosChunkStorage, RosChunkBuffer *src)
+{
+    RosChunkBuffer *copied;
+
+    /* there must be a free slot left */
+    Assert(rosChunkStorage->numFilled < rosChunkStorage->numChunks);
+
+    copied = vci_CopyRosChunkBuffer(src);
+
+    rosChunkStorage->chunk[rosChunkStorage->numFilled] = copied;
+    rosChunkStorage->numFilled += 1;
+    rosChunkStorage->numTotalRows += src->numFilled;
+}
+
+/**
+ * @brief Calculate the data size of the specified column
+ * in the RosChunkStorage.
+ *
+ * For a fixed-length column the result is the row count times the column
+ * size; otherwise it is the sum, over all registered chunks, of the bytes
+ * covered by the chunk's offset array.
+ *
+ * @param[in] src the target RosChunkStorage to be inspected.
+ * @param[in] columnId the ID of the target column.
+ * @param[in] asFixed true to treat a variable-field-length column as a
+ * fixed-field-length column.  The current implementation does not read
+ * this argument.
+ * @return data size in bytes; 0 when no chunk is registered.
+ */
+Size
+vci_GetDataSizeInChunkStorage(RosChunkStorage *src, int columnId, bool asFixed)
+{
+    Size        total = 0;
+
+    if (src->numFilled < 1)
+        return 0;
+
+    Assert((VCI_FIRST_NORMALCOLUMN_ID <= columnId) && (columnId < src->chunk[0]->numColumns));
+
+    /* the first chunk's compression type decides how to count */
+    if (src->chunk[0]->compType[columnId] == vcis_compression_type_fixed_raw)
+        return src->numTotalRows * src->chunk[0]->columnSizeList[columnId];
+
+    for (int idx = 0; idx < src->numFilled; ++idx)
+    {
+        RosChunkBuffer *cur = src->chunk[idx];
+        vci_offset_in_extent_t *offsets = cur->dataOffset[columnId];
+
+        total += offsets[cur->numFilled] - offsets[0];
+    }
+
+    return total;
+}
diff --git a/contrib/vci/storage/vci_columns.c b/contrib/vci/storage/vci_columns.c
new file mode 100644
index 0000000..1882b44
--- /dev/null
+++ b/contrib/vci/storage/vci_columns.c
@@ -0,0 +1,1150 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_columns.c
+ * Column store which consists ROS
+ *
+ * Column store consists of a main and a meta relation. Main relation
+ * consists of some extents and dictionaries. This file contains their
+ * handlings.
+ *
+ * Also, delete vector is also handled here.
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_columns.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <stdint.h>
+#include <limits.h>
+
+#include "access/xact.h"
+#include "catalog/index.h"
+
+#include "postgresql_copy.h"
+
+#include "vci.h"
+#include "vci_chunk.h"
+
+#include "vci_columns.h"
+#include "vci_columns_data.h"
+
+#include "vci_freelist.h"
+#include "vci_fetch.h"
+#include "vci_mem.h"
+#include "vci_ros.h"
+#include "vci_tidcrid.h"
+
+#define VCI_LIMIT_INEFFICIENT_COUNT (10)
+#define GROWTH_NODE (10)
+
+#define VCI_MINIMUM_DATA_AMOUNT_FOR_COMMON_DICT (64 * 1024 * 1024)
+
+/**
+ * Macro to cast from Page to (vcis_column_meta_t *).  The result points
+ * just past the first VCI_MIN_PAGE_HEADER bytes of the page.
+ */
+#define vci_GetColumnMetaT(page) \
+ ((vcis_column_meta_t *) &((page)[VCI_MIN_PAGE_HEADER]))
+
+static void
+ UpdateInfoInMetaForFixedLengthRawData(vci_ColumnRelations *rel,
+ int numExtentPages);
+
+static uint32 GetVarlenAHeader(Datum *header,
+ Buffer *buffer,
+ BlockNumber *currentBlockNumber,
+ uint32 offsetInPage,
+ Relation rel);
+
+/*
+ * Per-column work area combining dictionary information with the result of
+ * compressing the column data.  Initialize with InitializeCmpInfo().
+ */
+typedef struct vci_CmpInfo
+{
+ vci_DictInfo dict_info;
+
+ /*
+ * Pointer to the compressed data.  NULL if there is no compressed data,
+ * or if the compressed data is at least as large as the raw data.  In
+ * that case the memory areas pointed to by compressed_data and
+ * compressed_offset should have been freed, and compressed_num_offset
+ * should be zero.
+ */
+ char *compressed_data;
+
+ vci_offset_in_extent_t *compressed_offset; /* offset of each
+ * VCI_COMPACTION_UNIT_ROW */
+ uint32 compressed_num_offset; /* number of offsets */
+} vci_CmpInfo;
+
+/*
+ * Put a vci_CmpInfo into its empty state: dictionary information
+ * initialized by vci_InitializeDictInfo(), no compressed data, no offsets.
+ */
+static void
+InitializeCmpInfo(vci_CmpInfo *cmpInfo)
+{
+    Assert(cmpInfo);
+
+    vci_InitializeDictInfo(&(cmpInfo->dict_info));
+
+    /* nothing has been compressed yet */
+    cmpInfo->compressed_num_offset = 0;
+    cmpInfo->compressed_offset = NULL;
+    cmpInfo->compressed_data = NULL;
+}
+
+/* ***************************
+ * Extent operation function
+ * ***************************
+ */
+/*
+ * Record where an extent lives in the column's meta relation.
+ *
+ * The entry for extentId receives the start block, the block count and the
+ * enabled flag (false when startBlockNumber is InvalidBlockNumber).
+ * minData / maxData, when not NULL, are copied into the entry: the minimum
+ * at the start of the min area and the maximum min_max_field_size bytes
+ * after it.  validMinMax must currently be false.
+ *
+ * With checkOverwrite set, an entry that is already enabled and has a
+ * non-zero block number raises an error instead of being replaced.
+ */
+void
+vci_WriteRawDataExtentInfo(Relation rel,
+                           int32 extentId,
+                           uint32 startBlockNumber,
+                           uint32 numBlocks,
+                           char *minData,
+                           char *maxData,
+                           bool validMinMax,
+                           bool checkOverwrite)
+{
+    Buffer      metaBuffer;
+    Buffer      entryBuffer;
+    BlockNumber entryBlock;
+    vcis_c_extent_t *entry;
+    vcis_column_meta_t *meta;
+
+    meta = vci_GetColumnMeta(&metaBuffer, rel);
+
+    Assert(false == validMinMax);
+
+    entry = vci_GetColumnExtent(&entryBuffer, &entryBlock, rel, extentId);
+
+    LockBuffer(entryBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+    /* Min/max type information (vci_GetMinMaxTypeInfo) is not consulted. */
+    if (checkOverwrite &&
+        entry->enabled &&
+        (0 != entry->block_number))
+        elog(ERROR, "overwrite column meta data");  /* FIXME */
+
+    entry->block_number = startBlockNumber;
+    entry->num_blocks = numBlocks;
+    entry->enabled = (startBlockNumber != InvalidBlockNumber);
+    entry->valid_min_max = validMinMax;
+
+    if (minData)
+        MemCpy(entry->min, minData, meta->min_max_content_size);
+
+    if (maxData)
+        MemCpy(&(entry->min[meta->min_max_field_size]),
+               maxData, meta->min_max_content_size);
+
+    vci_WriteOneItemPage(rel, entryBuffer);
+    UnlockReleaseBuffer(entryBuffer);
+    ReleaseBuffer(metaBuffer);
+}
+
+/*
+ * Write one extent of a fixed-length column from the chunks held in src.
+ *
+ * columnId may be a normal column or one of the pseudo columns
+ * VCI_COLUMN_ID_TID / VCI_COLUMN_ID_NULL, whose rows are taken from the
+ * chunks' tidData / nullData.
+ *
+ * The extent occupies a fixed number of pages at block
+ * extentId * numExtentPages.  Steps:
+ *   1. for normal columns, register the extent in the meta relation (with
+ *      overwrite check) and update the meta information;
+ *   2. make sure the pages exist and fill in the extent header;
+ *   3. copy the chunk data page by page;
+ *   4. write out the remaining pages of the extent;
+ *   5. write the extent information again, without overwrite check.
+ */
+static void
+WriteFixedLengthRawData(vci_MainRelHeaderInfo *info,
+                        RosChunkStorage *src,
+                        int16 columnId,
+                        int extentId)
+{
+    vci_ColumnRelations rel;
+    int16       columnSize;
+    Size        dataSize;
+    int         extentHeaderSize;
+    int         numExtentPages;
+
+    /*
+     * Min/max values are not computed here (validMinMax is passed as false),
+     * but vci_WriteRawDataExtentInfo() still copies these buffers into the
+     * extent meta data.  Zero them so that no uninitialized stack bytes are
+     * written to the relation.
+     */
+    char        minData[VCI_MAX_MIN_MAX_SIZE] = {0};
+    char        maxData[VCI_MAX_MIN_MAX_SIZE] = {0};
+    BlockNumber startBlockNumber;
+    BlockNumber blockNumber;
+    uint32      offsetInPage;
+    Buffer      buffer = InvalidBuffer;
+    Page        page = NULL;    /* invalid page */
+    LOCKMODE    lockmode = RowExclusiveLock;
+    bool        fixedPages = true;
+
+    Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+
+    Assert(info);
+    Assert(src);
+    Assert(0 < src->numFilled);
+    Assert(src->chunk[0]);
+    Assert((VCI_COLUMN_ID_TID == columnId) ||
+           (VCI_COLUMN_ID_NULL == columnId) ||
+           (columnId < src->chunk[0]->numColumns));
+
+    vci_OpenColumnRelations(&rel, info, columnId, lockmode);
+
+    columnSize = vci_GetFixedColumnSize(info, columnId);
+
+    /* The extent is always sized for a full VCI_NUM_ROWS_IN_EXTENT rows. */
+    dataSize = (Size) columnSize * VCI_NUM_ROWS_IN_EXTENT;
+    extentHeaderSize = vci_GetExtentFixedLengthRawDataHeaderSize(
+                                                                 VCI_NUM_ROWS_IN_EXTENT);
+    numExtentPages = vci_GetNumBlocks(dataSize + extentHeaderSize);
+
+    startBlockNumber = extentId * numExtentPages;
+    if (VCI_FIRST_NORMALCOLUMN_ID <= columnId)
+    {
+        vcis_m_column_t *colInfo = vci_GetMColumn(info, columnId);
+
+        switch (colInfo->comp_type)
+        {
+            case vcis_compression_type_fixed_raw:
+                vci_WriteRawDataExtentInfo(rel.meta,
+                                           extentId,
+                                           startBlockNumber,
+                                           numExtentPages,
+                                           NULL,
+                                           NULL,
+                                           false,
+                                           true);
+                UpdateInfoInMetaForFixedLengthRawData(&rel,
+                                                      numExtentPages);
+                break;
+            default:
+                Assert(false);
+                elog(ERROR, "internal error");
+        }
+    }
+
+    vci_PreparePagesWithOneItemIfNecessary(rel.data,
+                                           startBlockNumber + numExtentPages - 1);
+
+    /* Position on the page where the data following the header begins. */
+    vci_GetBlockNumberAndOffsetInPage(&blockNumber,
+                                      &offsetInPage,
+                                      extentHeaderSize);
+    blockNumber += startBlockNumber;
+    buffer = vci_ReadBufferWithPageInit(rel.data, blockNumber);
+    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+    page = BufferGetPage(buffer);
+
+    {
+        vcis_extent_t *extent = vci_GetExtentT(page);
+
+        extent->size = numExtentPages * VCI_MAX_PAGE_SPACE;
+        extent->type = vcis_extent_type_data;
+        extent->id = extentId;
+        extent->comp_type = vcis_compression_type_fixed_raw;
+        extent->offset_offset = 0;
+        extent->offset_size = 0;
+        extent->data_offset = extentHeaderSize;
+        extent->data_size = dataSize;
+        extent->compressed = 0;
+        extent->dict_offset = 0;
+        extent->dict_size = 0;
+        extent->dict_type = vcis_dict_type_none;
+    }
+
+    for (int chunkId = 0; chunkId < src->numFilled; ++chunkId)
+    {
+        RosChunkBuffer *chunk = src->chunk[chunkId];
+        int         size;
+
+        Assert(chunk);
+
+        size = chunk->numFilled * columnSize;
+        for (int written = 0; written < size;)
+        {
+            int         writeSize;
+
+            /* Current page is full: write it out and move to the next. */
+            if (VCI_MAX_PAGE_SPACE <= offsetInPage)
+            {
+                if (BufferIsValid(buffer))
+                {
+                    vci_WriteOneItemPage(rel.data, buffer);
+                    UnlockReleaseBuffer(buffer);
+                }
+                ++blockNumber;
+                /* FIXME */
+
+                /*
+                 * To obtain better performance, each DB page should be
+                 * initialized only when it is accessed for the first time.
+                 */
+                offsetInPage = 0;
+                buffer = ReadBuffer(rel.data, blockNumber);
+                vci_InitPageCore(buffer, 1, false);
+                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+                page = BufferGetPage(buffer);
+            }
+
+            /* Copy as much as fits into the rest of the page. */
+            writeSize = Min(VCI_MAX_PAGE_SPACE - offsetInPage, size - written); /* pgr0063 */
+            switch (columnId)
+            {
+                case VCI_COLUMN_ID_TID:
+                    MemCpy(&(((char *) page)[VCI_MIN_PAGE_HEADER + offsetInPage]),
+                           &(chunk->tidData[written]),
+                           writeSize);
+                    break;
+                case VCI_COLUMN_ID_NULL:
+                    MemCpy(&(((char *) page)[VCI_MIN_PAGE_HEADER + offsetInPage]),
+                           &(chunk->nullData[written]),
+                           writeSize);
+                    break;
+                default:
+                    MemCpy(&(((char *) page)[VCI_MIN_PAGE_HEADER + offsetInPage]),
+                           &(chunk->data[columnId][written]),
+                           writeSize);
+                    break;
+            }
+            written += writeSize;
+            offsetInPage += writeSize;
+        }
+    }
+    if (BufferIsValid(buffer))
+    {
+        vci_WriteOneItemPage(rel.data, buffer);
+        UnlockReleaseBuffer(buffer);
+        ++blockNumber;
+
+        /* Write out the pages of the extent that received no data. */
+        if (fixedPages)
+        {
+            for (; blockNumber < (startBlockNumber + numExtentPages);
+                 ++blockNumber)
+            {
+                buffer = vci_ReadBufferWithPageInit(rel.data, blockNumber);
+                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+                vci_WriteOneItemPage(rel.data, buffer);
+                UnlockReleaseBuffer(buffer);
+            }
+        }
+    }
+
+    vci_WriteRawDataExtentInfo(rel.meta,
+                               extentId,
+                               startBlockNumber,
+                               numExtentPages,
+                               minData,
+                               maxData,
+                               false,
+                               false);
+
+    vci_CloseColumnRelations(&rel, lockmode);
+}
+
+/*
+ * Write one extent of a variable-length (raw) column from the chunks held
+ * in src.
+ *
+ * Unlike the fixed-length case the extent size depends on the data, so the
+ * pages are taken from the column's free space list.  Steps:
+ *   1. compute the data and header sizes and find free space for them;
+ *   2. write a recovery record, unlink the free space, and register the
+ *      extent in the meta relation (with overwrite check);
+ *   3. fill in the extent header on the first page;
+ *   4. write the sampled offset array, then the column data;
+ *   5. write the extent information again, without overwrite check.
+ */
+static void
+WriteVariableLengthRawData(vci_MainRelHeaderInfo *info,
+                           RosChunkStorage *src,
+                           int columnId,
+                           int extentId,
+                           TransactionId xId)
+{
+    vci_ColumnRelations rel;
+    Size        dataSize;
+    int         extentHeaderSize;
+    int         numExtentPages;
+    int         numCommonDictPages;
+
+    /*
+     * Min/max values are not computed here (validMinMax is passed as false),
+     * but vci_WriteRawDataExtentInfo() still copies these buffers into the
+     * extent meta data.  Zero them so that no uninitialized stack bytes are
+     * written to the relation.
+     */
+    char        minData[VCI_MAX_MIN_MAX_SIZE] = {0};
+    char        maxData[VCI_MAX_MIN_MAX_SIZE] = {0};
+    BlockNumber startBlockNumber;
+    BlockNumber blockNumber;
+    BlockNumber blockNumberOld = InvalidBlockNumber;
+    uint32      offsetInPage;
+    Buffer      buffer = InvalidBuffer;
+    LOCKMODE    lockmode = RowExclusiveLock;
+    vcis_extent_t *extent;
+    vcis_compression_type_t compType;
+    vci_CmpInfo cmpInfo;
+
+    vcis_free_space_t *FS;
+
+    Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+    Assert(info);
+    Assert(src);
+    Assert(0 < src->numFilled);
+    Assert(src->chunk[0]);
+    Assert(columnId < src->chunk[0]->numColumns);
+
+    InitializeCmpInfo(&cmpInfo);
+
+    vci_OpenColumnRelations(&rel, info, columnId, lockmode);
+
+    {
+        vcis_m_column_t *colInfo = vci_GetMColumn(info, columnId);
+
+        compType = colInfo->comp_type;
+    }
+    Assert(compType == vcis_compression_type_variable_raw);
+
+    dataSize = vci_GetDataSizeInChunkStorage(src, columnId, false);
+
+    extentHeaderSize = vci_GetExtentVariableLengthRawDataHeaderSize(
+                                                                    src->numTotalRows);
+
+    numExtentPages = vci_GetNumBlocks(dataSize + extentHeaderSize);
+    numCommonDictPages = 0;
+
+    startBlockNumber = vci_FindFreeSpaceForExtent(&rel, numExtentPages + numCommonDictPages);
+    FS = vci_GetFreeSpace((vci_RelationPair *) &rel, startBlockNumber);
+    vci_WriteRecoveryRecordForFreeSpace(&rel, columnId, cmpInfo.dict_info.common_dict_id,
+                                        startBlockNumber, FS);
+    ReleaseBuffer(rel.bufData);
+
+    vci_RemoveFreeSpaceFromLinkList(&rel,
+                                    startBlockNumber,
+                                    numExtentPages + numCommonDictPages);
+
+    vci_WriteRawDataExtentInfo(rel.meta,
+                               extentId,
+                               startBlockNumber,
+                               numExtentPages + numCommonDictPages,
+                               NULL,    /* min */
+                               NULL,    /* max */
+                               false,
+                               true);
+
+    /* write the header part of extent data in data relation */
+    blockNumberOld = blockNumber = startBlockNumber + numCommonDictPages;
+    buffer = vci_ReadBufferWithPageInit(rel.data, blockNumber);
+    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+    extent = vci_GetExtentT(BufferGetPage(buffer));
+    extent->size = numExtentPages * VCI_MAX_PAGE_SPACE;
+    extent->type = vcis_extent_type_data;
+    extent->id = extentId;
+    extent->comp_type = compType;
+    extent->offset_offset = offsetof(vcis_extent_t, dict_body);
+    extent->offset_size = vci_GetOffsetArraySize(src->numTotalRows);
+    extent->data_offset = extentHeaderSize;
+    extent->data_size = dataSize;
+    extent->compressed = 0;
+    extent->dict_offset = VCI_INVALID_DICTIONARY_ID;
+    extent->dict_size = 0;
+    extent->dict_type = vcis_dict_type_none;
+
+    /* write offset data */
+    /***************
+     * ** CAUTION **
+     * *************
+     * Here, we only record pointers of the head of each
+     * VCI_COMPACTION_UNIT_ROW entries.
+     */
+    vci_GetBlockNumberAndOffsetInPage(&blockNumber,
+                                      &offsetInPage,
+                                      extent->offset_offset);
+    blockNumber += startBlockNumber + numCommonDictPages;
+
+    {                           /* raw data */
+        /*
+         * Make offset data
+         */
+#ifdef USE_ASSERT_CHECKING
+        uint32      numRowSamples = vci_GetOffsetArrayLength(src->numTotalRows);
+#endif                          /* #ifdef USE_ASSERT_CHECKING */
+        uint32      offsetSize = vci_GetOffsetArraySize(src->numTotalRows);
+        vci_offset_in_extent_t *offset = palloc(offsetSize);
+        int         rowId = 0;
+        int         globalOffset = 0;
+        int         offsetPtr = 0;
+
+        for (int chunkId = 0; chunkId < src->numFilled; ++chunkId)
+        {
+            RosChunkBuffer *chunk = src->chunk[chunkId];
+            vci_offset_in_extent_t *dataOffset = chunk->dataOffset[columnId];
+
+            /*
+             * NOTE(review): this starts sampling at (rowId % unit) inside
+             * the chunk.  That hits rows whose extent-wide index is a
+             * multiple of VCI_COMPACTION_UNIT_ROW only when the preceding
+             * chunks hold a multiple of the unit (or exactly half a unit
+             * more) -- confirm that chunk sizes guarantee this.
+             */
+            int         elemId = rowId % VCI_COMPACTION_UNIT_ROW;
+
+            for (; elemId < chunk->numFilled;
+                 elemId += VCI_COMPACTION_UNIT_ROW)
+            {
+                offset[offsetPtr++] = globalOffset + dataOffset[elemId];
+            }
+            rowId += chunk->numFilled;
+            globalOffset += dataOffset[chunk->numFilled] - dataOffset[0];
+        }
+        Assert(rowId == src->numTotalRows);
+        Assert(globalOffset == vci_GetDataSizeInChunkStorage(src, columnId,
+                                                             false));
+        Assert(offsetPtr == (numRowSamples - 1));
+        /* The final entry marks the end of the data. */
+        offset[offsetPtr] = globalOffset;
+
+        buffer = vci_WriteDataIntoMultiplePages(rel.data,
+                                                &blockNumber,
+                                                &blockNumberOld,
+                                                &offsetInPage,
+                                                buffer,
+                                                offset,
+                                                offsetSize);
+        pfree(offset);
+    }
+
+    /*
+     * write data
+     *
+     * The data starts right after the header, i.e. at extentHeaderSize,
+     * which is the value stored in extent->data_offset above.  Use the local
+     * here: 'extent' points into the first page, and that buffer may no
+     * longer be the one we hold once the offset array has been written.
+     */
+    vci_GetBlockNumberAndOffsetInPage(&blockNumber,
+                                      &offsetInPage,
+                                      extentHeaderSize);
+    blockNumber += startBlockNumber + numCommonDictPages;
+
+    {
+        for (int chunkId = 0; chunkId < src->numFilled; ++chunkId)
+        {
+            RosChunkBuffer *chunk = src->chunk[chunkId];
+            vci_offset_in_extent_t *dataOffset = chunk->dataOffset[columnId];
+            int         size = dataOffset[chunk->numFilled] - dataOffset[0];
+
+            Assert(chunk);
+
+            buffer = vci_WriteDataIntoMultiplePages(rel.data,
+                                                    &blockNumber, &blockNumberOld, &offsetInPage,
+                                                    buffer,
+                                                    chunk->data[columnId], size);
+        }
+    }
+
+    if (BufferIsValid(buffer))
+    {
+        vci_WriteOneItemPage(rel.data, buffer);
+        UnlockReleaseBuffer(buffer);
+    }
+
+    vci_WriteRawDataExtentInfo(rel.meta,
+                               extentId,
+                               startBlockNumber + numCommonDictPages,
+                               numExtentPages,
+                               minData,
+                               maxData,
+                               false,
+                               false);
+
+    vci_CloseColumnRelations(&rel, lockmode);
+}
+
+/*
+ * WriteDeleteVector
+ *		Prepare the delete-vector pages of one extent.
+ *
+ * Records the extent in the delete-vector meta relation, updates the meta
+ * header counters, and (re)initializes each of the
+ * VCI_NUM_PAGES_IN_EXTENT_FOR_DELETE data pages of the extent with
+ * VCI_ITEMS_IN_PAGE_FOR_DELETE items.
+ *
+ * info		main relation header of the target VCI index
+ * src		chunk storage being converted into the extent
+ * extentId	ID of the extent (must be a normal extent ID)
+ *
+ * Copying delete data held by the chunks in "src" is not implemented yet;
+ * such input is rejected with an ERROR instead of crashing the backend.
+ */
+static void
+WriteDeleteVector(vci_MainRelHeaderInfo *info,
+				  RosChunkStorage *src,
+				  int extentId)
+{
+	vci_ColumnRelations rel;
+	LOCKMODE	lockmode = RowExclusiveLock;
+	int			numExtentPages = VCI_NUM_PAGES_IN_EXTENT_FOR_DELETE;
+	BlockNumber startBlockNumber = numExtentPages * extentId;
+
+	Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+
+	vci_OpenColumnRelations(&rel, info, VCI_COLUMN_ID_DELETE, lockmode);
+
+	vci_WriteRawDataExtentInfo(rel.meta,
+							   extentId,
+							   startBlockNumber,
+							   numExtentPages,
+							   NULL,	/* min */
+							   NULL,	/* max */
+							   false,
+							   false /* don't check overwrite */ );
+	UpdateInfoInMetaForFixedLengthRawData(&rel,
+										  numExtentPages);
+
+	/* Make sure every page of the extent exists and is freshly formatted. */
+	for (int rId = 0; rId < numExtentPages; ++rId)
+	{
+		Buffer		buffer;
+
+		vci_PreparePagesIfNecessaryCore(rel.data,
+										startBlockNumber + rId,
+										VCI_ITEMS_IN_PAGE_FOR_DELETE,
+										true,
+										true);
+
+		buffer = ReadBuffer(rel.data, startBlockNumber + rId);
+		vci_InitPageCore(buffer, VCI_ITEMS_IN_PAGE_FOR_DELETE, false);
+		ReleaseBuffer(buffer);
+	}
+
+	for (int chunkId = 0; chunkId < src->numFilled; ++chunkId)
+	{
+		RosChunkBuffer *chunk = src->chunk[chunkId];
+
+		/*
+		 * FIXME: delete data carried by a chunk cannot be written yet.
+		 * Report it as a regular error; abort() would take down the whole
+		 * server instead of just failing the current operation.
+		 */
+		if (chunk->deleteData)
+			elog(ERROR, "writing delete data of a chunk into an extent is not supported");
+	}
+
+	vci_WriteRawDataExtentInfo(rel.meta,
+							   extentId,
+							   startBlockNumber,
+							   numExtentPages,
+							   NULL,	/* min */
+							   NULL,	/* max */
+							   false,
+							   false);
+
+	vci_CloseColumnRelations(&rel, lockmode);
+}
+
+/*
+ * vci_WriteOneExtent
+ *		Write the rows collected in "src" as extent "extentId".
+ *
+ * Nothing is written when "src" holds no rows.  Otherwise the delete vector,
+ * the TID column and the NULL column are written first, followed by every
+ * normal column according to the compression type recorded in the first
+ * chunk, and finally the extent info (row count, xgen, xdel) is stored in
+ * the main relation.
+ */
+void
+vci_WriteOneExtent(vci_MainRelHeaderInfo *info,
+				   RosChunkStorage *src,
+				   int extentId,
+				   TransactionId xgen,	/* xgen in extent info */
+				   TransactionId xdel,	/* xdel in extent info */
+				   TransactionId xid)	/* in tuple header */
+{
+	int16		columnId;
+
+	Assert(src);
+	if (src->numTotalRows < 1)
+		return;
+	Assert(src->chunk[0]);
+	Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+
+	/* special columns */
+	WriteDeleteVector(info, src, extentId);
+	WriteFixedLengthRawData(info, src, VCI_COLUMN_ID_TID, extentId);
+	WriteFixedLengthRawData(info, src, VCI_COLUMN_ID_NULL, extentId);
+
+	/* normal columns, dispatched on the compression type */
+	columnId = VCI_FIRST_NORMALCOLUMN_ID;
+	while (columnId < src->chunk[0]->numColumns)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		if (src->chunk[0]->compType[columnId] == vcis_compression_type_fixed_raw)
+			WriteFixedLengthRawData(info, src, columnId, extentId);
+		else if (src->chunk[0]->compType[columnId] == vcis_compression_type_variable_raw)
+			WriteVariableLengthRawData(info, src, columnId, extentId, xid);
+		else
+			elog(ERROR, "unsupported compression type");	/* FIXME */
+
+		++columnId;
+	}
+
+	vci_WriteExtentInfo(info, extentId, src->numTotalRows, 0, 0, xgen, xdel);
+}
+
+/*
+ * vci_InitializeDictInfo
+ *		Reset a vci_DictInfo to the "no dictionary" state: no storage, no
+ *		owning extent, no common dictionary, dictionary type "none".
+ */
+void
+vci_InitializeDictInfo(vci_DictInfo *dictInfo)
+{
+	Assert(dictInfo);
+
+	/* identification */
+	dictInfo->dict_type = vcis_dict_type_none;
+	dictInfo->common_dict_id = VCI_INVALID_DICTIONARY_ID;
+	dictInfo->extent_id = VCI_INVALID_EXTENT_ID;
+
+	/* storage */
+	dictInfo->storage_size = 0;
+	dictInfo->dictionary_storage = NULL;
+}
+
+/*
+ * vci_GetColumnExtent
+ *		Locate the per-extent entry (vcis_c_extent_t) of "extentId" in a
+ *		column meta relation.
+ *
+ * buffer		[out] buffer of the page containing the entry.  It is obtained
+ *				through vci_GetColumnMeta() or ReadBuffer() and is still held
+ *				when this function returns.
+ * blockNumber	[out] block number of that page.
+ * rel			column meta relation.
+ * extentId		target extent ID; asserted to be a normal extent ID.
+ *
+ * Returns a pointer into the page held by *buffer.
+ */
+vcis_c_extent_t *
+vci_GetColumnExtent(Buffer *buffer,
+ BlockNumber *blockNumber,
+ Relation rel,
+ int32 extentId)
+{
+ Page page;
+ vcis_column_meta_t *columnMeta = vci_GetColumnMeta(buffer, rel);
+
+ /* vci_MinMaxTypeInfo *mmti = vci_GetMinMaxTypeInfo(attr); */
+ int columnExtentSize;
+ /* bytes used in the header page before the first extent entry */
+ int headerSize = offsetof(vcis_column_meta_t, common_dict_info) +
+ (sizeof(vcis_c_common_dict_t) * columnMeta->num_common_dicts);
+ int numExtentsInFirstPage;
+ int numExtentsInPage;
+
+ Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+
+ /* NOTE(review): presumably the block number of the header page -- confirm */
+ *blockNumber = VCI_NUM_COLUMN_META_HEADER_PAGE - 1;
+ /* one entry = fixed part up to "min" plus two min/max fields */
+ columnExtentSize = offsetof(vcis_c_extent_t, min) + (2 * columnMeta->min_max_field_size);
+ numExtentsInFirstPage = (VCI_MAX_PAGE_SPACE - headerSize) /
+ columnExtentSize;
+ /* The entry is stored in the header page, right after the header. */
+ if (extentId < numExtentsInFirstPage)
+ return (vcis_c_extent_t *) &(((char *) columnMeta)
+ [headerSize + (extentId * columnExtentSize)]);
+ /* Not in the header page: drop it and look in the following pages. */
+ ReleaseBuffer(*buffer);
+
+ /* Convert extentId into (page index, slot within that page). */
+ extentId -= numExtentsInFirstPage;
+ numExtentsInPage = VCI_MAX_PAGE_SPACE / columnExtentSize;
+ *blockNumber = extentId / numExtentsInPage;
+ extentId -= *blockNumber * numExtentsInPage;
+ *blockNumber += VCI_NUM_COLUMN_META_HEADER_PAGE;
+ vci_PreparePagesWithOneItemIfNecessary(rel, *blockNumber);
+ *buffer = ReadBuffer(rel, *blockNumber);
+ page = BufferGetPage(*buffer);
+
+ return (vcis_c_extent_t *) &(((char *) page)
+ [VCI_MIN_PAGE_HEADER + (extentId * columnExtentSize)]);
+}
+
+/*
+ * vci_GetColumnMeta
+ *		Read the header page of a column meta relation.
+ *
+ * The buffer of page VCI_COLUMN_META_HEADER_PAGE_ID is stored in *buffer and
+ * is still held on return; the result points at the column meta structure
+ * inside that page.
+ */
+vcis_column_meta_t *
+vci_GetColumnMeta(Buffer *buffer, Relation rel)
+{
+	Page		headerPage;
+	Buffer		headerBuffer;
+
+	headerBuffer = vci_ReadBufferWithPageInit(rel, VCI_COLUMN_META_HEADER_PAGE_ID);
+	*buffer = headerBuffer;
+	headerPage = BufferGetPage(headerBuffer);
+
+	return vci_GetColumnMetaT(headerPage);
+}
+
+/*
+ * GetColumnOids
+ *		Look up the OIDs of the meta and data relations of a column.
+ *
+ * The delete vector, TID and NULL columns take their OIDs from the main
+ * relation variables.  VCI_COLUMN_ID_CRID yields InvalidOid for both.  Any
+ * other column ID is resolved through its vcis_m_column_t entry.
+ */
+static void
+GetColumnOids(Oid *metaOid,
+			  Oid *dataOid,
+			  vci_MainRelHeaderInfo *info,
+			  int16 columnId)
+{
+	if (columnId == VCI_COLUMN_ID_DELETE)
+	{
+		*metaOid = vci_GetMainRelVar(info, vcimrv_delete_meta_oid, 0);
+		*dataOid = vci_GetMainRelVar(info, vcimrv_delete_data_oid, 0);
+	}
+	else if (columnId == VCI_COLUMN_ID_CRID)
+	{
+		*metaOid = InvalidOid;
+		*dataOid = InvalidOid;
+	}
+	else if (columnId == VCI_COLUMN_ID_TID)
+	{
+		*metaOid = vci_GetMainRelVar(info, vcimrv_tid_meta_oid, 0);
+		*dataOid = vci_GetMainRelVar(info, vcimrv_tid_data_oid, 0);
+	}
+	else if (columnId == VCI_COLUMN_ID_NULL)
+	{
+		*metaOid = vci_GetMainRelVar(info, vcimrv_null_meta_oid, 0);
+		*dataOid = vci_GetMainRelVar(info, vcimrv_null_data_oid, 0);
+	}
+	else
+	{
+		/* normal column: OIDs are kept in the per-column entry */
+		vcis_m_column_t *mColumn = vci_GetMColumn(info, columnId);
+
+		*metaOid = mColumn->meta_oid;
+		*dataOid = mColumn->data_oid;
+	}
+}
+
+/*
+ * vci_OpenColumnRelations
+ *		Open the meta and data relations of column "columnId" with
+ *		"lockmode" and remember "info" in the relation pair.
+ *
+ * The relations are closed again with vci_CloseColumnRelations().
+ */
+void
+vci_OpenColumnRelations(vci_ColumnRelations *rel,
+						vci_MainRelHeaderInfo *info,
+						int16 columnId,
+						LOCKMODE lockmode)
+{
+	Oid			oidOfMeta;
+	Oid			oidOfData;
+
+	GetColumnOids(&oidOfMeta, &oidOfData, info, columnId);
+
+	/* meta relation first, then data relation */
+	rel->meta = table_open(oidOfMeta, lockmode);
+	rel->data = table_open(oidOfData, lockmode);
+	rel->info = info;
+}
+
+/*
+ * vci_CloseColumnRelations
+ *		Close the data and meta relations of a relation pair, releasing
+ *		"lockmode" on each.  A NULL pair and invalid relations are ignored.
+ */
+void
+vci_CloseColumnRelations(vci_ColumnRelations *rel, LOCKMODE lockmode)
+{
+	if (rel == NULL)
+		return;
+
+	/* data relation first, then meta relation */
+	if (RelationIsValid(rel->data))
+		table_close(rel->data, lockmode);
+
+	if (RelationIsValid(rel->meta))
+		table_close(rel->meta, lockmode);
+}
+
+/*
+ * UpdateInfoInMetaForFixedLengthRawData
+ *		Adjust the counters in the column meta header after an extent has
+ *		been added or removed.
+ *
+ * numExtentPages > 0	an extent of that many pages was added
+ * numExtentPages < 0	an extent of -numExtentPages pages was deleted
+ * numExtentPages == 0	nothing to do
+ *
+ * The meta header buffer is exclusively locked BEFORE its contents are
+ * changed (the same order InitColumnMetaRelation() uses), then the page is
+ * written and the buffer is unlocked and released.
+ */
+static void
+UpdateInfoInMetaForFixedLengthRawData(vci_ColumnRelations *rel,
+									  int numExtentPages)
+{
+	vcis_column_meta_t *columnMeta;
+
+	if (0 == numExtentPages)
+		return;
+
+	columnMeta = vci_GetColumnMeta(&rel->bufMeta, rel->meta);
+
+	/* Page contents must not be modified without the exclusive lock. */
+	LockBuffer(rel->bufMeta, BUFFER_LOCK_EXCLUSIVE);
+
+	if (0 < numExtentPages)		/* an extent added */
+	{
+		++(columnMeta->num_extents);
+
+		/* consume free pages, never going below zero */
+		if (columnMeta->num_free_pages < numExtentPages)
+			columnMeta->num_free_pages = 0;
+		else
+			columnMeta->num_free_pages = columnMeta->num_free_pages -
+				numExtentPages;
+
+		if (0 < columnMeta->num_free_page_blocks)
+			--(columnMeta->num_free_page_blocks);
+	}
+	else						/* an extent deleted */
+	{
+		Assert(0 < columnMeta->num_extents);
+		--(columnMeta->num_extents);
+		/* numExtentPages is negative here, so this adds free pages */
+		columnMeta->num_free_pages -= numExtentPages;
+		++(columnMeta->num_free_page_blocks);
+	}
+
+	vci_WriteOneItemPage(rel->meta, rel->bufMeta);
+	UnlockReleaseBuffer(rel->bufMeta);
+}
+
+/*
+ * GetVarlenAHeader
+ *		Copy the varlena header stored at "offsetInPage" of the page in
+ *		*buffer into *header.
+ *
+ * The header length is taken from vci_VARHDSZ_ANY() on the first bytes at
+ * that position.  If the header does not fit in the rest of the page, the
+ * remaining bytes are read from the beginning of the next block: the current
+ * buffer is released, *currentBlockNumber is incremented and *buffer is
+ * replaced by the buffer of the next block of "rel".
+ *
+ * Returns the offset just behind the header, relative to the page that
+ * *buffer refers to on return.
+ */
+static uint32
+GetVarlenAHeader(Datum *header,
+ Buffer *buffer,
+ BlockNumber *currentBlockNumber,
+ uint32 offsetInPage,
+ Relation rel)
+{
+ Page page = BufferGetPage(*buffer);
+ char *curPtr = &(page[VCI_MIN_PAGE_HEADER + offsetInPage]);
+ /* bytes left in this page starting at offsetInPage */
+ int len = VCI_MAX_PAGE_SPACE - offsetInPage;
+ int reqLen = vci_VARHDSZ_ANY(curPtr);
+
+ /* The whole header lies within the current page. */
+ if (reqLen <= len)
+ {
+ MemCpy(header, curPtr, reqLen);
+
+ return offsetInPage + reqLen;
+ }
+
+ /* The header straddles the page boundary: copy it in two pieces. */
+ MemCpy(header, curPtr, len);
+ ReleaseBuffer(*buffer);
+ ++*currentBlockNumber;
+ *buffer = vci_ReadBufferWithPageInit(rel, *currentBlockNumber);
+ page = BufferGetPage(*buffer);
+ MemCpy(&(((char *) header)[len]),
+ &(page[VCI_MIN_PAGE_HEADER]),
+ reqLen - len);
+
+ return reqLen - len;
+}
+
+/*
+ * vci_GetElementPosition
+ *		Find the byte offset of one variable-length element within the data
+ *		area of an extent.
+ *
+ * vci_GetChunkPositionAndSize() supplies the offsets of the compaction unit
+ * (VCI_COMPACTION_UNIT_ROW rows) that contains "rowIdInExtent", plus
+ * *blockNumberBase and *dataOffset.  Starting at the beginning of that unit,
+ * the varlena headers of the preceding rows in the unit are read one by one
+ * and their sizes accumulated.
+ *
+ * offset			[out] single value (not an array): offset of the element,
+ *					relative to the data area (*dataOffset is subtracted
+ *					again before returning).
+ * blockNumberBase	[out] set by vci_GetChunkPositionAndSize().
+ * dataOffset		[out] set by vci_GetChunkPositionAndSize().
+ */
+void
+vci_GetElementPosition(uint32 *offset, /* not array */
+ BlockNumber *blockNumberBase,
+ uint32 *dataOffset,
+ vci_ColumnRelations *rel,
+ int32 extentId,
+ uint32 rowIdInExtent,
+ Form_pg_attribute attr)
+{
+ /* offsets of the start of this unit [0] and of the next unit [1] */
+ uint32 offset_[2];
+ Size totalSize;
+
+ Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+ vci_GetChunkPositionAndSize(offset_,
+ &totalSize,
+ blockNumberBase,
+ dataOffset,
+ rel,
+ extentId,
+ rowIdInExtent,
+ 1,
+ attr);
+
+ /* From here on *offset is measured from the start of the extent. */
+ *offset = offset_[0] + *dataOffset;
+
+ {
+ /* number of elements to skip from the start of the unit */
+ uint32 rowIdInChunk = rowIdInExtent % VCI_COMPACTION_UNIT_ROW;
+ BlockNumber curBN = (*offset) / VCI_MAX_PAGE_SPACE;
+ /* block currently held in "buffer"; invalid until the first read */
+ BlockNumber oldBN = InvalidBlockNumber;
+ uint32 offsetInPage;
+ Buffer buffer = InvalidBuffer;
+
+ offsetInPage = (*offset) - (curBN * VCI_MAX_PAGE_SPACE);
+ curBN += *blockNumberBase;
+
+ for (uint32 rowId = 0; rowId < rowIdInChunk; ++rowId)
+ {
+ Datum datum;
+
+ /* Switch to the page containing the next header if needed. */
+ if (oldBN != curBN)
+ {
+ if (BufferIsValid(buffer))
+ ReleaseBuffer(buffer);
+ buffer = vci_ReadBufferWithPageInit(rel->data, curBN);
+ oldBN = curBN;
+ }
+
+ /*
+ * May advance curBN and replace buffer when the header crosses a
+ * page boundary; the returned offset is not used here.
+ */
+ GetVarlenAHeader(&datum,
+ &buffer,
+ &curBN,
+ offsetInPage,
+ rel->data);
+
+ {
+ uint32 size = VARSIZE_ANY(&datum);
+
+ (*offset) += size;
+ offsetInPage += size;
+ if (VCI_MAX_PAGE_SPACE <= offsetInPage)
+ {
+ offsetInPage -= VCI_MAX_PAGE_SPACE;
+ /*
+ * If GetVarlenAHeader() did not move to the next block, do it
+ * now; otherwise the buffer already belongs to curBN.
+ */
+ if (oldBN == curBN)
+ ++curBN;
+ else
+ oldBN = curBN;
+ Assert(offsetInPage < VCI_MAX_PAGE_SPACE);
+ }
+ }
+ }
+ if (BufferIsValid(buffer))
+ ReleaseBuffer(buffer);
+ }
+ /* Make the result relative to the data area again. */
+ *offset -= *dataOffset;
+ Assert((*offset) < offset_[1]);
+}
+
+/*
+ * vci_GetChunkPositionAndSize
+ *		Read consecutive entries of the offset array of an extent.
+ *
+ * offset			[out] array of numUnit + 1 entries; receives the offsets
+ *					of the compaction unit containing "rowIdInExtent" and of
+ *					the numUnit units/boundaries following it.
+ * totalSize		[out] offset[numUnit] - offset[0].
+ * blockNumberBase	[out] first block of the extent in the data relation.
+ * dataOffset		[out] data_offset recorded in the extent header.
+ * rel				meta/data relation pair of the column.
+ * extentId			target extent (must be a normal extent ID).
+ * rowIdInExtent	row whose compaction unit is looked up.
+ * numUnit			number of units to cover.
+ * attr				not used here.
+ *
+ * An ERROR is raised when the extent is not enabled in the column meta
+ * relation, because there is no valid block to read from in that case.
+ */
+void
+vci_GetChunkPositionAndSize(uint32 *offset,
+							Size *totalSize,
+							BlockNumber *blockNumberBase,
+							uint32 *dataOffset,
+							vci_ColumnRelations *rel,
+							int32 extentId,
+							uint32 rowIdInExtent,
+							int32 numUnit,
+							Form_pg_attribute attr)
+{
+	uint32		offsetUnit;
+
+	Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+
+	/* Step 1: find the extent and read its header. */
+	{
+		Buffer		bufMeta;
+		Buffer		bufData;
+		BlockNumber metaBlockNumber;
+		Page		page;
+		uint32		unitId = rowIdInExtent / VCI_COMPACTION_UNIT_ROW;
+		vcis_c_extent_t *cExtent = vci_GetColumnExtent(&bufMeta,
+													   &metaBlockNumber,
+													   rel->meta,
+													   extentId);
+		vcis_extent_t *extent;
+
+		/*
+		 * A disabled extent has no usable block number; do not try to read
+		 * InvalidBlockNumber from the data relation.
+		 */
+		if (!cExtent->enabled)
+		{
+			ReleaseBuffer(bufMeta);
+			elog(ERROR, "extent %d is not enabled", extentId);
+		}
+
+		*blockNumberBase = cExtent->block_number;
+		bufData = vci_ReadBufferWithPageInit(rel->data, *blockNumberBase);
+		page = BufferGetPage(bufData);
+		extent = vci_GetExtentT(page);
+		*dataOffset = extent->data_offset;
+		/* byte position of the unit's entry in the offset array */
+		offsetUnit = (sizeof(uint32) * unitId) + extent->offset_offset;
+		ReleaseBuffer(bufData);
+		ReleaseBuffer(bufMeta);
+	}
+
+	/* Step 2: copy numUnit + 1 offsets, following page boundaries. */
+	{
+		BlockNumber blockNumber;
+		uint32		offsetPtr;
+		Buffer		buffer;
+		Page		page;
+
+		vci_GetBlockNumberAndOffsetInPage(&blockNumber,
+										  &offsetPtr,
+										  offsetUnit);
+		blockNumber += *blockNumberBase;
+		buffer = vci_ReadBufferWithPageInit(rel->data, blockNumber);
+		page = BufferGetPage(buffer);
+		for (int aId = 0; aId <= numUnit; ++aId)
+		{
+			/*
+			 * Move to the next page only when another entry is actually
+			 * needed, so that no page beyond the last entry is touched.
+			 */
+			if (VCI_MAX_PAGE_SPACE <= offsetPtr)
+			{
+				ReleaseBuffer(buffer);
+				++blockNumber;
+				buffer = vci_ReadBufferWithPageInit(rel->data, blockNumber);
+				page = BufferGetPage(buffer);
+				offsetPtr = 0;
+			}
+
+			offset[aId] = *(uint32 *) &(page[offsetPtr + VCI_MIN_PAGE_HEADER]);
+			offsetPtr += sizeof(uint32);
+		}
+		*totalSize = offset[numUnit] - offset[0];	/* pgr0063 */
+		ReleaseBuffer(buffer);
+	}
+}
+
+/**
+ * @brief Get the byte size of one entry of a fixed-length column.
+ *
+ * The TID column uses sizeof(ItemPointerData), the NULL column uses the
+ * width stored in the main relation, the delete vector uses one byte, and a
+ * normal column uses the max_columns_size recorded for it.
+ *
+ * @param[in] info pointer to the target vci_MainRelHeaderInfo.
+ * @param[in] columnId column ID in the VCI index.
+ * @return byte size of an entry in the column.
+ */
+uint16
+vci_GetFixedColumnSize(vci_MainRelHeaderInfo *info, int16 columnId)
+{
+	vcis_m_column_t *mColumn;
+
+	/* special columns */
+	if (columnId == VCI_COLUMN_ID_TID)
+		return sizeof(ItemPointerData);
+
+	if (columnId == VCI_COLUMN_ID_NULL)
+		return vci_GetMainRelVar(info, vcimrv_null_width_in_byte, 0);
+
+	if (columnId == VCI_COLUMN_ID_DELETE)
+		return 1;
+
+	/* everything else must be a normal column */
+	Assert(VCI_FIRST_NORMALCOLUMN_ID <= columnId);
+	mColumn = vci_GetMColumn(info, columnId);
+
+	return mColumn->max_columns_size;
+}
+
+/**
+ * @brief Get the position of an entry in the data relation of a column with
+ * fixed field length.
+ *
+ * Every extent of such a column occupies the same number of pages, so the
+ * first block of an extent is simply extentId times that page count.
+ *
+ * @param[out] blockNumber block number of the target entry.
+ * @param[out] offset offset in the block where the target is written.
+ * @param[in] info pointer to the target vci_MainRelHeaderInfo.
+ * @param[in] columnId column ID in the VCI index.
+ * @param[in] extentId extent ID of the target entry
+ * @param[in] rowIdInExtent entry ID in the extent.
+ * @param[in] atEnd if true, the position of the last byte of the entry is
+ * returned instead of the position of its first byte.
+ */
+void
+vci_GetPositionForFixedColumn(BlockNumber *blockNumber,
+							  uint32 *offset,
+							  vci_MainRelHeaderInfo *info,
+							  int16 columnId,
+							  int32 extentId,
+							  uint32 rowIdInExtent,
+							  bool atEnd)
+{
+	uint32		columnSize;
+	Size		dataSize;
+	int32		extentHeaderSize;
+	uint32		numExtentPages;
+	uint32		firstBlockOfExtent;
+	uint32		byteOffset;
+
+	/* size of one extent in pages */
+	columnSize = vci_GetFixedColumnSize(info, columnId);
+	dataSize = (Size) columnSize * VCI_NUM_ROWS_IN_EXTENT;
+	extentHeaderSize =
+		vci_GetExtentFixedLengthRawDataHeaderSize(VCI_NUM_ROWS_IN_EXTENT);
+	numExtentPages = vci_GetNumBlocks(dataSize + extentHeaderSize);
+
+	/* where the extent starts and where the entry lies inside it */
+	firstBlockOfExtent = extentId * numExtentPages;
+	byteOffset = extentHeaderSize + (rowIdInExtent * columnSize);
+
+	Assert(VCI_FIRST_NORMAL_EXTENT_ID <= extentId);
+
+	byteOffset += atEnd ? (columnSize - 1) : 0;
+
+	vci_GetBlockNumberAndOffsetInPage(blockNumber, offset, byteOffset);
+	*blockNumber += firstBlockOfExtent;
+}
+
+/*
+ * InitColumnMetaRelation
+ *		Format the header page of a column meta relation and fill in the
+ *		initial column meta data.
+ *
+ * relPair			relation pair of the column; relPair->meta is written.
+ * attr				attribute of a normal column, or NULL for the delete
+ *					vector, NULL and TID columns.
+ * compType			compression type of the column (not recorded here).
+ * heapTupleDesc	descriptor of the heap relation, used to translate the
+ *					attribute name into the heap attribute number.
+ *
+ * For a normal column an ERROR is raised when the attribute name cannot be
+ * found in the heap relation.
+ */
+static void
+InitColumnMetaRelation(vci_ColumnRelations *relPair,
+					   Form_pg_attribute attr,
+					   vcis_compression_type_t compType,
+					   TupleDesc heapTupleDesc)
+{
+	vcis_column_meta_t *columnMeta;
+	BlockNumber firstBlockNumber = VCI_COLUMN_DATA_FIRST_PAGE_ID;
+
+	vci_FormatPageWithOneItem(relPair->meta, VCI_COLUMN_META_HEADER_PAGE_ID);
+
+	columnMeta = vci_GetColumnMeta(&relPair->bufMeta, relPair->meta);
+	LockBuffer(relPair->bufMeta, BUFFER_LOCK_EXCLUSIVE);
+
+	if (attr)
+	{							/* normal columns */
+		columnMeta->pgsql_atttypid = attr->atttypid;
+		columnMeta->pgsql_attnum = vci_GetAttNum(heapTupleDesc, NameStr(attr->attname));
+		columnMeta->pgsql_attlen = attr->attlen;
+		columnMeta->pgsql_atttypmod = attr->atttypmod;
+
+		if (InvalidAttrNumber == columnMeta->pgsql_attnum)
+			ereport(ERROR, (errmsg("column missed in VCI index creation"),
+							errhint("This must never happen. "
+									"Give up to use VCI index.")));
+	}
+	else
+	{							/* delete, null, or tid */
+		columnMeta->pgsql_atttypid = InvalidOid;
+		/* no heap attribute corresponds to these columns; say so explicitly */
+		columnMeta->pgsql_attnum = InvalidAttrNumber;
+		columnMeta->pgsql_attlen = 0;
+		columnMeta->pgsql_atttypmod = 0;
+	}
+
+	/* no extents yet; a single free-page block starting at the first page */
+	columnMeta->num_extents = 0;
+	columnMeta->num_extents_old = 0;
+	columnMeta->free_page_begin_id = firstBlockNumber;
+	columnMeta->free_page_end_id = firstBlockNumber;
+	columnMeta->free_page_prev_id = InvalidBlockNumber;
+	columnMeta->free_page_next_id = InvalidBlockNumber;
+	columnMeta->num_free_pages = 1;
+	columnMeta->num_free_pages_old = 1;
+	columnMeta->num_free_page_blocks = 1;
+	columnMeta->num_free_page_blocks_old = 1;
+	columnMeta->min_max_field_size = 0;
+	columnMeta->min_max_content_size = 0;
+	columnMeta->latest_common_dict_id = VCI_INVALID_DICTIONARY_ID;
+
+	/* no common dictionaries; extent entries start right after the header */
+	columnMeta->num_common_dicts = 0;
+	columnMeta->common_dict_info_offset = 0;
+	columnMeta->block_number_extent_offset = offsetof(vcis_column_meta_t,
+													  common_dict_info);
+
+	vci_WriteColumnMetaDataHeader(relPair->meta, relPair->bufMeta);
+	UnlockReleaseBuffer(relPair->bufMeta);
+}
+
+/*
+ * InitDeleteVectorRelation
+ *		Format the first data page of the delete-vector relation with
+ *		VCI_ITEMS_IN_PAGE_FOR_DELETE items and write every item of it while
+ *		holding the exclusive buffer lock.
+ */
+static void
+InitDeleteVectorRelation(vci_ColumnRelations *relPair)
+{
+	OffsetNumber itemNo;
+
+	vci_FormatPageWithItems(relPair->data,
+							VCI_COLUMN_DATA_FIRST_PAGE_ID,
+							VCI_ITEMS_IN_PAGE_FOR_DELETE);
+
+	relPair->bufData = ReadBuffer(relPair->data, VCI_COLUMN_DATA_FIRST_PAGE_ID);
+	LockBuffer(relPair->bufData, BUFFER_LOCK_EXCLUSIVE);
+
+	itemNo = FirstOffsetNumber;
+	while (itemNo <= VCI_ITEMS_IN_PAGE_FOR_DELETE)
+	{
+		vci_WriteItem(relPair->data, relPair->bufData, itemNo);
+		++itemNo;
+	}
+
+	UnlockReleaseBuffer(relPair->bufData);
+}
+
+/*
+ * InitColumnDataRelation
+ *		Format the first page of a column data relation and store a single
+ *		free-space record in it that has no neighbours and a size of
+ *		MaxBlockNumber.
+ *
+ * The buffer is exclusively locked before the record is filled in, so the
+ * page is never modified without holding the lock.
+ */
+static void
+InitColumnDataRelation(vci_ColumnRelations *relPair)
+{
+	vcis_free_space_t *freeSpace;
+
+	vci_FormatPageWithOneItem(relPair->data, VCI_COLUMN_DATA_FIRST_PAGE_ID);
+
+	/* relPair->bufData is the buffer that is locked and released below */
+	freeSpace = vci_GetFreeSpace((vci_RelationPair *) relPair, VCI_COLUMN_DATA_FIRST_PAGE_ID);
+
+	LockBuffer(relPair->bufData, BUFFER_LOCK_EXCLUSIVE);
+
+	freeSpace->size = MaxBlockNumber;
+	freeSpace->type = vcis_free_space;
+	freeSpace->prev_pos = InvalidBlockNumber;
+	freeSpace->next_pos = InvalidBlockNumber;
+
+	vci_WriteOneItemPage(relPair->data, relPair->bufData);
+	UnlockReleaseBuffer(relPair->bufData);
+}
+
+/*
+ * vci_InitializeColumnRelations
+ *		Initialize the meta and data relations of every column of a VCI
+ *		index, from VCI_COLUMN_ID_DELETE up to the last column of "tupdesc".
+ *
+ * Columns below VCI_FIRST_NORMALCOLUMN_ID have no attribute and are treated
+ * as fixed-length raw data.  The delete vector gets its own data page
+ * layout; all other columns start with a free-space record.
+ */
+void
+vci_InitializeColumnRelations(vci_MainRelHeaderInfo *info,
+							  TupleDesc tupdesc,
+							  Relation heapRel)
+{
+	const LOCKMODE lockmode = ShareLock;
+	TupleDesc	heapTupleDesc = RelationGetDescr(heapRel);
+	int16		columnId;
+
+	/* the column count must fit into the 16-bit column ID */
+	Assert((INT64CONST(0xFFFFFFFFFFFF0000) & tupdesc->natts) == 0);
+
+	columnId = VCI_COLUMN_ID_DELETE;
+	while (columnId < (int16) tupdesc->natts)
+	{
+		vci_ColumnRelations rels;
+		Form_pg_attribute attr = NULL;
+		vcis_compression_type_t compType = vcis_compression_type_fixed_raw;
+
+		if (columnId >= VCI_FIRST_NORMALCOLUMN_ID)
+		{
+			attr = TupleDescAttr(tupdesc, columnId);
+			compType = vci_GetMColumn(info, columnId)->comp_type;
+		}
+
+		vci_OpenColumnRelations(&rels, info, columnId, lockmode);
+		InitColumnMetaRelation(&rels, attr, compType, heapTupleDesc);
+
+		if (columnId != VCI_COLUMN_ID_DELETE)
+			InitColumnDataRelation(&rels);
+		else
+			InitDeleteVectorRelation(&rels);
+
+		vci_CloseColumnRelations(&rels, lockmode);
+
+		++columnId;
+	}
+}
diff --git a/contrib/vci/storage/vci_columns_data.c b/contrib/vci/storage/vci_columns_data.c
new file mode 100644
index 0000000..690c527
--- /dev/null
+++ b/contrib/vci/storage/vci_columns_data.c
@@ -0,0 +1,229 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_columns_data.c
+ * Definitions of functions to check which columns are indexed.
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_columns_data.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/reloptions.h"
+#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/objectaccess.h"
+#include "funcapi.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodes.h"
+#include "storage/lmgr.h"
+#include "storage/lock.h"
+#include "utils/builtins.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+#include "utils/varlena.h"
+
+#include "vci.h"
+#include "vci_columns.h"
+#include "vci_columns_data.h"
+#include "vci_ros.h"
+
+static Bitmapset *parseVciColumnsIds(const char *vci_column_ids);
+
+/*
+ * parseVciColumnsIds
+ *		Convert a comma-separated list of column ids to a Bitmapset.
+ *
+ * The list is delta encoded, as produced by vci_ConvertAttidBitmap2String():
+ * each number is added to the previous (0-origin) attribute id to obtain the
+ * next one.
+ *
+ * An ERROR is raised when the string cannot be split, when a number is not a
+ * valid integer, when a resulting attribute id is negative or not below
+ * MaxHeapAttributeNumber, or when an attribute id occurs twice.
+ */
+static Bitmapset *
+parseVciColumnsIds(const char *vci_column_ids)
+{
+	List	   *columnlist;
+	ListCell   *l;
+
+	/* SplitIdentifierString can destroy the first argument. */
+	char	   *copied_ids = pstrdup(vci_column_ids);
+	Bitmapset  *indexedAttids = NULL;
+	int			attid = 0;
+
+	if (!SplitIdentifierString(copied_ids, ',', &columnlist))
+		ereport(ERROR, (errmsg("internal error. failed to split")));
+
+	foreach(l, columnlist)
+	{
+		char	   *number_str = (char *) lfirst(l);
+		int			attid_diff = pg_strtoint32(number_str);
+
+		/* Add in 64 bits so that a huge delta cannot overflow "attid". */
+		int64		next_attid = (int64) attid + attid_diff;
+
+		/*
+		 * Reject ids outside [0, MaxHeapAttributeNumber).  Negative values
+		 * must be caught here because Bitmapsets cannot hold them.
+		 */
+		if (next_attid < 0 || next_attid >= MaxHeapAttributeNumber)
+			ereport(ERROR,
+					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+					 errmsg("invalid attribute number %lld",
+							(long long) next_attid + 1)));
+
+		attid = (int) next_attid;
+
+		if (bms_is_member(attid, indexedAttids))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 (errmsg("duplicated columns in vci index creation")),
+					 errhint("duplicated columns are specified")));
+
+		indexedAttids = bms_add_member(indexedAttids, attid);
+	}
+
+	/* The list cells point into copied_ids; free the list, then the copy. */
+	list_free(columnlist);
+	pfree(copied_ids);
+
+	return indexedAttids;
+}
+
+/*
+ * vci_ConvertAttidBitmap2String -- Convert a Bitmapset that represents which
+ * attids are targets to a comma separated string.
+ *
+ * The members are written in ascending order and delta encoded: the first
+ * number is the smallest attid itself, every following number is the
+ * difference to the previous attid.  An empty set yields an empty string.
+ *
+ * The returned string is palloc'd in the current memory context.  An ERROR
+ * is raised when a member is not below MaxHeapAttributeNumber.
+ */
+char *
+vci_ConvertAttidBitmap2String(Bitmapset *attid_bitmap)
+{
+	int			attid;
+	int			preAttid = 0;
+	StringInfoData buf;
+
+	/*
+	 * Only the data buffer is returned, so keep the StringInfo header on the
+	 * stack instead of leaking a palloc'd one.
+	 */
+	initStringInfo(&buf);
+
+	attid = -1;
+	while ((attid = bms_next_member(attid_bitmap, attid)) >= 0)
+	{
+		if (attid >= MaxHeapAttributeNumber)
+			ereport(ERROR,
+					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+					 errmsg("invalid attribute number %d", attid + 1)));
+
+		/* separate all but the first number with a comma */
+		if (buf.len == 0)
+			appendStringInfo(&buf, "%d", attid - preAttid);
+		else
+			appendStringInfo(&buf, ",%d", attid - preAttid);
+
+		preAttid = attid;
+	}
+
+	return buf.data;
+}
+
+/*
+ * vci_ExtractColumnDataUsingIds -- build a TupleDesc describing the indexed
+ * columns named by a column-id string.
+ *
+ * The id string is parsed into a set of 0-origin heap attribute ids and the
+ * matching heap attributes are copied, in ascending id order, into a new
+ * descriptor.  An ERROR is raised for an id beyond the heap's attributes.
+ *
+ * vci_GetTupleDescr() needs a completed vci_MainRelHeaderInfo, so use this
+ * function while a VCI is being built and that structure is not ready yet.
+ */
+TupleDesc
+vci_ExtractColumnDataUsingIds(const char *vci_column_ids, Relation heapRel)
+{
+	TupleDesc	heapDesc = RelationGetDescr(heapRel);
+	Bitmapset  *attidSet = parseVciColumnsIds(vci_column_ids);
+	TupleDesc	indexedDesc = CreateTemplateTupleDesc(bms_num_members(attidSet));
+	int			nextEntry = 1;	/* 1-origin position in indexedDesc */
+	int			heapAttid;
+
+	for (heapAttid = bms_next_member(attidSet, -1);
+		 heapAttid >= 0;
+		 heapAttid = bms_next_member(attidSet, heapAttid))
+	{
+		if (heapAttid >= heapDesc->natts)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid attribute number %d", heapAttid + 1)));
+
+		/* attribute numbers are 1-origin, the set members are 0-origin */
+		TupleDescCopyEntry(indexedDesc, nextEntry, heapDesc, heapAttid + 1);
+		nextEntry++;
+	}
+
+	bms_free(attidSet);
+
+	return indexedDesc;
+}
+
+/*
+ * vci_GetTupleDescr -- return the TupleDesc of the indexed columns kept in a
+ * vci_MainRelHeaderInfo.
+ *
+ * The descriptor of info->rel is looked up on the first call, with
+ * info->initctx as the current memory context, and remembered in
+ * info->cached_tupledesc; later calls return the remembered pointer.
+ */
+TupleDesc
+vci_GetTupleDescr(vci_MainRelHeaderInfo *info)
+{
+	if (!info->cached_tupledesc)
+	{
+		MemoryContext callerContext = MemoryContextSwitchTo(info->initctx);
+
+		info->cached_tupledesc = RelationGetDescr(info->rel);
+
+		MemoryContextSwitchTo(callerContext);
+	}
+
+	return info->cached_tupledesc;
+}
+
+/*
+ * vci_MakeIndexedColumnBitmap
+ *		Build a Bitmapset of the heap attribute numbers covered by a VCI
+ *		index.
+ *
+ * mainRelationOid	OID of the VCI main relation.
+ * sharedMemCtx		memory context used for the temporary header info.
+ * lockmode			lock mode used to open (and close) the main relation.
+ *
+ * For every indexed column the column meta relation is opened with
+ * AccessShareLock and its pgsql_attnum is added to the result.  The
+ * temporary header info is freed before returning so that nothing is left
+ * behind in sharedMemCtx.
+ */
+Bitmapset *
+vci_MakeIndexedColumnBitmap(Oid mainRelationOid,
+							MemoryContext sharedMemCtx,
+							LOCKMODE lockmode)
+{
+	Relation	main_rel;
+	Bitmapset  *result = NULL;
+	vci_MainRelHeaderInfo *info;
+
+	info = MemoryContextAllocZero(sharedMemCtx,
+								  sizeof(vci_MainRelHeaderInfo));
+	main_rel = relation_open(mainRelationOid, lockmode);
+	vci_InitMainRelHeaderInfo(info, main_rel, vci_rc_query);
+	vci_KeepMainRelHeader(info);
+
+	{
+		int32		indexNumColumns = vci_GetMainRelVar(info,
+														vcimrv_num_columns, 0);
+
+		for (int aId = 0; aId < indexNumColumns; ++aId)
+		{
+			vcis_m_column_t *mColumn = vci_GetMColumn(info, aId);
+			LOCKMODE	lockmode_for_meta = AccessShareLock;
+			Relation	column_meta_rel = table_open(mColumn->meta_oid, lockmode_for_meta);
+			Buffer		buffer;
+			vcis_column_meta_t *metaHeader = vci_GetColumnMeta(&buffer, column_meta_rel);
+
+			Assert(metaHeader->pgsql_attnum > InvalidAttrNumber);
+			result = bms_add_member(result, metaHeader->pgsql_attnum);
+			ReleaseBuffer(buffer);
+			table_close(column_meta_rel, lockmode_for_meta);
+		}
+	}
+
+	vci_ReleaseMainRelHeader(info);
+	relation_close(main_rel, lockmode);
+
+	/* "info" is private to this function; do not leak it in sharedMemCtx. */
+	pfree(info);
+
+	return result;
+}
+
+/**
+ * @brief Look up an attribute number by attribute name.
+ *
+ * The attributes of the descriptor are compared with @a name in order, and
+ * the 1-origin number of the first exact match is returned.
+ *
+ * @param[in] desc The tuple descriptor of the relation.
+ * @param[in] name The name of attribute.
+ * @return The attribute number.
+ * If the name is not found in the descriptor, InvalidAttrNumber is returned.
+ */
+AttrNumber
+vci_GetAttNum(TupleDesc desc, const char *name)
+{
+	int			pos = 0;
+
+	while (pos < desc->natts)
+	{
+		const char *candidate = NameStr(TupleDescAttr(desc, pos)->attname);
+
+		/* attribute numbers are 1-origin */
+		if (strcmp(name, candidate) == 0)
+			return pos + 1;
+
+		++pos;
+	}
+
+	return InvalidAttrNumber;
+}
diff --git a/contrib/vci/storage/vci_fetch.c b/contrib/vci/storage/vci_fetch.c
new file mode 100644
index 0000000..e7df01a
--- /dev/null
+++ b/contrib/vci/storage/vci_fetch.c
@@ -0,0 +1,2459 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_fetch.c
+ * Column fetch store
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_fetch.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <stdlib.h>
+
+#include "access/heapam_xlog.h"
+#include "access/xact.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h" /* for TransactionIdIsInProgress() */
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+#include "vci.h"
+#include "vci_ros.h"
+#include "vci_columns.h"
+#include "vci_columns_data.h"
+
+#include "vci_fetch.h"
+#include "vci_ros.h"
+#include "vci_utils.h"
+#include "vci_wos.h"
+#include "vci_tidcrid.h"
+#include "vci_xact.h"
+
+static void ChangeLockModeInQueryContext(vci_CSQueryContext queryContext, LOCKMODE new_lockmode);
+
+/*
+ * CompAttrNumber
+ *		qsort() comparator ordering AttrNumber values ascending.
+ *
+ * Returns a negative, zero or positive value when *a is smaller than, equal
+ * to or greater than *b.
+ */
+static int
+CompAttrNumber(const void *a, const void *b)
+{
+	const AttrNumber *lhs = (const AttrNumber *) a;
+	const AttrNumber *rhs = (const AttrNumber *) b;
+
+	return *lhs - *rhs;
+}
+
+/*
+ * NeedDatumPointer
+ *		Tell whether vci_PassByRefForFixed() holds for the attribute of a
+ *		column.  Columns below VCI_FIRST_NORMALCOLUMN_ID never do.
+ */
+static bool
+NeedDatumPointer(vci_MainRelHeaderInfo *info, int16 columnId)
+{
+	if (VCI_FIRST_NORMALCOLUMN_ID <= columnId)
+	{
+		TupleDesc	indexDesc = vci_GetTupleDescr(info);
+
+		return vci_PassByRefForFixed(TupleDescAttr(indexDesc, columnId));
+	}
+
+	return false;
+}
+
+/**
+ * @brief Create query context.
+ *
+ * The context and its arrays are allocated in @a sharedMemCtx. The heap
+ * relation of the index is opened with AccessShareLock and the VCI main
+ * relation with @a lockmode; both stay open until
+ * vci_CSDestroyQueryContext() is called.
+ *
+ * @param[in] mainRelationOid Oid of VCI main relation.
+ * @param[in] numReadColumns The number of read columns in the part of query.
+ * @param[in] attrNum The attribute numbers in the original heap relation,
+ * not those of the VCI main relation. The array is copied and the copy is
+ * sorted in ascending order.
+ * @param[in] sharedMemCtx The shared memory context to keep elements of
+ * query context, fetch context, local ROS.
+ * @param[in] lockmode Lock mode used to open the VCI main relation; it is
+ * also stored in the context.
+ * @return The pointer to the allocated vci_CSQueryContext.
+ */
+vci_CSQueryContext
+vci_CSCreateQueryContextWLockMode(Oid mainRelationOid,
+ int numReadColumns,
+ AttrNumber *attrNum,
+ /* attribute number in original relation */
+ MemoryContext sharedMemCtx,
+ LOCKMODE lockmode)
+{
+ TransactionId curRosVer;
+ TransactionId lastRosVer;
+ vci_CSQueryContext result;
+ Relation rel;
+
+ result = MemoryContextAllocZero(sharedMemCtx, sizeof(vci_CSQueryContextData));
+ result->shared_memory_context = sharedMemCtx;
+
+ result->lockmode = lockmode;
+
+ result->main_relation_oid = mainRelationOid;
+
+ /* Open the heap relation the index belongs to. */
+ result->heap_rel = relation_open(IndexGetRelation(mainRelationOid, false),
+ AccessShareLock);
+
+ /* Keep a sorted private copy of the requested heap attribute numbers. */
+ result->num_columns = numReadColumns;
+ result->attr_num = MemoryContextAllocZero(sharedMemCtx,
+ sizeof(AttrNumber) * numReadColumns);
+ MemCpy(result->attr_num, attrNum, sizeof(AttrNumber) * numReadColumns);
+ qsort(result->attr_num, numReadColumns, sizeof(AttrNumber), CompAttrNumber);
+ /* Filled in further below with the column IDs inside the VCI index. */
+ result->column_id = MemoryContextAllocZero(sharedMemCtx,
+ sizeof(int16) * numReadColumns);
+
+ result->num_local_ros_extents = 0;
+ result->local_ros = NULL;
+
+ result->num_delete = 0;
+ result->delete_list = NULL;
+
+ /* Open the VCI main relation and read its header. */
+ result->info = MemoryContextAllocZero(sharedMemCtx,
+ sizeof(vci_MainRelHeaderInfo));
+ rel = relation_open(mainRelationOid, result->lockmode);
+ vci_InitMainRelHeaderInfo(result->info, rel, vci_rc_query);
+ vci_KeepMainRelHeader(result->info);
+ result->num_nullable_columns = vci_GetMainRelVar(result->info,
+ vcimrv_num_nullable_columns,
+ 0);
+ result->null_width_in_byte = vci_GetMainRelVar(result->info,
+ vcimrv_null_width_in_byte,
+ 0);
+ result->num_ros_extents = vci_GetMainRelVar(result->info,
+ vcimrv_num_extents,
+ 0);
+
+ curRosVer = vci_GetMainRelVar(result->info, vcimrv_current_ros_version, 0);
+ lastRosVer = vci_GetMainRelVar(result->info, vcimrv_last_ros_version, 0);
+
+ /*
+ * Choose the ROS version to read, depending on the state of the
+ * transaction recorded as the current ROS version.
+ */
+ switch (vci_transaction_get_type(curRosVer))
+ {
+ /* The current version is usable: read it. */
+ case VCI_XACT_DID_COMMIT:
+ case VCI_XACT_SELF:
+ result->ros_version = curRosVer;
+ result->inclusive_xid = curRosVer;
+ result->exclusive_xid = InvalidTransactionId;
+ result->tid_crid_diff_sel = vci_GetMainRelVar(result->info, vcimrv_tid_crid_diff_sel, 0);
+ break;
+
+ /* Otherwise fall back to the last version and the "old" selector. */
+ case VCI_XACT_IN_PROGRESS:
+ case VCI_XACT_DID_CRASH:
+ case VCI_XACT_DID_ABORT:
+ result->ros_version = lastRosVer;
+ result->inclusive_xid = InvalidTransactionId;
+ result->exclusive_xid = curRosVer;
+ result->tid_crid_diff_sel = vci_GetMainRelVar(result->info, vcimrv_tid_crid_diff_sel_old, 0);
+ break;
+
+ case VCI_XACT_INVALID:
+ elog(ERROR, "current ROS version is invalid"); /* @todo */
+ break;
+ }
+
+ Assert(TransactionIdIsValid(result->ros_version));
+
+ /* Translate the heap attribute numbers into VCI column IDs. */
+ {
+ int32 indexNumColumns = vci_GetMainRelVar(result->info,
+ vcimrv_num_columns, 0);
+ AttrNumber *indexAttNums = palloc_array(AttrNumber, indexNumColumns);
+ TupleDesc descHeap = RelationGetDescr(result->heap_rel);
+
+ /* Collect the heap attribute number stored for each indexed column. */
+ for (int aId = 0; aId < indexNumColumns; ++aId)
+ {
+ vcis_m_column_t *mColumn = vci_GetMColumn(result->info, aId);
+ LOCKMODE lockmode_asl = AccessShareLock;
+ Relation mcol_rel = table_open(mColumn->meta_oid, lockmode_asl);
+ Buffer buffer;
+ vcis_column_meta_t *metaHeader = vci_GetColumnMeta(&buffer, mcol_rel);
+
+ indexAttNums[aId] = metaHeader->pgsql_attnum;
+ ReleaseBuffer(buffer);
+ table_close(mcol_rel, lockmode_asl);
+ }
+
+ /* The position of a match in indexAttNums is the VCI column ID. */
+ for (int aId = 0; aId < numReadColumns; ++aId)
+ {
+ AttrNumber attNum = TupleDescAttr(descHeap, result->attr_num[aId] - 1)
+ ->attnum;
+
+ /* AttrNumber is 1 origin. We use 0 origin value. */
+ result->column_id[aId] = FindInt16(indexAttNums, indexNumColumns,
+ attNum);
+ /* Every requested column is expected to be part of the index. */
+ Assert(0 <= result->column_id[aId]);
+ }
+
+ pfree(indexAttNums);
+ }
+
+ /* Estimated entry counts of the data WOS and whiteout WOS relations. */
+ result->num_data_wos_entries =
+ vci_EstimateNumEntriesInHeapRelation(vci_GetMainRelVar(result->info, vcimrv_data_wos_oid, 0));
+
+ result->num_whiteout_wos_entries =
+ vci_EstimateNumEntriesInHeapRelation(vci_GetMainRelVar(result->info, vcimrv_whiteout_wos_oid, 0));
+
+ return result;
+}
+
+/**
+ * @brief Destroy query context.
+ *
+ * Releases the main relation header, closes the VCI main relation with the
+ * lock mode currently recorded in the context, closes the heap relation
+ * (AccessShareLock), and frees the arrays, the header info and the context
+ * itself.
+ *
+ * Both relations are opened with relation_open() when the context is
+ * created, and the main relation is an index, so they are closed with the
+ * matching relation_close().
+ *
+ * @param[in] queryContext Pointer to the target context to be destroyed.
+ */
+void
+vci_CSDestroyQueryContext(vci_CSQueryContext queryContext)
+{
+	vci_MainRelHeaderInfo *info;
+	Relation	heapRel;
+	Relation	indexRel;
+
+	Assert(queryContext != NULL);
+	Assert(queryContext->info != NULL);
+
+	info = queryContext->info;
+
+	/* Fetch the relations before the header is released. */
+	heapRel = queryContext->heap_rel;
+	indexRel = info->rel;
+
+	vci_ReleaseMainRelHeader(info);
+
+	if (RelationIsValid(indexRel))
+		relation_close(indexRel, queryContext->lockmode);
+
+	if (RelationIsValid(heapRel))
+		relation_close(heapRel, AccessShareLock);
+
+	if (queryContext->column_id)
+		pfree(queryContext->column_id);
+
+	if (queryContext->attr_num)
+		pfree(queryContext->attr_num);
+
+	pfree(info);
+	pfree(queryContext);
+}
+
+/*
+ * ChangeLockModeInQueryContext
+ *		Switch the lock held on the VCI main relation of a query context to
+ *		"new_lockmode".
+ *
+ * The new lock is acquired before the old one is released, so the relation
+ * is never left unlocked.  Nothing happens when the mode is unchanged.
+ */
+static void
+ChangeLockModeInQueryContext(vci_CSQueryContext queryContext, LOCKMODE new_lockmode)
+{
+	Relation	mainRel;
+
+	Assert(queryContext);
+
+	if (queryContext->lockmode == new_lockmode)
+		return;
+
+	Assert(queryContext->info);
+	Assert(queryContext->info->rel);
+
+	mainRel = queryContext->info->rel;
+	LockRelation(mainRel, new_lockmode);
+	UnlockRelation(mainRel, queryContext->lockmode);
+	queryContext->lockmode = new_lockmode;
+}
+
+/**
+ * @brief Collect size and type information of the specified column.
+ *
+ * Every output pointer is optional; pass NULL for values that are not
+ * needed.  nullBitId, compType, atttypid, strictDatumType and maxDictSize
+ * are first set to their defaults (-1, vcis_compression_type_fixed_raw,
+ * InvalidOid, false and 0) and are overwritten only for normal columns
+ * (VCI_FIRST_NORMALCOLUMN_ID <= columnId).  maxDictSize always stays 0.
+ *
+ * @param[in] info Pointer to the VCI main relation header information.
+ * @param[in] fetchContext Pointer to the fetch context.  It is only read
+ * for VCI_COLUMN_ID_NULL, whose size depends on num_columns.
+ * @param[in] columnId column ID in VCI main relation.
+ * @param[out] datumSize If not NULL, the datum size is written.
+ * @param[out] maxElemSize If not NULL, the maximum size of an element is
+ * written.
+ * @param[out] maxDictSize If not NULL, the maximum size of dictionary is
+ * written.
+ * @param[out] nullBitId If not NULL, the null bit Id is written.
+ * @param[out] compType If not NULL, the compression type is written.
+ * @param[out] atttypid If not NULL, the atttypid in PostgreSQL is written.
+ * @param[out] strictDatumType If not NULL, the flag is returned, indicating
+ * true as Datum has only the pointer to the real value, or false that
+ * Datum has the value itself.
+ * @return For a normal column for which NeedDatumPointer() is true,
+ * MAXALIGN(maximum element size) + sizeof(Datum).  Otherwise the maximum
+ * element size, i.e. the same value as written to maxElemSize.
+ *
+ * An ERROR is raised for an unsupported compression type and for a special
+ * column ID that is not handled here.
+ */
+static int
+SizeOfElementAndPointer(vci_MainRelHeaderInfo *info,
+						vci_CSFetchContext fetchContext,
+						int16 columnId, /* the column ID in VCI main relation */
+						uint32 *datumSize,
+						uint32 *maxElemSize,
+						uint32 *maxDictSize,
+						int32 *nullBitId,
+						vcis_compression_type_t *compType,
+						Oid *atttypid,
+						bool *strictDatumType)
+{
+	int			maxElementSize;
+
+	if (nullBitId)
+		*nullBitId = -1;		/* default is not nullable */
+
+	if (compType)
+		*compType = vcis_compression_type_fixed_raw;
+
+	if (atttypid)
+		*atttypid = InvalidOid;
+
+	if (strictDatumType)
+		*strictDatumType = false;
+
+	if (maxDictSize)
+		*maxDictSize = 0;
+
+	if (VCI_FIRST_NORMALCOLUMN_ID <= columnId)
+	{
+		TupleDesc	desc = vci_GetTupleDescr(info);
+		vcis_m_column_t *mColumn = vci_GetMColumn(info, columnId);
+
+		Assert((VCI_FIRST_NORMALCOLUMN_ID <= columnId) && (columnId < desc->natts));
+
+		maxElementSize = mColumn->max_columns_size;
+		if (maxElemSize)
+			*maxElemSize = maxElementSize;
+
+		if (compType)
+			*compType = mColumn->comp_type;
+
+		if (nullBitId)
+			*nullBitId = vci_GetBitIdInNullBits(desc, columnId);
+
+		if (atttypid)
+			*atttypid = TupleDescAttr(desc, columnId)->atttypid;
+
+		switch (mColumn->comp_type)
+		{
+			case vcis_compression_type_fixed_raw:
+				{
+					if (vci_PassByRefForFixed(TupleDescAttr(desc, columnId)))
+					{
+						/* Datum carries a pointer to the value */
+						if (strictDatumType)
+							*strictDatumType = true;
+						if (datumSize)
+							*datumSize = sizeof(Datum);
+					}
+					else
+					{
+						/* Datum carries the value itself */
+						if (strictDatumType)
+							*strictDatumType = false;
+						if (datumSize)
+							*datumSize = maxElementSize;
+					}
+				}
+				break;
+			case vcis_compression_type_variable_raw:
+				if (strictDatumType)
+					*strictDatumType = true;
+				if (datumSize)
+					*datumSize = sizeof(Datum);
+				break;
+				/* for compressions */
+			default:
+				ereport(ERROR, (errmsg("internal error: unsupported compression type"), errhint("Disable VCI by 'SELECT vci_disable();'")));
+		}
+
+		/*
+		 * we put large data in some area, and Datum have the pointer
+		 */
+		if (NeedDatumPointer(info, columnId))
+			maxElementSize = MAXALIGN(maxElementSize) + sizeof(Datum);
+
+		return maxElementSize;
+	}
+
+	/* Special columns have a fixed element size. */
+	switch (columnId)
+	{
+		case VCI_COLUMN_ID_TID:
+			maxElementSize = sizeof(int64);
+			break;
+
+		case VCI_COLUMN_ID_NULL:
+			maxElementSize = sizeof(bool) * fetchContext->num_columns;
+			break;
+
+		case VCI_COLUMN_ID_DELETE:
+			maxElementSize = sizeof(uint16);
+			break;
+
+		case VCI_COLUMN_ID_CRID:
+			maxElementSize = sizeof(uint64);
+			break;
+
+		default:
+
+			/*
+			 * Raise an ERROR rather than calling abort(): an unexpected
+			 * column ID should fail the current query, not terminate the
+			 * backend process.
+			 */
+			elog(ERROR, "unexpected VCI column ID: %d", (int) columnId);
+			maxElementSize = 0; /* keep compiler quiet */
+			break;
+	}
+
+	if (datumSize)
+		*datumSize = maxElementSize;
+
+	if (maxElemSize)
+		*maxElemSize = maxElementSize;
+
+	return maxElementSize;
+}
+
+/*
+ * Return the main relation header info to use with fetchContext: its own
+ * one when set, otherwise the one held by its query context.
+ */
+static vci_MainRelHeaderInfo *
+GetMainRelHeaderInfoFromFetchContext(vci_CSFetchContext fetchContext)
+{
+	if (fetchContext->info)
+		return fetchContext->info;
+
+	return fetchContext->query_context->info;
+}
+
+/**
+ * @brief Sum up worst-case per-tuple sizes for the columns of a fetch
+ * context.
+ *
+ * For each of the first numReadColumns entries of fetchContext->column_link,
+ * the datum size rounded up to a multiple of sizeof(Datum) goes into the
+ * value size, and the MAXALIGNed dictionary size into the dictionary size.
+ * Columns whose Datum is only a pointer also add their MAXALIGNed maximum
+ * element size to the area size.
+ *
+ * The flag size covers the null and delete columns, the TID and CRID
+ * columns when fetchContext->need_tid / need_crid are set, and one element
+ * each of the skip and local_skip arrays of vci_virtual_tuples_t.
+ *
+ * Each output pointer may be NULL.  attrNum, returnTid and returnCrid are
+ * not consulted by this function.
+ */
+static void
+GetWorstCaseTupleSize(vci_CSFetchContext fetchContext,
+					  Size *sumWorstCaseDictionarySize_,
+					  Size *sumWorstCaseValueSize_,
+					  Size *sumWorstCaseFlagSize_,
+					  Size *sumWorstCaseAreaSize_,
+					  int16 numReadColumns,
+ /* attribute number in original relation */
+					  AttrNumber *attrNum,
+					  bool returnTid,
+					  bool returnCrid)
+{
+	vci_CSQueryContext queryContext = fetchContext->query_context;
+	vci_MainRelHeaderInfo *info = GetMainRelHeaderInfoFromFetchContext(fetchContext);
+	int16		flagColumns[4];
+	int			numFlagColumns = 0;
+	Size		dictTotal = 0;
+	Size		valueTotal = 0;
+	Size		flagTotal = 0;
+	Size		areaTotal = 0;
+	uint32		datumSize;
+	uint32		maxElemSize;
+	uint32		maxDictSize;
+	bool		strictDatumType;
+	int			idx;
+
+	/* Normal columns that are read. */
+	idx = 0;
+	while (idx < numReadColumns)
+	{
+		int16		colId = fetchContext->column_link[idx];
+
+		SizeOfElementAndPointer(info, fetchContext,
+								queryContext->column_id[colId],
+								&datumSize, &maxElemSize, &maxDictSize,
+								NULL, NULL, NULL, &strictDatumType);
+
+		valueTotal += TYPEALIGN(sizeof(Datum), datumSize);
+		dictTotal += MAXALIGN(maxDictSize);
+		if (strictDatumType)
+			areaTotal += MAXALIGN(maxElemSize);
+
+		++idx;
+	}
+
+	/* Special columns stored in the flag area, in their original order. */
+	flagColumns[numFlagColumns++] = VCI_COLUMN_ID_NULL;
+	flagColumns[numFlagColumns++] = VCI_COLUMN_ID_DELETE;
+	if (fetchContext->need_tid)
+		flagColumns[numFlagColumns++] = VCI_COLUMN_ID_TID;
+	if (fetchContext->need_crid)
+		flagColumns[numFlagColumns++] = VCI_COLUMN_ID_CRID;
+
+	for (idx = 0; idx < numFlagColumns; ++idx)
+	{
+		SizeOfElementAndPointer(info, fetchContext, flagColumns[idx],
+								&datumSize, &maxElemSize, &maxDictSize,
+								NULL, NULL, NULL, &strictDatumType);
+
+		flagTotal += MAXALIGN(maxElemSize);
+		dictTotal += MAXALIGN(maxDictSize);
+	}
+
+	/* One element each of the skip and local_skip arrays. */
+	flagTotal += sizeof((((vci_virtual_tuples_t *) NULL)->skip)[0]) +
+		sizeof((((vci_virtual_tuples_t *) NULL)->local_skip)[0]);
+
+	if (sumWorstCaseDictionarySize_)
+		*sumWorstCaseDictionarySize_ = dictTotal;
+	if (sumWorstCaseValueSize_)
+		*sumWorstCaseValueSize_ = valueTotal;
+	if (sumWorstCaseFlagSize_)
+		*sumWorstCaseFlagSize_ = flagTotal;
+	if (sumWorstCaseAreaSize_)
+		*sumWorstCaseAreaSize_ = areaTotal;
+}
+
+/**
+ * @brief The base function of creating an instance of \c vci_CSFetchContext.
+ *
+ * This function is normally called via vci_CSCreateFetchContext().
+ *
+ * The requested number of rows is rounded up to a multiple of
+ * VCI_COMPACTION_UNIT_ROW.  When the vector memory computed for that many
+ * rows would exceed MaxAllocSize, the number of rows is reduced so that it
+ * fits.
+ *
+ * @param[in] queryContext The query context.
+ * @param[in] numRowsReadAtOnce The number of rows which read at once and
+ * stored in the virtual tuples.
+ * @param[in] numReadColumns The number of columns to be read.
+ * @param[in] attrNum The pointer to the array which has the attribute numbers
+ * of the original heap relation, not VCI main relation.
+ * @param[in] useColumnStore True for column-wise store. False for row-wise.
+ * @param[in] returnTid True to get TID in virtual tuples.
+ * @param[in] returnCrid True to get CRID in virtual tuples.
+ * @param[in] useCompression Must be false (asserted); the compression code
+ * is not part of this module.
+ * @return The pointer to the created fetch context, allocated in
+ * queryContext->shared_memory_context.  This function does not return NULL;
+ * an out-of-memory ERROR is raised when not even one row fits into
+ * MaxAllocSize.
+ * @note This function registers CurrentMemoryContext as local_memory_context
+ * in \c vci_CSFetchContext.
+ */
+vci_CSFetchContext
+vci_CSCreateFetchContextBase(vci_CSQueryContext queryContext,
+							 uint32 numRowsReadAtOnce,
+							 int16 numReadColumns,
+ /* attribute number in original relation */
+							 AttrNumber *attrNum,
+							 bool useColumnStore,
+							 bool returnTid,
+							 bool returnCrid,
+							 bool useCompression)
+{
+	Size		size = sizeof(vci_CSFetchContextData) +
+		((numReadColumns - 1) * sizeof(int16));
+	vci_CSFetchContext result;
+	Size		sumWorstCaseAreaSize;
+	Size		valueSizePerTuple;
+	Size		flagSizePerTuple;
+	Size		flagSizeBaseline;
+	Size		fixedOverhead;
+
+	Assert(useCompression == false);	/* Compression code has been extracted
+										 * from the contrib/vci module */
+
+	result = MemoryContextAllocZero(queryContext->shared_memory_context, size);
+
+	result->query_context = queryContext;
+	result->size = size;
+
+	/*
+	 * The master copy does not have own vci_MainRelHeaderInfo. The localized
+	 * copies have their own vci_MainRelHeaderInfo, which will be created in
+	 * vci_CSLocalizeFetchContext().
+	 */
+	result->info = NULL;
+
+	result->num_columns = numReadColumns;
+	result->num_rows_read_at_once = TYPEALIGN(VCI_COMPACTION_UNIT_ROW,
+											  numRowsReadAtOnce);
+	result->use_column_store = useColumnStore;
+	result->need_crid = returnCrid;
+	result->need_tid = returnTid;
+	result->buffer = MemoryContextAllocZero(queryContext->shared_memory_context,
+											sizeof(vci_seq_scan_buffer_t));
+	result->local_memory_context = CurrentMemoryContext;
+
+	result->extent_id = VCI_INVALID_EXTENT_ID;
+	result->num_rows = 0;
+
+	/* Map each requested attribute to its position in the query context. */
+	for (int aId = 0; aId < numReadColumns; ++aId)
+	{
+		int16		colId;
+
+		Assert(0 < attrNum[aId]);
+		colId = FindInt16(queryContext->attr_num,
+						  queryContext->num_columns,
+						  attrNum[aId]);
+		Assert(VCI_FIRST_NORMALCOLUMN_ID <= colId);
+		result->column_link[aId] = colId;
+	}
+
+	/* Should we use faster way? */
+	GetWorstCaseTupleSize(result,
+						  &(result->size_dictionary_area),
+						  &valueSizePerTuple,
+						  &flagSizePerTuple,
+						  &sumWorstCaseAreaSize,
+						  numReadColumns,
+						  attrNum,
+						  returnTid,
+						  returnCrid);
+
+	result->size_dictionary_area = useCompression
+		? result->size_dictionary_area : 0;
+	result->size_decompression_area = (result->size_dictionary_area)
+		? MAXALIGN(VCI_MAX_PAGE_SPACE * VCI_COMPACTION_UNIT_ROW) : 0;
+
+	if (!(result->use_column_store))
+		flagSizePerTuple += result->num_columns * (sizeof(Datum) + sizeof(bool));
+
+	flagSizeBaseline = sizeof((((vci_virtual_tuples_t *) NULL)->skip)[0]) +
+		sizeof((((vci_virtual_tuples_t *) NULL)->local_skip)[0]) +
+		result->size_dictionary_area +
+		result->size_decompression_area;
+
+	/* Part of the vector memory that does not depend on the row count. */
+	fixedOverhead = flagSizeBaseline + sumWorstCaseAreaSize +
+		2 * result->num_columns * MAXIMUM_ALIGNOF;
+
+	for (;;)
+	{
+		uint32		new_num_rows_read_at_once;
+
+		/* The skip information has additional one elements at the tail. */
+		result->size_flags = flagSizeBaseline + flagSizePerTuple * result->num_rows_read_at_once;
+		result->size_values = valueSizePerTuple * result->num_rows_read_at_once;
+
+		/* add padding space for each MemoryContextAlloc() */
+		result->size_vector_memory_context = result->size_values +
+			result->size_flags + sumWorstCaseAreaSize +
+			((2 * result->num_columns) * MAXIMUM_ALIGNOF);
+
+		if (result->size_vector_memory_context <= MaxAllocSize)
+			break;
+
+		/*
+		 * Too large: shrink the number of rows.  Size is unsigned, so the
+		 * subtraction below must not be evaluated when the fixed part alone
+		 * already reaches MaxAllocSize; it would wrap around and this loop
+		 * would never terminate.
+		 */
+		if (MaxAllocSize <= fixedOverhead)
+			new_num_rows_read_at_once = 0;
+		else
+			new_num_rows_read_at_once =
+				(MaxAllocSize - fixedOverhead)
+				/ (flagSizePerTuple + valueSizePerTuple);
+
+		/* Round down to a multiple of the compaction unit when possible. */
+		if (new_num_rows_read_at_once > VCI_COMPACTION_UNIT_ROW)
+			new_num_rows_read_at_once = TYPEALIGN(VCI_COMPACTION_UNIT_ROW,
+												  new_num_rows_read_at_once - VCI_COMPACTION_UNIT_ROW + 1);
+
+		if (new_num_rows_read_at_once == 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_OUT_OF_MEMORY),
+					 errmsg("out of memory")));
+
+		result->num_rows_read_at_once = new_num_rows_read_at_once;
+	}
+
+	/*
+	 * The following fields are filled by vci_CSLocalizeFetchContext()
+	 */
+	result->rel_column = NULL;
+	vci_Initvci_ColumnRelations(&(result->rel_delete));
+	vci_Initvci_ColumnRelations(&(result->rel_null));
+	vci_Initvci_ColumnRelations(&(result->rel_tid));
+
+	return result;
+}
+
+/**
+ * @brief Destroy the given instance of \c vci_CSFetchContext.
+ *
+ * The delete, null, TID and per-column relations registered in the target
+ * are closed with AccessShareLock.  When the target has its own main
+ * relation header info, the kept header is released and its relation is
+ * closed.  The scan buffer and the context itself are then freed.
+ *
+ * @param[in] fetchContext target to destroy.
+ */
+void
+vci_CSDestroyFetchContext(vci_CSFetchContext fetchContext)
+{
+	const LOCKMODE lockmode = AccessShareLock;
+
+	vci_CloseColumnRelations(&fetchContext->rel_delete, lockmode);
+	vci_CloseColumnRelations(&fetchContext->rel_null, lockmode);
+	vci_CloseColumnRelations(&fetchContext->rel_tid, lockmode);
+
+	if (fetchContext->rel_column != NULL)
+	{
+		int			cId = 0;
+
+		while (cId < fetchContext->num_columns)
+		{
+			vci_CloseColumnRelations(&fetchContext->rel_column[cId], lockmode);
+			++cId;
+		}
+
+		pfree(fetchContext->rel_column);
+	}
+
+	if (fetchContext->info != NULL)
+	{
+		vci_MainRelHeaderInfo *ownInfo = fetchContext->info;
+		Relation	mainRel = ownInfo->rel;
+
+		/*
+		 * NOTE(review): ownInfo itself is not pfree'd here, unlike in
+		 * vci_CSDestroyQueryContext() -- presumably left to the memory
+		 * context; confirm.
+		 */
+		vci_ReleaseMainRelHeader(ownInfo);
+		table_close(mainRel, lockmode);
+	}
+
+	pfree(fetchContext->buffer);
+	pfree(fetchContext);
+}
+
+/* Pass the process-local memory context as memoryContext. */
+/**
+ * @brief Make a process-local copy of \c fetchContext.
+ *
+ * File handles cannot be shared among processes and \c fetchContext holds
+ * many of them, so every process needs its own copy.
+ *
+ * The copy receives its own scan buffer, its own vci_MainRelHeaderInfo on a
+ * main relation opened here, and its own opened delete / null / TID and
+ * per-column relations, all with AccessShareLock.  \c memoryContext is made
+ * current while the copy is built and the previous context is restored
+ * before returning.
+ *
+ * @param[in] fetchContext source of copy.
+ * @param[in] memoryContext the memory context where the copy is written.
+ * @return copy of given \c fetchContext.
+ */
+vci_CSFetchContext
+vci_CSLocalizeFetchContext(vci_CSFetchContext fetchContext,
+						   MemoryContext memoryContext)
+{
+	const LOCKMODE lockmode = AccessShareLock;
+	MemoryContext callerCtx = MemoryContextSwitchTo(memoryContext);
+	vci_CSQueryContext queryContext = fetchContext->query_context;
+	Relation	mainRel;
+	vci_CSFetchContext copy;
+	int			columnId;
+
+	mainRel = relation_open(queryContext->main_relation_oid, lockmode);
+
+	/* Start from a byte-wise copy, then replace the per-process parts. */
+	copy = palloc0(fetchContext->size);
+	MemCpy(copy, fetchContext, fetchContext->size);
+
+	copy->local_memory_context = memoryContext;
+	copy->buffer = MemoryContextAllocZero(copy->local_memory_context,
+										  sizeof(vci_seq_scan_buffer_t));
+
+	/* The source must be a master copy without its own header info. */
+	Assert(NULL == copy->info);
+	copy->info = MemoryContextAllocZero(copy->local_memory_context,
+										sizeof(vci_MainRelHeaderInfo));
+	vci_InitMainRelHeaderInfo(copy->info, mainRel,
+							  queryContext->info->command);
+	vci_KeepMainRelHeader(copy->info);
+
+	copy->rel_column = MemoryContextAllocZero(copy->local_memory_context,
+											  sizeof(vci_ColumnRelations) * copy->num_columns);
+
+	vci_OpenColumnRelations(&copy->rel_delete, copy->info, VCI_COLUMN_ID_DELETE, lockmode);
+	vci_OpenColumnRelations(&copy->rel_null, copy->info, VCI_COLUMN_ID_NULL, lockmode);
+	vci_OpenColumnRelations(&copy->rel_tid, copy->info, VCI_COLUMN_ID_TID, lockmode);
+
+	columnId = VCI_FIRST_NORMALCOLUMN_ID;
+	while (columnId < copy->num_columns)
+	{
+		int16		cId = vci_GetColumnIdFromFetchContext(fetchContext, columnId);
+
+		vci_OpenColumnRelations(&copy->rel_column[columnId],
+								copy->info, cId, lockmode);
+		++columnId;
+	}
+
+	MemoryContextSwitchTo(callerCtx);
+
+	return copy;
+}
+
+/**
+ * @brief Create an instance of \c vci_extent_status_t where the status
+ * of extents are written.
+ *
+ * The instance is allocated zero-filled in
+ * fetchContext->local_memory_context with room for one vci_minmax_t per
+ * column of the fetch context.  It starts out with no rows, not existing
+ * and not visible.
+ *
+ * @param[in] fetchContext the \c fetchContext of the target VCI relation
+ * in the process, i.e. localized one.
+ * @return the pointer of newly created instance of \c vci_extent_status_t.
+ */
+vci_extent_status_t *
+vci_CSCreateCheckExtent(vci_CSFetchContext fetchContext)
+{
+	vci_extent_status_t *extentStatus;
+	uint32		allocSize;
+
+	/* The struct already holds one vci_minmax_t; add the remaining ones. */
+	allocSize = sizeof(vci_extent_status_t) +
+		((fetchContext->num_columns - 1) * sizeof(vci_minmax_t));
+
+	Assert(fetchContext->local_memory_context);
+	extentStatus = MemoryContextAllocZero(fetchContext->local_memory_context,
+										  allocSize);
+
+	extentStatus->size = allocSize;
+	extentStatus->num_rows = 0;
+	extentStatus->existence = false;
+	extentStatus->visible = false;
+
+	return extentStatus;
+}
+
+/**
+ * @brief Destroy given instance of \c vci_extent_status_t.
+ *
+ * @param[in] status target to destroy; it is released with pfree().
+ */
+void
+vci_CSDestroyCheckExtent(vci_extent_status_t *status)
+{
+ pfree(status);
+}
+
+/*
+ * Mark the min-max entry of every column of fetchContext as invalid in
+ * status.
+ */
+static void
+SetMinMaxInvalid(vci_extent_status_t *status, vci_CSFetchContext fetchContext)
+{
+	int			col = 0;
+
+	while (col < fetchContext->num_columns)
+	{
+		status->minmax[col].valid = false;
+		++col;
+	}
+}
+
+/**
+ * @brief Get the status of an extent.
+ *
+ * @details Check if the extent is visible with the relation among current ROS
+ * version and \c Xgen, \c Xdel.
+ *
+ * The current ROS version is obtained from
+ * \c fetchContext->query_context->ros_version.
+ *
+ * Extent IDs below VCI_FIRST_NORMAL_EXTENT_ID refer to local ROS extents of
+ * the query context.  Such an extent exists, and is then also visible, when
+ * -num_local_ros_extents <= extentId.  A local extent that does not exist
+ * is reported with zero rows.
+ *
+ * For other extent IDs the information is read from the main relation
+ * under a shared buffer lock.  An extent without extent information is
+ * reported as not existing, not visible and with zero rows.
+ *
+ * @param[in,out] status the status of an extent is written in \c * \c status.
+ * @param[in] fetchContext the \c fetchContext of the target VCI relation
+ * in the process, i.e. localized one.
+ * @param[in] extentId the extent ID to probe.
+ * @param[in] readMinMax obtain min-max information if true.  It is not
+ * consulted by this function at present; min-max entries are only ever
+ * marked invalid here.
+ */
+void
+vci_CSCheckExtent(vci_extent_status_t *status,
+				  vci_CSFetchContext fetchContext,
+				  int32 extentId,
+				  bool readMinMax)
+{
+	vci_MainRelHeaderInfo *info = GetMainRelHeaderInfoFromFetchContext(
+																		fetchContext);
+
+	if (extentId < VCI_FIRST_NORMAL_EXTENT_ID)
+	{
+		bool		existence;
+
+		Assert(fetchContext->query_context);
+		existence = -(fetchContext->query_context->num_local_ros_extents) <= extentId;
+		status->existence = existence;
+		status->visible = existence;
+		status->num_rows = 0;
+
+		/*
+		 * Look at the local ROS only when the extent exists.  Otherwise
+		 * -1 - extentId lies beyond the extent array and must not be used
+		 * as an index.
+		 */
+		if (existence)
+		{
+			vci_local_ros_t *localRos = fetchContext->query_context->local_ros;
+			vci_virtual_tuples_t *extent;
+
+			Assert(localRos->num_local_extents == fetchContext->query_context->num_local_ros_extents);
+			extent = localRos->extent[-1 - extentId];
+			status->num_rows = extent->num_rows;
+		}
+
+		SetMinMaxInvalid(status, fetchContext);
+
+		return;
+	}
+
+	/* check if the VCI index has the extent. */
+	if (!vci_ExtentInfoExists(info, extentId))
+	{
+		status->num_rows = 0;
+		status->existence = false;
+		status->visible = false;
+		SetMinMaxInvalid(status, fetchContext);
+
+		return;
+	}
+
+	{
+		/* check information of extent in main relation */
+		Buffer		buffer = InvalidBuffer;
+		vcis_m_extent_t *mExtent;
+
+		mExtent = vci_GetMExtent(&buffer, info, extentId);
+
+		/* Read the extent entry under a shared content lock. */
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+		status->existence = TransactionIdIsValid(mExtent->xgen) ||
+			TransactionIdIsValid(mExtent->xdel);
+
+		status->visible = vci_ExtentIsVisible(mExtent,
+											  fetchContext->query_context->ros_version);
+
+		status->num_rows = mExtent->num_rows;
+
+		UnlockReleaseBuffer(buffer);
+	}
+
+	if (!status->visible)
+	{
+		SetMinMaxInvalid(status, fetchContext);
+
+		return;
+	}
+}
+
+/**
+ * @brief Entry point to generate local ROS.
+ *
+ * The data WOS and whiteout WOS entry counts recorded in the query context
+ * are passed on, each raised to at least 1, together with a size limit of
+ * VciGuc.max_local_ros_size * 1024.  Afterwards the lock mode of the query
+ * context is changed to AccessShareLock.
+ *
+ * @param[in] queryContext Query context the local ROS is generated for.
+ * @return The pointer to the local ROS information.
+ */
+vci_local_ros_t *
+vci_CSGenerateLocalRos(vci_CSQueryContextData *queryContext)
+{
+	vci_local_ros_t *localRos;
+	int64		dataWosRows = queryContext->num_data_wos_entries;
+	int64		whiteoutWosRows = queryContext->num_whiteout_wos_entries;
+
+	/* Never pass an estimate below one row. */
+	if (dataWosRows < 1)
+		dataWosRows = 1;
+	if (whiteoutWosRows < 1)
+		whiteoutWosRows = 1;
+
+	localRos = vci_GenerateLocalRos(queryContext,
+									VciGuc.max_local_ros_size * INT64CONST(1024),
+									dataWosRows,
+									whiteoutWosRows);
+
+	ChangeLockModeInQueryContext(queryContext, AccessShareLock);
+
+	return localRos;
+}
+
+/**
+ * @brief Estimate the size of local ROS.
+ *
+ * @details We have some ways to estimate the number of rows in data WOS.
+ * In the system catalog, we have two values.
+ * One is pg_class.reltuples, which is updated on VACUUM.
+ * The other is pg_stat_user_tables.n_live_tup, which is
+ * updated on COMMIT.
+ * Both cannot count those rows INSERTed in the same
+ * transaction.
+ * To obtain the actual count, it seems necessary to count
+ * rows in data WOS.
+ * And we have the count itself in memory object,
+ * vci_memory_entry_t.tid_tree->number_of_nodes_in_wos.
+ *
+ * For local delete list, we estimate the number of entries, i.e. deleted
+ * rows, from the number of DB pages in the whiteout WOS.
+ *
+ * We also use memory for chunk storage in building Local ROS,
+ * but which can be counted in WOS -> ROS conversion memory.
+ *
+ * As a side effect, the estimated entry counts of the data WOS and the
+ * whiteout WOS are stored in \c queryContext.
+ *
+ * @param[in] queryContext estimate the size of local ROS for the given
+ * query context.
+ * @return estimated size of local ROS, or (Size) -1 when the data WOS is
+ * estimated to hold more than
+ * VCI_NUM_ROWS_IN_EXTENT * VCI_MAX_NUMBER_UNCONVERTED_ROS rows.
+ */
+Size
+vci_CSEstimateLocalRosSize(vci_CSQueryContext queryContext)
+{
+	Size		estimate = 0;
+	int			rowsPerExtent;
+	int64		dataWosRows;
+	int64		whiteoutWosRows;
+	int64		deleteListRows;
+	int			rowsPerFetch;
+	Size		bytesPerFetch;
+	vci_CSFetchContext probe;
+
+	Assert(queryContext);
+	Assert(queryContext->info);
+
+	rowsPerExtent = vci_GetNumRowsInLocalRosExtent(queryContext->num_columns);
+
+	/* Refresh the estimates kept in the query context. */
+	queryContext->num_data_wos_entries =
+		vci_EstimateNumEntriesInHeapRelation(
+											 vci_GetMainRelVar(queryContext->info, vcimrv_data_wos_oid, 0));
+	dataWosRows = queryContext->num_data_wos_entries;
+
+	queryContext->num_whiteout_wos_entries =
+		vci_EstimateNumEntriesInHeapRelation(
+											 vci_GetMainRelVar(queryContext->info, vcimrv_whiteout_wos_oid, 0));
+	whiteoutWosRows = queryContext->num_whiteout_wos_entries;
+
+	deleteListRows = dataWosRows + whiteoutWosRows;
+
+	if (VCI_NUM_ROWS_IN_EXTENT * VCI_MAX_NUMBER_UNCONVERTED_ROS < dataWosRows)
+		return (Size) -1;
+
+	/*
+	 * Calculate the size of memory area to store multiple
+	 * vci_virtual_tuples_t.  We assume to use column store for local ROS,
+	 * for which we do not use data compression.  A throw-away fetch context
+	 * tells us the size needed by one fetch.
+	 */
+	probe = vci_CSCreateFetchContextBase(queryContext,
+										 Min(rowsPerExtent, dataWosRows),
+										 queryContext->num_columns,
+										 queryContext->attr_num,
+										 true,	/* useColumnStore */
+										 true,
+										 true,
+										 false);	/* no compression */
+	rowsPerFetch = probe->num_rows_read_at_once;
+	bytesPerFetch = probe->size_vector_memory_context;
+	vci_CSDestroyFetchContext(probe);
+
+	if (dataWosRows <= rowsPerFetch)
+	{
+		/* Everything fits into a single fetch. */
+		estimate = bytesPerFetch;
+	}
+	else
+	{
+		int			fullFetches;
+		int			leftoverRows;
+
+		fullFetches = dataWosRows / rowsPerFetch;
+		leftoverRows = dataWosRows - (fullFetches * rowsPerFetch);
+
+		estimate = fullFetches * bytesPerFetch;
+
+		/* A final, partially filled fetch needs its own size. */
+		if (0 < leftoverRows)
+		{
+			probe = vci_CSCreateFetchContextBase(queryContext,
+												 leftoverRows,
+												 queryContext->num_columns,
+												 queryContext->attr_num,
+												 true,	/* useColumnStore */
+												 true,
+												 true,
+												 false);	/* no compression */
+			estimate += probe->size_vector_memory_context;
+			vci_CSDestroyFetchContext(probe);
+		}
+	}
+
+	/*
+	 * Calculate the size of local delete list
+	 */
+	estimate += deleteListRows * sizeof(queryContext->delete_list[0]);
+
+	return estimate;
+}
+
+/*
+ * (Re)compute the pointers of vTuples into its value and flag buffers.
+ *
+ * values and flags are set to the MAXALIGNed addresses of al_values and
+ * al_flags.  The flag buffer is then carved up in this order: CRIDs (if
+ * needed), TIDs (if needed), skip, local_skip, isnull, the row-wise area
+ * (unless column store is used), the decompression work area (if any) and,
+ * per column that has a dictionary, a vci_DictInfo followed by its storage.
+ * Per-column information is refreshed from SizeOfElementAndPointer().
+ *
+ * When keepStatus is true, an existing vci_DictInfo is left as it is
+ * instead of being initialized again.
+ */
+static void
+RefillPointersOfVirtualTuples(vci_virtual_tuples_t *vTuples, bool keepStatus)
+{
+	vci_CSFetchContext fetchContext = vTuples->fetch_context;
+	uint32		numRows = vTuples->num_rows_read_at_once;
+	vci_MainRelHeaderInfo *info = GetMainRelHeaderInfoFromFetchContext(fetchContext);
+	char	   *cursor;
+	int			columnId;
+
+	vTuples->buffer_capacity = numRows;
+
+	vTuples->values = (Datum *) MAXALIGN(vTuples->al_values);
+	vTuples->flags = (char *) MAXALIGN(vTuples->al_flags);
+
+	/* cursor walks through the flag buffer as areas are handed out */
+	cursor = vTuples->flags;
+
+	if (fetchContext->need_crid)
+	{
+		vTuples->crid = (int64 *) cursor;
+		cursor = (char *) (vTuples->crid + numRows);
+		Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+	}
+	else
+		vTuples->crid = NULL;
+
+	if (fetchContext->need_tid)
+	{
+		vTuples->tid = (int64 *) cursor;
+		cursor = (char *) (vTuples->tid + numRows);
+		Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+	}
+	else
+		vTuples->tid = NULL;
+
+	/* skip: row count rounded up to a multiple of 8, plus one element */
+	vTuples->skip = (uint16 *) cursor;
+	cursor = (char *) (vTuples->skip + (TYPEALIGN(8, numRows) + 1));
+	Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+
+	/* local_skip: one element more than the row count */
+	vTuples->local_skip = (uint16 *) cursor;
+	cursor = (char *) (vTuples->local_skip + (numRows + 1));
+	Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+
+	vTuples->isnull = (bool *) cursor;
+	cursor = (char *) (vTuples->isnull + (numRows * vTuples->num_columns));
+	Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+
+	if (vTuples->use_column_store)
+		vTuples->row_wise_local_ros = NULL;
+	else
+	{
+		vTuples->row_wise_local_ros = (char *) MAXALIGN(cursor);
+		cursor += numRows * vTuples->num_columns * (sizeof(Datum) + sizeof(bool));
+		Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+	}
+
+	if (0 < vTuples->size_decompression_area)
+	{
+		vTuples->work_decompression = cursor;
+		cursor += vTuples->size_decompression_area;
+		Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+	}
+	else
+		vTuples->work_decompression = NULL;
+
+	columnId = VCI_FIRST_NORMALCOLUMN_ID;
+	while (columnId < vTuples->num_columns)
+	{
+		vci_virtual_tuples_column_info_t *colInfo = &(vTuples->column_info[columnId]);
+		int16		cId = vci_GetColumnIdFromFetchContext(fetchContext, columnId);
+		uint32		datumSize;
+		uint32		maxDictSize;
+
+		SizeOfElementAndPointer(info,
+								fetchContext,
+								cId,
+								&datumSize,
+								&(colInfo->max_column_size),
+								&maxDictSize,
+								&(colInfo->null_bit_id),
+								&(colInfo->comp_type),
+								&(colInfo->atttypid),
+								&(colInfo->strict_datum_type));
+
+		if (0 < maxDictSize)
+		{
+			/* dictionary header first, its storage right behind it */
+			colInfo->dict_info = (vci_DictInfo *) MAXALIGN(cursor);
+			cursor += sizeof(vci_DictInfo);
+			Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+			if (!keepStatus)
+			{
+				vci_InitializeDictInfo(colInfo->dict_info);
+				colInfo->dict_info->dictionary_storage = (unsigned char *) cursor;
+				colInfo->dict_info->storage_size = maxDictSize;
+			}
+			cursor += maxDictSize;
+			Assert((uintptr_t) cursor - (uintptr_t) (vTuples->flags) <= vTuples->size_flags);
+		}
+		else
+			colInfo->dict_info = NULL;
+
+		/* per-column vectors exist only in column store layout */
+		if (vTuples->use_column_store)
+		{
+			colInfo->isnull = &(vTuples->isnull[numRows * columnId]);
+			colInfo->values = &(vTuples->values[numRows * columnId]);
+		}
+		else
+		{
+			colInfo->isnull = NULL;
+			colInfo->values = NULL;
+		}
+
+		colInfo->area = (char *) MAXALIGN(colInfo->al_area);
+
+		++columnId;
+	}
+}
+
+/**
+ * @brief Create an instance of \c vci_virtual_tuples_t where the read
+ * ROS is stored.
+ *
+ * @details The function \c vci_CSFetchVirtualTuples() reads and stores
+ * multiple rows at once from the specified columns and rows.
+ * Users need to have enough area to store the maximum number of rows.
+ * To prepare the area, the maximum number of rows should be passed via
+ * the parameter \c numRows of this function.
+ *
+ * All memory is taken from fetchContext->local_memory_context.  The value
+ * and flag buffers use the sizes recorded in the fetch context.  Columns
+ * for which NeedDatumPointer() is true additionally get an area of
+ * MAXALIGN(maximum element size) * numRows bytes.
+ *
+ * @param[in] fetchContext the fetch context.
+ * @param[in] numRows required number of rows read at once.  It must not
+ * exceed fetchContext->num_rows_read_at_once (asserted).
+ * @return the pointer to the created \c vci_virtual_tuples_t.
+ */
+vci_virtual_tuples_t *
+vci_CSCreateVirtualTuplesWithNumRows(vci_CSFetchContext fetchContext,
+									 uint32 numRows)
+{
+	MemoryContext localCtx;
+	vci_MainRelHeaderInfo *info;
+	vci_virtual_tuples_t *vTuples;
+	int32		allocSize;
+	int			columnId;
+
+	Assert(fetchContext);
+	localCtx = fetchContext->local_memory_context;
+	Assert(localCtx);
+	info = GetMainRelHeaderInfoFromFetchContext(fetchContext);
+
+	/* The struct already holds one column info; add the remaining ones. */
+	allocSize = sizeof(vci_virtual_tuples_t) + ((fetchContext->num_columns - 1) *
+												sizeof(vci_virtual_tuples_column_info_t));
+	vTuples = MemoryContextAllocZero(localCtx, allocSize);
+
+	/* Start out as an empty vector that belongs to no extent. */
+	vTuples->size = allocSize;
+	vTuples->num_columns = fetchContext->num_columns;
+	vTuples->extent_id = VCI_INVALID_EXTENT_ID;
+	vTuples->num_rows_in_extent = 0;
+	vTuples->row_id_in_extent = -1;
+	vTuples->num_rows = 0;
+	vTuples->buffer_capacity = 0;
+	vTuples->offset_of_first_tuple_of_vector = 0;
+	vTuples->num_rows_read_at_once = numRows;
+	vTuples->fetch_context = fetchContext;
+	vTuples->use_column_store = fetchContext->use_column_store;
+	vTuples->status = vcirvs_out_of_range;
+
+	/* Buffer sizes are inherited from the fetch context. */
+	vTuples->size_vector_memory_context = fetchContext->size_vector_memory_context;
+	Assert(vTuples->num_rows_read_at_once <= fetchContext->num_rows_read_at_once);
+	vTuples->size_values = fetchContext->size_values;
+	vTuples->size_flags = fetchContext->size_flags;
+	vTuples->size_dictionary_area = fetchContext->size_dictionary_area;
+	vTuples->size_decompression_area = fetchContext->size_decompression_area;
+
+	/* Extra MAXIMUM_ALIGNOF bytes leave room for aligning the start. */
+	vTuples->al_values = MemoryContextAlloc(localCtx,
+											vTuples->size_values + MAXIMUM_ALIGNOF);
+	vTuples->al_flags = MemoryContextAlloc(localCtx,
+										   vTuples->size_flags + MAXIMUM_ALIGNOF);
+
+	columnId = VCI_FIRST_NORMALCOLUMN_ID;
+	while (columnId < vTuples->num_columns)
+	{
+		int16		cId = vci_GetColumnIdFromFetchContext(fetchContext, columnId);
+
+		vTuples->column_info[columnId].al_area = NULL;
+		if (NeedDatumPointer(info, cId))
+		{
+			uint32		maxElemSize;
+
+			SizeOfElementAndPointer(info, fetchContext, cId, NULL,
+									&maxElemSize, NULL, NULL, NULL, NULL, NULL);
+			vTuples->column_info[columnId].al_area =
+				MemoryContextAlloc(localCtx,
+								   MAXIMUM_ALIGNOF +
+								   (MAXALIGN(maxElemSize) * numRows));
+		}
+
+		++columnId;
+	}
+
+	RefillPointersOfVirtualTuples(vTuples, false);
+
+	return vTuples;
+}
+
+/**
+ * @brief Destroy given instance of \c vci_virtual_tuples_t.
+ *
+ * Frees the per-column areas that were allocated, the value and flag
+ * buffers and finally the instance itself.
+ *
+ * @param[in] vTuples target to be destroyed.
+ */
+void
+vci_CSDestroyVirtualTuples(vci_virtual_tuples_t *vTuples)
+{
+	int			columnId = VCI_FIRST_NORMALCOLUMN_ID;
+
+	while (columnId < vTuples->num_columns)
+	{
+		/* only some columns own an area */
+		if (vTuples->column_info[columnId].al_area != NULL)
+			pfree(vTuples->column_info[columnId].al_area);
+		++columnId;
+	}
+
+	pfree(vTuples->al_values);
+	pfree(vTuples->al_flags);
+	pfree(vTuples);
+}
+
+/**
+ * @brief Fill CRIDs in the given \c vci_virtual_tuples_t.
+ *
+ * vTuples->crid receives consecutive CRIDs, starting with the one computed
+ * from extent_id and row_id_in_extent, for num_rows entries.  The first
+ * (num_rows & 7) entries are written one by one, the rest in groups of
+ * eight.
+ *
+ * @param[in] vTuples target virtual tuples.
+ */
+void
+vci_FillCridInVirtualTuples(vci_virtual_tuples_t *vTuples)
+{
+	int64	   *out = vTuples->crid;
+	int64		next = vci_CalcCrid64(vTuples->extent_id,
+									  vTuples->row_id_in_extent);
+	int			headCount = (vTuples->num_rows) & 7;
+	int			pos = 0;
+
+	/* leading entries, so that the remainder is a multiple of eight */
+	while (pos < headCount)
+	{
+		out[pos] = next;
+		++pos;
+		++next;
+	}
+
+	/* remaining entries in groups of eight */
+	while (pos < vTuples->num_rows)
+	{
+		for (int k = 0; k < 8; ++k)
+			out[pos + k] = next + k;
+
+		pos += 8;
+		next += 8;
+	}
+}
+
+static void
+FillSkipLoadFillBitImage(uint64 *dst, uint8 data)
+{
+ /*----------
+ * Expand the eight bits of data into eight 16-bit elements, each holding
+ * 0 or 1, least significant bit first.  The result is written with two
+ * 64-bit stores, dst[0] and dst[1], i.e. 16 bytes starting at dst.
+ *
+ * The effect is the same as
+ *     ((int16 *) dst)[0] = (data >> 0) & 1;
+ *     ((int16 *) dst)[1] = (data >> 1) & 1;
+ *     ((int16 *) dst)[2] = (data >> 2) & 1;
+ *     ((int16 *) dst)[3] = (data >> 3) & 1;
+ *     ((int16 *) dst)[4] = (data >> 4) & 1;
+ *     ((int16 *) dst)[5] = (data >> 5) & 1;
+ *     ((int16 *) dst)[6] = (data >> 6) & 1;
+ *     ((int16 *) dst)[7] = (data >> 7) & 1;
+ *
+ * The multiplication places shifted copies of data so that bits 0..3 land
+ * on the lowest bit of the four 16-bit lanes of a 64-bit word; shifting
+ * that word right by four does the same for bits 4..7.  The copies do not
+ * overlap, so no carries occur.  The constants differ per byte order so
+ * that the element order in memory is the same.
+ *----------
+ */
+#ifdef WORDS_BIGENDIAN
+ uint64 value = UINT64CONST(0x0001000080004000) * data; /* bits 0..2 -> lanes 3..1 */
+
+ value += (UINT64CONST(0x2000) * data) >> 16; /* data >> 3: bit 3 -> lane 0 */
+ dst[0] = value & UINT64CONST(0x0001000100010001); /* bits 0..3 */
+ dst[1] = (value >> 4) & UINT64CONST(0x0001000100010001); /* bits 4..7 */
+#else
+ uint64 value = UINT64CONST(0x0000200040008001) * data; /* bits 0..3 -> lanes 0..3 */
+
+ dst[0] = value & UINT64CONST(0x0001000100010001); /* bits 0..3 */
+ dst[1] = (value >> 4) & UINT64CONST(0x0001000100010001); /* bits 4..7 */
+#endif /* #ifdef WORDS_BIGENDIAN */
+}
+
+/*
+ * Expand bit-packed bytes of one page item into 16-bit flags.
+ *
+ * The bytes are read from item oNum of page, starting *startOf bytes behind
+ * the tuple header (t_hoff), and the flags are written to *dst_.  On return
+ * *dst_ has been advanced by numRows elements and *startOf is reset to 0.
+ *
+ * Source bytes are consumed four at a time (32 flags per iteration) while
+ * fewer than numRows / 8 bytes have been handled, so when numRows / 8 is
+ * not a multiple of four, flags beyond numRows are written as well.
+ * NOTE(review): confirm callers provide that slack in the destination and
+ * that *dst_ is suitably aligned for the 64-bit stores.
+ */
+static void
+FillSkipLoadBody(uint16 **dst_,
+				 int *startOf,
+				 Page page,
+				 OffsetNumber oNum,
+				 int numRows)
+{
+	ItemId		itemId = PageGetItemId(page, oNum);
+	HeapTupleHeader tuple = (HeapTupleHeader) PageGetItem(page, itemId);
+	uint8	   *src = ((uint8 *) tuple) + (tuple->t_hoff + *startOf);
+	uint64	   *out = (uint64 *) *dst_;
+	int			numBytes = numRows / 8;
+	int			done = 0;
+
+	*dst_ += numRows;
+	*startOf = 0;
+
+	while (done < numBytes)
+	{
+		/* each source byte yields two 64-bit words, i.e. eight flags */
+		FillSkipLoadFillBitImage(&out[0], src[0]);
+		FillSkipLoadFillBitImage(&out[2], src[1]);
+		FillSkipLoadFillBitImage(&out[4], src[2]);
+		FillSkipLoadFillBitImage(&out[6], src[3]);
+
+		out += 8;
+		src += 4;
+		done += 4;
+	}
+}
+
+static void
+FillSkipLoad(vci_virtual_tuples_t *vTuples)
+{
+ int64 cridStart = vci_CalcCrid64(vTuples->extent_id,
+ vTuples->row_id_in_extent);
+ int64 cridEnd = cridStart + vTuples->num_rows - 1; /* CRID of the last row, inclusive */
+ vci_ColumnRelations rel = vTuples->fetch_context->rel_delete;
+ int initCorr = vTuples->row_id_in_extent % VCI_NUM_ROWS_IN_ONE_ITEM_FOR_DELETE; /* rows of the first item that precede the start row */
+
+ BlockNumber startBN = vci_CalcBlockNumberFromCrid64ForDelete(cridStart);
+ OffsetNumber startON = vci_CalcOffsetNumberFromCrid64ForDelete(cridStart);
+ int startOf = vci_CalcByteFromCrid64ForDelete(cridStart);
+
+ BlockNumber endBN = vci_CalcBlockNumberFromCrid64ForDelete(cridEnd);
+ OffsetNumber endON = vci_CalcOffsetNumberFromCrid64ForDelete(cridEnd);
+ int endOf = vci_CalcByteFromCrid64ForDelete(cridEnd);
+
+ BlockNumber bNum;
+ uint16 *dst = vTuples->skip;
+#ifdef USE_ASSERT_CHECKING
+ uint16 *dstSave = dst;
+#endif /* #ifdef USE_ASSERT_CHECKING */
+
+ /*
+ * Fill vTuples->skip from the bits stored in fetch_context->rel_delete
+ * for the num_rows rows starting at row_id_in_extent of the extent. The
+ * bits are located by block number, item offset and byte position
+ * computed from the first and the last CRID, and each bit is expanded to
+ * one uint16 element by FillSkipLoadBody().
+ *
+ * We always expand eight bits in a byte. So the first row ID should be a
+ * multiple of eight.
+ *
+ * NOTE(review): the pages are read while pinned by ReadBuffer() but no
+ * LockBuffer() call is made in this function -- confirm that reading
+ * without a content lock is intended here.
+ */
+ Assert(0 == (vTuples->row_id_in_extent & 7));
+
+ for (bNum = startBN; bNum < endBN; ++bNum) /* every block before the last one */
+ {
+ Buffer buffer = ReadBuffer(rel.data, bNum);
+ Page page = BufferGetPage(buffer);
+
+ for (OffsetNumber oNum = startON;
+ oNum < (VCI_ITEMS_IN_PAGE_FOR_DELETE + FirstOffsetNumber);
+ ++oNum)
+ {
+ FillSkipLoadBody(&dst,
+ &startOf,
+ page,
+ oNum,
+ VCI_NUM_ROWS_IN_ONE_ITEM_FOR_DELETE - initCorr);
+ initCorr = 0; /* only the very first item is entered part-way */
+ Assert((uintptr_t) dst <= (uintptr_t) &(dstSave[TYPEALIGN(8, vTuples->num_rows)]));
+ }
+
+ ReleaseBuffer(buffer);
+ startON = FirstOffsetNumber; /* later blocks are read from their first item */
+ }
+
+ {
+ Buffer buffer = ReadBuffer(rel.data, bNum); /* the block holding the last row */
+ Page page = BufferGetPage(buffer);
+ OffsetNumber oNum;
+
+ for (oNum = startON; oNum < endON; ++oNum) /* whole items before the last item */
+ {
+ FillSkipLoadBody(&dst,
+ &startOf,
+ page,
+ oNum,
+ VCI_NUM_ROWS_IN_ONE_ITEM_FOR_DELETE - initCorr);
+ initCorr = 0;
+ Assert((uintptr_t) dst <= (uintptr_t) &(dstSave[TYPEALIGN(8, vTuples->num_rows)]));
+ }
+
+ FillSkipLoadBody(&dst,
+ &startOf,
+ page,
+ oNum,
+ (endOf - startOf + 1) * 8); /* last item: only bytes startOf..endOf, eight rows each */
+ Assert((uintptr_t) dst <= (uintptr_t) &(dstSave[TYPEALIGN(8, vTuples->num_rows)]));
+
+ ReleaseBuffer(buffer);
+ }
+}
+
+/*
+ * GetPrevIdInDeleteList
+ *		Find the last entry of queryContext->delete_list that is <= crid.
+ *
+ * Returns the index of that entry, or -1 when even delete_list[0] is larger
+ * than crid.  Lists with fewer than 16 entries are scanned backwards; longer
+ * lists are searched by building the index bit by bit from the highest bit
+ * of num_delete downwards.
+ * NOTE(review): the bitwise search presumes delete_list is sorted in
+ * ascending order -- confirm against the code that builds the list.
+ */
+static int64
+GetPrevIdInDeleteList(vci_CSQueryContext queryContext, uint64 crid)
+{
+	uint64		pos = 0;
+	int			topBit;
+
+	if (queryContext->num_delete < 16)
+	{
+		int64		idx = queryContext->num_delete;
+
+		while (idx--)
+		{
+			if (queryContext->delete_list[idx] <= crid)
+				return idx;
+		}
+
+		return -1;
+	}
+
+	topBit = vci_GetHighestBit(queryContext->num_delete);
+	Assert(0 <= topBit);
+
+	for (uint64 bit = UINT64CONST(1) << topBit; bit; bit >>= 1)
+	{
+		uint64		probe = pos + bit;
+
+		/* keep the bit only if the probe is in range and not beyond crid */
+		if (!(probe < queryContext->num_delete))
+			continue;
+		if (queryContext->delete_list[probe] <= crid)
+			pos = probe;
+	}
+
+	/* index 0 was never probed, so check it explicitly */
+	if (0 != pos)
+		return pos;
+
+	return (queryContext->delete_list[0] <= crid) ? 0 : -1;
+}
+
+/*
+ * MergeLocalDeleteListToSkip
+ *		Mark rows listed in the query-local delete list as skipped.
+ *
+ * Every entry of queryContext->delete_list whose CRID lies inside the range
+ * currently described by vTuples (num_rows rows starting at extent_id /
+ * row_id_in_extent) gets its vTuples->skip slot set to 1.  Nothing happens
+ * when the delete list is empty.
+ */
+static void
+MergeLocalDeleteListToSkip(vci_virtual_tuples_t *vTuples)
+{
+	vci_CSFetchContext fetchContext = vTuples->fetch_context;
+	vci_CSQueryContext queryContext = fetchContext->query_context;
+	int64		cridStart = vci_CalcCrid64(vTuples->extent_id,
+										   vTuples->row_id_in_extent);
+	int			numRows = vTuples->num_rows;
+	int64		prevId;
+	int			startId;
+	int			endId;
+
+	if (queryContext->num_delete < 1)
+		return;
+
+	/*
+	 * Max() is a macro that expands its arguments twice, so do the search
+	 * once and clamp the stored result instead of passing the call to Max().
+	 */
+	prevId = GetPrevIdInDeleteList(queryContext, cridStart);
+	startId = Max(0, prevId);
+
+	/* step past an entry that lies before the first row of the range */
+	if (queryContext->delete_list[startId] < cridStart)
+		++startId;
+
+	/* last entry that is not beyond the final row of the range */
+	endId = GetPrevIdInDeleteList(queryContext, cridStart + numRows - 1);
+
+	for (int aid = startId; aid <= endId; ++aid)
+	{
+		uint64		crid = queryContext->delete_list[aid];
+		uint64		offset = crid - cridStart;
+
+		vTuples->skip[offset] = 1;
+	}
+}
+
+/*
+ * FillSkipCountUp
+ *		Replace the per-row flags in vTuples->skip by backward running counts.
+ *
+ * Rows are visited from the last to the first.  The running counter is
+ * updated without a branch: a flag of 1 increases it by one, a flag of 0
+ * resets it to zero.  The counter is then stored back into the row's slot.
+ */
+static void
+FillSkipCountUp(vci_virtual_tuples_t *vTuples)
+{
+	uint16	   *skip = vTuples->skip;
+	uint16		run = 0;
+	int			idx = vTuples->num_rows;
+
+	while (idx--)
+	{
+		run += skip[idx] + ((skip[idx] - 1) * run);
+		skip[idx] = run;
+	}
+}
+
+/*
+ * FillSkip
+ *		Build the complete skip array of vTuples.
+ *
+ * The stored delete vector is loaded first, the query-local delete list is
+ * merged on top of it, the slot just past the last row is cleared, and the
+ * flags are finally converted to running counts.
+ */
+static void
+FillSkip(vci_virtual_tuples_t *vTuples)
+{
+	uint16	   *endSlot;
+
+	FillSkipLoad(vTuples);
+	MergeLocalDeleteListToSkip(vTuples);
+
+	/* one extra slot after the last row is always zero */
+	endSlot = &(vTuples->skip[vTuples->num_rows]);
+	*endSlot = 0;
+
+	FillSkipCountUp(vTuples);
+}
+
+/*
+ * FillFixedWidthCopyBody1
+ *		Copy numRows fixed-width elements out of consecutive pages of rel.
+ *
+ * Reading starts at byte startOf of the data area of block startBN.  The
+ * data area of a page begins VCI_MIN_PAGE_HEADER bytes into the page and is
+ * VCI_MAX_PAGE_SPACE bytes long.  Each element is dataWidth bytes wide and
+ * may straddle a page boundary, in which case it is assembled from the tail
+ * of one page and the head of the next one.  Elements are written
+ * stepDstData bytes apart, starting at dstData.  Before each copy one
+ * Datum-sized word of the destination slot is zeroed (which word depends on
+ * WORDS_BIGENDIAN).
+ *
+ * Returns the destination pointer just past the last slot written.  No
+ * buffer remains pinned on return.
+ */
+static char *
+FillFixedWidthCopyBody1(char *dstData,
+						BlockNumber startBN,
+						uint32 startOf,
+						int stepDstData,
+						int dataWidth,
+						Relation rel,
+						int numRows)
+{
+	BlockNumber bNumCur = startBN;
+	int64		offset = startOf;
+	Buffer		buffer = InvalidBuffer;
+	Page		page = NULL;
+	const Datum zero = 0;
+#ifdef WORDS_BIGENDIAN
+	const int	offsetCont = MAXALIGN(dataWidth) - dataWidth;
+#else							/* #ifdef WORDS_BIGENDIAN */
+	const int	offsetCont = MAXALIGN(dataWidth) - sizeof(Datum);
+#endif							/* #ifdef WORDS_BIGENDIAN */
+
+	if (0 < numRows)
+	{
+		buffer = ReadBuffer(rel, bNumCur);
+		page = BufferGetPage(buffer);
+	}
+
+	for (int aId = 0; aId < numRows;)
+	{
+		int64		rest = VCI_MAX_PAGE_SPACE - offset;
+		int			numElem = rest / dataWidth;
+		int			maxBId = Min(aId + numElem, numRows);
+
+		Assert(0 <= rest);
+
+		/* elements that lie completely inside the current page */
+		for (int bId = aId; bId < maxBId; ++bId)
+		{
+#ifdef WORDS_BIGENDIAN
+			*(Datum *) dstData = zero;
+			MemCpy(&(dstData[offsetCont]), &(page[VCI_MIN_PAGE_HEADER + offset]), dataWidth);
+#else							/* #ifdef WORDS_BIGENDIAN */
+			*(Datum *) &(dstData[offsetCont]) = zero;
+			MemCpy(dstData, &(page[VCI_MIN_PAGE_HEADER + offset]), dataWidth);
+#endif							/* #ifdef WORDS_BIGENDIAN */
+			dstData += stepDstData;
+			offset += dataWidth;
+		}
+
+		aId = maxBId;
+
+		/* do not touch the next page unless more rows are wanted */
+		if (numRows <= aId)
+			break;
+
+		if (offset < VCI_MAX_PAGE_SPACE)
+		{
+			/* the next element straddles the page boundary */
+			int			size = VCI_MAX_PAGE_SPACE - offset;
+			int			nextOffset = dataWidth - size;
+
+			/*
+			 * The element must end beyond the page.  This also has to hold
+			 * when only its last byte spills over, i.e. when
+			 * offset + dataWidth == VCI_MAX_PAGE_SPACE + 1.
+			 */
+			Assert(VCI_MAX_PAGE_SPACE < (offset + dataWidth));
+#ifdef WORDS_BIGENDIAN
+			*(Datum *) dstData = zero;
+			MemCpy(&(dstData[offsetCont]), &(page[VCI_MIN_PAGE_HEADER + offset]), size);
+#else							/* #ifdef WORDS_BIGENDIAN */
+			*(Datum *) &(dstData[offsetCont]) = zero;
+			MemCpy(dstData, &(page[VCI_MIN_PAGE_HEADER + offset]), size);
+#endif							/* #ifdef WORDS_BIGENDIAN */
+			ReleaseBuffer(buffer);
+			buffer = ReadBuffer(rel, ++bNumCur);
+			page = BufferGetPage(buffer);
+#ifdef WORDS_BIGENDIAN
+			MemCpy(&(dstData[offsetCont + size]), &(page[VCI_MIN_PAGE_HEADER]), nextOffset);
+#else							/* #ifdef WORDS_BIGENDIAN */
+			MemCpy(dstData + size, &(page[VCI_MIN_PAGE_HEADER]), nextOffset);
+#endif							/* #ifdef WORDS_BIGENDIAN */
+			dstData += stepDstData;
+			offset = nextOffset;
+			++aId;
+		}
+		else
+		{
+			/* the page was consumed exactly; continue at the next page */
+			Assert(offset == VCI_MAX_PAGE_SPACE);
+			offset = 0;
+			ReleaseBuffer(buffer);
+			buffer = ReadBuffer(rel, ++bNumCur);
+			page = BufferGetPage(buffer);
+		}
+	}
+	if (BufferIsValid(buffer))
+		ReleaseBuffer(buffer);
+
+	return dstData;
+}
+
+/*
+ * FillFixedWidth
+ * Fill one fixed-width column, or the TID column, of vTuples with the
+ * rows currently described by extent_id / row_id_in_extent / num_rows.
+ *
+ * For a normal column (VCI_FIRST_NORMALCOLUMN_ID <= columnId) the
+ * destination is the column's slot in vTuples->values.  When the column's
+ * strict_datum_type flag is set, the raw bytes are written to
+ * column_info[columnId].area instead and the values slots receive pointers
+ * into that area.  Otherwise columnId must be VCI_COLUMN_ID_TID and the
+ * destination is vTuples->tid.
+ *
+ * rel supplies the relation (rel->data) the bytes are read from.
+ */
+static void
+FillFixedWidth(vci_virtual_tuples_t *vTuples,
+ int16 columnId,
+ vci_ColumnRelations *rel)
+{
+ vci_MainRelHeaderInfo *info = GetMainRelHeaderInfoFromFetchContext(vTuples->fetch_context);
+ Datum *dstPtr = NULL;
+ char *dstData = NULL;
+
+ int dataWidth = 0;
+
+ int16 colId = columnId;
+
+ bool passByRef = false;
+
+ char *checkPtr PG_USED_FOR_ASSERTS_ONLY;
+
+ BlockNumber startBN;
+ uint32 startOf;
+
+ /* row-wise layout: consecutive rows are num_columns Datums apart */
+ int facRow = vTuples->num_columns;
+ int stepDstData = sizeof(Datum) * facRow;
+
+ if (VCI_FIRST_NORMALCOLUMN_ID <= columnId)
+ {
+ int facCol = 1;
+
+ Assert(columnId < vTuples->num_columns);
+
+ dataWidth = vTuples->column_info[columnId].max_column_size;
+ if (vTuples->use_column_store)
+ {
+ /* column-wise layout: rows are adjacent, columns are a vector apart */
+ facRow = 1;
+ stepDstData = sizeof(Datum) * facRow;
+ facCol = vTuples->num_rows_read_at_once;
+ }
+
+ dstData = (char *) &(vTuples->values[facCol * columnId]);
+
+ if ((passByRef = vTuples->column_info[columnId].strict_datum_type)) /* pgr0011 */
+ {
+ /* raw bytes go to the side area; values[] gets pointers afterwards */
+ dstPtr = (Datum *) dstData;
+ dstData = vTuples->column_info[columnId].area;
+ stepDstData = MAXALIGN(dataWidth);
+ Assert(dstData);
+ }
+ else
+ Assert(NULL == vTuples->column_info[columnId].area);
+ colId = vci_GetColumnIdFromFetchContext(vTuples->fetch_context,
+ columnId);
+ }
+ else
+ {
+ Assert(VCI_COLUMN_ID_TID == columnId);
+ dstData = (char *) (vTuples->tid);
+ stepDstData = sizeof(vTuples->tid[0]);
+ dataWidth = sizeof(ItemPointerData);
+ }
+
+ vci_GetPositionForFixedColumn(&startBN,
+ &startOf,
+ info,
+ colId,
+ vTuples->extent_id,
+ vTuples->row_id_in_extent,
+ false);
+
+ checkPtr = FillFixedWidthCopyBody1(dstData,
+ startBN,
+ startOf,
+ stepDstData,
+ dataWidth,
+ rel->data,
+ vTuples->num_rows);
+ if (passByRef)
+ {
+ Assert(VCI_FIRST_NORMALCOLUMN_ID <= columnId);
+ /* point each Datum slot at its element in the side area */
+ for (int rId = 0; rId < vTuples->num_rows; ++rId)
+ dstPtr[facRow * rId] = PointerGetDatum(&(dstData[stepDstData * rId]));
+ Assert((uintptr_t) (vTuples->column_info[columnId].area) <= (uintptr_t) checkPtr);
+ /*
+ * NOTE(review): elements are written MAXALIGN(max_column_size) apart,
+ * while this bound uses max_column_size itself -- confirm that
+ * max_column_size is already MAXALIGNed for these columns.
+ */
+ Assert((uintptr_t) checkPtr <= (uintptr_t) &(vTuples->column_info[columnId].area[vTuples->column_info[columnId].max_column_size * vTuples->num_rows_read_at_once]));
+ if (vTuples->use_column_store)
+ Assert(vTuples->column_info[columnId].values == dstPtr);
+ else
+ Assert(&(vTuples->values[columnId]) == dstPtr);
+ }
+ else
+ {
+ /* sanity checks only: the copy must have stayed inside its target */
+ if (VCI_FIRST_NORMALCOLUMN_ID <= columnId)
+ {
+ if (vTuples->use_column_store)
+ {
+ Assert((uintptr_t) (vTuples->column_info[columnId].values) <= (uintptr_t) checkPtr);
+ Assert((uintptr_t) checkPtr <= (uintptr_t) &(vTuples->column_info[columnId].values[vTuples->num_rows_read_at_once]));
+ }
+ else
+ {
+ Assert((uintptr_t) (&(vTuples->values[columnId])) <= (uintptr_t) checkPtr);
+ Assert((uintptr_t) checkPtr <= (uintptr_t) &(vTuples->values[columnId + (vTuples->num_rows_read_at_once * vTuples->num_columns)]));
+ }
+ }
+ else
+ {
+ Assert((uintptr_t) (vTuples->tid) <= (uintptr_t) checkPtr);
+ Assert((uintptr_t) checkPtr <= (uintptr_t) &(vTuples->tid[vTuples->num_rows_read_at_once]));
+ }
+ }
+}
+
+/*
+ * Copy3
+ *		Copy the low two bits' worth of len (0 to 3 bytes) from src to dst.
+ *
+ * Two bytes are copied when bit 1 of len is set, followed by one byte when
+ * bit 0 is set; higher bits of len are ignored.  The copy is done bytewise
+ * on every platform: the pointers may have any alignment, so a uint16
+ * access through a cast would be a misaligned, type-punned access.
+ */
+static void
+Copy3(char *dst, char *src, int len)
+{
+	if (2 & len)
+	{
+		*(dst++) = *(src++);
+		*(dst++) = *(src++);
+	}
+	if (1 & len)
+		*dst = *src;
+}
+
+/*
+ * GetVarlenAHeader
+ *		Copy the varlena header found at offsetInPage into *header_.
+ *
+ * *buffer / *currentBlockNumber describe the page being read from rel.  If
+ * offsetInPage is at or beyond VCI_MAX_PAGE_SPACE, the next block is read
+ * first and the offset is reduced by one page.  A header that straddles a
+ * page boundary is assembled from both pages; in that case *buffer and
+ * *currentBlockNumber are advanced to the following block.
+ *
+ * Depending on the first header byte, two bytes (VARATT_IS_1B_E), one byte
+ * (VARATT_IS_1B) or VARHDRSZ bytes are copied.
+ *
+ * Returns the offset, within the page *buffer refers to on return, of the
+ * first byte after the header.  The result can equal VCI_MAX_PAGE_SPACE
+ * when the header ends exactly at the end of the page.
+ *
+ * Raises ERROR "relation full" if the next block would have to be read
+ * while *currentBlockNumber is already MaxBlockNumber.
+ */
+static uint32
+GetVarlenAHeader(Datum *header_,
+				 Buffer *buffer,
+				 BlockNumber *currentBlockNumber,
+				 uint32 offsetInPage,
+				 Relation rel)
+{
+	char	   *header = (char *) header_;
+	Page		page;
+	char	   *curPtr;
+	int			len = VCI_MAX_PAGE_SPACE - offsetInPage;
+
+	if (len <= 0)
+	{
+		Assert(BlockNumberIsValid(*currentBlockNumber));
+		if (MaxBlockNumber == *currentBlockNumber)
+			ereport(ERROR, (errmsg("relation full"), errhint("Disable VCI by 'SELECT vci_disable();'")));
+		if (BufferIsValid(*buffer))
+			ReleaseBuffer(*buffer);
+		*buffer = ReadBuffer(rel, ++*currentBlockNumber);
+		offsetInPage -= VCI_MAX_PAGE_SPACE;
+		len = VCI_MAX_PAGE_SPACE - offsetInPage;
+	}
+
+	page = BufferGetPage(*buffer);
+	curPtr = &(page[VCI_MIN_PAGE_HEADER + offsetInPage]);
+
+	Assert(0 < len);
+
+	if (VARATT_IS_1B_E(curPtr)) /* VARHDRSZ_EXTERNAL */
+	{
+		Assert(2 == VARHDRSZ_EXTERNAL);
+		if (VARHDRSZ_EXTERNAL <= len)
+		{
+			*(header++) = *(curPtr++);
+			*(header++) = *(curPtr++);
+
+			return offsetInPage + VARHDRSZ_EXTERNAL;
+		}
+
+		/* the second header byte is the first byte of the next page */
+		Assert(1 == len);
+		*(header++) = *(curPtr++);
+		ReleaseBuffer(*buffer);
+		++*currentBlockNumber;
+		*buffer = ReadBuffer(rel, *currentBlockNumber);
+		page = BufferGetPage(*buffer);
+		*header = page[VCI_MIN_PAGE_HEADER];
+
+		return 1;
+	}
+
+	if (VARATT_IS_1B(curPtr))	/* VARHDRSZ_SHORT */
+	{
+		Assert(1 == VARHDRSZ_SHORT);
+		Assert(VARHDRSZ_SHORT <= len);
+		*header = *curPtr;
+
+		return offsetInPage + VARHDRSZ_SHORT;
+	}
+
+	/* VARHDRSZ */
+	Assert(4 == VARHDRSZ);
+
+	if (VARHDRSZ <= len)
+	{
+		/*
+		 * curPtr sits at an arbitrary byte offset inside the page, so the
+		 * header is copied bytewise rather than loaded through a uint32
+		 * pointer, which would be a misaligned access.
+		 */
+		memcpy(header, curPtr, VARHDRSZ);
+
+		return offsetInPage + VARHDRSZ;
+	}
+
+	/* four-byte header split over two pages: 1-3 bytes here, rest there */
+	Assert((0 <= len) && (len <= 3));
+	Copy3(header, curPtr, len);
+	header += len;
+	curPtr += len;
+
+	ReleaseBuffer(*buffer);
+	++*currentBlockNumber;
+	*buffer = ReadBuffer(rel, *currentBlockNumber);
+	page = BufferGetPage(*buffer);
+	len = VARHDRSZ - len;
+	curPtr = &(page[VCI_MIN_PAGE_HEADER]);
+
+	Assert((0 <= len) && (len <= 3));
+	Copy3(header, curPtr, len);
+
+	return len;
+}
+
+/*
+ * FillVariableWidth
+ * Fill one variable-width column of vTuples with the rows currently
+ * described by extent_id / row_id_in_extent / num_rows.
+ *
+ * Each value (header plus payload) is copied from rel->data into
+ * column_info[columnId].area, MAXALIGNed one after another, and the
+ * matching slot in vTuples->values receives a pointer to the copy.
+ *
+ * NOTE(review): the payload copy handles at most one page crossing per
+ * value; presumably a stored value never exceeds VCI_MAX_PAGE_SPACE --
+ * confirm against the writer side.
+ */
+static void
+FillVariableWidth(vci_virtual_tuples_t *vTuples,
+ int16 columnId,
+ vci_ColumnRelations *rel)
+{
+ vci_MainRelHeaderInfo *info = GetMainRelHeaderInfoFromFetchContext(vTuples->fetch_context);
+ char *dstData = vTuples->column_info[columnId].area;
+ Datum *dstPtr = &(vTuples->values[columnId]);
+ int ptrStep = vTuples->num_columns;
+
+ BlockNumber startBN;
+ uint32 startOf;
+
+ if (vTuples->use_column_store)
+ {
+ /* column-wise layout: this column's Datums are contiguous */
+ dstPtr = &(vTuples->values[vTuples->num_rows_read_at_once * columnId]);
+ ptrStep = 1;
+ }
+
+ /* This function must be called only for ROS, not local ROS. */
+ Assert(VCI_FIRST_NORMAL_EXTENT_ID <= vTuples->extent_id);
+
+ /* Locate the block and in-page offset of the first requested value. */
+ {
+ uint32 offset;
+ uint32 dataOffset;
+ BlockNumber blockNumberBase;
+ TupleDesc desc = vci_GetTupleDescr(info);
+ int16 cId = vci_GetColumnIdFromFetchContext(vTuples->fetch_context,
+ columnId);
+
+ vci_GetElementPosition(&offset,
+ &blockNumberBase,
+ &dataOffset,
+ rel,
+ vTuples->extent_id,
+ vTuples->row_id_in_extent,
+ TupleDescAttr(desc, cId));
+ vci_GetBlockNumberAndOffsetInPage(&startBN,
+ &startOf,
+ offset + dataOffset);
+ startBN += blockNumberBase;
+ }
+
+ {
+ BlockNumber bNum = startBN;
+ Size offsetInPage = startOf;
+ Buffer buffer = InvalidBuffer;
+ Page page;
+ int numWrite = 0;
+ int numRows = vTuples->num_rows;
+
+ if (0 < numRows)
+ {
+ buffer = ReadBuffer(rel->data, bNum);
+ page = BufferGetPage(buffer);
+ }
+
+ for (int aId = 0; aId < numRows; ++aId)
+ {
+ /* copy the header; this may advance buffer/bNum to the next block */
+ offsetInPage = GetVarlenAHeader((Datum *) dstData,
+ &buffer,
+ &bNum,
+ offsetInPage,
+ rel->data);
+
+ {
+ int32 copySize;
+ uint32 dataSize = VARSIZE_ANY_EXHDR(dstData);
+ uint32 headerSize = vci_VARHDSZ_ANY(dstData);
+
+ dstPtr[ptrStep * (numWrite++)] = PointerGetDatum(dstData);
+
+ /* header ended at the page end: payload starts on the next page */
+ if (VCI_MAX_PAGE_SPACE <= offsetInPage)
+ {
+ offsetInPage -= VCI_MAX_PAGE_SPACE;
+ Assert(offsetInPage < VCI_MAX_PAGE_SPACE);
+ if (0 == offsetInPage)
+ {
+ ReleaseBuffer(buffer);
+ buffer = ReadBuffer(rel->data, ++bNum);
+ }
+ }
+ page = BufferGetPage(buffer);
+
+ /* part of the payload that is available on the current page */
+ copySize = Min(dataSize, VCI_MAX_PAGE_SPACE - offsetInPage);
+ MemCpy(&(dstData[headerSize]),
+ &(page[VCI_MIN_PAGE_HEADER + offsetInPage]),
+ copySize);
+ offsetInPage += copySize;
+
+ /* remainder of the payload continues on the next page */
+ if (copySize < dataSize)
+ {
+ ReleaseBuffer(buffer);
+ buffer = ReadBuffer(rel->data, ++bNum);
+ page = BufferGetPage(buffer);
+ MemCpy(&(dstData[copySize + headerSize]),
+ &(page[VCI_MIN_PAGE_HEADER]),
+ dataSize - copySize);
+ offsetInPage = dataSize - copySize; /* pgr0063 */
+ }
+ dstData += MAXALIGN(dataSize + headerSize);
+ }
+
+ /*
+ * NOTE(review): this reads the following block even after the last
+ * row when the value ends exactly at the page end -- confirm that
+ * such a block always exists.
+ */
+ if (VCI_MAX_PAGE_SPACE <= offsetInPage)
+ {
+ ReleaseBuffer(buffer);
+ buffer = ReadBuffer(rel->data, ++bNum);
+ page = BufferGetPage(buffer);
+ offsetInPage -= VCI_MAX_PAGE_SPACE;
+ Assert(offsetInPage < VCI_MAX_PAGE_SPACE);
+ }
+ }
+
+ if (BufferIsValid(buffer))
+ ReleaseBuffer(buffer);
+
+ Assert(vTuples->num_rows == numWrite);
+ }
+}
+
+/*
+ * FillValues
+ *		Fill the values of every normal column of vTuples.
+ *
+ * Each column is dispatched on its compression type to FillFixedWidth() or
+ * FillVariableWidth(), reading from the column's relation in the fetch
+ * context.  An unknown compression type raises ERROR instead of calling
+ * abort(), so that only the current transaction fails rather than the
+ * whole server process.
+ */
+static void
+FillValues(vci_virtual_tuples_t *vTuples)
+{
+	for (int16 columnId = VCI_FIRST_NORMALCOLUMN_ID; columnId < vTuples->num_columns; ++columnId)
+	{
+		switch (vTuples->column_info[columnId].comp_type)
+		{
+			case vcis_compression_type_fixed_raw:
+				FillFixedWidth(vTuples, columnId,
+							   &(vTuples->fetch_context->rel_column[columnId]));
+				break;
+			case vcis_compression_type_variable_raw:
+				FillVariableWidth(vTuples, columnId,
+								  &(vTuples->fetch_context->rel_column[columnId]));
+				break;
+			default:
+				elog(ERROR, "unrecognized VCI compression type: %d",
+					 (int) vTuples->column_info[columnId].comp_type);
+				break;
+		}
+	}
+}
+
+/*
+ * GetNullableColumnInfo
+ *		Collect the columns of vTuples that have a null bit.
+ *
+ * Two arrays of num_nullable_columns entries are palloc'ed and returned
+ * through *columnId and *nullBitId.  For every column whose null_bit_id is
+ * not negative, the column index and the bit id are appended to them in
+ * column order.  The caller is responsible for pfree'ing both arrays.
+ *
+ * Returns the number of entries filled in.
+ */
+static int
+GetNullableColumnInfo(uint16 **columnId, uint16 **nullBitId,
+					  vci_virtual_tuples_t *vTuples)
+{
+	vci_CSQueryContext queryContext = vTuples->fetch_context->query_context;
+	int			found = 0;
+
+	*columnId = palloc0_array(uint16, queryContext->num_nullable_columns);
+	*nullBitId = palloc0_array(uint16, queryContext->num_nullable_columns);
+
+	for (int col = 0; col < vTuples->num_columns; ++col)
+	{
+		int			bit = vTuples->column_info[col].null_bit_id;
+
+		if (bit < 0)
+			continue;			/* column has no null bit */
+
+		(*columnId)[found] = col;
+		(*nullBitId)[found] = bit;
+		++found;
+	}
+	Assert(found <= queryContext->num_nullable_columns);
+
+	return found;
+}
+
+/*
+ * FillIsNull
+ *		Fill vTuples->isnull for the rows currently described by vTuples.
+ *
+ * The null bit vectors (null_width_in_byte bytes per row) are read from
+ * rel_null, starting at the position of row_id_in_extent.  Rows are handled
+ * in tiles of strideR rows; a tile that straddles a page boundary is first
+ * assembled in a scratch buffer.  For every column that has a null bit, the
+ * bit is extracted for each row of the tile and stored in the isnull array,
+ * using row-wise or column-wise indexing depending on use_column_store.
+ *
+ * Compared with a fixed int[MaxAttrNumber] lookup table, the destination
+ * index of each column is computed where it is needed, which keeps about
+ * 128 kB off the stack.  The block following the data is only read when
+ * more rows remain to be processed.
+ */
+static void
+FillIsNull(vci_virtual_tuples_t *vTuples)
+{
+	Buffer		buffer = InvalidBuffer;
+	Page		page = NULL;
+	vci_CSQueryContext queryContext = vTuples->fetch_context->query_context;
+	vci_MainRelHeaderInfo *info = GetMainRelHeaderInfoFromFetchContext(vTuples->fetch_context);
+	const int32 strideR = 16;
+	uint16	   *columnId;
+	uint16	   *nullBitId;
+	uint8	   *nullCopy = palloc0_array(uint8, (strideR * queryContext->null_width_in_byte));
+
+	BlockNumber bNumCur;
+	uint32		offset;
+
+	Relation	rel = vTuples->fetch_context->rel_null.data;
+
+	int			numNullableColumns = GetNullableColumnInfo(&columnId, &nullBitId, vTuples);
+
+	/* index strides of the isnull array: row-wise unless column store */
+	int			facCol = 1;
+	int			facRow = vTuples->num_columns;
+
+	if (vTuples->use_column_store)
+	{
+		facCol = vTuples->num_rows_read_at_once;
+		facRow = 1;
+	}
+
+	Assert(VCI_FIRST_NORMAL_EXTENT_ID <= vTuples->extent_id);
+	MemSet(vTuples->isnull, 0, vTuples->num_columns * vTuples->num_rows);
+
+	vci_GetPositionForFixedColumn(&bNumCur,
+								  &offset,
+								  info,
+								  VCI_COLUMN_ID_NULL,
+								  vTuples->extent_id,
+								  vTuples->row_id_in_extent,
+								  false);
+
+	if (0 < vTuples->num_rows)
+	{
+		buffer = ReadBuffer(rel, bNumCur);
+		page = BufferGetPage(buffer);
+	}
+
+	/* This tiling is the best? */
+	for (int32 rId = 0; rId < vTuples->num_rows; rId += strideR)
+	{
+		int32		pIdMax = Min(rId + strideR, vTuples->num_rows);
+		int			nwib = queryContext->null_width_in_byte;
+		int32		inc = (pIdMax - rId) * nwib;
+		uint32		nextOffset = offset + inc;
+		uint8	   *ptr = (uint8 *) &(page[VCI_MIN_PAGE_HEADER + offset]);
+		uint8	   *ptrSave = NULL;
+
+		Assert((0 <= offset) && (offset < VCI_MAX_PAGE_SPACE));
+		if (VCI_MAX_PAGE_SPACE < nextOffset)
+		{
+			/* tile straddles two pages: assemble it in the scratch buffer */
+			int			size = VCI_MAX_PAGE_SPACE - offset;
+
+			MemCpy(nullCopy, ptr, size);
+			ReleaseBuffer(buffer);
+			buffer = ReadBuffer(rel, ++bNumCur);
+			page = BufferGetPage(buffer);
+			MemCpy(&(nullCopy[size]), &(page[VCI_MIN_PAGE_HEADER]),
+				   inc - size);
+			ptr = nullCopy;
+		}
+
+		ptrSave = ptr;
+		for (int cId = 0; cId < numNullableColumns; ++cId)
+		{
+			int32		pId;
+			int			bitId = nullBitId[cId];
+			bool	   *dst = &(vTuples->isnull[(facCol * columnId[cId]) + (rId * facRow)]);
+
+			ptr = ptrSave;
+
+			/* four rows per pass, then the remaining rows one by one */
+			for (pId = rId; pId <= (pIdMax - 4); pId += 4)
+			{
+				*dst = vci_GetBit(ptr, bitId);
+				ptr += nwib;
+				dst += facRow;
+				*dst = vci_GetBit(ptr, bitId);
+				ptr += nwib;
+				dst += facRow;
+				*dst = vci_GetBit(ptr, bitId);
+				ptr += nwib;
+				dst += facRow;
+				*dst = vci_GetBit(ptr, bitId);
+				ptr += nwib;
+				dst += facRow;
+			}
+			for (; pId < pIdMax; ++pId)
+			{
+				*dst = vci_GetBit(ptr, bitId);
+				ptr += nwib;
+				dst += facRow;
+			}
+		}
+
+		offset = nextOffset;
+		if (VCI_MAX_PAGE_SPACE <= offset)
+		{
+			/*
+			 * The tile ended exactly at the page end.  Move on to the next
+			 * block only if another tile follows; after the last tile the
+			 * next block is not needed and might not even exist.
+			 */
+			if ((VCI_MAX_PAGE_SPACE == offset) && (pIdMax < vTuples->num_rows))
+			{
+				ReleaseBuffer(buffer);
+				buffer = ReadBuffer(rel, ++bNumCur);
+				page = BufferGetPage(buffer);
+			}
+			offset -= VCI_MAX_PAGE_SPACE;
+		}
+	}
+	if (BufferIsValid(buffer))
+		ReleaseBuffer(buffer);
+
+	pfree(nullCopy);
+	pfree(nullBitId);
+	pfree(columnId);
+}
+
+/**
+ * @brief Fetch or read data in columns specified in \c vTuples,
+ * \c numReadRows rows from \c cridStart.
+ *
+ * @details vci_CSFetchVirtualTuples returns the number of rows that can be
+ * read from the stored data starting at cridStart. For example, if
+ * cridStart = 50, but actualNumberOfRowsReadAtOnce = 128,
+ * vci_CSFetchVirtualTuples() returns 78 (= 128 - 50).
+ *
+ * When all the tuples between cridStart and (cridStart + numReadRows - 1)
+ * are already stored in vTuples, it does not read ROS.
+ * Otherwise, tuples at TYPEALIGN_DOWN(VCI_COMPACTION_UNIT_ROW, cridStart)
+ * and following (actualNumRowsReadAtOnce - 1) rows are read from ROS.
+ *
+ * Extent IDs below VCI_FIRST_NORMAL_EXTENT_ID are served by copying from the
+ * query's local ROS instead of reading relations.
+ *
+ * \c vTuples->status is set on every path; whenever 0 is returned it tells
+ * why (out of range, not existing, or not visible).
+ *
+ * @param[in,out] vTuples the read data are stored in the pointed area.
+ * @param[in] cridStart the data from \c cridStart row are read.
+ * @param[in] numReadRows required number of rows to be read.
+ * @return number of rows that can be read out from \c vTuples, at most
+ * \c numReadRows.
+ */
+int
+vci_CSFetchVirtualTuples(vci_virtual_tuples_t *vTuples,
+ int64 cridStart,
+ uint32 numReadRows)
+{
+ const int32 extentId = vci_CalcExtentIdFromCrid64(cridStart);
+ const uint32 rowId = vci_CalcRowIdInExtentFromCrid64(cridStart);
+
+ Assert(vTuples);
+
+ vTuples->status = vcirvs_out_of_range;
+ if (VCI_INVALID_EXTENT_ID == extentId)
+ {
+ return 0;
+ }
+
+ RefillPointersOfVirtualTuples(vTuples, true);
+
+ /* local ROS */
+ if (extentId < VCI_FIRST_NORMAL_EXTENT_ID)
+ {
+ vci_CSFetchContext fetchContext = vTuples->fetch_context;
+ vci_CSQueryContext queryContext = fetchContext->query_context;
+ vci_local_ros_t *localRos = queryContext->local_ros;
+ /* extent IDs -1, -2, ... map to local ROS slots 0, 1, ... */
+ int localRosId = -extentId - 1;
+
+ Assert(queryContext->num_local_ros_extents == localRos->num_local_extents);
+ if (queryContext->num_local_ros_extents <= localRosId)
+ {
+ vTuples->status = vcirvs_not_exist;
+
+ return 0;
+ }
+
+ if (localRos->extent[localRosId]->num_rows_in_extent <= rowId)
+ {
+ vTuples->status = vcirvs_out_of_range;
+
+ return 0;
+ }
+
+ /*
+ * NOTE(review): vTuples->row_id_in_extent is not updated on this
+ * path -- confirm that no reader relies on it for local ROS.
+ */
+ vTuples->num_rows_in_extent = localRos->extent[localRosId]->num_rows_in_extent;
+ vTuples->extent_id = extentId;
+ vTuples->num_rows = Min(numReadRows,
+ vTuples->num_rows_in_extent - rowId);
+ vTuples->offset_of_first_tuple_of_vector = 0;
+
+ if (vTuples->tid)
+ MemCpy(vTuples->tid, &(localRos->extent[localRosId]->tid[rowId]),
+ sizeof(vTuples->tid[0]) * vTuples->num_rows);
+
+ if (vTuples->crid)
+ MemCpy(vTuples->crid, &(localRos->extent[localRosId]->crid[rowId]),
+ sizeof(vTuples->crid[0]) * vTuples->num_rows);
+
+ /* no row is skipped; the slot after the last row is cleared as well */
+ MemSet(vTuples->skip, 0,
+ sizeof(vTuples->skip[0]) * (vTuples->num_rows + 1));
+
+ if (vTuples->use_column_store)
+ {
+ /* column-wise: copy each column's values and null flags en bloc */
+ for (int cId = 0; cId < vTuples->num_columns; ++cId)
+ {
+ vci_virtual_tuples_column_info_t *dColI;
+ vci_virtual_tuples_column_info_t *sColI;
+
+ dColI = &(vTuples->column_info[cId]);
+ sColI = &(localRos->extent[localRosId]->column_info[
+ fetchContext->column_link[cId]]);
+ MemCpy(dColI->values, &(sColI->values[rowId]),
+ sizeof(Datum) * vTuples->num_rows);
+ MemCpy(dColI->isnull, &(sColI->isnull[rowId]),
+ sizeof(bool) * vTuples->num_rows);
+ }
+ }
+ else
+ {
+ /* row-wise: transpose the local ROS columns into row_wise_local_ros */
+ vTuples->values = (Datum *) TYPEALIGN(sizeof(Datum),
+ vTuples->row_wise_local_ros);
+ vTuples->isnull = (bool *) &(vTuples->values[vTuples->num_rows_read_at_once *
+ vTuples->num_columns]);
+ for (int rId = 0; rId < vTuples->num_rows; ++rId)
+ {
+ int offset = rId * vTuples->num_columns;
+ Datum *dstValues = &(vTuples->values[offset]);
+ bool *dstIsNull = &(vTuples->isnull[offset]);
+
+ for (int cId = 0; cId < vTuples->num_columns; ++cId)
+ {
+ vci_virtual_tuples_column_info_t *sColI;
+
+ sColI = &(localRos->extent[localRosId]->column_info[
+ fetchContext->column_link[cId]]);
+ dstValues[cId] = sColI->values[rowId + rId];
+ dstIsNull[cId] = sColI->isnull[rowId + rId];
+ }
+ }
+ }
+
+ vTuples->status = (localRos->extent[localRosId]->num_rows_in_extent <=
+ (rowId + vTuples->num_rows))
+ ? vcirvs_end_of_extent : vcirvs_read_whole;
+ }
+ else
+ {
+ vTuples->status = vcirvs_read_whole;
+ /* use stored data */
+ if ((extentId == vTuples->extent_id) &&
+ (vTuples->row_id_in_extent <= rowId) &&
+ ((rowId + numReadRows) <=
+ (vTuples->row_id_in_extent + vTuples->num_rows)))
+ {
+ vTuples->offset_of_first_tuple_of_vector = rowId -
+ vTuples->row_id_in_extent;
+ }
+ else
+ {
+ uint32 numRowsInExtent = vTuples->num_rows_in_extent;
+
+ {
+ vci_extent_status_t status;
+
+ vci_CSCheckExtent(&status, vTuples->fetch_context, extentId, false);
+ /* check if the extent is visible */
+ if (!((status.existence) && (status.visible)))
+ {
+ vTuples->status = vcirvs_not_visible;
+
+ return 0; /* not visible */
+ }
+ }
+
+ /* a different extent: fetch its row count and drop cached rows */
+ if (extentId != vTuples->extent_id)
+ {
+ Buffer buffer = InvalidBuffer;
+ vcis_m_extent_t *mExtent;
+
+ mExtent = vci_GetMExtent(&buffer,
+ GetMainRelHeaderInfoFromFetchContext(
+ vTuples->fetch_context),
+ extentId);
+
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ numRowsInExtent = mExtent->num_rows;
+ UnlockReleaseBuffer(buffer);
+
+ vTuples->num_rows_in_extent = numRowsInExtent;
+ vTuples->extent_id = extentId;
+ vTuples->num_rows = 0;
+ }
+
+ /* no such a row in the extent */
+ if (numRowsInExtent <= rowId)
+ {
+ vTuples->status = vcirvs_out_of_range;
+
+ return 0;
+ }
+
+ /*
+ * Read window: starts at the compaction unit containing rowId, is
+ * rounded up to whole units, and is capped by the vector size and by
+ * the rows remaining in the extent.
+ */
+ vTuples->row_id_in_extent = TYPEALIGN_DOWN(VCI_COMPACTION_UNIT_ROW,
+ rowId);
+ vTuples->offset_of_first_tuple_of_vector = rowId -
+ vTuples->row_id_in_extent;
+ vTuples->num_rows = TYPEALIGN(VCI_COMPACTION_UNIT_ROW,
+ vTuples->offset_of_first_tuple_of_vector +
+ numReadRows);
+ vTuples->num_rows = Min(vTuples->num_rows,
+ vTuples->num_rows_read_at_once);
+ vTuples->num_rows = Min(vTuples->num_rows,
+ numRowsInExtent - vTuples->row_id_in_extent);
+
+ if (vTuples->crid)
+ vci_FillCridInVirtualTuples(vTuples);
+
+ if (vTuples->tid)
+ FillFixedWidth(vTuples, VCI_COLUMN_ID_TID,
+ &(vTuples->fetch_context->rel_tid));
+
+ FillSkip(vTuples);
+
+ FillIsNull(vTuples);
+ FillValues(vTuples);
+ }
+
+ if (vTuples->num_rows_in_extent <= (vTuples->row_id_in_extent +
+ vTuples->offset_of_first_tuple_of_vector + numReadRows))
+ vTuples->status = vcirvs_end_of_extent;
+ }
+
+ Assert(vTuples->offset_of_first_tuple_of_vector <= vTuples->num_rows);
+
+ return Min(vTuples->num_rows - vTuples->offset_of_first_tuple_of_vector,
+ numReadRows);
+}
+
+/**
+ * @brief Fill data of the specified fixed-field-length column in
+ * \c RosChunkStorage into \c vci_virtual_tuples_t.
+ *
+ * Elements of all filled chunks are copied one after another. For a normal
+ * column whose \c strict_datum_type flag is set, the bytes go to the
+ * column's \c area and the column's Datum slots are set to point into it;
+ * otherwise the bytes are written into the Datum slots directly. For
+ * \c VCI_COLUMN_ID_TID the destination is \c vTuples->tid.
+ *
+ * @param[in,out] vTuples the pointer of vci_virtual_tuples_t where data are
+ * stored.
+ * @param[in] columnId target column ID.
+ * @param[in] rosChunkStorage data source.
+ */
+void
+vci_FillFixedWidthColumnarFromRosChunkStorage(vci_virtual_tuples_t *vTuples,
+ int16 columnId,
+ RosChunkStorage *rosChunkStorage)
+{
+ int16 colIdInVciMain = VCI_FIRST_NORMALCOLUMN_ID;
+ Datum *dstPtr = NULL;
+ char *dstData = NULL;
+
+ /* distance between elements in the destination and in the source */
+ int stepDstData = 0;
+ int stepSrc = 0;
+
+ bool passByRef = false;
+ int offsetCont = 0;
+
+ if (VCI_FIRST_NORMALCOLUMN_ID <= columnId)
+ {
+ vci_CSFetchContext fetchContext = vTuples->fetch_context;
+
+ Assert(columnId < vTuples->num_columns);
+ colIdInVciMain = fetchContext->query_context->column_id[fetchContext->column_link[columnId]];
+ dstData = (char *) &(vTuples->values[vTuples->num_rows_in_extent * columnId]);
+ if ((passByRef = vTuples->column_info[columnId].strict_datum_type)) /* pgr0011 */
+ {
+ /* bytes go to the side area; the Datum slots get pointers later */
+ dstPtr = (Datum *) dstData;
+ dstData = vTuples->column_info[columnId].area;
+ Assert(dstData);
+ }
+ else
+ Assert(NULL == vTuples->column_info[columnId].area);
+ stepSrc = vTuples->column_info[columnId].max_column_size;
+ stepDstData = MAXALIGN(stepSrc);
+ }
+ else
+ {
+ Assert(VCI_COLUMN_ID_TID == columnId);
+ dstData = (char *) (vTuples->tid);
+ stepDstData = sizeof(vTuples->tid[0]);
+ stepSrc = sizeof(ItemPointerData);
+ }
+
+ {
+ const Datum zero = 0;
+#ifdef WORDS_BIGENDIAN
+
+ if (stepSrc < sizeof(Datum))
+ {
+ /*
+ * if the value itself is contained in Datum. for example uint32 1
+ * is contained in Datum
+ */
+ /*
+ * the value should be 0x0000000000000001 (not 0x0001000000000000)
+ * so offsetCont should be 4
+ */
+ offsetCont = stepDstData - stepSrc;
+ }
+ else
+ {
+ offsetCont = 0;
+ }
+
+#else /* #ifdef WORDS_BIGENDIAN */
+ /*
+ * Little endian: the zeroed word is the last Datum-sized word of the
+ * slot.
+ * NOTE(review): this is negative if stepDstData < sizeof(Datum), which
+ * would zero bytes in front of the slot -- confirm that sizeof(tid[0])
+ * is at least sizeof(Datum).
+ */
+ offsetCont = stepDstData - sizeof(Datum);
+#endif /* #ifdef WORDS_BIGENDIAN */
+
+ for (int sId = 0; sId < rosChunkStorage->numFilled; ++sId)
+ {
+ RosChunkBuffer *chunk = rosChunkStorage->chunk[sId];
+ char *srcPtr = chunk->tidData;
+
+ if (VCI_FIRST_NORMALCOLUMN_ID <= columnId)
+ srcPtr = chunk->data[colIdInVciMain];
+ for (int rId = 0; rId < chunk->numFilled; ++rId)
+ {
+ /* zero one Datum-sized word of the slot, then copy the element */
+#ifdef WORDS_BIGENDIAN
+ *(Datum *) dstData = zero;
+ MemCpy(&(dstData[offsetCont]), &(srcPtr[stepSrc * rId]), stepSrc);
+#else /* #ifdef WORDS_BIGENDIAN */
+ *(Datum *) &(dstData[offsetCont]) = zero;
+ MemCpy(dstData, &(srcPtr[stepSrc * rId]), stepSrc);
+#endif /* #ifdef WORDS_BIGENDIAN */
+ dstData += stepDstData;
+ }
+ }
+ }
+
+ if (passByRef)
+ {
+ /* point each Datum slot at its element in the side area */
+ dstData = vTuples->column_info[columnId].area;
+ for (int rId = 0; rId < vTuples->num_rows; ++rId)
+ dstPtr[rId] = PointerGetDatum(&(dstData[stepDstData * rId]));
+ }
+}
+
+/**
+ * @brief Copy one variable-field-length column from \c RosChunkStorage
+ * into \c vci_virtual_tuples_t.
+ *
+ * Every element of every filled chunk is copied into the column's \c area,
+ * each one starting on a Datum-aligned address, and the column's Datum
+ * slots are set to point at the copies in order. The Datum-sized word that
+ * holds the last byte of an element is zeroed before the element is copied.
+ *
+ * @param[in,out] vTuples the pointer of vci_virtual_tuples_t where data are
+ * stored.
+ * @param[in] columnId target column ID; must be a normal column.
+ * @param[in] rosChunkStorage data source.
+ */
+void
+vci_FillVariableWidthColumnarFromRosChunkStorage(vci_virtual_tuples_t *vTuples,
+												 int16 columnId,
+												 RosChunkStorage *rosChunkStorage)
+{
+	const Datum zero = 0;
+	int16		srcCol = VCI_FIRST_NORMALCOLUMN_ID;
+	Datum	   *slot = NULL;
+	char	   *out = NULL;
+
+	Assert((VCI_FIRST_NORMALCOLUMN_ID <= columnId) && (columnId < vTuples->num_columns));
+	Assert(vTuples->column_info[columnId].strict_datum_type);
+
+	srcCol = vTuples->fetch_context->query_context->column_id[vTuples->fetch_context->column_link[columnId]];
+
+	/* pointers are stored in the column's Datum slots, bytes in its area */
+	slot = (Datum *) (char *) &(vTuples->values[vTuples->num_rows_in_extent * columnId]);
+	out = vTuples->column_info[columnId].area;
+	Assert(out);
+
+	for (int chunkNo = 0; chunkNo < rosChunkStorage->numFilled; ++chunkNo)
+	{
+		RosChunkBuffer *chunk = rosChunkStorage->chunk[chunkNo];
+
+		Assert(chunk->data[srcCol]);
+		Assert(chunk->dataOffset[srcCol]);
+
+		for (int row = 0; row < chunk->numFilled; ++row)
+		{
+			/* element length is the distance between adjacent offsets */
+			int			len = chunk->dataOffset[srcCol][row + 1] - chunk->dataOffset[srcCol][row];
+
+			*(Datum *) &(out[TYPEALIGN_DOWN(sizeof(Datum), len - 1)]) = zero;
+			MemCpy(out, &(chunk->data[srcCol][chunk->dataOffset[srcCol][row]]), len);
+
+			*slot = PointerGetDatum(out);
+			++slot;
+			out += TYPEALIGN(sizeof(Datum), len);
+		}
+	}
+}
+
+/**
+ * @brief Get the column IDs of nullable columns, indexed by null bit ID.
+ *
+ * The result is a \c palloc()ed array of \c num_nullable_columns entries.
+ * Entry \c i holds the index of the column in \c vTuples whose
+ * \c null_bit_id is \c i, or -1 if no column uses that bit.
+ * The caller should \c pfree() the result after use.
+ *
+ * @param[in] vTuples target vci_virtual_tuples_t.
+ * @return pointer to the \c int16 array holding the result.
+ */
+int16 *
+vci_GetNullableColumnIds(vci_virtual_tuples_t *vTuples)
+{
+	vci_CSQueryContext queryContext = vTuples->fetch_context->query_context;
+	int16	   *idByBit = palloc0_array(int16, queryContext->num_nullable_columns);
+#ifdef USE_ASSERT_CHECKING
+	int16		numFound = 0;
+#endif
+
+	/* start with "no column" for every bit */
+	for (int bit = 0; bit < queryContext->num_nullable_columns; bit++)
+		idByBit[bit] = -1;
+
+	for (int16 col = 0; col < vTuples->num_columns; ++col)
+	{
+		int			bit = vTuples->column_info[col].null_bit_id;
+
+		Assert((-1 <= bit) && (bit < (int) (queryContext->num_nullable_columns)));
+		if (bit < 0)
+			continue;			/* column has no null bit */
+
+		/* each bit must belong to exactly one column */
+		Assert(-1 == idByBit[bit]);
+		idByBit[bit] = col;
+#ifdef USE_ASSERT_CHECKING
+		++numFound;
+#endif
+	}
+	Assert(numFound <= queryContext->num_nullable_columns);
+
+	return idByBit;
+}
diff --git a/contrib/vci/storage/vci_freelist.c b/contrib/vci/storage/vci_freelist.c
new file mode 100644
index 0000000..f813053
--- /dev/null
+++ b/contrib/vci/storage/vci_freelist.c
@@ -0,0 +1,474 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_freelist.c
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_freelist.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "vci.h"
+
+#include "vci_freelist.h"
+#include "vci_ros.h"
+#include "vci_columns.h"
+
+static vcis_free_space_t *GetFreeSpaceT(Page page);
+static void UpdatePrevNextFreeSpace(vci_RelationPair *relPair,
+ BlockNumber prevFreeBlockNumber,
+ BlockNumber nextFreeBlockNumber,
+ BlockNumber prev_next,
+ BlockNumber next_prev,
+ vcis_column_meta_t *columnMeta);
+
+/**
+ * Return the vcis_free_space_t kept in item VCI_FREESPACE_ITEM_ID of a page.
+ */
+static vcis_free_space_t *
+GetFreeSpaceT(Page page)
+{
+	ItemId		itemId = PageGetItemId(page, VCI_FREESPACE_ITEM_ID);
+	HeapTupleHeader tupleHeader = (HeapTupleHeader) PageGetItem(page, itemId);
+	char	   *payload = (char *) tupleHeader + tupleHeader->t_hoff;
+
+	return (vcis_free_space_t *) payload;
+}
+
+/* Read block blk into relPair->bufData and return the free-space struct on it. */
+vcis_free_space_t *
+vci_GetFreeSpace(vci_RelationPair *relPair, BlockNumber blk)
+{
+	Buffer		buffer = vci_ReadBufferWithPageInit(relPair->data, blk);
+
+	/* Callers in this file unlock/release the buffer via relPair->bufData. */
+	relPair->bufData = buffer;
+	return GetFreeSpaceT(BufferGetPage(buffer));
+}
+
+/*
+ * Relink a free node's neighbours, or columnMeta's list head/tail if absent.
+ */
+static void
+UpdatePrevNextFreeSpace(vci_RelationPair *relPair,
+						BlockNumber prevFreeBlockNumber,
+						BlockNumber nextFreeBlockNumber,
+						BlockNumber prev_next, BlockNumber next_prev,
+						vcis_column_meta_t *columnMeta)
+{
+	vcis_free_space_t *neighbour;
+
+	/* Forward link: the predecessor's next_pos, or else the list head. */
+	if (!BlockNumberIsValid(prevFreeBlockNumber))
+		columnMeta->free_page_begin_id = prev_next;
+	else
+	{
+		neighbour = vci_GetFreeSpace(relPair, prevFreeBlockNumber);
+		Assert(vci_hasFreeLinkNode(neighbour));
+		LockBuffer(relPair->bufData, BUFFER_LOCK_EXCLUSIVE);
+		neighbour->next_pos = prev_next;
+		vci_WriteOneItemPage(relPair->data, relPair->bufData);
+		UnlockReleaseBuffer(relPair->bufData);
+	}
+
+	/* Backward link: the successor's prev_pos, or else the list tail. */
+	if (!BlockNumberIsValid(nextFreeBlockNumber))
+		columnMeta->free_page_end_id = next_prev;
+	else
+	{
+		neighbour = vci_GetFreeSpace(relPair, nextFreeBlockNumber);
+		Assert(vci_hasFreeLinkNode(neighbour));
+		LockBuffer(relPair->bufData, BUFFER_LOCK_EXCLUSIVE);
+		neighbour->prev_pos = next_prev;
+		vci_WriteOneItemPage(relPair->data, relPair->bufData);
+		UnlockReleaseBuffer(relPair->bufData);
+	}
+}
+
+/*
+ * Work out the free-list node that the extent at startBlockNumber becomes:
+ * *newFS gets its size and block-ordered prev/next links, *newFSBlockNumber
+ * its first block.  With coalesce, adjacent neighbours are folded in.  No page
+ * is modified.  Returns the number of pieces the node is made of (1 to 3).
+ */
+int32
+vci_MakeFreeSpace(vci_RelationPair *relPair,
+				  BlockNumber startBlockNumber,
+				  BlockNumber *newFSBlockNumber,
+				  vcis_free_space_t *newFS,
+				  bool coalesce)
+{
+	int			numMerged = 0;
+	vcis_free_space_t *origSpace;
+	vcis_free_space_t *freeSpace;
+	BlockNumber freeSpacePtr;
+
+	/* Start from the extent itself: take over its size, with no links yet. */
+	origSpace = vci_GetFreeSpace(relPair, startBlockNumber);
+	newFS->size = origSpace->size;
+	ReleaseBuffer(relPair->bufData);
+
+	newFS->type = vcis_free_space;
+	newFS->prev_pos = InvalidBlockNumber;
+	newFS->next_pos = InvalidBlockNumber;
+	*newFSBlockNumber = startBlockNumber;
+	numMerged = 1;
+
+	freeSpacePtr = vci_GetColumnMeta(&relPair->bufMeta, relPair->meta)->free_page_begin_id;
+	Assert(BlockNumberIsValid(freeSpacePtr));
+	ReleaseBuffer(relPair->bufMeta);
+
+	while (BlockNumberIsValid(freeSpacePtr))
+	{
+		freeSpace = vci_GetFreeSpace(relPair, freeSpacePtr);
+		Assert(freeSpacePtr != startBlockNumber);
+		Assert(!BlockNumberIsValid(freeSpace->next_pos) ||
+			   freeSpace->next_pos != startBlockNumber);
+		/* NOTE(review): the last branch looks unreachable and its Assert contradicts the first test - confirm. */
+		if (startBlockNumber < freeSpacePtr)
+		{
+			newFS->prev_pos = InvalidBlockNumber;
+			newFS->next_pos = freeSpacePtr;
+			ReleaseBuffer(relPair->bufData);
+			break;
+		}
+		else if ((freeSpacePtr < startBlockNumber) &&
+				 (startBlockNumber < freeSpace->next_pos))
+		{
+			newFS->prev_pos = freeSpacePtr;
+			newFS->next_pos = freeSpace->next_pos;
+			ReleaseBuffer(relPair->bufData);
+			break;
+		}
+		else if (!BlockNumberIsValid(freeSpace->next_pos))
+		{
+			Assert(freeSpacePtr > startBlockNumber);
+			newFS->prev_pos = freeSpace->prev_pos;
+			newFS->next_pos = freeSpacePtr;
+			ReleaseBuffer(relPair->bufData);
+			break;
+		}
+
+		freeSpacePtr = freeSpace->next_pos;
+		ReleaseBuffer(relPair->bufData);
+	}
+
+	if (coalesce)
+	{
+		if (BlockNumberIsValid(newFS->prev_pos))
+		{
+			freeSpace = vci_GetFreeSpace(relPair, newFS->prev_pos);
+			Assert(vci_hasFreeLinkNode(freeSpace));
+
+			if (newFS->prev_pos + vci_GetNumBlocks(freeSpace->size) ==
+				*newFSBlockNumber)
+			{
+				*newFSBlockNumber = newFS->prev_pos;
+
+				newFS->size += freeSpace->size;
+				newFS->prev_pos = freeSpace->prev_pos;
+				numMerged++;
+				elog(DEBUG2, "previous FreeSpace merged, size %u", newFS->size);
+			}
+			ReleaseBuffer(relPair->bufData);
+		}
+
+		if (BlockNumberIsValid(newFS->next_pos))
+		{
+			freeSpace = vci_GetFreeSpace(relPair, newFS->next_pos);
+			Assert(vci_hasFreeLinkNode(freeSpace));
+
+			if (newFS->next_pos ==
+				*newFSBlockNumber + vci_GetNumBlocks(newFS->size))
+			{
+				newFS->size += freeSpace->size;
+				if (freeSpace->size == MaxBlockNumber)
+					newFS->size = MaxBlockNumber;
+				newFS->next_pos = freeSpace->next_pos;
+				numMerged++;
+				elog(DEBUG2, "next FreeSpace merged, size %u", newFS->size);
+			}
+			ReleaseBuffer(relPair->bufData);
+		}
+	}
+
+	return numMerged;
+}
+
+/*
+ * Link the extent at startBlockNumber into the free list, between prev and next.
+ */
+void
+vci_AppendFreeSpaceToLinkList(vci_RelationPair *relPair,
+ BlockNumber startBlockNumber,
+ BlockNumber prevFreeBlockNumber,
+ BlockNumber nextFreeBlockNumber,
+ BlockNumber size)
+{
+ vcis_column_meta_t *columnMeta;
+ vcis_free_space_t *freeSpace;
+ vcis_extent_type_t type;
+
+ Assert(startBlockNumber != prevFreeBlockNumber);
+ Assert(startBlockNumber != nextFreeBlockNumber);
+
+ columnMeta = vci_GetColumnMeta(&relPair->bufMeta, relPair->meta);
+ LockBuffer(relPair->bufMeta, BUFFER_LOCK_EXCLUSIVE);
+ /* the new node takes the type of the node at free_page_end_id */
+ freeSpace = vci_GetFreeSpace(relPair, columnMeta->free_page_end_id);
+ Assert(vci_hasFreeLinkNode(freeSpace));
+ type = freeSpace->type;
+ ReleaseBuffer(relPair->bufData);
+
+ /* point the neighbours (or columnMeta's list ends) at the new node */
+ UpdatePrevNextFreeSpace(relPair, prevFreeBlockNumber, nextFreeBlockNumber,
+ startBlockNumber, startBlockNumber, columnMeta);
+
+ freeSpace = vci_GetFreeSpace(relPair, startBlockNumber);
+ LockBuffer(relPair->bufData, BUFFER_LOCK_EXCLUSIVE);
+ freeSpace->prev_pos = prevFreeBlockNumber;
+ freeSpace->next_pos = nextFreeBlockNumber;
+ freeSpace->type = type;
+ freeSpace->size = size;
+ vci_WriteOneItemPage(relPair->data, relPair->bufData);
+ UnlockReleaseBuffer(relPair->bufData);
+
+ columnMeta->num_extents -= 1;
+ columnMeta->num_free_pages += vci_GetNumBlocks(size);
+ columnMeta->num_free_page_blocks += 1;
+ vci_WriteColumnMetaDataHeader(relPair->meta, relPair->bufMeta);
+ UnlockReleaseBuffer(relPair->bufMeta);
+}
+
+/*
+ * Take numExtentPages blocks from the front of the free-space node at
+ * startBlockNumber.  A leftover tail becomes a new node that replaces the old
+ * one in the list; otherwise the old node's neighbours are linked together.
+ */
+void
+vci_RemoveFreeSpaceFromLinkList(vci_ColumnRelations *relPair,
+								BlockNumber startBlockNumber,
+								BlockNumber numExtentPages)
+{
+	vcis_column_meta_t *columnMeta;
+	vcis_free_space_t *freeSpace = vci_GetFreeSpace(relPair, startBlockNumber);
+	BlockNumber prevFreeBlockNumber = freeSpace->prev_pos;
+	BlockNumber nextFreeBlockNumber = freeSpace->next_pos;
+	uint32		size = freeSpace->size;
+	vcis_extent_type_t type = freeSpace->type;
+	BlockNumber next_prev = prevFreeBlockNumber;
+	BlockNumber prev_next = nextFreeBlockNumber;
+	BlockNumber numBlocksInCurrentFreeSpace = vci_GetNumBlocks(size);
+
+	Assert(vci_hasFreeLinkNode(freeSpace));
+	ReleaseBuffer(relPair->bufData);
+	Assert(numExtentPages <= numBlocksInCurrentFreeSpace);
+
+	columnMeta = vci_GetColumnMeta(&relPair->bufMeta, relPair->meta);
+	LockBuffer(relPair->bufMeta, BUFFER_LOCK_EXCLUSIVE);
+
+	/* Only part of the node is consumed: build a node for the leftover tail. */
+	if (numExtentPages < numBlocksInCurrentFreeSpace)
+	{
+		vcis_free_space_t *freeSpace_new;
+		BlockNumber newFreeBlockNumber = startBlockNumber + numExtentPages;
+
+		freeSpace_new = vci_GetFreeSpace(relPair, newFreeBlockNumber);
+		/* Lock the page before changing it, like the other free-list writers here. */
+		LockBuffer(relPair->bufData, BUFFER_LOCK_EXCLUSIVE);
+		freeSpace_new->type = type;
+		freeSpace_new->size = size - (numExtentPages * VCI_MAX_PAGE_SPACE);
+
+		/* it is sentinel */
+		if (numBlocksInCurrentFreeSpace == MaxBlockNumber)
+		{
+			freeSpace_new->size = MaxBlockNumber;
+			columnMeta->num_free_pages += numExtentPages;
+		}
+
+		/* construct new link */
+		freeSpace_new->prev_pos = prevFreeBlockNumber;
+		freeSpace_new->next_pos = nextFreeBlockNumber;
+		prev_next = next_prev = newFreeBlockNumber;
+		++columnMeta->num_free_page_blocks;
+
+		vci_WriteOneItemPage(relPair->data, relPair->bufData);
+		UnlockReleaseBuffer(relPair->bufData);
+	}
+
+	UpdatePrevNextFreeSpace(relPair, prevFreeBlockNumber, nextFreeBlockNumber,
+							prev_next, next_prev, columnMeta);
+
+	++(columnMeta->num_extents);
+	columnMeta->num_free_pages -= numExtentPages;
+	--(columnMeta->num_free_page_blocks);
+
+	vci_WriteOneItemPage(relPair->meta, relPair->bufMeta);
+	UnlockReleaseBuffer(relPair->bufMeta);
+}
+
+/*
+ * First-fit search of the free list for a node of at least requiredSize
+ * blocks.  Returns the node's block number, or InvalidBlockNumber if none fits.
+ */
+BlockNumber
+vci_FindFreeSpaceForExtent(vci_RelationPair *relPair, BlockNumber requiredSize)
+{
+	vcis_column_meta_t *columnMeta;
+	BlockNumber cursor;
+	BlockNumber found = InvalidBlockNumber;
+	bool		tailChosen = false;
+
+	columnMeta = vci_GetColumnMeta(&relPair->bufMeta, relPair->meta);
+	cursor = columnMeta->free_page_begin_id;
+	while (BlockNumberIsValid(cursor))
+	{
+		vcis_free_space_t *node = vci_GetFreeSpace(relPair, cursor);
+		BlockNumber successor;
+		bool		fits;
+
+		Assert(vci_hasFreeLinkNode(node));
+		fits = (vci_GetNumBlocks(node->size) >= requiredSize);
+		successor = node->next_pos;
+		ReleaseBuffer(relPair->bufData);
+
+		if (fits)
+		{
+			found = cursor;
+			tailChosen = !BlockNumberIsValid(successor);
+			break;
+		}
+		cursor = successor;
+	}
+
+	/* The list's last node (the sentinel) was chosen: prepare pages for it. */
+	if (tailChosen)
+	{
+		int16		numItems;
+
+		relPair->bufData = ReadBuffer(relPair->data, 0);
+		numItems = PageGetMaxOffsetNumber(BufferGetPage(relPair->bufData));
+		ReleaseBuffer(relPair->bufData);
+		vci_PreparePagesIfNecessary(relPair->data,
+									requiredSize + columnMeta->free_page_end_id,
+									numItems);
+	}
+
+	ReleaseBuffer(relPair->bufMeta);
+
+	return found;
+}
+
+/*
+ * Store in the column meta page and main relation the values vci_RecoveryFreeSpace() reads.
+ */
+void
+vci_WriteRecoveryRecordForFreeSpace(vci_RelationPair *relPair,
+ int16 colId,
+ int16 dictId,
+ BlockNumber StartBlockNumber,
+ vcis_free_space_t *FS)
+{
+ vcis_column_meta_t *columnMeta = vci_GetColumnMeta(&relPair->bufMeta, relPair->meta);
+
+ Assert(!BlockNumberIsValid(FS->prev_pos) || FS->prev_pos < StartBlockNumber);
+ Assert(!BlockNumberIsValid(FS->next_pos) || StartBlockNumber < FS->next_pos);
+ LockBuffer(relPair->bufMeta, BUFFER_LOCK_EXCLUSIVE);
+ columnMeta->new_data_head = StartBlockNumber;
+ columnMeta->free_page_prev_id = FS->prev_pos;
+ columnMeta->free_page_next_id = FS->next_pos;
+ columnMeta->free_page_old_size = FS->size;
+ /* keep the current counters; vci_RecoveryFreeSpace() copies them back */
+ columnMeta->num_extents_old = columnMeta->num_extents;
+ columnMeta->num_free_pages_old = columnMeta->num_free_pages;
+ columnMeta->num_free_page_blocks_old = columnMeta->num_free_page_blocks;
+ vci_WriteOneItemPage(relPair->meta, relPair->bufMeta);
+ UnlockReleaseBuffer(relPair->bufMeta);
+ /* record the column and dictionary being worked on in the main relation */
+ vci_SetMainRelVar(relPair->info, vcimrv_working_column_id, 0, colId);
+ vci_SetMainRelVar(relPair->info, vcimrv_working_dictionary_id, 0, dictId);
+ vci_WriteMainRelVar(relPair->info, vci_wmrv_update);
+}
+
+void
+vci_InitRecoveryRecordForFreeSpace(vci_MainRelHeaderInfo *info)
+{
+ vci_SetMainRelVar(info, vcimrv_working_column_id, 0, VCI_INVALID_COLUMN_ID); /* vci_RecoveryFreeSpace() does nothing for this value */
+}
+
+/*
+ * Roll back using the record kept by vci_WriteRecoveryRecordForFreeSpace():
+ * restore the counters, re-link the saved free-space node, rewrite the extent info.
+ */
+void
+vci_RecoveryFreeSpace(vci_MainRelHeaderInfo *info, vci_ros_command_t command)
+{
+ LOCKMODE lockmode = AccessShareLock; /** @todo ? */
+ int16 colId;
+ vci_ColumnRelations relPairData;
+ vci_ColumnRelations *relPair = &relPairData;
+ vcis_column_meta_t *columnMeta;
+ BlockNumber startBlockNumber;
+ BlockNumber prevFreeBlockNumber;
+ BlockNumber nextFreeBlockNumber;
+ uint32 oldSize;
+ int32 extentId;
+
+ /* get last working column */
+ colId = vci_GetMainRelVar(info, vcimrv_working_column_id, 0);
+
+ if (colId != VCI_INVALID_COLUMN_ID)
+ {
+ vci_OpenColumnRelations(relPair, info, colId, lockmode);
+
+ /* get column rel set */
+ columnMeta = vci_GetColumnMeta(&relPair->bufMeta, relPair->meta);
+ LockBuffer(relPair->bufMeta, BUFFER_LOCK_EXCLUSIVE);
+
+ /* restore from old fields */
+ columnMeta->num_extents = columnMeta->num_extents_old;
+ columnMeta->num_free_pages = columnMeta->num_free_pages_old;
+ columnMeta->num_free_page_blocks = columnMeta->num_free_page_blocks_old;
+
+ /* read free link list recovery information */
+ startBlockNumber = columnMeta->new_data_head;
+ prevFreeBlockNumber = columnMeta->free_page_prev_id;
+ nextFreeBlockNumber = columnMeta->free_page_next_id;
+ oldSize = columnMeta->free_page_old_size;
+
+ vci_WriteColumnMetaDataHeader(relPair->meta, relPair->bufMeta);
+ UnlockReleaseBuffer(relPair->bufMeta);
+ /* NOTE(review): the call below adjusts the counters restored above once more - confirm intended. */
+ vci_AppendFreeSpaceToLinkList(relPair, startBlockNumber, prevFreeBlockNumber,
+ nextFreeBlockNumber, oldSize);
+
+ switch (command)
+ {
+ case vci_rc_wos_ros_conv:
+ case vci_rc_collect_deleted:
+ extentId = vci_GetMainRelVar(info, vcimrv_new_extent_id, 0);
+ break;
+ case vci_rc_collect_extent:
+ extentId = vci_GetMainRelVar(info, vcimrv_old_extent_id, 0);
+ break;
+ default:
+ extentId = VCI_INVALID_EXTENT_ID;
+ break;
+ }
+ Assert(extentId != VCI_INVALID_EXTENT_ID);
+ /* NOTE(review): without assertions an unexpected command reaches this call with an invalid extent id. */
+ vci_WriteRawDataExtentInfo(relPair->meta,
+ extentId,
+ InvalidBlockNumber,
+ 0,
+ NULL, /* min */
+ NULL, /* max */
+ false,
+ false);
+ vci_CloseColumnRelations(relPair, lockmode);
+ }
+}
diff --git a/contrib/vci/storage/vci_low_utils.c b/contrib/vci/storage/vci_low_utils.c
new file mode 100644
index 0000000..c65d5c0
--- /dev/null
+++ b/contrib/vci/storage/vci_low_utils.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_low_utils.c
+ * Low-level utility function
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_low_utils.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "utils/snapmgr.h"
+
+#include "vci.h"
+#include "vci_ros.h"
+
+/**
+ * @brief This function writes data over pages.
+ *
+ * The last page is not flushed.
+ * So, after calling for the last data, the last page in the return value
+ * must be written by functions like vci_WriteOnePageIfNecessaryAndGetBuffer().
+ *
+ * @param[in] rel relation to store the data.
+ * @param[in, out] blockNumber the first block number to write as input.
+ * the blockNumber for the next data.
+ * @param[in, out] blockNumberOld the block number of buffer in the argument.
+ * the blockNumber of the buffer returned as output.
+ * @param[in, out] offsetInPage the offset in the page to write as input.
+ * the offset in the page of the next data as output.
+ * @param[in] buffer shared buffer of *blockNumber.
+ * @param[in] data_ the pointer of the data to write.
+ * @param[in] size the size of the data to write.
+ * @return the shared buffer read last, which is not written.
+ */
+Buffer
+vci_WriteDataIntoMultiplePages(Relation rel,
+							   BlockNumber *blockNumber,
+							   BlockNumber *blockNumberOld,
+							   uint32 *offsetInPage,
+							   Buffer buffer,
+							   const void *data_,
+							   Size size)
+{
+	const char *src = (const char *) data_;
+	Size		remaining = size;
+
+	Assert(*offsetInPage < VCI_MAX_PAGE_SPACE);
+	while (remaining > 0)
+	{
+		/* Copy no more than what still fits in the current page. */
+		uint32		chunk = Min(VCI_MAX_PAGE_SPACE - *offsetInPage, remaining);
+		char	   *dst;
+
+		buffer = vci_WriteOnePageIfNecessaryAndGetBuffer(rel, *blockNumber,
+														 *blockNumberOld, buffer);
+		*blockNumberOld = *blockNumber;
+		dst = (char *) BufferGetPage(buffer) + VCI_MIN_PAGE_HEADER + *offsetInPage;
+		MemCpy(dst, src, chunk);
+		src += chunk;
+		remaining -= chunk;
+		*offsetInPage += chunk;
+		/* Page full: continue at the start of the next block. */
+		if (*offsetInPage >= VCI_MAX_PAGE_SPACE)
+		{
+			++(*blockNumber);
+			*offsetInPage = 0;
+		}
+	}
+
+	return buffer;
+}
+
+/**
+ * @brief Push a copy of the active snapshot and update its command ID.
+ *
+ * @return the active snapshot after the push (i.e. the pushed copy).
+ */
+Snapshot
+vci_GetCurrentSnapshot(void)
+{
+ PushCopiedSnapshot(GetActiveSnapshot());
+ UpdateActiveSnapshotCommandId();
+
+ return GetActiveSnapshot();
+}
diff --git a/contrib/vci/storage/vci_memory_entry.c b/contrib/vci/storage/vci_memory_entry.c
new file mode 100644
index 0000000..deffc40
--- /dev/null
+++ b/contrib/vci/storage/vci_memory_entry.c
@@ -0,0 +1,907 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_memory_entry.c
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_memory_entry.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "c.h"
+#include "access/sysattr.h"
+#include "access/xact.h"
+#include "access/reloptions.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_tablespace.h"
+#include "miscadmin.h"
+#include "storage/lock.h"
+#include "storage/lwlock.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/snapmgr.h"
+
+#include "vci.h"
+
+#include "vci_mem.h"
+#include "vci_memory_entry.h"
+
+#define VCI_LOCKTAG_MEMORY_ENTRY LOCKTAG_USERLOCK
+
+static void debug_show_vciid_ts(const char *head, Oid oid, Oid tsid);
+static char *getTablespacePath(Oid tsid);
+static HeapTuple GetDatabaseTupleByOid(Oid dboid);
+static void initializeMemoryEntryCommon(vci_memory_entry_t *entry, vci_id_t *vciid, Oid tsid);
+static void initializeMemoryEntry(vci_memory_entry_t *entry, vci_id_t *vciid, Oid tsid, int32 timeStamp);
+static void resetMemoryEntry(vci_memory_entry_t *entry, vci_id_t *vciid, Oid tsid, int32 timeStamp);
+static int findMemoryEntryLocation(vci_id_t *vciid);
+static void stampOnMemoryEntry(vci_memory_entry_t *entry, vci_memory_entries_t *entries);
+static int determineRoomPosition(Oid oid);
+static void setLockTagPointer(LOCKTAG *locktag, void *ptr);
+static LockAcquireResult lockAcquirePointer(vci_memory_entry_t *entry, LOCKMODE lockmode, bool dontWait);
+static bool lockReleasePointer(vci_memory_entry_t *entry, LOCKMODE lockmode);
+static int makeRoomAndGetLocation(vci_id_t *vciid, Oid tsid);
+static void registerMemoryEntry2Device(vci_memory_entry_t *entry);
+
+/**
+ * output debug log
+ * @param[in] head the string in the head of a log
+ * @param[in] oid vci oid
+ * @param[in] tsid tablespace oid; the tablespace location is logged as well
+ */
+static void
+debug_show_vciid_ts(const char *head, Oid oid, Oid tsid)
+{
+	char	   *allocated = getTablespacePath(tsid);
+	const char *shown;
+
+	/* getTablespacePath() returns NULL for the default/global tablespaces. */
+	if (allocated != NULL)
+		shown = allocated;
+	else
+		shown = DataDir;
+
+	elog(DEBUG2, "%s entry oid=%u tsid=%u (%s)", head, oid, tsid, shown);
+
+	if (allocated != NULL)
+		pfree(allocated);
+}
+
+#define VCI_HASH_WIDTH 65536
+
+/*
+ * Bytes needed for vci_memory_entries_t with its fixed number of entry slots.
+ */
+Size
+vci_GetSizeOfMemoryEntries(void)
+{
+	uint32		capacity;
+
+	/* LCOV_EXCL_START */
+	capacity = Max(VCI_HASH_WIDTH / NUM_BUFFER_PARTITIONS, 1);
+	/* LCOV_EXCL_STOP */
+
+	return offsetof(vci_memory_entries_t, data) +
+		sizeof(vci_memory_entry_t) * capacity;
+}
+
+/**
+ * @brief Initialize the area for VCI memory objects residing with PostgreSQL
+ * instance, and the hash table area.
+ */
+void
+vci_InitMemoryEntries(void)
+{
+	uint32		capacity;
+	vci_id_t	vciid;
+
+	elog(DEBUG2, "vci_InitMemoryEntries");
+	dlist_init(&VciShmemAddr->memory_entry_device_unknown_list);
+
+	/* LCOV_EXCL_START */
+	capacity = Max(VCI_HASH_WIDTH / NUM_BUFFER_PARTITIONS, 1);
+	elog(DEBUG2, ">>> capacity = %u", capacity);
+	/* LCOV_EXCL_STOP */
+
+	VciShmemAddr->memory_entries->capacity_hash_entries = capacity;
+
+	VciShmemAddr->memory_entries->lock = VciShmemAddr->vci_memory_entries_lock;
+
+	/* With invalid ids, no slot is linked into the device-unknown list. */
+	vciid.oid = InvalidOid;
+	vciid.dbid = InvalidOid;
+	for (uint32 i = 0; i < capacity; i++)
+		initializeMemoryEntryCommon(&VciShmemAddr->memory_entries->data[i], &vciid, InvalidOid);
+}
+
+/*
+ * Return the location of tablespace tsid as a palloc'd C string, or NULL for
+ * the default and global tablespaces (callers in this file then use DataDir).
+ */
+static char *
+getTablespacePath(Oid tsid)
+{
+	Datum		location;
+
+	Assert(OidIsValid(tsid));
+
+	if (tsid == DEFAULTTABLESPACE_OID ||
+		tsid == GLOBALTABLESPACE_OID)
+		return NULL;
+
+	/*
+	 * Call through the fmgr helper so that the whole FunctionCallInfo is
+	 * initialized, not only nargs and args[0] as with a hand-filled
+	 * LOCAL_FCINFO.
+	 */
+	location = DirectFunctionCall1(pg_tablespace_location,
+								   ObjectIdGetDatum(tsid));
+
+	return text_to_cstring(DatumGetTextP(location));
+}
+
+/*
+ * Fetch the pg_database row of a database by OID (mirrors the static function
+ * of the same name in src/backend/utils/init/postinit.c).
+ *
+ * Returns a palloc'd copy of the tuple, or an invalid tuple if none is found.
+ */
+static HeapTuple
+GetDatabaseTupleByOid(Oid dboid)
+{
+	HeapTuple	tuple;
+	Relation	relation;
+	SysScanDesc scan;
+	ScanKeyData key[1];
+
+	/*
+	 * form a scan key on pg_database.oid (not pg_class's attribute number)
+	 */
+	ScanKeyInit(&key[0],
+				Anum_pg_database_oid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(dboid));
+
+	/*
+	 * Open pg_database and fetch a tuple.  Force heap scan if we haven't yet
+	 * built the critical shared relcache entries (i.e., we're starting up
+	 * without a shared relcache cache file).
+	 */
+	relation = table_open(DatabaseRelationId, AccessShareLock);
+	scan = systable_beginscan(relation, DatabaseOidIndexId,
+							  criticalSharedRelcachesBuilt,
+							  NULL,
+							  1, key);
+
+	tuple = systable_getnext(scan);
+
+	/* Must copy tuple before releasing buffer */
+	if (HeapTupleIsValid(tuple))
+		tuple = heap_copytuple(tuple);
+
+	/* all done */
+	systable_endscan(scan);
+	table_close(relation, AccessShareLock);
+
+	return tuple;
+}
+
+/**
+ * DefaultTablespaceOid for dbid.
+ * This function must be called in a transaction
+ *
+ * @param[in] dboid oid of a database
+ *
+ * @return Oid of default tablespace, or InvalidOid if no such database exists
+ */
+static Oid
+getDefaultTablespaceOid(Oid dboid)
+{
+	HeapTuple	tuple = GetDatabaseTupleByOid(dboid);
+	Oid			tablespaceOid;
+
+	if (!HeapTupleIsValid(tuple))
+	{
+		elog(DEBUG2, "database %u does not exist", dboid);
+		return InvalidOid;
+	}
+	/* The tuple is a private copy: read the field, then free the copy. */
+	tablespaceOid = ((Form_pg_database) GETSTRUCT(tuple))->dattablespace;
+	heap_freetuple(tuple);
+
+	return tablespaceOid;
+}
+
+/**
+ * Initialize memory entry.
+ * If a valid vci oid is given, this memory entry is pushed onto
+ * VciShmemAddr->memory_entry_device_unknown_list for WOS->ROS transformation.
+ *
+ * @param[out] entry memory entry to be initialized
+ * @param[in] vciid id of vci
+ * @param[in] tsid tablespace oid
+ */
+static void
+initializeMemoryEntryCommon(vci_memory_entry_t *entry, vci_id_t *vciid, Oid tsid)
+{
+	elog(DEBUG2, "initializeMemoryEntryCommon: vciid->oid: %u, vciid->dbid: %u, tsid: %u)", vciid->oid, vciid->dbid, tsid);
+
+	entry->id.oid = vciid->oid;
+	entry->id.dbid = vciid->dbid;
+	entry->tsid = tsid;
+	if (!OidIsValid(tsid) && OidIsValid(vciid->dbid))
+	{
+		entry->real_tsid = getDefaultTablespaceOid(vciid->dbid);
+		if (!OidIsValid(entry->real_tsid))
+			ereport(FATAL,
+					(errcode(ERRCODE_UNDEFINED_DATABASE),
+					 errmsg("database %u does not exist", vciid->dbid)));
+	}
+	else
+		entry->real_tsid = tsid;
+
+	if (OidIsValid(vciid->oid))
+	{
+		dlist_push_head(&VciShmemAddr->memory_entry_device_unknown_list, &(entry->link));
+
+		entry->force_next_wosros_conv = false;
+	}
+}
+
+/**
+ * Initialize memory entry, unlinking it first if it was registered before.
+ *
+ * @param[in,out] entry memory entry to be initialized
+ * @param[in] vciid id of vci (a pair of oid of vci and oid of database)
+ * @param[in] tsid oid of tablespace
+ * @param[in] timeStamp timestamp
+ */
+static void
+initializeMemoryEntry(vci_memory_entry_t *entry, vci_id_t *vciid, Oid tsid, int32 timeStamp)
+{
+	elog(DEBUG2, "initializeMemoryEntry: vciid->oid: %u, vciid->dbid: %u, tsid: %u)", vciid->oid, vciid->dbid, tsid);
+
+	if (OidIsValid(entry->id.oid))
+	{
+		/* clean up previous registration. */
+		dlist_delete(&entry->link);
+	}
+
+	initializeMemoryEntryCommon(entry, vciid, tsid);
+
+	entry->time_stamp = timeStamp;
+}
+
+/**
+ * reset memory entry (logs the reset, then re-initializes the entry)
+ *
+ * @param[in,out] entry memory entry to be initialized
+ * @param[in] vciid id of vci (a pair of oid of vci and oid of database)
+ * @param[in] tsid oid of tablespace
+ * @param[in] timeStamp timestamp
+ */
+static void
+resetMemoryEntry(vci_memory_entry_t *entry, vci_id_t *vciid, Oid tsid, int32 timeStamp)
+{
+	elog(DEBUG2, "reset memory object of OID %u", vciid->oid);
+
+	initializeMemoryEntry(entry, vciid, tsid, timeStamp);
+}
+
+#define MAKE_ROOM_MAX_SCAN_SPAN (128)
+#define MAKE_ROOM_SCAN_SPAN (8)
+#define MAKE_ROOM_THRESHOLD (0x10000000)
+
+/**
+ * Release the memory entry registered for a VCI, if there is one.
+ *
+ * @param[in] vciid id of vci whose memory entry is freed
+ */
+void
+vci_freeMemoryEntry(vci_id_t *vciid)
+{
+	vci_memory_entries_t *entries = VciShmemAddr->memory_entries;
+	vci_id_t	unused;
+	int			slot;
+
+	unused.oid = InvalidOid;
+	unused.dbid = InvalidOid;
+
+	LWLockAcquire(entries->lock, LW_EXCLUSIVE);
+
+	/* Resetting to invalid ids unlinks a registered entry from its list. */
+	slot = findMemoryEntryLocation(vciid);
+	if (slot != -1)
+		initializeMemoryEntry(&entries->data[slot], &unused, InvalidOid, entries->time_stamp);
+
+	LWLockRelease(entries->lock);
+}
+
+/**
+ * @brief Probe vci_memory_entries_t.data for the slot of a VCI.
+ *
+ * Starting at oid % capacity, up to MAKE_ROOM_MAX_SCAN_SPAN slots are
+ * examined (wrapping around).  A slot holding the same oid and dbid wins;
+ * failing that, the first unused slot met during the scan is returned.
+ *
+ * @param[in] vciid id of vci, information of which are stored in the found
+ * memory entry.
+ * @return index of the matching or first unused slot, -1 if there is neither.
+ *
+ * @note This function must be called with the vci_memory_entries_t lock
+ * acquired exclusively.  No entry lock is taken here, so the caller must lock
+ * the entry and check that it has the correct OID.
+ */
+static int
+findMemoryEntryLocation(vci_id_t *vciid)
+{
+	vci_memory_entries_t *entries = VciShmemAddr->memory_entries;
+	int			capacity = entries->capacity_hash_entries;
+	int			slot = vciid->oid % capacity;
+	int			firstUnused = -1;
+	int			probeLimit = Min(capacity, MAKE_ROOM_MAX_SCAN_SPAN);
+
+	for (int probe = 0; probe < probeLimit; ++probe)
+	{
+		vci_memory_entry_t *candidate = &entries->data[slot];
+
+		if (candidate->id.oid == vciid->oid && candidate->id.dbid == vciid->dbid)
+			return slot;
+
+		if (firstUnused == -1 && !OidIsValid(candidate->id.oid))
+			firstUnused = slot;
+
+		slot = (slot + 1 == capacity) ? 0 : slot + 1;
+	}
+
+	return firstUnused;
+}
+
+/**
+ * Copy the table-wide time stamp into an entry.
+ *
+ * @param[in,out] entry memory entry whose time_stamp is overwritten
+ * @param[in] entries memory entries table supplying the current time_stamp
+ */
+static void
+stampOnMemoryEntry(vci_memory_entry_t *entry, vci_memory_entries_t *entries)
+{
+ entry->time_stamp = entries->time_stamp;
+}
+
+/**
+ * This function is called when no room is available in the hash table.
+ * It picks the slot whose contents are to be discarded for oid.  Starting at
+ * the hash value HV = oid % capacity, slots are examined in groups of
+ * MAKE_ROOM_SCAN_SPAN, up to MAKE_ROOM_MAX_SCAN_SPAN slots in total (fewer if
+ * the table is smaller), wrapping around at the end of the table.  The slot
+ * with the oldest time stamp seen so far is remembered; the scan stops after
+ * the first group at whose end that slot is at least MAKE_ROOM_THRESHOLD
+ * stamps old.  If every examined slot carries the current stamp, HV itself
+ * is returned.
+ *
+ * @param[in] oid Oid of VCI main relation, for which the hash table entry is
+ * allocated.
+ *
+ * @return position ID of hash table entry.
+ */
+static int
+determineRoomPosition(Oid oid)
+{
+	vci_memory_entries_t *entries = VciShmemAddr->memory_entries;
+	int			capacity = entries->capacity_hash_entries;
+	int32		currentStamp = entries->time_stamp;
+	vci_memory_entry_t *data = entries->data;
+	int			ptr = oid % capacity;
+	int			maxPtr = ptr;
+	uint32		maxDiff = 0;
+	int			aIdMax = Min(capacity, MAKE_ROOM_MAX_SCAN_SPAN);
+
+	for (int aId = 0; aId < aIdMax; aId += MAKE_ROOM_SCAN_SPAN)
+	{
+		int			bIdMax = Min(capacity, aId + MAKE_ROOM_SCAN_SPAN);
+
+		for (int bId = aId; bId < bIdMax; ++bId)
+		{
+			uint32		newDiff = currentStamp - data[ptr].time_stamp;
+
+			if (maxDiff < newDiff)
+			{
+				maxDiff = newDiff;
+				maxPtr = ptr;
+			}
+			ptr = (ptr == (capacity - 1)) ? 0 : (1 + ptr);
+		}
+		if (MAKE_ROOM_THRESHOLD <= maxDiff)
+			break;
+	}
+
+	/* Report the slot actually chosen (maxPtr), not where the scan stopped. */
+	elog(DEBUG2,
+		 "discard OID %u at position %d with time stamp difference %u"
+		 " for OID %u under capacity %d",
+		 data[maxPtr].id.oid, maxPtr, maxDiff, oid, capacity);
+
+	return maxPtr;
+}
+
+static void
+setLockTagPointer(LOCKTAG *locktag, void *ptr)
+{
+	locktag->locktag_field1 = (uint32) (((uint64) (uintptr_t) ptr) & 0xFFFFFFFF);	/* low half of the address */
+	locktag->locktag_field2 = 0;
+	locktag->locktag_field3 = (uint32) (((uint64) (uintptr_t) ptr) >> 32);	/* high half; widened first so the shift is defined for 32-bit pointers */
+	locktag->locktag_field4 = 0;
+	locktag->locktag_type = VCI_LOCKTAG_MEMORY_ENTRY;
+	locktag->locktag_lockmethodid = DEFAULT_LOCKMETHOD;
+}
+
+static LockAcquireResult
+lockAcquirePointer(vci_memory_entry_t *entry, LOCKMODE lockmode, bool dontWait)
+{
+ LOCKTAG locktag;
+
+ Assert(entry);
+ /* the heavyweight lock is keyed by the entry's address */
+ setLockTagPointer(&locktag, entry);
+ /* third argument of LockAcquire(): not a session lock */
+ return LockAcquire(&locktag, lockmode, false, dontWait);
+}
+
+static bool
+lockReleasePointer(vci_memory_entry_t *entry, LOCKMODE lockmode)
+{
+ LOCKTAG locktag;
+
+ setLockTagPointer(&locktag, entry);
+ /* counterpart of lockAcquirePointer(); the result of LockRelease() is passed on */
+ return LockRelease(&locktag, lockmode, false);
+}
+
+/**
+ * @brief Pick an entry of vci_memory_entries_t that nobody has locked, reset
+ * it for the given VCI, and return its index.
+ *
+ * @param[in] vciid id of vci whose information is stored in the determined
+ * memory entry.
+ * @param[in] tsid tablespace oid
+ * @return The index to the object acquired and reset.
+ *
+ * @note This function must be called with the vci_memory_entries_t lock
+ * acquired exclusively.
+ * NOTE(review): vci_GetMemoryEntry() releases that lock before calling this
+ * function - confirm which of the two is intended.
+ */
+static int
+makeRoomAndGetLocation(vci_id_t *vciid, Oid tsid)
+{
+ vci_memory_entries_t *entries = VciShmemAddr->memory_entries;
+ vci_memory_entry_t *entry = NULL;
+ int result = -1;
+
+ do
+ {
+ result = determineRoomPosition(vciid->oid);
+ Assert(0 <= result);
+ entry = &(entries->data[result]);
+
+ /* Test if the entry is really free. */
+ switch (lockAcquirePointer(entry, AccessExclusiveLock, true /* don't wait */ ))
+ {
+ case LOCKACQUIRE_OK:
+ /* It is free. */
+ break;
+
+ case LOCKACQUIRE_NOT_AVAIL:
+ /*
+ * The lock should be taken in shared mode. We have to search
+ * another entry. To skip current entry, we stamp it.
+ */
+ stampOnMemoryEntry(entry, entries);
+ result = -1;
+ break;
+
+ /* LCOV_EXCL_START */
+ case LOCKACQUIRE_ALREADY_HELD:
+ /* NEVER COME HERE */
+ ereport(ERROR,
+ (errmsg("duplicate lock detected"),
+ errhint("Disable VCI by 'SELECT vci_disable();'")));
+ break;
+
+ default:
+ /* NEVER COME HERE */
+ ereport(ERROR,
+ (errmsg("undefined lock state"),
+ errhint("Disable VCI by 'SELECT vci_disable();'")));
+ /* LCOV_EXCL_STOP */
+ }
+ } while (result < 0);
+ Assert(NULL != entry);
+
+ resetMemoryEntry(entry, vciid, tsid, entries->time_stamp);
+
+ /* LCOV_EXCL_START */
+ if (!lockReleasePointer(entry, AccessExclusiveLock))
+ {
+ /* NEVER COME HERE */
+ ereport(ERROR,
+ (errmsg("undefined lock state"),
+ errhint("Disable VCI by 'SELECT vci_disable();'")));
+ }
+ /* LCOV_EXCL_STOP */
+
+ return result;
+}
+
+/**
+ * Find or create the memory entry of a VCI and take a share lock on it.
+ *
+ * @param[in] vciid vci_id_t identifying vci
+ * @param[in] tsid oid of tablespace
+ *
+ * @return the entry, share-locked; release with vci_ReleaseMemoryEntry()
+ */
+static vci_memory_entry_t *
+vci_GetMemoryEntry(vci_id_t *vciid, Oid tsid)
+{
+	vci_memory_entries_t *entries = VciShmemAddr->memory_entries;
+	vci_memory_entry_t *entry = NULL;
+	LockAcquireResult lockResult;
+	int			ptr;
+
+	/*
+	 * NOTE(review): findMemoryEntryLocation() and makeRoomAndGetLocation() ask
+	 * for this lock in exclusive mode - confirm LW_SHARED / no lock is meant.
+	 */
+	LWLockAcquire(entries->lock, LW_SHARED);
+	entries->time_stamp++;
+
+retry:
+	ptr = findMemoryEntryLocation(vciid);
+	if (ptr == -1)
+	{
+		LWLockRelease(entries->lock);
+		ptr = makeRoomAndGetLocation(vciid, tsid);
+		LWLockAcquire(entries->lock, LW_EXCLUSIVE);
+	}
+
+	entry = &entries->data[ptr];
+	if (!OidIsValid(entry->id.oid))
+		initializeMemoryEntry(entry, vciid, tsid, entries->time_stamp);
+
+	LWLockRelease(entries->lock);
+	lockResult = lockAcquirePointer(entry, AccessShareLock, false /* wait */ );
+
+	/* ALREADY_HELD also means we own the lock; retrying would loop forever. */
+	if (lockResult == LOCKACQUIRE_OK || lockResult == LOCKACQUIRE_ALREADY_HELD)
+	{
+		/* The slot may have been recycled meanwhile: check the full id. */
+		if (entry->id.oid == vciid->oid && entry->id.dbid == vciid->dbid)
+			return entry;
+		lockReleasePointer(entry, AccessShareLock);
+	}
+
+	LWLockAcquire(entries->lock, LW_SHARED);
+	goto retry;
+}
+
+/**
+ * @brief release memory entry
+ *
+ * Drops the AccessShareLock on the entry through lockReleasePointer().
+ * The result of that call is not checked here.
+ *
+ * @param[in] entry The memory entry to be released
+ */
+static void
+vci_ReleaseMemoryEntry(vci_memory_entry_t *entry)
+{
+ lockReleasePointer(entry, AccessShareLock);
+}
+
+/*
+ * Put a memory entry on the queue of the device it belongs to.
+ *
+ * Only the single pseudo "unmonitored" device exists, so the entry is always
+ * pushed to the head of devload_array[0]'s queue.  The tablespace path is
+ * looked up solely for the DEBUG2 message.
+ */
+static void
+registerMemoryEntry2Device(vci_memory_entry_t *entry)
+{
+	char	   *ts_path;
+	const char *shown_path;
+	const char *devname;
+	vci_devload_t *dload;
+
+	elog(DEBUG2, "registerMemoryEntry2Device");
+
+	/* Show the data directory when no tablespace path is returned. */
+	ts_path = getTablespacePath(entry->real_tsid);
+	shown_path = (ts_path != NULL) ? ts_path : DataDir;
+
+	/* OSS always uses "unmonitored" device */
+	devname = VCI_PSEUDO_UNMONITORED_DEVICE;
+	elog(DEBUG2, "vci oid %u tablespace(%s) is on a device (%s)", entry->id.oid, shown_path, devname);
+
+	/* Only a path handed back by getTablespacePath() is ours to free. */
+	if (ts_path != NULL)
+		pfree(ts_path);
+
+	Assert(VciShmemAddr->num_devload_info == 1);
+	dload = &(VciShmemAddr->devload_array[0]);
+	Assert(dload != NULL);
+	Assert(strcmp(dload->devname, VCI_PSEUDO_UNMONITORED_DEVICE) == 0);
+
+	dlist_push_head(&(dload->memory_entry_queue->head), &(entry->link));
+}
+
+/**
+ * Returns the device index to examine after the given one.
+ *
+ * @param[in] index index of the device that was just checked
+ *
+ * @return the index to be checked
+ *
+ * XXX: Consider just removing this function, because for OSS it only returns 0.
+ */
+static int
+get_new_checked_device_index(int index)
+{
+ /*
+ * For OSS the num_devload_info is hardwired as 1, so this function can
+ * only return an index of 0. ([0] is the "unmonitored" device)
+ */
+ Assert(VciShmemAddr->num_devload_info == 1);
+ Assert(index == 0);
+
+ /* The argument is handed back unchanged. */
+ return index;
+}
+
+/**
+ * Choose the next VCI on which WOS->ROS conversion should be done.
+ *
+ * Starting at VciShmemAddr->translated_dev_pos, looks for a device whose
+ * memory_entry_pos cursor is set.  The entry under that cursor is handed out
+ * and the cursor moves on to the following queue node, or becomes NULL when
+ * the end of the queue has been reached.
+ *
+ * @param[out] vciinfo receives dbid, oid and force_next_wosros_conv of the
+ *             chosen entry; it is left untouched when false is returned
+ *
+ * @return true if VCI for transformation is found, false otherwise.
+ */
+bool
+vci_GetWosRosConvertingVCI(vci_wosros_conv_worker_arg_t *vciinfo)
+{
+	int			index = VciShmemAddr->translated_dev_pos;
+	int			head_index = index;
+	bool		found = false;
+	bool		check_started = false;
+	vci_devload_t *dl = &(VciShmemAddr->devload_array[index]);
+	dlist_head *memory_entry_queue;
+	dlist_node *ret;
+	vci_memory_entry_t *entry;
+
+	elog(DEBUG2, ">>> index=%d, head_index=%d, check_started=%d, found=%d", index, head_index, check_started, found);
+
+	/*
+	 * Stop at the first device with a cursor, or once the index is back at
+	 * the starting device.
+	 */
+	while ((index != head_index || !check_started)
+		   && !found)
+	{
+		check_started = true;
+
+		if (dl->memory_entry_pos == NULL)
+		{
+			elog(LOG, "wos->ros translation: skip translation on device [%s] because no memory entry", dl->devname);
+			index = get_new_checked_device_index(index);
+
+			/* Trace output: same level as the other ">>>" traces here. */
+			elog(DEBUG2, ">>> vci_GetWosRosConvertingVCI: index=%d, num_devload_info=%d", index, VciShmemAddr->num_devload_info);
+			dl = &(VciShmemAddr->devload_array[index]);
+		}
+		else
+			found = true;
+
+		elog(DEBUG2, ">>> index=%d, head_index=%d, check_started=%d, found=%d", index, head_index, check_started, found);
+	}
+
+	if (!found)
+	{
+		elog(LOG, "wos->ros translation: no vci is found for translation");
+		return false;
+	}
+
+	elog(LOG, "dev info: [%s] ", VciShmemAddr->devload_array[index].devname);
+
+	memory_entry_queue = &(dl->memory_entry_queue->head);
+
+	Assert(dl->memory_entry_pos != NULL);
+
+	/* Hand out the node under the cursor and advance the cursor. */
+	ret = dl->memory_entry_pos;
+	if (dlist_has_next(memory_entry_queue, ret))
+		dl->memory_entry_pos = dlist_next_node(memory_entry_queue, ret);
+	else
+		dl->memory_entry_pos = NULL;
+
+	/* Remember which device the next call should start from. */
+	VciShmemAddr->translated_dev_pos = get_new_checked_device_index(index);
+
+	entry = dlist_container(vci_memory_entry_t, link, ret);
+
+	vciinfo->dbid = entry->id.dbid;
+	vciinfo->oid = entry->id.oid;
+	vciinfo->force_next_wosros_conv = entry->force_next_wosros_conv;
+
+	elog(LOG, "wos->ros conversion on device (%s): vci oid=%u dbid=%u", dl->devname, vciinfo->oid, vciinfo->dbid);
+
+	return true;
+}
+
+/**
+ * Refresh the time stamp of the memory entry of a vci, creating the entry
+ * first when it does not exist yet.
+ *
+ * The entry is obtained (locked) through vci_GetMemoryEntry(), stamped with
+ * the current value of the shared memory_entries time stamp, and released
+ * again.
+ *
+ * @param[in] vciid id of a vci index
+ * @param[in] tsid Oid of the tablespace of the vci index identified by vciid
+ */
+void
+vci_TouchMemoryEntry(vci_id_t *vciid, Oid tsid)
+{
+	vci_memory_entry_t *touched = vci_GetMemoryEntry(vciid, tsid);
+
+	touched->time_stamp = VciShmemAddr->memory_entries->time_stamp;
+
+	vci_ReleaseMemoryEntry(touched);
+}
+
+/*
+ * Drain memory_entry_device_unknown_list, handing each entry popped from its
+ * head to registerMemoryEntry2Device().
+ *
+ * The memory_entries lock is held in exclusive mode for the whole drain.
+ */
+void
+vci_update_memoryentry_in_devloadinfo(void)
+{
+	vci_memory_entries_t *entries;
+	dlist_head *pending;
+
+	elog(DEBUG2, "vci_update_memoryentry_in_devloadinfo: start");
+
+	entries = VciShmemAddr->memory_entries;
+	pending = &VciShmemAddr->memory_entry_device_unknown_list;
+
+	LWLockAcquire(entries->lock, LW_EXCLUSIVE);
+
+	while (!dlist_is_empty(pending))
+	{
+		dlist_node *popped;
+		vci_memory_entry_t *entry;
+
+		elog(DEBUG2, ">>> vci_update_memoryentry_in_devloadinfo: in loop");
+
+		popped = dlist_pop_head_node(pending);
+		entry = dlist_container(vci_memory_entry_t, link, popped);
+		Assert(OidIsValid(entry->id.dbid));
+
+		debug_show_vciid_ts("ros extract one: ", entry->id.oid, entry->real_tsid);
+
+		registerMemoryEntry2Device(entry);
+	}
+
+	LWLockRelease(entries->lock);
+}
+
+/*
+ * Rewind the memory_entry_pos cursor of every device to the head of that
+ * device's memory entry queue, or to NULL when the queue is empty.
+ */
+void
+vci_ResetDevloadCurrentPos(void)
+{
+	elog(DEBUG2, "vci_ResetDevloadCurrentPos: start; VciShmemAddr->num_devload_info is %d", VciShmemAddr->num_devload_info);
+
+	for (int i = 0; i < VciShmemAddr->num_devload_info; i++)
+	{
+		vci_devload_t *devload = &(VciShmemAddr->devload_array[i]);
+		dlist_head *queue = &(devload->memory_entry_queue->head);
+
+		devload->memory_entry_pos =
+			dlist_is_empty(queue) ? NULL : dlist_head_node(queue);
+	}
+}
+
+/*
+ * For every device, rotate the memory entry queue until the node under the
+ * memory_entry_pos cursor is at the head: nodes in front of the cursor are
+ * popped from the head and appended to the tail one by one.
+ *
+ * Devices whose cursor is NULL are left untouched.
+ */
+void
+vci_MoveTranslatedVCI2Tail(void)
+{
+	for (int i = 0; i < VciShmemAddr->num_devload_info; i++)
+	{
+		vci_devload_t *devload = &(VciShmemAddr->devload_array[i]);
+		dlist_head *queue = &(devload->memory_entry_queue->head);
+
+		if (devload->memory_entry_pos == NULL)
+			continue;
+
+		/* Nothing to do when the cursor already sits at the head. */
+		while (dlist_head_node(queue) != devload->memory_entry_pos)
+		{
+			dlist_node *front = dlist_pop_head_node(queue);
+
+			dlist_push_tail(queue, front);
+		}
+	}
+}
+
+/**
+ * Check whether the database of each registered VCI still exists, and
+ * reset the memory entries that belong to a dropped database.
+ *
+ * Every device queue is walked inside a transaction that is started and
+ * committed here.  An entry whose dbid has no database tuple is
+ * re-initialized with invalid ids.  Afterwards the device's memory_entry_pos
+ * cursor is pointed at the head of its queue again (NULL for an empty queue).
+ */
+void
+vci_RemoveMemoryEntryOnDroppedDatabase(void)
+{
+	vci_id_t	invalid_vciid;
+	int32		time_stamp;
+
+	invalid_vciid.oid = InvalidOid;
+	invalid_vciid.dbid = InvalidOid;
+	time_stamp = VciShmemAddr->memory_entries->time_stamp;
+
+	/* start transaction */
+	SetCurrentStatementStartTimestamp();
+	StartTransactionCommand();
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	for (int i = 0; i < VciShmemAddr->num_devload_info; i++)
+	{
+		vci_devload_t *devload = &(VciShmemAddr->devload_array[i]);
+		dlist_head *memory_entry_queue = &(devload->memory_entry_queue->head);
+		dlist_mutable_iter miter;
+
+		/*
+		 * Mutable iteration remembers the successor before the current node
+		 * is processed (presumably because initializeMemoryEntry() may unlink
+		 * the node -- confirm).
+		 */
+		dlist_foreach_modify(miter, memory_entry_queue)
+		{
+			vci_memory_entry_t *entry;
+			HeapTuple	tuple;
+
+			entry = dlist_container(vci_memory_entry_t, link, miter.cur);
+
+			/*
+			 * NOTE(review): a valid tuple is not released here; presumably it
+			 * is reclaimed when the transaction commits -- confirm.
+			 */
+			tuple = GetDatabaseTupleByOid(entry->id.dbid);
+			if (!HeapTupleIsValid(tuple))
+			{
+				/* Oids are unsigned, so print with %u like other messages. */
+				elog(DEBUG2,
+					 "vci %u was dropped by DROP DATABASE", entry->id.oid);
+
+				initializeMemoryEntry(entry, &invalid_vciid, InvalidOid, time_stamp);
+			}
+		}
+
+		/* Point the cursor at whatever is now at the head of the queue. */
+		if (dlist_is_empty(memory_entry_queue))
+			devload->memory_entry_pos = NULL;
+		else
+			devload->memory_entry_pos = dlist_head_node(memory_entry_queue);
+	}
+
+	/* close transaction */
+	PopActiveSnapshot();
+	CommitTransactionCommand();
+}
+
+/**
+ * Set or clear the flag that forces WOS->ROS conversion next time.
+ *
+ * The memory entry of the vci is looked up while the memory_entries lock is
+ * held in exclusive mode; nothing is changed when no entry is found.
+ *
+ * @param[in] vciid id of vci
+ * @param[in] value new value of the flag
+ */
+void
+vci_SetForceNextWosRosConvFlag(vci_id_t *vciid, bool value)
+{
+	vci_memory_entries_t *entries = VciShmemAddr->memory_entries;
+	int			slot;
+
+	LWLockAcquire(entries->lock, LW_EXCLUSIVE);
+
+	slot = findMemoryEntryLocation(vciid);
+	if (slot != -1)
+	{
+		vci_memory_entry_t *target = &entries->data[slot];
+
+		target->force_next_wosros_conv = value;
+	}
+
+	LWLockRelease(entries->lock);
+}
diff --git a/contrib/vci/storage/vci_xact.c b/contrib/vci/storage/vci_xact.c
new file mode 100644
index 0000000..fe46f29
--- /dev/null
+++ b/contrib/vci/storage/vci_xact.c
@@ -0,0 +1,146 @@
+/*-------------------------------------------------------------------------
+ *
+ * vci_xact.c
+ * Transaction control
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/vci/storage/vci_xact.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xact.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "miscadmin.h"
+
+#include "vci.h"
+#include "vci_ros.h"
+#include "vci_xact.h"
+
+/*
+ * Classifies a transaction ID as one of the vci_xact_status_kind values.
+ */
+enum vci_xact_status_kind
+vci_transaction_get_type(TransactionId xid)
+{
+	/*
+	 * XXX please preserve the ordering of the checks below.
+	 *
+	 * TransactionIdDidCommit() and TransactionIdDidAbort() can be used when
+	 * TransactionIdIsInProgress() is false
+	 */
+	if (!TransactionIdIsValid(xid))
+		return VCI_XACT_INVALID;
+
+	if (xid == FrozenTransactionId)
+		return VCI_XACT_DID_COMMIT;
+
+	if (TransactionIdIsCurrentTransactionId(xid))
+		return VCI_XACT_SELF;
+
+	if (TransactionIdIsInProgress(xid))
+		return VCI_XACT_IN_PROGRESS;
+
+	if (TransactionIdDidCommit(xid))
+		return VCI_XACT_DID_COMMIT;
+
+	if (TransactionIdDidAbort(xid))
+		return VCI_XACT_DID_ABORT;
+
+	/* neither in progress, committed nor aborted */
+	return VCI_XACT_DID_CRASH;
+}
+
+/*==========================================================================*/
+/* Extended transaction ID generations */
+/*==========================================================================*/
+
+/*
+ * WOS extends transaction IDs to 64 bits, called "xid64". The lower 32 bits
+ * are the same as the normal xid, and the upper 32 bits represent the
+ * "generation" of the transaction.
+ *
+ * The generation can be advanced once a quarter of the 32-bit transaction ID
+ * space has been consumed. Ideally it would be advanced only when the whole
+ * space has been consumed, but the current approach is simpler.
+ *
+ * vcimrv_xid_generation of the main relation stores the generation, and
+ * vcimrv_xid_gen_update_xid stores the 32-bit xid at which the generation was
+ * last advanced, i.e. the xid of either the CREATE INDEX or a VACUUM.
+ *
+ * While doing VACUUM, the upper 2 bit of vcimrv_xid_gen_update_xid and current
+ * xid would be compared, and the generation would be advanced if they are
+ * different.
+ *
+ * When the index is created, the generation is 1. If older transaction than
+ * doing CREATE INDEX refers to the index (can happen if the isolation level is
+ * READ COMMITTED), their generation would be 0.
+ */
+
+/*
+ * The length of shift used for advancing generations: only the bits of an
+ * xid above this position take part in the generation comparison.
+ */
+static const int xid_shift_bits = 30;
+
+/*
+ * Returns how many generations xid lies ahead of (positive) or behind
+ * (negative) base_xid, judged only by the bits above xid_shift_bits.
+ *
+ * The difference of those upper bits is taken modulo 2^(32 - xid_shift_bits)
+ * and then sign-extended, so with a 30-bit shift the result is in -2..1.
+ * The computation stays in unsigned arithmetic until the final, in-range
+ * conversion, so it does not rely on implementation-defined conversions or
+ * on right-shifting a negative value.
+ */
+static inline int32
+xid_generation_delta(TransactionId base_xid, TransactionId xid)
+{
+	const int	upper_bits = 32 - xid_shift_bits;
+	const uint32 modulus = ((uint32) 1) << upper_bits;
+	uint32		base_upper = ((uint32) base_xid) >> xid_shift_bits;
+	uint32		xid_upper = ((uint32) xid) >> xid_shift_bits;
+	uint32		delta = (xid_upper - base_upper) & (modulus - 1);
+
+	/* the top bit of the truncated difference acts as its sign bit */
+	if (delta >= modulus / 2)
+		return (int32) delta - (int32) modulus;
+
+	return (int32) delta;
+}
+
+/*
+ * Returns extended xid based on the given one.
+ *
+ * The generation stored in the main relation is adjusted by the distance
+ * between target_xid and the xid recorded at the last generation update,
+ * placed in the upper 32 bits, and combined with target_xid in the lower
+ * 32 bits.
+ *
+ * @param[in] target_xid 32-bit xid
+ * @param[in] info information of the main relation
+ * @return 64-bit xid
+ */
+int64
+vci_GenerateXid64(TransactionId target_xid, vci_MainRelHeaderInfo *info)
+{
+	uint64		xid_gen;
+	TransactionId base_xid;
+	int32		delta;
+
+	xid_gen = (uint64) vci_GetMainRelVar(info, vcimrv_xid_generation, 0);
+	base_xid = vci_GetMainRelVar(info, vcimrv_xid_gen_update_xid, 0);
+
+	delta = xid_generation_delta(base_xid, target_xid);
+
+	/* unsigned wraparound turns a negative delta into a subtraction */
+	xid_gen += (uint64) (int64) delta;
+
+	return (int64) ((xid_gen << 32) | (uint64) target_xid);
+}
+
+/*
+ * Updates the generation based on the current transaction ID.
+ *
+ * When the upper bits of the current xid differ from those of the xid
+ * recorded at the last update, the stored generation is adjusted by that
+ * difference, the current xid is recorded, and the main relation header is
+ * written while AccessExclusiveLock is held on the relation.
+ *
+ * This can be called only from VACUUM, and won't be rolled back.
+ *
+ * NOTE(review): the stored values are read before the relation lock is
+ * taken -- confirm that concurrent callers cannot both advance.
+ */
+void
+vci_UpdateXidGeneration(vci_MainRelHeaderInfo *info)
+{
+	TransactionId cur_xid;
+	TransactionId base_xid;
+	uint32		xid_gen;
+	int32		delta;
+
+	cur_xid = GetCurrentTransactionId();
+
+	xid_gen = (uint32) vci_GetMainRelVar(info, vcimrv_xid_generation, 0);
+	base_xid = vci_GetMainRelVar(info, vcimrv_xid_gen_update_xid, 0);
+
+	delta = xid_generation_delta(base_xid, cur_xid);
+
+	/* still within the same quarter of the xid space: nothing to record */
+	if (delta == 0)
+		return;
+
+	LockRelation(info->rel, AccessExclusiveLock);
+
+	vci_SetMainRelVar(info, vcimrv_xid_generation, 0, xid_gen + (uint32) delta);
+	vci_SetMainRelVar(info, vcimrv_xid_gen_update_xid, 0, cur_xid);
+
+	vci_WriteMainRelVar(info, vci_wmrv_update);
+
+	UnlockRelation(info->rel, AccessExclusiveLock);
+}
--
1.8.3.1