citus 节点间的网络需求:
postgres=# show log_connections ; log_connections -----------------on (1 row) postgres=# show log_disconnections ; log_disconnections --------------------on (1 row)例子,
以下两条SQL均为即时短连接模式(Custom Scan (Citus Task-Tracker) Custom Scan (Citus Real-Time))。
postgres=# set citus.task_executor_type =task; ERROR: invalid value for parameter "citus.task_executor_type": "task" HINT: Availablevalues: real-time, task-tracker. postgres=# set citus.task_executor_type =task-tracker; SET postgres=# explain select count(*) from pgbench_accounts ; QUERY PLAN --------------------------------------------------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0width=0) -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) TaskCount: 128 Tasks Shown: One of 128 -> Task Node: host= dbname=postgres -> Aggregate (cost=231.85..231.86 rows=1 width=8) -> SeqScan on pgbench_accounts_106812 pgbench_accounts (cost=0.00..212.48 rows=7748 width=0) (8 rows) postgres=# set citus.task_executor_type =real-time; postgres=# explain select count(*) from pgbench_accounts ; QUERY PLAN --------------------------------------------------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) Task Count: 128 Tasks Shown: One of 128-> Task Node: host= port=1921 dbname=postgres -> Aggregate (cost=231.85..231.86 rows=1 width=8) -> Seq Scan onpgbench_accounts_106812 pgbench_accounts (cost=0.00..212.48 rows=7748 width=0) (8 rows)2、跑OLTP查询时(通常并发很高,前端有连接池(保持会话)),为会话级保持连接模式(Custom Scan (Citus Router))。
postgres=# explain select * from pgbench_accounts where aid=5;QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task Node: host= port=1921dbname=postgres ->Index Scan using pgbench_accounts_pkey_106836 on pgbench_accounts_106836 pgbench_accounts (cost=0.28..2.50 rows=1 width=97) Index Cond: (aid = 5) (7 rows)看以上两种场景,CITUS应该说设计得已经很不错了。既能满足TP也能满足AP。
newdb=> \c postgres postgres You are now connected to database"postgres" as user "postgres". postgres=# select * from pgbench_accounts where aid=1; aid | bid | abalance | filler -----+-----+----------+-------------------------------------------------------------------------------------- 1 | 1 | 7214 | (1 row) Time: 11.264 ms -- 包括新建连接的开销 postgres=# select * from pgbench_accounts where aid=1; aid | bid | abalance | filler -----+-----+----------+-------------------------------------------------------------------------------------- 1 | 1 | 7214 | (1row)Time: 0.905 ms -- 已建立连接在worker节点上,部署pgbouncer,所有与worker节点建立的连接都通过pgbouncer连接池,以此来保持住连接,降低worker节点频繁新建连接的开销。
yum install -y pgbouncer配置
vi /etc/pgbouncer/pgb.ini [databases] newdb = host=/tmp dbname=newdb port=1921 user=digoal pool_size=128 reserve_pool=10 [pgbouncer] logfile = /var/log/pgbouncer/pgbouncer.log pidfile = /var/run/pgbouncer/pgbouncer.pid listen_addr = listen_port = 8001 auth_type = any auth_file = /etc/pgbouncer/userlist.txt pool_mode = session server_reset_query = DISCARD ALL max_client_conn = 5000 default_pool_size = 128 ; 最大不要超过4倍CPU数启动
pgbouncer-d -u pgbouncer /etc/pgbouncer/pgb.ini在一个citus集群中,可以同时存在直连worker或者通过pgbouncer连接worker。
2、所有节点(包括cn, worker)新建数据库,插件
su - postgres psql -c "create roledigoal login;" psql -c "create database newdb;" psql -c "grant all on database newdb todigoal;" psql -U postgres newdb -c "create extension citus;"cn节点将worker添加到集群配置,使用pgbouncer的连接端口
su - postgres -c "psql -U postgres newdb -c \"SELECT * from master_add_node(xxx.xxx.xxx.224, 8001);\"" su - postgres -c "psql -U postgres newdb -c \"SELECT * from master_add_node(xxx.xxx.xxx.230, 8001);\"" su - postgres -c "psql -U postgres newdb -c \"SELECT * from master_add_node(xxx.xxx.xxx.231, 8001);\"" su - postgres -c "psql -U postgres newdb -c \"SELECT * frommaster_add_node(xxx.xxx.xxx.225, 8001);\"" su - postgres -c "psql -U postgres newdb -c \"SELECT * frommaster_add_node(xxx.xxx.xxx.227, 8001);\"" su - postgres -c "psql -U postgres newdb -c \"SELECT * from master_add_node(xxx.xxx.xxx.232, 8001);\"" su - postgres -c "psql -U postgres newdb -c \"SELECT * from master_add_node(xxx.xxx.xxx.226, 8001);\"" su - postgres -c "psql -U postgres newdb -c \"SELECT * from master_add_node(xxx.xxx.xxx.229, 8001);\""MX配置,同样,将worker节点添加到元数据同步中。
psql newdb postgresselect * from master_add_node(xxx.xxx.xxx.224,8001); select * from master_add_node(xxx.xxx.xxx.230,8001);开启同步到元数据。
select start_metadata_sync_to_node(xxx.xxx.xxx.224,8001); select start_metadata_sync_to_node(xxx.xxx.xxx.230,8001);测试1、tpc-b 长连接测试
pgbench -i -s -U digoal newdbpsql -U digoal newdbselect create_distributed_table(pgbench_accounts,aid); select create_distributed_table(pgbench_branches,bid); select create_distributed_table(pgbench_tellers,tid); select create_distributed_table(pgbench_history,aid);pgbench -M prepared -v -r -P 1 -c 64 -j 64 -T 120 -U digoal newdb -S性能与不使用pgbouncer差不多,因为使用了长连接测试简单SQL(本身citus就使用了会话级连接保持,没有短连接问题)。
newdb=> \q postgres@digoal-citus-gpdb-test001-> psql newdb digoal psql (10.5) Type "help" forhelp. \timing newdb=> select * from pgbench_accounts where aid=5; aid | bid | abalance | filler -----+-----+----------+-------------------------------------------------------------------------------------- 5 | 1 | 0 | (1 row) Time: 6.016 ms -- 不包括新建连接(已使用pgbouncer建立),但是也多了几毫秒 -- 但是相比未使用pgbouncer,已经降低了5毫秒左右的延迟。 newdb=> select * from pgbench_accounts where aid=5; aid | bid | abalance | filler -----+-----+----------+-------------------------------------------------------------------------------------- 5 | 1 | 0 | (1 row) Time: 0.989 ms多出的几毫秒,我们社区的小伙伴邓彪、王健给出了原因如下,很多地方需要检查安装的citus版本与citus.control控制文件的版本是否兼容(比如加载分布式TABLE的RELCACHE时,第一次访问就是这个问题),不兼容报错:
/* * CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently * available version from the citus.control file. If they are not compatible, * this function logs an error with the specified elevel and returns false, * otherwise it returns true. */ bool CheckAvailableVersion(int elevel) { char *availableVersion = NULL; if(!EnableVersionChecks) {return true; } availableVersion = AvailableExtensionVersion(); if(!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION)) { ereport(elevel, (errmsg("loaded Citus library version differs from latest " "available extension version"), errdetail("Loaded library requires %s, but the latest control " "file specifies %s.", CITUS_MAJORVERSION, availableVersion), errhint("Restart the database to load the latest Citus " "library."))); return false; } return true; } /* * AvailableExtensionVersion returns the Citus version from citus.control file. It also * saves the result, thus consecutive calls to CitusExtensionAvailableVersion will * not read the citus.control file again. */ static char * AvailableExtensionVersion(void) { ReturnSetInfo *extensionsResultSet =NULL; TupleTableSlot *tupleTableSlot = NULL; FunctionCallInfoData *fcinfo =NULL; FmgrInfo *flinfo = NULL; int argumentCount = 0; EState *estate = NULL; boolhasTuple =false; bool goForward = true; bool doCopy = false; char*availableExtensionVersion; InitializeCaches(); estate = CreateExecutorState(); extensionsResultSet = makeNode(ReturnSetInfo); extensionsResultSet->econtext = GetPerTupleExprContext(estate); extensionsResultSet->allowedModes = SFRM_Materialize; fcinfo = palloc0(sizeof(FunctionCallInfoData)); flinfo = palloc0(sizeof(FmgrInfo)); fmgr_info(F_PG_AVAILABLE_EXTENSIONS, flinfo); InitFunctionCallInfoData(*fcinfo, flinfo, argumentCount, InvalidOid,NULL, (Node *) extensionsResultSet);/* pg_available_extensions returns result set containing all available extensions */(*pg_available_extensions)(fcinfo); tupleTableSlot = MakeSingleTupleTableSlot(extensionsResultSet->setDesc); hasTuple = tuplestore_gettupleslot(extensionsResultSet->setResult, goForward, doCopy, tupleTableSlot);while (hasTuple) { Datum extensionNameDatum = 0; char *extensionName = NULL; bool isNull = false; extensionNameDatum = slot_getattr(tupleTableSlot, 1, &isNull); extensionName = NameStr(*DatumGetName(extensionNameDatum));if (strcmp(extensionName,"citus") == 0) { MemoryContext oldMemoryContext = NULL; Datum availableVersion = slot_getattr(tupleTableSlot,2, &isNull); /* we will cache the result of citus version to prevent catalog access */oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext); availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion)); MemoryContextSwitchTo(oldMemoryContext); ExecClearTuple(tupleTableSlot); ExecDropSingleTupleTableSlot(tupleTableSlot);returnavailableExtensionVersion; } ExecClearTuple(tupleTableSlot); hasTuple = tuplestore_gettupleslot(extensionsResultSet->setResult, goForward, doCopy, tupleTableSlot); } ExecDropSingleTupleTableSlot(tupleTableSlot); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("citus extension is not found")));return NULL; }1、对于业务层短连接会有比较好的效果。可以降低至少5毫秒左右的延迟。
