责任链模式实践

1、前言

       当今软件开发中,设计模式是一种重要的方法论,它们帮助开发人员解决了各种常见的问题,并提高了代码的可维护性和可扩展性。本文将重点介绍责任链模式(Chain of Responsibility Pattern),并使用Java语言来解释它的概念、用途以及如何实现。

2、责任链模式简介

       责任链模式是一种行为型设计模式,它允许你将请求沿着处理链传递,直到有一个对象处理它为止。这种模式非常适合处理请求的分发和处理,特别是当您不知道哪个对象能够处理请求时。

       在责任链模式中,通常有多个处理对象(处理器),每个处理对象都有一个处理请求的方法。当一个请求进入责任链时,它从链的起始点开始传递,每个处理对象检查是否能够处理该请求。如果可以处理,则该请求被处理,否则它会被传递给链中的下一个处理对象,直到找到一个合适的处理器。

3、责任链模式的优点

责任链模式有以下几个优点:

  1. 解耦性: 责任链模式将请求发送者和接收者解耦,发送者无需知道请求最终由哪个接收者处理。
  2. 可扩展性: 可以轻松地添加新的处理对象或修改处理顺序,而不会影响其他部分的代码。
  3. 灵活性: 可以动态配置责任链,根据需要添加、删除或修改处理对象。
  4. 单一职责原则: 每个处理对象只需关心自己是否能够处理请求,符合单一职责原则。

4、责任链模式的实现

让我们使用Java语言来实现一个简单的责任链模式示例。假设我们有一个报销审批系统,根据报销金额不同,有三个处理对象:经理、财务部门和CEO。让我们一步一步实现这个示例。

首先,我们创建一个请求类表示报销请求:

public class ReimbursementRequest {
    private double amount;

    public ReimbursementRequest(double amount) {
        this.amount = amount;
    }

    public double getAmount() {
        return amount;
    }
}

接下来,我们创建一个处理接口,表示处理请求的方法:

public interface ReimbursementHandler {
    void handleRequest(ReimbursementRequest request);
    void setNextHandler(ReimbursementHandler nextHandler);
}

然后,我们创建三个具体的处理对象:经理、财务部门和CEO,它们实现了ReimbursementHandler接口:

public class Manager implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 1000) {
            System.out.println("经理审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class FinanceDepartment implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 5000) {
            System.out.println("财务部门审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class CEO implements ReimbursementHandler {
    @Override
    public void handleRequest(ReimbursementRequest request) {
        System.out.println("CEO审批通过,报销金额:" + request.getAmount());
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        // CEO是责任链中的最后一个处理对象,不需要设置下一个处理对象
    }
}

最后,我们可以创建一个责任链并测试它:

public class Main {
    public static void main(String[] args) {
        ReimbursementHandler manager = new Manager();
        ReimbursementHandler finance = new FinanceDepartment();
        ReimbursementHandler ceo = new CEO();

        // 构建责任链
        manager.setNextHandler(finance);
        finance.setNextHandler(ceo);

        // 创建报销请求
        ReimbursementRequest request1 = new ReimbursementRequest(800);
        ReimbursementRequest request2 = new ReimbursementRequest(3500);
        ReimbursementRequest request3 = new ReimbursementRequest(10000);

        // 发送请求
        manager.handleRequest(request1);
        manager.handleRequest(request2);
        manager.handleRequest(request3);
    }
}

运行以上代码,你会看到不同金额的报销请求会被不同的处理对象处理。

5、责任链模式在检查系统中的运用

检查系统具有以下检查-申诉-整改-奖惩链路,在该链路中我们可以使用责任链模式,使代码变得更容易维护和扩展。

首先我们创建一个包含各个链路中需要用到的请求类对象。

/**
 * @author: guojiangtao
 * @description:
 * @param: 
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CheckRewardPunishmentRequest implements Serializable {

    private static final long serialVersionUID = 8676996408255323152L;

    /**
     * 检查结果ID
     **/
    private Long checkListResultId;

    /**
     * 检查基本事实
     **/
    CheckBaseBO checkBaseBO;
    /**
     * 检查整改事实
     **/
    CheckRectifyBO checkRectifyBO;
    /**
     * 检查申诉事实
     **/
    CheckAppealBO checkAppealBO;

}

我们创建一个处理接口,表示处理请求的方法。

/**
 * @author: guojiangtao
 * @description: 处理请求
 * @param:
 **/
public interface CheckRewardPunishmentPhaseActionInterface {

