pgoutput.c.gcov

application/octet-stream

Filename: pgoutput.c.gcov
Type: application/octet-stream
Part: 0
Message: RE: Question for coverage report
        -:    0:Source:pgoutput.c
        -:    0:Graph:./pgoutput.gcno
        -:    0:Data:./pgoutput.gcda
        -:    0:Runs:536
        -:    1:/*-------------------------------------------------------------------------
        -:    2: *
        -:    3: * pgoutput.c
        -:    4: *		Logical Replication output plugin
        -:    5: *
        -:    6: * Copyright (c) 2012-2025, PostgreSQL Global Development Group
        -:    7: *
        -:    8: * IDENTIFICATION
        -:    9: *		  src/backend/replication/pgoutput/pgoutput.c
        -:   10: *
        -:   11: *-------------------------------------------------------------------------
        -:   12: */
        -:   13:#include "postgres.h"
        -:   14:
        -:   15:#include "access/tupconvert.h"
        -:   16:#include "catalog/partition.h"
        -:   17:#include "catalog/pg_publication.h"
        -:   18:#include "catalog/pg_publication_rel.h"
        -:   19:#include "catalog/pg_subscription.h"
        -:   20:#include "commands/defrem.h"
        -:   21:#include "commands/subscriptioncmds.h"
        -:   22:#include "executor/executor.h"
        -:   23:#include "fmgr.h"
        -:   24:#include "nodes/makefuncs.h"
        -:   25:#include "parser/parse_relation.h"
        -:   26:#include "replication/logical.h"
        -:   27:#include "replication/logicalproto.h"
        -:   28:#include "replication/origin.h"
        -:   29:#include "replication/pgoutput.h"
        -:   30:#include "rewrite/rewriteHandler.h"
        -:   31:#include "utils/builtins.h"
        -:   32:#include "utils/inval.h"
        -:   33:#include "utils/lsyscache.h"
        -:   34:#include "utils/memutils.h"
        -:   35:#include "utils/rel.h"
        -:   36:#include "utils/syscache.h"
        -:   37:#include "utils/varlena.h"
        -:   38:
function Pg_magic_func called 536 returned 100% blocks executed 100%
      536:   39:PG_MODULE_MAGIC_EXT(
        -:   40:					.name = "pgoutput",
        -:   41:					.version = PG_VERSION
        -:   42:);
        -:   43:
        -:   44:static void pgoutput_startup(LogicalDecodingContext *ctx,
        -:   45:							 OutputPluginOptions *opt, bool is_init);
        -:   46:static void pgoutput_shutdown(LogicalDecodingContext *ctx);
        -:   47:static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
        -:   48:							   ReorderBufferTXN *txn);
        -:   49:static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
        -:   50:								ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
        -:   51:static void pgoutput_change(LogicalDecodingContext *ctx,
        -:   52:							ReorderBufferTXN *txn, Relation relation,
        -:   53:							ReorderBufferChange *change);
        -:   54:static void pgoutput_truncate(LogicalDecodingContext *ctx,
        -:   55:							  ReorderBufferTXN *txn, int nrelations, Relation relations[],
        -:   56:							  ReorderBufferChange *change);
        -:   57:static void pgoutput_message(LogicalDecodingContext *ctx,
        -:   58:							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
        -:   59:							 bool transactional, const char *prefix,
        -:   60:							 Size sz, const char *message);
        -:   61:static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
        -:   62:								   RepOriginId origin_id);
        -:   63:static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
        -:   64:									   ReorderBufferTXN *txn);
        -:   65:static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
        -:   66:								 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
        -:   67:static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
        -:   68:										 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
        -:   69:static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
        -:   70:										   ReorderBufferTXN *txn,
        -:   71:										   XLogRecPtr prepare_end_lsn,
        -:   72:										   TimestampTz prepare_time);
        -:   73:static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
        -:   74:								  ReorderBufferTXN *txn);
        -:   75:static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
        -:   76:								 ReorderBufferTXN *txn);
        -:   77:static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
        -:   78:								  ReorderBufferTXN *txn,
        -:   79:								  XLogRecPtr abort_lsn);
        -:   80:static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
        -:   81:								   ReorderBufferTXN *txn,
        -:   82:								   XLogRecPtr commit_lsn);
        -:   83:static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
        -:   84:										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
        -:   85:
        -:   86:static bool publications_valid;
        -:   87:
        -:   88:static List *LoadPublications(List *pubnames);
        -:   89:static void publication_invalidation_cb(Datum arg, int cacheid,
        -:   90:										uint32 hashvalue);
        -:   91:static void send_repl_origin(LogicalDecodingContext *ctx,
        -:   92:							 RepOriginId origin_id, XLogRecPtr origin_lsn,
        -:   93:							 bool send_origin);
        -:   94:
        -:   95:/*
        -:   96: * Only 3 publication actions are used for row filtering ("insert", "update",
        -:   97: * "delete"). See RelationSyncEntry.exprstate[].
        -:   98: */
        -:   99:enum RowFilterPubAction
        -:  100:{
        -:  101:	PUBACTION_INSERT,
        -:  102:	PUBACTION_UPDATE,
        -:  103:	PUBACTION_DELETE,
        -:  104:};
        -:  105:
        -:  106:#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
        -:  107:
        -:  108:/*
        -:  109: * Entry in the map used to remember which relation schemas we sent.
        -:  110: *
        -:  111: * The schema_sent flag determines if the current schema record for the
        -:  112: * relation (and for its ancestor if publish_as_relid is set) was already
        -:  113: * sent to the subscriber (in which case we don't need to send it again).
        -:  114: *
        -:  115: * The schema cache on downstream is however updated only at commit time,
        -:  116: * and with streamed transactions the commit order may be different from
        -:  117: * the order the transactions are sent in. Also, the (sub) transactions
        -:  118: * might get aborted so we need to send the schema for each (sub) transaction
        -:  119: * so that we don't lose the schema information on abort. For handling this,
        -:  120: * we maintain the list of xids (streamed_txns) for those we have already sent
        -:  121: * the schema.
        -:  122: *
        -:  123: * For partitions, 'pubactions' considers not only the table's own
        -:  124: * publications, but also those of all of its ancestors.
        -:  125: */
        -:  126:typedef struct RelationSyncEntry
        -:  127:{
        -:  128:	Oid			relid;			/* relation oid */
        -:  129:
        -:  130:	bool		replicate_valid;	/* overall validity flag for entry */
        -:  131:
        -:  132:	bool		schema_sent;
        -:  133:
        -:  134:	/*
        -:  135:	 * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
        -:  136:	 * columns and the 'publish_generated_columns' parameter is set to
        -:  137:	 * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
        -:  138:	 * indicating that no generated columns should be published, unless
        -:  139:	 * explicitly specified in the column list.
        -:  140:	 */
        -:  141:	PublishGencolsType include_gencols_type;
        -:  142:	List	   *streamed_txns;	/* streamed toplevel transactions with this
        -:  143:								 * schema */
        -:  144:
        -:  145:	/* are we publishing this rel? */
        -:  146:	PublicationActions pubactions;
        -:  147:
        -:  148:	/*
        -:  149:	 * ExprState array for row filter. Different publication actions don't
        -:  150:	 * allow multiple expressions to always be combined into one, because
        -:  151:	 * updates or deletes restrict the column in expression to be part of the
        -:  152:	 * replica identity index whereas inserts do not have this restriction, so
        -:  153:	 * there is one ExprState per publication action.
        -:  154:	 */
        -:  155:	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
        -:  156:	EState	   *estate;			/* executor state used for row filter */
        -:  157:	TupleTableSlot *new_slot;	/* slot for storing new tuple */
        -:  158:	TupleTableSlot *old_slot;	/* slot for storing old tuple */
        -:  159:
        -:  160:	/*
        -:  161:	 * OID of the relation to publish changes as.  For a partition, this may
        -:  162:	 * be set to one of its ancestors whose schema will be used when
        -:  163:	 * replicating changes, if publish_via_partition_root is set for the
        -:  164:	 * publication.
        -:  165:	 */
        -:  166:	Oid			publish_as_relid;
        -:  167:
        -:  168:	/*
        -:  169:	 * Map used when replicating using an ancestor's schema to convert tuples
        -:  170:	 * from partition's type to the ancestor's; NULL if publish_as_relid is
        -:  171:	 * same as 'relid' or if unnecessary due to partition and the ancestor
        -:  172:	 * having identical TupleDesc.
        -:  173:	 */
        -:  174:	AttrMap    *attrmap;
        -:  175:
        -:  176:	/*
        -:  177:	 * Columns included in the publication, or NULL if all columns are
        -:  178:	 * included implicitly.  Note that the attnums in this bitmap are not
        -:  179:	 * shifted by FirstLowInvalidHeapAttributeNumber.
        -:  180:	 */
        -:  181:	Bitmapset  *columns;
        -:  182:
        -:  183:	/*
        -:  184:	 * Private context to store additional data for this entry - state for the
        -:  185:	 * row filter expressions, column list, etc.
        -:  186:	 */
        -:  187:	MemoryContext entry_cxt;
        -:  188:} RelationSyncEntry;
        -:  189:
        -:  190:/*
        -:  191: * Maintain a per-transaction level variable to track whether the transaction
        -:  192: * has sent BEGIN. BEGIN is only sent when the first change in a transaction
        -:  193: * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
        -:  194: * messages for empty transactions which saves network bandwidth.
        -:  195: *
        -:  196: * This optimization is not used for prepared transactions because if the
        -:  197: * WALSender restarts after prepare of a transaction and before commit prepared
        -:  198: * of the same transaction then we won't be able to figure out if we have
        -:  199: * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
        -:  200: * because we would have lost the in-memory txndata information that was
        -:  201: * present prior to the restart. This will result in sending a spurious
        -:  202: * COMMIT PREPARED without a corresponding prepared transaction at the
        -:  203: * downstream which would lead to an error when it tries to process it.
        -:  204: *
        -:  205: * XXX We could achieve this optimization by changing protocol to send
        -:  206: * additional information so that downstream can detect that the corresponding
        -:  207: * prepare has not been sent. However, adding such a check for every
        -:  208: * transaction in the downstream could be costly so we might want to do it
        -:  209: * optionally.
        -:  210: *
        -:  211: * We also don't have this optimization for streamed transactions because
        -:  212: * they can contain prepared transactions.
        -:  213: */
        -:  214:typedef struct PGOutputTxnData
        -:  215:{
        -:  216:	bool		sent_begin_txn; /* flag indicating whether BEGIN has been sent */
        -:  217:} PGOutputTxnData;
        -:  218:
        -:  219:/* Map used to remember which relation schemas we sent. */
        -:  220:static HTAB *RelationSyncCache = NULL;
        -:  221:
        -:  222:static void init_rel_sync_cache(MemoryContext cachectx);
        -:  223:static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
        -:  224:static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
        -:  225:											 Relation relation);
        -:  226:static void send_relation_and_attrs(Relation relation, TransactionId xid,
        -:  227:									LogicalDecodingContext *ctx,
        -:  228:									RelationSyncEntry *relentry);
        -:  229:static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
        -:  230:static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
        -:  231:										  uint32 hashvalue);
        -:  232:static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
        -:  233:											TransactionId xid);
        -:  234:static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
        -:  235:											TransactionId xid);
        -:  236:static void init_tuple_slot(PGOutputData *data, Relation relation,
        -:  237:							RelationSyncEntry *entry);
        -:  238:static void pgoutput_memory_context_reset(void *arg);
        -:  239:
        -:  240:/* row filter routines */
        -:  241:static EState *create_estate_for_relation(Relation rel);
        -:  242:static void pgoutput_row_filter_init(PGOutputData *data,
        -:  243:									 List *publications,
        -:  244:									 RelationSyncEntry *entry);
        -:  245:static bool pgoutput_row_filter_exec_expr(ExprState *state,
        -:  246:										  ExprContext *econtext);
        -:  247:static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
        -:  248:								TupleTableSlot **new_slot_ptr,
        -:  249:								RelationSyncEntry *entry,
        -:  250:								ReorderBufferChangeType *action);
        -:  251:
        -:  252:/* column list routines */
        -:  253:static void pgoutput_column_list_init(PGOutputData *data,
        -:  254:									  List *publications,
        -:  255:									  RelationSyncEntry *entry);
        -:  256:
        -:  257:/*
        -:  258: * Specify output plugin callbacks
        -:  259: */
        -:  260:void
function _PG_output_plugin_init called 725 returned 100% blocks executed 100%
      725:  261:_PG_output_plugin_init(OutputPluginCallbacks *cb)
        -:  262:{
      725:  263:	cb->startup_cb = pgoutput_startup;
      725:  264:	cb->begin_cb = pgoutput_begin_txn;
      725:  265:	cb->change_cb = pgoutput_change;
      725:  266:	cb->truncate_cb = pgoutput_truncate;
      725:  267:	cb->message_cb = pgoutput_message;
      725:  268:	cb->commit_cb = pgoutput_commit_txn;
        -:  269:
      725:  270:	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
      725:  271:	cb->prepare_cb = pgoutput_prepare_txn;
      725:  272:	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
      725:  273:	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
      725:  274:	cb->filter_by_origin_cb = pgoutput_origin_filter;
      725:  275:	cb->shutdown_cb = pgoutput_shutdown;
        -:  276:
        -:  277:	/* transaction streaming */
      725:  278:	cb->stream_start_cb = pgoutput_stream_start;
      725:  279:	cb->stream_stop_cb = pgoutput_stream_stop;
      725:  280:	cb->stream_abort_cb = pgoutput_stream_abort;
      725:  281:	cb->stream_commit_cb = pgoutput_stream_commit;
      725:  282:	cb->stream_change_cb = pgoutput_change;
      725:  283:	cb->stream_message_cb = pgoutput_message;
      725:  284:	cb->stream_truncate_cb = pgoutput_truncate;
        -:  285:	/* transaction streaming - two-phase commit */
      725:  286:	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
      725:  287:}
        -:  288:
        -:  289:static void
function parse_output_parameters called 400 returned 100% blocks executed 36%
      400:  290:parse_output_parameters(List *options, PGOutputData *data)
        -:  291:{
        -:  292:	ListCell   *lc;
        -:  293:	bool		protocol_version_given = false;
        -:  294:	bool		publication_names_given = false;
        -:  295:	bool		binary_option_given = false;
        -:  296:	bool		messages_option_given = false;
        -:  297:	bool		streaming_given = false;
        -:  298:	bool		two_phase_option_given = false;
        -:  299:	bool		origin_option_given = false;
        -:  300:
        -:  301:	/* Initialize optional parameters to defaults */
      400:  302:	data->binary = false;
      400:  303:	data->streaming = LOGICALREP_STREAM_OFF;
      400:  304:	data->messages = false;
      400:  305:	data->two_phase = false;
      400:  306:	data->publish_no_origin = false;
        -:  307:
     1995:  308:	foreach(lc, options)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 80% (fallthrough)
branch  3 taken 20%
        -:  309:	{
     1595:  310:		DefElem    *defel = (DefElem *) lfirst(lc);
        -:  311:
    1595*:  312:		Assert(defel->arg == NULL || IsA(defel->arg, String));
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
branch  3 taken 100%
branch  4 taken 0%
        -:  313:
        -:  314:		/* Check each param, whether or not we recognize it */
     1595:  315:		if (strcmp(defel->defname, "proto_version") == 0)
branch  0 taken 25% (fallthrough)
branch  1 taken 75%
        -:  316:		{
        -:  317:			unsigned long parsed;
        -:  318:			char	   *endptr;
        -:  319:
      400:  320:			if (protocol_version_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  321:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  322:						(errcode(ERRCODE_SYNTAX_ERROR),
        -:  323:						 errmsg("conflicting or redundant options")));
        -:  324:			protocol_version_given = true;
        -:  325:
      400:  326:			errno = 0;
      400:  327:			parsed = strtoul(strVal(defel->arg), &endptr, 10);
call    0 returned 100%
call    1 returned 100%
      400:  328:			if (errno != 0 || *endptr != '\0')
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 0% (fallthrough)
branch  3 taken 100%
    #####:  329:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  330:						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  331:						 errmsg("invalid proto_version")));
        -:  332:
      400:  333:			if (parsed > PG_UINT32_MAX)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  334:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
call    7 never executed
        -:  335:						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  336:						 errmsg("proto_version \"%s\" out of range",
        -:  337:								strVal(defel->arg))));
        -:  338:
      400:  339:			data->protocol_version = (uint32) parsed;
        -:  340:		}
     1195:  341:		else if (strcmp(defel->defname, "publication_names") == 0)
