责任链模式实践

1、前言

       当今软件开发中,设计模式是一种重要的方法论,它们帮助开发人员解决了各种常见的问题,并提高了代码的可维护性和可扩展性。本文将重点介绍责任链模式(Chain of Responsibility Pattern),并使用Java语言来解释它的概念、用途以及如何实现。

2、责任链模式简介

       责任链模式是一种行为型设计模式,它允许你将请求沿着处理链传递,直到有一个对象处理它为止。这种模式非常适合处理请求的分发和处理,特别是当您不知道哪个对象能够处理请求时。

       在责任链模式中,通常有多个处理对象(处理器),每个处理对象都有一个处理请求的方法。当一个请求进入责任链时,它从链的起始点开始传递,每个处理对象检查是否能够处理该请求。如果可以处理,则该请求被处理,否则它会被传递给链中的下一个处理对象,直到找到一个合适的处理器。

3、责任链模式的优点

责任链模式有以下几个优点:

  1. 解耦性: 责任链模式将请求发送者和接收者解耦,发送者无需知道请求最终由哪个接收者处理。
  2. 可扩展性: 可以轻松地添加新的处理对象或修改处理顺序,而不会影响其他部分的代码。
  3. 灵活性: 可以动态配置责任链,根据需要添加、删除或修改处理对象。
  4. 单一职责原则: 每个处理对象只需关心自己是否能够处理请求,符合单一职责原则。

4、责任链模式的实现

让我们使用Java语言来实现一个简单的责任链模式示例。假设我们有一个报销审批系统,根据报销金额不同,有三个处理对象:经理、财务部门和CEO。让我们一步一步实现这个示例。

首先,我们创建一个请求类表示报销请求:

public class ReimbursementRequest {
    private double amount;

    public ReimbursementRequest(double amount) {
        this.amount = amount;
    }

    public double getAmount() {
        return amount;
    }
}

接下来,我们创建一个处理接口,表示处理请求的方法:

public interface ReimbursementHandler {
    void handleRequest(ReimbursementRequest request);
    void setNextHandler(ReimbursementHandler nextHandler);
}

然后,我们创建三个具体的处理对象:经理、财务部门和CEO,它们实现了ReimbursementHandler接口:

public class Manager implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 1000) {
            System.out.println("经理审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class FinanceDepartment implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 5000) {
            System.out.println("财务部门审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class CEO implements ReimbursementHandler {
    @Override
    public void handleRequest(ReimbursementRequest request) {
        System.out.println("CEO审批通过,报销金额:" + request.getAmount());
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        // CEO是责任链中的最后一个处理对象,不需要设置下一个处理对象
    }
}

最后,我们可以创建一个责任链并测试它:

public class Main {
    public static void main(String[] args) {
        ReimbursementHandler manager = new Manager();
        ReimbursementHandler finance = new FinanceDepartment();
        ReimbursementHandler ceo = new CEO();

        // 构建责任链
        manager.setNextHandler(finance);
        finance.setNextHandler(ceo);

        // 创建报销请求
        ReimbursementRequest request1 = new ReimbursementRequest(800);
        ReimbursementRequest request2 = new ReimbursementRequest(3500);
        ReimbursementRequest request3 = new ReimbursementRequest(10000);

        // 发送请求
        manager.handleRequest(request1);
        manager.handleRequest(request2);
        manager.handleRequest(request3);
    }
}

运行以上代码,你会看到不同金额的报销请求会被不同的处理对象处理。

5、责任链模式在检查系统中的运用

检查系统具有以下检查-申诉-整改-奖惩链路,在该链路中我们可以使用责任链模式,使代码变得更容易维护和扩展。

首先我们创建一个包含各个链路中需要用到的请求类对象。

/**
 * @author: guojiangtao
 * @description:
 * @param: 
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CheckRewardPunishmentRequest implements Serializable {

    private static final long serialVersionUID = 8676996408255323152L;

    /**
     * 检查结果ID
     **/
    private Long checkListResultId;

    /**
     * 检查基本事实
     **/
    CheckBaseBO checkBaseBO;
    /**
     * 检查整改事实
     **/
    CheckRectifyBO checkRectifyBO;
    /**
     * 检查申诉事实
     **/
    CheckAppealBO checkAppealBO;

}

我们创建一个处理接口,表示处理请求的方法。

/**
 * @author: guojiangtao
 * @description: 处理请求
 * @param:
 **/
public interface CheckRewardPunishmentPhaseActionInterface {

    CheckRewardPunishmentPhaseResult proceed(PhaseChain chain);

    /**
     * @author: guojiangtao
     * @description: 获取request以及转发请求
     * @param: 
     **/
    interface PhaseChain {
        // 获取当前Request
        CheckRewardPunishmentRequest request();
        // 转发Request
        CheckRewardPunishmentPhaseResult dispatch(CheckRewardPunishmentRequest request);
    }

}

创建一个客户端类来维护处理的实现类。

/**
 * @author guojiangtao
 */
@Data
public class PhaseChainClient {

    public static final List<CheckRewardPunishmentPhaseActionInterface> phaseActionInterfaceList = Lists.newArrayList();
    private static void addPhaseActionInterface(CheckRewardPunishmentPhaseActionInterface phaseActionInterface) {
        phaseActionInterfaceList.add(phaseActionInterface);
    }

    public static PhaseChainClient getChainClient(CheckRewardPunishmentPhaseActionInterface ... interfaces) {

        List<CheckRewardPunishmentPhaseActionInterface> interfaceList = Lists.newArrayList(interfaces);

        if (CollectionUtil.isNullOrEmpty(interfaceList)) {
            throw new BizException(BizErrorCodeEnum.ILLEGAL_OPERATION, "getChainClient exception");
        }

        interfaceList.forEach(PhaseChainClient::addPhaseActionInterface);

        return new PhaseChainClient();
    }

    public CheckRewardPunishmentPhaseResult execute (CheckRewardPunishmentRequest request) {

        RealPhaseChain realChain = new RealPhaseChain(request, phaseActionInterfaceList, 0);

        return realChain.dispatch(request);
    }

}

检查提交实现类。

/**
 * @author: guojiangtao
 * @description: 检查提交
 * @param:
 **/
@Service
@Slf4j
public class CheckSubmit implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckAppeal checkAppeal;
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            log.warn("CheckSubmit proceed with null request");
            return null;
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();

