pgoutput.c.gcov
application/octet-stream
Filename: pgoutput.c.gcov
Type: application/octet-stream
Part: 0
Message:
RE: Question for coverage report
-: 0:Source:pgoutput.c
-: 0:Graph:./pgoutput.gcno
-: 0:Data:./pgoutput.gcda
-: 0:Runs:536
-: 1:/*-------------------------------------------------------------------------
-: 2: *
-: 3: * pgoutput.c
-: 4: * Logical Replication output plugin
-: 5: *
-: 6: * Copyright (c) 2012-2025, PostgreSQL Global Development Group
-: 7: *
-: 8: * IDENTIFICATION
-: 9: * src/backend/replication/pgoutput/pgoutput.c
-: 10: *
-: 11: *-------------------------------------------------------------------------
-: 12: */
-: 13:#include "postgres.h"
-: 14:
-: 15:#include "access/tupconvert.h"
-: 16:#include "catalog/partition.h"
-: 17:#include "catalog/pg_publication.h"
-: 18:#include "catalog/pg_publication_rel.h"
-: 19:#include "catalog/pg_subscription.h"
-: 20:#include "commands/defrem.h"
-: 21:#include "commands/subscriptioncmds.h"
-: 22:#include "executor/executor.h"
-: 23:#include "fmgr.h"
-: 24:#include "nodes/makefuncs.h"
-: 25:#include "parser/parse_relation.h"
-: 26:#include "replication/logical.h"
-: 27:#include "replication/logicalproto.h"
-: 28:#include "replication/origin.h"
-: 29:#include "replication/pgoutput.h"
-: 30:#include "rewrite/rewriteHandler.h"
-: 31:#include "utils/builtins.h"
-: 32:#include "utils/inval.h"
-: 33:#include "utils/lsyscache.h"
-: 34:#include "utils/memutils.h"
-: 35:#include "utils/rel.h"
-: 36:#include "utils/syscache.h"
-: 37:#include "utils/varlena.h"
-: 38:
function Pg_magic_func called 536 returned 100% blocks executed 100%
536: 39:PG_MODULE_MAGIC_EXT(
-: 40: .name = "pgoutput",
-: 41: .version = PG_VERSION
-: 42:);
-: 43:
-: 44:static void pgoutput_startup(LogicalDecodingContext *ctx,
-: 45: OutputPluginOptions *opt, bool is_init);
-: 46:static void pgoutput_shutdown(LogicalDecodingContext *ctx);
-: 47:static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
-: 48: ReorderBufferTXN *txn);
-: 49:static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
-: 50: ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
-: 51:static void pgoutput_change(LogicalDecodingContext *ctx,
-: 52: ReorderBufferTXN *txn, Relation relation,
-: 53: ReorderBufferChange *change);
-: 54:static void pgoutput_truncate(LogicalDecodingContext *ctx,
-: 55: ReorderBufferTXN *txn, int nrelations, Relation relations[],
-: 56: ReorderBufferChange *change);
-: 57:static void pgoutput_message(LogicalDecodingContext *ctx,
-: 58: ReorderBufferTXN *txn, XLogRecPtr message_lsn,
-: 59: bool transactional, const char *prefix,
-: 60: Size sz, const char *message);
-: 61:static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
-: 62: RepOriginId origin_id);
-: 63:static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
-: 64: ReorderBufferTXN *txn);
-: 65:static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
-: 66: ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
-: 67:static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
-: 68: ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
-: 69:static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
-: 70: ReorderBufferTXN *txn,
-: 71: XLogRecPtr prepare_end_lsn,
-: 72: TimestampTz prepare_time);
-: 73:static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
-: 74: ReorderBufferTXN *txn);
-: 75:static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
-: 76: ReorderBufferTXN *txn);
-: 77:static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
-: 78: ReorderBufferTXN *txn,
-: 79: XLogRecPtr abort_lsn);
-: 80:static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
-: 81: ReorderBufferTXN *txn,
-: 82: XLogRecPtr commit_lsn);
-: 83:static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
-: 84: ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
-: 85:
-: 86:static bool publications_valid;
-: 87:
-: 88:static List *LoadPublications(List *pubnames);
-: 89:static void publication_invalidation_cb(Datum arg, int cacheid,
-: 90: uint32 hashvalue);
-: 91:static void send_repl_origin(LogicalDecodingContext *ctx,
-: 92: RepOriginId origin_id, XLogRecPtr origin_lsn,
-: 93: bool send_origin);
-: 94:
-: 95:/*
-: 96: * Only 3 publication actions are used for row filtering ("insert", "update",
-: 97: * "delete"). See RelationSyncEntry.exprstate[].
-: 98: */
-: 99:enum RowFilterPubAction
-: 100:{
-: 101: PUBACTION_INSERT,
-: 102: PUBACTION_UPDATE,
-: 103: PUBACTION_DELETE,
-: 104:};
-: 105:
-: 106:#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
-: 107:
-: 108:/*
-: 109: * Entry in the map used to remember which relation schemas we sent.
-: 110: *
-: 111: * The schema_sent flag determines if the current schema record for the
-: 112: * relation (and for its ancestor if publish_as_relid is set) was already
-: 113: * sent to the subscriber (in which case we don't need to send it again).
-: 114: *
-: 115: * The schema cache on downstream is however updated only at commit time,
-: 116: * and with streamed transactions the commit order may be different from
-: 117: * the order the transactions are sent in. Also, the (sub) transactions
-: 118: * might get aborted so we need to send the schema for each (sub) transaction
-: 119: * so that we don't lose the schema information on abort. For handling this,
-: 120: * we maintain the list of xids (streamed_txns) for those we have already sent
-: 121: * the schema.
-: 122: *
-: 123: * For partitions, 'pubactions' considers not only the table's own
-: 124: * publications, but also those of all of its ancestors.
-: 125: */
-: 126:typedef struct RelationSyncEntry
-: 127:{
-: 128: Oid relid; /* relation oid */
-: 129:
-: 130: bool replicate_valid; /* overall validity flag for entry */
-: 131:
-: 132: bool schema_sent;
-: 133:
-: 134: /*
-: 135: * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
-: 136: * columns and the 'publish_generated_columns' parameter is set to
-: 137: * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
-: 138: * indicating that no generated columns should be published, unless
-: 139: * explicitly specified in the column list.
-: 140: */
-: 141: PublishGencolsType include_gencols_type;
-: 142: List *streamed_txns; /* streamed toplevel transactions with this
-: 143: * schema */
-: 144:
-: 145: /* are we publishing this rel? */
-: 146: PublicationActions pubactions;
-: 147:
-: 148: /*
-: 149: * ExprState array for row filter. Different publication actions don't
-: 150: * allow multiple expressions to always be combined into one, because
-: 151: * updates or deletes restrict the column in expression to be part of the
-: 152: * replica identity index whereas inserts do not have this restriction, so
-: 153: * there is one ExprState per publication action.
-: 154: */
-: 155: ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
-: 156: EState *estate; /* executor state used for row filter */
-: 157: TupleTableSlot *new_slot; /* slot for storing new tuple */
-: 158: TupleTableSlot *old_slot; /* slot for storing old tuple */
-: 159:
-: 160: /*
-: 161: * OID of the relation to publish changes as. For a partition, this may
-: 162: * be set to one of its ancestors whose schema will be used when
-: 163: * replicating changes, if publish_via_partition_root is set for the
-: 164: * publication.
-: 165: */
-: 166: Oid publish_as_relid;
-: 167:
-: 168: /*
-: 169: * Map used when replicating using an ancestor's schema to convert tuples
-: 170: * from partition's type to the ancestor's; NULL if publish_as_relid is
-: 171: * same as 'relid' or if unnecessary due to partition and the ancestor
-: 172: * having identical TupleDesc.
-: 173: */
-: 174: AttrMap *attrmap;
-: 175:
-: 176: /*
-: 177: * Columns included in the publication, or NULL if all columns are
-: 178: * included implicitly. Note that the attnums in this bitmap are not
-: 179: * shifted by FirstLowInvalidHeapAttributeNumber.
-: 180: */
-: 181: Bitmapset *columns;
-: 182:
-: 183: /*
-: 184: * Private context to store additional data for this entry - state for the
-: 185: * row filter expressions, column list, etc.
-: 186: */
-: 187: MemoryContext entry_cxt;
-: 188:} RelationSyncEntry;
-: 189:
-: 190:/*
-: 191: * Maintain a per-transaction level variable to track whether the transaction
-: 192: * has sent BEGIN. BEGIN is only sent when the first change in a transaction
-: 193: * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
-: 194: * messages for empty transactions which saves network bandwidth.
-: 195: *
-: 196: * This optimization is not used for prepared transactions because if the
-: 197: * WALSender restarts after prepare of a transaction and before commit prepared
-: 198: * of the same transaction then we won't be able to figure out if we have
-: 199: * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
-: 200: * because we would have lost the in-memory txndata information that was
-: 201: * present prior to the restart. This will result in sending a spurious
-: 202: * COMMIT PREPARED without a corresponding prepared transaction at the
-: 203: * downstream which would lead to an error when it tries to process it.
-: 204: *
-: 205: * XXX We could achieve this optimization by changing protocol to send
-: 206: * additional information so that downstream can detect that the corresponding
-: 207: * prepare has not been sent. However, adding such a check for every
-: 208: * transaction in the downstream could be costly so we might want to do it
-: 209: * optionally.
-: 210: *
-: 211: * We also don't have this optimization for streamed transactions because
-: 212: * they can contain prepared transactions.
-: 213: */
-: 214:typedef struct PGOutputTxnData
-: 215:{
-: 216: bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
-: 217:} PGOutputTxnData;
-: 218:
-: 219:/* Map used to remember which relation schemas we sent. */
-: 220:static HTAB *RelationSyncCache = NULL;
-: 221:
-: 222:static void init_rel_sync_cache(MemoryContext cachectx);
-: 223:static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-: 224:static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
-: 225: Relation relation);
-: 226:static void send_relation_and_attrs(Relation relation, TransactionId xid,
-: 227: LogicalDecodingContext *ctx,
-: 228: RelationSyncEntry *relentry);
-: 229:static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
-: 230:static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
-: 231: uint32 hashvalue);
-: 232:static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
-: 233: TransactionId xid);
-: 234:static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
-: 235: TransactionId xid);
-: 236:static void init_tuple_slot(PGOutputData *data, Relation relation,
-: 237: RelationSyncEntry *entry);
-: 238:static void pgoutput_memory_context_reset(void *arg);
-: 239:
-: 240:/* row filter routines */
-: 241:static EState *create_estate_for_relation(Relation rel);
-: 242:static void pgoutput_row_filter_init(PGOutputData *data,
-: 243: List *publications,
-: 244: RelationSyncEntry *entry);
-: 245:static bool pgoutput_row_filter_exec_expr(ExprState *state,
-: 246: ExprContext *econtext);
-: 247:static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
-: 248: TupleTableSlot **new_slot_ptr,
-: 249: RelationSyncEntry *entry,
-: 250: ReorderBufferChangeType *action);
-: 251:
-: 252:/* column list routines */
-: 253:static void pgoutput_column_list_init(PGOutputData *data,
-: 254: List *publications,
-: 255: RelationSyncEntry *entry);
-: 256:
-: 257:/*
-: 258: * Specify output plugin callbacks
-: 259: */
-: 260:void
function _PG_output_plugin_init called 725 returned 100% blocks executed 100%
725: 261:_PG_output_plugin_init(OutputPluginCallbacks *cb)
-: 262:{
725: 263: cb->startup_cb = pgoutput_startup;
725: 264: cb->begin_cb = pgoutput_begin_txn;
725: 265: cb->change_cb = pgoutput_change;
725: 266: cb->truncate_cb = pgoutput_truncate;
725: 267: cb->message_cb = pgoutput_message;
725: 268: cb->commit_cb = pgoutput_commit_txn;
-: 269:
725: 270: cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
725: 271: cb->prepare_cb = pgoutput_prepare_txn;
725: 272: cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
725: 273: cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
725: 274: cb->filter_by_origin_cb = pgoutput_origin_filter;
725: 275: cb->shutdown_cb = pgoutput_shutdown;
-: 276:
-: 277: /* transaction streaming */
725: 278: cb->stream_start_cb = pgoutput_stream_start;
725: 279: cb->stream_stop_cb = pgoutput_stream_stop;
725: 280: cb->stream_abort_cb = pgoutput_stream_abort;
725: 281: cb->stream_commit_cb = pgoutput_stream_commit;
725: 282: cb->stream_change_cb = pgoutput_change;
725: 283: cb->stream_message_cb = pgoutput_message;
725: 284: cb->stream_truncate_cb = pgoutput_truncate;
-: 285: /* transaction streaming - two-phase commit */
725: 286: cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
725: 287:}
-: 288:
-: 289:static void
function parse_output_parameters called 400 returned 100% blocks executed 36%
400: 290:parse_output_parameters(List *options, PGOutputData *data)
-: 291:{
-: 292: ListCell *lc;
-: 293: bool protocol_version_given = false;
-: 294: bool publication_names_given = false;
-: 295: bool binary_option_given = false;
-: 296: bool messages_option_given = false;
-: 297: bool streaming_given = false;
-: 298: bool two_phase_option_given = false;
-: 299: bool origin_option_given = false;
-: 300:
-: 301: /* Initialize optional parameters to defaults */
400: 302: data->binary = false;
400: 303: data->streaming = LOGICALREP_STREAM_OFF;
400: 304: data->messages = false;
400: 305: data->two_phase = false;
400: 306: data->publish_no_origin = false;
-: 307:
1995: 308: foreach(lc, options)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 80% (fallthrough)
branch 3 taken 20%
-: 309: {
1595: 310: DefElem *defel = (DefElem *) lfirst(lc);
-: 311:
1595*: 312: Assert(defel->arg == NULL || IsA(defel->arg, String));
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
branch 3 taken 100%
branch 4 taken 0%
-: 313:
-: 314: /* Check each param, whether or not we recognize it */
1595: 315: if (strcmp(defel->defname, "proto_version") == 0)
branch 0 taken 25% (fallthrough)
branch 1 taken 75%
-: 316: {
-: 317: unsigned long parsed;
-: 318: char *endptr;
-: 319:
400: 320: if (protocol_version_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 321: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 322: (errcode(ERRCODE_SYNTAX_ERROR),
-: 323: errmsg("conflicting or redundant options")));
-: 324: protocol_version_given = true;
-: 325:
400: 326: errno = 0;
400: 327: parsed = strtoul(strVal(defel->arg), &endptr, 10);
call 0 returned 100%
call 1 returned 100%
400: 328: if (errno != 0 || *endptr != '\0')
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 0% (fallthrough)
branch 3 taken 100%
#####: 329: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 330: (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 331: errmsg("invalid proto_version")));
-: 332:
400: 333: if (parsed > PG_UINT32_MAX)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 334: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
call 7 never executed
-: 335: (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 336: errmsg("proto_version \"%s\" out of range",
-: 337: strVal(defel->arg))));
-: 338:
400: 339: data->protocol_version = (uint32) parsed;
-: 340: }
1195: 341: else if (strcmp(defel->defname, "publication_names") == 0)
branch 0 taken 33% (fallthrough)
branch 1 taken 67%
-: 342: {
400: 343: if (publication_names_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 344: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 345: (errcode(ERRCODE_SYNTAX_ERROR),
-: 346: errmsg("conflicting or redundant options")));
-: 347: publication_names_given = true;
-: 348:
400: 349: if (!SplitIdentifierString(strVal(defel->arg), ',',
call 0 returned 100%
call 1 returned 100%
branch 2 taken 0% (fallthrough)
branch 3 taken 100%
-: 350: &data->publication_names))
#####: 351: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 352: (errcode(ERRCODE_INVALID_NAME),
-: 353: errmsg("invalid publication_names syntax")));
-: 354: }
795: 355: else if (strcmp(defel->defname, "binary") == 0)
branch 0 taken 1% (fallthrough)
branch 1 taken 99%
-: 356: {
10: 357: if (binary_option_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 358: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 359: (errcode(ERRCODE_SYNTAX_ERROR),
-: 360: errmsg("conflicting or redundant options")));
-: 361: binary_option_given = true;
-: 362:
10: 363: data->binary = defGetBoolean(defel);
call 0 returned 100%
-: 364: }
785: 365: else if (strcmp(defel->defname, "messages") == 0)
branch 0 taken 1% (fallthrough)
branch 1 taken 99%
-: 366: {
4: 367: if (messages_option_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 368: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 369: (errcode(ERRCODE_SYNTAX_ERROR),
-: 370: errmsg("conflicting or redundant options")));
-: 371: messages_option_given = true;
-: 372:
4: 373: data->messages = defGetBoolean(defel);
call 0 returned 100%
-: 374: }
781: 375: else if (strcmp(defel->defname, "streaming") == 0)
branch 0 taken 49% (fallthrough)
branch 1 taken 51%
-: 376: {
382: 377: if (streaming_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 378: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 379: (errcode(ERRCODE_SYNTAX_ERROR),
-: 380: errmsg("conflicting or redundant options")));
-: 381: streaming_given = true;
-: 382:
382: 383: data->streaming = defGetStreamingMode(defel);
call 0 returned 100%
-: 384: }
399: 385: else if (strcmp(defel->defname, "two_phase") == 0)
branch 0 taken 2% (fallthrough)
branch 1 taken 98%
-: 386: {
8: 387: if (two_phase_option_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 388: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 389: (errcode(ERRCODE_SYNTAX_ERROR),
-: 390: errmsg("conflicting or redundant options")));
-: 391: two_phase_option_given = true;
-: 392:
8: 393: data->two_phase = defGetBoolean(defel);
call 0 returned 100%
-: 394: }
391: 395: else if (strcmp(defel->defname, "origin") == 0)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
-: 396: {
-: 397: char *origin;
-: 398:
391: 399: if (origin_option_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 400: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 401: errcode(ERRCODE_SYNTAX_ERROR),
-: 402: errmsg("conflicting or redundant options"));
-: 403: origin_option_given = true;
-: 404:
391: 405: origin = defGetString(defel);
call 0 returned 100%
391: 406: if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
call 0 returned 100%
branch 1 taken 6% (fallthrough)
branch 2 taken 94%
25: 407: data->publish_no_origin = true;
366: 408: else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
call 0 returned 100%
branch 1 taken 100% (fallthrough)
branch 2 taken 0%
366: 409: data->publish_no_origin = false;
-: 410: else
#####: 411: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 412: errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 413: errmsg("unrecognized origin value: \"%s\"", origin));
-: 414: }
-: 415: else
#####: 416: elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
-: 417: }
-: 418:
-: 419: /* Check required options */
400: 420: if (!protocol_version_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 421: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 422: errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 423: errmsg("option \"%s\" missing", "proto_version"));
400: 424: if (!publication_names_given)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 425: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 426: errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 427: errmsg("option \"%s\" missing", "publication_names"));
400: 428:}
-: 429:
-: 430:/*
-: 431: * Memory context reset callback of PGOutputData->context.
-: 432: */
-: 433:static void
function pgoutput_memory_context_reset called 1034 returned 100% blocks executed 100%
1034: 434:pgoutput_memory_context_reset(void *arg)
-: 435:{
1034: 436: if (RelationSyncCache)
branch 0 taken 19% (fallthrough)
branch 1 taken 81%
-: 437: {
192: 438: hash_destroy(RelationSyncCache);
call 0 returned 100%
192: 439: RelationSyncCache = NULL;
-: 440: }
1034: 441:}
-: 442:
-: 443:/*
-: 444: * Initialize this plugin
-: 445: */
-: 446:static void
function pgoutput_startup called 725 returned 100% blocks executed 41%
725: 447:pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
-: 448: bool is_init)
-: 449:{
725: 450: PGOutputData *data = palloc0(sizeof(PGOutputData));
call 0 returned 100%
-: 451: static bool publication_callback_registered = false;
-: 452: MemoryContextCallback *mcallback;
-: 453:
-: 454: /* Create our memory context for private allocations. */
725: 455: data->context = AllocSetContextCreate(ctx->context,
call 0 returned 100%
-: 456: "logical replication output context",
-: 457: ALLOCSET_DEFAULT_SIZES);
-: 458:
725: 459: data->cachectx = AllocSetContextCreate(ctx->context,
call 0 returned 100%
-: 460: "logical replication cache context",
-: 461: ALLOCSET_DEFAULT_SIZES);
-: 462:
725: 463: data->pubctx = AllocSetContextCreate(ctx->context,
call 0 returned 100%
-: 464: "logical replication publication list context",
-: 465: ALLOCSET_SMALL_SIZES);
-: 466:
-: 467: /*
-: 468: * Ensure to cleanup RelationSyncCache even when logical decoding invoked
-: 469: * via SQL interface ends up with an error.
-: 470: */
725: 471: mcallback = palloc0(sizeof(MemoryContextCallback));
call 0 returned 100%
725: 472: mcallback->func = pgoutput_memory_context_reset;
725: 473: MemoryContextRegisterResetCallback(ctx->context, mcallback);
call 0 returned 100%
-: 474:
725: 475: ctx->output_plugin_private = data;
-: 476:
-: 477: /* This plugin uses binary protocol. */
725: 478: opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
-: 479:
-: 480: /*
-: 481: * This is replication start and not slot initialization.
-: 482: *
-: 483: * Parse and validate options passed by the client.
-: 484: */
725: 485: if (!is_init)
branch 0 taken 55% (fallthrough)
branch 1 taken 45%
-: 486: {
-: 487: /* Parse the params and ERROR if we see any we don't recognize */
400: 488: parse_output_parameters(ctx->output_plugin_options, data);
call 0 returned 100%
-: 489:
-: 490: /* Check if we support requested protocol */
400: 491: if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 492: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 493: (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-: 494: errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
-: 495: data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
-: 496:
400: 497: if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 498: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 499: (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-: 500: errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
-: 501: data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
-: 502:
-: 503: /*
-: 504: * Decide whether to enable streaming. It is disabled by default, in
-: 505: * which case we just update the flag in decoding context. Otherwise
-: 506: * we only allow it with sufficient version of the protocol, and when
-: 507: * the output plugin supports it.
-: 508: */
400: 509: if (data->streaming == LOGICALREP_STREAM_OFF)
branch 0 taken 4% (fallthrough)
branch 1 taken 96%
18: 510: ctx->streaming = false;
382: 511: else if (data->streaming == LOGICALREP_STREAM_ON &&
branch 0 taken 7% (fallthrough)
branch 1 taken 93%
branch 2 taken 0% (fallthrough)
branch 3 taken 100%
-: 512: data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
#####: 513: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 514: (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 515: errmsg("requested proto_version=%d does not support streaming, need %d or higher",
-: 516: data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
382: 517: else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
branch 0 taken 93% (fallthrough)
branch 1 taken 7%
branch 2 taken 0% (fallthrough)
branch 3 taken 100%
-: 518: data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
#####: 519: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 520: (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 521: errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
-: 522: data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
382: 523: else if (!ctx->streaming)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 524: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 525: (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 526: errmsg("streaming requested, but not supported by output plugin")));
-: 527:
-: 528: /*
-: 529: * Here, we just check whether the two-phase option is passed by
-: 530: * plugin and decide whether to enable it at later point of time. It
-: 531: * remains enabled if the previous start-up has done so. But we only
-: 532: * allow the option to be passed in with sufficient version of the
-: 533: * protocol, and when the output plugin supports it.
-: 534: */
400: 535: if (!data->two_phase)
branch 0 taken 98% (fallthrough)
branch 1 taken 2%
392: 536: ctx->twophase_opt_given = false;
8: 537: else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 538: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 539: (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 540: errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
-: 541: data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
8: 542: else if (!ctx->twophase)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 543: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
-: 544: (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-: 545: errmsg("two-phase commit requested, but not supported by output plugin")));
-: 546: else
8: 547: ctx->twophase_opt_given = true;
-: 548:
-: 549: /* Init publication state. */
400: 550: data->publications = NIL;
400: 551: publications_valid = false;
-: 552:
-: 553: /*
-: 554: * Register callback for pg_publication if we didn't already do that
-: 555: * during some previous call in this process.
-: 556: */
400: 557: if (!publication_callback_registered)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
-: 558: {
398: 559: CacheRegisterSyscacheCallback(PUBLICATIONOID,
call 0 returned 100%
-: 560: publication_invalidation_cb,
-: 561: (Datum) 0);
398: 562: CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
call 0 returned 100%
-: 563: (Datum) 0);
398: 564: publication_callback_registered = true;
-: 565: }
-: 566:
-: 567: /* Initialize relation schema cache. */
400: 568: init_rel_sync_cache(CacheMemoryContext);
call 0 returned 100%
-: 569: }
-: 570: else
-: 571: {
-: 572: /*
-: 573: * Disable the streaming and prepared transactions during the slot
-: 574: * initialization mode.
-: 575: */
325: 576: ctx->streaming = false;
325: 577: ctx->twophase = false;
-: 578: }
725: 579:}
-: 580:
-: 581:/*
-: 582: * BEGIN callback.
-: 583: *
-: 584: * Don't send the BEGIN message here instead postpone it until the first
-: 585: * change. In logical replication, a common scenario is to replicate a set of
-: 586: * tables (instead of all tables) and transactions whose changes were on
-: 587: * the table(s) that are not published will produce empty transactions. These
-: 588: * empty transactions will send BEGIN and COMMIT messages to subscribers,
-: 589: * using bandwidth on something with little/no use for logical replication.
-: 590: */
-: 591:static void
function pgoutput_begin_txn called 912 returned 100% blocks executed 100%
912: 592:pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
-: 593:{
912: 594: PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
call 0 returned 100%
-: 595: sizeof(PGOutputTxnData));
-: 596:
912: 597: txn->output_plugin_private = txndata;
912: 598:}
-: 599:
-: 600:/*
-: 601: * Send BEGIN.
-: 602: *
-: 603: * This is called while processing the first change of the transaction.
-: 604: */
-: 605:static void
function pgoutput_send_begin called 450 returned 100% blocks executed 78%
450: 606:pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
-: 607:{
450: 608: bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
450: 609: PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
-: 610:
450*: 611: Assert(txndata);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
450*: 612: Assert(!txndata->sent_begin_txn);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 613:
450: 614: OutputPluginPrepareWrite(ctx, !send_replication_origin);
call 0 returned 100%
450: 615: logicalrep_write_begin(ctx->out, txn);
call 0 returned 100%
450: 616: txndata->sent_begin_txn = true;
-: 617:
450: 618: send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
call 0 returned 100%
-: 619: send_replication_origin);
-: 620:
450: 621: OutputPluginWrite(ctx, true);
call 0 returned 100%
449: 622:}
-: 623:
-: 624:/*
-: 625: * COMMIT callback
-: 626: */
-: 627:static void
function pgoutput_commit_txn called 908 returned 99% blocks executed 93%
908: 628:pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-: 629: XLogRecPtr commit_lsn)
-: 630:{
908: 631: PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
-: 632: bool sent_begin_txn;
-: 633:
908*: 634: Assert(txndata);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 635:
-: 636: /*
-: 637: * We don't need to send the commit message unless some relevant change
-: 638: * from this transaction has been sent to the downstream.
-: 639: */
908: 640: sent_begin_txn = txndata->sent_begin_txn;
908: 641: OutputPluginUpdateProgress(ctx, !sent_begin_txn);
call 0 returned 100%
908: 642: pfree(txndata);
call 0 returned 100%
908: 643: txn->output_plugin_private = NULL;
-: 644:
908: 645: if (!sent_begin_txn)
branch 0 taken 51% (fallthrough)
branch 1 taken 49%
-: 646: {
461: 647: elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
call 0 returned 100%
branch 1 taken 25% (fallthrough)
branch 2 taken 75%
call 3 returned 100%
call 4 returned 100%
461: 648: return;
-: 649: }
-: 650:
447: 651: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
447: 652: logicalrep_write_commit(ctx->out, txn, commit_lsn);
call 0 returned 100%
447: 653: OutputPluginWrite(ctx, true);
call 0 returned 98%
-: 654:}
-: 655:
-: 656:/*
-: 657: * BEGIN PREPARE callback
-: 658: */
-: 659:static void
function pgoutput_begin_prepare_txn called 21 returned 100% blocks executed 100%
21: 660:pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
-: 661:{
21: 662: bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
-: 663:
21: 664: OutputPluginPrepareWrite(ctx, !send_replication_origin);
call 0 returned 100%
21: 665: logicalrep_write_begin_prepare(ctx->out, txn);
call 0 returned 100%
-: 666:
21: 667: send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
call 0 returned 100%
-: 668: send_replication_origin);
-: 669:
21: 670: OutputPluginWrite(ctx, true);
call 0 returned 100%
21: 671:}
-: 672:
-: 673:/*
-: 674: * PREPARE callback
-: 675: */
-: 676:static void
function pgoutput_prepare_txn called 21 returned 100% blocks executed 100%
21: 677:pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-: 678: XLogRecPtr prepare_lsn)
-: 679:{
21: 680: OutputPluginUpdateProgress(ctx, false);
call 0 returned 100%
-: 681:
21: 682: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
21: 683: logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
call 0 returned 100%
21: 684: OutputPluginWrite(ctx, true);
call 0 returned 100%
21: 685:}
-: 686:
-: 687:/*
-: 688: * COMMIT PREPARED callback
-: 689: */
-: 690:static void
function pgoutput_commit_prepared_txn called 25 returned 100% blocks executed 100%
25: 691:pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-: 692: XLogRecPtr commit_lsn)
-: 693:{
25: 694: OutputPluginUpdateProgress(ctx, false);
call 0 returned 100%
-: 695:
25: 696: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
25: 697: logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
call 0 returned 100%
25: 698: OutputPluginWrite(ctx, true);
call 0 returned 100%
25: 699:}
-: 700:
-: 701:/*
-: 702: * ROLLBACK PREPARED callback
-: 703: */
-: 704:static void
function pgoutput_rollback_prepared_txn called 9 returned 100% blocks executed 100%
9: 705:pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
-: 706: ReorderBufferTXN *txn,
-: 707: XLogRecPtr prepare_end_lsn,
-: 708: TimestampTz prepare_time)
-: 709:{
9: 710: OutputPluginUpdateProgress(ctx, false);
call 0 returned 100%
-: 711:
9: 712: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
9: 713: logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
call 0 returned 100%
-: 714: prepare_time);
9: 715: OutputPluginWrite(ctx, true);
call 0 returned 100%
9: 716:}
-: 717:
-: 718:/*
-: 719: * Write the current schema of the relation and its ancestor (if any) if not
-: 720: * done yet.
-: 721: */
-: 722:static void
function maybe_send_schema called 182265 returned 100% blocks executed 100%
182265: 723:maybe_send_schema(LogicalDecodingContext *ctx,
-: 724: ReorderBufferChange *change,
-: 725: Relation relation, RelationSyncEntry *relentry)
-: 726:{
182265: 727: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
-: 728: bool schema_sent;
-: 729: TransactionId xid = InvalidTransactionId;
-: 730: TransactionId topxid = InvalidTransactionId;
-: 731:
-: 732: /*
-: 733: * Remember XID of the (sub)transaction for the change. We don't care if
-: 734: * it's top-level transaction or not (we have already sent that XID in
-: 735: * start of the current streaming block).
-: 736: *
-: 737: * If we're not in a streaming block, just use InvalidTransactionId and
-: 738: * the write methods will not include it.
-: 739: */
182265: 740: if (data->in_streaming)
branch 0 taken 97% (fallthrough)
branch 1 taken 3%
175941: 741: xid = change->txn->xid;
-: 742:
182265: 743: if (rbtxn_is_subtxn(change->txn))
branch 0 taken 6% (fallthrough)
branch 1 taken 94%
10169: 744: topxid = rbtxn_get_toptxn(change->txn)->xid;
-: 745: else
-: 746: topxid = xid;
-: 747:
-: 748: /*
-: 749: * Do we need to send the schema? We do track streamed transactions
-: 750: * separately, because those may be applied later (and the regular
-: 751: * transactions won't see their effects until then) and in an order that
-: 752: * we don't know at this point.
-: 753: *
-: 754: * XXX There is a scope of optimization here. Currently, we always send
-: 755: * the schema first time in a streaming transaction but we can probably
-: 756: * avoid that by checking 'relentry->schema_sent' flag. However, before
-: 757: * doing that we need to study its impact on the case where we have a mix
-: 758: * of streaming and non-streaming transactions.
-: 759: */
182265: 760: if (data->in_streaming)
branch 0 taken 97% (fallthrough)
branch 1 taken 3%
175941: 761: schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
call 0 returned 100%
-: 762: else
6324: 763: schema_sent = relentry->schema_sent;
-: 764:
-: 765: /* Nothing to do if we already sent the schema. */
182265: 766: if (schema_sent)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 767: return;
-: 768:
-: 769: /*
-: 770: * Send the schema. If the changes will be published using an ancestor's
-: 771: * schema, not the relation's own, send that ancestor's schema before
-: 772: * sending relation's own (XXX - maybe sending only the former suffices?).
-: 773: */
341: 774: if (relentry->publish_as_relid != RelationGetRelid(relation))
branch 0 taken 10% (fallthrough)
branch 1 taken 90%
-: 775: {
35: 776: Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
call 0 returned 100%
-: 777:
35: 778: send_relation_and_attrs(ancestor, xid, ctx, relentry);
call 0 returned 97%
34: 779: RelationClose(ancestor);
call 0 returned 100%
-: 780: }
-: 781:
340: 782: send_relation_and_attrs(relation, xid, ctx, relentry);
call 0 returned 100%
-: 783:
339: 784: if (data->in_streaming)
branch 0 taken 21% (fallthrough)
branch 1 taken 79%
72: 785: set_schema_sent_in_streamed_txn(relentry, topxid);
call 0 returned 100%
-: 786: else
267: 787: relentry->schema_sent = true;
-: 788:}
-: 789:
-: 790:/*
-: 791: * Sends a relation
-: 792: */
-: 793:static void
function send_relation_and_attrs called 375 returned 99% blocks executed 100%
375: 794:send_relation_and_attrs(Relation relation, TransactionId xid,
-: 795: LogicalDecodingContext *ctx,
-: 796: RelationSyncEntry *relentry)
-: 797:{
375: 798: TupleDesc desc = RelationGetDescr(relation);
375: 799: Bitmapset *columns = relentry->columns;
375: 800: PublishGencolsType include_gencols_type = relentry->include_gencols_type;
-: 801: int i;
-: 802:
-: 803: /*
-: 804: * Write out type info if needed. We do that only for user-created types.
-: 805: * We use FirstGenbkiObjectId as the cutoff, so that we only consider
-: 806: * objects with hand-assigned OIDs to be "built in", not for instance any
-: 807: * function or type defined in the information_schema. This is important
-: 808: * because only hand-assigned OIDs can be expected to remain stable across
-: 809: * major versions.
-: 810: */
1160: 811: for (i = 0; i < desc->natts; i++)
branch 0 taken 68%
branch 1 taken 32% (fallthrough)
-: 812: {
-: 813: Form_pg_attribute att = TupleDescAttr(desc, i);
-: 814:
785: 815: if (!logicalrep_should_publish_column(att, columns,
call 0 returned 100%
branch 1 taken 9% (fallthrough)
branch 2 taken 91%
-: 816: include_gencols_type))
71: 817: continue;
-: 818:
714: 819: if (att->atttypid < FirstGenbkiObjectId)
branch 0 taken 97% (fallthrough)
branch 1 taken 3%
696: 820: continue;
-: 821:
18: 822: OutputPluginPrepareWrite(ctx, false);
call 0 returned 100%
18: 823: logicalrep_write_typ(ctx->out, xid, att->atttypid);
call 0 returned 100%
18: 824: OutputPluginWrite(ctx, false);
call 0 returned 100%
-: 825: }
-: 826:
375: 827: OutputPluginPrepareWrite(ctx, false);
call 0 returned 100%
375: 828: logicalrep_write_rel(ctx->out, xid, relation, columns,
call 0 returned 100%
-: 829: include_gencols_type);
375: 830: OutputPluginWrite(ctx, false);
call 0 returned 99%
373: 831:}
-: 832:
-: 833:/*
-: 834: * Executor state preparation for evaluation of row filter expressions for the
-: 835: * specified relation.
-: 836: */
-: 837:static EState *
function create_estate_for_relation called 17 returned 100% blocks executed 100%
17: 838:create_estate_for_relation(Relation rel)
-: 839:{
-: 840: EState *estate;
-: 841: RangeTblEntry *rte;
17: 842: List *perminfos = NIL;
-: 843:
17: 844: estate = CreateExecutorState();
call 0 returned 100%
-: 845:
-: 846: rte = makeNode(RangeTblEntry);
17: 847: rte->rtekind = RTE_RELATION;
17: 848: rte->relid = RelationGetRelid(rel);
17: 849: rte->relkind = rel->rd_rel->relkind;
17: 850: rte->rellockmode = AccessShareLock;
-: 851:
17: 852: addRTEPermissionInfo(&perminfos, rte);
call 0 returned 100%
-: 853:
17: 854: ExecInitRangeTable(estate, list_make1(rte), perminfos,
call 0 returned 100%
call 1 returned 100%
call 2 returned 100%
-: 855: bms_make_singleton(1));
-: 856:
17: 857: estate->es_output_cid = GetCurrentCommandId(false);
call 0 returned 100%
-: 858:
17: 859: return estate;
-: 860:}
-: 861:
-: 862:/*
-: 863: * Evaluates row filter.
-: 864: *
-: 865: * If the row filter evaluates to NULL, it is taken as false i.e. the change
-: 866: * isn't replicated.
-: 867: */
-: 868:static bool
function pgoutput_row_filter_exec_expr called 38 returned 100% blocks executed 47%
38: 869:pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
-: 870:{
-: 871: Datum ret;
-: 872: bool isnull;
-: 873:
38*: 874: Assert(state != NULL);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 875:
-: 876: ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
-: 877:
38*: 878: elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
call 0 returned 100%
branch 1 taken 0% (fallthrough)
branch 2 taken 100%
branch 3 never executed
branch 4 never executed
branch 5 never executed
branch 6 never executed
branch 7 never executed
branch 8 never executed
call 9 never executed
call 10 never executed
-: 879: isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
-: 880: isnull ? "true" : "false");
-: 881:
38: 882: if (isnull)
branch 0 taken 97% (fallthrough)
branch 1 taken 3%
-: 883: return false;
-: 884:
37: 885: return DatumGetBool(ret);
-: 886:}
-: 887:
-: 888:/*
-: 889: * Make sure the per-entry memory context exists.
-: 890: */
-: 891:static void
function pgoutput_ensure_entry_cxt called 321 returned 100% blocks executed 100%
321: 892:pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
-: 893:{
-: 894: Relation relation;
-: 895:
-: 896: /* The context may already exist, in which case bail out. */
321: 897: if (entry->entry_cxt)
branch 0 taken 95% (fallthrough)
branch 1 taken 5%
-: 898: return;
-: 899:
304: 900: relation = RelationIdGetRelation(entry->publish_as_relid);
call 0 returned 100%
-: 901:
304: 902: entry->entry_cxt = AllocSetContextCreate(data->cachectx,
call 0 returned 100%
-: 903: "entry private context",
-: 904: ALLOCSET_SMALL_SIZES);
-: 905:
304: 906: MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
call 0 returned 100%
call 1 returned 100%
-: 907: RelationGetRelationName(relation));
-: 908:}
-: 909:
-: 910:/*
-: 911: * Initialize the row filter.
-: 912: */
-: 913:static void
function pgoutput_row_filter_init called 304 returned 100% blocks executed 100%
304: 914:pgoutput_row_filter_init(PGOutputData *data, List *publications,
-: 915: RelationSyncEntry *entry)
-: 916:{
-: 917: ListCell *lc;
304: 918: List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
304: 919: bool no_filter[] = {false, false, false}; /* One per pubaction */
-: 920: MemoryContext oldctx;
-: 921: int idx;
-: 922: bool has_filter = true;
304: 923: Oid schemaid = get_rel_namespace(entry->publish_as_relid);
call 0 returned 100%
-: 924:
-: 925: /*
-: 926: * Find if there are any row filters for this relation. If there are, then
-: 927: * prepare the necessary ExprState and cache it in entry->exprstate. To
-: 928: * build an expression state, we need to ensure the following:
-: 929: *
-: 930: * All the given publication-table mappings must be checked.
-: 931: *
-: 932: * Multiple publications might have multiple row filters for this
-: 933: * relation. Since row filter usage depends on the DML operation, there
-: 934: * are multiple lists (one for each operation) to which row filters will
-: 935: * be appended.
-: 936: *
-: 937: * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
-: 938: * expression" so it takes precedence.
-: 939: */
325: 940: foreach(lc, publications)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 95% (fallthrough)
branch 3 taken 5%
-: 941: {
308: 942: Publication *pub = lfirst(lc);
-: 943: HeapTuple rftuple = NULL;
-: 944: Datum rfdatum = 0;
308: 945: bool pub_no_filter = true;
-: 946:
-: 947: /*
-: 948: * If the publication is FOR ALL TABLES, or the publication includes a
-: 949: * FOR TABLES IN SCHEMA where the table belongs to the referred
-: 950: * schema, then it is treated the same as if there are no row filters
-: 951: * (even if other publications have a row filter).
-: 952: */
532: 953: if (!pub->alltables &&
branch 0 taken 97% (fallthrough)
branch 1 taken 3%
branch 2 taken 73%
branch 3 taken 27%
224: 954: !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
call 0 returned 100%
-: 955: ObjectIdGetDatum(schemaid),
-: 956: ObjectIdGetDatum(pub->oid)))
-: 957: {
-: 958: /*
-: 959: * Check for the presence of a row filter in this publication.
-: 960: */
218: 961: rftuple = SearchSysCache2(PUBLICATIONRELMAP,
call 0 returned 100%
-: 962: ObjectIdGetDatum(entry->publish_as_relid),
-: 963: ObjectIdGetDatum(pub->oid));
-: 964:
218: 965: if (HeapTupleIsValid(rftuple))
branch 0 taken 94% (fallthrough)
branch 1 taken 6%
-: 966: {
-: 967: /* Null indicates no filter. */
206: 968: rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
call 0 returned 100%
-: 969: Anum_pg_publication_rel_prqual,
-: 970: &pub_no_filter);
-: 971: }
-: 972: }
-: 973:
308: 974: if (pub_no_filter)
branch 0 taken 95% (fallthrough)
branch 1 taken 5%
-: 975: {
294: 976: if (rftuple)
branch 0 taken 65% (fallthrough)
branch 1 taken 35%
192: 977: ReleaseSysCache(rftuple);
call 0 returned 100%
-: 978:
294: 979: no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
294: 980: no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
294: 981: no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
-: 982:
-: 983: /*
-: 984: * Quick exit if all the DML actions are publicized via this
-: 985: * publication.
-: 986: */
294: 987: if (no_filter[PUBACTION_INSERT] &&
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 98% (fallthrough)
branch 3 taken 2%
287: 988: no_filter[PUBACTION_UPDATE] &&
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
-: 989: no_filter[PUBACTION_DELETE])
-: 990: {
-: 991: has_filter = false;
287: 992: break;
-: 993: }
-: 994:
-: 995: /* No additional work for this publication. Next one. */
7: 996: continue;
-: 997: }
-: 998:
-: 999: /* Form the per pubaction row filter lists. */
14: 1000: if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 100% (fallthrough)
branch 3 taken 0%
14: 1001: rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
call 0 returned 100%
14: 1002: TextDatumGetCString(rfdatum));
call 0 returned 100%
14: 1003: if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 100% (fallthrough)
branch 3 taken 0%
14: 1004: rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
call 0 returned 100%
14: 1005: TextDatumGetCString(rfdatum));
call 0 returned 100%
14: 1006: if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 100% (fallthrough)
branch 3 taken 0%
14: 1007: rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
call 0 returned 100%
14: 1008: TextDatumGetCString(rfdatum));
call 0 returned 100%
-: 1009:
14: 1010: ReleaseSysCache(rftuple);
call 0 returned 100%
-: 1011: } /* loop all subscribed publications */
-: 1012:
-: 1013: /* Clean the row filter */
1216: 1014: for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
branch 0 taken 75%
branch 1 taken 25% (fallthrough)
-: 1015: {
912: 1016: if (no_filter[idx])
branch 0 taken 95% (fallthrough)
branch 1 taken 5%
-: 1017: {
870: 1018: list_free_deep(rfnodes[idx]);
call 0 returned 100%
870: 1019: rfnodes[idx] = NIL;
-: 1020: }
-: 1021: }
-: 1022:
304: 1023: if (has_filter)
branch 0 taken 6% (fallthrough)
branch 1 taken 94%
-: 1024: {
17: 1025: Relation relation = RelationIdGetRelation(entry->publish_as_relid);
call 0 returned 100%
-: 1026:
17: 1027: pgoutput_ensure_entry_cxt(data, entry);
call 0 returned 100%
-: 1028:
-: 1029: /*
-: 1030: * Now all the filters for all pubactions are known. Combine them when
-: 1031: * their pubactions are the same.
-: 1032: */
17: 1033: oldctx = MemoryContextSwitchTo(entry->entry_cxt);
call 0 returned 100%
17: 1034: entry->estate = create_estate_for_relation(relation);
call 0 returned 100%
68: 1035: for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
branch 0 taken 75%
branch 1 taken 25% (fallthrough)
-: 1036: {
-: 1037: List *filters = NIL;
-: 1038: Expr *rfnode;
-: 1039:
51: 1040: if (rfnodes[idx] == NIL)
branch 0 taken 41% (fallthrough)
branch 1 taken 59%
21: 1041: continue;
-: 1042:
63: 1043: foreach(lc, rfnodes[idx])
branch 0 taken 52% (fallthrough)
branch 1 taken 48%
33: 1044: filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
call 0 returned 100%
call 1 returned 100%
call 2 returned 100%
-: 1045:
-: 1046: /* combine the row filter and cache the ExprState */
30: 1047: rfnode = make_orclause(filters);
call 0 returned 100%
30: 1048: entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
call 0 returned 100%
-: 1049: } /* for each pubaction */
-: 1050: MemoryContextSwitchTo(oldctx);
-: 1051:
17: 1052: RelationClose(relation);
call 0 returned 100%
-: 1053: }
304: 1054:}
-: 1055:
-: 1056:/*
-: 1057: * If the table contains a generated column, check for any conflicting
-: 1058: * values of 'publish_generated_columns' parameter in the publications.
-: 1059: */
-: 1060:static void
function check_and_init_gencol called 304 returned 100% blocks executed 69%
304: 1061:check_and_init_gencol(PGOutputData *data, List *publications,
-: 1062: RelationSyncEntry *entry)
-: 1063:{
304: 1064: Relation relation = RelationIdGetRelation(entry->publish_as_relid);
call 0 returned 100%
304: 1065: TupleDesc desc = RelationGetDescr(relation);
-: 1066: bool gencolpresent = false;
-: 1067: bool first = true;
-: 1068:
-: 1069: /* Check if there is any generated column present. */
930: 1070: for (int i = 0; i < desc->natts; i++)
branch 0 taken 68%
branch 1 taken 32% (fallthrough)
-: 1071: {
-: 1072: Form_pg_attribute att = TupleDescAttr(desc, i);
-: 1073:
633: 1074: if (att->attgenerated)
branch 0 taken 99% (fallthrough)
branch 1 taken 1%
-: 1075: {
-: 1076: gencolpresent = true;
-: 1077: break;
-: 1078: }
-: 1079: }
-: 1080:
-: 1081: /* There are no generated columns to be published. */
304: 1082: if (!gencolpresent)
branch 0 taken 98% (fallthrough)
branch 1 taken 2%
-: 1083: {
297: 1084: entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
297: 1085: return;
-: 1086: }
-: 1087:
-: 1088: /*
-: 1089: * There may be a conflicting value for 'publish_generated_columns'
-: 1090: * parameter in the publications.
-: 1091: */
15: 1092: foreach_ptr(Publication, pub, publications)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 53% (fallthrough)
branch 3 taken 47%
-: 1093: {
-: 1094: /*
-: 1095: * The column list takes precedence over the
-: 1096: * 'publish_generated_columns' parameter. Those will be checked later,
-: 1097: * see pgoutput_column_list_init.
-: 1098: */
8: 1099: if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
call 0 returned 100%
branch 1 taken 38%
branch 2 taken 62%
3: 1100: continue;
-: 1101:
5: 1102: if (first)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
-: 1103: {
5: 1104: entry->include_gencols_type = pub->pubgencols_type;
-: 1105: first = false;
-: 1106: }
#####: 1107: else if (entry->include_gencols_type != pub->pubgencols_type)
branch 0 never executed
branch 1 never executed
#####: 1108: ereport(ERROR,
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
call 5 never executed
call 6 never executed
call 7 never executed
-: 1109: errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-: 1110: errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
-: 1111: get_namespace_name(RelationGetNamespace(relation)),
-: 1112: RelationGetRelationName(relation)));
-: 1113: }
-: 1114:}
-: 1115:
-: 1116:/*
-: 1117: * Initialize the column list.
-: 1118: */
-: 1119:static void
function pgoutput_column_list_init called 304 returned 100% blocks executed 96%
304: 1120:pgoutput_column_list_init(PGOutputData *data, List *publications,
-: 1121: RelationSyncEntry *entry)
-: 1122:{
-: 1123: ListCell *lc;
-: 1124: bool first = true;
304: 1125: Relation relation = RelationIdGetRelation(entry->publish_as_relid);
call 0 returned 100%
-: 1126: bool found_pub_collist = false;
-: 1127: Bitmapset *relcols = NULL;
-: 1128:
304: 1129: pgoutput_ensure_entry_cxt(data, entry);
call 0 returned 100%
-: 1130:
-: 1131: /*
-: 1132: * Find if there are any column lists for this relation. If there are,
-: 1133: * build a bitmap using the column lists.
-: 1134: *
-: 1135: * Multiple publications might have multiple column lists for this
-: 1136: * relation.
-: 1137: *
-: 1138: * Note that we don't support the case where the column list is different
-: 1139: * for the same table when combining publications. See comments atop
-: 1140: * fetch_table_list. But one can later change the publication so we still
-: 1141: * need to check all the given publication-table mappings and report an
-: 1142: * error if any publications have a different column list.
-: 1143: */
618: 1144: foreach(lc, publications)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 51% (fallthrough)
branch 3 taken 49%
-: 1145: {
315: 1146: Publication *pub = lfirst(lc);
315: 1147: Bitmapset *cols = NULL;
-: 1148:
-: 1149: /* Retrieve the bitmap of columns for a column list publication. */
315: 1150: found_pub_collist |= check_and_fetch_column_list(pub,
call 0 returned 100%
-: 1151: entry->publish_as_relid,
-: 1152: entry->entry_cxt, &cols);
-: 1153:
-: 1154: /*
-: 1155: * For non-column list publications — e.g. TABLE (without a column
-: 1156: * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
-: 1157: * of the table (including generated columns when
-: 1158: * 'publish_generated_columns' parameter is true).
-: 1159: */
315: 1160: if (!cols)
branch 0 taken 88%
branch 1 taken 12%
-: 1161: {
-: 1162: /*
-: 1163: * Cache the table columns for the first publication with no
-: 1164: * specified column list to detect publication with a different
-: 1165: * column list.
-: 1166: */
276: 1167: if (!relcols && (list_length(publications) > 1))
branch 0 taken 99% (fallthrough)
branch 1 taken 1%
branch 2 taken 3% (fallthrough)
branch 3 taken 97%
-: 1168: {
9: 1169: MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
call 0 returned 100%
-: 1170:
9: 1171: relcols = pub_form_cols_map(relation,
call 0 returned 100%
-: 1172: entry->include_gencols_type);
-: 1173: MemoryContextSwitchTo(oldcxt);
-: 1174: }
-: 1175:
276: 1176: cols = relcols;
-: 1177: }
-: 1178:
315: 1179: if (first)
branch 0 taken 97% (fallthrough)
branch 1 taken 3%
-: 1180: {
304: 1181: entry->columns = cols;
-: 1182: first = false;
-: 1183: }
11: 1184: else if (!bms_equal(entry->columns, cols))
call 0 returned 100%
branch 1 taken 9% (fallthrough)
branch 2 taken 91%
1*: 1185: ereport(ERROR,
call 0 returned 100%
branch 1 taken 100% (fallthrough)
branch 2 taken 0%
call 3 returned 100%
call 4 returned 100%
call 5 returned 100%
call 6 returned 0%
call 7 never executed
-: 1186: errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-: 1187: errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
-: 1188: get_namespace_name(RelationGetNamespace(relation)),
-: 1189: RelationGetRelationName(relation)));
-: 1190: } /* loop all subscribed publications */
-: 1191:
-: 1192: /*
-: 1193: * If no column list publications exist, columns to be published will be
-: 1194: * computed later according to the 'publish_generated_columns' parameter.
-: 1195: */
303: 1196: if (!found_pub_collist)
branch 0 taken 88% (fallthrough)
branch 1 taken 12%
267: 1197: entry->columns = NULL;
-: 1198:
303: 1199: RelationClose(relation);
call 0 returned 100%
303: 1200:}
-: 1201:
-: 1202:/*
-: 1203: * Initialize the slot for storing new and old tuples, and build the map that
-: 1204: * will be used to convert the relation's tuples into the ancestor's format.
-: 1205: */
-: 1206:static void
function init_tuple_slot called 304 returned 100% blocks executed 100%
304: 1207:init_tuple_slot(PGOutputData *data, Relation relation,
-: 1208: RelationSyncEntry *entry)
-: 1209:{
-: 1210: MemoryContext oldctx;
-: 1211: TupleDesc oldtupdesc;
-: 1212: TupleDesc newtupdesc;
-: 1213:
304: 1214: oldctx = MemoryContextSwitchTo(data->cachectx);
call 0 returned 100%
-: 1215:
-: 1216: /*
-: 1217: * Create tuple table slots. Create a copy of the TupleDesc as it needs to
-: 1218: * live as long as the cache remains.
-: 1219: */
304: 1220: oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
call 0 returned 100%
304: 1221: newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
call 0 returned 100%
-: 1222:
304: 1223: entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
call 0 returned 100%
304: 1224: entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
call 0 returned 100%
branch 1 taken 13% (fallthrough)
branch 2 taken 87%
-: 1225:
-: 1226: MemoryContextSwitchTo(oldctx);
-: 1227:
-: 1228: /*
-: 1229: * Cache the map that will be used to convert the relation's tuples into
-: 1230: * the ancestor's format, if needed.
-: 1231: */
304: 1232: if (entry->publish_as_relid != RelationGetRelid(relation))
branch 0 taken 13% (fallthrough)
branch 1 taken 87%
-: 1233: {
40: 1234: Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
call 0 returned 100%
40: 1235: TupleDesc indesc = RelationGetDescr(relation);
40: 1236: TupleDesc outdesc = RelationGetDescr(ancestor);
-: 1237:
-: 1238: /* Map must live as long as the logical decoding context. */
40: 1239: oldctx = MemoryContextSwitchTo(data->cachectx);
call 0 returned 100%
-: 1240:
40: 1241: entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
call 0 returned 100%
call 1 returned 100%
-: 1242:
-: 1243: MemoryContextSwitchTo(oldctx);
40: 1244: RelationClose(ancestor);
call 0 returned 100%
-: 1245: }
304: 1246:}
-: 1247:
-: 1248:/*
-: 1249: * Change is checked against the row filter if any.
-: 1250: *
-: 1251: * Returns true if the change is to be replicated, else false.
-: 1252: *
-: 1253: * For inserts, evaluate the row filter for new tuple.
-: 1254: * For deletes, evaluate the row filter for old tuple.
-: 1255: * For updates, evaluate the row filter for old and new tuple.
-: 1256: *
-: 1257: * For updates, if both evaluations are true, we allow sending the UPDATE and
-: 1258: * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
-: 1259: * only one of the tuples matches the row filter expression, we transform
-: 1260: * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
-: 1261: * following rules:
-: 1262: *
-: 1263: * Case 1: old-row (no match) new-row (no match) -> (drop change)
-: 1264: * Case 2: old-row (no match) new row (match) -> INSERT
-: 1265: * Case 3: old-row (match) new-row (no match) -> DELETE
-: 1266: * Case 4: old-row (match) new row (match) -> UPDATE
-: 1267: *
-: 1268: * The new action is updated in the action parameter.
-: 1269: *
-: 1270: * The new slot could be updated when transforming the UPDATE into INSERT,
-: 1271: * because the original new tuple might not have column values from the replica
-: 1272: * identity.
-: 1273: *
-: 1274: * Examples:
-: 1275: * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
-: 1276: * Since the old tuple satisfies, the initial table synchronization copied this
-: 1277: * row (or another method was used to guarantee that there is data
-: 1278: * consistency). However, after the UPDATE the new tuple doesn't satisfy the
-: 1279: * row filter, so from a data consistency perspective, that row should be
-: 1280: * removed on the subscriber. The UPDATE should be transformed into a DELETE
-: 1281: * statement and be sent to the subscriber. Keeping this row on the subscriber
-: 1282: * is undesirable because it doesn't reflect what was defined in the row filter
-: 1283: * expression on the publisher. This row on the subscriber would likely not be
-: 1284: * modified by replication again. If someone inserted a new row with the same
-: 1285: * old identifier, replication could stop due to a constraint violation.
-: 1286: *
-: 1287: * Let's say the old tuple doesn't match the row filter but the new tuple does.
-: 1288: * Since the old tuple doesn't satisfy, the initial table synchronization
-: 1289: * probably didn't copy this row. However, after the UPDATE the new tuple does
-: 1290: * satisfy the row filter, so from a data consistency perspective, that row
-: 1291: * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
-: 1292: * statements have no effect (it matches no row -- see
-: 1293: * apply_handle_update_internal()). So, the UPDATE should be transformed into a
-: 1294: * INSERT statement and be sent to the subscriber. However, this might surprise
-: 1295: * someone who expects the data set to satisfy the row filter expression on the
-: 1296: * provider.
-: 1297: */
-: 1298:static bool
function pgoutput_row_filter called 182261 returned 100% blocks executed 84%
182261: 1299:pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
-: 1300: TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
-: 1301: ReorderBufferChangeType *action)
-: 1302:{
-: 1303: TupleDesc desc;
-: 1304: int i;
-: 1305: bool old_matched,
-: 1306: new_matched,
-: 1307: result;
-: 1308: TupleTableSlot *tmp_new_slot;
182261: 1309: TupleTableSlot *new_slot = *new_slot_ptr;
-: 1310: ExprContext *ecxt;
-: 1311: ExprState *filter_exprstate;
-: 1312:
-: 1313: /*
-: 1314: * We need this map to avoid relying on ReorderBufferChangeType enums
-: 1315: * having specific values.
-: 1316: */
-: 1317: static const int map_changetype_pubaction[] = {
-: 1318: [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
-: 1319: [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
-: 1320: [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
-: 1321: };
-: 1322:
182261*: 1323: Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1324: *action == REORDER_BUFFER_CHANGE_UPDATE ||
-: 1325: *action == REORDER_BUFFER_CHANGE_DELETE);
-: 1326:
182261*: 1327: Assert(new_slot || old_slot);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1328:
-: 1329: /* Get the corresponding row filter */
182261: 1330: filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
-: 1331:
-: 1332: /* Bail out if there is no row filter */
182261: 1333: if (!filter_exprstate)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 1334: return true;
-: 1335:
34*: 1336: elog(DEBUG3, "table \"%s.%s\" has row filter",
call 0 returned 100%
branch 1 taken 0% (fallthrough)
branch 2 taken 100%
call 3 never executed
call 4 never executed
call 5 never executed
-: 1337: get_namespace_name(RelationGetNamespace(relation)),
-: 1338: RelationGetRelationName(relation));
-: 1339:
34: 1340: ResetPerTupleExprContext(entry->estate);
branch 0 taken 71% (fallthrough)
branch 1 taken 29%
call 2 returned 100%
-: 1341:
34: 1342: ecxt = GetPerTupleExprContext(entry->estate);
branch 0 taken 29% (fallthrough)
branch 1 taken 71%
call 2 returned 100%
-: 1343:
-: 1344: /*
-: 1345: * For the following occasions where there is only one tuple, we can
-: 1346: * evaluate the row filter for that tuple and return.
-: 1347: *
-: 1348: * For inserts, we only have the new tuple.
-: 1349: *
-: 1350: * For updates, we can have only a new tuple when none of the replica
-: 1351: * identity columns changed and none of those columns have external data
-: 1352: * but we still need to evaluate the row filter for the new tuple as the
-: 1353: * existing values of those columns might not match the filter. Also,
-: 1354: * users can use constant expressions in the row filter, so we anyway need
-: 1355: * to evaluate it for the new tuple.
-: 1356: *
-: 1357: * For deletes, we only have the old tuple.
-: 1358: */
34: 1359: if (!new_slot || !old_slot)
branch 0 taken 88% (fallthrough)
branch 1 taken 12%
-: 1360: {
30: 1361: ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
branch 0 taken 7% (fallthrough)
branch 1 taken 93%
30: 1362: result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
call 0 returned 100%
-: 1363:
30: 1364: return result;
-: 1365: }
-: 1366:
-: 1367: /*
-: 1368: * Both the old and new tuples must be valid only for updates and need to
-: 1369: * be checked against the row filter.
-: 1370: */
4*: 1371: Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1372:
4: 1373: slot_getallattrs(new_slot);
call 0 returned 100%
4: 1374: slot_getallattrs(old_slot);
call 0 returned 100%
-: 1375:
-: 1376: tmp_new_slot = NULL;
4: 1377: desc = RelationGetDescr(relation);
-: 1378:
-: 1379: /*
-: 1380: * The new tuple might not have all the replica identity columns, in which
-: 1381: * case it needs to be copied over from the old tuple.
-: 1382: */
12: 1383: for (i = 0; i < desc->natts; i++)
branch 0 taken 67%
branch 1 taken 33% (fallthrough)
-: 1384: {
-: 1385: CompactAttribute *att = TupleDescCompactAttr(desc, i);
-: 1386:
-: 1387: /*
-: 1388: * if the column in the new tuple or old tuple is null, nothing to do
-: 1389: */
8: 1390: if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
branch 0 taken 88% (fallthrough)
branch 1 taken 12%
branch 2 taken 0% (fallthrough)
branch 3 taken 100%
1: 1391: continue;
-: 1392:
-: 1393: /*
-: 1394: * Unchanged toasted replica identity columns are only logged in the
-: 1395: * old tuple. Copy this over to the new tuple. The changed (or WAL
-: 1396: * Logged) toast values are always assembled in memory and set as
-: 1397: * VARTAG_INDIRECT. See ReorderBufferToastReplace.
-: 1398: */
11: 1399: if (att->attlen == -1 &&
branch 0 taken 57% (fallthrough)
branch 1 taken 43%
branch 2 taken 25% (fallthrough)
branch 3 taken 75%
5: 1400: VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
branch 0 taken 25% (fallthrough)
branch 1 taken 75%
branch 2 taken 100% (fallthrough)
branch 3 taken 0%
1: 1401: !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
-: 1402: {
1: 1403: if (!tmp_new_slot)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
-: 1404: {
1: 1405: tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
call 0 returned 100%
-: 1406: ExecClearTuple(tmp_new_slot);
-: 1407:
1: 1408: memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1: 1409: desc->natts * sizeof(Datum));
1: 1410: memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1: 1411: desc->natts * sizeof(bool));
-: 1412: }
-: 1413:
1: 1414: tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1: 1415: tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
-: 1416: }
-: 1417: }
-: 1418:
4: 1419: ecxt->ecxt_scantuple = old_slot;
4: 1420: old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
call 0 returned 100%
-: 1421:
4: 1422: if (tmp_new_slot)
branch 0 taken 25% (fallthrough)
branch 1 taken 75%
-: 1423: {
1: 1424: ExecStoreVirtualTuple(tmp_new_slot);
call 0 returned 100%
1: 1425: ecxt->ecxt_scantuple = tmp_new_slot;
-: 1426: }
-: 1427: else
3: 1428: ecxt->ecxt_scantuple = new_slot;
-: 1429:
4: 1430: new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
call 0 returned 100%
-: 1431:
-: 1432: /*
-: 1433: * Case 1: if both tuples don't match the row filter, bailout. Send
-: 1434: * nothing.
-: 1435: */
4: 1436: if (!old_matched && !new_matched)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
-: 1437: return false;
-: 1438:
-: 1439: /*
-: 1440: * Case 2: if the old tuple doesn't satisfy the row filter but the new
-: 1441: * tuple does, transform the UPDATE into INSERT.
-: 1442: *
-: 1443: * Use the newly transformed tuple that must contain the column values for
-: 1444: * all the replica identity columns. This is required to ensure that the
-: 1445: * while inserting the tuple in the downstream node, we have all the
-: 1446: * required column values.
-: 1447: */
4: 1448: if (!old_matched && new_matched)
branch 0 taken 50% (fallthrough)
branch 1 taken 50%
-: 1449: {
2: 1450: *action = REORDER_BUFFER_CHANGE_INSERT;
-: 1451:
2: 1452: if (tmp_new_slot)
branch 0 taken 50% (fallthrough)
branch 1 taken 50%
1: 1453: *new_slot_ptr = tmp_new_slot;
-: 1454: }
-: 1455:
-: 1456: /*
-: 1457: * Case 3: if the old tuple satisfies the row filter but the new tuple
-: 1458: * doesn't, transform the UPDATE into DELETE.
-: 1459: *
-: 1460: * This transformation does not require another tuple. The Old tuple will
-: 1461: * be used for DELETE.
-: 1462: */
2: 1463: else if (old_matched && !new_matched)
branch 0 taken 50% (fallthrough)
branch 1 taken 50%
1: 1464: *action = REORDER_BUFFER_CHANGE_DELETE;
-: 1465:
-: 1466: /*
-: 1467: * Case 4: if both tuples match the row filter, transformation isn't
-: 1468: * required. (*action is default UPDATE).
-: 1469: */
-: 1470:
-: 1471: return true;
-: 1472:}
-: 1473:
-: 1474:/*
-: 1475: * Sends the decoded DML over wire.
-: 1476: *
-: 1477: * This is called both in streaming and non-streaming modes.
-: 1478: */
-: 1479:static void
function pgoutput_change called 183433 returned 100% blocks executed 86%
183433: 1480:pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-: 1481: Relation relation, ReorderBufferChange *change)
-: 1482:{
183433: 1483: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
183433: 1484: PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
-: 1485: MemoryContext old;
-: 1486: RelationSyncEntry *relentry;
-: 1487: TransactionId xid = InvalidTransactionId;
-: 1488: Relation ancestor = NULL;
-: 1489: Relation targetrel = relation;
183433: 1490: ReorderBufferChangeType action = change->action;
-: 1491: TupleTableSlot *old_slot = NULL;
183433: 1492: TupleTableSlot *new_slot = NULL;
-: 1493:
183433: 1494: if (!is_publishable_relation(relation))
call 0 returned 100%
branch 1 taken 100% (fallthrough)
branch 2 taken 0%
1171: 1495: return;
-: 1496:
-: 1497: /*
-: 1498: * Remember the xid for the change in streaming mode. We need to send xid
-: 1499: * with each change in the streaming mode so that subscriber can make
-: 1500: * their association and on aborts, it can discard the corresponding
-: 1501: * changes.
-: 1502: */
183433: 1503: if (data->in_streaming)
branch 0 taken 96% (fallthrough)
branch 1 taken 4%
175941: 1504: xid = change->txn->xid;
-: 1505:
183433: 1506: relentry = get_rel_sync_entry(data, relation);
call 0 returned 100%
-: 1507:
-: 1508: /* First check the table filter */
183432: 1509: switch (action)
branch 0 taken 58%
branch 1 taken 19%
branch 2 taken 23%
branch 3 taken 0%
-: 1510: {
106006: 1511: case REORDER_BUFFER_CHANGE_INSERT:
106006: 1512: if (!relentry->pubactions.pubinsert)
branch 0 taken 100%
branch 1 taken 1%
-: 1513: return;
-: 1514: break;
34491: 1515: case REORDER_BUFFER_CHANGE_UPDATE:
34491: 1516: if (!relentry->pubactions.pubupdate)
branch 0 taken 100%
branch 1 taken 1%
-: 1517: return;
-: 1518: break;
42935: 1519: case REORDER_BUFFER_CHANGE_DELETE:
42935: 1520: if (!relentry->pubactions.pubdelete)
branch 0 taken 98% (fallthrough)
branch 1 taken 2%
-: 1521: return;
-: 1522:
-: 1523: /*
-: 1524: * This is only possible if deletes are allowed even when replica
-: 1525: * identity is not defined for a table. Since the DELETE action
-: 1526: * can't be published, we simply return.
-: 1527: */
41885: 1528: if (!change->data.tp.oldtuple)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
-: 1529: {
#####: 1530: elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
call 0 never executed
branch 1 never executed
branch 2 never executed
call 3 never executed
call 4 never executed
#####: 1531: return;
-: 1532: }
-: 1533: break;
-: 1534: default:
#####: 1535: Assert(false);
call 0 never executed
-: 1536: }
-: 1537:
-: 1538: /* Avoid leaking memory by using and resetting our own context */
182261: 1539: old = MemoryContextSwitchTo(data->context);
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 1540:
-: 1541: /* Switch relation if publishing via root. */
182261: 1542: if (relentry->publish_as_relid != RelationGetRelid(relation))
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 1543: {
73*: 1544: Assert(relation->rd_rel->relispartition);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
73: 1545: ancestor = RelationIdGetRelation(relentry->publish_as_relid);
call 0 returned 100%
-: 1546: targetrel = ancestor;
-: 1547: }
-: 1548:
182261: 1549: if (change->data.tp.oldtuple)
branch 0 taken 23% (fallthrough)
branch 1 taken 77%
-: 1550: {
42019: 1551: old_slot = relentry->old_slot;
42019: 1552: ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
call 0 returned 100%
-: 1553:
-: 1554: /* Convert tuple if needed. */
42019: 1555: if (relentry->attrmap)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 1556: {
5: 1557: TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
call 0 returned 100%
-: 1558: &TTSOpsVirtual);
-: 1559:
5: 1560: old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
call 0 returned 100%
-: 1561: }
-: 1562: }
-: 1563:
182261: 1564: if (change->data.tp.newtuple)
branch 0 taken 77% (fallthrough)
branch 1 taken 23%
-: 1565: {
140376: 1566: new_slot = relentry->new_slot;
140376: 1567: ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
call 0 returned 100%
-: 1568:
-: 1569: /* Convert tuple if needed. */
140376: 1570: if (relentry->attrmap)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 1571: {
21: 1572: TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
call 0 returned 100%
-: 1573: &TTSOpsVirtual);
-: 1574:
21: 1575: new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
call 0 returned 100%
-: 1576: }
-: 1577: }
-: 1578:
-: 1579: /*
-: 1580: * Check row filter.
-: 1581: *
-: 1582: * Updates could be transformed to inserts or deletes based on the results
-: 1583: * of the row filter for old and new tuple.
-: 1584: */
182261: 1585: if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
call 0 returned 100%
branch 1 taken 1% (fallthrough)
branch 2 taken 100%
12: 1586: goto cleanup;
-: 1587:
-: 1588: /*
-: 1589: * Send BEGIN if we haven't yet.
-: 1590: *
-: 1591: * We send the BEGIN message after ensuring that we will actually send the
-: 1592: * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
-: 1593: * transactions.
-: 1594: */
182249: 1595: if (txndata && !txndata->sent_begin_txn)
branch 0 taken 3% (fallthrough)
branch 1 taken 97%
branch 2 taken 7% (fallthrough)
branch 3 taken 93%
437: 1596: pgoutput_send_begin(ctx, txn);
call 0 returned 100%
-: 1597:
-: 1598: /*
-: 1599: * Schema should be sent using the original relation because it also sends
-: 1600: * the ancestor's relation.
-: 1601: */
182248: 1602: maybe_send_schema(ctx, change, relation, relentry);
call 0 returned 100%
-: 1603:
182246: 1604: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
-: 1605:
-: 1606: /* Send the data */
182246: 1607: switch (action)
branch 0 taken 58%
branch 1 taken 19%
branch 2 taken 23%
branch 3 taken 0%
-: 1608: {
105916: 1609: case REORDER_BUFFER_CHANGE_INSERT:
105916: 1610: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
105916: 1611: data->binary, relentry->columns,
call 0 returned 100%
-: 1612: relentry->include_gencols_type);
105916: 1613: break;
34444: 1614: case REORDER_BUFFER_CHANGE_UPDATE:
34444: 1615: logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
34444: 1616: new_slot, data->binary, relentry->columns,
call 0 returned 100%
-: 1617: relentry->include_gencols_type);
34444: 1618: break;
41886: 1619: case REORDER_BUFFER_CHANGE_DELETE:
41886: 1620: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
41886: 1621: data->binary, relentry->columns,
call 0 returned 100%
-: 1622: relentry->include_gencols_type);
41886: 1623: break;
-: 1624: default:
#####: 1625: Assert(false);
call 0 never executed
-: 1626: }
-: 1627:
182246: 1628: OutputPluginWrite(ctx, true);
call 0 returned 100%
-: 1629:
182258: 1630:cleanup:
182258: 1631: if (RelationIsValid(ancestor))
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 1632: {
70: 1633: RelationClose(ancestor);
call 0 returned 100%
-: 1634: ancestor = NULL;
-: 1635: }
-: 1636:
-: 1637: /* Drop the new slots that were used to store the converted tuples. */
182258: 1638: if (relentry->attrmap)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 1639: {
26: 1640: if (old_slot)
branch 0 taken 19% (fallthrough)
branch 1 taken 81%
5: 1641: ExecDropSingleTupleTableSlot(old_slot);
call 0 returned 100%
-: 1642:
26: 1643: if (new_slot)
branch 0 taken 81% (fallthrough)
branch 1 taken 19%
21: 1644: ExecDropSingleTupleTableSlot(new_slot);
call 0 returned 100%
-: 1645: }
-: 1646:
-: 1647: MemoryContextSwitchTo(old);
182258: 1648: MemoryContextReset(data->context);
call 0 returned 100%
-: 1649:}
-: 1650:
-: 1651:static void
function pgoutput_truncate called 17 returned 100% blocks executed 92%
17: 1652:pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-: 1653: int nrelations, Relation relations[], ReorderBufferChange *change)
-: 1654:{
17: 1655: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
17: 1656: PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
-: 1657: MemoryContext old;
-: 1658: RelationSyncEntry *relentry;
-: 1659: int i;
-: 1660: int nrelids;
-: 1661: Oid *relids;
-: 1662: TransactionId xid = InvalidTransactionId;
-: 1663:
-: 1664: /* Remember the xid for the change in streaming mode. See pgoutput_change. */
17: 1665: if (data->in_streaming)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 1666: xid = change->txn->xid;
-: 1667:
17: 1668: old = MemoryContextSwitchTo(data->context);
call 0 returned 100%
-: 1669:
17: 1670: relids = palloc0(nrelations * sizeof(Oid));
call 0 returned 100%
-: 1671: nrelids = 0;
-: 1672:
53: 1673: for (i = 0; i < nrelations; i++)
branch 0 taken 68%
branch 1 taken 32% (fallthrough)
-: 1674: {
36: 1675: Relation relation = relations[i];
36: 1676: Oid relid = RelationGetRelid(relation);
-: 1677:
36*: 1678: if (!is_publishable_relation(relation))
call 0 returned 100%
branch 1 taken 0% (fallthrough)
branch 2 taken 100%
#####: 1679: continue;
-: 1680:
36: 1681: relentry = get_rel_sync_entry(data, relation);
call 0 returned 100%
-: 1682:
36: 1683: if (!relentry->pubactions.pubtruncate)
branch 0 taken 44% (fallthrough)
branch 1 taken 56%
16: 1684: continue;
-: 1685:
-: 1686: /*
-: 1687: * Don't send partitions if the publication wants to send only the
-: 1688: * root tables through it.
-: 1689: */
20: 1690: if (relation->rd_rel->relispartition &&
branch 0 taken 75% (fallthrough)
branch 1 taken 25%
15: 1691: relentry->publish_as_relid != relid)
branch 0 taken 20% (fallthrough)
branch 1 taken 80%
3: 1692: continue;
-: 1693:
17: 1694: relids[nrelids++] = relid;
-: 1695:
-: 1696: /* Send BEGIN if we haven't yet */
17: 1697: if (txndata && !txndata->sent_begin_txn)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 65% (fallthrough)
branch 3 taken 35%
11: 1698: pgoutput_send_begin(ctx, txn);
call 0 returned 100%
-: 1699:
17: 1700: maybe_send_schema(ctx, change, relation, relentry);
call 0 returned 100%
-: 1701: }
-: 1702:
17: 1703: if (nrelids > 0)
branch 0 taken 65% (fallthrough)
branch 1 taken 35%
-: 1704: {
11: 1705: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
11: 1706: logicalrep_write_truncate(ctx->out,
-: 1707: xid,
-: 1708: nrelids,
-: 1709: relids,
11: 1710: change->data.truncate.cascade,
11: 1711: change->data.truncate.restart_seqs);
call 0 returned 100%
11: 1712: OutputPluginWrite(ctx, true);
call 0 returned 100%
-: 1713: }
-: 1714:
-: 1715: MemoryContextSwitchTo(old);
17: 1716: MemoryContextReset(data->context);
call 0 returned 100%
17: 1717:}
-: 1718:
-: 1719:static void
function pgoutput_message called 7 returned 100% blocks executed 91%
7: 1720:pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-: 1721: XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
-: 1722: const char *message)
-: 1723:{
7: 1724: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
-: 1725: TransactionId xid = InvalidTransactionId;
-: 1726:
7: 1727: if (!data->messages)
branch 0 taken 71% (fallthrough)
branch 1 taken 29%
-: 1728: return;
-: 1729:
-: 1730: /*
-: 1731: * Remember the xid for the message in streaming mode. See
-: 1732: * pgoutput_change.
-: 1733: */
5: 1734: if (data->in_streaming)
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
#####: 1735: xid = txn->xid;
-: 1736:
-: 1737: /*
-: 1738: * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
-: 1739: */
5: 1740: if (transactional)
branch 0 taken 40% (fallthrough)
branch 1 taken 60%
-: 1741: {
2: 1742: PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
-: 1743:
-: 1744: /* Send BEGIN if we haven't yet */
2: 1745: if (txndata && !txndata->sent_begin_txn)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 100% (fallthrough)
branch 3 taken 0%
2: 1746: pgoutput_send_begin(ctx, txn);
call 0 returned 100%
-: 1747: }
-: 1748:
5: 1749: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
5: 1750: logicalrep_write_message(ctx->out,
call 0 returned 100%
-: 1751: xid,
-: 1752: message_lsn,
-: 1753: transactional,
-: 1754: prefix,
-: 1755: sz,
-: 1756: message);
5: 1757: OutputPluginWrite(ctx, true);
call 0 returned 100%
-: 1758:}
-: 1759:
-: 1760:/*
-: 1761: * Return true if the data is associated with an origin and the user has
-: 1762: * requested the changes that don't have an origin, false otherwise.
-: 1763: */
-: 1764:static bool
function pgoutput_origin_filter called 344827 returned 100% blocks executed 100%
344827: 1765:pgoutput_origin_filter(LogicalDecodingContext *ctx,
-: 1766: RepOriginId origin_id)
-: 1767:{
344827: 1768: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
-: 1769:
344827: 1770: if (data->publish_no_origin && origin_id != InvalidRepOriginId)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
branch 2 taken 39% (fallthrough)
branch 3 taken 61%
164: 1771: return true;
-: 1772:
-: 1773: return false;
-: 1774:}
-: 1775:
-: 1776:/*
-: 1777: * Shutdown the output plugin.
-: 1778: *
-: 1779: * Note, we don't need to clean the data->context, data->cachectx, and
-: 1780: * data->pubctx as they are child contexts of the ctx->context so they
-: 1781: * will be cleaned up by logical decoding machinery.
-: 1782: */
-: 1783:static void
function pgoutput_shutdown called 517 returned 100% blocks executed 100%
517: 1784:pgoutput_shutdown(LogicalDecodingContext *ctx)
-: 1785:{
517: 1786: pgoutput_memory_context_reset(NULL);
call 0 returned 100%
517: 1787:}
-: 1788:
-: 1789:/*
-: 1790: * Load publications from the list of publication names.
-: 1791: *
-: 1792: * Here, we skip the publications that don't exist yet. This will allow us
-: 1793: * to silently continue the replication in the absence of a missing publication.
-: 1794: * This is required because we allow the users to create publications after they
-: 1795: * have specified the required publications at the time of replication start.
-: 1796: */
-: 1797:static List *
function LoadPublications called 196 returned 100% blocks executed 100%
196: 1798:LoadPublications(List *pubnames)
-: 1799:{
-: 1800: List *result = NIL;
-: 1801: ListCell *lc;
-: 1802:
437: 1803: foreach(lc, pubnames)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
branch 2 taken 55% (fallthrough)
branch 3 taken 45%
-: 1804: {
241: 1805: char *pubname = (char *) lfirst(lc);
241: 1806: Publication *pub = GetPublicationByName(pubname, true);
call 0 returned 100%
-: 1807:
241: 1808: if (pub)
branch 0 taken 99%
branch 1 taken 1%
239: 1809: result = lappend(result, pub);
call 0 returned 100%
-: 1810: else
2: 1811: ereport(WARNING,
call 0 returned 100%
branch 1 taken 100% (fallthrough)
branch 2 taken 0%
call 3 returned 100%
call 4 returned 100%
call 5 returned 100%
call 6 returned 100%
call 7 returned 100%
-: 1812: errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-: 1813: errmsg("skipped loading publication \"%s\"", pubname),
-: 1814: errdetail("The publication does not exist at this point in the WAL."),
-: 1815: errhint("Create the publication if it does not exist."));
-: 1816: }
-: 1817:
196: 1818: return result;
-: 1819:}
-: 1820:
-: 1821:/*
-: 1822: * Publication syscache invalidation callback.
-: 1823: *
-: 1824: * Called for invalidations on pg_publication.
-: 1825: */
-: 1826:static void
function publication_invalidation_cb called 313 returned 100% blocks executed 100%
313: 1827:publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
-: 1828:{
313: 1829: publications_valid = false;
313: 1830:}
-: 1831:
-: 1832:/*
-: 1833: * START STREAM callback
-: 1834: */
-: 1835:static void
function pgoutput_stream_start called 637 returned 100% blocks executed 89%
637: 1836:pgoutput_stream_start(struct LogicalDecodingContext *ctx,
-: 1837: ReorderBufferTXN *txn)
-: 1838:{
637: 1839: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
637: 1840: bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
-: 1841:
-: 1842: /* we can't nest streaming of transactions */
637*: 1843: Assert(!data->in_streaming);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1844:
-: 1845: /*
-: 1846: * If we already sent the first stream for this transaction then don't
-: 1847: * send the origin id in the subsequent streams.
-: 1848: */
637: 1849: if (rbtxn_is_streamed(txn))
branch 0 taken 90% (fallthrough)
branch 1 taken 10%
-: 1850: send_replication_origin = false;
-: 1851:
637: 1852: OutputPluginPrepareWrite(ctx, !send_replication_origin);
call 0 returned 100%
637: 1853: logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
call 0 returned 100%
-: 1854:
637: 1855: send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
call 0 returned 100%
-: 1856: send_replication_origin);
-: 1857:
637: 1858: OutputPluginWrite(ctx, true);
call 0 returned 100%
-: 1859:
-: 1860: /* we're streaming a chunk of transaction now */
637: 1861: data->in_streaming = true;
637: 1862:}
-: 1863:
-: 1864:/*
-: 1865: * STOP STREAM callback
-: 1866: */
-: 1867:static void
function pgoutput_stream_stop called 637 returned 100% blocks executed 83%
637: 1868:pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
-: 1869: ReorderBufferTXN *txn)
-: 1870:{
637: 1871: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
-: 1872:
-: 1873: /* we should be streaming a transaction */
637*: 1874: Assert(data->in_streaming);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1875:
637: 1876: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
637: 1877: logicalrep_write_stream_stop(ctx->out);
call 0 returned 100%
637: 1878: OutputPluginWrite(ctx, true);
call 0 returned 100%
-: 1879:
-: 1880: /* we've stopped streaming a transaction */
637: 1881: data->in_streaming = false;
637: 1882:}
-: 1883:
-: 1884:/*
-: 1885: * Notify downstream to discard the streamed transaction (along with all
-: 1886: * its subtransactions, if it's a toplevel transaction).
-: 1887: */
-: 1888:static void
function pgoutput_stream_abort called 26 returned 100% blocks executed 82%
26: 1889:pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
-: 1890: ReorderBufferTXN *txn,
-: 1891: XLogRecPtr abort_lsn)
-: 1892:{
-: 1893: ReorderBufferTXN *toptxn;
26: 1894: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
26: 1895: bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
-: 1896:
-: 1897: /*
-: 1898: * The abort should happen outside streaming block, even for streamed
-: 1899: * transactions. The transaction has to be marked as streamed, though.
-: 1900: */
26*: 1901: Assert(!data->in_streaming);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1902:
-: 1903: /* determine the toplevel transaction */
26: 1904: toptxn = rbtxn_get_toptxn(txn);
branch 0 taken 88% (fallthrough)
branch 1 taken 12%
-: 1905:
26*: 1906: Assert(rbtxn_is_streamed(toptxn));
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1907:
26: 1908: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
26: 1909: logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
call 0 returned 100%
-: 1910: txn->abort_time, write_abort_info);
-: 1911:
26: 1912: OutputPluginWrite(ctx, true);
call 0 returned 100%
-: 1913:
26: 1914: cleanup_rel_sync_cache(toptxn->xid, false);
call 0 returned 100%
26: 1915:}
-: 1916:
-: 1917:/*
-: 1918: * Notify downstream to apply the streamed transaction (along with all
-: 1919: * its subtransactions).
-: 1920: */
-: 1921:static void
function pgoutput_stream_commit called 46 returned 100% blocks executed 80%
46: 1922:pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
-: 1923: ReorderBufferTXN *txn,
-: 1924: XLogRecPtr commit_lsn)
-: 1925:{
46: 1926: PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
-: 1927:
-: 1928: /*
-: 1929: * The commit should happen outside streaming block, even for streamed
-: 1930: * transactions. The transaction has to be marked as streamed, though.
-: 1931: */
46*: 1932: Assert(!data->in_streaming);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
46*: 1933: Assert(rbtxn_is_streamed(txn));
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1934:
46: 1935: OutputPluginUpdateProgress(ctx, false);
call 0 returned 100%
-: 1936:
46: 1937: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
46: 1938: logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
call 0 returned 100%
46: 1939: OutputPluginWrite(ctx, true);
call 0 returned 100%
-: 1940:
46: 1941: cleanup_rel_sync_cache(txn->xid, true);
call 0 returned 100%
46: 1942:}
-: 1943:
-: 1944:/*
-: 1945: * PREPARE callback (for streaming two-phase commit).
-: 1946: *
-: 1947: * Notify the downstream to prepare the transaction.
-: 1948: */
-: 1949:static void
function pgoutput_stream_prepare_txn called 14 returned 100% blocks executed 86%
14: 1950:pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
-: 1951: ReorderBufferTXN *txn,
-: 1952: XLogRecPtr prepare_lsn)
-: 1953:{
14*: 1954: Assert(rbtxn_is_streamed(txn));
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1955:
14: 1956: OutputPluginUpdateProgress(ctx, false);
call 0 returned 100%
14: 1957: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
14: 1958: logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
call 0 returned 100%
14: 1959: OutputPluginWrite(ctx, true);
call 0 returned 100%
14: 1960:}
-: 1961:
-: 1962:/*
-: 1963: * Initialize the relation schema sync cache for a decoding session.
-: 1964: *
-: 1965: * The hash table is destroyed at the end of a decoding session. While
-: 1966: * relcache invalidations still exist and will still be invoked, they
-: 1967: * will just see the null hash table global and take no action.
-: 1968: */
-: 1969:static void
function init_rel_sync_cache called 400 returned 100% blocks executed 90%
400: 1970:init_rel_sync_cache(MemoryContext cachectx)
-: 1971:{
-: 1972: HASHCTL ctl;
-: 1973: static bool relation_callbacks_registered = false;
-: 1974:
-: 1975: /* Nothing to do if hash table already exists */
400: 1976: if (RelationSyncCache != NULL)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
2: 1977: return;
-: 1978:
-: 1979: /* Make a new hash table for the cache */
400: 1980: ctl.keysize = sizeof(Oid);
400: 1981: ctl.entrysize = sizeof(RelationSyncEntry);
400: 1982: ctl.hcxt = cachectx;
-: 1983:
400: 1984: RelationSyncCache = hash_create("logical replication output relation cache",
call 0 returned 100%
-: 1985: 128, &ctl,
-: 1986: HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
-: 1987:
400*: 1988: Assert(RelationSyncCache != NULL);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 1989:
-: 1990: /* No more to do if we already registered callbacks */
400: 1991: if (relation_callbacks_registered)
branch 0 taken 100% (fallthrough)
branch 1 taken 0%
-: 1992: return;
-: 1993:
-: 1994: /* We must update the cache entry for a relation after a relcache flush */
398: 1995: CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
call 0 returned 100%
-: 1996:
-: 1997: /*
-: 1998: * Flush all cache entries after a pg_namespace change, in case it was a
-: 1999: * schema rename affecting a relation being replicated.
-: 2000: *
-: 2001: * XXX: It is not a good idea to invalidate all the relation entries in
-: 2002: * RelationSyncCache on schema rename. We can optimize it to invalidate
-: 2003: * only the required relations by either having a specific invalidation
-: 2004: * message containing impacted relations or by having schema information
-: 2005: * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
-: 2006: * passed to the callback.
-: 2007: */
398: 2008: CacheRegisterSyscacheCallback(NAMESPACEOID,
call 0 returned 100%
-: 2009: rel_sync_cache_publication_cb,
-: 2010: (Datum) 0);
-: 2011:
398: 2012: relation_callbacks_registered = true;
-: 2013:}
-: 2014:
-: 2015:/*
-: 2016: * We expect relatively small number of streamed transactions.
-: 2017: */
-: 2018:static bool
function get_schema_sent_in_streamed_txn called 175941 returned 100% blocks executed 100%
175941: 2019:get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
-: 2020:{
175941: 2021: return list_member_xid(entry->streamed_txns, xid);
call 0 returned 100%
-: 2022:}
-: 2023:
-: 2024:/*
-: 2025: * Add the xid in the rel sync entry for which we have already sent the schema
-: 2026: * of the relation.
-: 2027: */
-: 2028:static void
function set_schema_sent_in_streamed_txn called 72 returned 100% blocks executed 100%
72: 2029:set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
-: 2030:{
-: 2031: MemoryContext oldctx;
-: 2032:
72: 2033: oldctx = MemoryContextSwitchTo(CacheMemoryContext);
call 0 returned 100%
-: 2034:
72: 2035: entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
call 0 returned 100%
-: 2036:
-: 2037: MemoryContextSwitchTo(oldctx);
72: 2038:}
-: 2039:
-: 2040:/*
-: 2041: * Find or create entry in the relation schema cache.
-: 2042: *
-: 2043: * This looks up publications that the given relation is directly or
-: 2044: * indirectly part of (the latter if it's really the relation's ancestor that
-: 2045: * is part of a publication) and fills up the found entry with the information
-: 2046: * about which operations to publish and whether to use an ancestor's schema
-: 2047: * when publishing.
-: 2048: */
-: 2049:static RelationSyncEntry *
function get_rel_sync_entry called 183469 returned 100% blocks executed 94%
183469: 2050:get_rel_sync_entry(PGOutputData *data, Relation relation)
-: 2051:{
-: 2052: RelationSyncEntry *entry;
-: 2053: bool found;
-: 2054: MemoryContext oldctx;
183469: 2055: Oid relid = RelationGetRelid(relation);
-: 2056:
183469*: 2057: Assert(RelationSyncCache != NULL);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 2058:
-: 2059: /* Find cached relation info, creating if not found */
183469: 2060: entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
call 0 returned 100%
-: 2061: &relid,
-: 2062: HASH_ENTER, &found);
183469*: 2063: Assert(entry != NULL);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 2064:
-: 2065: /* initialize entry, if it's new */
183469: 2066: if (!found)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 2067: {
294: 2068: entry->replicate_valid = false;
294: 2069: entry->schema_sent = false;
294: 2070: entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
294: 2071: entry->streamed_txns = NIL;
294: 2072: entry->pubactions.pubinsert = entry->pubactions.pubupdate =
294: 2073: entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
294: 2074: entry->new_slot = NULL;
294: 2075: entry->old_slot = NULL;
294: 2076: memset(entry->exprstate, 0, sizeof(entry->exprstate));
294: 2077: entry->entry_cxt = NULL;
294: 2078: entry->publish_as_relid = InvalidOid;
294: 2079: entry->columns = NULL;
294: 2080: entry->attrmap = NULL;
-: 2081: }
-: 2082:
-: 2083: /* Validate the entry */
183469: 2084: if (!entry->replicate_valid)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
-: 2085: {
376: 2086: Oid schemaId = get_rel_namespace(relid);
call 0 returned 100%
376: 2087: List *pubids = GetRelationPublications(relid);
call 0 returned 100%
-: 2088:
-: 2089: /*
-: 2090: * We don't acquire a lock on the namespace system table as we build
-: 2091: * the cache entry using a historic snapshot and all the later changes
-: 2092: * are absorbed while decoding WAL.
-: 2093: */
376: 2094: List *schemaPubids = GetSchemaPublications(schemaId);
call 0 returned 100%
-: 2095: ListCell *lc;
376: 2096: Oid publish_as_relid = relid;
-: 2097: int publish_ancestor_level = 0;
376: 2098: bool am_partition = get_rel_relispartition(relid);
call 0 returned 100%
376: 2099: char relkind = get_rel_relkind(relid);
call 0 returned 100%
-: 2100: List *rel_publications = NIL;
-: 2101:
-: 2102: /* Reload publications if needed before use. */
376: 2103: if (!publications_valid)
branch 0 taken 52% (fallthrough)
branch 1 taken 48%
-: 2104: {
196: 2105: MemoryContextReset(data->pubctx);
call 0 returned 100%
-: 2106:
196: 2107: oldctx = MemoryContextSwitchTo(data->pubctx);
call 0 returned 100%
196: 2108: data->publications = LoadPublications(data->publication_names);
call 0 returned 100%
-: 2109: MemoryContextSwitchTo(oldctx);
196: 2110: publications_valid = true;
-: 2111: }
-: 2112:
-: 2113: /*
-: 2114: * Reset schema_sent status as the relation definition may have
-: 2115: * changed. Also reset pubactions to empty in case rel was dropped
-: 2116: * from a publication. Also free any objects that depended on the
-: 2117: * earlier definition.
-: 2118: */
376: 2119: entry->schema_sent = false;
376: 2120: entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
376: 2121: list_free(entry->streamed_txns);
call 0 returned 100%
376: 2122: entry->streamed_txns = NIL;
376: 2123: bms_free(entry->columns);
call 0 returned 100%
376: 2124: entry->columns = NULL;
376: 2125: entry->pubactions.pubinsert = false;
376: 2126: entry->pubactions.pubupdate = false;
376: 2127: entry->pubactions.pubdelete = false;
376: 2128: entry->pubactions.pubtruncate = false;
-: 2129:
-: 2130: /*
-: 2131: * Tuple slots cleanups. (Will be rebuilt later if needed).
-: 2132: */
376: 2133: if (entry->old_slot)
branch 0 taken 16% (fallthrough)
branch 1 taken 84%
-: 2134: {
60: 2135: TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
-: 2136:
60*: 2137: Assert(desc->tdrefcount == -1);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 2138:
60: 2139: ExecDropSingleTupleTableSlot(entry->old_slot);
call 0 returned 100%
-: 2140:
-: 2141: /*
-: 2142: * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-: 2143: * do it now to avoid any leaks.
-: 2144: */
60: 2145: FreeTupleDesc(desc);
call 0 returned 100%
-: 2146: }
376: 2147: if (entry->new_slot)
branch 0 taken 16% (fallthrough)
branch 1 taken 84%
-: 2148: {
60: 2149: TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
-: 2150:
60*: 2151: Assert(desc->tdrefcount == -1);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 2152:
60: 2153: ExecDropSingleTupleTableSlot(entry->new_slot);
call 0 returned 100%
-: 2154:
-: 2155: /*
-: 2156: * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-: 2157: * do it now to avoid any leaks.
-: 2158: */
60: 2159: FreeTupleDesc(desc);
call 0 returned 100%
-: 2160: }
-: 2161:
376: 2162: entry->old_slot = NULL;
376: 2163: entry->new_slot = NULL;
-: 2164:
376: 2165: if (entry->attrmap)
branch 0 taken 1% (fallthrough)
branch 1 taken 99%
3: 2166: free_attrmap(entry->attrmap);
call 0 returned 100%
376: 2167: entry->attrmap = NULL;
-: 2168:
-: 2169: /*
-: 2170: * Row filter cache cleanups.
-: 2171: */
376: 2172: if (entry->entry_cxt)
branch 0 taken 16% (fallthrough)
branch 1 taken 84%
60: 2173: MemoryContextDelete(entry->entry_cxt);
call 0 returned 100%
-: 2174:
376: 2175: entry->entry_cxt = NULL;
376: 2176: entry->estate = NULL;
376: 2177: memset(entry->exprstate, 0, sizeof(entry->exprstate));
-: 2178:
-: 2179: /*
-: 2180: * Build publication cache. We can't use one provided by relcache as
-: 2181: * relcache considers all publications that the given relation is in,
-: 2182: * but here we only need to consider ones that the subscriber
-: 2183: * requested.
-: 2184: */
917: 2185: foreach(lc, data->publications)
branch 0 taken 100% (fallthrough)
branch 1 taken 1%
branch 2 taken 59% (fallthrough)
branch 3 taken 41%
-: 2186: {
541: 2187: Publication *pub = lfirst(lc);
-: 2188: bool publish = false;
-: 2189:
-: 2190: /*
-: 2191: * Under what relid should we publish changes in this publication?
-: 2192: * We'll use the top-most relid across all publications. Also
-: 2193: * track the ancestor level for this publication.
-: 2194: */
541: 2195: Oid pub_relid = relid;
-: 2196: int ancestor_level = 0;
-: 2197:
-: 2198: /*
-: 2199: * If this is a FOR ALL TABLES publication, pick the partition
-: 2200: * root and set the ancestor level accordingly.
-: 2201: */
541: 2202: if (pub->alltables)
branch 0 taken 16%
branch 1 taken 84%
-: 2203: {
-: 2204: publish = true;
86: 2205: if (pub->pubviaroot && am_partition)
branch 0 taken 19% (fallthrough)
branch 1 taken 81%
branch 2 taken 100% (fallthrough)
branch 3 taken 0%
-: 2206: {
16: 2207: List *ancestors = get_partition_ancestors(relid);
call 0 returned 100%
-: 2208:
16: 2209: pub_relid = llast_oid(ancestors);
call 0 returned 100%
branch 1 taken 100% (fallthrough)
branch 2 taken 0%
-: 2210: ancestor_level = list_length(ancestors);
-: 2211: }
-: 2212: }
-: 2213:
541: 2214: if (!publish)
branch 0 taken 84% (fallthrough)
branch 1 taken 16%
-: 2215: {
-: 2216: bool ancestor_published = false;
-: 2217:
-: 2218: /*
-: 2219: * For a partition, check if any of the ancestors are
-: 2220: * published. If so, note down the topmost ancestor that is
-: 2221: * published via this publication, which will be used as the
-: 2222: * relation via which to publish the partition's changes.
-: 2223: */
455: 2224: if (am_partition)
branch 0 taken 27% (fallthrough)
branch 1 taken 73%
-: 2225: {
-: 2226: Oid ancestor;
-: 2227: int level;
121: 2228: List *ancestors = get_partition_ancestors(relid);
call 0 returned 100%
-: 2229:
121: 2230: ancestor = GetTopMostAncestorInPublication(pub->oid,
call 0 returned 100%
-: 2231: ancestors,
-: 2232: &level);
-: 2233:
121: 2234: if (ancestor != InvalidOid)
branch 0 taken 40% (fallthrough)
branch 1 taken 60%
-: 2235: {
-: 2236: ancestor_published = true;
48: 2237: if (pub->pubviaroot)
branch 0 taken 52% (fallthrough)
branch 1 taken 48%
-: 2238: {
-: 2239: pub_relid = ancestor;
25: 2240: ancestor_level = level;
-: 2241: }
-: 2242: }
-: 2243: }
-: 2244:
709: 2245: if (list_member_oid(pubids, pub->oid) ||
call 0 returned 100%
branch 1 taken 56% (fallthrough)
branch 2 taken 44%
branch 3 taken 98% (fallthrough)
branch 4 taken 2%
502: 2246: list_member_oid(schemaPubids, pub->oid) ||
call 0 returned 100%
branch 1 taken 11% (fallthrough)
branch 2 taken 89%
-: 2247: ancestor_published)
-: 2248: publish = true;
-: 2249: }
-: 2250:
-: 2251: /*
-: 2252: * If the relation is to be published, determine actions to
-: 2253: * publish, and list of columns, if appropriate.
-: 2254: *
-: 2255: * Don't publish changes for partitioned tables, because
-: 2256: * publishing those of its partitions suffices, unless partition
-: 2257: * changes won't be published due to pubviaroot being set.
-: 2258: */
541: 2259: if (publish &&
branch 0 taken 59% (fallthrough)
branch 1 taken 41%
branch 2 taken 1% (fallthrough)
branch 3 taken 99%
4: 2260: (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
branch 0 taken 25% (fallthrough)
branch 1 taken 75%
-: 2261: {
318: 2262: entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
318: 2263: entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
318: 2264: entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
318: 2265: entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
-: 2266:
-: 2267: /*
-: 2268: * We want to publish the changes as the top-most ancestor
-: 2269: * across all publications. So we need to check if the already
-: 2270: * calculated level is higher than the new one. If yes, we can
-: 2271: * ignore the new value (as it's a child). Otherwise the new
-: 2272: * value is an ancestor, so we keep it.
-: 2273: */
318: 2274: if (publish_ancestor_level > ancestor_level)
branch 0 taken 1% (fallthrough)
branch 1 taken 100%
1: 2275: continue;
-: 2276:
-: 2277: /*
-: 2278: * If we found an ancestor higher up in the tree, discard the
-: 2279: * list of publications through which we replicate it, and use
-: 2280: * the new ancestor.
-: 2281: */
317: 2282: if (publish_ancestor_level < ancestor_level)
branch 0 taken 87% (fallthrough)
branch 1 taken 13%
-: 2283: {
-: 2284: publish_as_relid = pub_relid;
-: 2285: publish_ancestor_level = ancestor_level;
-: 2286:
-: 2287: /* reset the publication list for this relation */
-: 2288: rel_publications = NIL;
-: 2289: }
-: 2290: else
-: 2291: {
-: 2292: /* Same ancestor level, has to be the same OID. */
276*: 2293: Assert(publish_as_relid == pub_relid);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 2294: }
-: 2295:
-: 2296: /* Track publications for this ancestor. */
317: 2297: rel_publications = lappend(rel_publications, pub);
call 0 returned 100%
-: 2298: }
-: 2299: }
-: 2300:
376: 2301: entry->publish_as_relid = publish_as_relid;
-: 2302:
-: 2303: /*
-: 2304: * Initialize the tuple slot, map, and row filter. These are only used
-: 2305: * when publishing inserts, updates, or deletes.
-: 2306: */
376: 2307: if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
branch 0 taken 81% (fallthrough)
branch 1 taken 19%
-: 2308: entry->pubactions.pubdelete)
-: 2309: {
-: 2310: /* Initialize the tuple slot and map */
304: 2311: init_tuple_slot(data, relation, entry);
call 0 returned 100%
-: 2312:
-: 2313: /* Initialize the row filter */
304: 2314: pgoutput_row_filter_init(data, rel_publications, entry);
call 0 returned 100%
-: 2315:
-: 2316: /* Check whether to publish generated columns. */
304: 2317: check_and_init_gencol(data, rel_publications, entry);
call 0 returned 100%
-: 2318:
-: 2319: /* Initialize the column list */
304: 2320: pgoutput_column_list_init(data, rel_publications, entry);
call 0 returned 100%
-: 2321: }
-: 2322:
375: 2323: list_free(pubids);
call 0 returned 100%
375: 2324: list_free(schemaPubids);
call 0 returned 100%
375: 2325: list_free(rel_publications);
call 0 returned 100%
-: 2326:
375: 2327: entry->replicate_valid = true;
-: 2328: }
-: 2329:
183468: 2330: return entry;
-: 2331:}
-: 2332:
-: 2333:/*
-: 2334: * Cleanup list of streamed transactions and update the schema_sent flag.
-: 2335: *
-: 2336: * When a streamed transaction commits or aborts, we need to remove the
-: 2337: * toplevel XID from the schema cache. If the transaction aborted, the
-: 2338: * subscriber will simply throw away the schema records we streamed, so
-: 2339: * we don't need to do anything else.
-: 2340: *
-: 2341: * If the transaction is committed, the subscriber will update the relation
-: 2342: * cache - so tweak the schema_sent flag accordingly.
-: 2343: */
-: 2344:static void
function cleanup_rel_sync_cache called 72 returned 100% blocks executed 94%
72: 2345:cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
-: 2346:{
-: 2347: HASH_SEQ_STATUS hash_seq;
-: 2348: RelationSyncEntry *entry;
-: 2349:
72*: 2350: Assert(RelationSyncCache != NULL);
branch 0 taken 0% (fallthrough)
branch 1 taken 100%
call 2 never executed
-: 2351:
72: 2352: hash_seq_init(&hash_seq, RelationSyncCache);
call 0 returned 100%
147: 2353: while ((entry = hash_seq_search(&hash_seq)) != NULL)
call 0 returned 100%
branch 1 taken 51%
branch 2 taken 49% (fallthrough)
-: 2354: {
-: 2355: /*
-: 2356: * We can set the schema_sent flag for an entry that has committed xid
-: 2357: * in the list as that ensures that the subscriber would have the
-: 2358: * corresponding schema and we don't need to send it unless there is
-: 2359: * any invalidation for that relation.
-: 2360: */
95: 2361: foreach_xid(streamed_txn, entry->streamed_txns)
branch 0 taken 78% (fallthrough)
branch 1 taken 22%
branch 2 taken 100% (fallthrough)
branch 3 taken 0%
-: 2362: {
74: 2363: if (xid == streamed_txn)
branch 0 taken 73%
branch 1 taken 27%
-: 2364: {
54: 2365: if (is_commit)
branch 0 taken 80% (fallthrough)
branch 1 taken 20%
43: 2366: entry->schema_sent = true;
-: 2367:
54: 2368: entry->streamed_txns =
54: 2369: foreach_delete_current(entry->streamed_txns, streamed_txn);
call 0 returned 100%
54: 2370: break;
-: 2371: }
-: 2372: }
-: 2373: }
72: 2374:}
-: 2375:
-: 2376:/*
-: 2377: * Relcache invalidation callback
-: 2378: */
-: 2379:static void
function rel_sync_cache_relation_cb called 3707 returned 100% blocks executed 100%
3707: 2380:rel_sync_cache_relation_cb(Datum arg, Oid relid)
-: 2381:{
-: 2382: RelationSyncEntry *entry;
-: 2383:
-: 2384: /*
-: 2385: * We can get here if the plugin was used in SQL interface as the
-: 2386: * RelationSyncCache is destroyed when the decoding finishes, but there is
-: 2387: * no way to unregister the relcache invalidation callback.
-: 2388: */
3707: 2389: if (RelationSyncCache == NULL)
branch 0 taken 99% (fallthrough)
branch 1 taken 1%
-: 2390: return;
-: 2391:
-: 2392: /*
-: 2393: * Nobody keeps pointers to entries in this hash table around outside
-: 2394: * logical decoding callback calls - but invalidation events can come in
-: 2395: * *during* a callback if we do any syscache access in the callback.
-: 2396: * Because of that we must mark the cache entry as invalid but not damage
-: 2397: * any of its substructure here. The next get_rel_sync_entry() call will
-: 2398: * rebuild it all.
-: 2399: */
3681: 2400: if (OidIsValid(relid))
branch 0 taken 98% (fallthrough)
branch 1 taken 2%
-: 2401: {
-: 2402: /*
-: 2403: * Getting invalidations for relations that aren't in the table is
-: 2404: * entirely normal. So we don't care if it's found or not.
-: 2405: */
3619: 2406: entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
call 0 returned 100%
-: 2407: HASH_FIND, NULL);
3619: 2408: if (entry != NULL)
branch 0 taken 19% (fallthrough)
branch 1 taken 81%
671: 2409: entry->replicate_valid = false;
-: 2410: }
-: 2411: else
-: 2412: {
-: 2413: /* Whole cache must be flushed. */
-: 2414: HASH_SEQ_STATUS status;
-: 2415:
62: 2416: hash_seq_init(&status, RelationSyncCache);
call 0 returned 100%
132: 2417: while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
call 0 returned 100%
branch 1 taken 53%
branch 2 taken 47% (fallthrough)
-: 2418: {
70: 2419: entry->replicate_valid = false;
-: 2420: }
-: 2421: }
-: 2422:}
-: 2423:
-: 2424:/*
-: 2425: * Publication relation/schema map syscache invalidation callback
-: 2426: *
-: 2427: * Called for invalidations on pg_namespace.
-: 2428: */
-: 2429:static void
function rel_sync_cache_publication_cb called 32 returned 100% blocks executed 100%
32: 2430:rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
-: 2431:{
-: 2432: HASH_SEQ_STATUS status;
-: 2433: RelationSyncEntry *entry;
-: 2434:
-: 2435: /*
-: 2436: * We can get here if the plugin was used in SQL interface as the
-: 2437: * RelationSyncCache is destroyed when the decoding finishes, but there is
-: 2438: * no way to unregister the invalidation callbacks.
-: 2439: */
32: 2440: if (RelationSyncCache == NULL)
branch 0 taken 31% (fallthrough)
branch 1 taken 69%
10: 2441: return;
-: 2442:
-: 2443: /*
-: 2444: * We have no easy way to identify which cache entries this invalidation
-: 2445: * event might have affected, so just mark them all invalid.
-: 2446: */
22: 2447: hash_seq_init(&status, RelationSyncCache);
call 0 returned 100%
41: 2448: while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
call 0 returned 100%
branch 1 taken 46%
branch 2 taken 54% (fallthrough)
-: 2449: {
19: 2450: entry->replicate_valid = false;
-: 2451: }
-: 2452:}
-: 2453:
-: 2454:/* Send Replication origin */
-: 2455:static void
function send_repl_origin called 1108 returned 100% blocks executed 100%
1108: 2456:send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
-: 2457: XLogRecPtr origin_lsn, bool send_origin)
-: 2458:{
1108: 2459: if (send_origin)
branch 0 taken 1% (fallthrough)
branch 1 taken 99%
-: 2460: {
-: 2461: char *origin;
-: 2462:
-: 2463: /*----------
-: 2464: * XXX: which behaviour do we want here?
-: 2465: *
-: 2466: * Alternatives:
-: 2467: * - don't send origin message if origin name not found
-: 2468: * (that's what we do now)
-: 2469: * - throw error - that will break replication, not good
-: 2470: * - send some special "unknown" origin
-: 2471: *----------
-: 2472: */
8: 2473: if (replorigin_by_oid(origin_id, true, &origin))
call 0 returned 100%
branch 1 taken 100% (fallthrough)
branch 2 taken 0%
-: 2474: {
-: 2475: /* Message boundary */
8: 2476: OutputPluginWrite(ctx, false);
call 0 returned 100%
8: 2477: OutputPluginPrepareWrite(ctx, true);
call 0 returned 100%
-: 2478:
8: 2479: logicalrep_write_origin(ctx->out, origin, origin_lsn);
call 0 returned 100%
-: 2480: }
-: 2481: }
1108: 2482:}