branch  0 taken 33% (fallthrough)
branch  1 taken 67%
        -:  342:		{
      400:  343:			if (publication_names_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  344:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  345:						(errcode(ERRCODE_SYNTAX_ERROR),
        -:  346:						 errmsg("conflicting or redundant options")));
        -:  347:			publication_names_given = true;
        -:  348:
      400:  349:			if (!SplitIdentifierString(strVal(defel->arg), ',',
call    0 returned 100%
call    1 returned 100%
branch  2 taken 0% (fallthrough)
branch  3 taken 100%
        -:  350:									   &data->publication_names))
    #####:  351:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  352:						(errcode(ERRCODE_INVALID_NAME),
        -:  353:						 errmsg("invalid publication_names syntax")));
        -:  354:		}
      795:  355:		else if (strcmp(defel->defname, "binary") == 0)
branch  0 taken 1% (fallthrough)
branch  1 taken 99%
        -:  356:		{
       10:  357:			if (binary_option_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  358:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  359:						(errcode(ERRCODE_SYNTAX_ERROR),
        -:  360:						 errmsg("conflicting or redundant options")));
        -:  361:			binary_option_given = true;
        -:  362:
       10:  363:			data->binary = defGetBoolean(defel);
call    0 returned 100%
        -:  364:		}
      785:  365:		else if (strcmp(defel->defname, "messages") == 0)
branch  0 taken 1% (fallthrough)
branch  1 taken 99%
        -:  366:		{
        4:  367:			if (messages_option_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  368:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  369:						(errcode(ERRCODE_SYNTAX_ERROR),
        -:  370:						 errmsg("conflicting or redundant options")));
        -:  371:			messages_option_given = true;
        -:  372:
        4:  373:			data->messages = defGetBoolean(defel);
call    0 returned 100%
        -:  374:		}
      781:  375:		else if (strcmp(defel->defname, "streaming") == 0)
branch  0 taken 49% (fallthrough)
branch  1 taken 51%
        -:  376:		{
      382:  377:			if (streaming_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  378:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  379:						(errcode(ERRCODE_SYNTAX_ERROR),
        -:  380:						 errmsg("conflicting or redundant options")));
        -:  381:			streaming_given = true;
        -:  382:
      382:  383:			data->streaming = defGetStreamingMode(defel);
call    0 returned 100%
        -:  384:		}
      399:  385:		else if (strcmp(defel->defname, "two_phase") == 0)
branch  0 taken 2% (fallthrough)
branch  1 taken 98%
        -:  386:		{
        8:  387:			if (two_phase_option_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  388:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  389:						(errcode(ERRCODE_SYNTAX_ERROR),
        -:  390:						 errmsg("conflicting or redundant options")));
        -:  391:			two_phase_option_given = true;
        -:  392:
        8:  393:			data->two_phase = defGetBoolean(defel);
call    0 returned 100%
        -:  394:		}
      391:  395:		else if (strcmp(defel->defname, "origin") == 0)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        -:  396:		{
        -:  397:			char	   *origin;
        -:  398:
      391:  399:			if (origin_option_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  400:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  401:						errcode(ERRCODE_SYNTAX_ERROR),
        -:  402:						errmsg("conflicting or redundant options"));
        -:  403:			origin_option_given = true;
        -:  404:
      391:  405:			origin = defGetString(defel);
call    0 returned 100%
      391:  406:			if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
call    0 returned 100%
branch  1 taken 6% (fallthrough)
branch  2 taken 94%
       25:  407:				data->publish_no_origin = true;
      366:  408:			else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
call    0 returned 100%
branch  1 taken 100% (fallthrough)
branch  2 taken 0%
      366:  409:				data->publish_no_origin = false;
        -:  410:			else
    #####:  411:				ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  412:						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  413:						errmsg("unrecognized origin value: \"%s\"", origin));
        -:  414:		}
        -:  415:		else
    #####:  416:			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
        -:  417:	}
        -:  418:
        -:  419:	/* Check required options */
      400:  420:	if (!protocol_version_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  421:		ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  422:				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  423:				errmsg("option \"%s\" missing", "proto_version"));
      400:  424:	if (!publication_names_given)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  425:		ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  426:				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  427:				errmsg("option \"%s\" missing", "publication_names"));
      400:  428:}
        -:  429:
        -:  430:/*
        -:  431: * Memory context reset callback of PGOutputData->context.
        -:  432: */
        -:  433:static void
function pgoutput_memory_context_reset called 1034 returned 100% blocks executed 100%
     1034:  434:pgoutput_memory_context_reset(void *arg)
        -:  435:{
     1034:  436:	if (RelationSyncCache)
branch  0 taken 19% (fallthrough)
branch  1 taken 81%
        -:  437:	{
      192:  438:		hash_destroy(RelationSyncCache);
call    0 returned 100%
      192:  439:		RelationSyncCache = NULL;
        -:  440:	}
     1034:  441:}
        -:  442:
        -:  443:/*
        -:  444: * Initialize this plugin: allocate the PGOutputData and its three memory contexts, register the cache cleanup callback and, unless is_init is true, parse and validate the options passed by the client.
        -:  445: */
        -:  446:static void
function pgoutput_startup called 725 returned 100% blocks executed 41%
      725:  447:pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        -:  448:				 bool is_init)
        -:  449:{
      725:  450:	PGOutputData *data = palloc0(sizeof(PGOutputData));
call    0 returned 100%
        -:  451:	static bool publication_callback_registered = false;	/* static: remembered across calls in this process */
        -:  452:	MemoryContextCallback *mcallback;
        -:  453:
        -:  454:	/* Create our memory context for private allocations. */
      725:  455:	data->context = AllocSetContextCreate(ctx->context,
call    0 returned 100%
        -:  456:										  "logical replication output context",
        -:  457:										  ALLOCSET_DEFAULT_SIZES);
        -:  458:
      725:  459:	data->cachectx = AllocSetContextCreate(ctx->context,
call    0 returned 100%
        -:  460:										   "logical replication cache context",
        -:  461:										   ALLOCSET_DEFAULT_SIZES);
        -:  462:
      725:  463:	data->pubctx = AllocSetContextCreate(ctx->context,
call    0 returned 100%
        -:  464:										 "logical replication publication list context",
        -:  465:										 ALLOCSET_SMALL_SIZES);
        -:  466:
        -:  467:	/*
        -:  468:	 * Ensure to cleanup RelationSyncCache even when logical decoding invoked
        -:  469:	 * via SQL interface ends up with an error.
        -:  470:	 */
      725:  471:	mcallback = palloc0(sizeof(MemoryContextCallback));
call    0 returned 100%
      725:  472:	mcallback->func = pgoutput_memory_context_reset;
      725:  473:	MemoryContextRegisterResetCallback(ctx->context, mcallback);	/* note: registered on ctx->context, the parent of the contexts created above */
call    0 returned 100%
        -:  474:
      725:  475:	ctx->output_plugin_private = data;	/* the other callbacks read the plugin state back from here */
        -:  476:
        -:  477:	/* This plugin uses binary protocol. */
      725:  478:	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
        -:  479:
        -:  480:	/*
        -:  481:	 * This is replication start and not slot initialization.
        -:  482:	 *
        -:  483:	 * Parse and validate options passed by the client.
        -:  484:	 */
      725:  485:	if (!is_init)
branch  0 taken 55% (fallthrough)
branch  1 taken 45%
        -:  486:	{
        -:  487:		/* Parse the params and ERROR if we see any we don't recognize */
      400:  488:		parse_output_parameters(ctx->output_plugin_options, data);
call    0 returned 100%
        -:  489:
        -:  490:		/* Check if we support requested protocol */
      400:  491:		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  492:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  493:					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
        -:  494:					 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
        -:  495:							data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
        -:  496:
      400:  497:		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  498:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  499:					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
        -:  500:					 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
        -:  501:							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
        -:  502:
        -:  503:		/*
        -:  504:		 * Decide whether to enable streaming. It is disabled by default, in
        -:  505:		 * which case we just update the flag in decoding context. Otherwise
        -:  506:		 * we only allow it with sufficient version of the protocol, and when
        -:  507:		 * the output plugin supports it.
        -:  508:		 */
      400:  509:		if (data->streaming == LOGICALREP_STREAM_OFF)
branch  0 taken 4% (fallthrough)
branch  1 taken 96%
       18:  510:			ctx->streaming = false;
      382:  511:		else if (data->streaming == LOGICALREP_STREAM_ON &&
branch  0 taken 7% (fallthrough)
branch  1 taken 93%
branch  2 taken 0% (fallthrough)
branch  3 taken 100%
        -:  512:				 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
    #####:  513:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  514:					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  515:					 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
        -:  516:							data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
      382:  517:		else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
branch  0 taken 93% (fallthrough)
branch  1 taken 7%
branch  2 taken 0% (fallthrough)
branch  3 taken 100%
        -:  518:				 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
    #####:  519:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  520:					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  521:					 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
        -:  522:							data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
      382:  523:		else if (!ctx->streaming)	/* streaming was requested with an adequate protocol version, but the context has it turned off */
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  524:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  525:					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  526:					 errmsg("streaming requested, but not supported by output plugin")));
        -:  527:
        -:  528:		/*
        -:  529:		 * Here, we just check whether the two-phase option is passed by
        -:  530:		 * plugin and decide whether to enable it at later point of time. It
        -:  531:		 * remains enabled if the previous start-up has done so. But we only
        -:  532:		 * allow the option to be passed in with sufficient version of the
        -:  533:		 * protocol, and when the output plugin supports it.
        -:  534:		 */
      400:  535:		if (!data->two_phase)
branch  0 taken 98% (fallthrough)
branch  1 taken 2%
      392:  536:			ctx->twophase_opt_given = false;
        8:  537:		else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  538:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  539:					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  540:					 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
        -:  541:							data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
        8:  542:		else if (!ctx->twophase)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####:  543:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
        -:  544:					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        -:  545:					 errmsg("two-phase commit requested, but not supported by output plugin")));
        -:  546:		else
        8:  547:			ctx->twophase_opt_given = true;
        -:  548:
        -:  549:		/* Init publication state. */
      400:  550:		data->publications = NIL;
      400:  551:		publications_valid = false;
        -:  552:
        -:  553:		/*
        -:  554:		 * Register callback for pg_publication if we didn't already do that
        -:  555:		 * during some previous call in this process.
        -:  556:		 */
      400:  557:		if (!publication_callback_registered)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        -:  558:		{
      398:  559:			CacheRegisterSyscacheCallback(PUBLICATIONOID,
call    0 returned 100%
        -:  560:										  publication_invalidation_cb,
        -:  561:										  (Datum) 0);
      398:  562:			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
call    0 returned 100%
        -:  563:										 (Datum) 0);
      398:  564:			publication_callback_registered = true;
        -:  565:		}
        -:  566:
        -:  567:		/* Initialize relation schema cache. */
      400:  568:		init_rel_sync_cache(CacheMemoryContext);
call    0 returned 100%
        -:  569:	}
        -:  570:	else
        -:  571:	{
        -:  572:		/*
        -:  573:		 * Disable the streaming and prepared transactions during the slot
        -:  574:		 * initialization mode.
        -:  575:		 */
      325:  576:		ctx->streaming = false;
      325:  577:		ctx->twophase = false;
        -:  578:	}
      725:  579:}
        -:  580:
        -:  581:/*
        -:  582: * BEGIN callback.
        -:  583: *
        -:  584: * Don't send the BEGIN message here instead postpone it until the first
        -:  585: * change. In logical replication, a common scenario is to replicate a set of
        -:  586: * tables (instead of all tables) and transactions whose changes were on
        -:  587: * the table(s) that are not published will produce empty transactions. These
        -:  588: * empty transactions will send BEGIN and COMMIT messages to subscribers,
        -:  589: * using bandwidth on something with little/no use for logical replication.
        -:  590: */
        -:  591:static void
function pgoutput_begin_txn called 912 returned 100% blocks executed 100%
      912:  592:pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
        -:  593:{
      912:  594:	PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,	/* zero-filled, so sent_begin_txn starts out false */
call    0 returned 100%
        -:  595:													  sizeof(PGOutputTxnData));
        -:  596:
      912:  597:	txn->output_plugin_private = txndata;	/* read back by pgoutput_send_begin and pgoutput_commit_txn */
      912:  598:}
        -:  599:
        -:  600:/*
        -:  601: * Send BEGIN.
        -:  602: *
        -:  603: * This is called while processing the first change of the transaction. It also sets sent_begin_txn in the per-transaction data, which pgoutput_commit_txn checks later.
        -:  604: */
        -:  605:static void
function pgoutput_send_begin called 450 returned 100% blocks executed 78%
      450:  606:pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
        -:  607:{
      450:  608:	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
      450:  609:	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        -:  610:
     450*:  611:	Assert(txndata);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
     450*:  612:	Assert(!txndata->sent_begin_txn);	/* BEGIN must not have been sent for this transaction yet */
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -:  613:
      450:  614:	OutputPluginPrepareWrite(ctx, !send_replication_origin);
call    0 returned 100%
      450:  615:	logicalrep_write_begin(ctx->out, txn);
call    0 returned 100%
      450:  616:	txndata->sent_begin_txn = true;
        -:  617:
      450:  618:	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
call    0 returned 100%
        -:  619:					 send_replication_origin);
        -:  620:
      450:  621:	OutputPluginWrite(ctx, true);
call    0 returned 100%
      449:  622:}
        -:  623:
        -:  624:/*
        -:  625: * COMMIT callback. Frees the per-transaction private data, then writes the COMMIT message only if BEGIN was sent for this transaction; otherwise it logs at DEBUG1 and returns.
        -:  626: */
        -:  627:static void
function pgoutput_commit_txn called 908 returned 99% blocks executed 93%
      908:  628:pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        -:  629:					XLogRecPtr commit_lsn)
        -:  630:{
      908:  631:	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        -:  632:	bool		sent_begin_txn;
        -:  633:
     908*:  634:	Assert(txndata);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -:  635:
        -:  636:	/*
        -:  637:	 * We don't need to send the commit message unless some relevant change
        -:  638:	 * from this transaction has been sent to the downstream.
        -:  639:	 */
      908:  640:	sent_begin_txn = txndata->sent_begin_txn;	/* copy the flag out: txndata is freed just below */
      908:  641:	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
call    0 returned 100%
      908:  642:	pfree(txndata);
call    0 returned 100%
      908:  643:	txn->output_plugin_private = NULL;	/* do not leave a dangling pointer behind */
        -:  644:
      908:  645:	if (!sent_begin_txn)
branch  0 taken 51% (fallthrough)
branch  1 taken 49%
        -:  646:	{
      461:  647:		elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
call    0 returned 100%
branch  1 taken 25% (fallthrough)
branch  2 taken 75%
call    3 returned 100%
call    4 returned 100%
      461:  648:		return;
        -:  649:	}
        -:  650:
      447:  651:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
      447:  652:	logicalrep_write_commit(ctx->out, txn, commit_lsn);
call    0 returned 100%
      447:  653:	OutputPluginWrite(ctx, true);
call    0 returned 98%
        -:  654:}
        -:  655:
        -:  656:/*
        -:  657: * BEGIN PREPARE callback. Writes the BEGIN PREPARE message and then calls send_repl_origin() before handing the output to OutputPluginWrite().
        -:  658: */
        -:  659:static void