    CheckRewardPunishmentPhaseResult proceed(PhaseChain chain);

    /**
     * @author: guojiangtao
     * @description: 获取request以及转发请求
     * @param: 
     **/
    interface PhaseChain {
        // 获取当前Request
        CheckRewardPunishmentRequest request();
        // 转发Request
        CheckRewardPunishmentPhaseResult dispatch(CheckRewardPunishmentRequest request);
    }

}

创建一个客户端类来维护处理的实现类。

/**
 * @author guojiangtao
 */
@Data
public class PhaseChainClient {

    public static final List<CheckRewardPunishmentPhaseActionInterface> phaseActionInterfaceList = Lists.newArrayList();
    private static void addPhaseActionInterface(CheckRewardPunishmentPhaseActionInterface phaseActionInterface) {
        phaseActionInterfaceList.add(phaseActionInterface);
    }

    public static PhaseChainClient getChainClient(CheckRewardPunishmentPhaseActionInterface ... interfaces) {

        List<CheckRewardPunishmentPhaseActionInterface> interfaceList = Lists.newArrayList(interfaces);

        if (CollectionUtil.isNullOrEmpty(interfaceList)) {
            throw new BizException(BizErrorCodeEnum.ILLEGAL_OPERATION, "getChainClient exception");
        }

        interfaceList.forEach(PhaseChainClient::addPhaseActionInterface);

        return new PhaseChainClient();
    }

    public CheckRewardPunishmentPhaseResult execute (CheckRewardPunishmentRequest request) {

        RealPhaseChain realChain = new RealPhaseChain(request, phaseActionInterfaceList, 0);

        return realChain.dispatch(request);
    }

}

检查提交实现类。

/**
 * @author: guojiangtao
 * @description: 检查提交
 * @param:
 **/
@Service
@Slf4j
public class CheckSubmit implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckAppeal checkAppeal;
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            log.warn("CheckSubmit proceed with null request");
            return null;
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();

        if (null == checkBaseBO) {
            log.warn("CheckSubmit proceed with null checkBaseBO");
            return null;
        }

