In this post, I will describe how to set up and scale an elastic PostgreSQL cluster with parallel query execution using freely available open-source components from the PostgreSQL ecosystem, including a custom patch to postgres_fdw that is needed to achieve desirable performance.
Scaling out analytics workloads elastically
There are several challenges within analytics workloads (OLAP) that can be solved using an elastically scaling cluster:
- Large and ever growing datasets that are several terabytes in size
- Many users querying the data concurrently
- Sudden surges of query or data-ingestion activity
With an ever growing dataset and an increasing number of users, you eventually run out of system resources, no matter how beefy your PostgreSQL server is.
Parallel queries (introduced in PostgreSQL 9.6) can shorten query times by using more resources, but they become problematic when concurrent users or applications compete for resources. Deploying multiple servers’ worth of resources to a single query leaves the next query starved and sluggish. With varying demands, query times become unpredictable, and users waste time and lose patience.
An elastic cluster keeps your system responsive and your users happy. If you are in a pay-per-use environment, elasticity typically lowers usage and scaling costs and produces less waste because you only pay for what you need and use.
To build a scalable PostgreSQL cluster, there are various options in the PostgreSQL ecosystem, the best of which depends on your use-case:
- In a shared-nothing architecture, every cluster node only contains a part of the dataset. You can distribute queries that modify data as well as read-only queries across the cluster.
- Replication-based scale-out is when every cluster node contains the whole dataset. A master node receives all queries that modify data and replicates that data to all remaining cluster nodes, which are called read replicas. Read-only queries can be distributed across all cluster nodes.
- Hybrid approaches, where data is partially replicated, are not covered in this blog post.
Next, we will show you how to build a scalable, shared-nothing elastic PostgreSQL cluster using only open-source software:
- The haproxy load balancer.
- PostgreSQL 11 (or higher).
- The postgres_fdw foreign data wrapper (FDW) with a patch from Swarm64 that makes the FDW parallel-safe. See parallel-postgres-fdw-patch on GitHub.
- Table partitioning, and partition-wise aggregations and joins.
Introducing the high-level architecture
The cluster consists of data nodes and coordinator nodes. Table data are horizontally partitioned into shards that are stored on the data nodes. Each coordinator node receives queries, plans them, executes them by offloading some parts to the data nodes, and itself executes the rest of each query.
The data nodes do not return all of the data. Rather, they apply filter conditions of the WHERE clause(s) and only return data relevant to each query. Sometimes complex operations such as joins and aggregates are offloaded to the data nodes. For more information, see “Pushing down queries” below.
Because all of the data is stored only on the data nodes, the coordinators are stateless. This statelessness allows you to achieve elastic PostgreSQL scalability by dynamically switching on or off coordinator nodes based on demand.
Sharding the data
The postgres_fdw (Foreign Data Wrapper, or FDW) allows you to use a table that is stored on a remote PostgreSQL server as if it were stored locally on the corresponding PostgreSQL server. Therefore, by creating a partitioned table where each partition is a foreign table backed by a real table stored on a remote PostgreSQL server, you can shard a single table across multiple servers. PostgreSQL supports different forms of partitioning. For more information, see PostgreSQL’s Overview of table partitions.
The code snippet that follows uses hash partitioning, which routes rows to their target partitions. The target partitions are selected by a hash value that is derived from the partition-key columns. Note that the backing tables on the data nodes were already created, and the fetch_size specifies the number of rows to be transferred from the data node to the coordinator in one batch. A fetch size of approximately 100,000 rows performs well.
CREATE EXTENSION postgres_fdw;

CREATE SERVER data_node_0 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host '10.10.0.1', port '5432', dbname 'tpc_h',
             use_remote_estimate 'TRUE', fetch_size '100000');
…
CREATE SERVER data_node_3 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host '10.10.0.2', port '5432', dbname 'tpc_h',
             use_remote_estimate 'TRUE', fetch_size '100000');

CREATE TABLE lineitem (
    l_orderkey BIGINT NOT NULL,
    l_partkey INT NOT NULL,
    …
) PARTITION BY HASH (l_orderkey);

CREATE FOREIGN TABLE lineitem_shard_0 PARTITION OF lineitem
    FOR VALUES WITH (MODULUS 4, REMAINDER 0)
    SERVER data_node_0 OPTIONS (table_name 'lineitem');
…
CREATE FOREIGN TABLE lineitem_shard_3 PARTITION OF lineitem
    FOR VALUES WITH (MODULUS 4, REMAINDER 3)
    SERVER data_node_3 OPTIONS (table_name 'lineitem');
To be able to query any of the tables on the coordinator node, you need a user mapping for each foreign server / data node. The following code snippet creates a user mapping:
CREATE USER MAPPING FOR postgres SERVER data_node_0 OPTIONS (user 'postgres');
...
CREATE USER MAPPING FOR postgres SERVER data_node_3 OPTIONS (user 'postgres');
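Because the statements above repeat once per data node, it can be convenient to generate them. The following Python sketch is only an illustration: the function name shard_ddl and the host addresses are hypothetical, and it assumes that the postgres_fdw extension and the partitioned parent table already exist on the coordinator.

```python
def shard_ddl(hosts, table="lineitem", dbname="tpc_h", user="postgres",
              fetch_size=100000):
    """Return the DDL that attaches one foreign shard of `table` per host.

    The modulus of the hash partitioning equals the number of hosts, and
    shard i is served by foreign server data_node_i.
    """
    modulus = len(hosts)
    statements = []
    for i, host in enumerate(hosts):
        server = f"data_node_{i}"
        statements.append(
            f"CREATE SERVER {server} FOREIGN DATA WRAPPER postgres_fdw "
            f"OPTIONS (host '{host}', port '5432', dbname '{dbname}', "
            f"use_remote_estimate 'TRUE', fetch_size '{fetch_size}');"
        )
        statements.append(
            f"CREATE USER MAPPING FOR {user} SERVER {server} "
            f"OPTIONS (user '{user}');"
        )
        statements.append(
            f"CREATE FOREIGN TABLE {table}_shard_{i} PARTITION OF {table} "
            f"FOR VALUES WITH (MODULUS {modulus}, REMAINDER {i}) "
            f"SERVER {server} OPTIONS (table_name '{table}');"
        )
    return statements


if __name__ == "__main__":
    # Hypothetical addresses; substitute the real data-node hosts.
    for stmt in shard_ddl(["192.0.2.10", "192.0.2.11", "192.0.2.12", "192.0.2.13"]):
        print(stmt)
```

The printed statements can be piped into psql on each coordinator, so every coordinator ends up with an identical view of the shards.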
For a complete example showing how to set up a cluster on a single machine, see Script to set up a local FDW-based cluster.
Enabling parallelism on the coordinator
PostgreSQL 9.6 introduced the ability to leverage multiple CPU cores to execute a query. Parallelism is coordinated by Gather and GatherMerge nodes that fork parallel workers and retrieve all of their results. The entire subplan of a Gather or GatherMerge node is executed by each parallel worker. Plan nodes support parallel execution to different degrees:
- Parallel-unsafe: the plan node cannot be inside a parallel sub-plan.
- Parallel-safe: the plan node can be inside a parallel sub-plan, but must be executed completely by each parallel worker.
- Parallel-aware: the plan node can be inside a parallel sub-plan and is executed jointly by multiple parallel workers. Each of the workers only returns a subset of the data.
Unfortunately, a Foreign Scan on a foreign postgres_fdw table falls into the first category. As a result, any query that accesses data stored on a data node can only be executed on a single CPU core. For example, you can see this in the execution plan of TPC-H query 12:
EXPLAIN (COSTS OFF)
SELECT
    l_shipmode,
    SUM(CASE
            WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1
            ELSE 0
        END) AS high_line_count,
    SUM(CASE
            WHEN o_orderpriority <> '1-URGENT' AND o_orderpriority <> '2-HIGH' THEN 1
            ELSE 0
        END) AS low_line_count
FROM orders, lineitem
WHERE o_orderkey = l_orderkey
  AND l_shipmode IN ('TRUCK', 'AIR')
  AND l_commitdate < l_receiptdate
  AND l_shipdate < l_commitdate
  AND l_receiptdate >= DATE '1996-01-01'
  AND l_receiptdate < DATE '1996-01-01' + interval '1' year
GROUP BY l_shipmode
ORDER BY l_shipmode;
PostgreSQL can execute the join between orders and lineitem in parallel if you do not use foreign partitions. If you do use foreign partitions, the entire execution plan is sequential because it does not contain a Gather or GatherMerge node. A Gather or GatherMerge node is required in order to send work to all data nodes simultaneously, instead of waiting for the first data node to complete before querying the second data node:
Finalize GroupAggregate
  Group Key: lineitem_shard_0.l_shipmode
  -> Merge Append
       Sort Key: lineitem_shard_0.l_shipmode
       -> Partial GroupAggregate
            Group Key: lineitem_shard_0.l_shipmode
            -> Foreign Scan
                 Relations: (public.orders_shard_0 orders) INNER JOIN (public.lineitem_shard_0 lineitem)
       -> Partial GroupAggregate
            Group Key: lineitem_shard_1.l_shipmode
            -> Foreign Scan
                 Relations: (public.orders_shard_1 orders) INNER JOIN (public.lineitem_shard_1 lineitem)
       -> Partial GroupAggregate
            Group Key: lineitem_shard_2.l_shipmode
            -> Foreign Scan
                 Relations: (public.orders_shard_2 orders) INNER JOIN (public.lineitem_shard_2 lineitem)
       -> Partial GroupAggregate
            Group Key: lineitem_shard_3.l_shipmode
            -> Foreign Scan
                 Relations: (public.orders_shard_3 orders) INNER JOIN (public.lineitem_shard_3 lineitem)
If the query plan is sequential, the cluster setup is of no use because neither the data nodes, nor the coordinator node can process the query in parallel, which limits performance and scalability.
Enabling parallelism in postgres_fdw
To parallelize the execution of the query plan, you can either make postgres_fdw a) parallel-safe or b) parallel-aware. To make it parallel-safe, implement the IsForeignScanParallelSafe() FDW API function so that it returns true. This tells PostgreSQL that the ForeignScan node can be part of a parallel sub-plan. However, a single ForeignScan still cannot be jointly executed by multiple workers, so the achievable degree of parallelism is limited by the number of partitions used.
Although option b) is more performant, it’s much more complex to implement because parallel workers need to stay in sync as they only return partial results. Splitting work dynamically across workers requires extra logic inside postgres_fdw.
We opted to provide a patch for option a), which improves the situation in the cluster because now PostgreSQL can make use of the Parallel Append plan node to merge the results of multiple partitions in parallel.
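To see why a parallel-safe scan is bounded by the number of partitions, consider the following simplified model in Python. It is a sketch, not PostgreSQL's actual scheduling logic: it assumes that every idle worker claims the next unclaimed child scan and runs it to completion. The function name and the cost values are hypothetical.

```python
import heapq


def parallel_append_makespan(child_costs, workers):
    """Estimate the time to finish all child scans with `workers` workers.

    Greedy model of a Parallel Append over parallel-safe children: a child
    scan is never split, so one worker executes it from start to finish.
    """
    if workers < 1:
        raise ValueError("need at least one worker")
    finish_times = [0.0] * workers  # time at which each worker becomes idle
    heapq.heapify(finish_times)
    for cost in child_costs:
        idle_at = heapq.heappop(finish_times)  # earliest idle worker
        heapq.heappush(finish_times, idle_at + cost)
    return max(finish_times)


if __name__ == "__main__":
    shard_costs = [10.0, 10.0, 10.0, 10.0]  # hypothetical cost per foreign scan
    for workers in (1, 2, 4, 8):
        print(workers, parallel_append_makespan(shard_costs, workers))
```

With four equally expensive shards, the model finishes in 40, 20, 10, and 10 cost units for 1, 2, 4, and 8 workers. Beyond four workers nothing improves, because there are only four scans to hand out.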
For the source code of the patched postgres_fdw, see parallel-postgres-fdw-patch on GitHub.
With this patch, this is the execution plan of TPC-H query 12:
Finalize GroupAggregate
  Group Key: lineitem_shard_0.l_shipmode
  -> Gather Merge
       Workers Planned: 2
       -> Partial GroupAggregate
            Group Key: lineitem_shard_0.l_shipmode
            -> Sort
                 Sort Key: lineitem_shard_0.l_shipmode
                 -> Parallel Append
                      -> Foreign Scan
                           Relations: (public.orders_shard_0 orders) INNER JOIN (public.lineitem_shard_0 lineitem)
                      -> Foreign Scan
                           Relations: (public.orders_shard_3 orders) INNER JOIN (public.lineitem_shard_3 lineitem)
                      -> Foreign Scan
                           Relations: (public.orders_shard_1 orders) INNER JOIN (public.lineitem_shard_1 lineitem)
                      -> Foreign Scan
                           Relations: (public.orders_shard_2 orders) INNER JOIN (public.lineitem_shard_2 lineitem)
In our test cluster, the runtime of TPC-H query 12 improved drastically; it previously took several hours, but now completes within minutes. The expected speedup depends on how much of your query plan executes in parallel. See Amdahl’s law.
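As a rough guide to what Amdahl's law predicts, the following Python sketch computes the theoretical speedup for a given parallel fraction of the plan. The fractions and worker counts are made-up inputs for illustration, not measurements from our cluster.

```python
def amdahl_speedup(parallel_fraction, workers):
    """Theoretical speedup when `parallel_fraction` of the runtime is
    spread evenly across `workers` and the rest stays sequential."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be between 0 and 1")
    if workers < 1:
        raise ValueError("workers must be at least 1")
    sequential_fraction = 1.0 - parallel_fraction
    return 1.0 / (sequential_fraction + parallel_fraction / workers)


if __name__ == "__main__":
    # Hypothetical inputs: share of the plan that runs below the Gather node.
    for fraction in (0.5, 0.9, 0.99):
        for workers in (2, 4, 8):
            print(fraction, workers, round(amdahl_speedup(fraction, workers), 2))
```

The sequential part sets the ceiling: if only half of the plan runs in parallel, the speedup can never exceed 2x, no matter how many workers you add.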
Enabling parallelism on the data nodes
The postgres_fdw uses the libpq library with a cursor to transfer rows from the data nodes to the coordinator nodes. The cursor prevents the data nodes from parallelizing the query. For more information, see When Can Parallel Query Be Used? To avoid this, split each table into more partitions than there are data nodes. In an example of eight data nodes, you can partition each table into 64 partitions and thereby make every data node process eight partitions in parallel. Enabling parallelism in postgres_fdw makes this possible.
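One simple way to spread the partitions is round-robin by partition number, sketched below in Python. The function name is hypothetical and the placement rule is an assumption for illustration; any rule works as long as every table uses the same one, so that partitions with the same remainder end up on the same data node.

```python
from collections import defaultdict


def assign_partitions(num_partitions, num_nodes):
    """Map each data-node index to the list of partition indexes it stores,
    using round-robin placement (partition p goes to node p % num_nodes)."""
    if num_nodes < 1:
        raise ValueError("need at least one data node")
    placement = defaultdict(list)
    for partition in range(num_partitions):
        placement[partition % num_nodes].append(partition)
    return dict(placement)


if __name__ == "__main__":
    placement = assign_partitions(64, 8)
    for node, partitions in sorted(placement.items()):
        print(f"data_node_{node}: {len(partitions)} partitions {partitions}")
```

For 64 partitions on eight data nodes, every node receives eight partitions, which is the amount of per-node parallelism described above.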
Making the cluster elastic
To make the cluster elastic we need a way to add and remove coordinator nodes. We use the haproxy load balancer for that. haproxy does not support adding and removing servers dynamically. To work around this, define a list of servers in disabled state in the haproxy.cfg file. The IP addresses can be placeholders if they are still unknown. The following line of the configuration file provisions 100 servers (coordinator1 to coordinator100) in disabled state:
server-template coordinator 1-100 10.10.0.1:5432 check disabled
Before adding a coordinator node, point its server slot at the node's actual IP address:
set server cluster/coordinator1 addr 10.10.1.100 port 5432
To enable and disable a coordinator node, use the following runtime API commands:
set server cluster/coordinator1 state ready
set server cluster/coordinator1 state maint
Be careful when shutting down disabled coordinator nodes. It’s possible that there are still active connections on disabled servers. To ensure that these connections are not terminated when shutting down PostgreSQL, use PostgreSQL’s smart shutdown mode, which waits until all active connections complete before shutting down:
pg_ctl stop -m smart
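The runtime API commands can also be issued from a script, for example from whatever tooling starts and stops coordinator nodes. The following Python sketch assumes that haproxy.cfg exposes an admin-level stats socket; the socket path in the example is hypothetical, as are the function names. The state keywords accepted by the runtime API are ready, drain, and maint.

```python
import socket


def activation_commands(backend, server, addr, port=5432):
    """Commands that point a server slot at a new coordinator and enable it."""
    return [
        f"set server {backend}/{server} addr {addr} port {port}",
        f"set server {backend}/{server} state ready",
    ]


def deactivation_command(backend, server):
    """Command that takes a coordinator out of rotation."""
    return f"set server {backend}/{server} state maint"


def send_command(socket_path, command):
    """Send one command to the HAProxy stats socket and return the reply.

    HAProxy closes the connection after answering a single command, so the
    reply is read until end of stream.
    """
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(socket_path)
        sock.sendall(command.encode("ascii") + b"\n")
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode("ascii", errors="replace")


if __name__ == "__main__":
    # Hypothetical socket path; it must match the "stats socket" line in haproxy.cfg.
    for cmd in activation_commands("cluster", "coordinator1", "10.10.1.100"):
        print(send_command("/var/run/haproxy.sock", cmd))
```

When scaling in, send the deactivation command first and run the smart shutdown afterwards, so that the load balancer stops routing new connections before PostgreSQL begins waiting for the existing ones.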
Loading the cluster at speed
Typically, large amounts of data are batch loaded into the database via the COPY FROM command. COPY FROM is faster than loading individual rows via separate INSERT commands because it avoids parsing and planning the command multiple times.
Unfortunately, this is not the case for postgres_fdw foreign tables. Every row coming in through a COPY FROM command is turned into a separate INSERT command and sent to the remote server where it needs to be parsed, planned, and executed. Hence, to load the cluster at speed, data is best directly ingested on the data nodes via the COPY FROM command. In this case, manually distribute the data to conform to the same constraints used on the coordinator, such as partitioning form and uniqueness. Otherwise the query results can be wrong.
Choosing cost estimates
The postgres_fdw extension uses either local or remote cost estimates to evaluate the cost of offloading a part of a query to a foreign server. Remote cost estimates can be enabled by specifying the use_remote_estimate 'TRUE' option when creating the foreign server. Because indexes of sharded tables cannot exist on the coordinator, remote cost estimates are typically more accurate than local cost estimates. However, for small queries, continually asking data nodes for cost estimates increases query runtime considerably. To avoid this, disable use_remote_estimate. For more information, see Cost Estimation Options.
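The option can be changed on existing foreign servers with ALTER SERVER. The following Python sketch generates one statement per data node; the function name is hypothetical, and it assumes the servers are named data_node_0, data_node_1, and so on, and that use_remote_estimate was already set when each server was created (otherwise ADD is needed instead of SET).

```python
def remote_estimate_ddl(num_nodes, enabled):
    """Return ALTER SERVER statements that switch use_remote_estimate
    on or off for data_node_0 .. data_node_{num_nodes - 1}."""
    value = "true" if enabled else "false"
    return [
        f"ALTER SERVER data_node_{i} "
        f"OPTIONS (SET use_remote_estimate '{value}');"
        for i in range(num_nodes)
    ]


if __name__ == "__main__":
    # Disable remote estimates on a four-node cluster.
    for stmt in remote_estimate_ddl(4, enabled=False):
        print(stmt)
```

Comparing query runtimes with the option on and off for a representative workload is a reasonable way to decide which setting suits your cluster.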
Pushing down queries
By default, postgres_fdw sends every table row from the data nodes to the coordinator, which then does all the work it would usually do if the table were stored directly on the coordinator. In OLAP, where large tables are often processed, moving all this data through the network is inefficient. Hence, over the years, postgres_fdw developers built support for pushing down parts of the query execution to the foreign server. PostgreSQL 11 supports pushing down filters, joins, and aggregations. Joins can only be pushed down if the partition keys of both tables align and both partitions are stored on the same foreign server.
For more information about what gets pushed down to the data nodes, use EXPLAIN VERBOSE, which displays a Remote SQL entry below every Foreign Scan plan node. In PostgreSQL 11, partition-wise aggregates and partition-wise joins are disabled by default. To enable them, run the following commands:
SET enable_partitionwise_join = ON;
SET enable_partitionwise_aggregate = ON;
Improving the cluster going forward
While setting up the cluster internally, we encountered restrictions related to parallelization, query-execution performance, and transaction support. Addressing the following points would improve FDW-based clusters:
- Make postgres_fdw parallel-aware. That way, unlike with the parallel-safe postgres_fdw, the degree of parallelism to process a query on the coordinator node does not depend on the number of partitions used.
- Enable queries that are executed by postgres_fdw on the coordinator node to run in parallel on the data node. This is currently prevented by the cursor used by postgres_fdw, which is not supported for parallel execution.
- Improve performance of the COPY FROM operation on sharded tables in postgres_fdw.
- Support push-down of aggregates without GROUP BY, for example SELECT COUNT(*) FROM lineitem. Currently only queries that perform partition-wise aggregations can be pushed down.
- Support push-down of semi and anti joins as well as joins with lateral references.
- Fully support transactions in FDW-based clusters.
I’d like to thank my colleagues Sebastian Dressler, Luc Vlaming, Luis Carril, and Ursula Kallio for their help with this post.
And I hope to hear from any of you who have feedback on PostgreSQL scalability, the patch, or enhancement requests. Reach us via GitHub issues.