        if (null == checkBaseBO) {
            log.warn("CheckSubmit proceed with null checkBaseBO");
            return null;
        }

        // 检查单配置
        if (AppealTypeEnum.OPEN_APPEAL.getCode().equals(checkBaseBO.getAppealType())) {
            // 构建检查申诉事实实例
            CheckAppealBO appealBO = new CheckAppealBO(checkBaseBO.getCheckItemId(), checkBaseBO.getCheckItemResultId(), 1);
            request.setCheckAppealBO(appealBO);
            return PhaseChainClient.getChainClient(checkAppeal).execute(request);
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查申诉实现类:

/**
 * @author: guojiangtao
 * @description: 检查申诉
 * @param:
 **/
@Service
@Slf4j
public class CheckAppeal implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckRpProducer checkRpProducer;
    @Autowired
    private CheckResultRectifyListRepository checkResultRectifyListRepository;
    @Autowired
    private CheckRectifyTemplateItemRelationRepository rectifyTemplateItemRelationRepository;
    @Autowired
    private CheckRectify checkRectify;

    @Override
    public CheckRewardPunishmentPhaseResult proceed (PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();
        CheckAppealBO appealBO = request.getCheckAppealBO();

        if (null == checkBaseBO || null == appealBO) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        // 根据检查项ID查询 检查项与整改任务模板 1:1
        CheckRectifyTemplateItemRelationDAO templateItemRelationDAO = rectifyTemplateItemRelationRepository.queryByItemId(checkBaseBO.getCheckItemId());

        if (null != templateItemRelationDAO) {
            // 整改单模板ID
            Long rectifyTemplateId = templateItemRelationDAO.getRectifyTemplateId();
            // 整改单
            CheckResultRectifyListDAO rectifyListDAO = checkResultRectifyListRepository.getByCheckResultIdAndTemplateId(checkBaseBO.getCheckListResultId(), rectifyTemplateId);
            if (null != rectifyListDAO && Objects.equals(RectifyResultListStatusEnum.FINISH.getCode(), rectifyListDAO.getStatus())) {
                // 构建检查整改事实实例
                CheckRectifyBO rectifyBO = new CheckRectifyBO(checkBaseBO.getCheckItemId(), rectifyListDAO.getStatus());
                request.setCheckRectifyBO(rectifyBO);
                return PhaseChainClient.getChainClient(checkRectify).execute(request);
            }
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查整改实现类:

@Service
@Slf4j
public class CheckRectify implements CheckRewardPunishmentPhaseActionInterface {
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        if (null == request) {
            return null;
        }

        CheckRectifyBO rectifyBO = request.getCheckRectifyBO();

        // 无整改事实 不处理
        if (null == rectifyBO) {
            return null;
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

6、责任链模式的适用场景

责任链模式在以下情况下特别有用:

  1. 当您希望将请求的发送者和接收者解耦时,可以使用责任链模式。发送者无需知道最终哪个对象会处理请求。
  2. 当您有多个对象可以处理请求,但您不确定哪个对象应该处理时,可以使用责任链模式。这种情况下,您可以将请求传递给责任链,直到有一个对象能够处理它。
  3. 当您希望动态配置处理对象的顺序或添加新的处理对象时,可以使用责任链模式。这使得系统更加灵活和可扩展。
  4. 在需要应用单一职责原则的情况下,责任链模式可以帮助将处理逻辑分散到不同的对象中,每个对象只负责一部分功能。

7、总结

责任链模式是一种有用的设计模式,可以帮助您构建灵活的、可扩展的系统。它通过将请求传递给一系列处理对象来解耦请求发送者和接收者,使得代码更容易维护和扩展。通过示例代码和解释,我们希望您现在更好地理解了责任链模式及其在Java中的实现方式。希望这篇分享文档有助于您在实际项目中应用责任链模式以解决类似的问题。

MySQL 连接为什么挂死了?

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

一、背景

近期由测试反馈的问题有点多,其中关于系统可靠性测试提出的问题令人感到头疼,一来这类问题有时候属于“偶发”现象,难以在环境上快速复现;二来则是可靠性问题的定位链条有时候变得很长,极端情况下可能要从 A 服务追踪到 Z 服务,或者是从应用代码追溯到硬件层面。

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

架构

首先,本系统以 MySQL 作为主要的数据存储部件。整一个是典型的微服务架构(SpringBoot + SpringCloud),持久层则采用了如下几个组件:

  • mybatis,实现 SQL <-> Method 的映射
  • hikaricp,实现数据库连接池
  • mariadb-java-client,实现 JDBC 驱动

在 MySQL 服务端部分,后端采用了双主架构,前端以 keepalived 结合浮动IP(VIP)做一层高可用。如下:

说明

  • MySQL 部署两台实例,设定为互为主备的关系。
  • 为每台 MySQL 实例部署一个 keepalived 进程,由 keepalived 提供 VIP 高可用的故障切换。

实际上,keepalived 和 MySQL 都实现了容器化,而 VIP 端口则映射到 VM 上的 nodePort 服务端口上。

  • 业务服务一律使用 VIP 进行数据库访问。

Keepalived 是基于 VRRP 协议实现了路由层转换的,在同一时刻,VIP 只会指向其中的一个虚拟机(master)。当主节点发生故障时,其他的 keepalived 会检测到问题并重新选举出新的 master,此后 VIP 将切换到另一个可用的 MySQL 实例节点上。这样一来,MySQL 数据库就拥有了基础的高可用能力。

另外一点,Keepalived 还会对 MySQL 实例进行定时的健康检查,一旦发现 MySQL 实例不可用会将自身进程杀死,进而再触发 VIP 的切换动作。

问题现象

本次的测试用例也是基于虚拟机故障的场景来设计的:

持续以较小的压力向业务服务发起访问,随后将其中一台 MySQL 的容器实例(master)重启。
按照原有的评估,业务可能会产生很小的抖动,但其中断时间应该保持在秒级。

然而经过多次的测试后发现,在重启 MySQL 主节点容器之后,有一定的概率会出现业务却再也无法访问的情况!

二、分析过程

在发生问题之后,开发同学的第一反应是 MySQL 的高可用机制出了问题。由于此前曾经出现过由于 keepalived 配置不当导致 VIP 未能及时切换的问题,因此对其已经有所戒备。

先是经过一通的排查,然后并没有找到 keepalived 任何配置上的毛病。

然后在没有办法的情况下,重新测试了几次,问题又复现了。

紧接着,我们提出了几个疑点:

1.Keepalived 会根据 MySQL 实例的可达性进行判断,会不会是健康检查出了问题?

但在本次测试场景中,MySQL 容器销毁会导致 keepalived 的端口探测产生失败,这同样会导致 keepalived 失效。如果 keepalived 也发生了中止,那么 VIP 应该能自动发生抢占。而通过对比两台虚拟机节点的信息后,发现 VIP 的确发生了切换。

2. 业务进程所在的容器是否发生了网络不可达的问题?

尝试进入容器,对当前发生切换后的浮动IP、端口执行 telnet 测试,发现仍然能访问成功。

连接池

在排查前面两个疑点之后,我们只能将目光转向了业务服务的DB客户端上。

从日志上看,在产生故障的时刻,业务侧的确出现了一些异常,如下:

这里提示的是业务操作获取连接超时了(超过了30秒)。那么,会不会是连接数不够用呢?

Unable to acquire JDBC Connection [n/a] java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms. at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:669) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.9.jar!/:?] ...

业务接入采用的是 hikariCP 连接池,这也是市面上流行度很高的一款组件了。

我们随即检查了当前的连接池配置,如下:

//最小空闲连接数 
spring.datasource.hikari.minimum-idle=10 
//连接池最大大小 
spring.datasource.hikari.maximum-pool-size=50 
//连接最大空闲时长 
spring.datasource.hikari.idle-timeout=60000 
//连接生命时长 
spring.datasource.hikari.max-lifetime=1800000 
//获取连接的超时时长 
spring.datasource.hikari.connection-timeout=30000

其中 注意到 hikari 连接池配置了 minimum-idle = 10,也就是说,就算在没有任何业务的情况下,连接池应该保证有 10 个连接。更何况当前的业务访问量极低,不应该存在连接数不够使用的情况。

除此之外,另外一种可能性则可能是出现了“僵尸连接”,也就是说在重启的过程中,连接池一直没有释放这些不可用的连接,最终造成没有可用连接的结果。

开发同学对”僵尸链接”的说法深信不疑,倾向性的认为这很可能是来自于 HikariCP 组件的某个 BUG…

于是开始走读 HikariCP 的源码,发现应用层向连接池请求连接的一处代码如下:

public class HikariPool{

   //获取连接对象入口
   public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         //使用预设的30s 超时时间
         long timeout = hardTimeout;
         do {
            //进入循环,在指定时间内获取可用连接
            //从 connectionBag 中获取连接
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            //连接对象被标记清除或不满足存活条件时,关闭该连接
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            //成功获得连接对象
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         //超时了,抛出异常
         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }
}

getConnection() 方法展示了获取连接的整个流程,其中 connectionBag 是用于存放连接对象的容器对象。如果从 connectionBag 获得的连接不再满足存活条件,那么会将其手动关闭,代码如下:

void closeConnection(final PoolEntry poolEntry, final String closureReason)
   {
      //移除连接对象
      if (connectionBag.remove(poolEntry)) {
         final Connection connection = poolEntry.close();
         //异步关闭连接
         closeConnectionExecutor.execute(() -> {
            quietlyCloseConnection(connection, closureReason);
            //由于可用连接变少,将触发填充连接池的任务
            if (poolState == POOL_NORMAL) {
               fillPool();
            }
         });
      }
   }

注意到,只有当连接满足下面条件中的其中一个时,会被执行 close。

  • isMarkedEvicted() 的返回结果是 true,即标记为清除

如果连接存活时间超出最大生存时间(maxLifeTime),或者距离上一次使用超过了idleTimeout,会被定时任务标记为清除状态,清除状态的连接在获取的时候才真正 close。

  • 500ms 内没有被使用,且连接已经不再存活,即 isConnectionAlive() 返回 false

由于我们把 idleTimeout 和 maxLifeTime 都设置得非常大,因此需重点检查 isConnectionAlive 方法中的判断,如下:

public class PoolBase{

   //判断连接是否存活
   boolean isConnectionAlive(final Connection connection)
   {
      try {
         try {
            //设置 JDBC 连接的执行超时
            setNetworkTimeout(connection, validationTimeout);

            final int validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000;

            //如果没有设置 TestQuery,使用 JDBC4 的校验接口
            if (isUseJdbc4Validation) {
               return connection.isValid(validationSeconds);
            }

            //使用 TestQuery(如 select 1)语句对连接进行探测
            try (Statement statement = connection.createStatement()) {
               if (isNetworkTimeoutSupported != TRUE) {
                  setQueryTimeout(statement, validationSeconds);
               }

               statement.execute(config.getConnectionTestQuery());
            }
         }
         finally {
            setNetworkTimeout(connection, networkTimeout);

            if (isIsolateInternalQueries && !isAutoCommit) {
               connection.rollback();
            }
         }

         return true;
      }
      catch (Exception e) {
         //发生异常时,将失败信息记录到上下文
         lastConnectionFailure.set(e);
         logger.warn("{} - Failed to validate connection {} ({}). Possibly consider using a shorter maxLifetime value.",
                     poolName, connection, e.getMessage());
         return false;
      }
   }

}

我们看到,在PoolBase.isConnectionAlive 方法中对连接执行了一系列的探测,如果发生异常还会将异常信息记录到当前的线程上下文中。随后,在 HikariPool 抛出异常时会将最后一次检测失败的异常也一同收集,如下:

private SQLException createTimeoutException(long startTime)
{
   logPoolState("Timeout failure ");
   metricsTracker.recordConnectionTimeout();

   String sqlState = null;
   //获取最后一次连接失败的异常
   final Throwable originalException = getLastConnectionFailure();
   if (originalException instanceof SQLException) {
      sqlState = ((SQLException) originalException).getSQLState();
   }
   //抛出异常
   final SQLException connectionException = new SQLTransientConnectionException(poolName + " - Connection is not available, request timed out after " + elapsedMillis(startTime) + "ms.", sqlState, originalException);
   if (originalException instanceof SQLException) {
      connectionException.setNextException((SQLException) originalException);
   }

   return connectionException;
}

这里的异常消息和我们在业务服务中看到的异常日志基本上是吻合的,即除了超时产生的 “Connection is not available, request timed out after xxxms” 消息之外,日志中还伴随输出了校验失败的信息:

Caused by: java.sql.SQLException: Connection.setNetworkTimeout cannot be called on a closed connection at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.getSqlException(ExceptionMapper.java:211) ~[mariadb-java-client-2.2.6.jar!/:?] at org.mariadb.jdbc.MariaDbConnection.setNetworkTimeout(MariaDbConnection.java:1632) ~[mariadb-java-client-2.2.6.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.setNetworkTimeout(PoolBase.java:541) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.isConnectionAlive(PoolBase.java:162) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:172) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.9.jar!/:?]

到这里,我们已经将应用获得连接的代码大致梳理了一遍,整个过程如下图所示:

从执行逻辑上看,连接池的处理并没有问题,相反其在许多细节上都考虑到位了。在对非存活连接执行 close 时,同样调用了 removeFromBag 动作将其从连接池中移除,因此也不应该存在僵尸连接对象的问题。

那么,我们之前的推测应该就是错误的!

陷入焦灼

在代码分析之余,开发同学也注意到当前使用的 hikariCP 版本为 3.4.5,而环境上出问题的业务服务却是 2.7.9 版本,这仿佛预示着什么… 让我们再次假设 hikariCP 2.7.9 版本存在某种未知的 BUG,导致了问题的产生。

为了进一步分析连接池对于服务端故障的行为处理,我们尝试在本地机器上进行模拟,这一次使用了 hikariCP 2.7.9 版本进行测试,并同时将 hikariCP 的日志级别设置为 DEBUG。

模拟场景中,会由 由本地应用程序连接本机的 MySQL 数据库进行操作,步骤如下:

1. 初始化数据源,此时连接池 min-idle 设置为 10;
2. 每隔50ms 执行一次SQL操作,查询当前的元数据表;
3. 将 MySQL 服务停止一段时间,观察业务表现;
4. 将 MySQL 服务重新启动,观察业务表现。

最终产生的日志如下:

//初始化过程,建立10个连接 
DEBUG -HikariPool.logPoolState - Pool stats (total=1, active=1, idle=0, waiting=0) DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@71ab7c09 DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7f6c9c4c DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7b531779 ... DEBUG -HikariPool.logPoolState- After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功 execute statement: true test time -------1 execute statement: true test time -------2 ... 
//停止MySQL ... 
//检测到无效连接 WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@9225652 ((conn=38652) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@71ab7c09 ((conn=38653) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. 
//释放连接 DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@9225652: (connection is dead) DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@71ab7c09: (connection is dead) 
//尝试创建连接失败 DEBUG -HikariPool.createPoolEntry - Cannot acquire connection from data source java.sql.SQLNonTransientConnectionException: Could not connect to address=(host=localhost)(port=3306)(type=master) : Socket fail to connect to host:localhost, port:3306. Connection refused: connect Caused by: java.sql.SQLNonTransientConnectionException: Socket fail to connect to host:localhost, port:3306. Connection refused: connect at internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73) ~[mariadb-java-client-2.6.0.jar:?] ... 
//持续失败.. 直到MySQL重启 //重启后,自动创建连接成功 DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@42c5503e DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@695a7435 
//连接池状态,重新建立10个连接 DEBUG -HikariPool.logPoolState(HikariPool.java:421) -After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功(已经自愈) execute statement: true

从日志上看,hikariCP 还是能成功检测到坏死的连接并将其踢出连接池,一旦 MySQL 重新启动,业务操作又能自动恢复成功了。根据这个结果,基于 hikariCP 版本问题的设想也再次落空,研发同学再次陷入焦灼。

拨开云雾见光明

多方面求证无果之后,我们最终尝试在业务服务所在的容器内进行抓包,看是否能发现一些蛛丝马迹。

进入故障容器,执行 tcpdump -i eth0 tcp port 30052 进行抓包,然后对业务接口发起访问。

此时令人诡异的事情发生了,没有任何网络包产生!而业务日志在 30s 之后也出现了获取连接失败的异常。

我们通过 netstat 命令检查网络连接,发现只有一个 ESTABLISHED 状态的 TCP 连接。

也就是说,当前业务实例和 MySQL 服务端是存在一个建好的连接的,但为什么业务还是报出可用连接呢?

推测可能原因有二:

  • 该连接被某个业务(如定时器)一直占用。
  • 该连接实际上还没有办法使用,可能处于某种僵死的状态。

对于原因一,很快就可以被推翻,一来当前服务并没有什么定时器任务,二来就算该连接被占用,按照连接池的原理,只要没有达到上限,新的业务请求应该会促使连接池进行新连接的建立,那么无论是从 netstat 命令检查还是 tcpdump 的结果来看,不应该一直是只有一个连接的状况。

那么,情况二的可能性就很大了。带着这个思路,继续分析 Java 进程的线程栈。

执行 kill -3 pid 将线程栈输出后分析,果不其然,在当前 thread stack 中发现了如下的条目:

"HikariPool-1 connection adder" #121 daemon prio=5 os_prio=0 tid=0x00007f1300021800 nid=0xad runnable [0x00007f12d82e5000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.io.FilterInputStream.read(FilterInputStream.java:133) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.fillBuffer(ReadAheadBufferedStream.java:129) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.read(ReadAheadBufferedStream.java:102) - locked <0x00000000d7f5b480> (a org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacketArray(StandardPacketInputStream.java:241) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacket(StandardPacketInputStream.java:212) at org.mariadb.jdbc.internal.com.read.ReadInitialHandShakePacket.<init>(ReadInitialHandShakePacket.java:90) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.createConnection(AbstractConnectProtocol.java:480) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1236) at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:610) at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:142) at org.mariadb.jdbc.Driver.connect(Driver.java:86) at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:138) at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:358) at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206) at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:477)

这里显示 HikariPool-1 connection adder 这个线程一直处于 socketRead 的可执行状态。从命名上看该线程应该是 HikariCP 连接池用于建立连接的任务线程,socket 读操作则来自于 MariaDbConnection.newConnection() 这个方法,即 mariadb-java-client 驱动层建立 MySQL 连接的一个操作,其中 ReadInitialHandShakePacket 初始化则属于 MySQL 建链协议中的一个环节。

简而言之,上面的线程刚好处于建链的一个过程态,关于 mariadb 驱动和 MySQL 建链的过程大致如下:

MySQL 建链首先是建立 TCP 连接(三次握手),客户端会读取 MySQL 协议的一个初始化握手消息包,内部包含 MySQL 版本号,鉴权算法等等信息,之后再进入身份鉴权的环节。

这里的问题就在于 ReadInitialHandShakePacket 初始化(读取握手消息包)一直处于 socket read 的一个状态。

如果此时 MySQL 远端主机故障了,那么该操作就会一直卡住。而此时的连接虽然已经建立(处于 ESTABLISHED 状态),但却一直没能完成协议握手和后面的身份鉴权流程,即该连接只能算一个半成品(无法进入 hikariCP 连接池的列表中)。从故障服务的 DEBUG 日志也可以看到,连接池持续是没有可用连接的,如下:DEBUG HikariPool.logPoolState –> Before cleanup stats (total=0, active=0, idle=0, waiting=3)

另一个需要解释的问题则是,这样一个 socket read 操作的阻塞是否就造成了整个连接池的阻塞呢?

经过代码走读,我们再次梳理了 hikariCP 建立连接的一个流程,其中涉及到几个模块:

  • HikariPool,连接池实例,由该对象连接的获取、释放以及连接的维护。
  • ConnectionBag,连接对象容器,存放当前的连接对象列表,用于提供可用连接。
  • AddConnectionExecutor,添加连接的执行器,命名如 “HikariPool-1 connection adder”,是一个单线程的线程池。
  • PoolEntryCreator,添加连接的任务,实现创建连接的具体逻辑。
  • HouseKeeper,内部定时器,用于实现连接的超时淘汰、连接池的补充等工作。

HouseKeeper 在连接池初始化后的 100ms 触发执行,其调用 fillPool() 方法完成连接池的填充,例如 min-idle 是10,那么初始化就会创建10个连接。ConnectionBag 维护了当前连接对象的列表,该模块还维护了请求连接者(waiters)的一个计数器,用于评估当前连接数的需求。

其中,borrow 方法的逻辑如下:

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // 尝试从 thread-local 中获取
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         ...
      }

      // 计算当前等待请求的任务
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               //如果获得了可用连接,会触发填充任务
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

         //没有可用连接,先触发填充任务
         listener.addBagItem(waiting);

         //在指定时间内等待可用连接进入
         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

注意到,无论是有没有可用连接,该方法都会触发一个 listener.addBagItem() 方法,HikariPool 对该接口的实现如下:

public void addBagItem(final int waiting)
   {
      final boolean shouldAdd = waiting - addConnectionQueueReadOnlyView.size() >= 0; // Yes, >= is intentional.
      if (shouldAdd) {
         //调用 AddConnectionExecutor 提交创建连接的任务
         addConnectionExecutor.submit(poolEntryCreator);
      }
      else {
         logger.debug("{} - Add connection elided, waiting {}, queue {}", poolName, waiting, addConnectionQueueReadOnlyView.size());
      }
   }
PoolEntryCreator 则实现了创建连接的具体逻辑,如下:
public class PoolEntryCreator{
     @Override
      public Boolean call()
      {
         long sleepBackoff = 250L;
         //判断是否需要建立连接
         while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
            //创建 MySQL 连接
            final PoolEntry poolEntry = createPoolEntry();
 
            if (poolEntry != null) {
               //建立连接成功,直接返回。
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
               if (loggingPrefix != null) {
                  logPoolState(loggingPrefix);
               }
               return Boolean.TRUE;
            }
            ...
         }

         // Pool is suspended or shutdown or at max size
         return Boolean.FALSE;
      }
}

由此可见,AddConnectionExecutor 采用了单线程的设计,当产生新连接需求时,会异步触发 PoolEntryCreator 任务进行补充。其中 PoolEntryCreator. createPoolEntry() 会完成 MySQL 驱动连接建立的所有事情,而我们的情况则恰恰是 MySQL 建链过程产生了永久性阻塞。因此无论后面怎么获取连接,新来的建链任务都会一直排队等待,这便导致了业务上一直没有连接可用。

下面这个图说明了 hikariCP 的建链过程:

好了,让我们在回顾一下前面关于可靠性测试的场景:

首先,MySQL 主实例发生故障,而紧接着 hikariCP 则检测到了坏的连接(connection is dead)并将其释放,在释放关闭连接的同时又发现连接数需要补充,进而立即触发了新的建链请求。
而问题就刚好出在这一次建链请求上,TCP 握手的部分是成功了(客户端和 MySQL VM 上 nodePort 完成连接),但在接下来由于当前的 MySQL 容器已经停止(此时 VIP 也切换到了另一台 MySQL 实例上),因此客户端再也无法获得原 MySQL 实例的握手包响应(该握手属于MySQL应用层的协议),此时便陷入了长时间的阻塞式 socketRead 操作。而建链请求任务恰恰好采用了单线程运作,进一步则导致了所有业务的阻塞。

三、解决方案

在了解了事情的来龙去脉之后,我们主要考虑从两方面进行优化:

