pgsql_fdw.patch
text/plain
Filename: pgsql_fdw.patch
Type: text/plain
Part: 2
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: unified
| File | + | − |
|---|---|---|
| contrib/Makefile | 0 | 0 |
| contrib/pgsql_fdw/connection.c | 0 | 0 |
| contrib/pgsql_fdw/connection.h | 0 | 0 |
| contrib/pgsql_fdw/deparse.c | 0 | 0 |
| contrib/pgsql_fdw/expected/pgsql_fdw.out | 0 | 0 |
| contrib/pgsql_fdw/.gitignore | 0 | 0 |
| contrib/pgsql_fdw/Makefile | 0 | 0 |
| contrib/pgsql_fdw/option.c | 0 | 0 |
| contrib/pgsql_fdw/pgsql_fdw--1.0.sql | 0 | 0 |
| contrib/pgsql_fdw/pgsql_fdw.c | 0 | 0 |
| contrib/pgsql_fdw/pgsql_fdw.control | 0 | 0 |
| contrib/pgsql_fdw/pgsql_fdw.h | 0 | 0 |
| contrib/pgsql_fdw/sql/pgsql_fdw.sql | 0 | 0 |
| contrib/README | 0 | 0 |
| doc/src/sgml/contrib.sgml | 0 | 0 |
| doc/src/sgml/filelist.sgml | 0 | 0 |
| doc/src/sgml/pgsql-fdw.sgml | 0 | 0 |
| src/backend/utils/adt/ruleutils.c | 0 | 0 |
| src/include/utils/builtins.h | 0 | 0 |
diff --git a/contrib/Makefile b/contrib/Makefile
index 0c238aa..e09e61e 100644
*** a/contrib/Makefile
--- b/contrib/Makefile
*************** SUBDIRS = \
*** 41,46 ****
--- 41,47 ----
pgbench \
pgcrypto \
pgrowlocks \
+ pgsql_fdw \
pgstattuple \
seg \
spi \
diff --git a/contrib/README b/contrib/README
index a1d42a1..d3fa211 100644
*** a/contrib/README
--- b/contrib/README
*************** pgrowlocks -
*** 158,163 ****
--- 158,167 ----
A function to return row locking information
by Tatsuo Ishii <ishii@sraoss.co.jp>
+ pgsql_fdw -
+ Foreign-data wrapper for external PostgreSQL servers.
+ by Shigeru Hanada <shigeru.hanada@gmail.com>
+
pgstattuple -
Functions to return statistics about "dead" tuples and free
space within a table
diff --git a/contrib/pgsql_fdw/.gitignore b/contrib/pgsql_fdw/.gitignore
index ...0854728 .
*** a/contrib/pgsql_fdw/.gitignore
--- b/contrib/pgsql_fdw/.gitignore
***************
*** 0 ****
--- 1,4 ----
+ # Generated subdirectories
+ /results/
+ *.o
+ *.so
diff --git a/contrib/pgsql_fdw/Makefile b/contrib/pgsql_fdw/Makefile
index ...6943409 .
*** a/contrib/pgsql_fdw/Makefile
--- b/contrib/pgsql_fdw/Makefile
***************
*** 0 ****
--- 1,22 ----
+ # pgsql_fdw/Makefile
+
+ MODULE_big = pgsql_fdw
+ OBJS = pgsql_fdw.o option.o deparse.o connection.o
+ PG_CPPFLAGS = -I$(libpq_srcdir)
+ SHLIB_LINK = $(libpq)
+
+ EXTENSION = pgsql_fdw
+ DATA = pgsql_fdw--1.0.sql
+
+ REGRESS = pgsql_fdw
+
+ ifdef USE_PGXS
+ PG_CONFIG = pg_config
+ PGXS := $(shell $(PG_CONFIG) --pgxs)
+ include $(PGXS)
+ else
+ subdir = contrib/pgsql_fdw
+ top_builddir = ../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ endif
diff --git a/contrib/pgsql_fdw/connection.c b/contrib/pgsql_fdw/connection.c
index ...e2d8d2a .
*** a/contrib/pgsql_fdw/connection.c
--- b/contrib/pgsql_fdw/connection.c
***************
*** 0 ****
--- 1,504 ----
+ /*-------------------------------------------------------------------------
+ *
+ * connection.c
+ * Connection management for pgsql_fdw
+ *
+ * Portions Copyright (c) 2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * pgsql_fdw/connection.c
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include "catalog/pg_type.h"
+ #include "foreign/foreign.h"
+ #include "funcapi.h"
+ #include "libpq-fe.h"
+ #include "mb/pg_wchar.h"
+ #include "miscadmin.h"
+ #include "utils/array.h"
+ #include "utils/builtins.h"
+ #include "utils/hsearch.h"
+ #include "utils/memutils.h"
+ #include "utils/resowner.h"
+ #include "utils/tuplestore.h"
+
+ #include "pgsql_fdw.h"
+ #include "connection.h"
+
+ /* ============================================================================
+ * Connection management functions
+ * ==========================================================================*/
+
+ /*
+ * Connection cache entry managed with hash table.
+ *
+ * serverid and userid together form the hash key (the table is created with
+ * a key size of two Oids), so they must remain the first two members. The
+ * remaining members are the cached payload.
+ */
+ typedef struct ConnCacheEntry
+ {
+ /* hash key must be first */
+ Oid serverid; /* oid of foreign server */
+ Oid userid; /* oid of local user */
+
+ int refs; /* reference counter */
+ PGconn *conn; /* foreign server connection */
+ } ConnCacheEntry;
+
+ /*
+ * Hash table used to cache connections to PostgreSQL servers. It stays NULL
+ * until the backend first asks for a connection to a foreign server.
+ */
+ static HTAB *FSConnectionHash;
+
+ /* ----------------------------------------------------------------------------
+ * prototypes of private functions
+ *
+ * cleanup_connection has the parameter list of a resource-release callback;
+ * connect_pg_server opens a new connection for a server/user mapping pair.
+ * --------------------------------------------------------------------------*/
+ static void
+ cleanup_connection(ResourceReleasePhase phase,
+ bool isCommit,
+ bool isTopLevel,
+ void *arg);
+ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ /*
+  * Get a PGconn which can be used to execute foreign queries on the remote
+  * PostgreSQL server with the user's authorization.  If this is the first
+  * request for the server, a new connection is established.
+  *
+  * server: foreign server to connect to; its OID is half of the cache key.
+  * user:   user mapping whose options supply the connection parameters.
+  *
+  * Connections are cached per (server OID, outer user OID).  A cache hit with
+  * a live connection bumps the reference counter and returns the cached
+  * PGconn; otherwise a new connection is opened and stored in the entry.
+  * Callers hand the connection back with ReleaseConnection().
+  *
+  * Any error raised while connecting is re-thrown after the cache entry has
+  * been reset to the "no connection" state.
+  */
+ PGconn *
+ GetConnection(ForeignServer *server, UserMapping *user)
+ {
+     bool        found;
+     ConnCacheEntry *entry;
+     ConnCacheEntry key;
+
+     /*
+      * conn is assigned inside PG_TRY and read inside PG_CATCH, so it must be
+      * volatile to have a well-defined value after the longjmp.
+      */
+     PGconn     *volatile conn = NULL;
+
+     /* initialize connection cache if it isn't */
+     if (FSConnectionHash == NULL)
+     {
+         HASHCTL     ctl;
+
+         /* hash key is a pair of oids: serverid and userid */
+         MemSet(&ctl, 0, sizeof(ctl));
+         ctl.keysize = sizeof(Oid) + sizeof(Oid);
+         ctl.entrysize = sizeof(ConnCacheEntry);
+         ctl.hash = tag_hash;
+         ctl.match = memcmp;
+         ctl.keycopy = memcpy;
+         /* allocate FSConnectionHash in the cache context */
+         ctl.hcxt = CacheMemoryContext;
+         FSConnectionHash = hash_create("Foreign Connections", 32,
+                                        &ctl,
+                                        HASH_ELEM | HASH_CONTEXT |
+                                        HASH_FUNCTION | HASH_COMPARE |
+                                        HASH_KEYCOPY);
+     }
+
+     /* Create key value for the entry. */
+     MemSet(&key, 0, sizeof(key));
+     key.serverid = server->serverid;
+     key.userid = GetOuterUserId();
+
+     /* Is there any cached and valid connection with such key? */
+     entry = hash_search(FSConnectionHash, &key, HASH_ENTER, &found);
+     if (found)
+     {
+         if (entry->conn != NULL)
+         {
+             entry->refs++;
+             elog(DEBUG1,
+                  "reuse connection %u/%u (%d)",
+                  entry->serverid,
+                  entry->userid,
+                  entry->refs);
+             return entry->conn;
+         }
+
+         /*
+          * Connection cache entry was found but connection in it is invalid.
+          * We reuse entry to store newly established connection later.
+          */
+     }
+     else
+     {
+         /*
+          * Only the key part of a freshly entered entry is set, so initialize
+          * the payload before anything (including the log line) reads it.
+          */
+         entry->refs = 0;
+         entry->conn = NULL;
+         elog(DEBUG1,
+              "create entry for %u/%u (%d)",
+              entry->serverid,
+              entry->userid,
+              entry->refs);
+
+         /*
+          * Use ResourceOwner to clean the connection up on error including
+          * user interrupt.
+          */
+         RegisterResourceReleaseCallback(cleanup_connection, entry);
+     }
+
+     /*
+      * Here we have to establish new connection.
+      * Use PG_TRY block to ensure closing connection on error.
+      */
+     PG_TRY();
+     {
+         /* Connect to the foreign PostgreSQL server */
+         conn = connect_pg_server(server, user);
+
+         /*
+          * Initialize the cache entry to keep new connection.
+          * Note: key items of entry has been initialized in
+          * hash_search(HASH_ENTER).
+          */
+         entry->refs = 1;
+         entry->conn = conn;
+         elog(DEBUG1,
+              "connected to %u/%u (%d)",
+              entry->serverid,
+              entry->userid,
+              entry->refs);
+     }
+     PG_CATCH();
+     {
+         /* PQfinish() accepts NULL, which is the case if connecting failed. */
+         PQfinish(conn);
+         entry->refs = 0;
+         entry->conn = NULL;
+         PG_RE_THROW();
+     }
+     PG_END_TRY();
+
+     return conn;
+ }
+
+ /*
+  * Reject connection parameters that lack a password unless the current user
+  * is a superuser.
+  *
+  * Requiring an explicit, non-empty "password" keyword keeps libpq from
+  * picking one up from .pgpass, a service file, the environment, etc., so the
+  * postgres user's passwords are never usable by non-superusers.
+  *
+  * keywords/values are parallel arrays; keywords is NULL-terminated.
+  * Raises ERROR when no usable password is present.
+  */
+ static void
+ check_conn_params(const char **keywords, const char **values)
+ {
+     int         idx = 0;
+
+     /* superusers may connect without supplying a password */
+     if (superuser())
+         return;
+
+     /* look for a "password" keyword carrying a non-empty value */
+     while (keywords[idx] != NULL)
+     {
+         bool        is_password = (strcmp(keywords[idx], "password") == 0);
+
+         if (is_password && values[idx][0] != '\0')
+             return;
+         idx++;
+     }
+
+     ereport(ERROR,
+             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+              errmsg("password is required"),
+              errdetail("Non-superusers must provide a password in the connection string.")));
+ }
+
+ /*
+  * Open a new libpq connection to the given foreign server using the options
+  * of the server and the user mapping, and start a transaction on it.
+  *
+  * server: foreign server; its options and name are used.
+  * user:   user mapping; its options are appended after the server's.
+  *
+  * Returns the connected PGconn with a transaction already begun.
+  * Raises ERROR if the parameters fail check_conn_params(), the connection
+  * cannot be established, a non-superuser connected without a password, or
+  * BEGIN fails.  On every such error path the PGconn (and the PGresult of
+  * BEGIN) is released first, because libpq objects are not freed by memory
+  * context cleanup.
+  */
+ static PGconn *
+ connect_pg_server(ForeignServer *server, UserMapping *user)
+ {
+     const char *conname = server->servername;
+     PGconn     *conn;
+     PGresult   *res;
+     const char **keywords;
+     const char **values;
+     int         n;
+
+     /*
+      * Construct connection params from generic options of ForeignServer and
+      * UserMapping.  Those two objects hold only libpq options.
+      * Extra 3 items are for:
+      *   *) fallback_application_name
+      *   *) client_encoding
+      *   *) NULL termination (end marker)
+      *
+      * Note: We don't omit any parameters even target database might be older
+      * than local, because unexpected parameters are just ignored.
+      */
+     n = list_length(server->options) + list_length(user->options) + 3;
+     keywords = (const char **) palloc(sizeof(char *) * n);
+     values = (const char **) palloc(sizeof(char *) * n);
+
+     /* Fill the arrays directly; no intermediate copy is needed. */
+     n = 0;
+     n += ExtractConnectionOptions(server->options,
+                                   keywords + n, values + n);
+     n += ExtractConnectionOptions(user->options,
+                                   keywords + n, values + n);
+
+     /* Use "pgsql_fdw" as fallback_application_name. */
+     keywords[n] = "fallback_application_name";
+     values[n++] = "pgsql_fdw";
+
+     /* Set client_encoding so that libpq can convert encoding properly. */
+     keywords[n] = "client_encoding";
+     values[n++] = GetDatabaseEncodingName();
+
+     keywords[n] = values[n] = NULL;
+
+     /* verify connection parameters and do connect */
+     check_conn_params(keywords, values);
+     conn = PQconnectdbParams(keywords, values, 0);
+     if (!conn || PQstatus(conn) != CONNECTION_OK)
+     {
+         /*
+          * The message belongs to the PGconn, so copy it before closing the
+          * connection; otherwise the failed PGconn would be leaked by the
+          * ERROR below.
+          */
+         char       *connmsg = pstrdup(PQerrorMessage(conn));
+
+         PQfinish(conn);
+         ereport(ERROR,
+                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+                  errmsg("could not connect to server \"%s\"", conname),
+                  errdetail("%s", connmsg)));
+     }
+     pfree(keywords);
+     pfree(values);
+
+     /*
+      * Check that non-superuser has used password to establish connection.
+      * This check logic is based on dblink_security_check() in contrib/dblink.
+      *
+      * XXX Should we check this even if we don't provide unsafe version like
+      * dblink_connect_u()?
+      */
+     if (!superuser() && !PQconnectionUsedPassword(conn))
+     {
+         PQfinish(conn);
+         ereport(ERROR,
+                 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+                  errmsg("password is required"),
+                  errdetail("Non-superuser cannot connect if the server does not request a password."),
+                  errhint("Target server's authentication method must be changed.")));
+     }
+
+     /*
+      * Start transaction to use cursor to retrieve data separately.
+      */
+     res = PQexec(conn, "BEGIN");
+     if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
+     {
+         /* Release libpq resources before leaving via ERROR. */
+         PQclear(res);
+         PQfinish(conn);
+         elog(ERROR, "could not start transaction");
+     }
+     PQclear(res);
+
+     return conn;
+ }
+
+ /*
+  * Mark the connection as no longer used by the caller.
+  *
+  * conn: connection previously obtained from GetConnection(); NULL is a no-op.
+  *
+  * A connection found in the cache only has its reference counter
+  * decremented; it stays open in the cache for later reuse.  A connection
+  * that is not in the cache (an orphan) is closed immediately.
+  *
+  * NOTE(review): a pointer to a connection that was already closed through
+  * the cache would be treated as an orphan here and closed a second time;
+  * callers presumably never release such a pointer -- confirm.
+  */
+ void
+ ReleaseConnection(PGconn *conn)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry = NULL;
+
+     if (conn == NULL)
+         return;
+
+     /*
+      * The cache is keyed by server/user, not by PGconn address, so we have
+      * to scan it sequentially.  Skip the scan if the cache was never created.
+      */
+     if (FSConnectionHash != NULL)
+     {
+         hash_seq_init(&scan, FSConnectionHash);
+         while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)) != NULL)
+         {
+             if (entry->conn == conn)
+             {
+                 /* leaving the scan early requires explicit termination */
+                 hash_seq_term(&scan);
+                 break;
+             }
+         }
+     }
+
+     /*
+      * If the released connection was an orphan, just close it.
+      */
+     if (entry == NULL)
+     {
+         PQfinish(conn);
+         return;
+     }
+
+     /*
+      * Drop the caller's reference, never letting the counter go negative.
+      * TODO: Note that sharing connections requires a mechanism to detect
+      * change of FDW object to invalidate lasting connections.
+      */
+     if (entry->refs > 0)
+         entry->refs--;
+     elog(DEBUG1,
+          "connection %u/%u released (%d)",
+          entry->serverid,
+          entry->userid,
+          entry->refs);
+ }
+
+ /*
+  * Resource-release callback which closes a cached connection on abort.
+  *
+  * arg is the ConnCacheEntry registered in GetConnection().  The callback
+  * acts only when all of the following hold:
+  *   - the transaction is not committing (connections survive commit);
+  *   - we are in the after-locks phase, since foreign connections are a
+  *     backend-internal resource;
+  *   - the resource owner being released is the transaction's one (the only
+  *     other kind expected here is a Portal's).
+  * Whether this is a top-level or sub-transaction makes no difference.
+  */
+ static void
+ cleanup_connection(ResourceReleasePhase phase,
+                    bool isCommit,
+                    bool isTopLevel,
+                    void *arg)
+ {
+     ConnCacheEntry *cached = (ConnCacheEntry *) arg;
+
+     if (isCommit ||
+         phase != RESOURCE_RELEASE_AFTER_LOCKS ||
+         CurrentResourceOwner != CurTransactionResourceOwner)
+         return;
+
+     if (cached->conn == NULL)
+     {
+         elog(DEBUG1,
+              "connection %u/%u already closed",
+              cached->serverid,
+              cached->userid);
+         return;
+     }
+
+     /* Close the connection and reset the reference counter. */
+     elog(DEBUG1,
+          "closing connection %u/%u",
+          cached->serverid,
+          cached->userid);
+     PQfinish(cached->conn);
+     cached->refs = 0;
+     cached->conn = NULL;
+ }
+
+ /*
+  * Get list of connections currently active.
+  *
+  * Set-returning function using materialize mode: the result is a tuplestore
+  * of (srvid oid, usesysid oid) rows, one per cached connection whose status
+  * is CONNECTION_OK.  Non-superusers see only entries cached under their own
+  * (outer) user id, which is the id GetConnection() uses as cache key.
+  *
+  * Raises ERROR if the caller cannot accept a materialized set.
+  */
+ Datum pgsql_fdw_get_connections(PG_FUNCTION_ARGS);
+ PG_FUNCTION_INFO_V1(pgsql_fdw_get_connections);
+ Datum
+ pgsql_fdw_get_connections(PG_FUNCTION_ARGS)
+ {
+     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+     MemoryContext oldcontext = CurrentMemoryContext;
+     Tuplestorestate *tuplestore;
+     TupleDesc   tupdesc;
+
+     /* Make sure the caller can accept a materialized set before using it. */
+     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+         ereport(ERROR,
+                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                  errmsg("set-valued function called in context that cannot accept a set")));
+     if (!(rsinfo->allowedModes & SFRM_Materialize))
+         ereport(ERROR,
+                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                  errmsg("materialize mode required, but it is not allowed in this context")));
+
+     /* We return list of connection with storing them in a Tuplestore. */
+     rsinfo->returnMode = SFRM_Materialize;
+     rsinfo->setResult = NULL;
+     rsinfo->setDesc = NULL;
+
+     /* Create tuplestore and copy of TupleDesc in per-query context. */
+     MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+     tupdesc = CreateTemplateTupleDesc(2, false);
+     TupleDescInitEntry(tupdesc, 1, "srvid", OIDOID, -1, 0);
+     TupleDescInitEntry(tupdesc, 2, "usesysid", OIDOID, -1, 0);
+     rsinfo->setDesc = tupdesc;
+
+     tuplestore = tuplestore_begin_heap(false, false, work_mem);
+     rsinfo->setResult = tuplestore;
+
+     MemoryContextSwitchTo(oldcontext);
+
+     /* Walk the whole cache; nothing to report if it was never created. */
+     if (FSConnectionHash != NULL)
+     {
+         bool        is_superuser = superuser();
+         Oid         myuserid = GetOuterUserId();
+
+         hash_seq_init(&scan, FSConnectionHash);
+         while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+         {
+             Datum       values[2];
+             bool        nulls[2];
+             HeapTuple   tuple;
+
+             elog(DEBUG1, "found: %u/%u", entry->serverid, entry->userid);
+
+             /* Ignore inactive connections */
+             if (PQstatus(entry->conn) != CONNECTION_OK)
+                 continue;
+
+             /*
+              * Ignore other users' connections if current user isn't a
+              * superuser.
+              */
+             if (!is_superuser && entry->userid != myuserid)
+                 continue;
+
+             values[0] = ObjectIdGetDatum(entry->serverid);
+             values[1] = ObjectIdGetDatum(entry->userid);
+             nulls[0] = false;
+             nulls[1] = false;
+
+             /* heap_form_tuple takes bool null flags, unlike heap_formtuple */
+             tuple = heap_form_tuple(tupdesc, values, nulls);
+             tuplestore_puttuple(tuplestore, tuple);
+         }
+     }
+     tuplestore_donestoring(tuplestore);
+
+     PG_RETURN_VOID();
+ }
+
+ /*
+  * Discard the cached connection identified by server OID and user OID.
+  *
+  * Arguments: (0) OID of the foreign server, (1) OID of the local user.
+  * Returns text "OK" if a cache entry for the pair exists (its connection is
+  * closed and its reference counter cleared), or "NG" if the cache has not
+  * been created or holds no such entry.
+  *
+  * Raises ERROR if a non-superuser tries to discard another user's
+  * connection.
+  */
+ Datum pgsql_fdw_disconnect(PG_FUNCTION_ARGS);
+ PG_FUNCTION_INFO_V1(pgsql_fdw_disconnect);
+ Datum
+ pgsql_fdw_disconnect(PG_FUNCTION_ARGS)
+ {
+     Oid         serverid = PG_GETARG_OID(0);
+     Oid         userid = PG_GETARG_OID(1);
+     ConnCacheEntry key;
+     ConnCacheEntry *entry = NULL;
+     bool        found;
+
+     /* Non-superuser can't discard other users' connection. */
+     if (!superuser() && userid != GetOuterUserId())
+         ereport(ERROR,
+                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                  errmsg("only superuser can discard other user's connection")));
+
+     /*
+      * If no connection has been established, or no such connections, just
+      * return "NG" to indicate nothing has been done.
+      */
+     if (FSConnectionHash == NULL)
+         PG_RETURN_TEXT_P(cstring_to_text("NG"));
+
+     /* Build the lookup key the same way GetConnection() does. */
+     MemSet(&key, 0, sizeof(key));
+     key.serverid = serverid;
+     key.userid = userid;
+     entry = hash_search(FSConnectionHash, &key, HASH_FIND, &found);
+     if (!found)
+         PG_RETURN_TEXT_P(cstring_to_text("NG"));
+
+     /*
+      * Discard cached connection, and clear reference counter.  PQfinish()
+      * accepts NULL, so an entry whose connection is already closed is fine.
+      */
+     PQfinish(entry->conn);
+     entry->refs = 0;
+     entry->conn = NULL;
+     elog(DEBUG1, "closed connection %u/%u", serverid, userid);
+
+     PG_RETURN_TEXT_P(cstring_to_text("OK"));
+ }
diff --git a/contrib/pgsql_fdw/connection.h b/contrib/pgsql_fdw/connection.h
index ...80dd3e0 .
*** a/contrib/pgsql_fdw/connection.h
--- b/contrib/pgsql_fdw/connection.h
***************
*** 0 ****
--- 1,25 ----
+ /*-------------------------------------------------------------------------
+ *
+ * connection.h
+ * Connection management for pgsql_fdw
+ *
+ * Portions Copyright (c) 2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * pgsql_fdw/connection.h
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef CONNECTION_H
+ #define CONNECTION_H
+
+ #include "foreign/foreign.h"
+ #include "libpq-fe.h"
+
+ /*
+ * Connection management
+ *
+ * GetConnection() returns a connection for the given foreign server and user
+ * mapping; ReleaseConnection() is called with that connection once the
+ * caller no longer uses it.
+ */
+ PGconn *GetConnection(ForeignServer *server, UserMapping *user);
+ void ReleaseConnection(PGconn *conn);
+
+ #endif /* CONNECTION_H */
diff --git a/contrib/pgsql_fdw/deparse.c b/contrib/pgsql_fdw/deparse.c
index ...4dab232 .
*** a/contrib/pgsql_fdw/deparse.c
--- b/contrib/pgsql_fdw/deparse.c
***************
*** 0 ****
--- 1,352 ----
+ /*-------------------------------------------------------------------------
+ *
+ * deparse.c
+ * query deparser for PostgreSQL
+ *
+ * Copyright (c) 2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * pgsql_fdw/deparse.c
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include "access/transam.h"
+ #include "foreign/foreign.h"
+ #include "lib/stringinfo.h"
+ #include "nodes/nodeFuncs.h"
+ #include "nodes/nodes.h"
+ #include "nodes/makefuncs.h"
+ #include "optimizer/clauses.h"
+ #include "optimizer/var.h"
+ #include "parser/parsetree.h"
+ #include "utils/builtins.h"
+ #include "utils/lsyscache.h"
+
+ #include "pgsql_fdw.h"
+
+ /*
+ * Context handed to foreign_expr_walker() while it walks an expression tree.
+ * It is filled in by is_foreign_expr() from its own arguments.
+ */
+ typedef struct foreign_executable_cxt
+ {
+ PlannerInfo *root; /* planner info of the query being checked */
+ RelOptInfo *foreignrel; /* foreign rel whose relid every Var must match */
+ } foreign_executable_cxt;
+
+ static bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr);
+ static bool foreign_expr_walker(Node *node, foreign_executable_cxt *context);
+ static bool is_proc_remotely_executable(Oid procid);
+
+ /*
+ * Deparse query representation into an SQL statement which suits the remote
+ * PostgreSQL server. Quals in the WHERE clause are pushed down to the remote
+ * side if they are safe to be evaluated there.
+ *
+ * relid: OID of the foreign table; used to look up the "nspname"/"relname"
+ * options and, when they are absent, the local namespace and relation name.
+ * root: planner info; its range table entries build the deparse context.
+ * baserel: the foreign table's RelOptInfo.
+ *
+ * Returns the text of a "SELECT ... FROM ... [WHERE ...]" statement.
+ *
+ * Side effect: baserel->baserestrictinfo is replaced by the list of quals
+ * that must still be evaluated locally.
+ */
+ char *
+ deparseSql(Oid relid, PlannerInfo *root, RelOptInfo *baserel)
+ {
+ StringInfoData foreign_relname;
+ StringInfoData sql; /* builder for SQL statement */
+ bool first;
+ AttrNumber attr;
+ List *attr_used = NIL; /* List of AttNumber used in the query */
+ const char *nspname = NULL; /* plain namespace name */
+ const char *relname = NULL; /* plain relation name */
+ const char *q_nspname; /* quoted namespace name */
+ const char *q_relname; /* quoted relation name */
+ List *foreign_expr = NIL; /* list of Expr* evaluated on remote */
+ int i;
+ List *rtable = NIL;
+ List *context = NIL;
+
+ initStringInfo(&sql);
+ initStringInfo(&foreign_relname);
+
+ /*
+ * First of all, determine which qual can be pushed down.
+ *
+ * The expressions which satisfy is_foreign_expr() are deparsed into WHERE
+ * clause of result SQL string, and they could be removed from PlanState
+ * to avoid duplicate evaluation at ExecScan().
+ *
+ * We never change the quals in the Plan node, because this execution might
+ * be for a PREPAREd statement, thus the quals in the Plan node might be
+ * reused to construct another PlanState for subsequent EXECUTE statement.
+ *
+ * We do this before deparsing SELECT clause because attributes which are
+ * used in neither reltargetlist nor baserel->baserestrictinfo quals
+ * evaluated locally can be replaced with literal "NULL" in the SELECT
+ * clause to reduce overhead of tuple handling and data transfer.
+ */
+ if (baserel->baserestrictinfo != NIL)
+ {
+ ListCell *lc;
+ List *local_qual = NIL;
+
+ foreach (lc, baserel->baserestrictinfo)
+ {
+ RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
+
+ /* Determine whether the qual can be pushed down or not. */
+ if (is_foreign_expr(root, baserel, ri->clause))
+ foreign_expr = lappend(foreign_expr, ri->clause);
+ else
+ {
+ List *attrs;
+
+ /*
+ * We need to know which attributes are used in qual evaluated
+ * on the local server, because they should be listed in the
+ * SELECT clause of remote query. We can ignore attributes
+ * which are referenced only in ORDER BY/GROUP BY clause because
+ * such attributes have already been kept in reltargetlist.
+ */
+ attrs = pull_var_clause((Node *) ri->clause,
+ PVC_RECURSE_AGGREGATES,
+ PVC_RECURSE_PLACEHOLDERS);
+ attr_used = list_union(attr_used, attrs);
+
+ /*
+ * Save the RestrictInfo to replace baserel->baserestrictinfo
+ * afterward.
+ */
+ local_qual = lappend(local_qual, ri);
+ }
+ }
+
+ /*
+ * Remove quals which are going to be evaluated on the foreign server to
+ * avoid overhead of duplicated evaluation.
+ */
+ baserel->baserestrictinfo = local_qual;
+ }
+
+ /*
+ * Determine foreign relation's qualified name. This is necessary for
+ * FROM clause and SELECT clause. The "nspname"/"relname" table options
+ * take precedence over the local names.
+ */
+ nspname = GetFdwOptionValue(relid, InvalidAttrNumber, "nspname");
+ if (nspname == NULL)
+ nspname = get_namespace_name(get_rel_namespace(relid));
+ q_nspname = quote_identifier(nspname);
+
+ relname = GetFdwOptionValue(relid, InvalidAttrNumber, "relname");
+ if (relname == NULL)
+ relname = get_rel_name(relid);
+ q_relname = quote_identifier(relname);
+
+ appendStringInfo(&foreign_relname, "%s.%s", q_nspname, q_relname);
+
+ /*
+ * Create context for the deparse.
+ * We need multi-relation context without PlanState, and alias of each
+ * RangeTblEntry should be modified to be valid on remote side.
+ *
+ * We skip first element of simple_rel_array.
+ *
+ * NOTE(review): the copied entries are not modified before being passed
+ * on; presumably the alias handling happens elsewhere -- confirm.
+ */
+ for (i = 1; i < root->simple_rel_array_size; i++)
+ {
+ RangeTblEntry *rte = copyObject(root->simple_rte_array[i]);
+ rtable = lappend(rtable, rte);
+ }
+ context = deparse_context_for_rtelist(rtable);
+
+ /*
+ * deparse SELECT clause
+ *
+ * List attributes which are in either target list or local restriction.
+ * Unused attributes are replaced with literal "NULL" for optimization.
+ */
+ appendStringInfo(&sql, "SELECT ");
+ attr_used = list_union(attr_used, baserel->reltargetlist);
+ first = true;
+ for (attr = 1; attr <= baserel->max_attr; attr++)
+ {
+ RangeTblEntry *rte = root->simple_rte_array[baserel->relid];
+ Var *var = NULL;
+ ListCell *lc;
+
+ /* Ignore dropped attributes. */
+ if (get_rte_attribute_is_dropped(rte, attr))
+ continue;
+
+ if (!first)
+ appendStringInfo(&sql, ", ");
+ first = false;
+
+ /*
+ * We use linear search here, but it wouldn't be problem since
+ * attr_used seems to not become so large.
+ *
+ * NOTE(review): every element of attr_used is read as a Var here,
+ * including those taken from baserel->reltargetlist; this presumes
+ * the list holds nothing but Var nodes -- confirm. A reference with
+ * varattno 0 matches no column in this loop.
+ */
+ foreach (lc, attr_used)
+ {
+ var = lfirst(lc);
+ if (var->varattno == attr)
+ break;
+ var = NULL;
+ }
+ if (var != NULL)
+ appendStringInfo(&sql, "%s",
+ deparse_expression((Node *) var, context, false, false));
+ else
+ appendStringInfo(&sql, "NULL");
+ }
+ appendStringInfoChar(&sql, ' ');
+
+ /*
+ * deparse FROM clause
+ */
+ appendStringInfo(&sql, "FROM %s", foreign_relname.data);
+
+ /*
+ * deparse WHERE clause: the pushed-down quals are ANDed together.
+ */
+ if (foreign_expr != NIL)
+ {
+ Node *node;
+
+ node = (Node *) make_ands_explicit(foreign_expr);
+ appendStringInfo(&sql, " WHERE %s ",
+ deparse_expression(node, context, false, false));
+ list_free(foreign_expr);
+ foreign_expr = NIL;
+ }
+
+ return sql.data;
+ }
+
+ /*
+  * Returns true if expr is safe to be evaluated on the foreign server.
+  *
+  * Two conditions must both hold:
+  *  - the expression contains no mutable function, because its result is not
+  *    stable; for example, pushing now() down to the remote side would cause
+  *    confusion from the clock offset.  If routine mapping infrastructure
+  *    appears in a future release, functions could be chosen for push-down
+  *    at a finer granularity;
+  *  - the expression consists only of nodes which are known as safe to be
+  *    pushed down, i.e. foreign_expr_walker() finds nothing to object to.
+  */
+ static bool
+ is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
+ {
+     Node       *clause = (Node *) expr;
+     foreign_executable_cxt walker_cxt;
+
+     walker_cxt.root = root;
+     walker_cxt.foreignrel = baserel;
+
+     /* the walker runs only when the mutability check has passed */
+     return !contain_mutable_functions(clause) &&
+         !foreign_expr_walker(clause, &walker_cxt);
+ }
+
+ /*
+  * Return true if node includes any node which is not known as safe to be
+  * pushed down.
+  *
+  * node:    expression (sub)tree to examine; NULL is treated as safe.
+  * context: carries the foreign relation against which Vars are checked.
+  *
+  * Each node is first checked by its own type; if it passes, its children are
+  * examined recursively through expression_tree_walker().
+  */
+ static bool
+ foreign_expr_walker(Node *node, foreign_executable_cxt *context)
+ {
+     if (node == NULL)
+         return false;
+
+     switch (nodeTag(node))
+     {
+         case T_Const:
+         case T_ArrayExpr:
+         case T_BoolExpr:
+         case T_NullTest:
+
+             /*
+              * These types of nodes are known as safe to be pushed down.
+              * Of course the subtree of the node, if any, is checked at the
+              * tail of this function.
+              */
+             break;
+         case T_Param:
+
+             /*
+              * Only external parameters can be pushed down.
+              */
+             if (((Param *) node)->paramkind != PARAM_EXTERN)
+                 return true;
+             break;
+         case T_OpExpr:
+         case T_DistinctExpr:
+
+             /*
+              * Operators implemented by a function which is not remotely
+              * executable can't be pushed down.  A DistinctExpr has the same
+              * struct layout as an OpExpr, so both are checked alike.
+              * Operands are checked later.
+              */
+             if (!is_proc_remotely_executable(((OpExpr *) node)->opfuncid))
+                 return true;
+             break;
+         case T_ScalarArrayOpExpr:
+
+             /*
+              * Same rule for "scalar op ANY/ALL (array)": the underlying
+              * operator function must be remotely executable.
+              */
+             if (!is_proc_remotely_executable(((ScalarArrayOpExpr *) node)->opfuncid))
+                 return true;
+             break;
+         case T_FuncExpr:
+
+             /*
+              * Functions which are not remotely executable can't be pushed
+              * down.  Arguments are checked later.
+              */
+             if (!is_proc_remotely_executable(((FuncExpr *) node)->funcid))
+                 return true;
+             break;
+         case T_Var:
+
+             /*
+              * Var can be pushed down if it is in the foreign table.
+              * XXX Var of other relation can be here?
+              */
+             {
+                 Var        *var = (Var *) node;
+
+                 if (var->varno != context->foreignrel->relid ||
+                     var->varlevelsup != 0)
+                     return true;
+             }
+             break;
+         default:
+             /* Anything else is treated as unsafe. */
+             ereport(DEBUG3,
+                     (errmsg("expression is too complex"),
+                      errdetail("%s", nodeToString(node))));
+             return true;
+     }
+
+     return expression_tree_walker(node, foreign_expr_walker, context);
+ }
+
+ /*
+  * Return true if func is known as safe to be pushed down if all of the
+  * arguments are also known as safe.
+  *
+  * The only criterion is the OID range: procedures whose OID is at or above
+  * FirstNormalObjectId are user-defined and can't be pushed down.
+  */
+ static bool
+ is_proc_remotely_executable(Oid procid)
+ {
+     return procid < FirstNormalObjectId;
+ }
+
diff --git a/contrib/pgsql_fdw/expected/pgsql_fdw.out b/contrib/pgsql_fdw/expected/pgsql_fdw.out
index ...af72154 .
*** a/contrib/pgsql_fdw/expected/pgsql_fdw.out
--- b/contrib/pgsql_fdw/expected/pgsql_fdw.out
***************
*** 0 ****
--- 1,406 ----
+ -- ===================================================================
+ -- create FDW objects
+ -- ===================================================================
+ CREATE EXTENSION pgsql_fdw;
+ CREATE SERVER loopback1 FOREIGN DATA WRAPPER pgsql_fdw;
+ CREATE SERVER loopback2 FOREIGN DATA WRAPPER pgsql_fdw
+ OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER loopback1
+ OPTIONS (user 'value', password 'value');
+ CREATE USER MAPPING FOR public SERVER loopback2;
+ CREATE FOREIGN TABLE ft1 (
+ c1 int NOT NULL,
+ c2 int NOT NULL,
+ c3 text,
+ c4 timestamptz,
+ c5 timestamp
+ ) SERVER loopback2;
+ CREATE FOREIGN TABLE ft2 (
+ c1 int NOT NULL,
+ c2 int NOT NULL,
+ c3 text,
+ c4 timestamptz,
+ c5 timestamp
+ ) SERVER loopback2;
+ -- ===================================================================
+ -- create objects used through FDW
+ -- ===================================================================
+ CREATE SCHEMA "S 1";
+ CREATE TABLE "S 1"."T 1" (
+ c1 int NOT NULL,
+ c2 int NOT NULL,
+ c3 text,
+ c4 timestamptz,
+ c5 timestamp,
+ CONSTRAINT t1_pkey PRIMARY KEY (c1)
+ );
+ NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "t1_pkey" for table "T 1"
+ CREATE TABLE "S 1"."T 2" (
+ c1 int NOT NULL,
+ c2 text,
+ CONSTRAINT t2_pkey PRIMARY KEY (c1)
+ );
+ NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "t2_pkey" for table "T 2"
+ BEGIN;
+ TRUNCATE "S 1"."T 1";
+ INSERT INTO "S 1"."T 1"
+ SELECT id,
+ id % 10,
+ to_char(id, 'FM00000'),
+ '1970-01-01'::timestamptz + ((id % 100) || ' days')::interval,
+ '1970-01-01'::timestamp + ((id % 100) || ' days')::interval
+ FROM generate_series(1, 1000) id;
+ TRUNCATE "S 1"."T 2";
+ INSERT INTO "S 1"."T 2"
+ SELECT id,
+ 'AAA' || to_char(id, 'FM000')
+ FROM generate_series(1, 100) id;
+ COMMIT;
+ -- ===================================================================
+ -- tests for pgsql_fdw_validator
+ -- ===================================================================
+ ALTER FOREIGN DATA WRAPPER pgsql_fdw OPTIONS (host 'value'); -- ERRROR
+ ERROR: invalid option "host"
+ HINT: Valid options in this context are:
+ -- requiressl, krbsrvname and gsslib are omitted because they depend on
+ -- configure option
+ ALTER SERVER loopback1 OPTIONS (
+ --authtype 'value',
+ service 'value',
+ connect_timeout 'value',
+ dbname 'value',
+ host 'value',
+ hostaddr 'value',
+ port 'value',
+ --client_encoding 'value',
+ --tty 'value',
+ options 'value',
+ application_name 'value',
+ --fallback_application_name 'value',
+ keepalives 'value',
+ keepalives_idle 'value',
+ keepalives_interval 'value',
+ -- requiressl 'value',
+ sslmode 'value',
+ sslcert 'value',
+ sslkey 'value',
+ sslrootcert 'value',
+ sslcrl 'value'
+ --requirepeer 'value',
+ -- krbsrvname 'value',
+ -- gsslib 'value',
+ --replication 'value'
+ );
+ ALTER SERVER loopback1 OPTIONS (user 'value'); -- ERROR
+ ERROR: invalid option "user"
+ HINT: Valid options in this context are: service, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, sslmode, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, min_cursor_rows, fetch_count
+ ALTER SERVER loopback2 OPTIONS (ADD min_cursor_rows '0');
+ ALTER SERVER loopback2 OPTIONS (ADD fetch_count '2');
+ ALTER USER MAPPING FOR public SERVER loopback1
+ OPTIONS (DROP user, DROP password);
+ ALTER USER MAPPING FOR public SERVER loopback1
+ OPTIONS (host 'value'); -- ERROR
+ ERROR: invalid option "host"
+ HINT: Valid options in this context are: user, password
+ ALTER FOREIGN TABLE ft1 OPTIONS (nspname 'S 1', relname 'T 1');
+ ALTER FOREIGN TABLE ft2 OPTIONS (nspname 'S 1', relname 'T 1', min_cursor_rows '10', fetch_count '100');
+ ALTER FOREIGN TABLE ft1 OPTIONS (invalid 'value'); -- ERROR
+ ERROR: invalid option "invalid"
+ HINT: Valid options in this context are: nspname, relname, min_cursor_rows, fetch_count
+ ALTER FOREIGN TABLE ft1 OPTIONS (min_cursor_rows 'a'); -- ERROR
+ ERROR: invalid value for min_cursor_rows: "a"
+ ALTER FOREIGN TABLE ft1 OPTIONS (min_cursor_rows '-1'); -- ERROR
+ ERROR: invalid value for min_cursor_rows: "-1"
+ ALTER FOREIGN TABLE ft1 OPTIONS (fetch_count 'a'); -- ERROR
+ ERROR: invalid value for fetch_count: "a"
+ ALTER FOREIGN TABLE ft1 OPTIONS (fetch_count '0'); -- ERROR
+ ERROR: invalid value for fetch_count: "0"
+ ALTER FOREIGN TABLE ft1 OPTIONS (fetch_count '-1'); -- ERROR
+ ERROR: invalid value for fetch_count: "-1"
+ \dew+
+ List of foreign-data wrappers
+ Name | Owner | Handler | Validator | Access privileges | FDW Options | Description
+ -----------+----------+-------------------+---------------------+-------------------+-------------+-------------
+ pgsql_fdw | postgres | pgsql_fdw_handler | pgsql_fdw_validator | | |
+ (1 row)
+
+ \des+
+ List of foreign servers
+ Name | Owner | Foreign-data wrapper | Access privileges | Type | Version | FDW Options | Description
+ -----------+----------+----------------------+-------------------+------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------
+ loopback1 | postgres | pgsql_fdw | | | | (service 'value', connect_timeout 'value', dbname 'value', host 'value', hostaddr 'value', port 'value', options 'value', application_name 'value', keepalives 'value', keepalives_idle 'value', keepalives_interval 'value', sslmode 'value', sslcert 'value', sslkey 'value', sslrootcert 'value', sslcrl 'value') |
+ loopback2 | postgres | pgsql_fdw | | | | (dbname 'contrib_regression', min_cursor_rows '0', fetch_count '2') |
+ (2 rows)
+
+ \deu+
+ List of user mappings
+ Server | User name | FDW Options
+ -----------+-----------+-------------
+ loopback1 | public |
+ loopback2 | public |
+ (2 rows)
+
+ \det+
+ List of foreign tables
+ Schema | Table | Server | FDW Options | Description
+ --------+-------+-----------+-------------------------------------------------------------------------+-------------
+ public | ft1 | loopback2 | (nspname 'S 1', relname 'T 1') |
+ public | ft2 | loopback2 | (nspname 'S 1', relname 'T 1', min_cursor_rows '10', fetch_count '100') |
+ (2 rows)
+
+ -- ===================================================================
+ -- simple queries
+ -- ===================================================================
+ -- single table, with/without alias
+ EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10;
+ QUERY PLAN
+ ----------------------------------------------------------------------
+ Limit
+ Output: c1, c2, c3, c4, c5
+ -> Sort
+ Output: c1, c2, c3, c4, c5
+ Sort Key: ft1.c3, ft1.c1
+ -> Foreign Scan on public.ft1
+ Output: c1, c2, c3, c4, c5
+ Remote SQL: SELECT c1, c2, c3, c4, c5 FROM "S 1"."T 1"
+ (8 rows)
+
+ SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10;
+ c1 | c2 | c3 | c4 | c5
+ -----+----+-------+------------------------------+--------------------------
+ 101 | 1 | 00101 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970
+ 102 | 2 | 00102 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970
+ 103 | 3 | 00103 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970
+ 104 | 4 | 00104 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970
+ 105 | 5 | 00105 | Tue Jan 06 00:00:00 1970 PST | Tue Jan 06 00:00:00 1970
+ 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970
+ 107 | 7 | 00107 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970
+ 108 | 8 | 00108 | Fri Jan 09 00:00:00 1970 PST | Fri Jan 09 00:00:00 1970
+ 109 | 9 | 00109 | Sat Jan 10 00:00:00 1970 PST | Sat Jan 10 00:00:00 1970
+ 110 | 0 | 00110 | Sun Jan 11 00:00:00 1970 PST | Sun Jan 11 00:00:00 1970
+ (10 rows)
+
+ EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10;
+ QUERY PLAN
+ ----------------------------------------------------------------------
+ Limit
+ Output: c1, c2, c3, c4, c5
+ -> Sort
+ Output: c1, c2, c3, c4, c5
+ Sort Key: t1.c3, t1.c1
+ -> Foreign Scan on public.ft1 t1
+ Output: c1, c2, c3, c4, c5
+ Remote SQL: SELECT c1, c2, c3, c4, c5 FROM "S 1"."T 1"
+ (8 rows)
+
+ SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10;
+ c1 | c2 | c3 | c4 | c5
+ -----+----+-------+------------------------------+--------------------------
+ 101 | 1 | 00101 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970
+ 102 | 2 | 00102 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970
+ 103 | 3 | 00103 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970
+ 104 | 4 | 00104 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970
+ 105 | 5 | 00105 | Tue Jan 06 00:00:00 1970 PST | Tue Jan 06 00:00:00 1970
+ 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970
+ 107 | 7 | 00107 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970
+ 108 | 8 | 00108 | Fri Jan 09 00:00:00 1970 PST | Fri Jan 09 00:00:00 1970
+ 109 | 9 | 00109 | Sat Jan 10 00:00:00 1970 PST | Sat Jan 10 00:00:00 1970
+ 110 | 0 | 00110 | Sun Jan 11 00:00:00 1970 PST | Sun Jan 11 00:00:00 1970
+ (10 rows)
+
+ -- with WHERE clause
+ SELECT * FROM ft1 t1 WHERE t1.c1 = 101;
+ c1 | c2 | c3 | c4 | c5
+ -----+----+-------+------------------------------+--------------------------
+ 101 | 1 | 00101 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970
+ (1 row)
+
+ -- aggregate
+ SELECT COUNT(*) FROM ft1 t1;
+ count
+ -------
+ 1000
+ (1 row)
+
+ -- join two tables
+ SELECT t1.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10;
+ c1
+ -----
+ 101
+ 102
+ 103
+ 104
+ 105
+ 106
+ 107
+ 108
+ 109
+ 110
+ (10 rows)
+
+ -- subquery
+ SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1;
+ c1 | c2 | c3 | c4 | c5
+ ----+----+-------+------------------------------+--------------------------
+ 1 | 1 | 00001 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970
+ 2 | 2 | 00002 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970
+ 3 | 3 | 00003 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970
+ 4 | 4 | 00004 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970
+ 5 | 5 | 00005 | Tue Jan 06 00:00:00 1970 PST | Tue Jan 06 00:00:00 1970
+ 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970
+ 7 | 7 | 00007 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970
+ 8 | 8 | 00008 | Fri Jan 09 00:00:00 1970 PST | Fri Jan 09 00:00:00 1970
+ 9 | 9 | 00009 | Sat Jan 10 00:00:00 1970 PST | Sat Jan 10 00:00:00 1970
+ 10 | 0 | 00010 | Sun Jan 11 00:00:00 1970 PST | Sun Jan 11 00:00:00 1970
+ (10 rows)
+
+ -- subquery+MAX
+ SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+ c1 | c2 | c3 | c4 | c5
+ ------+----+-------+------------------------------+--------------------------
+ 1000 | 0 | 01000 | Thu Jan 01 00:00:00 1970 PST | Thu Jan 01 00:00:00 1970
+ (1 row)
+
+ -- used in CTE
+ WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1;
+ c1 | c2 | c3 | c4
+ ----+----+-------+------------------------------
+ 1 | 1 | 00001 | Fri Jan 02 00:00:00 1970 PST
+ 2 | 2 | 00002 | Sat Jan 03 00:00:00 1970 PST
+ 3 | 3 | 00003 | Sun Jan 04 00:00:00 1970 PST
+ 4 | 4 | 00004 | Mon Jan 05 00:00:00 1970 PST
+ 5 | 5 | 00005 | Tue Jan 06 00:00:00 1970 PST
+ 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST
+ 7 | 7 | 00007 | Thu Jan 08 00:00:00 1970 PST
+ 8 | 8 | 00008 | Fri Jan 09 00:00:00 1970 PST
+ 9 | 9 | 00009 | Sat Jan 10 00:00:00 1970 PST
+ 10 | 0 | 00010 | Sun Jan 11 00:00:00 1970 PST
+ (10 rows)
+
+ -- fixed values
+ SELECT 'fixed', NULL FROM ft1 t1 WHERE c1 = 1;
+ ?column? | ?column?
+ ----------+----------
+ fixed |
+ (1 row)
+
+ -- ===================================================================
+ -- parameterized queries
+ -- ===================================================================
+ -- simple join
+ PREPARE st1(int, int) AS SELECT t1.c3, t2.c3 FROM ft1 t1, ft2 t2 WHERE t1.c1 = $1 AND t2.c1 = $2;
+ EXPLAIN (COSTS false) EXECUTE st1(1, 2);
+ QUERY PLAN
+ ----------------------------------------------------------------------------------------
+ Nested Loop
+ -> Foreign Scan on ft1 t1
+ Remote SQL: SELECT NULL, NULL, c3, NULL, NULL FROM "S 1"."T 1" WHERE (c1 = 1)
+ -> Foreign Scan on ft2 t2
+ Remote SQL: SELECT NULL, NULL, c3, NULL, NULL FROM "S 1"."T 1" WHERE (c1 = 2)
+ (5 rows)
+
+ EXECUTE st1(1, 1);
+ c3 | c3
+ -------+-------
+ 00001 | 00001
+ (1 row)
+
+ EXECUTE st1(101, 101);
+ c3 | c3
+ -------+-------
+ 00101 | 00101
+ (1 row)
+
+ -- subquery using stable function (can't be pushed down)
+ PREPARE st2(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c4) = 6) ORDER BY c1;
+ EXPLAIN (COSTS false) EXECUTE st2(10, 20);
+ QUERY PLAN
+ ------------------------------------------------------------------------------------------------------------------------------------------------
+ Sort
+ Sort Key: t1.c1
+ -> Nested Loop
+ Join Filter: (t1.c3 = t2.c3)
+ -> HashAggregate
+ -> Foreign Scan on ft2 t2
+ Filter: (date_part('dow'::text, c4) = 6::double precision)
+ Remote SQL: DECLARE pgsql_fdw_cursor_6 SCROLL CURSOR FOR SELECT NULL, NULL, c3, c4, NULL FROM "S 1"."T 1" WHERE (c1 > 10)
+ -> Foreign Scan on ft1 t1
+ Remote SQL: SELECT c1, c2, c3, c4, c5 FROM "S 1"."T 1" WHERE (c1 < 20)
+ (10 rows)
+
+ EXECUTE st2(10, 20);
+ c1 | c2 | c3 | c4 | c5
+ ----+----+-------+------------------------------+--------------------------
+ 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970
+ (1 row)
+
+ EXECUTE st1(101, 101);
+ c3 | c3
+ -------+-------
+ 00101 | 00101
+ (1 row)
+
+ -- subquery using immutable function (can be pushed down)
+ PREPARE st3(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c5) = 6) ORDER BY c1;
+ EXPLAIN (COSTS false) EXECUTE st3(10, 20);
+ QUERY PLAN
+ --------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Sort
+ Sort Key: t1.c1
+ -> Hash Join
+ Hash Cond: (t1.c3 = t2.c3)
+ -> Foreign Scan on ft1 t1
+ Remote SQL: SELECT c1, c2, c3, c4, c5 FROM "S 1"."T 1" WHERE (c1 < 20)
+ -> Hash
+ -> HashAggregate
+ -> Foreign Scan on ft2 t2
+ Remote SQL: SELECT NULL, NULL, c3, NULL, NULL FROM "S 1"."T 1" WHERE ((c1 > 10) AND (date_part('dow'::text, c5) = 6::double precision))
+ (10 rows)
+
+ EXECUTE st3(10, 20);
+ c1 | c2 | c3 | c4 | c5
+ ----+----+-------+------------------------------+--------------------------
+ 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970
+ (1 row)
+
+ EXECUTE st3(20, 30);
+ c1 | c2 | c3 | c4 | c5
+ ----+----+-------+------------------------------+--------------------------
+ 23 | 3 | 00023 | Sat Jan 24 00:00:00 1970 PST | Sat Jan 24 00:00:00 1970
+ (1 row)
+
+ -- cleanup
+ DEALLOCATE st1;
+ DEALLOCATE st2;
+ DEALLOCATE st3;
+ -- ===================================================================
+ -- connection management
+ -- ===================================================================
+ SELECT srvname, usename FROM pgsql_fdw_connections;
+ srvname | usename
+ -----------+----------
+ loopback2 | postgres
+ (1 row)
+
+ SELECT pgsql_fdw_disconnect(srvid, usesysid) FROM pgsql_fdw_get_connections();
+ pgsql_fdw_disconnect
+ ----------------------
+ OK
+ (1 row)
+
+ SELECT srvname, usename FROM pgsql_fdw_connections;
+ srvname | usename
+ ---------+---------
+ (0 rows)
+
+ -- ===================================================================
+ -- cleanup
+ -- ===================================================================
+ DROP EXTENSION pgsql_fdw CASCADE;
+ NOTICE: drop cascades to 6 other objects
+ DETAIL: drop cascades to server loopback1
+ drop cascades to user mapping for public
+ drop cascades to server loopback2
+ drop cascades to user mapping for public
+ drop cascades to foreign table ft1
+ drop cascades to foreign table ft2
diff --git a/contrib/pgsql_fdw/option.c b/contrib/pgsql_fdw/option.c
index ...610daad .
*** a/contrib/pgsql_fdw/option.c
--- b/contrib/pgsql_fdw/option.c
***************
*** 0 ****
--- 1,262 ----
+ /*-------------------------------------------------------------------------
+ *
+ * option.c
+ * FDW option handling
+ *
+ * Copyright (c) 2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * pgsql_fdw/option.c
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include "access/reloptions.h"
+ #include "catalog/pg_foreign_data_wrapper.h"
+ #include "catalog/pg_foreign_server.h"
+ #include "catalog/pg_foreign_table.h"
+ #include "catalog/pg_user_mapping.h"
+ #include "fmgr.h"
+ #include "foreign/foreign.h"
+ #include "lib/stringinfo.h"
+ #include "miscadmin.h"
+
+ #include "pgsql_fdw.h"
+
+ /*
+ * SQL functions
+ */
+ extern Datum pgsql_fdw_validator(PG_FUNCTION_ARGS);
+ PG_FUNCTION_INFO_V1(pgsql_fdw_validator);
+
+ /*
+ * Describes the valid options for objects that use this wrapper.
+ */
+ typedef struct PgsqlFdwOption
+ {
+ const char *optname;
+ Oid optcontext; /* Oid of catalog in which options may appear */
+ bool is_libpq_opt; /* true if it's used in libpq */
+ } PgsqlFdwOption;
+
+ /*
+ * Valid options for pgsql_fdw.
+ */
+ static PgsqlFdwOption valid_options[] = {
+
+ /*
+ * Options for libpq connection.
+ * Note: This list should be updated along with PQconninfoOptions in
+ * interfaces/libpq/fe-connect.c, so the order is kept as is.
+ *
+ * Some useless libpq connection options are not accepted by pgsql_fdw:
+ * authtype: no longer used
+ * client_encoding: set to local database encoding automatically
+ * tty: no longer used
+ * fallback_application_name: fixed to "pgsql_fdw"
+ * replication: pgsql_fdw is never a replication client
+ */
+ {"service", ForeignServerRelationId, true},
+ {"user", UserMappingRelationId, true},
+ {"password", UserMappingRelationId, true},
+ {"connect_timeout", ForeignServerRelationId, true},
+ {"dbname", ForeignServerRelationId, true},
+ {"host", ForeignServerRelationId, true},
+ {"hostaddr", ForeignServerRelationId, true},
+ {"port", ForeignServerRelationId, true},
+ {"options", ForeignServerRelationId, true},
+ {"application_name", ForeignServerRelationId, true},
+ {"keepalives", ForeignServerRelationId, true},
+ {"keepalives_idle", ForeignServerRelationId, true},
+ {"keepalives_interval", ForeignServerRelationId, true},
+ {"keepalives_count", ForeignServerRelationId, true},
+ #ifdef USE_SSL
+ {"requiressl", ForeignServerRelationId, true},
+ #endif
+ {"sslmode", ForeignServerRelationId, true},
+ {"sslcert", ForeignServerRelationId, true},
+ {"sslkey", ForeignServerRelationId, true},
+ {"sslrootcert", ForeignServerRelationId, true},
+ {"sslcrl", ForeignServerRelationId, true},
+ {"requirepeer", ForeignServerRelationId, true},
+ #if defined(KRB5) || defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+ {"krbsrvname", ForeignServerRelationId, true},
+ #endif
+ #if defined(ENABLE_GSS) && defined(ENABLE_SSPI)
+ {"gsslib", ForeignServerRelationId, true},
+ #endif
+
+ /*
+ * Options for translation of object names.
+ * Note: Per-column options are not supported in 9.1, so we can't translate
+ * column name.
+ */
+ {"nspname", ForeignTableRelationId, false},
+ {"relname", ForeignTableRelationId, false},
+
+ /*
+ * Options for cursor behavior.
+ * These options can be overridden by smaller objects.
+ */
+ {"min_cursor_rows", ForeignTableRelationId, false},
+ {"min_cursor_rows", ForeignServerRelationId, false},
+ {"fetch_count", ForeignTableRelationId, false},
+ {"fetch_count", ForeignServerRelationId, false},
+
+ /* Sentinel */
+ {NULL, InvalidOid, false}
+ };
+
+ /*
+ * Helper functions
+ */
+ static bool is_valid_option(const char *optname, Oid context);
+
+ /*
+ * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
+ * USER MAPPING or FOREIGN TABLE that uses pgsql_fdw.
+ *
+ * Raise an ERROR if the option or its value is considered invalid.
+ */
+ Datum
+ pgsql_fdw_validator(PG_FUNCTION_ARGS)
+ {
+ List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+ Oid catalog = PG_GETARG_OID(1);
+ ListCell *cell;
+
+ /*
+ * Check that only options supported by pgsql_fdw, and allowed for the
+ * current object type, are given.
+ */
+ foreach(cell, options_list)
+ {
+ DefElem *def = (DefElem *) lfirst(cell);
+
+ if (!is_valid_option(def->defname, catalog))
+ {
+ PgsqlFdwOption *opt;
+ StringInfoData buf;
+
+ /*
+ * Unknown option specified, complain about it. Provide a hint
+ * with list of valid options for the object.
+ */
+ initStringInfo(&buf);
+ for (opt = valid_options; opt->optname; opt++)
+ {
+ if (catalog == opt->optcontext)
+ appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
+ opt->optname);
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
+ errmsg("invalid option \"%s\"", def->defname),
+ errhint("Valid options in this context are: %s",
+ buf.data)));
+ }
+
+ /* min_cursor_rows must be zero or a positive integer. */
+ if (strcmp(def->defname, "min_cursor_rows") == 0)
+ {
+ long value;
+ char *p = NULL;
+
+ value = strtol(strVal(def->arg), &p, 10);
+ if (*p != '\0' || value < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_INVALID_ATTRIBUTE_VALUE),
+ errmsg("invalid value for %s: \"%s\"",
+ def->defname, strVal(def->arg))));
+ }
+
+ /* fetch_count must be a positive integer. */
+ if (strcmp(def->defname, "fetch_count") == 0)
+ {
+ long value;
+ char *p = NULL;
+
+ value = strtol(strVal(def->arg), &p, 10);
+ if (*p != '\0' || value < 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_INVALID_ATTRIBUTE_VALUE),
+ errmsg("invalid value for %s: \"%s\"",
+ def->defname, strVal(def->arg))));
+ }
+ }
+
+ /*
+ * We don't check other option-specific restrictions here; they will be
+ * validated at execution time.
+ */
+
+ PG_RETURN_VOID();
+ }
+
+ /*
+ * Check if the provided option is one of the valid options.
+ * context is the Oid of the catalog holding the object the option is for.
+ */
+ static bool
+ is_valid_option(const char *optname, Oid context)
+ {
+ PgsqlFdwOption *opt;
+
+ /*
+ * Search the option table for an entry whose name and context both match
+ * the given option.
+ */
+ for (opt = valid_options; opt->optname; opt++)
+ {
+ if (context == opt->optcontext && strcmp(opt->optname, optname) == 0)
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * Check if the provided option is one of the libpq connection options,
+ * regardless of the catalog context in which it may appear.
+ */
+ static bool
+ is_libpq_option(const char *optname)
+ {
+ PgsqlFdwOption *opt;
+
+ /*
+ * Search the option table for an entry with the given name that is marked
+ * as a libpq option.
+ */
+ for (opt = valid_options; opt->optname; opt++)
+ {
+ if (strcmp(opt->optname, optname) == 0 && opt->is_libpq_opt)
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * Fill the key-value arrays with only the libpq options found in a list that
+ * may contain any kind of option, and return the number of entries stored.
+ */
+ int
+ ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
+ {
+ ListCell *lc;
+ int i;
+
+ i = 0;
+ foreach(lc, defelems)
+ {
+ DefElem *d = (DefElem *) lfirst(lc);
+ if (is_libpq_option(d->defname))
+ {
+ keywords[i] = d->defname;
+ values[i] = strVal(d->arg);
+ i++;
+ }
+ }
+ return i;
+ }
diff --git a/contrib/pgsql_fdw/pgsql_fdw--1.0.sql b/contrib/pgsql_fdw/pgsql_fdw--1.0.sql
index ...87dfa29 .
*** a/contrib/pgsql_fdw/pgsql_fdw--1.0.sql
--- b/contrib/pgsql_fdw/pgsql_fdw--1.0.sql
***************
*** 0 ****
--- 1,36 ----
+ /* contrib/pgsql_fdw/pgsql_fdw--1.0.sql */
+
+ CREATE FUNCTION pgsql_fdw_handler()
+ RETURNS fdw_handler
+ AS 'MODULE_PATHNAME'
+ LANGUAGE C STRICT;
+
+ CREATE FUNCTION pgsql_fdw_validator(text[], oid)
+ RETURNS void
+ AS 'MODULE_PATHNAME'
+ LANGUAGE C STRICT;
+
+ CREATE FOREIGN DATA WRAPPER pgsql_fdw
+ HANDLER pgsql_fdw_handler
+ VALIDATOR pgsql_fdw_validator;
+
+ /* connection management functions and view */
+ CREATE FUNCTION pgsql_fdw_get_connections(out srvid oid, out usesysid oid)
+ RETURNS SETOF record
+ AS 'MODULE_PATHNAME'
+ LANGUAGE C STRICT;
+
+ CREATE FUNCTION pgsql_fdw_disconnect(oid, oid)
+ RETURNS text
+ AS 'MODULE_PATHNAME'
+ LANGUAGE C STRICT;
+
+ CREATE VIEW pgsql_fdw_connections AS
+ SELECT c.srvid srvid,
+ s.srvname srvname,
+ c.usesysid usesysid,
+ pg_get_userbyid(c.usesysid) usename
+ FROM pgsql_fdw_get_connections() c
+ JOIN pg_catalog.pg_foreign_server s ON (s.oid = c.srvid);
+ GRANT SELECT ON pgsql_fdw_connections TO public;
+
diff --git a/contrib/pgsql_fdw/pgsql_fdw.c b/contrib/pgsql_fdw/pgsql_fdw.c
index ...614916c .
*** a/contrib/pgsql_fdw/pgsql_fdw.c
--- b/contrib/pgsql_fdw/pgsql_fdw.c
***************
*** 0 ****
--- 1,882 ----
+ /*-------------------------------------------------------------------------
+ *
+ * pgsql_fdw.c
+ * foreign-data wrapper for remote PostgreSQL servers.
+ *
+ * Copyright (c) 2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * pgsql_fdw/pgsql_fdw.c
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+ #include "fmgr.h"
+
+ #include "catalog/pg_foreign_server.h"
+ #include "catalog/pg_foreign_table.h"
+ #include "commands/explain.h"
+ #include "foreign/fdwapi.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "optimizer/cost.h"
+ #include "utils/lsyscache.h"
+ #include "utils/memutils.h"
+ #include "utils/rel.h"
+
+ #include "pgsql_fdw.h"
+ #include "connection.h"
+
+ PG_MODULE_MAGIC;
+
+ /*
+ * Default fetch count for cursor mode. This can be overridden by the
+ * fetch_count FDW option.
+ */
+ #define DEFAULT_FETCH_COUNT 1000
+
+ /*
+ * Default minimum estimated row count for cursor mode. This can be overridden
+ * by the min_cursor_rows FDW option.
+ */
+ #define DEFAULT_MIN_CURSOR_ROWS 1000
+
+ /*
+ * Cost to establish a connection.
+ * XXX: should be configurable per server?
+ */
+ #define CONNECTION_COSTS 100.0
+
+ /*
+ * Cost to transfer 1 byte from remote server.
+ * XXX: should be configurable per server?
+ */
+ #define TRANSFER_COSTS_PER_BYTE 0.001
+
+ /*
+ * Cursors which are used together in a local query require different names,
+ * so we use a simple incrementing name for that purpose. We don't care about
+ * wraparound of cursor_id because it's hard to imagine that 2^32 cursors are
+ * used in a query.
+ */
+ #define CURSOR_NAME_FORMAT "pgsql_fdw_cursor_%u"
+ static uint32 cursor_id = 0;
+
+ /*
+ * Index of FDW-private items stored in FdwPlan.
+ */
+ enum FdwPrivateIndex {
+ FdwPrivateSelectSql,
+
+ /* Items for cursor mode */
+ FdwPrivateDeclareSql,
+ FdwPrivateFetchSql,
+ FdwPrivateResetSql,
+ FdwPrivateCloseSql,
+
+ /* # of elements stored in the list fdw_private */
+ FdwPrivateNum,
+ };
+
+ /*
+ * This macro can be used in the executor to determine whether the scan is
+ * using CURSOR or not.
+ */
+ #define USE_CURSOR(fdwplan) \
+ (list_length((fdwplan)->fdw_private) > FdwPrivateDeclareSql)
+
+ /*
+ * Describes an execution state of a foreign scan against a foreign table
+ * using pgsql_fdw.
+ */
+ typedef struct PgsqlFdwExecutionState
+ {
+ FdwPlan *fdwplan; /* FDW-specific planning information */
+ PGconn *conn; /* connection for the scan */
+
+ Oid *param_types; /* type array of external parameter */
+ const char **param_values; /* value array of external parameter */
+
+ int attnum; /* # of non-dropped attribute */
+ char **col_values; /* column value buffer */
+ AttInMetadata *attinmeta; /* attribute metadata */
+
+ Tuplestorestate *tuples; /* result of the scan, partial in cursor mode */
+ bool cursor_opened; /* true if cursor is opened */
+ } PgsqlFdwExecutionState;
+
+ /*
+ * SQL functions
+ */
+ extern Datum pgsql_fdw_handler(PG_FUNCTION_ARGS);
+ PG_FUNCTION_INFO_V1(pgsql_fdw_handler);
+
+ /*
+ * FDW callback routines
+ */
+ static FdwPlan *pgsqlPlanForeignScan(Oid foreigntableid,
+ PlannerInfo *root,
+ RelOptInfo *baserel);
+ static void pgsqlExplainForeignScan(ForeignScanState *node, ExplainState *es);
+ static void pgsqlBeginForeignScan(ForeignScanState *node, int eflags);
+ static TupleTableSlot *pgsqlIterateForeignScan(ForeignScanState *node);
+ static void pgsqlReScanForeignScan(ForeignScanState *node);
+ static void pgsqlEndForeignScan(ForeignScanState *node);
+
+ /*
+ * Helper functions
+ */
+ static void estimate_costs(PlannerInfo *root,
+ RelOptInfo *baserel,
+ const char *sql,
+ Oid serverid,
+ Cost *startup_cost,
+ Cost *total_cost);
+ static void execute_query(ForeignScanState *node);
+ static PGresult *fetch_result(ForeignScanState *node);
+ static void store_result(ForeignScanState *node, PGresult *res);
+
+ /*
+ * Foreign-data wrapper handler function: return a struct with pointers
+ * to my callback routines.
+ */
+ Datum
+ pgsql_fdw_handler(PG_FUNCTION_ARGS)
+ {
+ FdwRoutine *fdwroutine = makeNode(FdwRoutine);
+
+ fdwroutine->PlanForeignScan = pgsqlPlanForeignScan;
+ fdwroutine->ExplainForeignScan = pgsqlExplainForeignScan;
+ fdwroutine->BeginForeignScan = pgsqlBeginForeignScan;
+ fdwroutine->IterateForeignScan = pgsqlIterateForeignScan;
+ fdwroutine->ReScanForeignScan = pgsqlReScanForeignScan;
+ fdwroutine->EndForeignScan = pgsqlEndForeignScan;
+
+ PG_RETURN_POINTER(fdwroutine);
+ }
+
+ /*
+ * pgsqlPlanForeignScan
+ * Create a FdwPlan for a scan on the foreign table
+ */
+ static FdwPlan *
+ pgsqlPlanForeignScan(Oid foreigntableid,
+ PlannerInfo *root,
+ RelOptInfo *baserel)
+ {
+ char *sql;
+ FdwPlan *fdwplan;
+ List *fdw_private = NIL;
+ const char *min_cursor_rows_str;
+ int min_cursor_rows = DEFAULT_MIN_CURSOR_ROWS;
+ ForeignTable *table;
+ ForeignServer *server;
+
+ /* Construct FdwPlan with cost estimates */
+ fdwplan = makeNode(FdwPlan);
+ sql = deparseSql(foreigntableid, root, baserel);
+ table = GetForeignTable(foreigntableid);
+ server = GetForeignServer(table->serverid);
+ estimate_costs(root, baserel, sql, server->serverid,
+ &fdwplan->startup_cost, &fdwplan->total_cost);
+
+ /*
+ * Store plain SELECT statement in private area of FdwPlan. This will be
+ * used for executing remote query and explaining scan.
+ */
+ fdw_private = list_make1(makeString(sql));
+
+ /*
+ * We use SQL-level cursor when the result seems to be large, to limit
+ * memory usage for the result set. Exceptionally we use simple SELECT if
+ * min_cursor_rows is set to 0.
+ */
+ min_cursor_rows_str = GetFdwOptionValue(foreigntableid,
+ InvalidAttrNumber,
+ "min_cursor_rows");
+ if (min_cursor_rows_str != NULL)
+ min_cursor_rows = strtol(min_cursor_rows_str, NULL, 10);
+
+ if (min_cursor_rows != 0 && baserel->rows >= min_cursor_rows)
+ {
+ char name[128]; /* must be larger than format + 10 */
+ StringInfoData cursor;
+ const char *fetch_count_str;
+ int fetch_count = DEFAULT_FETCH_COUNT;
+
+ /* Use specified fetch_count instead of default value, if any. */
+ fetch_count_str = GetFdwOptionValue(foreigntableid,
+ InvalidAttrNumber,
+ "fetch_count");
+ if (fetch_count_str != NULL)
+ fetch_count = strtol(fetch_count_str, NULL, 10);
+ elog(DEBUG1,
+ "relid=%u fetch_count=%d",
+ foreigntableid,
+ fetch_count);
+
+ /* We store some more information in FdwPlan to pass them beyond the
+ * boundary between planner and executor. Finally FdwPlan using cursor
+ * would hold items below:
+ *
+ * 1) plain SELECT statement (already added above)
+ * 2) SQL statement used to declare cursor
+ * 3) SQL statement used to fetch rows from cursor
+ * 4) SQL statement used to reset cursor
+ * 5) SQL statement used to close cursor
+ *
+ * These items are indexed with the enum FdwPrivateIndex, so an item
+ * can be accessed directly via list_nth(). For example of FETCH
+ * statement:
+ * list_nth(fdw_private, FdwPrivateFetchSql)
+ */
+
+ /* Construct cursor name from sequential value */
+ sprintf(name, CURSOR_NAME_FORMAT, cursor_id++);
+
+ /* Construct statement to declare cursor */
+ initStringInfo(&cursor);
+ appendStringInfo(&cursor, "DECLARE %s SCROLL CURSOR FOR %s", name, sql);
+ fdw_private = lappend(fdw_private, makeString(cursor.data));
+
+ /* Construct statement to fetch rows from cursor */
+ initStringInfo(&cursor);
+ appendStringInfo(&cursor, "FETCH %d FROM %s", fetch_count, name);
+ fdw_private = lappend(fdw_private, makeString(cursor.data));
+
+ /* Construct statement to reset cursor */
+ initStringInfo(&cursor);
+ appendStringInfo(&cursor, "MOVE ABSOLUTE 0 FROM %s", name);
+ fdw_private = lappend(fdw_private, makeString(cursor.data));
+
+ /* Construct statement to close cursor */
+ initStringInfo(&cursor);
+ appendStringInfo(&cursor, "CLOSE %s", name);
+ fdw_private = lappend(fdw_private, makeString(cursor.data));
+ }
+
+ /* Store FDW private information into FdwPlan */
+ fdwplan->fdw_private = fdw_private;
+
+ return fdwplan;
+ }
+
+ /*
+ * pgsqlExplainForeignScan
+ * Produce extra output for EXPLAIN
+ */
+ static void
+ pgsqlExplainForeignScan(ForeignScanState *node, ExplainState *es)
+ {
+ FdwPlan *fdwplan;
+ char *sql;
+
+ fdwplan = ((ForeignScan *) node->ss.ps.plan)->fdwplan;
+ if (USE_CURSOR(fdwplan))
+ sql = strVal(list_nth(fdwplan->fdw_private, FdwPrivateDeclareSql));
+ else
+ sql = strVal(list_nth(fdwplan->fdw_private, FdwPrivateSelectSql));
+ ExplainPropertyText("Remote SQL", sql, es);
+ }
+
+ /*
+ * pgsqlBeginForeignScan
+ * Initiate access to a foreign PostgreSQL table.
+ */
+ static void
+ pgsqlBeginForeignScan(ForeignScanState *node, int eflags)
+ {
+ PgsqlFdwExecutionState *festate;
+ PGconn *conn;
+ Oid relid;
+ ForeignTable *table;
+ ForeignServer *server;
+ UserMapping *user;
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+
+ /*
+ * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
+ */
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ return;
+
+ /*
+ * Save state in node->fdw_state.
+ */
+ festate = (PgsqlFdwExecutionState *) palloc(sizeof(PgsqlFdwExecutionState));
+ festate->fdwplan = ((ForeignScan *) node->ss.ps.plan)->fdwplan;
+
+ /*
+ * Get connection to the foreign server. Connection manager would
+ * establish new connection if necessary.
+ */
+ relid = RelationGetRelid(node->ss.ss_currentRelation);
+ table = GetForeignTable(relid);
+ server = GetForeignServer(table->serverid);
+ user = GetUserMapping(GetOuterUserId(), server->serverid);
+ conn = GetConnection(server, user);
+ festate->conn = conn;
+
+ /* Result will be filled in first Iterate call. */
+ festate->tuples = NULL;
+ festate->cursor_opened = false;
+
+ /* Allocate buffers for column values. */
+ {
+ TupleDesc tupdesc = slot->tts_tupleDescriptor;
+ festate->col_values = palloc(sizeof(char *) * tupdesc->natts);
+ festate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ }
+
+ /* Allocate buffers for query parameters. */
+ {
+ ParamListInfo params = node->ss.ps.state->es_param_list_info;
+ int numParams = params ? params->numParams : 0;
+
+ if (numParams > 0)
+ {
+ festate->param_types = palloc0(sizeof(Oid) * numParams);
+ festate->param_values = palloc0(sizeof(char *) * numParams);
+ }
+ else
+ {
+ festate->param_types = NULL;
+ festate->param_values = NULL;
+ }
+ }
+
+
+ /* Store FDW-specific state into ForeignScanState */
+ node->fdw_state = (void *) festate;
+
+ return;
+ }
+
+ /*
+ * pgsqlIterateForeignScan
+ * Retrieve next row from the result set, or clear tuple slot to indicate
+ * EOF.
+ *
+ * Note that we switch to the per-query context when retrieving tuples from
+ * the tuplestore, so that a returned tuple survives until the next
+ * iteration, where it is released implicitly via ExecClearTuple. If the
+ * tuple were retrieved in CurrentMemoryContext (a per-tuple context),
+ * ExecClearTuple would try to free a dangling pointer.
+ */
+ static TupleTableSlot *
+ pgsqlIterateForeignScan(ForeignScanState *node)
+ {
+ PgsqlFdwExecutionState *festate;
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ PGresult *res;
+ MemoryContext oldcontext = CurrentMemoryContext;
+
+ festate = (PgsqlFdwExecutionState *) node->fdw_state;
+
+
+ /*
+ * If this is the first call after Begin, we need to execute remote query.
+ * If the query needs cursor, we declare a cursor at first call and fetch
+ * from it in later calls.
+ */
+ if (festate->tuples == NULL && !festate->cursor_opened)
+ execute_query(node);
+
+ /*
+ * If enough tuples are left in tuplestore, just return next tuple from it.
+ */
+ MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+ if (tuplestore_gettupleslot(festate->tuples, true, false, slot))
+ {
+ MemoryContextSwitchTo(oldcontext);
+ return slot;
+ }
+ MemoryContextSwitchTo(oldcontext);
+
+ /* If the scan doesn't use cursor, the scan has been done. */
+ if (!USE_CURSOR(festate->fdwplan))
+ {
+ ExecClearTuple(slot);
+ return slot;
+ }
+
+ /*
+ * Here we need to clear partial result and fetch next bunch of tuples from
+ * from the cursor for the scan. If the fetch returns no tuple, the scan
+ * has reached the end.
+ */
+ res = fetch_result(node);
+ PG_TRY();
+ {
+ store_result(node, res);
+ PQclear(res);
+ res = NULL;
+ }
+ PG_CATCH();
+ {
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ /*
+ * If we got more tuples from the server cursor, return next tuple from
+ * tuplestore.
+ */
+ MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+ if (tuplestore_gettupleslot(festate->tuples, true, false, slot))
+ {
+ MemoryContextSwitchTo(oldcontext);
+ return slot;
+ }
+ MemoryContextSwitchTo(oldcontext);
+
+ /* We don't have any result even in remote server cursor. */
+ ExecClearTuple(slot);
+ return slot;
+ }
+
+ /*
+  * pgsqlReScanForeignScan
+  *		Restart this scan by resetting fetch location.
+  *
+  * Any buffered result rows are discarded.  If this scan has already opened
+  * a remote cursor, the FdwPrivateResetSql statement stored in fdw_private
+  * is executed to rewind it.  When no cursor has been opened yet (the rescan
+  * arrived before the first fetch) there is nothing to rewind on the remote
+  * side, so no statement is sent.
+  *
+  * Raises an error if the reset statement fails on the remote server.
+  */
+ static void
+ pgsqlReScanForeignScan(ForeignScanState *node)
+ {
+ 	List	   *fdw_private;
+ 	char	   *sql;
+ 	PGconn	   *conn;
+ 	PGresult   *res;
+ 	PgsqlFdwExecutionState *festate;
+
+ 	festate = (PgsqlFdwExecutionState *) node->fdw_state;
+
+ 	/* Discard fetch results if any. */
+ 	if (festate->tuples != NULL)
+ 	{
+ 		tuplestore_end(festate->tuples);
+ 		festate->tuples = NULL;
+ 	}
+
+ 	/*
+ 	 * Reset cursor, but only if it has really been declared; this mirrors
+ 	 * the check made in pgsqlEndForeignScan before closing the cursor.
+ 	 */
+ 	if (USE_CURSOR(festate->fdwplan) && festate->cursor_opened)
+ 	{
+ 		fdw_private = festate->fdwplan->fdw_private;
+ 		conn = festate->conn;
+ 		sql = strVal(list_nth(fdw_private, FdwPrivateResetSql));
+ 		res = PQexec(conn, sql);
+ 		PG_TRY();
+ 		{
+ 			if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			{
+ 				ereport(ERROR,
+ 						(errmsg("could not rewind cursor"),
+ 						 errdetail("%s", PQerrorMessage(conn)),
+ 						 errhint("%s", sql)));
+ 			}
+ 			PQclear(res);
+ 			res = NULL;
+ 		}
+ 		PG_CATCH();
+ 		{
+ 			/* release the libpq result before propagating the error */
+ 			PQclear(res);
+ 			PG_RE_THROW();
+ 		}
+ 		PG_END_TRY();
+ 	}
+ }
+
+ /*
+  * pgsqlEndForeignScan
+  *		Finish scanning foreign table and dispose objects used for this scan
+  *
+  * Frees the buffered result rows, closes the remote cursor if this scan
+  * opened one, and hands the connection back via ReleaseConnection.  An
+  * error is raised if the close statement fails on the remote server.
+  */
+ static void
+ pgsqlEndForeignScan(ForeignScanState *node)
+ {
+ 	PgsqlFdwExecutionState *state = (PgsqlFdwExecutionState *) node->fdw_state;
+
+ 	/* No execution state means we are in EXPLAIN; nothing to clean up */
+ 	if (state == NULL)
+ 		return;
+
+ 	/* Drop whatever rows are still buffered locally */
+ 	if (state->tuples != NULL)
+ 	{
+ 		tuplestore_end(state->tuples);
+ 		state->tuples = NULL;
+ 	}
+
+ 	/* Close the remote cursor, provided one was actually opened */
+ 	if (USE_CURSOR(state->fdwplan) && state->cursor_opened)
+ 	{
+ 		PGconn	   *remote = state->conn;
+ 		char	   *close_sql = strVal(list_nth(state->fdwplan->fdw_private,
+ 											  FdwPrivateCloseSql));
+ 		PGresult   *result = PQexec(remote, close_sql);
+
+ 		PG_TRY();
+ 		{
+ 			if (result == NULL ||
+ 				PQresultStatus(result) != PGRES_COMMAND_OK)
+ 				ereport(ERROR,
+ 						(errmsg("could not close cursor"),
+ 						 errdetail("%s", PQerrorMessage(remote)),
+ 						 errhint("%s", close_sql)));
+
+ 			PQclear(result);
+ 			result = NULL;
+ 		}
+ 		PG_CATCH();
+ 		{
+ 			/* release the libpq result before propagating the error */
+ 			PQclear(result);
+ 			PG_RE_THROW();
+ 		}
+ 		PG_END_TRY();
+ 	}
+
+ 	ReleaseConnection(state->conn);
+ }
+
+ /*
+  * Estimate costs of scanning a foreign table.
+  *
+  * Runs "EXPLAIN (FORMAT YAML)" for the remote query on the foreign server
+  * and picks the startup cost, total cost, row count and row width out of
+  * the returned plan text.
+  *
+  *	root, baserel:	planner information; baserel->rows and baserel->width
+  *					are overwritten with the remote estimates when present
+  *	sql:			remote query to be explained
+  *	serverid:		OID of the foreign server to ask
+  *	startup_cost, total_cost:
+  *					output; remote costs plus connection overhead, and for
+  *					the total cost also the cost of transferring the rows
+  *
+  * Raises an error if the remote EXPLAIN fails or a value found in the plan
+  * cannot be parsed.
+  */
+ static void
+ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
+ 			   const char *sql, Oid serverid,
+ 			   Cost *startup_cost, Cost *total_cost)
+ {
+ 	ForeignServer *server;
+ 	UserMapping *user;
+ 	PGconn	   *conn = NULL;
+ 	/* assigned inside PG_TRY and read in PG_CATCH, so it must be volatile */
+ 	PGresult   *volatile res = NULL;
+ 	StringInfoData buf;
+ 	char	   *plan;
+ 	char	   *p;
+ 	char	   *endp;
+
+ 	/*
+ 	 * Get connection to the foreign server.  Connection manager would
+ 	 * establish new connection if necessary.
+ 	 */
+ 	server = GetForeignServer(serverid);
+ 	user = GetUserMapping(GetOuterUserId(), server->serverid);
+ 	conn = GetConnection(server, user);
+ 	initStringInfo(&buf);
+ 	appendStringInfo(&buf, "EXPLAIN (FORMAT YAML) %s", sql);
+
+ 	/*
+ 	 * Remove WHERE clause if the query uses parameter: the statement is
+ 	 * simply truncated at the first "WHERE" when it contains a '$'.
+ 	 * NOTE(review): presumably because parameter values are not available
+ 	 * to a plain remote EXPLAIN -- confirm.
+ 	 */
+ 	p = strchr(buf.data, '$');
+ 	if (p != NULL)
+ 	{
+ 		p = strstr(buf.data, "WHERE");
+ 		if (p != NULL)
+ 			*p = '\0';
+ 	}
+
+ 	PG_TRY();
+ 	{
+ 		res = PQexec(conn, buf.data);
+ 		if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		{
+ 			char	   *msg;
+
+ 			/* copy the message before the result is released */
+ 			msg = pstrdup(PQerrorMessage(conn));
+ 			PQclear(res);
+ 			res = NULL;
+ 			ereport(ERROR,
+ 					(errmsg("could not execute EXPLAIN for cost estimation"),
+ 					 errdetail("%s", msg),
+ 					 errhint("%s", sql)));
+ 		}
+ 		plan = pstrdup(PQgetvalue(res, 0, 0));
+ 		PQclear(res);
+ 		res = NULL;
+ 		ReleaseConnection(conn);
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+
+ 	/* extract startup cost from remote plan */
+ 	p = strstr(plan, "Startup Cost: ");
+ 	if (p != NULL)
+ 	{
+ 		p += strlen("Startup Cost: ");
+ 		*startup_cost = strtod(p, &endp);
+ 		if (*endp != '\n' && *endp != '\0')
+ 			elog(ERROR, "invalid plan format for startup cost");
+ 	}
+
+ 	/* extract total cost from remote plan */
+ 	p = strstr(plan, "Total Cost: ");
+ 	if (p != NULL)
+ 	{
+ 		p += strlen("Total Cost: ");
+ 		*total_cost = strtod(p, &endp);
+ 		if (*endp != '\n' && *endp != '\0')
+ 			elog(ERROR, "invalid plan format for total cost");
+ 	}
+
+ 	/* extract # of rows from remote plan */
+ 	p = strstr(plan, "Plan Rows: ");
+ 	if (p != NULL)
+ 	{
+ 		p += strlen("Plan Rows: ");
+ 		baserel->rows = strtod(p, &endp);
+ 		if (*endp != '\n' && *endp != '\0')
+ 			elog(ERROR, "invalid plan format for plan rows");
+ 	}
+
+ 	/* extract average width from remote plan */
+ 	p = strstr(plan, "Plan Width: ");
+ 	if (p != NULL)
+ 	{
+ 		p += strlen("Plan Width: ");
+ 		baserel->width = strtol(p, &endp, 10);
+ 		if (*endp != '\n' && *endp != '\0')
+ 			elog(ERROR, "invalid plan format for plan width");
+ 	}
+
+ 	/* TODO Selectivity of quals pushed down should be considered. */
+
+ 	/* add cost to establish connection. */
+ 	*startup_cost += CONNECTION_COSTS;
+ 	*total_cost += CONNECTION_COSTS;
+
+ 	/*
+ 	 * Add cost to transfer result.  This is based on the number of rows the
+ 	 * remote plan expects to return (baserel->rows, set just above), not on
+ 	 * baserel->tuples, which this function never fills in.
+ 	 */
+ 	*total_cost += TRANSFER_COSTS_PER_BYTE * baserel->width * baserel->rows;
+ 	*total_cost += cpu_tuple_cost * baserel->rows;
+ }
+
+ /*
+  * Execute remote query with current parameters.
+  *
+  * The query parameters found in the executor state are converted to text
+  * with their type output functions and sent along with either the DECLARE
+  * statement (cursor mode) or the plain SELECT statement taken from
+  * fdw_private.  In cursor mode the first bunch of rows is fetched right
+  * away.  The rows obtained are stored into the scan's tuplestore by
+  * store_result.
+  *
+  * Raises an error if the remote statement fails.
+  */
+ static void
+ execute_query(ForeignScanState *node)
+ {
+ 	FdwPlan    *fdwplan;
+ 	PgsqlFdwExecutionState *festate;
+ 	ParamListInfo params = node->ss.ps.state->es_param_list_info;
+ 	int			numParams = params ? params->numParams : 0;
+ 	Oid		   *types = NULL;
+ 	const char **values = NULL;
+ 	char	   *sql;
+ 	int			required_result;
+ 	PGconn	   *conn;
+ 	/* modified inside PG_TRY and read in PG_CATCH, so it must be volatile */
+ 	PGresult   *volatile res;
+
+ 	festate = (PgsqlFdwExecutionState *) node->fdw_state;
+ 	types = festate->param_types;
+ 	values = festate->param_values;
+
+ 	/*
+ 	 * Construct parameter array in text format.  We don't release memory for
+ 	 * the arrays explicitly, because the memory usage would not be very large,
+ 	 * and anyway they will be released in context cleanup.
+ 	 */
+ 	if (numParams > 0)
+ 	{
+ 		int			i;
+
+ 		for (i = 0; i < numParams; i++)
+ 		{
+ 			types[i] = params->params[i].ptype;
+ 			if (params->params[i].isnull)
+ 				values[i] = NULL;
+ 			else
+ 			{
+ 				Oid			out_func_oid;
+ 				bool		isvarlena;
+ 				FmgrInfo	func;
+
+ 				getTypeOutputInfo(types[i], &out_func_oid, &isvarlena);
+ 				fmgr_info(out_func_oid, &func);
+ 				values[i] = OutputFunctionCall(&func, params->params[i].value);
+ 			}
+ 		}
+ 	}
+
+ 	/*
+ 	 * Execute remote query with parameters.
+ 	 */
+ 	conn = festate->conn;
+ 	fdwplan = ((ForeignScan *) node->ss.ps.plan)->fdwplan;
+ 	if (USE_CURSOR(fdwplan))
+ 	{
+ 		sql = strVal(list_nth(fdwplan->fdw_private, FdwPrivateDeclareSql));
+ 		required_result = PGRES_COMMAND_OK;
+ 	}
+ 	else
+ 	{
+ 		sql = strVal(list_nth(fdwplan->fdw_private, FdwPrivateSelectSql));
+ 		required_result = PGRES_TUPLES_OK;
+ 	}
+ 	res = PQexecParams(conn, sql, numParams, types, values, NULL, NULL, 0);
+ 	PG_TRY();
+ 	{
+ 		/*
+ 		 * If the query has failed, reporting details is enough here.
+ 		 * Connection(s) which are used by this query (at least used by
+ 		 * pgsql_fdw) will be cleaned up by the foreign connection manager.
+ 		 */
+ 		if (res == NULL || PQresultStatus(res) != required_result)
+ 		{
+ 			ereport(ERROR,
+ 					(errmsg("could not execute foreign query"),
+ 					 errdetail("%s", PQerrorMessage(conn)),
+ 					 errhint("%s", sql)));
+ 		}
+
+ 		/* Fetch first bunch of result if we are using cursor. */
+ 		if (USE_CURSOR(fdwplan))
+ 		{
+ 			/* Mark that this scan has opened a cursor. */
+ 			festate->cursor_opened = true;
+
+ 			/*
+ 			 * Discard result of DECLARE statement and fetch first bunch.
+ 			 * The pointer is reset first: fetch_result reports failure by
+ 			 * raising an error, and PG_CATCH below must not clear the
+ 			 * already-released result a second time.
+ 			 */
+ 			PQclear(res);
+ 			res = NULL;
+ 			res = fetch_result(node);
+ 		}
+
+ 		/*
+ 		 * Store the result of the query into tuplestore.
+ 		 * We must release PGresult here to avoid memory leak.
+ 		 */
+ 		store_result(node, res);
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ }
+
+ /*
+  * Fetch next partial result from remote server.
+  *
+  * Executes the FdwPrivateFetchSql statement stored in fdw_private on the
+  * scan's connection and returns its result.  The caller owns the returned
+  * PGresult and must release it with PQclear.
+  *
+  * Raises an error if the statement does not return PGRES_TUPLES_OK; the
+  * failed result is released before the error is raised.
+  */
+ static PGresult *
+ fetch_result(ForeignScanState *node)
+ {
+ 	PgsqlFdwExecutionState *festate;
+ 	List	   *fdw_private;
+ 	char	   *sql;
+ 	PGconn	   *conn;
+ 	PGresult   *res;
+
+ 	festate = (PgsqlFdwExecutionState *) node->fdw_state;
+
+ 	/* retrieve information for fetching result. */
+ 	fdw_private = festate->fdwplan->fdw_private;
+ 	sql = strVal(list_nth(fdw_private, FdwPrivateFetchSql));
+ 	conn = festate->conn;
+ 	res = PQexec(conn, sql);
+ 	if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
+ 	{
+ 		char	   *msg;
+
+ 		/*
+ 		 * ereport(ERROR) does not return, so the result has to be released
+ 		 * here or it would leak.  Copy the error message beforehand.
+ 		 */
+ 		msg = pstrdup(PQerrorMessage(conn));
+ 		PQclear(res);
+ 		ereport(ERROR,
+ 				(errmsg("could not fetch rows from foreign server"),
+ 				 errdetail("%s", msg),
+ 				 errhint("%s", sql)));
+ 	}
+
+ 	return res;
+ }
+
+ /*
+  * Create tuples from PGresult and store them into tuplestore.
+  *
+  * The scan's tuplestore is created on first use and emptied otherwise, so
+  * after this call it holds exactly the rows of "res".  Each row is built
+  * from the text values in the result, with dropped columns of the foreign
+  * table passed as NULL.  The PGresult is not released here; that is left
+  * to the caller.
+  *
+  * Raises an error if the number of result columns does not match the number
+  * of non-dropped columns of the scan tuple descriptor.
+  */
+ static void
+ store_result(ForeignScanState *node, PGresult *res)
+ {
+ 	int			rows;
+ 	int			row;
+ 	int			i;
+ 	int			nfields;
+ 	int			attnum;		/* number of non-dropped columns */
+ 	Form_pg_attribute *attrs;
+ 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ 	TupleDesc	tupdesc = slot->tts_tupleDescriptor;
+ 	PgsqlFdwExecutionState *festate;
+
+ 	festate = (PgsqlFdwExecutionState *) node->fdw_state;
+ 	rows = PQntuples(res);
+ 	nfields = PQnfields(res);
+ 	attrs = tupdesc->attrs;
+
+ 	/* First, ensure that we have an empty tuplestore. */
+ 	if (festate->tuples == NULL)
+ 	{
+ 		MemoryContext oldcontext = CurrentMemoryContext;
+
+ 		/*
+ 		 * Create tuplestore to store result of the query in per-query
+ 		 * context, the same context pgsqlIterateForeignScan reads it in.
+ 		 * It has to live as long as the scan does, and gets cleaned up with
+ 		 * the query even in error cases.
+ 		 */
+ 		MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+ 		festate->tuples = tuplestore_begin_heap(false, false, work_mem);
+ 		MemoryContextSwitchTo(oldcontext);
+ 	}
+ 	else
+ 	{
+ 		/* We already have tuplestore, just need to clear contents of it. */
+ 		tuplestore_clear(festate->tuples);
+ 	}
+
+ 	/* count non-dropped columns */
+ 	for (attnum = 0, i = 0; i < tupdesc->natts; i++)
+ 		if (!attrs[i]->attisdropped)
+ 			attnum++;
+
+ 	/* check result and tuple descriptor have the same number of columns */
+ 	if (attnum > 0 && attnum != nfields)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_DATATYPE_MISMATCH),
+ 				 errmsg("remote query result rowtype does not match "
+ 						"the specified FROM clause rowtype"),
+ 				 errdetail("expected %d, actual %d", attnum, nfields)));
+
+ 	/* put the tuples into the tuplestore */
+ 	for (row = 0; row < rows; row++)
+ 	{
+ 		int			j;
+ 		HeapTuple	tuple;
+
+ 		/* i walks the local columns, j the columns of the remote result */
+ 		for (i = 0, j = 0; i < tupdesc->natts; i++)
+ 		{
+ 			/* skip dropped columns. */
+ 			if (attrs[i]->attisdropped)
+ 			{
+ 				festate->col_values[i] = NULL;
+ 				continue;
+ 			}
+
+ 			if (PQgetisnull(res, row, j))
+ 				festate->col_values[i] = NULL;
+ 			else
+ 				festate->col_values[i] = PQgetvalue(res, row, j);
+ 			j++;
+ 		}
+
+ 		/*
+ 		 * Build the tuple and put it into the tuplestore.
+ 		 * The tuple is built in the caller's memory context and is not
+ 		 * freed explicitly here.
+ 		 */
+ 		tuple = BuildTupleFromCStrings(festate->attinmeta, festate->col_values);
+ 		tuplestore_puttuple(festate->tuples, tuple);
+ 	}
+
+ 	tuplestore_donestoring(festate->tuples);
+ }
diff --git a/contrib/pgsql_fdw/pgsql_fdw.control b/contrib/pgsql_fdw/pgsql_fdw.control
index ...0a9c8f4 .
*** a/contrib/pgsql_fdw/pgsql_fdw.control
--- b/contrib/pgsql_fdw/pgsql_fdw.control
***************
*** 0 ****
--- 1,5 ----
+ # pgsql_fdw extension
+ comment = 'foreign-data wrapper for remote PostgreSQL servers'
+ default_version = '1.0'
+ module_pathname = '$libdir/pgsql_fdw'
+ relocatable = true
diff --git a/contrib/pgsql_fdw/pgsql_fdw.h b/contrib/pgsql_fdw/pgsql_fdw.h
index ...d5e5d4b .
*** a/contrib/pgsql_fdw/pgsql_fdw.h
--- b/contrib/pgsql_fdw/pgsql_fdw.h
***************
*** 0 ****
--- 1,28 ----
+ /*-------------------------------------------------------------------------
+ *
+ * pgsql_fdw.h
+ * foreign-data wrapper for remote PostgreSQL servers.
+ *
+ * Copyright (c) 2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * pgsql_fdw/pgsql_fdw.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+ #ifndef PGSQL_FDW_H
+ #define PGSQL_FDW_H
+
+ #include "postgres.h"
+ #include "nodes/relation.h"
+
+ /*
+  * in option.c
+  *
+  * NOTE(review): presumably fills the parallel keywords/values arrays from
+  * the option list "defelems" and returns how many entries were stored --
+  * confirm against option.c.
+  */
+ int ExtractConnectionOptions(List *defelems,
+ const char **keywords,
+ const char **values);
+
+ /*
+  * in deparse.c
+  *
+  * NOTE(review): presumably builds the remote SQL statement text for the
+  * foreign table "relid" from the planner information -- confirm against
+  * deparse.c.
+  */
+ char *deparseSql(Oid relid, PlannerInfo *root, RelOptInfo *baserel);
+
+ #endif /* PGSQL_FDW_H */
diff --git a/contrib/pgsql_fdw/sql/pgsql_fdw.sql b/contrib/pgsql_fdw/sql/pgsql_fdw.sql
index ...b6b99ab .
*** a/contrib/pgsql_fdw/sql/pgsql_fdw.sql
--- b/contrib/pgsql_fdw/sql/pgsql_fdw.sql
***************
*** 0 ****
--- 1,175 ----
+ -- ===================================================================
+ -- create FDW objects
+ -- ===================================================================
+
+ CREATE EXTENSION pgsql_fdw;
+
+ CREATE SERVER loopback1 FOREIGN DATA WRAPPER pgsql_fdw;
+ CREATE SERVER loopback2 FOREIGN DATA WRAPPER pgsql_fdw
+ OPTIONS (dbname 'contrib_regression');
+
+ CREATE USER MAPPING FOR public SERVER loopback1
+ OPTIONS (user 'value', password 'value');
+ CREATE USER MAPPING FOR public SERVER loopback2;
+
+ CREATE FOREIGN TABLE ft1 (
+ c1 int NOT NULL,
+ c2 int NOT NULL,
+ c3 text,
+ c4 timestamptz,
+ c5 timestamp
+ ) SERVER loopback2;
+
+ CREATE FOREIGN TABLE ft2 (
+ c1 int NOT NULL,
+ c2 int NOT NULL,
+ c3 text,
+ c4 timestamptz,
+ c5 timestamp
+ ) SERVER loopback2;
+
+ -- ===================================================================
+ -- create objects used through FDW
+ -- ===================================================================
+ CREATE SCHEMA "S 1";
+ CREATE TABLE "S 1"."T 1" (
+ c1 int NOT NULL,
+ c2 int NOT NULL,
+ c3 text,
+ c4 timestamptz,
+ c5 timestamp,
+ CONSTRAINT t1_pkey PRIMARY KEY (c1)
+ );
+ CREATE TABLE "S 1"."T 2" (
+ c1 int NOT NULL,
+ c2 text,
+ CONSTRAINT t2_pkey PRIMARY KEY (c1)
+ );
+
+ BEGIN;
+ TRUNCATE "S 1"."T 1";
+ INSERT INTO "S 1"."T 1"
+ SELECT id,
+ id % 10,
+ to_char(id, 'FM00000'),
+ '1970-01-01'::timestamptz + ((id % 100) || ' days')::interval,
+ '1970-01-01'::timestamp + ((id % 100) || ' days')::interval
+ FROM generate_series(1, 1000) id;
+ TRUNCATE "S 1"."T 2";
+ INSERT INTO "S 1"."T 2"
+ SELECT id,
+ 'AAA' || to_char(id, 'FM000')
+ FROM generate_series(1, 100) id;
+ COMMIT;
+
+ -- ===================================================================
+ -- tests for pgsql_fdw_validator
+ -- ===================================================================
+ ALTER FOREIGN DATA WRAPPER pgsql_fdw OPTIONS (host 'value'); -- ERRROR
+ -- requiressl, krbsrvname and gsslib are omitted because they depend on
+ -- configure option
+ ALTER SERVER loopback1 OPTIONS (
+ --authtype 'value',
+ service 'value',
+ connect_timeout 'value',
+ dbname 'value',
+ host 'value',
+ hostaddr 'value',
+ port 'value',
+ --client_encoding 'value',
+ --tty 'value',
+ options 'value',
+ application_name 'value',
+ --fallback_application_name 'value',
+ keepalives 'value',
+ keepalives_idle 'value',
+ keepalives_interval 'value',
+ -- requiressl 'value',
+ sslmode 'value',
+ sslcert 'value',
+ sslkey 'value',
+ sslrootcert 'value',
+ sslcrl 'value'
+ --requirepeer 'value',
+ -- krbsrvname 'value',
+ -- gsslib 'value',
+ --replication 'value'
+ );
+ ALTER SERVER loopback1 OPTIONS (user 'value'); -- ERROR
+ ALTER SERVER loopback2 OPTIONS (ADD min_cursor_rows '0');
+ ALTER SERVER loopback2 OPTIONS (ADD fetch_count '2');
+ ALTER USER MAPPING FOR public SERVER loopback1
+ OPTIONS (DROP user, DROP password);
+ ALTER USER MAPPING FOR public SERVER loopback1
+ OPTIONS (host 'value'); -- ERROR
+ ALTER FOREIGN TABLE ft1 OPTIONS (nspname 'S 1', relname 'T 1');
+ ALTER FOREIGN TABLE ft2 OPTIONS (nspname 'S 1', relname 'T 1', min_cursor_rows '10', fetch_count '100');
+ ALTER FOREIGN TABLE ft1 OPTIONS (invalid 'value'); -- ERROR
+ ALTER FOREIGN TABLE ft1 OPTIONS (min_cursor_rows 'a'); -- ERROR
+ ALTER FOREIGN TABLE ft1 OPTIONS (min_cursor_rows '-1'); -- ERROR
+ ALTER FOREIGN TABLE ft1 OPTIONS (fetch_count 'a'); -- ERROR
+ ALTER FOREIGN TABLE ft1 OPTIONS (fetch_count '0'); -- ERROR
+ ALTER FOREIGN TABLE ft1 OPTIONS (fetch_count '-1'); -- ERROR
+ \dew+
+ \des+
+ \deu+
+ \det+
+
+ -- ===================================================================
+ -- simple queries
+ -- ===================================================================
+ -- single table, with/without alias
+ EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10;
+ SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10;
+ EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10;
+ SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10;
+ -- with WHERE clause
+ SELECT * FROM ft1 t1 WHERE t1.c1 = 101;
+ -- aggregate
+ SELECT COUNT(*) FROM ft1 t1;
+ -- join two tables
+ SELECT t1.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10;
+ -- subquery
+ SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1;
+ -- subquery+MAX
+ SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+ -- used in CTE
+ WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1;
+ -- fixed values
+ SELECT 'fixed', NULL FROM ft1 t1 WHERE c1 = 1;
+
+ -- ===================================================================
+ -- parameterized queries
+ -- ===================================================================
+ -- simple join
+ PREPARE st1(int, int) AS SELECT t1.c3, t2.c3 FROM ft1 t1, ft2 t2 WHERE t1.c1 = $1 AND t2.c1 = $2;
+ EXPLAIN (COSTS false) EXECUTE st1(1, 2);
+ EXECUTE st1(1, 1);
+ EXECUTE st1(101, 101);
+ -- subquery using stable function (can't be pushed down)
+ PREPARE st2(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c4) = 6) ORDER BY c1;
+ EXPLAIN (COSTS false) EXECUTE st2(10, 20);
+ EXECUTE st2(10, 20);
+ EXECUTE st1(101, 101);
+ -- subquery using immutable function (can be pushed down)
+ PREPARE st3(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c5) = 6) ORDER BY c1;
+ EXPLAIN (COSTS false) EXECUTE st3(10, 20);
+ EXECUTE st3(10, 20);
+ EXECUTE st3(20, 30);
+ -- cleanup
+ DEALLOCATE st1;
+ DEALLOCATE st2;
+ DEALLOCATE st3;
+
+ -- ===================================================================
+ -- connection management
+ -- ===================================================================
+ SELECT srvname, usename FROM pgsql_fdw_connections;
+ SELECT pgsql_fdw_disconnect(srvid, usesysid) FROM pgsql_fdw_get_connections();
+ SELECT srvname, usename FROM pgsql_fdw_connections;
+
+ -- ===================================================================
+ -- cleanup
+ -- ===================================================================
+ DROP EXTENSION pgsql_fdw CASCADE;
+
diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml
index adf09ca..65c7e81 100644
*** a/doc/src/sgml/contrib.sgml
--- b/doc/src/sgml/contrib.sgml
*************** CREATE EXTENSION <replaceable>module_nam
*** 117,122 ****
--- 117,123 ----
&pgcrypto;
&pgfreespacemap;
&pgrowlocks;
+ &pgsql-fdw;
&pgstandby;
&pgstatstatements;
&pgstattuple;
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index ed39e0b..d72590f 100644
*** a/doc/src/sgml/filelist.sgml
--- b/doc/src/sgml/filelist.sgml
***************
*** 123,128 ****
--- 123,129 ----
<!ENTITY pgcrypto SYSTEM "pgcrypto.sgml">
<!ENTITY pgfreespacemap SYSTEM "pgfreespacemap.sgml">
<!ENTITY pgrowlocks SYSTEM "pgrowlocks.sgml">
+ <!ENTITY pgsql-fdw SYSTEM "pgsql-fdw.sgml">
<!ENTITY pgstandby SYSTEM "pgstandby.sgml">
<!ENTITY pgstatstatements SYSTEM "pgstatstatements.sgml">
<!ENTITY pgstattuple SYSTEM "pgstattuple.sgml">
diff --git a/doc/src/sgml/pgsql-fdw.sgml b/doc/src/sgml/pgsql-fdw.sgml
index ...6b7cb94 .
*** a/doc/src/sgml/pgsql-fdw.sgml
--- b/doc/src/sgml/pgsql-fdw.sgml
***************
*** 0 ****
--- 1,273 ----
+ <!-- doc/src/sgml/pgsql-fdw.sgml -->
+
+ <sect1 id="pgsql-fdw" xreflabel="pgsql_fdw">
+ <title>pgsql_fdw</title>
+
+ <indexterm zone="pgsql-fdw">
+ <primary>pgsql_fdw</primary>
+ </indexterm>
+
+ <para>
+ The <filename>pgsql_fdw</filename> module provides a foreign-data wrapper for
+ external <productname>PostgreSQL</productname> servers.
+ With this module, users can access data stored in external
+ <productname>PostgreSQL</productname> via plain SQL statements.
+ </para>
+
+ <para>
+ <application>pgsql_fdw</application> can be installed only on
+ <productname>PostgreSQL</productname> 9.1 or later, but it can also access
+ data stored in <productname>PostgreSQL</productname> 9.0.
+ <productname>PostgreSQL</productname> 8.4 or older cannot be used as a remote
+ server because <application>pgsql_fdw</application> relies on the YAML format
+ of <command>EXPLAIN</command> output.
+ </para>
+
+ <para>
+ Note that default wrapper <literal>pgsql_fdw</literal> is created
+ automatically during <command>CREATE EXTENSION</command> command for
+ <application>pgsql_fdw</application>.
+ </para>
+
+ <sect2>
+ <title>FDW Options of pgsql_fdw</title>
+
+ <sect3>
+ <title>Connection Options</title>
+ <para>
+ A foreign server and user mapping created using this wrapper can have
+ <application>libpq</> connection options, except the following:
+
+ <itemizedlist>
+ <listitem><para>authtype</para></listitem>
+ <listitem><para>client_encoding</para></listitem>
+ <listitem><para>tty</para></listitem>
+ <listitem><para>fallback_application_name</para></listitem>
+ <listitem><para>replication</para></listitem>
+ </itemizedlist>
+
+ For details of <application>libpq</> connection options, see
+ <xref linkend="libpq-connect">.
+ </para>
+
+ <para>
+ <literal>user</literal> and <literal>password</literal> can be
+ specified on user mappings, and others can be specified on foreign servers.
+ </para>
+ </sect3>
+
+ <sect3>
+ <title>Object Name Options</title>
+ <para>
+ Foreign tables which were created using this wrapper, or its columns can
+ have object name options. These options can be used to specify the names
+ used in SQL statement sent to remote <productname>PostgreSQL</productname>
+ server. These options are useful when a remote object has different name
+ from corresponding local one.
+ </para>
+
+ <variablelist>
+
+ <varlistentry>
+ <term><literal>nspname</literal></term>
+ <listitem>
+ <para>
+ This option, which can be specified on a foreign table, is used as a
+ namespace (schema) reference in the SQL statement. If this option is
+ omitted, the name of the foreign table's own schema
+ (<literal>pg_namespace.nspname</literal>) is used.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>relname</literal></term>
+ <listitem>
+ <para>
+ This option, which can be specified on a foreign table, is used as a
+ relation (table) reference in the SQL statement. If this option is
+ omitted, <literal>pg_class.relname</literal> of the foreign table is
+ used.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>colname</literal></term>
+ <listitem>
+ <para>
+ This option, which can be specified on a column of a foreign table, is
+ used as a column (attribute) reference in the SQL statement. If this
+ option is omitted, <literal>pg_attribute.attname</literal> of the column
+ of the foreign table is used.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ </sect3>
+
+ <sect3>
+ <title>Cursor Options</title>
+ <para>
+ The <application>pgsql_fdw</application> switches the way used to retrieve
+ result of remote query according to estimated number of result rows.
+ If the estimation was less than a threshold, the result rows are retrieved
+ at once with simple <command>SELECT</command> statement.
+ In contrast, if the estimation is larger than the threshold, the result
+ rows are fetched separately with a cursor defined by
+ <command>DECLARE</command> statement.
+ </para>
+
+ <para>
+ Users can control this behavior with setting cursor options for a foreign
+ table.
+ </para>
+
+ <variablelist>
+
+ <varlistentry>
+ <term><literal>min_cursor_rows</literal></term>
+ <listitem>
+ <para>
+ This option specifies the minimum estimated number of result rows for which
+ a cursor is used to fetch the result. Setting this to 1 means that every
+ query uses a cursor, and setting it to 0 means that every query uses a simple
+ <command>SELECT</command>. This option can be set on a foreign server or a
+ foreign table, and accepts only a non-negative integer value.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>fetch_count</literal></term>
+ <listitem>
+ <para>
+ This option specifies the number of rows fetched at once in cursor mode.
+ This option accepts only integer value larger than zero.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ </sect3>
+
+ </sect2>
+
+ <sect2>
+ <title>Connection Management</title>
+
+ <para>
+ <application>pgsql_fdw</application> establishes a connection to a
+ foreign server at the beginning of the first query which uses a foreign
+ table associated with that server, and reuses the connection for subsequent
+ queries and for other foreign scans in the same query.
+
+ You can see the list of active connections via the
+ <structname>pgsql_fdw_connections</structname> view. For each active
+ connection established by <application>pgsql_fdw</application>, it shows the
+ OID and name of the server and of the local role. For security reasons, only
+ superusers can see other roles' connections.
+ </para>
+
+ <para>
+ Established connections are kept alive until local role changes or the
+ current transaction aborts or user requests so.
+ </para>
+
+ <para>
+ If the local role has been changed, active connections established as the old
+ local role are kept alive but are never reused until the local role is
+ restored to the original role. This situation happens with
+ <command>SET ROLE</command> and <command>SET SESSION AUTHORIZATION</command>.
+ </para>
+
+ <para>
+ If current transaction aborts by error or user request, all active
+ connections are disconnected automatically. This behavior avoids possible
+ connection leaks on error.
+ </para>
+
+ <para>
+ You can discard persistent connection at arbitrary timing with
+ <function>pgsql_fdw_disconnect()</function>. It takes server oid and
+ user oid as arguments. This function can handle only connections
+ established in current session; connections established by other backends
+ are not reachable.
+ </para>
+
+ <para>
+ You can discard all active and visible connections in current session with
+ using <structname>pgsql_fdw_connections</structname> and
+ <function>pgsql_fdw_disconnect()</function> together:
+ <synopsis>
+ postgres=# SELECT pgsql_fdw_disconnect(srvid, usesysid) FROM pgsql_fdw_connections;
+ pgsql_fdw_disconnect
+ ----------------------
+ OK
+ OK
+ (2 rows)
+ </synopsis>
+ </para>
+ </sect2>
+
+ <sect2>
+ <title>Transaction Management</title>
+ <para>
+ <application>pgsql_fdw</application> executes a <command>BEGIN</command>
+ command when a new connection has been established. This means that all remote
+ queries for a foreign server are executed in a transaction. Since the
+ default transaction isolation level is <literal>READ COMMITTED</literal>,
+ multiple foreign scans in a local query might produce inconsistent results.
+
+ To avoid this inconsistency, you can use <literal>SERIALIZABLE</literal>
+ level for remote transaction with setting
+ <literal>default_transaction_isolation</literal> for the user used for
+ <application>pgsql_fdw</application> connection on remote side.
+ </para>
+ </sect2>
+
+ <sect2>
+ <title>Estimation of Costs and Rows</title>
+ <para>
+ The <application>pgsql_fdw</application> estimates the costs of a foreign
+ scan by adding some basic costs: connection costs, remote query costs and
+ data transfer costs.
+ To get remote query costs <application>pgsql_fdw</application> executes
+ <command>EXPLAIN</command> command on remote server for each foreign scan.
+ </para>
+
+ <para>
+ On the other hand, the estimated number of rows returned by
+ <command>EXPLAIN</command> is used as the local row estimate as-is.
+ </para>
+ </sect2>
+
+ <sect2>
+ <title>EXPLAIN Output</title>
+ <para>
+ For a foreign table using <literal>pgsql_fdw</>, <command>EXPLAIN</> shows
+ a remote SQL statement which is sent to remote
+ <productname>PostgreSQL</productname> server for a ForeignScan plan node.
+ For example:
+ </para>
+ <synopsis>
+ postgres=# EXPLAIN SELECT aid FROM pgbench_accounts WHERE abalance &lt; 0;
+ QUERY PLAN
+ -----------------------------------------------------------------------------------------------
+ Foreign Scan on pgbench_accounts (cost=100.00..8769.00 rows=1 width=4)
+ Remote SQL: SELECT aid, NULL, NULL, NULL FROM public.pgbench_accounts WHERE (abalance &lt; 0)
+ (2 rows)
+ </synopsis>
+ </sect2>
+
+ <sect2>
+ <title>Author</title>
+ <para>
+ Shigeru Hanada <email>shigeru.hanada@gmail.com</email>
+ </para>
+ </sect2>
+
+ </sect1>
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 75923a6..6fda053 100644
*** a/src/backend/utils/adt/ruleutils.c
--- b/src/backend/utils/adt/ruleutils.c
*************** deparse_context_for(const char *aliasnam
*** 2161,2166 ****
--- 2161,2189 ----
return list_make1(dpns);
}
+ /* ----------
+  * deparse_context_for_rtelist	- Build deparse context for a range table
+  *
+  * Given a list of RangeTblEntry nodes, build a deparsing context for an
+  * expression referencing those relations.  This is sufficient for uses of
+  * deparse_expression before a plan has been created.
+  * ----------
+  */
+ List *
+ deparse_context_for_rtelist(List *rtable)
+ {
+ 	deparse_namespace *ns = (deparse_namespace *) palloc0(sizeof(*ns));
+
+ 	/* The caller's range table is used as-is; no CTEs are attached */
+ 	ns->ctes = NIL;
+ 	ns->rtable = rtable;
+
+ 	/* The single namespace forms the whole (one-deep) namespace stack */
+ 	return list_make1(ns);
+ }
+
/*
* deparse_context_for_planstate - Build deparse context for a plan
*
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 8a1c82e..a56366f 100644
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
*************** extern char *deparse_expression(Node *ex
*** 626,631 ****
--- 626,632 ----
extern List *deparse_context_for(const char *aliasname, Oid relid);
extern List *deparse_context_for_planstate(Node *planstate, List *ancestors,
List *rtable);
+ extern List *deparse_context_for_rtelist(List *rtable);
extern const char *quote_identifier(const char *ident);
extern char *quote_qualified_identifier(const char *qualifier,
const char *ident);