逐行解读Hikari连接池源码

网友投稿 1784 2023-06-08

逐行解读Hikari连接池源码

逐行解读Hikari连接池源码

作者 | 薛师兄

写在前面

几年前,我最开始接触的数据库连接池是 C3P0,后来是阿里的 Druid,但随着 Springboot 2.0 选择 HikariCP 作为默认数据库连接池这一事件之后,HikariCP 作为一个后起之秀出现在大众的视野中,以其速度快,性能高等特点受到越来越多人青睐。

在实际开发工作中,数据库一直是引发报警的重灾区,而与数据库打交道的就是 Hikari 连接池,看懂 Hikari 报警日志并定位异常原因,是实际工作中必不可少的技能!

本文以 Hikari 2.7.9 版本源码进行分析,带大家理解 Hikari 原理,学会处理线上问题!

1、概念释义

在学习一项技术之前,需要先在宏观的层面去看到它的位置,比如我们今天学习的 HikariCP,它在什么位置?

以 Spring Boot 项目为例,我们有 Service 业务层,编写业务代码,而与数据库打交道的是 ORM 框架(例如 MyBatis),ORM 框架的下一层是 Hikari 连接池,Hikari 连接池的下一层是 MySQL 驱动,MySQL 驱动的下一层是 MySQL 服务器。理解了这个宏观层次,我们再去学习 Hikari 就不会学的那么稀里糊涂了。

其次,我们需要明白数据库连接池是干什么的?

简单来说,数据库连接池负责分配、管理和释放数据库的连接。有了数据库连接池就可以复用数据库连接,可以避免连接频繁建立、关闭的开销,提升系统的性能。它可以帮助我们释放过期的数据库连接,避免因为使用过期的数据库连接而引起的异常。

至于 Hikari,它是一个“零开销”生产就绪的 JDBC 连接池。库非常轻,大约 130 Kb。

2、配置使用

我们先来看一个线上 Hikari 连接池配置需要哪些参数。

@Bean("dataSource")public DataSource dataSource() { HikariConfig cfg = new HikariConfig(); // 从池中借出的连接是否默认自动提交事务,默认开启 cfg.setAutoCommit(false); // 从池中获取连接时的等待时间 cfg.setConnectionTimeout(); // MYSQL连接相关 cfg.setJdbcUrl(); cfg.setDriverClassName(); cfg.setUsername(); cfg.setPassword(); // 连接池的最大容量 cfg.setMaximumPoolSize(); // 连接池的最小容量,官网不建议设置,保持与 MaximumPoolSize 一致,从而获得最高性能和对峰值需求的响应 // cfg.setMinimumIdle(); // 连接池的名称,用于日志监控,多数据源要区分 cfg.setPoolName(); // 池中连接的最长存活时间,要比数据库的 wait_timeout 时间要小不少 cfg.setMaxLifetime(); // 连接在池中闲置的最长时间,仅在 minimumIdle 小于 maximumPoolSize 时生效(本配置不生效) cfg.setIdleTimeout(); // 连接泄露检测,默认 0 不开启 // cfg.setLeakDetectionThreshold(); // 测试链接是否有效的超时时间,默认 5 秒 // cfg.setValidationTimeout(); // MYSQL驱动环境变量 // 字符编解码 cfg.addDataSourceProperty("characterEncoding", ); cfg.addDataSourceProperty("useUnicode", ); // 较新版本的 MySQL 支持服务器端准备好的语句 cfg.addDataSourceProperty("useServerPrepStmts", ); // 缓存SQL开关 cfg.addDataSourceProperty("cachePrepStmts", ); // 缓存SQL数量 cfg.addDataSourceProperty("prepStmtCacheSize", ); // 缓存SQL长度,默认256 // prepStmtCacheSqlLimit return new HikariDataSource(cfg);}

3、源码分析

1)分析入口

万事开头难,下载 Hikari 源码到本地后该从哪开始去看呢?不妨从下面两个入口去分析。

// 1、初始化入口new HikariDataSource(cfg)// 2、获取连接public interface DataSource extends CommonDataSource, Wrapper { Connection getConnection() throws SQLException;}

2)初始化分析

初始化分析主要有两部分工作,一是校验配置并且会矫正不符合规范的配置;二是实例化 Hikari 连接池。

矫正配置

校验配置会直接抛异常,大部分坑来源于矫正配置这一步,这会使你的配置不生效。