  • 优化一,增加 HirakiPool 中 AddConnectionExecutor 线程的数量,这样即使第一个线程出现挂死,还有其他的线程能参与建链任务的分配。
  • 优化二,出问题的 socketRead 是一种同步阻塞式的调用,可通过 SO_TIMEOUT 来避免长时间挂死。

对于优化点一,我们一致认为用处并不大,如果连接出现了挂死那么相当于线程资源已经泄露,对服务后续的稳定运行十分不利,而且 hikariCP 在这里也已经将其写死了。因此关键的方案还是避免阻塞式的调用。

查阅了 mariadb-java-client 官方文档后,发现可以在 JDBC URL 中指定网络IO 的超时参数,如下:

具体参考:https://mariadb.com/kb/en/about-mariadb-connector-j/

如描述所说的,socketTimeout 可以设置 socket 的 SO_TIMEOUT 属性,从而达到控制超时时间的目的。默认是 0,即不超时。

我们在 MySQL JDBC URL 中加入了相关的参数,如下:

spring.datasource.url=jdbc:mysql://10.0.71.13:33052/appdb?socketTimeout=60000&connectTimeout=30000&serverTimezone=UTC

此后对 MySQL 可靠性场景进行多次验证,发现连接挂死的现象已经不再出现,此时问题得到解决。

四、小结

本次分享了一次关于 MySQL 连接挂死问题排查的心路历程,由于环境搭建的工作量巨大,而且该问题复现存在偶然性,整个分析过程还是有些坎坷的(其中也踩了坑)。的确,我们很容易被一些表面的现象所迷惑,而觉得问题很难解决时,更容易带着偏向性思维去处理问题。例如本例中曾一致认为连接池出现了问题,但实际上却是由于 MySQL JDBC 驱动(mariadb driver)的一个不严谨的配置所导致。

从原则上讲,应该避免一切可能导致资源挂死的行为。如果我们能在前期对代码及相关配置做好充分的排查工作,相信 996 就会离我们越来越远。

数据库和缓存双写一致性问题

缓存由于其高并发和高性能的特性,已经在项目中被广泛使用。在读取缓存方面,没啥疑问,都是按照下图的流程来进行业务操作。

但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存。又或者是先删除缓存,再更新数据库,其实大家存在很大的争议。

先做一个说明,从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。这种方案下,我们可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。因此,接下来讨论的思路不依赖于给缓存设置过期时间这个方案。

在这里,我们讨论三种更新策略:

  1. 先更新数据库,再更新缓存
  2. 先删除缓存,再更新数据库
  3. 先更新数据库,再删除缓存

应该没人问我,为什么没有先更新缓存,再更新数据库这种策略。

1、先更新数据库,再更新缓存

这套方案,大家是普遍反对的。为什么呢?有如下两点原因。

  • 原因一(线程安全角度)

同时有请求A和请求B进行更新操作,那么会出现

(1)线程A更新了数据库
(2)线程B更新了数据库
(3)线程B更新了缓存
(4)线程A更新了缓存

这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

  • 原因二(业务场景角度)

有如下两点:

(1)如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。
(2)如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

接下来讨论的就是争议最大的,先删缓存,再更新数据库。还是先更新数据库,再删缓存的问题。

2、先删缓存,再更新数据库

该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库

上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

那么,如何解决呢?采用延时双删策略

伪代码如下

public void write(String key,Object data){ 
    redis.delKey(key); 
    db.updateData(data); 
    Thread.sleep(1000); 
    redis.delKey(key); 
}

转化为中文描述就是

(1)先淘汰缓存
(2)再写数据库(这两步和原来一样)
(3)休眠1秒,再次淘汰缓存

这么做,可以将1秒内所造成的缓存脏数据,再次删除。那么,这个1秒怎么确定的,具体该休眠多久呢?

针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

如果你用了mysql的读写分离架构怎么办?

ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。

(1)请求A进行写操作,删除缓存
(2)请求A将数据写入数据库了,
(3)请求B查询缓存发现,缓存没有值
(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
(5)请求B将旧值写入缓存
(6)数据库完成主从同步,从库变为新值

上述情形,就是数据不一致的原因。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。

采用这种同步淘汰策略,吞吐量降低怎么办?

那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。

第二次删除,如果删除失败怎么办?

这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:

(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库
(6)请求A试图去删除请求B写入对缓存值,结果失败了。

ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。

如何解决呢?

具体解决方案,且看博主对第(3)种更新策略的解析。

3、先更新数据库,再删缓存

首先,先说一下。老外提出了一个缓存更新套路,名为《Cache-Aside pattern》。其中就指出

  • 失效:应用程序先从cache取数据,没有得到,则从数据库中取数据,成功后,放到缓存中。
  • 命中:应用程序从cache中取数据,取到后返回。
  • 更新:先把数据存到数据库中,成功后,再让缓存失效。

另外,知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。

这种情况不存在并发问题么?

不是的。假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

(1)缓存刚好失效
(2)请求A查询数据库,得一个旧值
(3)请求B将新值写入数据库
(4)请求B删除缓存
(5)请求A将查到的旧值写入缓存

ok,如果发生上述情况,确实是会发生脏数据。

然而,发生这种情况的概率又有多少呢?

发生上述情况有一个先天性条件,就是步骤(3)的写数据库操作比步骤(2)的读数据库操作耗时更短,才有可能使得步骤(4)先于步骤(5)。可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(3)耗时比步骤(2)更短,这一情形很难出现。

假设,有人非要抬杠,有强迫症,一定要解决怎么办?

如何解决上述并发问题?

首先,给缓存设有效时间是一种方案。其次,采用策略(2)里给出的异步延时删除策略,保证读请求完成以后,再进行删除操作。

还有其他造成不一致的原因么?

有的,这也是缓存更新策略(2)和缓存更新策略(3)都存在的一个问题,如果删缓存失败了怎么办,那不是会有不一致的情况出现么。比如一个写数据请求,然后写入数据库了,删缓存失败了,这会就出现不一致的情况了。这也是缓存更新策略(2)里留下的最后一个疑问。

如何解决?提供一个保障的重试机制即可,这里给出两套方案。

方案一,如下图所示:

​(1)更新数据库数据;
(2)缓存因为种种问题删除失败
(3)将需要删除的key发送至消息队列
(4)自己消费消息,获得需要删除的key
(5)继续重试删除操作,直到成功

然而,该方案有一个缺点,对业务线代码造成大量的侵入。于是有了方案二,在方案二中,启动一个订阅程序去订阅数据库的binlog,获得需要操作的数据。在应用程序中,另起一段程序,获得这个订阅程序传来的信息,进行删除缓存操作。

方案二,流程如下图所示:

​(1)更新数据库数据
(2)数据库会将操作信息写入binlog日志当中
(3)订阅程序提取出所需要的数据以及key
(4)另起一段非业务代码,获得该信息
(5)尝试删除缓存操作,发现删除失败
(6)将这些信息发送至消息队列
(7)重新从消息队列中获得该数据,重试操作。

备注说明:上述的订阅binlog程序在mysql中有现成的中间件叫canal,可以完成订阅binlog日志的功能。至于oracle中,博主目前不知道有没有现成中间件可以使用。另外,重试机制,博主是采用的是消息队列的方式。如果对一致性要求不是很高,直接在程序中另起一个线程,每隔一段时间去重试即可,这些大家可以灵活自由发挥,只是提供一个思路。

总结
对目前互联网中已有的一致性方案,进行了一个总结。对于先删缓存,再更新数据库的更新策略,还有方案提出维护一个内存队列的方式,觉得实现异常复杂,没有必要,因此没有必要在文中给出。最后,希望大家有所收获。

ElasticSearch高并发场景写入优化

ElasticSearch高并发场景写入优化

ElasticSearch号称分布式全文搜索引擎,但使用不当依然会很慢,特别是在高并发写入时,会存在写入超时的问题。

在公司内部,基本所有的日志都会放入到ElasticSearch,比如接口访问时间日志、动态/计划数据审核日志、动态/计划抓取报文日志、动态报文更新日志等,每天的写入数据量巨大,最初我们基于ElasticSearch封装了一层,使用内部RestHighLevelClient获取数据,部分代码如下:

@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client");
        this.client = client;
    }

    /**
     * 根据id获取文档
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

            return getResponse;

        } catch (Exception e) {
            log.error("获取文档失败", e);
        }
        return null;
    }

    /**
     * 插入文档, id可以为空,id如果为空,就自动生成id
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;

        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("创建文档失败", e);
        }
        return null;
    }
}

然而在放到线上生成环境下,经常会出现写入失败数据丢失的情况,发现是数据量写入太大,调用接口超时,直接返回错误了。RestHighLevelClient实质就是通过httpclient请求接口发送数据,是基于http协议。

查看Elasticsearch文档,发现Elasticsearch同样支持使用Transport传输,其内部使用netty长连接通信,并且可以设置处理连接的数量。

新建一个TransportClientFactory用来创建TransportClient,这里连接需要使用密钥,具体填写自己Elasticsearch的账号密码。

public class TransportClientFactory {
    // 默认处理核心数,cpu核心数
    private static int DEFAULT_PROCESSORS = 8;
    // 配置核心数
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword, String trustStore, String trustPassword) throws Exception {

        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("缺少证书或密码");
        }

        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }

        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings
                .builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true).put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false).put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword).put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword).put("processors", processors).put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax).build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);

        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }

        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }

}

修改ElasticsearchService

@Slf4j
public class ElasticsearchService {

    /**
     * 默认10秒超时
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;

        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;

        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        if (StringUtils.isBlank(id)) {
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }

}

另外为了提升写入效率,可以批量异步一次性写入,使用bulk方法,如下:

  /**
     * 批量操作,listener可空,支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
        if (requests == null || requests.size() == 0)
            return;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return;

        this.client.bulk(request, listener);
    }

    /**
     * 支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
        BulkRequestBuilder builder = bulkRequestBuilder();

        for (DocWriteRequest<?> request : requests) {
            if (request instanceof IndexRequest) {
                builder.add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                builder.add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                builder.add((DeleteRequest) request);
            }
        }

        if (builder.numberOfActions() == 0) {
            return null;
        }
        return builder.request();
    }

在dao层,我们可以一次性写入多条数据

    /**
     * 异步写入动态日志
     * 
     * @param flightRecords
     */
    public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
        try {
            if (CollectionUtils.isEmpty(flightRecords)) {
                return;
            }
            ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
            List<DocWriteRequest<?>> requests = new ArrayList(flightRecords.size());
            for (FlightRecord flightRecord : flightRecords) {
                // index以航班日期为准
                String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
                requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));

            }
            if (CollectionUtils.isEmpty(requests)) {
                return;
            }
            elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("flightRecord写入ES失败", e);
                }
            });
        } catch (Exception e) {
            log.error("insertFlightRecodeListAsync Exception", e);
        }
    }