function pgoutput_begin_prepare_txn called 21 returned 100% blocks executed 100%
       21:  660:pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
        -:  661:{
       21:  662:	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;	/* true when the transaction carries a valid origin id */
        -:  663:
       21:  664:	OutputPluginPrepareWrite(ctx, !send_replication_origin);
call    0 returned 100%
       21:  665:	logicalrep_write_begin_prepare(ctx->out, txn);
call    0 returned 100%
        -:  666:
       21:  667:	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
call    0 returned 100%
        -:  668:					 send_replication_origin);
        -:  669:
       21:  670:	OutputPluginWrite(ctx, true);
call    0 returned 100%
       21:  671:}
        -:  672:
        -:  673:/*
        -:  674: * PREPARE callback. Calls OutputPluginUpdateProgress(), then writes the PREPARE message for txn at prepare_lsn.
        -:  675: */
        -:  676:static void
function pgoutput_prepare_txn called 21 returned 100% blocks executed 100%
       21:  677:pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        -:  678:					 XLogRecPtr prepare_lsn)
        -:  679:{
       21:  680:	OutputPluginUpdateProgress(ctx, false);
call    0 returned 100%
        -:  681:
       21:  682:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
       21:  683:	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
call    0 returned 100%
       21:  684:	OutputPluginWrite(ctx, true);
call    0 returned 100%
       21:  685:}
        -:  686:
        -:  687:/*
        -:  688: * COMMIT PREPARED callback. Calls OutputPluginUpdateProgress(), then writes the COMMIT PREPARED message for txn at commit_lsn.
        -:  689: */
        -:  690:static void
function pgoutput_commit_prepared_txn called 25 returned 100% blocks executed 100%
       25:  691:pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        -:  692:							 XLogRecPtr commit_lsn)
        -:  693:{
       25:  694:	OutputPluginUpdateProgress(ctx, false);
call    0 returned 100%
        -:  695:
       25:  696:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
       25:  697:	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
call    0 returned 100%
       25:  698:	OutputPluginWrite(ctx, true);
call    0 returned 100%
       25:  699:}
        -:  700:
        -:  701:/*
        -:  702: * ROLLBACK PREPARED callback. Calls OutputPluginUpdateProgress(), then writes the ROLLBACK PREPARED message, passing prepare_end_lsn and prepare_time through to logicalrep_write_rollback_prepared().
        -:  703: */
        -:  704:static void
function pgoutput_rollback_prepared_txn called 9 returned 100% blocks executed 100%
        9:  705:pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
        -:  706:							   ReorderBufferTXN *txn,
        -:  707:							   XLogRecPtr prepare_end_lsn,
        -:  708:							   TimestampTz prepare_time)
        -:  709:{
        9:  710:	OutputPluginUpdateProgress(ctx, false);
call    0 returned 100%
        -:  711:
        9:  712:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
        9:  713:	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
call    0 returned 100%
        -:  714:									   prepare_time);
        9:  715:	OutputPluginWrite(ctx, true);
call    0 returned 100%
        9:  716:}
        -:  717:
        -:  718:/*
        -:  719: * Write the current schema of the relation and its ancestor (if any) if not
        -:  720: * done yet. While streaming, "done" is tracked per top-level XID through get/set_schema_sent_in_streamed_txn(); otherwise it is the relentry->schema_sent flag.
        -:  721: */
        -:  722:static void
function maybe_send_schema called 182265 returned 100% blocks executed 100%
   182265:  723:maybe_send_schema(LogicalDecodingContext *ctx,
        -:  724:				  ReorderBufferChange *change,
        -:  725:				  Relation relation, RelationSyncEntry *relentry)
        -:  726:{
   182265:  727:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
        -:  728:	bool		schema_sent;
        -:  729:	TransactionId xid = InvalidTransactionId;
        -:  730:	TransactionId topxid = InvalidTransactionId;
        -:  731:
        -:  732:	/*
        -:  733:	 * Remember XID of the (sub)transaction for the change. We don't care if
        -:  734:	 * it's top-level transaction or not (we have already sent that XID in
        -:  735:	 * start of the current streaming block).
        -:  736:	 *
        -:  737:	 * If we're not in a streaming block, just use InvalidTransactionId and
        -:  738:	 * the write methods will not include it.
        -:  739:	 */
   182265:  740:	if (data->in_streaming)
branch  0 taken 97% (fallthrough)
branch  1 taken 3%
   175941:  741:		xid = change->txn->xid;
        -:  742:
   182265:  743:	if (rbtxn_is_subtxn(change->txn))
branch  0 taken 6% (fallthrough)
branch  1 taken 94%
    10169:  744:		topxid = rbtxn_get_toptxn(change->txn)->xid;
        -:  745:	else
        -:  746:		topxid = xid;	/* not a subtransaction: same as xid, which is still Invalid when not streaming */
        -:  747:
        -:  748:	/*
        -:  749:	 * Do we need to send the schema? We do track streamed transactions
        -:  750:	 * separately, because those may be applied later (and the regular
        -:  751:	 * transactions won't see their effects until then) and in an order that
        -:  752:	 * we don't know at this point.
        -:  753:	 *
        -:  754:	 * XXX There is a scope of optimization here. Currently, we always send
        -:  755:	 * the schema first time in a streaming transaction but we can probably
        -:  756:	 * avoid that by checking 'relentry->schema_sent' flag. However, before
        -:  757:	 * doing that we need to study its impact on the case where we have a mix
        -:  758:	 * of streaming and non-streaming transactions.
        -:  759:	 */
   182265:  760:	if (data->in_streaming)
branch  0 taken 97% (fallthrough)
branch  1 taken 3%
   175941:  761:		schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
call    0 returned 100%
        -:  762:	else
     6324:  763:		schema_sent = relentry->schema_sent;
        -:  764:
        -:  765:	/* Nothing to do if we already sent the schema. */
   182265:  766:	if (schema_sent)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -:  767:		return;
        -:  768:
        -:  769:	/*
        -:  770:	 * Send the schema.  If the changes will be published using an ancestor's
        -:  771:	 * schema, not the relation's own, send that ancestor's schema before
        -:  772:	 * sending relation's own (XXX - maybe sending only the former suffices?).
        -:  773:	 */
      341:  774:	if (relentry->publish_as_relid != RelationGetRelid(relation))
branch  0 taken 10% (fallthrough)
branch  1 taken 90%
        -:  775:	{
       35:  776:		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);
call    0 returned 100%
        -:  777:
       35:  778:		send_relation_and_attrs(ancestor, xid, ctx, relentry);
call    0 returned 97%
       34:  779:		RelationClose(ancestor);	/* pairs with the RelationIdGetRelation() call above */
call    0 returned 100%
        -:  780:	}
        -:  781:
      340:  782:	send_relation_and_attrs(relation, xid, ctx, relentry);
call    0 returned 100%
        -:  783:
      339:  784:	if (data->in_streaming)	/* record the send in the same place the check above read from */
branch  0 taken 21% (fallthrough)
branch  1 taken 79%
       72:  785:		set_schema_sent_in_streamed_txn(relentry, topxid);
call    0 returned 100%
        -:  786:	else
      267:  787:		relentry->schema_sent = true;
        -:  788:}
        -:  789:
        -:  790:/*
        -:  791: * Sends a relation: first one type message for every published column whose type OID is at or above FirstGenbkiObjectId, then the relation message itself. The column list and generated-column setting are taken from relentry.
        -:  792: */
        -:  793:static void
function send_relation_and_attrs called 375 returned 99% blocks executed 100%
      375:  794:send_relation_and_attrs(Relation relation, TransactionId xid,
        -:  795:						LogicalDecodingContext *ctx,
        -:  796:						RelationSyncEntry *relentry)
        -:  797:{
      375:  798:	TupleDesc	desc = RelationGetDescr(relation);
      375:  799:	Bitmapset  *columns = relentry->columns;
      375:  800:	PublishGencolsType include_gencols_type = relentry->include_gencols_type;
        -:  801:	int			i;
        -:  802:
        -:  803:	/*
        -:  804:	 * Write out type info if needed.  We do that only for user-created types.
        -:  805:	 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
        -:  806:	 * objects with hand-assigned OIDs to be "built in", not for instance any
        -:  807:	 * function or type defined in the information_schema. This is important
        -:  808:	 * because only hand-assigned OIDs can be expected to remain stable across
        -:  809:	 * major versions.
        -:  810:	 */
     1160:  811:	for (i = 0; i < desc->natts; i++)
branch  0 taken 68%
branch  1 taken 32% (fallthrough)
        -:  812:	{
        -:  813:		Form_pg_attribute att = TupleDescAttr(desc, i);
        -:  814:
      785:  815:		if (!logicalrep_should_publish_column(att, columns,
call    0 returned 100%
branch  1 taken 9% (fallthrough)
branch  2 taken 91%
        -:  816:											  include_gencols_type))
       71:  817:			continue;	/* column is not published: no type message for it */
        -:  818:
      714:  819:		if (att->atttypid < FirstGenbkiObjectId)
branch  0 taken 97% (fallthrough)
branch  1 taken 3%
      696:  820:			continue;	/* type OID below the cutoff described above: no type message */
        -:  821:
       18:  822:		OutputPluginPrepareWrite(ctx, false);
call    0 returned 100%
       18:  823:		logicalrep_write_typ(ctx->out, xid, att->atttypid);
call    0 returned 100%
       18:  824:		OutputPluginWrite(ctx, false);
call    0 returned 100%
        -:  825:	}
        -:  826:
      375:  827:	OutputPluginPrepareWrite(ctx, false);
call    0 returned 100%
      375:  828:	logicalrep_write_rel(ctx->out, xid, relation, columns,
call    0 returned 100%
        -:  829:						 include_gencols_type);
      375:  830:	OutputPluginWrite(ctx, false);
call    0 returned 99%
      373:  831:}
        -:  832:
        -:  833:/*
        -:  834: * Executor state preparation for evaluation of row filter expressions for the
        -:  835: * specified relation. Returns a new EState whose range table holds a single RTE_RELATION entry describing rel.
        -:  836: */
        -:  837:static EState *
function create_estate_for_relation called 17 returned 100% blocks executed 100%
       17:  838:create_estate_for_relation(Relation rel)
        -:  839:{
        -:  840:	EState	   *estate;
        -:  841:	RangeTblEntry *rte;
       17:  842:	List	   *perminfos = NIL;
        -:  843:
       17:  844:	estate = CreateExecutorState();
call    0 returned 100%
        -:  845:
        -:  846:	rte = makeNode(RangeTblEntry);
       17:  847:	rte->rtekind = RTE_RELATION;
       17:  848:	rte->relid = RelationGetRelid(rel);
       17:  849:	rte->relkind = rel->rd_rel->relkind;
       17:  850:	rte->rellockmode = AccessShareLock;
        -:  851:
       17:  852:	addRTEPermissionInfo(&perminfos, rte);	/* perminfos is handed to ExecInitRangeTable() below */
call    0 returned 100%
        -:  853:
       17:  854:	ExecInitRangeTable(estate, list_make1(rte), perminfos,
call    0 returned 100%
call    1 returned 100%
call    2 returned 100%
        -:  855:					   bms_make_singleton(1));
        -:  856:
       17:  857:	estate->es_output_cid = GetCurrentCommandId(false);
call    0 returned 100%
        -:  858:
       17:  859:	return estate;
        -:  860:}
        -:  861:
        -:  862:/*
        -:  863: * Evaluates row filter.
        -:  864: *
        -:  865: * If the row filter evaluates to NULL, it is taken as false i.e. the change
        -:  866: * isn't replicated. Otherwise the boolean value of the expression is returned. The outcome is also logged at DEBUG3.
        -:  867: */
        -:  868:static bool
function pgoutput_row_filter_exec_expr called 38 returned 100% blocks executed 47%
       38:  869:pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
        -:  870:{
        -:  871:	Datum		ret;
        -:  872:	bool		isnull;
        -:  873:
      38*:  874:	Assert(state != NULL);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -:  875:
        -:  876:	ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
        -:  877:
      38*:  878:	elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
call    0 returned 100%
branch  1 taken 0% (fallthrough)
branch  2 taken 100%
branch  3 never executed
branch  4 never executed
branch  5 never executed
branch  6 never executed
branch  7 never executed
branch  8 never executed
call    9 never executed
call   10 never executed
        -:  879:		 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
        -:  880:		 isnull ? "true" : "false");
        -:  881:
       38:  882:	if (isnull)
branch  0 taken 97% (fallthrough)
branch  1 taken 3%
        -:  883:		return false;	/* NULL result counts as "filter not satisfied" */
        -:  884:
       37:  885:	return DatumGetBool(ret);
        -:  886:}
        -:  887:
        -:  888:/*
        -:  889: * Make sure the per-entry memory context exists. If it does not, it is created as a child of data->cachectx and its identifier is set to the name of the publish_as_relid relation.
        -:  890: */
        -:  891:static void
function pgoutput_ensure_entry_cxt called 321 returned 100% blocks executed 100%
      321:  892:pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
        -:  893:{
        -:  894:	Relation	relation;
        -:  895:
        -:  896:	/* The context may already exist, in which case bail out. */
      321:  897:	if (entry->entry_cxt)
branch  0 taken 95% (fallthrough)
branch  1 taken 5%
        -:  898:		return;
        -:  899:
      304:  900:	relation = RelationIdGetRelation(entry->publish_as_relid);	/* NOTE(review): no RelationClose() for this reference is visible in this function, unlike the other RelationIdGetRelation() callers in this file - confirm whether it is released elsewhere */
call    0 returned 100%
        -:  901:
      304:  902:	entry->entry_cxt = AllocSetContextCreate(data->cachectx,
call    0 returned 100%
        -:  903:											 "entry private context",
        -:  904:											 ALLOCSET_SMALL_SIZES);
        -:  905:
      304:  906:	MemoryContextCopyAndSetIdentifier(entry->entry_cxt,	/* the relation is opened only to obtain this name */
call    0 returned 100%
call    1 returned 100%
        -:  907:									  RelationGetRelationName(relation));
        -:  908:}
        -:  909:
        -:  910:/*
        -:  911: * Initialize the row filter. Collects, per pubaction, the row filter text of every given publication for entry->publish_as_relid and caches one combined ExprState per pubaction in entry->exprstate[]. Nothing is built once insert, update and delete are all found to be published without a filter.
        -:  912: */
        -:  913:static void