private void validateNumerics() { // maxLifetime 链接最大存活时间最低30秒,小于30秒不生效 if (maxLifetime != 0 && maxLifetime < SECONDS.toMillis(30)) { LOGGER.warn("{} - maxLifetime is less than 30000ms, setting to default {}ms.", poolName, MAX_LIFETIME); maxLifetime = MAX_LIFETIME; } // idleTimeout 空闲超时不能大于或者接近 maxLifetime,否则设置 0,禁用空闲线程回收 if (idleTimeout + SECONDS.toMillis(1) > maxLifetime && maxLifetime > 0) { LOGGER.warn("{} - idleTimeout is close to or more than maxLifetime, disabling it.", poolName); idleTimeout = 0; } // idleTimeout 空闲超时不能低于默认值 10 秒 if (idleTimeout != 0 && idleTimeout < SECONDS.toMillis(10)) { LOGGER.warn("{} - idleTimeout is less than 10000ms, setting to default {}ms.", poolName, IDLE_TIMEOUT); idleTimeout = IDLE_TIMEOUT; } // 连接泄露检测的时间,默认 0 不开启,不能低于 2 秒,不能比 maxLifetime 大,否则不开启 if (leakDetectionThreshold > 0 && !unitTest) { if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) { LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName); leakDetectionThreshold = 0; } } // 从连接池获取连接时最大等待时间,默认值 30 秒, 低于 250 毫秒不生效 if (connectionTimeout < 250) { LOGGER.warn("{} - connectionTimeout is less than 250ms, setting to {}ms.", poolName, CONNECTION_TIMEOUT); connectionTimeout = CONNECTION_TIMEOUT; } // 检测连接是否有效的超时时间,默认 5 秒,低于 250 毫秒不生效 if (validationTimeout < 250) { LOGGER.warn("{} - validationTimeout is less than 250ms, setting to {}ms.", poolName, VALIDATION_TIMEOUT); validationTimeout = VALIDATION_TIMEOUT; } // 连接池中连接的最大数量,minIdle 大于 0 与其保持一致,否则默认 10 if (maxPoolSize < 1) { maxPoolSize = (minIdle <= 0) ? DEFAULT_POOL_SIZE : minIdle; } // 维持的最小连接数量,不配置默认等于 maxPoolSize if (minIdle < 0 || minIdle > maxPoolSize) { minIdle = maxPoolSize; } }

创建连接池

通过分析连接池实例化过程,可以看到 Hikari 的作者是多么喜欢用异步操作了,包括空闲线程处理、添加连接、关闭连接、连接泄露检测等。

这一步会创建 1 个 LinkedBlockQueue 阻塞队列,需要明确的是,这个队列并不是实际连接池的队列, 只是用来放置添加连接的请求。

public HikariPool(final HikariConfig config) { super(config); // 创建 ConcurrentBag 管理连接池,有连接池的四个重要操作:borrow获取连接,requite归还连接,add添加连接,remove移除连接。 this.connectionBag = new ConcurrentBag<>(this); // getConnection 获取连接时的并发控制,默认关闭 this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK; // 空闲线程池 处理定时任务 this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); // 快速预检查 创建1个链接 checkFailFast(); // Metrics 监控收集相关 if (config.getMetricsTrackerFactory() != null) { setMetricsTrackerFactory(config.getMetricsTrackerFactory()); } else { setMetricRegistry(config.getMetricRegistry()); } // 健康检查注册相关,默认 无 setHealthCheckRegistry(config.getHealthCheckRegistry()); // 处理JMX监控相关 registerMBeans(this); ThreadFactory threadFactory = config.getThreadFactory(); // 创建 maxPoolSize 大小的 LinkedBlockQueue 阻塞队列,用来构造 addConnectionExecutor LinkedBlockingQueue addConnectionQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize()); // 镜像只读队列 this.addConnectionQueue = unmodifiableCollection(addConnectionQueue); // 创建 添加连接的 线程池,实际线程数只有1,拒绝策略是丢弃不处理 this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy()); // 创建 关闭连接的 线程池,实际线程数只有1,拒绝策略是调用线程同步执行 this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); // 创建 检测连接泄露 的工厂,使用的时候只需要传1个连接对象 this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); // 延时100ms后,开启任务,每30s执行空闲线程处理 this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS); }

3)获取连接分析

Hikari 的连接获取分为两步,一是调用 connectionBag.borrow() 方法从池中获取连接,这里等待超时时间是 connectionTimeout;二是获取连接后,会主动检测连接是否可用,如果不可用会关闭连接,连接可用的话会绑定一个定时任务用于连接泄露的检测。

很多时候,会在异常日志中看到 Connection is not available 错误日志后携带的 request timed out 耗时远超 connectionTimeout,仔细分析源码这也是合理的。

HikariDataSource

4)空闲连接回收

Hikari 在初始化连接池的时候,就已经开启了一条异步定时任务。该任务每 30 秒执行一次空闲连接回收,代码如下:

/** * The house keeping task to retire and maintain minimum idle connections. * 用于补充和移除最小空闲连接的管理任务。 */ private final class HouseKeeper implements Runnable { private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS); @Override public void run() { try { // refresh timeouts in case they changed via MBean connectionTimeout = config.getConnectionTimeout(); validationTimeout = config.getValidationTimeout(); leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()); final long idleTimeout = config.getIdleTimeout(); final long now = currentTime(); // Detect retrograde time, allowing +128ms as per NTP spec. // 为了防止时钟回拨,给了128ms的gap,正常情况下,ntp的校准回拨不会超过128ms // now = plusMillis(previous, HOUSEKEEPING_PERIOD_MS) + 100ms if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) { LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.", poolName, elapsedDisplayString(previous, now)); previous = now; softEvictConnections(); return; } else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) { // No point evicting for forward clock motion, this merely accelerates connection retirement anyway LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now)); } previous = now; String afterPrefix = "Pool "; // 回收符合条件的空闲连接:如果最小连接数等于最大连接数,就不会回收 if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) { logPoolState("Before cleanup "); afterPrefix = "After cleanup "; final List notInUse = connectionBag.values(STATE_NOT_IN_USE); int toRemove = notInUse.size() - config.getMinimumIdle(); for (PoolEntry entry : notInUse) { // 有空闲连接 且 空闲时间达标 且 CAS更改状态成功 if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) { // 关闭连接 closeConnection(entry, "(connection has passed idleTimeout)"); toRemove--; } } } logPoolState(afterPrefix); // 补充链接 fillPool(); // Try to maintain minimum connections } catch (Exception e) { LOGGER.error("Unexpected exception in housekeeping task", e); } } }

5)存活时间处理

Hikari 在创建一个连接实例的时候,就已经为其绑定了一个定时任务用于关闭连接。

private PoolEntry createPoolEntry() { try { final PoolEntry poolEntry = newPoolEntry(); final long maxLifetime = config.getMaxLifetime(); if (maxLifetime > 0) { // variance up to 2.5% of the maxlifetime // 减去一部分随机数,避免大范围连接断开 final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; final long lifetime = maxLifetime - variance; // 此处 maxLifetime 不能超过数据库最大允许连接时间 poolEntry.setFutureEol(houseKeepingExecutorService.schedule( () -> { if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) { addBagItem(connectionBag.getWaitingThreadCount()); } }, lifetime, MILLISECONDS)); } return poolEntry; } catch (Exception e) { if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently LOGGER.debug("{} - Cannot acquire connection from data source", poolName, (e instanceof ConnectionSetupException ? e.getCause() : e)); } return null; } }

关闭连接的过程是先将连接实例标记为废弃,这样哪怕因为连接正在使用导致关闭失败,也可以在下次获取连接时再对其进行关闭。

private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) { // 先标记为废弃、哪怕下面关闭失败,getConnection 时也会移除 poolEntry.markEvicted(); // 使用中的连接不会关闭 if (owner || connectionBag.reserve(poolEntry)) { closeConnection(poolEntry, reason); return true; } return false; }

6)连接泄露处理

Hikari 在处理连接泄露时使用到了工厂模式,只需要将连接实例 PoolEntry 传入工厂,即可提交连接泄露检测的延时任务。而所谓的链接泄露检测只是打印 1 次 WARN 日志。

class ProxyLeakTaskFactory{ private ScheduledExecutorService executorService; private long leakDetectionThreshold; ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService) { this.executorService = executorService; this.leakDetectionThreshold = leakDetectionThreshold; } // 1、传入连接对象 ProxyLeakTask schedule(final PoolEntry poolEntry) { // 连接泄露检测时间等于 0 不生效 return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry); } void updateLeakDetectionThreshold(final long leakDetectionThreshold) { this.leakDetectionThreshold = leakDetectionThreshold; } // 2、提交延时任务 private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) { ProxyLeakTask task = new ProxyLeakTask(poolEntry); task.schedule(executorService, leakDetectionThreshold); return task; }}ProxyLeakTaskclass ProxyLeakTask implements Runnable{ private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class); static final ProxyLeakTask NO_LEAK; private ScheduledFuture scheduledFuture; private String connectionName; private Exception exception; private String threadName; private boolean isLeaked; static { NO_LEAK = new ProxyLeakTask() { @Override void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {} @Override public void run() {} @Override public void cancel() {} }; } ProxyLeakTask(final PoolEntry poolEntry) { this.exception = new Exception("Apparent connection leak detected"); this.threadName = Thread.currentThread().getName(); this.connectionName = poolEntry.connection.toString(); } private ProxyLeakTask() { } void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) { scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS); } /** {@inheritDoc} */ @Override public void run() { isLeaked = true; final StackTraceElement[] stackTrace = exception.getStackTrace(); final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5]; System.arraycopy(stackTrace, 5, trace, 0, trace.length); // 打印 1 次连接泄露的 WARN 日志 exception.setStackTrace(trace); LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception); } void cancel() { scheduledFuture.cancel(false); if (isLeaked) { LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName); } }}

7)连接池类分析

ConcurrentBag 才是真正的连接池,也是 Hikari “零开销”的奥秘所在。

简而言之,Hikari 通过 CopyOnWriteArrayList + State(状态) + CAS 来避免了上锁。

CopyOnWriteArrayList 存放真正的连接对象,每个连接对象都有四种状态:

STATE_NOT_IN_USE:空闲STATE_IN_USE:活跃STATE_REMOVED:移除STATE_RESERVED:不可用

4、报警实战

1)实战一

报警日志

先来看一个真实的线上报警:

思路分析

No operations allowed after connection closed 表示访问了已经被 MySQL 关闭的连接。

request timed out after 6791ms 包含等待连接超时 connectionTimeout(配置 2 秒) 和测试连接可用 validationTimeout(默认 5 秒) 两个时间。

boolean isConnectionAlive(final Connection connection) { try { try { setNetworkTimeout(connection, validationTimeout); // validationTimeout 默认 5 秒,最低 1 秒 final int validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000; // 测试链接是否有效 if (isUseJdbc4Validation) { return connection.isValid(validationSeconds); } try (Statement statement = connection.createStatement()) { if (isNetworkTimeoutSupported != TRUE) { setQueryTimeout(statement, validationSeconds); } statement.execute(config.getConnectionTestQuery()); } } finally { setNetworkTimeout(connection, networkTimeout); if (isIsolateInternalQueries && !isAutoCommit) { connection.rollback(); } } return true; } catch (Exception e) { lastConnectionFailure.set(e); // 此处打印 WARN 日志,可以通过 console.log 查看是否存在 获取到已被关闭连接 的情况 LOGGER.warn("{} - Failed to validate connection {} ({})", poolName, connection, e.getMessage()); return false; } }

查看 console.log,存在大量获取到已关闭连接的情况:

所以推断报警原因是因为获取到已经被数据库关闭的连接。

解决方法

DBA 反馈数据库的 wait_timeout 是 600 秒,线上配置的 maxLifeTime 是 900 秒,配置有误,更改为 450 秒。

上线后验证 console.log 不再持续打印 Failed to validate connection 日志,并且没有 No operations allowed after connection closed 报警日志。

2)实战二

报警日志

优化上线后,观察到又发生了几十条报警,并且只集中在 1 台机器:

思路分析

报警日志中没有 No operations allowed after connection closed,且耗时为 connectionTimeout,推测是没有获取到连接,原因可能有:

1、机器异常

机器负载过大有可能引起 IO 夯。

2、连接池被打满

比如存在慢SQL,或者流量太大支撑不住等,连接数实在不够用。

Hikari 提供 HikariPoolMXBean 接口获取连接池监控信息。

3、连接泄露

开启连接泄露参数后,可在日志中查看。

解决方法

1、机器异常

迁移机器,观察后续情况。

2、连接池被打满

增加 Hikari 连接池监控日志,观察连接池使用情况,进一步再做判断。

比如可以通过一个定时任务,每秒打印连接池相关状态:

@Slf4j@Componentpublic class HikariPoolTask { @Resource private Map dataSourceMap; /** * 延时1秒,每隔1秒 */ @Scheduled(initialDelay = 1000, fixedDelay = 1000) public void run() { if (CollUtil.isNotEmpty(dataSourceMap)) { for (HikariDataSource dataSource : dataSourceMap.values()) { // 连接池名称 String poolName = dataSource.getPoolName(); HikariPoolMXBean hikariPoolMXBean = dataSource.getHikariPoolMXBean(); // 活跃连接数量 int activeConnections = hikariPoolMXBean.getActiveConnections(); // 空闲连接数量 int idleConnections = hikariPoolMXBean.getIdleConnections(); // 全部连接数量 int totalConnections = hikariPoolMXBean.getTotalConnections(); // 等待连接数量 int threadsAwaitingConnection = hikariPoolMXBean.getThreadsAwaitingConnection(); log.info("{} - activeConnections={}, idleConnections={}, totalConnections={}, threadsAwaitingConnection={}", poolName, activeConnections, idleConnections, totalConnections, threadsAwaitingConnection); } } }}

3、连接泄露

增加连接泄露检测参数,比如可以设置 10 秒:

leakDetectionThreshold=10000

作者介绍

薛师兄,在某头部互联网公司担任高级研发工程师,热衷于Java技术栈,对底层原理有独特的追求。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:网易游戏 Flink SQL 平台化实践
下一篇:企业在什么情况下有引入分布式数据库的必要性?
相关文章