其它写入性能优化:

  1. 去掉不必要的字段分词和索引,ElasticSearch会默认对所有字段进行分词,一般查询我们都是根据航班号、航班日期、起飞机场、到达机场查询;所以没必要的字段,我们可以使用keyword类型,不进行分词。
    PUT my_index
    {
      "mappings": {
        "my_type": {
          "properties": {
            "tail_no": { 
              "type": "keyword",
              "index": false
            }
          }
        }
      }
    }
  2. 对于一些普通的日志,比如动态/计划抓取网页日志,数据丢失也无所谓,可以禁用掉refresh和replia,即index.refreshinterval设置为-1,将index.numberof_replicas设置为0即可。
    PUT my_index
    {
      "settings": {
        "index": {
            "refresh_interval" : "-1",
            "number_of_replicas" : 0
        }
      }
    }
  3. 使用ElasticSearch自增ID,如果我们要手动给ElasticSearch document设置一个id,那么ElasticSearch需要每次都去确认一下那个id是否存在,这个过程是比较耗费时间的。如果我们使用自动生成的id,那么ElasticSearch就可以跳过这个步骤,写入性能会更好。
  4. 使用多线程写入ElasticSearch,单线程发送bulk请求是无法最大化ElasticSearch集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。为了更好的利用集群的资源,这样多线程并发写入,可以减少每次底层磁盘fsync的次数和开销。一样,可以对单个ElasticSearch节点的单个shard做压测,比如说,先是2个线程,然后是4个线程,然后是8个线程,16个,每次线程数量倍增。一旦发现es返回了TOOMANYREQUESTS的错误,JavaClient也就是EsRejectedExecutionException,此时那么就说明ElasticSearch是说已经到了一个并发写入的最大瓶颈了,此时我们就知道最多只能支撑这么高的并发写入了。
  5. 适当增加index buffer的大小,如果我们要进行非常重的高并发写入操作,那么最好将index buffer调大一些,indices.memory.indexbuffersize,这个可以调节大一些,设置的这个index buffer大小,是所有的shard公用的,但是如果除以shard数量以后,算出来平均每个shard可以使用的内存大小,一般建议,但是对于每个shard来说,最多给512mb,因为再大性能就没什么提升了。ElasticSearch会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个buffer。默认这个参数的值是10%,也就是jvm heap的10%,如果我们给jvm heap分配10gb内存,那么这个index buffer就有1gb,对于两个shard共享来说,是足够的了。