function pgoutput_row_filter_init called 304 returned 100% blocks executed 100%
      304:  914:pgoutput_row_filter_init(PGOutputData *data, List *publications,
        -:  915:						 RelationSyncEntry *entry)
        -:  916:{
        -:  917:	ListCell   *lc;
      304:  918:	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
      304:  919:	bool		no_filter[] = {false, false, false};	/* One per pubaction */
        -:  920:	MemoryContext oldctx;
        -:  921:	int			idx;
        -:  922:	bool		has_filter = true;	/* cleared only by the quick exit in the loop below */
      304:  923:	Oid			schemaid = get_rel_namespace(entry->publish_as_relid);
call    0 returned 100%
        -:  924:
        -:  925:	/*
        -:  926:	 * Find if there are any row filters for this relation. If there are, then
        -:  927:	 * prepare the necessary ExprState and cache it in entry->exprstate. To
        -:  928:	 * build an expression state, we need to ensure the following:
        -:  929:	 *
        -:  930:	 * All the given publication-table mappings must be checked.
        -:  931:	 *
        -:  932:	 * Multiple publications might have multiple row filters for this
        -:  933:	 * relation. Since row filter usage depends on the DML operation, there
        -:  934:	 * are multiple lists (one for each operation) to which row filters will
        -:  935:	 * be appended.
        -:  936:	 *
        -:  937:	 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
        -:  938:	 * expression" so it takes precedence.
        -:  939:	 */
      325:  940:	foreach(lc, publications)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 95% (fallthrough)
branch  3 taken 5%
        -:  941:	{
      308:  942:		Publication *pub = lfirst(lc);
        -:  943:		HeapTuple	rftuple = NULL;
        -:  944:		Datum		rfdatum = 0;
      308:  945:		bool		pub_no_filter = true;	/* overwritten with the isnull result of SysCacheGetAttr() when a mapping tuple is found */
        -:  946:
        -:  947:		/*
        -:  948:		 * If the publication is FOR ALL TABLES, or the publication includes a
        -:  949:		 * FOR TABLES IN SCHEMA where the table belongs to the referred
        -:  950:		 * schema, then it is treated the same as if there are no row filters
        -:  951:		 * (even if other publications have a row filter).
        -:  952:		 */
      532:  953:		if (!pub->alltables &&
branch  0 taken 97% (fallthrough)
branch  1 taken 3%
branch  2 taken 73%
branch  3 taken 27%
      224:  954:			!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
call    0 returned 100%
        -:  955:								   ObjectIdGetDatum(schemaid),
        -:  956:								   ObjectIdGetDatum(pub->oid)))
        -:  957:		{
        -:  958:			/*
        -:  959:			 * Check for the presence of a row filter in this publication.
        -:  960:			 */
      218:  961:			rftuple = SearchSysCache2(PUBLICATIONRELMAP,
call    0 returned 100%
        -:  962:									  ObjectIdGetDatum(entry->publish_as_relid),
        -:  963:									  ObjectIdGetDatum(pub->oid));
        -:  964:
      218:  965:			if (HeapTupleIsValid(rftuple))
branch  0 taken 94% (fallthrough)
branch  1 taken 6%
        -:  966:			{
        -:  967:				/* Null indicates no filter. */
      206:  968:				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
call    0 returned 100%
        -:  969:										  Anum_pg_publication_rel_prqual,
        -:  970:										  &pub_no_filter);
        -:  971:			}
        -:  972:		}
        -:  973:
      308:  974:		if (pub_no_filter)
branch  0 taken 95% (fallthrough)
branch  1 taken 5%
        -:  975:		{
      294:  976:			if (rftuple)
branch  0 taken 65% (fallthrough)
branch  1 taken 35%
      192:  977:				ReleaseSysCache(rftuple);	/* a tuple may have been fetched even though its prqual is null */
call    0 returned 100%
        -:  978:
      294:  979:			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
      294:  980:			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
      294:  981:			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
        -:  982:
        -:  983:			/*
        -:  984:			 * Quick exit if all the DML actions are publicized via this
        -:  985:			 * publication.
        -:  986:			 */
      294:  987:			if (no_filter[PUBACTION_INSERT] &&
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 98% (fallthrough)
branch  3 taken 2%
      287:  988:				no_filter[PUBACTION_UPDATE] &&
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        -:  989:				no_filter[PUBACTION_DELETE])
        -:  990:			{
        -:  991:				has_filter = false;
      287:  992:				break;
        -:  993:			}
        -:  994:
        -:  995:			/* No additional work for this publication. Next one. */
        7:  996:			continue;
        -:  997:		}
        -:  998:
        -:  999:		/* Form the per pubaction row filter lists. */
       14: 1000:		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 100% (fallthrough)
branch  3 taken 0%
       14: 1001:			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
call    0 returned 100%
       14: 1002:												TextDatumGetCString(rfdatum));
call    0 returned 100%
       14: 1003:		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 100% (fallthrough)
branch  3 taken 0%
       14: 1004:			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
call    0 returned 100%
       14: 1005:												TextDatumGetCString(rfdatum));
call    0 returned 100%
       14: 1006:		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 100% (fallthrough)
branch  3 taken 0%
       14: 1007:			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
call    0 returned 100%
       14: 1008:												TextDatumGetCString(rfdatum));
call    0 returned 100%
        -: 1009:
       14: 1010:		ReleaseSysCache(rftuple);	/* the filter text was copied out as C strings above */
call    0 returned 100%
        -: 1011:	}							/* loop all subscribed publications */
        -: 1012:
        -: 1013:	/* Clean the row filter */
     1216: 1014:	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
branch  0 taken 75%
branch  1 taken 25% (fallthrough)
        -: 1015:	{
      912: 1016:		if (no_filter[idx])	/* filters gathered before a later publication made this action unfiltered are dropped */
branch  0 taken 95% (fallthrough)
branch  1 taken 5%
        -: 1017:		{
      870: 1018:			list_free_deep(rfnodes[idx]);
call    0 returned 100%
      870: 1019:			rfnodes[idx] = NIL;
        -: 1020:		}
        -: 1021:	}
        -: 1022:
      304: 1023:	if (has_filter)
branch  0 taken 6% (fallthrough)
branch  1 taken 94%
        -: 1024:	{
       17: 1025:		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
call    0 returned 100%
        -: 1026:
       17: 1027:		pgoutput_ensure_entry_cxt(data, entry);
call    0 returned 100%
        -: 1028:
        -: 1029:		/*
        -: 1030:		 * Now all the filters for all pubactions are known. Combine them when
        -: 1031:		 * their pubactions are the same.
        -: 1032:		 */
       17: 1033:		oldctx = MemoryContextSwitchTo(entry->entry_cxt);	/* the estate and expression states below are built with entry_cxt as the current context */
call    0 returned 100%
       17: 1034:		entry->estate = create_estate_for_relation(relation);
call    0 returned 100%
       68: 1035:		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
branch  0 taken 75%
branch  1 taken 25% (fallthrough)
        -: 1036:		{
        -: 1037:			List	   *filters = NIL;
        -: 1038:			Expr	   *rfnode;
        -: 1039:
       51: 1040:			if (rfnodes[idx] == NIL)
branch  0 taken 41% (fallthrough)
branch  1 taken 59%
       21: 1041:				continue;	/* no filter for this pubaction: its exprstate slot is left untouched */
        -: 1042:
       63: 1043:			foreach(lc, rfnodes[idx])
branch  0 taken 52% (fallthrough)
branch  1 taken 48%
       33: 1044:				filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
call    0 returned 100%
call    1 returned 100%
call    2 returned 100%
        -: 1045:
        -: 1046:			/* combine the row filter and cache the ExprState */
       30: 1047:			rfnode = make_orclause(filters);
call    0 returned 100%
       30: 1048:			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
call    0 returned 100%
        -: 1049:		}						/* for each pubaction */
        -: 1050:		MemoryContextSwitchTo(oldctx);
        -: 1051:
       17: 1052:		RelationClose(relation);	/* pairs with the RelationIdGetRelation() call at the top of this branch */
call    0 returned 100%
        -: 1053:	}
      304: 1054:}
        -: 1055:
        -: 1056:/*
        -: 1057: * If the table contains a generated column, check for any conflicting
        -: 1058: * values of 'publish_generated_columns' parameter in the publications.
        -: 1059: */
        -: 1060:static void
function check_and_init_gencol called 304 returned 100% blocks executed 69%
      304: 1061:check_and_init_gencol(PGOutputData *data, List *publications,
        -: 1062:					  RelationSyncEntry *entry)
        -: 1063:{
      304: 1064:	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
call    0 returned 100%
      304: 1065:	TupleDesc	desc = RelationGetDescr(relation);
        -: 1066:	bool		gencolpresent = false;
        -: 1067:	bool		first = true;
        -: 1068:
        -: 1069:	/* Check if there is any generated column present. */
      930: 1070:	for (int i = 0; i < desc->natts; i++)
branch  0 taken 68%
branch  1 taken 32% (fallthrough)
        -: 1071:	{
        -: 1072:		Form_pg_attribute att = TupleDescAttr(desc, i);
        -: 1073:
      633: 1074:		if (att->attgenerated)
branch  0 taken 99% (fallthrough)
branch  1 taken 1%
        -: 1075:		{
        -: 1076:			gencolpresent = true;
        -: 1077:			break;
        -: 1078:		}
        -: 1079:	}
        -: 1080:
        -: 1081:	/* There are no generated columns to be published. */
      304: 1082:	if (!gencolpresent)
branch  0 taken 98% (fallthrough)
branch  1 taken 2%
        -: 1083:	{
      297: 1084:		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
      297: 1085:		return;
        -: 1086:	}
        -: 1087:
        -: 1088:	/*
        -: 1089:	 * There may be a conflicting value for 'publish_generated_columns'
        -: 1090:	 * parameter in the publications.
        -: 1091:	 */
       15: 1092:	foreach_ptr(Publication, pub, publications)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 53% (fallthrough)
branch  3 taken 47%
        -: 1093:	{
        -: 1094:		/*
        -: 1095:		 * The column list takes precedence over the
        -: 1096:		 * 'publish_generated_columns' parameter. Those will be checked later,
        -: 1097:		 * see pgoutput_column_list_init.
        -: 1098:		 */
        8: 1099:		if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
call    0 returned 100%
branch  1 taken 38%
branch  2 taken 62%
        3: 1100:			continue;
        -: 1101:
        5: 1102:		if (first)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        -: 1103:		{
        5: 1104:			entry->include_gencols_type = pub->pubgencols_type;
        -: 1105:			first = false;
        -: 1106:		}
    #####: 1107:		else if (entry->include_gencols_type != pub->pubgencols_type)
branch  0 never executed
branch  1 never executed
    #####: 1108:			ereport(ERROR,
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
call    5 never executed
call    6 never executed
call    7 never executed
        -: 1109:					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
        -: 1110:					errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
        -: 1111:						   get_namespace_name(RelationGetNamespace(relation)),
        -: 1112:						   RelationGetRelationName(relation)));
        -: 1113:	}
        -: 1114:}
        -: 1115:
        -: 1116:/*
        -: 1117: * Initialize the column list.
        -: 1118: */
        -: 1119:static void
function pgoutput_column_list_init called 304 returned 100% blocks executed 96%
      304: 1120:pgoutput_column_list_init(PGOutputData *data, List *publications,
        -: 1121:						  RelationSyncEntry *entry)
        -: 1122:{
        -: 1123:	ListCell   *lc;
        -: 1124:	bool		first = true;
      304: 1125:	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
call    0 returned 100%
        -: 1126:	bool		found_pub_collist = false;
        -: 1127:	Bitmapset  *relcols = NULL;
        -: 1128:
      304: 1129:	pgoutput_ensure_entry_cxt(data, entry);
call    0 returned 100%
        -: 1130:
        -: 1131:	/*
        -: 1132:	 * Find if there are any column lists for this relation. If there are,
        -: 1133:	 * build a bitmap using the column lists.
        -: 1134:	 *
        -: 1135:	 * Multiple publications might have multiple column lists for this
        -: 1136:	 * relation.
        -: 1137:	 *
        -: 1138:	 * Note that we don't support the case where the column list is different
        -: 1139:	 * for the same table when combining publications. See comments atop
        -: 1140:	 * fetch_table_list. But one can later change the publication so we still
        -: 1141:	 * need to check all the given publication-table mappings and report an
        -: 1142:	 * error if any publications have a different column list.
        -: 1143:	 */
      618: 1144:	foreach(lc, publications)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 51% (fallthrough)
branch  3 taken 49%
        -: 1145:	{
      315: 1146:		Publication *pub = lfirst(lc);
      315: 1147:		Bitmapset  *cols = NULL;
        -: 1148:
        -: 1149:		/* Retrieve the bitmap of columns for a column list publication. */
      315: 1150:		found_pub_collist |= check_and_fetch_column_list(pub,
call    0 returned 100%
        -: 1151:														 entry->publish_as_relid,
        -: 1152:														 entry->entry_cxt, &cols);
        -: 1153:
        -: 1154:		/*
        -: 1155:		 * For non-column list publications — e.g. TABLE (without a column
        -: 1156:		 * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
        -: 1157:		 * of the table (including generated columns when
        -: 1158:		 * 'publish_generated_columns' parameter is true).
        -: 1159:		 */
      315: 1160:		if (!cols)
branch  0 taken 88%
branch  1 taken 12%
        -: 1161:		{
        -: 1162:			/*
        -: 1163:			 * Cache the table columns for the first publication with no
        -: 1164:			 * specified column list to detect publication with a different
        -: 1165:			 * column list.
        -: 1166:			 */
      276: 1167:			if (!relcols && (list_length(publications) > 1))
branch  0 taken 99% (fallthrough)
branch  1 taken 1%
branch  2 taken 3% (fallthrough)
branch  3 taken 97%
        -: 1168:			{
        9: 1169:				MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
call    0 returned 100%
        -: 1170:
        9: 1171:				relcols = pub_form_cols_map(relation,
call    0 returned 100%
        -: 1172:											entry->include_gencols_type);
        -: 1173:				MemoryContextSwitchTo(oldcxt);
        -: 1174:			}
        -: 1175:
      276: 1176:			cols = relcols;
        -: 1177:		}
        -: 1178:
      315: 1179:		if (first)
branch  0 taken 97% (fallthrough)
branch  1 taken 3%
        -: 1180:		{
      304: 1181:			entry->columns = cols;
        -: 1182:			first = false;
        -: 1183:		}
       11: 1184:		else if (!bms_equal(entry->columns, cols))
call    0 returned 100%
branch  1 taken 9% (fallthrough)
branch  2 taken 91%
       1*: 1185:			ereport(ERROR,
call    0 returned 100%
branch  1 taken 100% (fallthrough)
branch  2 taken 0%
call    3 returned 100%
call    4 returned 100%
call    5 returned 100%
call    6 returned 0%
call    7 never executed
        -: 1186:					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
        -: 1187:					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
        -: 1188:						   get_namespace_name(RelationGetNamespace(relation)),
        -: 1189:						   RelationGetRelationName(relation)));
        -: 1190:	}							/* loop all subscribed publications */
        -: 1191:
        -: 1192:	/*
        -: 1193:	 * If no column list publications exist, columns to be published will be
        -: 1194:	 * computed later according to the 'publish_generated_columns' parameter.
        -: 1195:	 */
      303: 1196:	if (!found_pub_collist)
branch  0 taken 88% (fallthrough)
branch  1 taken 12%
      267: 1197:		entry->columns = NULL;
        -: 1198:
      303: 1199:	RelationClose(relation);
call    0 returned 100%
      303: 1200:}
        -: 1201:
        -: 1202:/*
        -: 1203: * Initialize the slot for storing new and old tuples, and build the map that
        -: 1204: * will be used to convert the relation's tuples into the ancestor's format.
        -: 1205: */
        -: 1206:static void