        // 检查单配置
        if (AppealTypeEnum.OPEN_APPEAL.getCode().equals(checkBaseBO.getAppealType())) {
            // 构建检查申诉事实实例
            CheckAppealBO appealBO = new CheckAppealBO(checkBaseBO.getCheckItemId(), checkBaseBO.getCheckItemResultId(), 1);
            request.setCheckAppealBO(appealBO);
            return PhaseChainClient.getChainClient(checkAppeal).execute(request);
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查申诉实现类:

/**
 * @author: guojiangtao
 * @description: 检查申诉
 * @param:
 **/
@Service
@Slf4j
public class CheckAppeal implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckRpProducer checkRpProducer;
    @Autowired
    private CheckResultRectifyListRepository checkResultRectifyListRepository;
    @Autowired
    private CheckRectifyTemplateItemRelationRepository rectifyTemplateItemRelationRepository;
    @Autowired
    private CheckRectify checkRectify;

    @Override
    public CheckRewardPunishmentPhaseResult proceed (PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();
        CheckAppealBO appealBO = request.getCheckAppealBO();

        if (null == checkBaseBO || null == appealBO) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        // 根据检查项ID查询 检查项与整改任务模板 1:1
        CheckRectifyTemplateItemRelationDAO templateItemRelationDAO = rectifyTemplateItemRelationRepository.queryByItemId(checkBaseBO.getCheckItemId());

        if (null != templateItemRelationDAO) {
            // 整改单模板ID
            Long rectifyTemplateId = templateItemRelationDAO.getRectifyTemplateId();
            // 整改单
            CheckResultRectifyListDAO rectifyListDAO = checkResultRectifyListRepository.getByCheckResultIdAndTemplateId(checkBaseBO.getCheckListResultId(), rectifyTemplateId);
            if (null != rectifyListDAO && Objects.equals(RectifyResultListStatusEnum.FINISH.getCode(), rectifyListDAO.getStatus())) {
                // 构建检查整改事实实例
                CheckRectifyBO rectifyBO = new CheckRectifyBO(checkBaseBO.getCheckItemId(), rectifyListDAO.getStatus());
                request.setCheckRectifyBO(rectifyBO);
                return PhaseChainClient.getChainClient(checkRectify).execute(request);
            }
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查整改实现类:

@Service
@Slf4j
public class CheckRectify implements CheckRewardPunishmentPhaseActionInterface {
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        if (null == request) {
            return null;
        }

        CheckRectifyBO rectifyBO = request.getCheckRectifyBO();

        // 无整改事实 不处理
        if (null == rectifyBO) {
            return null;
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

6、责任链模式的适用场景

责任链模式在以下情况下特别有用:

  1. 当您希望将请求的发送者和接收者解耦时,可以使用责任链模式。发送者无需知道最终哪个对象会处理请求。
  2. 当您有多个对象可以处理请求,但您不确定哪个对象应该处理时,可以使用责任链模式。这种情况下,您可以将请求传递给责任链,直到有一个对象能够处理它。
  3. 当您希望动态配置处理对象的顺序或添加新的处理对象时,可以使用责任链模式。这使得系统更加灵活和可扩展。
  4. 在需要应用单一职责原则的情况下,责任链模式可以帮助将处理逻辑分散到不同的对象中,每个对象只负责一部分功能。

7、总结

责任链模式是一种有用的设计模式,可以帮助您构建灵活的、可扩展的系统。它通过将请求传递给一系列处理对象来解耦请求发送者和接收者,使得代码更容易维护和扩展。通过示例代码和解释,我们希望您现在更好地理解了责任链模式及其在Java中的实现方式。希望这篇分享文档有助于您在实际项目中应用责任链模式以解决类似的问题。

MySQL 连接为什么挂死了?

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

一、背景

近期由测试反馈的问题有点多,其中关于系统可靠性测试提出的问题令人感到头疼,一来这类问题有时候属于“偶发”现象,难以在环境上快速复现;二来则是可靠性问题的定位链条有时候变得很长,极端情况下可能要从 A 服务追踪到 Z 服务,或者是从应用代码追溯到硬件层面。

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

架构

首先,本系统以 MySQL 作为主要的数据存储部件。整一个是典型的微服务架构(SpringBoot + SpringCloud),持久层则采用了如下几个组件:

  • mybatis,实现 SQL <-> Method 的映射
  • hikaricp,实现数据库连接池
  • mariadb-java-client,实现 JDBC 驱动

在 MySQL 服务端部分,后端采用了双主架构,前端以 keepalived 结合浮动IP(VIP)做一层高可用。如下:

说明

  • MySQL 部署两台实例,设定为互为主备的关系。
  • 为每台 MySQL 实例部署一个 keepalived 进程,由 keepalived 提供 VIP 高可用的故障切换。

实际上,keepalived 和 MySQL 都实现了容器化,而 VIP 端口则映射到 VM 上的 nodePort 服务端口上。

  • 业务服务一律使用 VIP 进行数据库访问。

Keepalived 是基于 VRRP 协议实现了路由层转换的,在同一时刻,VIP 只会指向其中的一个虚拟机(master)。当主节点发生故障时,其他的 keepalived 会检测到问题并重新选举出新的 master,此后 VIP 将切换到另一个可用的 MySQL 实例节点上。这样一来,MySQL 数据库就拥有了基础的高可用能力。

另外一点,Keepalived 还会对 MySQL 实例进行定时的健康检查,一旦发现 MySQL 实例不可用会将自身进程杀死,进而再触发 VIP 的切换动作。

问题现象

本次的测试用例也是基于虚拟机故障的场景来设计的:

持续以较小的压力向业务服务发起访问,随后将其中一台 MySQL 的容器实例(master)重启。
按照原有的评估,业务可能会产生很小的抖动,但其中断时间应该保持在秒级。

然而经过多次的测试后发现,在重启 MySQL 主节点容器之后,有一定的概率会出现业务却再也无法访问的情况!

二、分析过程

在发生问题之后,开发同学的第一反应是 MySQL 的高可用机制出了问题。由于此前曾经出现过由于 keepalived 配置不当导致 VIP 未能及时切换的问题,因此对其已经有所戒备。

先是经过一通的排查,然后并没有找到 keepalived 任何配置上的毛病。

然后在没有办法的情况下,重新测试了几次,问题又复现了。

紧接着,我们提出了几个疑点:

1.Keepalived 会根据 MySQL 实例的可达性进行判断,会不会是健康检查出了问题?

但在本次测试场景中,MySQL 容器销毁会导致 keepalived 的端口探测产生失败,这同样会导致 keepalived 失效。如果 keepalived 也发生了中止,那么 VIP 应该能自动发生抢占。而通过对比两台虚拟机节点的信息后,发现 VIP 的确发生了切换。

2. 业务进程所在的容器是否发生了网络不可达的问题?

尝试进入容器,对当前发生切换后的浮动IP、端口执行 telnet 测试,发现仍然能访问成功。

连接池

在排查前面两个疑点之后,我们只能将目光转向了业务服务的DB客户端上。

从日志上看,在产生故障的时刻,业务侧的确出现了一些异常,如下:

这里提示的是业务操作获取连接超时了(超过了30秒)。那么,会不会是连接数不够用呢?

Unable to acquire JDBC Connection [n/a] java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms. at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:669) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.9.jar!/:?] ...

业务接入采用的是 hikariCP 连接池,这也是市面上流行度很高的一款组件了。

我们随即检查了当前的连接池配置,如下:

//最小空闲连接数 
spring.datasource.hikari.minimum-idle=10 
//连接池最大大小 
spring.datasource.hikari.maximum-pool-size=50 
//连接最大空闲时长 
spring.datasource.hikari.idle-timeout=60000 
//连接生命时长 
spring.datasource.hikari.max-lifetime=1800000 
//获取连接的超时时长 
spring.datasource.hikari.connection-timeout=30000

其中 注意到 hikari 连接池配置了 minimum-idle = 10,也就是说,就算在没有任何业务的情况下,连接池应该保证有 10 个连接。更何况当前的业务访问量极低,不应该存在连接数不够使用的情况。

除此之外,另外一种可能性则可能是出现了“僵尸连接”,也就是说在重启的过程中,连接池一直没有释放这些不可用的连接,最终造成没有可用连接的结果。

开发同学对”僵尸链接”的说法深信不疑,倾向性的认为这很可能是来自于 HikariCP 组件的某个 BUG…

于是开始走读 HikariCP 的源码,发现应用层向连接池请求连接的一处代码如下:

public class HikariPool{

   //获取连接对象入口
   public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         //使用预设的30s 超时时间
         long timeout = hardTimeout;
         do {
            //进入循环,在指定时间内获取可用连接
            //从 connectionBag 中获取连接
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            //连接对象被标记清除或不满足存活条件时,关闭该连接
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            //成功获得连接对象
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         //超时了,抛出异常
         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }
}

getConnection() 方法展示了获取连接的整个流程,其中 connectionBag 是用于存放连接对象的容器对象。如果从 connectionBag 获得的连接不再满足存活条件,那么会将其手动关闭,代码如下:

void closeConnection(final PoolEntry poolEntry, final String closureReason)
   {
      //移除连接对象
      if (connectionBag.remove(poolEntry)) {
         final Connection connection = poolEntry.close();
         //异步关闭连接
         closeConnectionExecutor.execute(() -> {
            quietlyCloseConnection(connection, closureReason);
            //由于可用连接变少,将触发填充连接池的任务
            if (poolState == POOL_NORMAL) {
               fillPool();
            }
         });
      }
   }

注意到,只有当连接满足下面条件中的其中一个时,会被执行 close。

  • isMarkedEvicted() 的返回结果是 true,即标记为清除

如果连接存活时间超出最大生存时间(maxLifeTime),或者距离上一次使用超过了idleTimeout,会被定时任务标记为清除状态,清除状态的连接在获取的时候才真正 close。

  • 500ms 内没有被使用,且连接已经不再存活,即 isConnectionAlive() 返回 false

由于我们把 idleTimeout 和 maxLifeTime 都设置得非常大,因此需重点检查 isConnectionAlive 方法中的判断,如下:

public class PoolBase{

   //判断连接是否存活
   boolean isConnectionAlive(final Connection connection)
   {
      try {
         try {
            //设置 JDBC 连接的执行超时
            setNetworkTimeout(connection, validationTimeout);

            final int validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000;

            //如果没有设置 TestQuery,使用 JDBC4 的校验接口
            if (isUseJdbc4Validation) {
               return connection.isValid(validationSeconds);
            }

            //使用 TestQuery(如 select 1)语句对连接进行探测
            try (Statement statement = connection.createStatement()) {
               if (isNetworkTimeoutSupported != TRUE) {
                  setQueryTimeout(statement, validationSeconds);
               }

               statement.execute(config.getConnectionTestQuery());
            }
         }
         finally {
            setNetworkTimeout(connection, networkTimeout);

            if (isIsolateInternalQueries && !isAutoCommit) {
               connection.rollback();
            }
         }

         return true;
      }
      catch (Exception e) {
         //发生异常时,将失败信息记录到上下文
         lastConnectionFailure.set(e);
         logger.warn("{} - Failed to validate connection {} ({}). Possibly consider using a shorter maxLifetime value.",
                     poolName, connection, e.getMessage());
         return false;
      }
   }

}

我们看到,在PoolBase.isConnectionAlive 方法中对连接执行了一系列的探测,如果发生异常还会将异常信息记录到当前的线程上下文中。随后,在 HikariPool 抛出异常时会将最后一次检测失败的异常也一同收集,如下:

private SQLException createTimeoutException(long startTime)
{
   logPoolState("Timeout failure ");
   metricsTracker.recordConnectionTimeout();

   String sqlState = null;
   //获取最后一次连接失败的异常
   final Throwable originalException = getLastConnectionFailure();
   if (originalException instanceof SQLException) {
      sqlState = ((SQLException) originalException).getSQLState();
   }
   //抛出异常
   final SQLException connectionException = new SQLTransientConnectionException(poolName + " - Connection is not available, request timed out after " + elapsedMillis(startTime) + "ms.", sqlState, originalException);
   if (originalException instanceof SQLException) {
      connectionException.setNextException((SQLException) originalException);
   }

   return connectionException;
}

这里的异常消息和我们在业务服务中看到的异常日志基本上是吻合的,即除了超时产生的 “Connection is not available, request timed out after xxxms” 消息之外,日志中还伴随输出了校验失败的信息:

Caused by: java.sql.SQLException: Connection.setNetworkTimeout cannot be called on a closed connection at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.getSqlException(ExceptionMapper.java:211) ~[mariadb-java-client-2.2.6.jar!/:?] at org.mariadb.jdbc.MariaDbConnection.setNetworkTimeout(MariaDbConnection.java:1632) ~[mariadb-java-client-2.2.6.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.setNetworkTimeout(PoolBase.java:541) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.isConnectionAlive(PoolBase.java:162) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:172) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.9.jar!/:?]

到这里,我们已经将应用获得连接的代码大致梳理了一遍,整个过程如下图所示:

从执行逻辑上看,连接池的处理并没有问题,相反其在许多细节上都考虑到位了。在对非存活连接执行 close 时,同样调用了 removeFromBag 动作将其从连接池中移除,因此也不应该存在僵尸连接对象的问题。

那么,我们之前的推测应该就是错误的!

陷入焦灼

在代码分析之余,开发同学也注意到当前使用的 hikariCP 版本为 3.4.5,而环境上出问题的业务服务却是 2.7.9 版本,这仿佛预示着什么… 让我们再次假设 hikariCP 2.7.9 版本存在某种未知的 BUG,导致了问题的产生。

为了进一步分析连接池对于服务端故障的行为处理,我们尝试在本地机器上进行模拟,这一次使用了 hikariCP 2.7.9 版本进行测试,并同时将 hikariCP 的日志级别设置为 DEBUG。

模拟场景中,会由 由本地应用程序连接本机的 MySQL 数据库进行操作,步骤如下:

1. 初始化数据源,此时连接池 min-idle 设置为 10;
2. 每隔50ms 执行一次SQL操作,查询当前的元数据表;
3. 将 MySQL 服务停止一段时间,观察业务表现;
4. 将 MySQL 服务重新启动,观察业务表现。

最终产生的日志如下:

//初始化过程,建立10个连接 
DEBUG -HikariPool.logPoolState - Pool stats (total=1, active=1, idle=0, waiting=0) DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@71ab7c09 DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7f6c9c4c DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7b531779 ... DEBUG -HikariPool.logPoolState- After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功 execute statement: true test time -------1 execute statement: true test time -------2 ... 
//停止MySQL ... 
//检测到无效连接 WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@9225652 ((conn=38652) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@71ab7c09 ((conn=38653) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. 
//释放连接 DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@9225652: (connection is dead) DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@71ab7c09: (connection is dead) 
//尝试创建连接失败 DEBUG -HikariPool.createPoolEntry - Cannot acquire connection from data source java.sql.SQLNonTransientConnectionException: Could not connect to address=(host=localhost)(port=3306)(type=master) : Socket fail to connect to host:localhost, port:3306. Connection refused: connect Caused by: java.sql.SQLNonTransientConnectionException: Socket fail to connect to host:localhost, port:3306. Connection refused: connect at internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73) ~[mariadb-java-client-2.6.0.jar:?] ... 
//持续失败.. 直到MySQL重启 //重启后,自动创建连接成功 DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@42c5503e DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@695a7435 
//连接池状态,重新建立10个连接 DEBUG -HikariPool.logPoolState(HikariPool.java:421) -After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功(已经自愈) execute statement: true

从日志上看,hikariCP 还是能成功检测到坏死的连接并将其踢出连接池,一旦 MySQL 重新启动,业务操作又能自动恢复成功了。根据这个结果,基于 hikariCP 版本问题的设想也再次落空,研发同学再次陷入焦灼。

拨开云雾见光明

多方面求证无果之后,我们最终尝试在业务服务所在的容器内进行抓包,看是否能发现一些蛛丝马迹。

进入故障容器,执行 tcpdump -i eth0 tcp port 30052 进行抓包,然后对业务接口发起访问。

此时令人诡异的事情发生了,没有任何网络包产生!而业务日志在 30s 之后也出现了获取连接失败的异常。

我们通过 netstat 命令检查网络连接,发现只有一个 ESTABLISHED 状态的 TCP 连接。

也就是说,当前业务实例和 MySQL 服务端是存在一个建好的连接的,但为什么业务还是报出可用连接呢?

推测可能原因有二:

  • 该连接被某个业务(如定时器)一直占用。
  • 该连接实际上还没有办法使用,可能处于某种僵死的状态。

对于原因一,很快就可以被推翻,一来当前服务并没有什么定时器任务,二来就算该连接被占用,按照连接池的原理,只要没有达到上限,新的业务请求应该会促使连接池进行新连接的建立,那么无论是从 netstat 命令检查还是 tcpdump 的结果来看,不应该一直是只有一个连接的状况。

那么,情况二的可能性就很大了。带着这个思路,继续分析 Java 进程的线程栈。

执行 kill -3 pid 将线程栈输出后分析,果不其然,在当前 thread stack 中发现了如下的条目:

"HikariPool-1 connection adder" #121 daemon prio=5 os_prio=0 tid=0x00007f1300021800 nid=0xad runnable [0x00007f12d82e5000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.io.FilterInputStream.read(FilterInputStream.java:133) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.fillBuffer(ReadAheadBufferedStream.java:129) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.read(ReadAheadBufferedStream.java:102) - locked <0x00000000d7f5b480> (a org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacketArray(StandardPacketInputStream.java:241) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacket(StandardPacketInputStream.java:212) at org.mariadb.jdbc.internal.com.read.ReadInitialHandShakePacket.<init>(ReadInitialHandShakePacket.java:90) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.createConnection(AbstractConnectProtocol.java:480) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1236) at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:610) at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:142) at org.mariadb.jdbc.Driver.connect(Driver.java:86) at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:138) at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:358) at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206) at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:477)

这里显示 HikariPool-1 connection adder 这个线程一直处于 socketRead 的可执行状态。从命名上看该线程应该是 HikariCP 连接池用于建立连接的任务线程,socket 读操作则来自于 MariaDbConnection.newConnection() 这个方法,即 mariadb-java-client 驱动层建立 MySQL 连接的一个操作,其中 ReadInitialHandShakePacket 初始化则属于 MySQL 建链协议中的一个环节。

简而言之,上面的线程刚好处于建链的一个过程态,关于 mariadb 驱动和 MySQL 建链的过程大致如下:

MySQL 建链首先是建立 TCP 连接(三次握手),客户端会读取 MySQL 协议的一个初始化握手消息包,内部包含 MySQL 版本号,鉴权算法等等信息,之后再进入身份鉴权的环节。

这里的问题就在于 ReadInitialHandShakePacket 初始化(读取握手消息包)一直处于 socket read 的一个状态。

如果此时 MySQL 远端主机故障了,那么该操作就会一直卡住。而此时的连接虽然已经建立(处于 ESTABLISHED 状态),但却一直没能完成协议握手和后面的身份鉴权流程,即该连接只能算一个半成品(无法进入 hikariCP 连接池的列表中)。从故障服务的 DEBUG 日志也可以看到,连接池持续是没有可用连接的,如下:DEBUG HikariPool.logPoolState –> Before cleanup stats (total=0, active=0, idle=0, waiting=3)

另一个需要解释的问题则是,这样一个 socket read 操作的阻塞是否就造成了整个连接池的阻塞呢?

经过代码走读,我们再次梳理了 hikariCP 建立连接的一个流程,其中涉及到几个模块:

  • HikariPool,连接池实例,由该对象连接的获取、释放以及连接的维护。
  • ConnectionBag,连接对象容器,存放当前的连接对象列表,用于提供可用连接。
  • AddConnectionExecutor,添加连接的执行器,命名如 “HikariPool-1 connection adder”,是一个单线程的线程池。
  • PoolEntryCreator,添加连接的任务,实现创建连接的具体逻辑。
  • HouseKeeper,内部定时器,用于实现连接的超时淘汰、连接池的补充等工作。

HouseKeeper 在连接池初始化后的 100ms 触发执行,其调用 fillPool() 方法完成连接池的填充,例如 min-idle 是10,那么初始化就会创建10个连接。ConnectionBag 维护了当前连接对象的列表,该模块还维护了请求连接者(waiters)的一个计数器,用于评估当前连接数的需求。

其中,borrow 方法的逻辑如下:

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // 尝试从 thread-local 中获取
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         ...
      }

      // 计算当前等待请求的任务
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               //如果获得了可用连接,会触发填充任务
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

         //没有可用连接,先触发填充任务
         listener.addBagItem(waiting);

         //在指定时间内等待可用连接进入
         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

注意到,无论是有没有可用连接,该方法都会触发一个 listener.addBagItem() 方法,HikariPool 对该接口的实现如下:

public void addBagItem(final int waiting)
   {
      final boolean shouldAdd = waiting - addConnectionQueueReadOnlyView.size() >= 0; // Yes, >= is intentional.
      if (shouldAdd) {
         //调用 AddConnectionExecutor 提交创建连接的任务
         addConnectionExecutor.submit(poolEntryCreator);
      }
      else {
         logger.debug("{} - Add connection elided, waiting {}, queue {}", poolName, waiting, addConnectionQueueReadOnlyView.size());
      }
   }
PoolEntryCreator 则实现了创建连接的具体逻辑,如下:
public class PoolEntryCreator{
     @Override
      public Boolean call()
      {
         long sleepBackoff = 250L;
         //判断是否需要建立连接
         while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
            //创建 MySQL 连接
            final PoolEntry poolEntry = createPoolEntry();
 
            if (poolEntry != null) {
               //建立连接成功,直接返回。
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
               if (loggingPrefix != null) {
                  logPoolState(loggingPrefix);
               }
               return Boolean.TRUE;
            }
            ...
         }

         // Pool is suspended or shutdown or at max size
         return Boolean.FALSE;
      }
}

由此可见,AddConnectionExecutor 采用了单线程的设计,当产生新连接需求时,会异步触发 PoolEntryCreator 任务进行补充。其中 PoolEntryCreator. createPoolEntry() 会完成 MySQL 驱动连接建立的所有事情,而我们的情况则恰恰是 MySQL 建链过程产生了永久性阻塞。因此无论后面怎么获取连接,新来的建链任务都会一直排队等待,这便导致了业务上一直没有连接可用。

下面这个图说明了 hikariCP 的建链过程:

好了,让我们在回顾一下前面关于可靠性测试的场景:

首先,MySQL 主实例发生故障,而紧接着 hikariCP 则检测到了坏的连接(connection is dead)并将其释放,在释放关闭连接的同时又发现连接数需要补充,进而立即触发了新的建链请求。
而问题就刚好出在这一次建链请求上,TCP 握手的部分是成功了(客户端和 MySQL VM 上 nodePort 完成连接),但在接下来由于当前的 MySQL 容器已经停止(此时 VIP 也切换到了另一台 MySQL 实例上),因此客户端再也无法获得原 MySQL 实例的握手包响应(该握手属于MySQL应用层的协议),此时便陷入了长时间的阻塞式 socketRead 操作。而建链请求任务恰恰好采用了单线程运作,进一步则导致了所有业务的阻塞。

三、解决方案

在了解了事情的来龙去脉之后,我们主要考虑从两方面进行优化:

  • 优化一,增加 HirakiPool 中 AddConnectionExecutor 线程的数量,这样即使第一个线程出现挂死,还有其他的线程能参与建链任务的分配。
  • 优化二,出问题的 socketRead 是一种同步阻塞式的调用,可通过 SO_TIMEOUT 来避免长时间挂死。

对于优化点一,我们一致认为用处并不大,如果连接出现了挂死那么相当于线程资源已经泄露,对服务后续的稳定运行十分不利,而且 hikariCP 在这里也已经将其写死了。因此关键的方案还是避免阻塞式的调用。

查阅了 mariadb-java-client 官方文档后,发现可以在 JDBC URL 中指定网络IO 的超时参数,如下:

具体参考:https://mariadb.com/kb/en/about-mariadb-connector-j/

如描述所说的,socketTimeout 可以设置 socket 的 SO_TIMEOUT 属性,从而达到控制超时时间的目的。默认是 0,即不超时。

我们在 MySQL JDBC URL 中加入了相关的参数,如下:

spring.datasource.url=jdbc:mysql://10.0.71.13:33052/appdb?socketTimeout=60000&connectTimeout=30000&serverTimezone=UTC

此后对 MySQL 可靠性场景进行多次验证,发现连接挂死的现象已经不再出现,此时问题得到解决。

四、小结

本次分享了一次关于 MySQL 连接挂死问题排查的心路历程,由于环境搭建的工作量巨大,而且该问题复现存在偶然性,整个分析过程还是有些坎坷的(其中也踩了坑)。的确,我们很容易被一些表面的现象所迷惑,而觉得问题很难解决时,更容易带着偏向性思维去处理问题。例如本例中曾一致认为连接池出现了问题,但实际上却是由于 MySQL JDBC 驱动(mariadb driver)的一个不严谨的配置所导致。

从原则上讲,应该避免一切可能导致资源挂死的行为。如果我们能在前期对代码及相关配置做好充分的排查工作,相信 996 就会离我们越来越远。