将项目发布到Maven仓库

我的开源包eweb已经发布到maven中心库了,使用maven配置就可以引入:

    <dependency>
        <groupId>com.github.chuanzh</groupId>
        <artifactId>eweb</artifactId>
        <version>1.0.1</version>
    </dependency>

下面讲讲如何发布到中心库,查了网上的步骤,都说的比较复制,但其实也很简单,这里分下面几个步骤:

一,Sonatype账号注册&创建一个Issue

二,使用GPG生成秘钥

三,修改项目的maven配置文件

四,上传构件到OSS&发布

看懂了之后其实很好理解,其实就类似于申请接入腾讯,微博的APP应用,

第一步,你得成为一个开发者吧,也就是要注册一个开发者账号,接下来就是要创建一个应用,等待腾讯、微博服务商审核

第二步,审核通过后,腾讯、微博会给你一对秘钥,但maven是要自己生成的,也就是用GPG

第三步,在你的项目中配置密钥和其他配置。

第四部,就是上传你的项目到腾讯、微博的服务平台,然后再等待审核,审核通过,那你就可以在他们的平台使用APP了

下面就从这4个方面说明下。

一,Sonatype账号注册&创建一个Issue

注册地址: Sign up for JIRA

记住你的用户名和密码,之后要用到

创建一个Issue:https://issues.sonatype.org/secure/CreateIssue.jspa?issuetype=21&pid=10134