function init_tuple_slot called 304 returned 100% blocks executed 100%
      304: 1207:init_tuple_slot(PGOutputData *data, Relation relation,
        -: 1208:				RelationSyncEntry *entry)
        -: 1209:{
        -: 1210:	MemoryContext oldctx;
        -: 1211:	TupleDesc	oldtupdesc;
        -: 1212:	TupleDesc	newtupdesc;
        -: 1213:
      304: 1214:	oldctx = MemoryContextSwitchTo(data->cachectx);
call    0 returned 100%
        -: 1215:
        -: 1216:	/*
        -: 1217:	 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
        -: 1218:	 * live as long as the cache remains.
        -: 1219:	 */
      304: 1220:	oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
call    0 returned 100%
      304: 1221:	newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
call    0 returned 100%
        -: 1222:
      304: 1223:	entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
call    0 returned 100%
      304: 1224:	entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
call    0 returned 100%
branch  1 taken 13% (fallthrough)
branch  2 taken 87%
        -: 1225:
        -: 1226:	MemoryContextSwitchTo(oldctx);
        -: 1227:
        -: 1228:	/*
        -: 1229:	 * Cache the map that will be used to convert the relation's tuples into
        -: 1230:	 * the ancestor's format, if needed.
        -: 1231:	 */
      304: 1232:	if (entry->publish_as_relid != RelationGetRelid(relation))
branch  0 taken 13% (fallthrough)
branch  1 taken 87%
        -: 1233:	{
       40: 1234:		Relation	ancestor = RelationIdGetRelation(entry->publish_as_relid);
call    0 returned 100%
       40: 1235:		TupleDesc	indesc = RelationGetDescr(relation);
       40: 1236:		TupleDesc	outdesc = RelationGetDescr(ancestor);
        -: 1237:
        -: 1238:		/* Map must live as long as the logical decoding context. */
       40: 1239:		oldctx = MemoryContextSwitchTo(data->cachectx);
call    0 returned 100%
        -: 1240:
       40: 1241:		entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
call    0 returned 100%
call    1 returned 100%
        -: 1242:
        -: 1243:		MemoryContextSwitchTo(oldctx);
       40: 1244:		RelationClose(ancestor);
call    0 returned 100%
        -: 1245:	}
      304: 1246:}
        -: 1247:
        -: 1248:/*
        -: 1249: * Change is checked against the row filter if any.
        -: 1250: *
        -: 1251: * Returns true if the change is to be replicated, else false.
        -: 1252: *
        -: 1253: * For inserts, evaluate the row filter for new tuple.
        -: 1254: * For deletes, evaluate the row filter for old tuple.
        -: 1255: * For updates, evaluate the row filter for old and new tuple.
        -: 1256: *
        -: 1257: * For updates, if both evaluations are true, we allow sending the UPDATE and
        -: 1258: * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
        -: 1259: * only one of the tuples matches the row filter expression, we transform
        -: 1260: * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
        -: 1261: * following rules:
        -: 1262: *
        -: 1263: * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
        -: 1264: * Case 2: old-row (no match)    new row (match)     -> INSERT
        -: 1265: * Case 3: old-row (match)       new-row (no match)  -> DELETE
        -: 1266: * Case 4: old-row (match)       new row (match)     -> UPDATE
        -: 1267: *
        -: 1268: * The new action is updated in the action parameter.
        -: 1269: *
        -: 1270: * The new slot could be updated when transforming the UPDATE into INSERT,
        -: 1271: * because the original new tuple might not have column values from the replica
        -: 1272: * identity.
        -: 1273: *
        -: 1274: * Examples:
        -: 1275: * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
        -: 1276: * Since the old tuple satisfies, the initial table synchronization copied this
        -: 1277: * row (or another method was used to guarantee that there is data
        -: 1278: * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
        -: 1279: * row filter, so from a data consistency perspective, that row should be
        -: 1280: * removed on the subscriber. The UPDATE should be transformed into a DELETE
        -: 1281: * statement and be sent to the subscriber. Keeping this row on the subscriber
        -: 1282: * is undesirable because it doesn't reflect what was defined in the row filter
        -: 1283: * expression on the publisher. This row on the subscriber would likely not be
        -: 1284: * modified by replication again. If someone inserted a new row with the same
        -: 1285: * old identifier, replication could stop due to a constraint violation.
        -: 1286: *
        -: 1287: * Let's say the old tuple doesn't match the row filter but the new tuple does.
        -: 1288: * Since the old tuple doesn't satisfy, the initial table synchronization
        -: 1289: * probably didn't copy this row. However, after the UPDATE the new tuple does
        -: 1290: * satisfy the row filter, so from a data consistency perspective, that row
        -: 1291: * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
        -: 1292: * statements have no effect (it matches no row -- see
        -: 1293: * apply_handle_update_internal()). So, the UPDATE should be transformed into a
        -: 1294: * INSERT statement and be sent to the subscriber. However, this might surprise
        -: 1295: * someone who expects the data set to satisfy the row filter expression on the
        -: 1296: * provider.
        -: 1297: */
        -: 1298:static bool
function pgoutput_row_filter called 182261 returned 100% blocks executed 84%
   182261: 1299:pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
        -: 1300:					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
        -: 1301:					ReorderBufferChangeType *action)
        -: 1302:{
        -: 1303:	TupleDesc	desc;
        -: 1304:	int			i;
        -: 1305:	bool		old_matched,
        -: 1306:				new_matched,
        -: 1307:				result;
        -: 1308:	TupleTableSlot *tmp_new_slot;
   182261: 1309:	TupleTableSlot *new_slot = *new_slot_ptr;
        -: 1310:	ExprContext *ecxt;
        -: 1311:	ExprState  *filter_exprstate;
        -: 1312:
        -: 1313:	/*
        -: 1314:	 * We need this map to avoid relying on ReorderBufferChangeType enums
        -: 1315:	 * having specific values.
        -: 1316:	 */
        -: 1317:	static const int map_changetype_pubaction[] = {
        -: 1318:		[REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
        -: 1319:		[REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
        -: 1320:		[REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
        -: 1321:	};
        -: 1322:
  182261*: 1323:	Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1324:		   *action == REORDER_BUFFER_CHANGE_UPDATE ||
        -: 1325:		   *action == REORDER_BUFFER_CHANGE_DELETE);
        -: 1326:
  182261*: 1327:	Assert(new_slot || old_slot);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1328:
        -: 1329:	/* Get the corresponding row filter */
   182261: 1330:	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
        -: 1331:
        -: 1332:	/* Bail out if there is no row filter */
   182261: 1333:	if (!filter_exprstate)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 1334:		return true;
        -: 1335:
      34*: 1336:	elog(DEBUG3, "table \"%s.%s\" has row filter",
call    0 returned 100%
branch  1 taken 0% (fallthrough)
branch  2 taken 100%
call    3 never executed
call    4 never executed
call    5 never executed
        -: 1337:		 get_namespace_name(RelationGetNamespace(relation)),
        -: 1338:		 RelationGetRelationName(relation));
        -: 1339:
       34: 1340:	ResetPerTupleExprContext(entry->estate);
branch  0 taken 71% (fallthrough)
branch  1 taken 29%
call    2 returned 100%
        -: 1341:
       34: 1342:	ecxt = GetPerTupleExprContext(entry->estate);
branch  0 taken 29% (fallthrough)
branch  1 taken 71%
call    2 returned 100%
        -: 1343:
        -: 1344:	/*
        -: 1345:	 * For the following occasions where there is only one tuple, we can
        -: 1346:	 * evaluate the row filter for that tuple and return.
        -: 1347:	 *
        -: 1348:	 * For inserts, we only have the new tuple.
        -: 1349:	 *
        -: 1350:	 * For updates, we can have only a new tuple when none of the replica
        -: 1351:	 * identity columns changed and none of those columns have external data
        -: 1352:	 * but we still need to evaluate the row filter for the new tuple as the
        -: 1353:	 * existing values of those columns might not match the filter. Also,
        -: 1354:	 * users can use constant expressions in the row filter, so we anyway need
        -: 1355:	 * to evaluate it for the new tuple.
        -: 1356:	 *
        -: 1357:	 * For deletes, we only have the old tuple.
        -: 1358:	 */
       34: 1359:	if (!new_slot || !old_slot)
branch  0 taken 88% (fallthrough)
branch  1 taken 12%
        -: 1360:	{
       30: 1361:		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
branch  0 taken 7% (fallthrough)
branch  1 taken 93%
       30: 1362:		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
call    0 returned 100%
        -: 1363:
       30: 1364:		return result;
        -: 1365:	}
        -: 1366:
        -: 1367:	/*
        -: 1368:	 * Both the old and new tuples must be valid only for updates and need to
        -: 1369:	 * be checked against the row filter.
        -: 1370:	 */
       4*: 1371:	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1372:
        4: 1373:	slot_getallattrs(new_slot);
call    0 returned 100%
        4: 1374:	slot_getallattrs(old_slot);
call    0 returned 100%
        -: 1375:
        -: 1376:	tmp_new_slot = NULL;
        4: 1377:	desc = RelationGetDescr(relation);
        -: 1378:
        -: 1379:	/*
        -: 1380:	 * The new tuple might not have all the replica identity columns, in which
        -: 1381:	 * case it needs to be copied over from the old tuple.
        -: 1382:	 */
       12: 1383:	for (i = 0; i < desc->natts; i++)
branch  0 taken 67%
branch  1 taken 33% (fallthrough)
        -: 1384:	{
        -: 1385:		CompactAttribute *att = TupleDescCompactAttr(desc, i);
        -: 1386:
        -: 1387:		/*
        -: 1388:		 * if the column in the new tuple or old tuple is null, nothing to do
        -: 1389:		 */
        8: 1390:		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
branch  0 taken 88% (fallthrough)
branch  1 taken 12%
branch  2 taken 0% (fallthrough)
branch  3 taken 100%
        1: 1391:			continue;
        -: 1392:
        -: 1393:		/*
        -: 1394:		 * Unchanged toasted replica identity columns are only logged in the
        -: 1395:		 * old tuple. Copy this over to the new tuple. The changed (or WAL
        -: 1396:		 * Logged) toast values are always assembled in memory and set as
        -: 1397:		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
        -: 1398:		 */
       11: 1399:		if (att->attlen == -1 &&
branch  0 taken 57% (fallthrough)
branch  1 taken 43%
branch  2 taken 25% (fallthrough)
branch  3 taken 75%
        5: 1400:			VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
branch  0 taken 25% (fallthrough)
branch  1 taken 75%
branch  2 taken 100% (fallthrough)
branch  3 taken 0%
        1: 1401:			!VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
        -: 1402:		{
        1: 1403:			if (!tmp_new_slot)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        -: 1404:			{
        1: 1405:				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
call    0 returned 100%
        -: 1406:				ExecClearTuple(tmp_new_slot);
        -: 1407:
        1: 1408:				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
        1: 1409:					   desc->natts * sizeof(Datum));
        1: 1410:				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
        1: 1411:					   desc->natts * sizeof(bool));
        -: 1412:			}
        -: 1413:
        1: 1414:			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
        1: 1415:			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
        -: 1416:		}
        -: 1417:	}
        -: 1418:
        4: 1419:	ecxt->ecxt_scantuple = old_slot;
        4: 1420:	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
call    0 returned 100%
        -: 1421:
        4: 1422:	if (tmp_new_slot)
branch  0 taken 25% (fallthrough)
branch  1 taken 75%
        -: 1423:	{
        1: 1424:		ExecStoreVirtualTuple(tmp_new_slot);
call    0 returned 100%
        1: 1425:		ecxt->ecxt_scantuple = tmp_new_slot;
        -: 1426:	}
        -: 1427:	else
        3: 1428:		ecxt->ecxt_scantuple = new_slot;
        -: 1429:
        4: 1430:	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
call    0 returned 100%
        -: 1431:
        -: 1432:	/*
        -: 1433:	 * Case 1: if both tuples don't match the row filter, bailout. Send
        -: 1434:	 * nothing.
        -: 1435:	 */
        4: 1436:	if (!old_matched && !new_matched)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        -: 1437:		return false;
        -: 1438:
        -: 1439:	/*
        -: 1440:	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
        -: 1441:	 * tuple does, transform the UPDATE into INSERT.
        -: 1442:	 *
        -: 1443:	 * Use the newly transformed tuple that must contain the column values for
        -: 1444:	 * all the replica identity columns. This is required to ensure that the
        -: 1445:	 * while inserting the tuple in the downstream node, we have all the
        -: 1446:	 * required column values.
        -: 1447:	 */
        4: 1448:	if (!old_matched && new_matched)
branch  0 taken 50% (fallthrough)
branch  1 taken 50%
        -: 1449:	{
        2: 1450:		*action = REORDER_BUFFER_CHANGE_INSERT;
        -: 1451:
        2: 1452:		if (tmp_new_slot)
branch  0 taken 50% (fallthrough)
branch  1 taken 50%
        1: 1453:			*new_slot_ptr = tmp_new_slot;
        -: 1454:	}
        -: 1455:
        -: 1456:	/*
        -: 1457:	 * Case 3: if the old tuple satisfies the row filter but the new tuple
        -: 1458:	 * doesn't, transform the UPDATE into DELETE.
        -: 1459:	 *
        -: 1460:	 * This transformation does not require another tuple. The Old tuple will
        -: 1461:	 * be used for DELETE.
        -: 1462:	 */
        2: 1463:	else if (old_matched && !new_matched)
branch  0 taken 50% (fallthrough)
branch  1 taken 50%
        1: 1464:		*action = REORDER_BUFFER_CHANGE_DELETE;
        -: 1465:
        -: 1466:	/*
        -: 1467:	 * Case 4: if both tuples match the row filter, transformation isn't
        -: 1468:	 * required. (*action is default UPDATE).
        -: 1469:	 */
        -: 1470:
        -: 1471:	return true;
        -: 1472:}
        -: 1473:
        -: 1474:/*
        -: 1475: * Sends the decoded DML over wire.
        -: 1476: *
        -: 1477: * This is called both in streaming and non-streaming modes.
        -: 1478: */
        -: 1479:static void
function pgoutput_change called 183433 returned 100% blocks executed 86%
   183433: 1480:pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        -: 1481:				Relation relation, ReorderBufferChange *change)
        -: 1482:{
   183433: 1483:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
   183433: 1484:	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        -: 1485:	MemoryContext old;
        -: 1486:	RelationSyncEntry *relentry;
        -: 1487:	TransactionId xid = InvalidTransactionId;
        -: 1488:	Relation	ancestor = NULL;
        -: 1489:	Relation	targetrel = relation;
   183433: 1490:	ReorderBufferChangeType action = change->action;
        -: 1491:	TupleTableSlot *old_slot = NULL;
   183433: 1492:	TupleTableSlot *new_slot = NULL;
        -: 1493:
   183433: 1494:	if (!is_publishable_relation(relation))
call    0 returned 100%
branch  1 taken 100% (fallthrough)
branch  2 taken 0%
     1171: 1495:		return;
        -: 1496:
        -: 1497:	/*
        -: 1498:	 * Remember the xid for the change in streaming mode. We need to send xid
        -: 1499:	 * with each change in the streaming mode so that subscriber can make
        -: 1500:	 * their association and on aborts, it can discard the corresponding
        -: 1501:	 * changes.
        -: 1502:	 */
   183433: 1503:	if (data->in_streaming)
branch  0 taken 96% (fallthrough)
branch  1 taken 4%
   175941: 1504:		xid = change->txn->xid;
        -: 1505:
   183433: 1506:	relentry = get_rel_sync_entry(data, relation);
call    0 returned 100%
        -: 1507:
        -: 1508:	/* First check the table filter */
   183432: 1509:	switch (action)
branch  0 taken 58%
branch  1 taken 19%
branch  2 taken 23%
branch  3 taken 0%
        -: 1510:	{
   106006: 1511:		case REORDER_BUFFER_CHANGE_INSERT:
   106006: 1512:			if (!relentry->pubactions.pubinsert)
branch  0 taken 100%
branch  1 taken 1%
        -: 1513:				return;
        -: 1514:			break;
    34491: 1515:		case REORDER_BUFFER_CHANGE_UPDATE:
    34491: 1516:			if (!relentry->pubactions.pubupdate)
branch  0 taken 100%
branch  1 taken 1%
        -: 1517:				return;
        -: 1518:			break;
    42935: 1519:		case REORDER_BUFFER_CHANGE_DELETE:
    42935: 1520:			if (!relentry->pubactions.pubdelete)
branch  0 taken 98% (fallthrough)
branch  1 taken 2%
        -: 1521:				return;
        -: 1522:
        -: 1523:			/*
        -: 1524:			 * This is only possible if deletes are allowed even when replica
        -: 1525:			 * identity is not defined for a table. Since the DELETE action
        -: 1526:			 * can't be published, we simply return.
        -: 1527:			 */
    41885: 1528:			if (!change->data.tp.oldtuple)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
        -: 1529:			{
    #####: 1530:				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
call    0 never executed
branch  1 never executed
branch  2 never executed
call    3 never executed
call    4 never executed
    #####: 1531:				return;
        -: 1532:			}
        -: 1533:			break;
        -: 1534:		default:
    #####: 1535:			Assert(false);
call    0 never executed
        -: 1536:	}
        -: 1537:
        -: 1538:	/* Avoid leaking memory by using and resetting our own context */
   182261: 1539:	old = MemoryContextSwitchTo(data->context);
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 1540:
        -: 1541:	/* Switch relation if publishing via root. */
   182261: 1542:	if (relentry->publish_as_relid != RelationGetRelid(relation))
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 1543:	{
      73*: 1544:		Assert(relation->rd_rel->relispartition);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
       73: 1545:		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
call    0 returned 100%
        -: 1546:		targetrel = ancestor;
        -: 1547:	}
        -: 1548:
   182261: 1549:	if (change->data.tp.oldtuple)
branch  0 taken 23% (fallthrough)
branch  1 taken 77%
        -: 1550:	{
    42019: 1551:		old_slot = relentry->old_slot;
    42019: 1552:		ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
call    0 returned 100%
        -: 1553:
        -: 1554:		/* Convert tuple if needed. */
    42019: 1555:		if (relentry->attrmap)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 1556:		{
        5: 1557:			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
call    0 returned 100%
        -: 1558:													  &TTSOpsVirtual);
        -: 1559:
        5: 1560:			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
call    0 returned 100%
        -: 1561:		}
        -: 1562:	}
        -: 1563:
   182261: 1564:	if (change->data.tp.newtuple)