v2-404b13199782405587859b6e73c02854_b

填写项目的基本信息:简单描述,详细描述,项目地址

重点说明下:Group Id为你项目的groupid,一般为公司或个人域名,如果你没有,就不要随便写了(他们在审核时会要求你提供一些证明的),写github的项目地址就可以了

好了,现在你就可以静静的等待maven审核了,一般为1~2天

二,使用GPG生成秘钥

首先需要下载GPG软件,Mac下载地址:GPG Suite,Windows用户下载地址:Secure email and file encryption with GnuPG for Windows,我的是Mac,所以具体说说Mac下的生成方式,Window方式请参考这篇博客

Mac下生成很简单,如下几个步骤:

1,新建一个密钥

v2-4fe9a772f939f0a8ef4fd2bfd103f97d_b

2,点击右键,将公钥发送至公钥服务器

v2-30789dafea5f96661c8e22b9c58ac165_b

到此密钥就生成了,记住你的口令,后面要用到

三,修改项目的maven配置文件

1,修改maven的setting配置

<settings>

    ...

    <servers>
        <server>
            <id>oss</id>
            <username>用户名</username>
            <password>密码</password>
        </server>
    </servers>

    ...

</settings>

用户名和密码就是你注册Sonatype的账号

2,配置项目pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.github.chuanzh</groupId>
  <artifactId>eweb</artifactId>
  <version>1.0.1</version>
  <packaging>jar</packaging>

  <name>eweb</name>
  <description>轻量web开发框架</description>
  <url>https://github.com/chuanzh/eweb</url>

  <licenses>
    <license>
      <name>The Apache Software License, Version 2.0</name>
      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
    </license>
  </licenses>

  <developers>
    <developer>
      <name>chuan.zhang</name>
      <email>zhangchuan0305@gmail.com</email>
    </developer>
  </developers>

  <scm>
    <connection>scm:git@github.com:chuanzh/eweb.git</connection>
    <developerConnection>scm:git@github.com:chuanzh/eweb.git</developerConnection>
    <url>git@github.com:chuanzh/eweb.git</url>
  </scm>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>utf-8</project.reporting.outputEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <id>release</id>
      <distributionManagement>
        <snapshotRepository>
          <id>oss</id>
          <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
        </snapshotRepository>
        <repository>
          <id>oss</id>
          <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
        </repository>
      </distributionManagement>
      <build>
        <plugins>
          <!-- Source -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-source-plugin</artifactId>
            <version>3.0.1</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>jar-no-fork</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <!-- Javadoc -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-javadoc-plugin</artifactId>
            <version>2.10.4</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>jar</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <!-- Gpg Signature -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-gpg-plugin</artifactId>
            <version>1.6</version>
            <executions>
              <execution>
                <id>sign-artifacts</id>
                <phase>verify</phase>
                <goals>
                  <goal>sign</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>

</project>

pom.xml必须配置的有:name,description,licenses,developers,scm。

snapshotRepository和repository在Issure初审通过后会给你,另外id要和setting.xml中保持一致

6EC7FDB0-E87F-4A66-ABCD-D7DC14EC17D1

四,上传构件到OSS&发布

1,使用下面命令,会提示要求你输入GPG的密钥,就是申请时候填写的,此过程比较慢,需要耐心等待

mvn clean deploy -P release

2,在OSS中发布构件

使用sonatype账号登陆https://oss.sonatype.org ,通过模糊查找,选择Staging Repositories,如下图,该构件的状态此时应该为open,勾选它,点击close按钮,sonatype会先做检验,通过后,就可以点击Release了,如果审核不通过,尝试删掉后重新上传

49767CB8-C939-456A-B314-1DA45F606BFB

A521FDDB-E66E-4116-B604-DF098982EA20

3,通知sonatype已经发布构建

也就是在Issure中回复,再次等待,也是需要审核1~2天,若审核通过,会回复你如下的信息

C86DFA4C-BC11-4A82-8CEB-FD3F3D9BF75C

等待10分钟,你就可以在maven库中搜索到你的jar包了

3CB6AF70-C0F5-4F63-8F3C-14D22E8C3CA8

 

若下次修改了本地的代码,只需要重新执行第四步的命令即可