branch  0 taken 77% (fallthrough)
branch  1 taken 23%
        -: 1565:	{
   140376: 1566:		new_slot = relentry->new_slot;
   140376: 1567:		ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
call    0 returned 100%
        -: 1568:
        -: 1569:		/* Convert tuple if needed. */
   140376: 1570:		if (relentry->attrmap)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 1571:		{
       21: 1572:			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
call    0 returned 100%
        -: 1573:													  &TTSOpsVirtual);
        -: 1574:
       21: 1575:			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
call    0 returned 100%
        -: 1576:		}
        -: 1577:	}
        -: 1578:
        -: 1579:	/*
        -: 1580:	 * Check row filter.
        -: 1581:	 *
        -: 1582:	 * Updates could be transformed to inserts or deletes based on the results
        -: 1583:	 * of the row filter for old and new tuple.
        -: 1584:	 */
   182261: 1585:	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
call    0 returned 100%
branch  1 taken 1% (fallthrough)
branch  2 taken 100%
       12: 1586:		goto cleanup;
        -: 1587:
        -: 1588:	/*
        -: 1589:	 * Send BEGIN if we haven't yet.
        -: 1590:	 *
        -: 1591:	 * We send the BEGIN message after ensuring that we will actually send the
        -: 1592:	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
        -: 1593:	 * transactions.
        -: 1594:	 */
   182249: 1595:	if (txndata && !txndata->sent_begin_txn)
branch  0 taken 3% (fallthrough)
branch  1 taken 97%
branch  2 taken 7% (fallthrough)
branch  3 taken 93%
      437: 1596:		pgoutput_send_begin(ctx, txn);
call    0 returned 100%
        -: 1597:
        -: 1598:	/*
        -: 1599:	 * Schema should be sent using the original relation because it also sends
        -: 1600:	 * the ancestor's relation.
        -: 1601:	 */
   182248: 1602:	maybe_send_schema(ctx, change, relation, relentry);
call    0 returned 100%
        -: 1603:
   182246: 1604:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
        -: 1605:
        -: 1606:	/* Send the data */
   182246: 1607:	switch (action)
branch  0 taken 58%
branch  1 taken 19%
branch  2 taken 23%
branch  3 taken 0%
        -: 1608:	{
   105916: 1609:		case REORDER_BUFFER_CHANGE_INSERT:
   105916: 1610:			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
   105916: 1611:									data->binary, relentry->columns,
call    0 returned 100%
        -: 1612:									relentry->include_gencols_type);
   105916: 1613:			break;
    34444: 1614:		case REORDER_BUFFER_CHANGE_UPDATE:
    34444: 1615:			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
    34444: 1616:									new_slot, data->binary, relentry->columns,
call    0 returned 100%
        -: 1617:									relentry->include_gencols_type);
    34444: 1618:			break;
    41886: 1619:		case REORDER_BUFFER_CHANGE_DELETE:
    41886: 1620:			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
    41886: 1621:									data->binary, relentry->columns,
call    0 returned 100%
        -: 1622:									relentry->include_gencols_type);
    41886: 1623:			break;
        -: 1624:		default:
    #####: 1625:			Assert(false);
call    0 never executed
        -: 1626:	}
        -: 1627:
   182246: 1628:	OutputPluginWrite(ctx, true);
call    0 returned 100%
        -: 1629:
   182258: 1630:cleanup:
   182258: 1631:	if (RelationIsValid(ancestor))
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 1632:	{
       70: 1633:		RelationClose(ancestor);
call    0 returned 100%
        -: 1634:		ancestor = NULL;
        -: 1635:	}
        -: 1636:
        -: 1637:	/* Drop the new slots that were used to store the converted tuples. */
   182258: 1638:	if (relentry->attrmap)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 1639:	{
       26: 1640:		if (old_slot)
branch  0 taken 19% (fallthrough)
branch  1 taken 81%
        5: 1641:			ExecDropSingleTupleTableSlot(old_slot);
call    0 returned 100%
        -: 1642:
       26: 1643:		if (new_slot)
branch  0 taken 81% (fallthrough)
branch  1 taken 19%
       21: 1644:			ExecDropSingleTupleTableSlot(new_slot);
call    0 returned 100%
        -: 1645:	}
        -: 1646:
        -: 1647:	MemoryContextSwitchTo(old);
   182258: 1648:	MemoryContextReset(data->context);
call    0 returned 100%
        -: 1649:}
        -: 1650:
        -: 1651:static void
function pgoutput_truncate called 17 returned 100% blocks executed 92%
       17: 1652:pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        -: 1653:				  int nrelations, Relation relations[], ReorderBufferChange *change)
        -: 1654:{
       17: 1655:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
       17: 1656:	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        -: 1657:	MemoryContext old;
        -: 1658:	RelationSyncEntry *relentry;
        -: 1659:	int			i;
        -: 1660:	int			nrelids;
        -: 1661:	Oid		   *relids;
        -: 1662:	TransactionId xid = InvalidTransactionId;
        -: 1663:
        -: 1664:	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
       17: 1665:	if (data->in_streaming)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####: 1666:		xid = change->txn->xid;
        -: 1667:
       17: 1668:	old = MemoryContextSwitchTo(data->context);
call    0 returned 100%
        -: 1669:
       17: 1670:	relids = palloc0(nrelations * sizeof(Oid));
call    0 returned 100%
        -: 1671:	nrelids = 0;
        -: 1672:
       53: 1673:	for (i = 0; i < nrelations; i++)
branch  0 taken 68%
branch  1 taken 32% (fallthrough)
        -: 1674:	{
       36: 1675:		Relation	relation = relations[i];
       36: 1676:		Oid			relid = RelationGetRelid(relation);
        -: 1677:
      36*: 1678:		if (!is_publishable_relation(relation))
call    0 returned 100%
branch  1 taken 0% (fallthrough)
branch  2 taken 100%
    #####: 1679:			continue;
        -: 1680:
       36: 1681:		relentry = get_rel_sync_entry(data, relation);
call    0 returned 100%
        -: 1682:
       36: 1683:		if (!relentry->pubactions.pubtruncate)
branch  0 taken 44% (fallthrough)
branch  1 taken 56%
       16: 1684:			continue;
        -: 1685:
        -: 1686:		/*
        -: 1687:		 * Don't send partitions if the publication wants to send only the
        -: 1688:		 * root tables through it.
        -: 1689:		 */
       20: 1690:		if (relation->rd_rel->relispartition &&
branch  0 taken 75% (fallthrough)
branch  1 taken 25%
       15: 1691:			relentry->publish_as_relid != relid)
branch  0 taken 20% (fallthrough)
branch  1 taken 80%
        3: 1692:			continue;
        -: 1693:
       17: 1694:		relids[nrelids++] = relid;
        -: 1695:
        -: 1696:		/* Send BEGIN if we haven't yet */
       17: 1697:		if (txndata && !txndata->sent_begin_txn)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 65% (fallthrough)
branch  3 taken 35%
       11: 1698:			pgoutput_send_begin(ctx, txn);
call    0 returned 100%
        -: 1699:
       17: 1700:		maybe_send_schema(ctx, change, relation, relentry);
call    0 returned 100%
        -: 1701:	}
        -: 1702:
       17: 1703:	if (nrelids > 0)
branch  0 taken 65% (fallthrough)
branch  1 taken 35%
        -: 1704:	{
       11: 1705:		OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
       11: 1706:		logicalrep_write_truncate(ctx->out,
        -: 1707:								  xid,
        -: 1708:								  nrelids,
        -: 1709:								  relids,
       11: 1710:								  change->data.truncate.cascade,
       11: 1711:								  change->data.truncate.restart_seqs);
call    0 returned 100%
       11: 1712:		OutputPluginWrite(ctx, true);
call    0 returned 100%
        -: 1713:	}
        -: 1714:
        -: 1715:	MemoryContextSwitchTo(old);
       17: 1716:	MemoryContextReset(data->context);
call    0 returned 100%
       17: 1717:}
        -: 1718:
        -: 1719:static void
function pgoutput_message called 7 returned 100% blocks executed 91%
        7: 1720:pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        -: 1721:				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
        -: 1722:				 const char *message)
        -: 1723:{
        7: 1724:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
        -: 1725:	TransactionId xid = InvalidTransactionId;
        -: 1726:
        7: 1727:	if (!data->messages)
branch  0 taken 71% (fallthrough)
branch  1 taken 29%
        -: 1728:		return;
        -: 1729:
        -: 1730:	/*
        -: 1731:	 * Remember the xid for the message in streaming mode. See
        -: 1732:	 * pgoutput_change.
        -: 1733:	 */
        5: 1734:	if (data->in_streaming)
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
    #####: 1735:		xid = txn->xid;
        -: 1736:
        -: 1737:	/*
        -: 1738:	 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
        -: 1739:	 */
        5: 1740:	if (transactional)
branch  0 taken 40% (fallthrough)
branch  1 taken 60%
        -: 1741:	{
        2: 1742:		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        -: 1743:
        -: 1744:		/* Send BEGIN if we haven't yet */
        2: 1745:		if (txndata && !txndata->sent_begin_txn)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 100% (fallthrough)
branch  3 taken 0%
        2: 1746:			pgoutput_send_begin(ctx, txn);
call    0 returned 100%
        -: 1747:	}
        -: 1748:
        5: 1749:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
        5: 1750:	logicalrep_write_message(ctx->out,
call    0 returned 100%
        -: 1751:							 xid,
        -: 1752:							 message_lsn,
        -: 1753:							 transactional,
        -: 1754:							 prefix,
        -: 1755:							 sz,
        -: 1756:							 message);
        5: 1757:	OutputPluginWrite(ctx, true);
call    0 returned 100%
        -: 1758:}
        -: 1759:
        -: 1760:/*
        -: 1761: * Return true if the data is associated with an origin and the user has
        -: 1762: * requested the changes that don't have an origin, false otherwise.
        -: 1763: */
        -: 1764:static bool
function pgoutput_origin_filter called 344827 returned 100% blocks executed 100%
   344827: 1765:pgoutput_origin_filter(LogicalDecodingContext *ctx,
        -: 1766:					   RepOriginId origin_id)
        -: 1767:{
   344827: 1768:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
        -: 1769:
   344827: 1770:	if (data->publish_no_origin && origin_id != InvalidRepOriginId)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
branch  2 taken 39% (fallthrough)
branch  3 taken 61%
      164: 1771:		return true;
        -: 1772:
        -: 1773:	return false;
        -: 1774:}
        -: 1775:
        -: 1776:/*
        -: 1777: * Shutdown the output plugin.
        -: 1778: *
        -: 1779: * Note, we don't need to clean the data->context, data->cachectx, and
        -: 1780: * data->pubctx as they are child contexts of the ctx->context so they
        -: 1781: * will be cleaned up by logical decoding machinery.
        -: 1782: */
        -: 1783:static void
function pgoutput_shutdown called 517 returned 100% blocks executed 100%
      517: 1784:pgoutput_shutdown(LogicalDecodingContext *ctx)
        -: 1785:{
      517: 1786:	pgoutput_memory_context_reset(NULL);
call    0 returned 100%
      517: 1787:}
        -: 1788:
        -: 1789:/*
        -: 1790: * Load publications from the list of publication names.
        -: 1791: *
        -: 1792: * Here, we skip the publications that don't exist yet. This will allow us
        -: 1793: * to silently continue the replication in the absence of a missing publication.
        -: 1794: * This is required because we allow the users to create publications after they
        -: 1795: * have specified the required publications at the time of replication start.
        -: 1796: */
        -: 1797:static List *
function LoadPublications called 196 returned 100% blocks executed 100%
      196: 1798:LoadPublications(List *pubnames)
        -: 1799:{
        -: 1800:	List	   *result = NIL;
        -: 1801:	ListCell   *lc;
        -: 1802:
      437: 1803:	foreach(lc, pubnames)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
branch  2 taken 55% (fallthrough)
branch  3 taken 45%
        -: 1804:	{
      241: 1805:		char	   *pubname = (char *) lfirst(lc);
      241: 1806:		Publication *pub = GetPublicationByName(pubname, true);
call    0 returned 100%
        -: 1807:
      241: 1808:		if (pub)
branch  0 taken 99%
branch  1 taken 1%
      239: 1809:			result = lappend(result, pub);
call    0 returned 100%
        -: 1810:		else
        2: 1811:			ereport(WARNING,
call    0 returned 100%
branch  1 taken 100% (fallthrough)
branch  2 taken 0%
call    3 returned 100%
call    4 returned 100%
call    5 returned 100%
call    6 returned 100%
call    7 returned 100%
        -: 1812:					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
        -: 1813:					errmsg("skipped loading publication \"%s\"", pubname),
        -: 1814:					errdetail("The publication does not exist at this point in the WAL."),
        -: 1815:					errhint("Create the publication if it does not exist."));
        -: 1816:	}
        -: 1817:
      196: 1818:	return result;
        -: 1819:}
        -: 1820:
        -: 1821:/*
        -: 1822: * Publication syscache invalidation callback.
        -: 1823: *
        -: 1824: * Called for invalidations on pg_publication.
        -: 1825: */
        -: 1826:static void
function publication_invalidation_cb called 313 returned 100% blocks executed 100%
      313: 1827:publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
        -: 1828:{
      313: 1829:	publications_valid = false;
      313: 1830:}
        -: 1831:
        -: 1832:/*
        -: 1833: * START STREAM callback
        -: 1834: */
        -: 1835:static void
function pgoutput_stream_start called 637 returned 100% blocks executed 89%
      637: 1836:pgoutput_stream_start(struct LogicalDecodingContext *ctx,
        -: 1837:					  ReorderBufferTXN *txn)
        -: 1838:{
      637: 1839:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
      637: 1840:	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
        -: 1841:
        -: 1842:	/* we can't nest streaming of transactions */
     637*: 1843:	Assert(!data->in_streaming);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1844:
        -: 1845:	/*
        -: 1846:	 * If we already sent the first stream for this transaction then don't
        -: 1847:	 * send the origin id in the subsequent streams.
        -: 1848:	 */
      637: 1849:	if (rbtxn_is_streamed(txn))
branch  0 taken 90% (fallthrough)
branch  1 taken 10%
        -: 1850:		send_replication_origin = false;
        -: 1851:
      637: 1852:	OutputPluginPrepareWrite(ctx, !send_replication_origin);
call    0 returned 100%
      637: 1853:	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
call    0 returned 100%
        -: 1854:
      637: 1855:	send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
call    0 returned 100%
        -: 1856:					 send_replication_origin);
        -: 1857:
      637: 1858:	OutputPluginWrite(ctx, true);
call    0 returned 100%
        -: 1859:
        -: 1860:	/* we're streaming a chunk of transaction now */
      637: 1861:	data->in_streaming = true;
      637: 1862:}
        -: 1863:
        -: 1864:/*
        -: 1865: * STOP STREAM callback
        -: 1866: */
        -: 1867:static void
function pgoutput_stream_stop called 637 returned 100% blocks executed 83%
      637: 1868:pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
        -: 1869:					 ReorderBufferTXN *txn)
        -: 1870:{
      637: 1871:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
        -: 1872:
        -: 1873:	/* we should be streaming a transaction */
     637*: 1874:	Assert(data->in_streaming);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1875:
      637: 1876:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
      637: 1877:	logicalrep_write_stream_stop(ctx->out);
call    0 returned 100%
      637: 1878:	OutputPluginWrite(ctx, true);
call    0 returned 100%
        -: 1879:
        -: 1880:	/* we've stopped streaming a transaction */
      637: 1881:	data->in_streaming = false;
      637: 1882:}
        -: 1883:
        -: 1884:/*
        -: 1885: * Notify downstream to discard the streamed transaction (along with all
        -: 1886: * its subtransactions, if it's a toplevel transaction).
        -: 1887: */
        -: 1888:static void
function pgoutput_stream_abort called 26 returned 100% blocks executed 82%
       26: 1889:pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
        -: 1890:					  ReorderBufferTXN *txn,
        -: 1891:					  XLogRecPtr abort_lsn)
        -: 1892:{
        -: 1893:	ReorderBufferTXN *toptxn;
       26: 1894:	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
       26: 1895:	bool		write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
        -: 1896:
        -: 1897:	/*
        -: 1898:	 * The abort should happen outside streaming block, even for streamed
        -: 1899:	 * transactions. The transaction has to be marked as streamed, though.
        -: 1900:	 */
      26*: 1901:	Assert(!data->in_streaming);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1902:
        -: 1903:	/* determine the toplevel transaction */
       26: 1904:	toptxn = rbtxn_get_toptxn(txn);
branch  0 taken 88% (fallthrough)
branch  1 taken 12%
        -: 1905:
      26*: 1906:	Assert(rbtxn_is_streamed(toptxn));
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1907:
       26: 1908:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
       26: 1909:	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
call    0 returned 100%
        -: 1910:								  txn->abort_time, write_abort_info);
        -: 1911:
       26: 1912:	OutputPluginWrite(ctx, true);
call    0 returned 100%
        -: 1913:
       26: 1914:	cleanup_rel_sync_cache(toptxn->xid, false);
call    0 returned 100%
       26: 1915:}
        -: 1916:
        -: 1917:/*
        -: 1918: * Notify downstream to apply the streamed transaction (along with all
        -: 1919: * its subtransactions).
        -: 1920: */
        -: 1921:static void
function pgoutput_stream_commit called 46 returned 100% blocks executed 80%
       46: 1922:pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
        -: 1923:					   ReorderBufferTXN *txn,
        -: 1924:					   XLogRecPtr commit_lsn)
        -: 1925:{
       46: 1926:	PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
        -: 1927:
        -: 1928:	/*
        -: 1929:	 * The commit should happen outside streaming block, even for streamed
        -: 1930:	 * transactions. The transaction has to be marked as streamed, though.
        -: 1931:	 */
      46*: 1932:	Assert(!data->in_streaming);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
      46*: 1933:	Assert(rbtxn_is_streamed(txn));
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1934:
       46: 1935:	OutputPluginUpdateProgress(ctx, false);
call    0 returned 100%
        -: 1936:
       46: 1937:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
       46: 1938:	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
call    0 returned 100%
       46: 1939:	OutputPluginWrite(ctx, true);
call    0 returned 100%
        -: 1940:
       46: 1941:	cleanup_rel_sync_cache(txn->xid, true);
call    0 returned 100%
       46: 1942:}
        -: 1943:
        -: 1944:/*
        -: 1945: * PREPARE callback (for streaming two-phase commit).
        -: 1946: *
        -: 1947: * Notify the downstream to prepare the transaction.
        -: 1948: */
        -: 1949:static void
function pgoutput_stream_prepare_txn called 14 returned 100% blocks executed 86%
       14: 1950:pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
        -: 1951:							ReorderBufferTXN *txn,
        -: 1952:							XLogRecPtr prepare_lsn)
        -: 1953:{
      14*: 1954:	Assert(rbtxn_is_streamed(txn));
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1955:
       14: 1956:	OutputPluginUpdateProgress(ctx, false);
call    0 returned 100%
       14: 1957:	OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
       14: 1958:	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
call    0 returned 100%
       14: 1959:	OutputPluginWrite(ctx, true);
call    0 returned 100%
       14: 1960:}
        -: 1961:
        -: 1962:/*
        -: 1963: * Initialize the relation schema sync cache for a decoding session.
        -: 1964: *
        -: 1965: * The hash table is destroyed at the end of a decoding session. While
        -: 1966: * relcache invalidations still exist and will still be invoked, they
        -: 1967: * will just see the null hash table global and take no action.
        -: 1968: */
        -: 1969:static void
function init_rel_sync_cache called 400 returned 100% blocks executed 90%
      400: 1970:init_rel_sync_cache(MemoryContext cachectx)
        -: 1971:{
        -: 1972:	HASHCTL		ctl;
        -: 1973:	static bool relation_callbacks_registered = false;
        -: 1974:
        -: 1975:	/* Nothing to do if hash table already exists */
      400: 1976:	if (RelationSyncCache != NULL)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        2: 1977:		return;
        -: 1978:
        -: 1979:	/* Make a new hash table for the cache */
      400: 1980:	ctl.keysize = sizeof(Oid);
      400: 1981:	ctl.entrysize = sizeof(RelationSyncEntry);
      400: 1982:	ctl.hcxt = cachectx;
        -: 1983:
      400: 1984:	RelationSyncCache = hash_create("logical replication output relation cache",
call    0 returned 100%
        -: 1985:									128, &ctl,
        -: 1986:									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
        -: 1987:
     400*: 1988:	Assert(RelationSyncCache != NULL);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 1989:
        -: 1990:	/* No more to do if we already registered callbacks */
      400: 1991:	if (relation_callbacks_registered)
branch  0 taken 100% (fallthrough)
branch  1 taken 0%
        -: 1992:		return;
        -: 1993:
        -: 1994:	/* We must update the cache entry for a relation after a relcache flush */
      398: 1995:	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
call    0 returned 100%
        -: 1996:
        -: 1997:	/*
        -: 1998:	 * Flush all cache entries after a pg_namespace change, in case it was a
        -: 1999:	 * schema rename affecting a relation being replicated.
        -: 2000:	 *
        -: 2001:	 * XXX: It is not a good idea to invalidate all the relation entries in
        -: 2002:	 * RelationSyncCache on schema rename. We can optimize it to invalidate
        -: 2003:	 * only the required relations by either having a specific invalidation
        -: 2004:	 * message containing impacted relations or by having schema information
        -: 2005:	 * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
        -: 2006:	 * passed to the callback.
        -: 2007:	 */
      398: 2008:	CacheRegisterSyscacheCallback(NAMESPACEOID,
call    0 returned 100%
        -: 2009:								  rel_sync_cache_publication_cb,
        -: 2010:								  (Datum) 0);
        -: 2011:
      398: 2012:	relation_callbacks_registered = true;
        -: 2013:}
        -: 2014:
        -: 2015:/*
        -: 2016: * We expect relatively small number of streamed transactions.
        -: 2017: */
        -: 2018:static bool
function get_schema_sent_in_streamed_txn called 175941 returned 100% blocks executed 100%
   175941: 2019:get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
        -: 2020:{
   175941: 2021:	return list_member_xid(entry->streamed_txns, xid);
call    0 returned 100%
        -: 2022:}
        -: 2023:
        -: 2024:/*
        -: 2025: * Add the xid in the rel sync entry for which we have already sent the schema
        -: 2026: * of the relation.
        -: 2027: */
        -: 2028:static void
function set_schema_sent_in_streamed_txn called 72 returned 100% blocks executed 100%
       72: 2029:set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
        -: 2030:{
        -: 2031:	MemoryContext oldctx;
        -: 2032:
       72: 2033:	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
call    0 returned 100%
        -: 2034:
       72: 2035:	entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
call    0 returned 100%
        -: 2036:
        -: 2037:	MemoryContextSwitchTo(oldctx);
       72: 2038:}
        -: 2039:
        -: 2040:/*
        -: 2041: * Find or create entry in the relation schema cache.
        -: 2042: *
        -: 2043: * This looks up publications that the given relation is directly or
        -: 2044: * indirectly part of (the latter if it's really the relation's ancestor that
        -: 2045: * is part of a publication) and fills up the found entry with the information
        -: 2046: * about which operations to publish and whether to use an ancestor's schema
        -: 2047: * when publishing.
        -: 2048: */
        -: 2049:static RelationSyncEntry *
function get_rel_sync_entry called 183469 returned 100% blocks executed 94%
   183469: 2050:get_rel_sync_entry(PGOutputData *data, Relation relation)
        -: 2051:{
        -: 2052:	RelationSyncEntry *entry;
        -: 2053:	bool		found;
        -: 2054:	MemoryContext oldctx;
   183469: 2055:	Oid			relid = RelationGetRelid(relation);
        -: 2056:
  183469*: 2057:	Assert(RelationSyncCache != NULL);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 2058:
        -: 2059:	/* Find cached relation info, creating if not found */
   183469: 2060:	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
call    0 returned 100%
        -: 2061:											  &relid,
        -: 2062:											  HASH_ENTER, &found);
  183469*: 2063:	Assert(entry != NULL);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 2064:
        -: 2065:	/* initialize entry, if it's new */
   183469: 2066:	if (!found)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 2067:	{
      294: 2068:		entry->replicate_valid = false;
      294: 2069:		entry->schema_sent = false;
      294: 2070:		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
      294: 2071:		entry->streamed_txns = NIL;
      294: 2072:		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
      294: 2073:			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
      294: 2074:		entry->new_slot = NULL;
      294: 2075:		entry->old_slot = NULL;
      294: 2076:		memset(entry->exprstate, 0, sizeof(entry->exprstate));
      294: 2077:		entry->entry_cxt = NULL;
      294: 2078:		entry->publish_as_relid = InvalidOid;
      294: 2079:		entry->columns = NULL;
      294: 2080:		entry->attrmap = NULL;
        -: 2081:	}
        -: 2082:
        -: 2083:	/* Validate the entry */
   183469: 2084:	if (!entry->replicate_valid)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        -: 2085:	{
      376: 2086:		Oid			schemaId = get_rel_namespace(relid);
call    0 returned 100%
      376: 2087:		List	   *pubids = GetRelationPublications(relid);
call    0 returned 100%
        -: 2088:
        -: 2089:		/*
        -: 2090:		 * We don't acquire a lock on the namespace system table as we build
        -: 2091:		 * the cache entry using a historic snapshot and all the later changes
        -: 2092:		 * are absorbed while decoding WAL.
        -: 2093:		 */
      376: 2094:		List	   *schemaPubids = GetSchemaPublications(schemaId);
call    0 returned 100%
        -: 2095:		ListCell   *lc;
      376: 2096:		Oid			publish_as_relid = relid;
        -: 2097:		int			publish_ancestor_level = 0;
      376: 2098:		bool		am_partition = get_rel_relispartition(relid);
call    0 returned 100%
      376: 2099:		char		relkind = get_rel_relkind(relid);
call    0 returned 100%
        -: 2100:		List	   *rel_publications = NIL;
        -: 2101:
        -: 2102:		/* Reload publications if needed before use. */
      376: 2103:		if (!publications_valid)
branch  0 taken 52% (fallthrough)
branch  1 taken 48%
        -: 2104:		{
      196: 2105:			MemoryContextReset(data->pubctx);
call    0 returned 100%
        -: 2106:
      196: 2107:			oldctx = MemoryContextSwitchTo(data->pubctx);
call    0 returned 100%
      196: 2108:			data->publications = LoadPublications(data->publication_names);
call    0 returned 100%
        -: 2109:			MemoryContextSwitchTo(oldctx);
      196: 2110:			publications_valid = true;
        -: 2111:		}
        -: 2112:
        -: 2113:		/*
        -: 2114:		 * Reset schema_sent status as the relation definition may have
        -: 2115:		 * changed.  Also reset pubactions to empty in case rel was dropped
        -: 2116:		 * from a publication.  Also free any objects that depended on the
        -: 2117:		 * earlier definition.
        -: 2118:		 */
      376: 2119:		entry->schema_sent = false;
      376: 2120:		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
      376: 2121:		list_free(entry->streamed_txns);
call    0 returned 100%
      376: 2122:		entry->streamed_txns = NIL;
      376: 2123:		bms_free(entry->columns);
call    0 returned 100%
      376: 2124:		entry->columns = NULL;
      376: 2125:		entry->pubactions.pubinsert = false;
      376: 2126:		entry->pubactions.pubupdate = false;
      376: 2127:		entry->pubactions.pubdelete = false;
      376: 2128:		entry->pubactions.pubtruncate = false;
        -: 2129:
        -: 2130:		/*
        -: 2131:		 * Tuple slots cleanups. (Will be rebuilt later if needed).
        -: 2132:		 */
      376: 2133:		if (entry->old_slot)
branch  0 taken 16% (fallthrough)
branch  1 taken 84%
        -: 2134:		{
       60: 2135:			TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;
        -: 2136:
      60*: 2137:			Assert(desc->tdrefcount == -1);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 2138:
       60: 2139:			ExecDropSingleTupleTableSlot(entry->old_slot);
call    0 returned 100%
        -: 2140:
        -: 2141:			/*
        -: 2142:			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
        -: 2143:			 * do it now to avoid any leaks.
        -: 2144:			 */
       60: 2145:			FreeTupleDesc(desc);
call    0 returned 100%
        -: 2146:		}
      376: 2147:		if (entry->new_slot)
branch  0 taken 16% (fallthrough)
branch  1 taken 84%
        -: 2148:		{
       60: 2149:			TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;
        -: 2150:
      60*: 2151:			Assert(desc->tdrefcount == -1);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 2152:
       60: 2153:			ExecDropSingleTupleTableSlot(entry->new_slot);
call    0 returned 100%
        -: 2154:
        -: 2155:			/*
        -: 2156:			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
        -: 2157:			 * do it now to avoid any leaks.
        -: 2158:			 */
       60: 2159:			FreeTupleDesc(desc);
call    0 returned 100%
        -: 2160:		}
        -: 2161:
      376: 2162:		entry->old_slot = NULL;
      376: 2163:		entry->new_slot = NULL;
        -: 2164:
      376: 2165:		if (entry->attrmap)
branch  0 taken 1% (fallthrough)
branch  1 taken 99%
        3: 2166:			free_attrmap(entry->attrmap);
call    0 returned 100%
      376: 2167:		entry->attrmap = NULL;
        -: 2168:
        -: 2169:		/*
        -: 2170:		 * Row filter cache cleanups.
        -: 2171:		 */
      376: 2172:		if (entry->entry_cxt)
branch  0 taken 16% (fallthrough)
branch  1 taken 84%
       60: 2173:			MemoryContextDelete(entry->entry_cxt);
call    0 returned 100%
        -: 2174:
      376: 2175:		entry->entry_cxt = NULL;
      376: 2176:		entry->estate = NULL;
      376: 2177:		memset(entry->exprstate, 0, sizeof(entry->exprstate));
        -: 2178:
        -: 2179:		/*
        -: 2180:		 * Build publication cache. We can't use one provided by relcache as
        -: 2181:		 * relcache considers all publications that the given relation is in,
        -: 2182:		 * but here we only need to consider ones that the subscriber
        -: 2183:		 * requested.
        -: 2184:		 */
      917: 2185:		foreach(lc, data->publications)
branch  0 taken 100% (fallthrough)
branch  1 taken 1%
branch  2 taken 59% (fallthrough)
branch  3 taken 41%
        -: 2186:		{
      541: 2187:			Publication *pub = lfirst(lc);
        -: 2188:			bool		publish = false;
        -: 2189:
        -: 2190:			/*
        -: 2191:			 * Under what relid should we publish changes in this publication?
        -: 2192:			 * We'll use the top-most relid across all publications. Also
        -: 2193:			 * track the ancestor level for this publication.
        -: 2194:			 */
      541: 2195:			Oid			pub_relid = relid;
        -: 2196:			int			ancestor_level = 0;
        -: 2197:
        -: 2198:			/*
        -: 2199:			 * If this is a FOR ALL TABLES publication, pick the partition
        -: 2200:			 * root and set the ancestor level accordingly.
        -: 2201:			 */
      541: 2202:			if (pub->alltables)
branch  0 taken 16%
branch  1 taken 84%
        -: 2203:			{
        -: 2204:				publish = true;
       86: 2205:				if (pub->pubviaroot && am_partition)
branch  0 taken 19% (fallthrough)
branch  1 taken 81%
branch  2 taken 100% (fallthrough)
branch  3 taken 0%
        -: 2206:				{
       16: 2207:					List	   *ancestors = get_partition_ancestors(relid);
call    0 returned 100%
        -: 2208:
       16: 2209:					pub_relid = llast_oid(ancestors);
call    0 returned 100%
branch  1 taken 100% (fallthrough)
branch  2 taken 0%
        -: 2210:					ancestor_level = list_length(ancestors);
        -: 2211:				}
        -: 2212:			}
        -: 2213:
      541: 2214:			if (!publish)
branch  0 taken 84% (fallthrough)
branch  1 taken 16%
        -: 2215:			{
        -: 2216:				bool		ancestor_published = false;
        -: 2217:
        -: 2218:				/*
        -: 2219:				 * For a partition, check if any of the ancestors are
        -: 2220:				 * published.  If so, note down the topmost ancestor that is
        -: 2221:				 * published via this publication, which will be used as the
        -: 2222:				 * relation via which to publish the partition's changes.
        -: 2223:				 */
      455: 2224:				if (am_partition)
branch  0 taken 27% (fallthrough)
branch  1 taken 73%
        -: 2225:				{
        -: 2226:					Oid			ancestor;
        -: 2227:					int			level;
      121: 2228:					List	   *ancestors = get_partition_ancestors(relid);
call    0 returned 100%
        -: 2229:
      121: 2230:					ancestor = GetTopMostAncestorInPublication(pub->oid,
call    0 returned 100%
        -: 2231:															   ancestors,
        -: 2232:															   &level);
        -: 2233:
      121: 2234:					if (ancestor != InvalidOid)
branch  0 taken 40% (fallthrough)
branch  1 taken 60%
        -: 2235:					{
        -: 2236:						ancestor_published = true;
       48: 2237:						if (pub->pubviaroot)
branch  0 taken 52% (fallthrough)
branch  1 taken 48%
        -: 2238:						{
        -: 2239:							pub_relid = ancestor;
       25: 2240:							ancestor_level = level;
        -: 2241:						}
        -: 2242:					}
        -: 2243:				}
        -: 2244:
      709: 2245:				if (list_member_oid(pubids, pub->oid) ||
call    0 returned 100%
branch  1 taken 56% (fallthrough)
branch  2 taken 44%
branch  3 taken 98% (fallthrough)
branch  4 taken 2%
      502: 2246:					list_member_oid(schemaPubids, pub->oid) ||
call    0 returned 100%
branch  1 taken 11% (fallthrough)
branch  2 taken 89%
        -: 2247:					ancestor_published)
        -: 2248:					publish = true;
        -: 2249:			}
        -: 2250:
        -: 2251:			/*
        -: 2252:			 * If the relation is to be published, determine actions to
        -: 2253:			 * publish, and list of columns, if appropriate.
        -: 2254:			 *
        -: 2255:			 * Don't publish changes for partitioned tables, because
        -: 2256:			 * publishing those of its partitions suffices, unless partition
        -: 2257:			 * changes won't be published due to pubviaroot being set.
        -: 2258:			 */
      541: 2259:			if (publish &&
branch  0 taken 59% (fallthrough)
branch  1 taken 41%
branch  2 taken 1% (fallthrough)
branch  3 taken 99%
        4: 2260:				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
branch  0 taken 25% (fallthrough)
branch  1 taken 75%
        -: 2261:			{
      318: 2262:				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
      318: 2263:				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
      318: 2264:				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
      318: 2265:				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
        -: 2266:
        -: 2267:				/*
        -: 2268:				 * We want to publish the changes as the top-most ancestor
        -: 2269:				 * across all publications. So we need to check if the already
        -: 2270:				 * calculated level is higher than the new one. If yes, we can
        -: 2271:				 * ignore the new value (as it's a child). Otherwise the new
        -: 2272:				 * value is an ancestor, so we keep it.
        -: 2273:				 */
      318: 2274:				if (publish_ancestor_level > ancestor_level)
branch  0 taken 1% (fallthrough)
branch  1 taken 100%
        1: 2275:					continue;
        -: 2276:
        -: 2277:				/*
        -: 2278:				 * If we found an ancestor higher up in the tree, discard the
        -: 2279:				 * list of publications through which we replicate it, and use
        -: 2280:				 * the new ancestor.
        -: 2281:				 */
      317: 2282:				if (publish_ancestor_level < ancestor_level)
branch  0 taken 87% (fallthrough)
branch  1 taken 13%
        -: 2283:				{
        -: 2284:					publish_as_relid = pub_relid;
        -: 2285:					publish_ancestor_level = ancestor_level;
        -: 2286:
        -: 2287:					/* reset the publication list for this relation */
        -: 2288:					rel_publications = NIL;
        -: 2289:				}
        -: 2290:				else
        -: 2291:				{
        -: 2292:					/* Same ancestor level, has to be the same OID. */
     276*: 2293:					Assert(publish_as_relid == pub_relid);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 2294:				}
        -: 2295:
        -: 2296:				/* Track publications for this ancestor. */
      317: 2297:				rel_publications = lappend(rel_publications, pub);
call    0 returned 100%
        -: 2298:			}
        -: 2299:		}
        -: 2300:
      376: 2301:		entry->publish_as_relid = publish_as_relid;
        -: 2302:
        -: 2303:		/*
        -: 2304:		 * Initialize the tuple slot, map, and row filter. These are only used
        -: 2305:		 * when publishing inserts, updates, or deletes.
        -: 2306:		 */
      376: 2307:		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
branch  0 taken 81% (fallthrough)
branch  1 taken 19%
        -: 2308:			entry->pubactions.pubdelete)
        -: 2309:		{
        -: 2310:			/* Initialize the tuple slot and map */
      304: 2311:			init_tuple_slot(data, relation, entry);
call    0 returned 100%
        -: 2312:
        -: 2313:			/* Initialize the row filter */
      304: 2314:			pgoutput_row_filter_init(data, rel_publications, entry);
call    0 returned 100%
        -: 2315:
        -: 2316:			/* Check whether to publish generated columns. */
      304: 2317:			check_and_init_gencol(data, rel_publications, entry);
call    0 returned 100%
        -: 2318:
        -: 2319:			/* Initialize the column list */
      304: 2320:			pgoutput_column_list_init(data, rel_publications, entry);
call    0 returned 100%
        -: 2321:		}
        -: 2322:
      375: 2323:		list_free(pubids);
call    0 returned 100%
      375: 2324:		list_free(schemaPubids);
call    0 returned 100%
      375: 2325:		list_free(rel_publications);
call    0 returned 100%
        -: 2326:
      375: 2327:		entry->replicate_valid = true;
        -: 2328:	}
        -: 2329:
   183468: 2330:	return entry;
        -: 2331:}
        -: 2332:
        -: 2333:/*
        -: 2334: * Cleanup list of streamed transactions and update the schema_sent flag.
        -: 2335: *
        -: 2336: * When a streamed transaction commits or aborts, we need to remove the
        -: 2337: * toplevel XID from the schema cache. If the transaction aborted, the
        -: 2338: * subscriber will simply throw away the schema records we streamed, so
        -: 2339: * we don't need to do anything else.
        -: 2340: *
        -: 2341: * If the transaction is committed, the subscriber will update the relation
        -: 2342: * cache - so tweak the schema_sent flag accordingly.
        -: 2343: */
        -: 2344:static void
function cleanup_rel_sync_cache called 72 returned 100% blocks executed 94%
       72: 2345:cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
        -: 2346:{
        -: 2347:	HASH_SEQ_STATUS hash_seq;
        -: 2348:	RelationSyncEntry *entry;
        -: 2349:
      72*: 2350:	Assert(RelationSyncCache != NULL);
branch  0 taken 0% (fallthrough)
branch  1 taken 100%
call    2 never executed
        -: 2351:
       72: 2352:	hash_seq_init(&hash_seq, RelationSyncCache);
call    0 returned 100%
      147: 2353:	while ((entry = hash_seq_search(&hash_seq)) != NULL)
call    0 returned 100%
branch  1 taken 51%
branch  2 taken 49% (fallthrough)
        -: 2354:	{
        -: 2355:		/*
        -: 2356:		 * We can set the schema_sent flag for an entry that has committed xid
        -: 2357:		 * in the list as that ensures that the subscriber would have the
        -: 2358:		 * corresponding schema and we don't need to send it unless there is
        -: 2359:		 * any invalidation for that relation.
        -: 2360:		 */
       95: 2361:		foreach_xid(streamed_txn, entry->streamed_txns)
branch  0 taken 78% (fallthrough)
branch  1 taken 22%
branch  2 taken 100% (fallthrough)
branch  3 taken 0%
        -: 2362:		{
       74: 2363:			if (xid == streamed_txn)
branch  0 taken 73%
branch  1 taken 27%
        -: 2364:			{
       54: 2365:				if (is_commit)
branch  0 taken 80% (fallthrough)
branch  1 taken 20%
       43: 2366:					entry->schema_sent = true;
        -: 2367:
       54: 2368:				entry->streamed_txns =
       54: 2369:					foreach_delete_current(entry->streamed_txns, streamed_txn);
call    0 returned 100%
       54: 2370:				break;
        -: 2371:			}
        -: 2372:		}
        -: 2373:	}
       72: 2374:}
        -: 2375:
        -: 2376:/*
        -: 2377: * Relcache invalidation callback
        -: 2378: */
        -: 2379:static void
function rel_sync_cache_relation_cb called 3707 returned 100% blocks executed 100%
     3707: 2380:rel_sync_cache_relation_cb(Datum arg, Oid relid)
        -: 2381:{
        -: 2382:	RelationSyncEntry *entry;
        -: 2383:
        -: 2384:	/*
        -: 2385:	 * We can get here if the plugin was used in SQL interface as the
        -: 2386:	 * RelationSyncCache is destroyed when the decoding finishes, but there is
        -: 2387:	 * no way to unregister the relcache invalidation callback.
        -: 2388:	 */
     3707: 2389:	if (RelationSyncCache == NULL)
branch  0 taken 99% (fallthrough)
branch  1 taken 1%
        -: 2390:		return;
        -: 2391:
        -: 2392:	/*
        -: 2393:	 * Nobody keeps pointers to entries in this hash table around outside
        -: 2394:	 * logical decoding callback calls - but invalidation events can come in
        -: 2395:	 * *during* a callback if we do any syscache access in the callback.
        -: 2396:	 * Because of that we must mark the cache entry as invalid but not damage
        -: 2397:	 * any of its substructure here.  The next get_rel_sync_entry() call will
        -: 2398:	 * rebuild it all.
        -: 2399:	 */
     3681: 2400:	if (OidIsValid(relid))
branch  0 taken 98% (fallthrough)
branch  1 taken 2%
        -: 2401:	{
        -: 2402:		/*
        -: 2403:		 * Getting invalidations for relations that aren't in the table is
        -: 2404:		 * entirely normal.  So we don't care if it's found or not.
        -: 2405:		 */
     3619: 2406:		entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
call    0 returned 100%
        -: 2407:												  HASH_FIND, NULL);
     3619: 2408:		if (entry != NULL)
branch  0 taken 19% (fallthrough)
branch  1 taken 81%
      671: 2409:			entry->replicate_valid = false;
        -: 2410:	}
        -: 2411:	else
        -: 2412:	{
        -: 2413:		/* Whole cache must be flushed. */
        -: 2414:		HASH_SEQ_STATUS status;
        -: 2415:
       62: 2416:		hash_seq_init(&status, RelationSyncCache);
call    0 returned 100%
      132: 2417:		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
call    0 returned 100%
branch  1 taken 53%
branch  2 taken 47% (fallthrough)
        -: 2418:		{
       70: 2419:			entry->replicate_valid = false;
        -: 2420:		}
        -: 2421:	}
        -: 2422:}
        -: 2423:
        -: 2424:/*
        -: 2425: * Publication relation/schema map syscache invalidation callback
        -: 2426: *
        -: 2427: * Called for invalidations on pg_namespace.
        -: 2428: */
        -: 2429:static void
function rel_sync_cache_publication_cb called 32 returned 100% blocks executed 100%
       32: 2430:rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
        -: 2431:{
        -: 2432:	HASH_SEQ_STATUS status;
        -: 2433:	RelationSyncEntry *entry;
        -: 2434:
        -: 2435:	/*
        -: 2436:	 * We can get here if the plugin was used in SQL interface as the
        -: 2437:	 * RelationSyncCache is destroyed when the decoding finishes, but there is
        -: 2438:	 * no way to unregister the invalidation callbacks.
        -: 2439:	 */
       32: 2440:	if (RelationSyncCache == NULL)
branch  0 taken 31% (fallthrough)
branch  1 taken 69%
       10: 2441:		return;
        -: 2442:
        -: 2443:	/*
        -: 2444:	 * We have no easy way to identify which cache entries this invalidation
        -: 2445:	 * event might have affected, so just mark them all invalid.
        -: 2446:	 */
       22: 2447:	hash_seq_init(&status, RelationSyncCache);
call    0 returned 100%
       41: 2448:	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
call    0 returned 100%
branch  1 taken 46%
branch  2 taken 54% (fallthrough)
        -: 2449:	{
       19: 2450:		entry->replicate_valid = false;
        -: 2451:	}
        -: 2452:}
        -: 2453:
        -: 2454:/* Send Replication origin */
        -: 2455:static void
function send_repl_origin called 1108 returned 100% blocks executed 100%
     1108: 2456:send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
        -: 2457:				 XLogRecPtr origin_lsn, bool send_origin)
        -: 2458:{
     1108: 2459:	if (send_origin)
branch  0 taken 1% (fallthrough)
branch  1 taken 99%
        -: 2460:	{
        -: 2461:		char	   *origin;
        -: 2462:
        -: 2463:		/*----------
        -: 2464:		 * XXX: which behaviour do we want here?
        -: 2465:		 *
        -: 2466:		 * Alternatives:
        -: 2467:		 *  - don't send origin message if origin name not found
        -: 2468:		 *    (that's what we do now)
        -: 2469:		 *  - throw error - that will break replication, not good
        -: 2470:		 *  - send some special "unknown" origin
        -: 2471:		 *----------
        -: 2472:		 */
        8: 2473:		if (replorigin_by_oid(origin_id, true, &origin))
call    0 returned 100%
branch  1 taken 100% (fallthrough)
branch  2 taken 0%
        -: 2474:		{
        -: 2475:			/* Message boundary */
        8: 2476:			OutputPluginWrite(ctx, false);
call    0 returned 100%
        8: 2477:			OutputPluginPrepareWrite(ctx, true);
call    0 returned 100%
        -: 2478:
        8: 2479:			logicalrep_write_origin(ctx->out, origin, origin_lsn);
call    0 returned 100%
        -: 2480:		}
        -: 2481:	}
     1108: 2482:}