Sentinel流量控制介绍

1. 背景

  • 比如双11,秒杀等,系统访问量远远超出系统所能处理的并发数,需对系统进行保护。
  • 在微服务架构中,服务拆分粒度较细,会出现请求链路较长的情况,如果链路中某个服务因网络延迟或者请求超时等原因不可用,会导致当前请求阻塞,可能出现请求堆积从而导致雪崩效应。
  • 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、
  • 系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性

2. 常见的限流场景

●  在Nginx 层添加限流模块限制平均访问速度
●  通过设置数据库连接池,线程池的大小来限制总的并发数
●  通过guava 提供的Ratelimiter 限制接口的访问速度
●  TCP通信协议中的限流整形
 

3. 限流算法

3.1、 计数器算法

         计数器算法是一种比较简单的限流实现算法,在指定的周期内累加访问次数,当访问次数达到设定的阈值时,触发限流策略,当进入下一个时间周期时进行访问数的清零。

优点:实现简单
缺点:临界问题:如图所示,当在8-10秒和10-12秒内分别并发500,虽然没有超过阈值,但如果算8-12秒,则并发数高达1000,已经超过了原先定义的10秒内不超过500的并发量
 

3.2、 滑动窗口算法

为了解决计数器算法带来的临界问题,所以引入了滑动窗口算法。
简单来说,滑动窗口算法原理是在固定窗口汇总分割出多个小时间窗口,分别在每个小时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口,最终只需要统计滑动窗口时间范围内的所有小时窗口的总的计数即可。
如图所示,最终只需要统计每个小时间窗口不超过阈值/n && 在滑动窗口范围内的所有的小时间窗口总的计数不超过阈值即可

优点:实现相对简单,且没有计数器算法的临界问题
缺点:无法应对短时间高并发(突刺现象)
 

3.3、 令牌桶限流算法

令牌桶是网络流量整形和速率限制中最常使用的一种算法,对于每一个请求,都需要从令牌桶中获得一个令牌,如果没有获得令牌,则需要触发限流策略。

基本过程:

1)每进来一个请求,都在桶里获取一个令牌
2)如果有令牌,则拿着令牌通过
3)如果没有令牌,则不允许请求通过

几种情况:
请求速度 > 令牌生成速度:当令牌被取空后会被限流
请求速度 = 令牌生成速度:流量处于平稳状态
请求速度 < 令牌生成速度:说明此时并发数不高,请求能被正常处理
优点:可以像漏桶那样匀速,也可以像计数器那样处理突发流量
 

3.4、  漏桶限流算法

漏桶限流算法的主要作用是控制数据注入网络的速度,平滑网络上的突发流量
漏桶算法的原理:在漏桶内部同样维护一个容器,这个容器会以恒定的速度出水,不管上面的水流速度多快,漏桶水滴的流出速度始终保持不变,实际上消息中间件就使用了漏桶限流的思想。
 

漏桶算法中,有如下几种可能的情况:
●  请求速度大于漏桶流出速度,也就是请求数超出当前服务所能处理的极限,将会触发限流策略
●  请求速度小于或者等于漏桶流出的速度,也就是服务处理能力整合满足客户端的请求量,将正常执行。
不足:无法应对突发的并发流量,因为流出速率一直都是恒定的
优点:平滑系统流量
 

Sentinel介绍

       Sentinel是阿里开源的项目,是面向分布式架构的轻量级流量控制组件,主要以流量为切入点,从限流,流量整形,服务降级,系统负载保护等多个维度来帮助我们保障微服务的稳定性
Sentinel 分2 部分
核心库:不依赖任何框架/库,能够运行于所有的java 环境,对Dubbo ,Spring Cloud 等框架有较好的支持。
控制台:基于Spring boot 开发,打包后可以直接运行,不需要额外的tomcat 等应用部署。
 

5、Sentinel 流量控制

5.1、整体步骤

  • 定义资源
  • 定义限流规则
  • 检验规则是否生效
 

5.2、资源定义方式

方式1:抛出异常的方式定义资源
SphU 包含了 try-catch 风格的 API。用这种方式,当资源发生了限流之后会抛出 BlockException。这个时候可以捕捉异常,进行限流之后的逻辑处理。示例代码如下:
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
Entry entry = null;
try {
    // 资源名可使用任意有业务语义的字符串
    entry = SphU.entry("自定义资源名");
    // 被保护的业务逻辑
    // do something...
} catch (BlockException e1) {
    // 资源访问阻止,被限流或被降级
    // 进行相应的处理操作
} finally {
    if (entry != null) {
        entry.exit();
    }
}

注意: 
1、SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,否则会导致调用链记录异常,抛出 ErrorEntryFreeException 异常。
2、务必保证finally会被执行

方式2:注解方式定义资源
Sentinel 支持通过 @SentinelResource 注解定义资源并配置 blockHandler 和 fallback 函数来进行限流之后的处理。示例:

@SentinelResource(blockHandler = "blockHandlerForGetUser")
public User getUserById(String id) {
        throw new RuntimeException("getUserById command failed");
}
// blockHandler 函数,原方法调用被限流/降级/系统保护的时候调用
public User blockHandlerForGetUser(String id, BlockException ex) {
        return new User("admin");
}

注意
1、blockHandler 函数会在原方法被限流/降级/系统保护的时候调用,而 fallback 函数会针对所有类型的异常。请注意 blockHandler 和 fallback 函数的形式要求
2、需注意:blockHandler 所配置的值会在触发限流之后调用,这个方法定义必须和原始方法的返回值,参数保持一致,而且需要增加BlockException 参数。

5.3、限流具体实现

5.3.1. 引入 Sentinel 依赖

添加依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.4</version>
</dependency>

5.3.2. 定义资源

资源 是 Sentinel 中的核心概念之一。最常用的资源是我们代码中的 Java 方法,也可以更灵活的定义你的资源,例如,把需要控制流量的代码用 Sentinel API SphU.entry(“HelloWorld”) 和 entry.exit() 包围起来即可。

    在下面的例子中,我们将 System.out.println(“hello world”); 作为资源(被保护的逻辑),用 API 包装起来。参考代码如下:

public static void main(String[] args) {
    // 配置规则.
    initFlowRules();
    while (true) {
        try (Entry entry = SphU.entry("HelloWorld")) {
        // 被保护的逻辑
            System.out.println("hello world");
        } catch (BlockException ex) {
            // 处理被流控的逻辑
            System.out.println("blocked!");
        }
    }
}

完成以上两步后,代码端的改造就完成了。也可以通过我们提供的 注解,来定义我们的资源,类似于下面的代码:

@SentinelResource("HelloWorld")
public void helloWorld() {
    // 资源中的逻辑
    System.out.println("hello world");
}

这样,helloWorld() 方法就成了我们的一个资源。

5.3.3. 定义规则

通过流控规则来指定允许该资源通过的请求次数,例如下面的代码定义了资源 HelloWorld 每秒最多只能通过 20 个请求。

private static void initFlowRules(){
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule = new FlowRule();
    rule.setResource("HelloWorld");
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setCount(20);
    rules.add(rule);
    FlowRuleManager.loadRules(rules);
}

完成上面 3 步,Sentinel 就能够正常工作了。

5.3.4. 检查效果

Demo 运行之后,我们可以在日志 ~/logs/csp/${appName}-metrics.log.xxx 里看到下面的输出:

其中 p 代表通过的请求, block 代表被阻止的请求, s 代表成功执行完成的请求个数, e 代表用户自定义的异常, rt 代表平均响应时长。

可以看到,这个程序每秒稳定输出 “hello world” 20 次,和规则中预先设定的阈值是一样的。

6、Sentinel 熔断

6.1、背景

1、现代微服务架构都是分布式的,由非常多的服务组成,除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。
2、分布式系统中,不同服务之间相互调用,组成复杂的调用链路,复杂链路上的某一环不稳定,可能会层层级联导致整体的雪崩 。

6.2、熔断策略

Sentinel 提供以下几种熔断策略:

  • 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
  • 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% – 100%。
  • 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。

注意
 1、@SentinelResource 注解会自动统计业务异常,无需手动调用。
2、异常降级仅针对业务异常,对 Sentinel 限流降级本身的异常(BlockException)不生效。为了统计异常比例或异常数,需要通过 Tracer.trace(ex) 记录业务异常。示例:

Entry entry = null;
try {
    entry = SphU.entry(resource);

} catch (Throwable t) {
    if (!BlockException.isBlockException(t)) {
        Tracer.trace(t);
    }
} finally {
    if (entry != null) {
        entry.exit();
    }
}

熔断降级规则

熔断降级规则(DegradeRule)包含下面几个重要的属性:

Field说明默认值
resource资源名,即规则的作用对象
grade熔断策略,支持慢调用比例/异常比例/异常数策略慢调用比例
count慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值
timeWindow熔断时长,单位为 s
minRequestAmount熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入)5
statIntervalMs统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入)1000 ms
slowRatioThreshold慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入)

责任链模式实践

1、前言

       当今软件开发中,设计模式是一种重要的方法论,它们帮助开发人员解决了各种常见的问题,并提高了代码的可维护性和可扩展性。本文将重点介绍责任链模式(Chain of Responsibility Pattern),并使用Java语言来解释它的概念、用途以及如何实现。

2、责任链模式简介

       责任链模式是一种行为型设计模式,它允许你将请求沿着处理链传递,直到有一个对象处理它为止。这种模式非常适合处理请求的分发和处理,特别是当您不知道哪个对象能够处理请求时。

       在责任链模式中,通常有多个处理对象(处理器),每个处理对象都有一个处理请求的方法。当一个请求进入责任链时,它从链的起始点开始传递,每个处理对象检查是否能够处理该请求。如果可以处理,则该请求被处理,否则它会被传递给链中的下一个处理对象,直到找到一个合适的处理器。

3、责任链模式的优点

责任链模式有以下几个优点:

  1. 解耦性: 责任链模式将请求发送者和接收者解耦,发送者无需知道请求最终由哪个接收者处理。
  2. 可扩展性: 可以轻松地添加新的处理对象或修改处理顺序,而不会影响其他部分的代码。
  3. 灵活性: 可以动态配置责任链,根据需要添加、删除或修改处理对象。
  4. 单一职责原则: 每个处理对象只需关心自己是否能够处理请求,符合单一职责原则。

4、责任链模式的实现

让我们使用Java语言来实现一个简单的责任链模式示例。假设我们有一个报销审批系统,根据报销金额不同,有三个处理对象:经理、财务部门和CEO。让我们一步一步实现这个示例。

首先,我们创建一个请求类表示报销请求:

public class ReimbursementRequest {
    private double amount;

    public ReimbursementRequest(double amount) {
        this.amount = amount;
    }

    public double getAmount() {
        return amount;
    }
}

接下来,我们创建一个处理接口,表示处理请求的方法:

public interface ReimbursementHandler {
    void handleRequest(ReimbursementRequest request);
    void setNextHandler(ReimbursementHandler nextHandler);
}

然后,我们创建三个具体的处理对象:经理、财务部门和CEO,它们实现了ReimbursementHandler接口:

public class Manager implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 1000) {
            System.out.println("经理审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class FinanceDepartment implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 5000) {
            System.out.println("财务部门审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class CEO implements ReimbursementHandler {
    @Override
    public void handleRequest(ReimbursementRequest request) {
        System.out.println("CEO审批通过,报销金额:" + request.getAmount());
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        // CEO是责任链中的最后一个处理对象,不需要设置下一个处理对象
    }
}

最后,我们可以创建一个责任链并测试它:

public class Main {
    public static void main(String[] args) {
        ReimbursementHandler manager = new Manager();
        ReimbursementHandler finance = new FinanceDepartment();
        ReimbursementHandler ceo = new CEO();

        // 构建责任链
        manager.setNextHandler(finance);
        finance.setNextHandler(ceo);

        // 创建报销请求
        ReimbursementRequest request1 = new ReimbursementRequest(800);
        ReimbursementRequest request2 = new ReimbursementRequest(3500);
        ReimbursementRequest request3 = new ReimbursementRequest(10000);

        // 发送请求
        manager.handleRequest(request1);
        manager.handleRequest(request2);
        manager.handleRequest(request3);
    }
}

运行以上代码,你会看到不同金额的报销请求会被不同的处理对象处理。

5、责任链模式在检查系统中的运用

检查系统具有以下检查-申诉-整改-奖惩链路,在该链路中我们可以使用责任链模式,使代码变得更容易维护和扩展。

首先我们创建一个包含各个链路中需要用到的请求类对象。

/**
 * @author: guojiangtao
 * @description:
 * @param: 
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CheckRewardPunishmentRequest implements Serializable {

    private static final long serialVersionUID = 8676996408255323152L;

    /**
     * 检查结果ID
     **/
    private Long checkListResultId;

    /**
     * 检查基本事实
     **/
    CheckBaseBO checkBaseBO;
    /**
     * 检查整改事实
     **/
    CheckRectifyBO checkRectifyBO;
    /**
     * 检查申诉事实
     **/
    CheckAppealBO checkAppealBO;

}

我们创建一个处理接口,表示处理请求的方法。

/**
 * @author: guojiangtao
 * @description: 处理请求
 * @param:
 **/
public interface CheckRewardPunishmentPhaseActionInterface {

    CheckRewardPunishmentPhaseResult proceed(PhaseChain chain);

    /**
     * @author: guojiangtao
     * @description: 获取request以及转发请求
     * @param: 
     **/
    interface PhaseChain {
        // 获取当前Request
        CheckRewardPunishmentRequest request();
        // 转发Request
        CheckRewardPunishmentPhaseResult dispatch(CheckRewardPunishmentRequest request);
    }

}

创建一个客户端类来维护处理的实现类。

/**
 * @author guojiangtao
 */
@Data
public class PhaseChainClient {

    public static final List<CheckRewardPunishmentPhaseActionInterface> phaseActionInterfaceList = Lists.newArrayList();
    private static void addPhaseActionInterface(CheckRewardPunishmentPhaseActionInterface phaseActionInterface) {
        phaseActionInterfaceList.add(phaseActionInterface);
    }

    public static PhaseChainClient getChainClient(CheckRewardPunishmentPhaseActionInterface ... interfaces) {

        List<CheckRewardPunishmentPhaseActionInterface> interfaceList = Lists.newArrayList(interfaces);

        if (CollectionUtil.isNullOrEmpty(interfaceList)) {
            throw new BizException(BizErrorCodeEnum.ILLEGAL_OPERATION, "getChainClient exception");
        }

        interfaceList.forEach(PhaseChainClient::addPhaseActionInterface);

        return new PhaseChainClient();
    }

    public CheckRewardPunishmentPhaseResult execute (CheckRewardPunishmentRequest request) {

        RealPhaseChain realChain = new RealPhaseChain(request, phaseActionInterfaceList, 0);

        return realChain.dispatch(request);
    }

}

检查提交实现类。

/**
 * @author: guojiangtao
 * @description: 检查提交
 * @param:
 **/
@Service
@Slf4j
public class CheckSubmit implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckAppeal checkAppeal;
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            log.warn("CheckSubmit proceed with null request");
            return null;
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();

        if (null == checkBaseBO) {
            log.warn("CheckSubmit proceed with null checkBaseBO");
            return null;
        }

        // 检查单配置
        if (AppealTypeEnum.OPEN_APPEAL.getCode().equals(checkBaseBO.getAppealType())) {
            // 构建检查申诉事实实例
            CheckAppealBO appealBO = new CheckAppealBO(checkBaseBO.getCheckItemId(), checkBaseBO.getCheckItemResultId(), 1);
            request.setCheckAppealBO(appealBO);
            return PhaseChainClient.getChainClient(checkAppeal).execute(request);
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查申诉实现类:

/**
 * @author: guojiangtao
 * @description: 检查申诉
 * @param:
 **/
@Service
@Slf4j
public class CheckAppeal implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckRpProducer checkRpProducer;
    @Autowired
    private CheckResultRectifyListRepository checkResultRectifyListRepository;
    @Autowired
    private CheckRectifyTemplateItemRelationRepository rectifyTemplateItemRelationRepository;
    @Autowired
    private CheckRectify checkRectify;

    @Override
    public CheckRewardPunishmentPhaseResult proceed (PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();
        CheckAppealBO appealBO = request.getCheckAppealBO();

        if (null == checkBaseBO || null == appealBO) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        // 根据检查项ID查询 检查项与整改任务模板 1:1
        CheckRectifyTemplateItemRelationDAO templateItemRelationDAO = rectifyTemplateItemRelationRepository.queryByItemId(checkBaseBO.getCheckItemId());

        if (null != templateItemRelationDAO) {
            // 整改单模板ID
            Long rectifyTemplateId = templateItemRelationDAO.getRectifyTemplateId();
            // 整改单
            CheckResultRectifyListDAO rectifyListDAO = checkResultRectifyListRepository.getByCheckResultIdAndTemplateId(checkBaseBO.getCheckListResultId(), rectifyTemplateId);
            if (null != rectifyListDAO && Objects.equals(RectifyResultListStatusEnum.FINISH.getCode(), rectifyListDAO.getStatus())) {
                // 构建检查整改事实实例
                CheckRectifyBO rectifyBO = new CheckRectifyBO(checkBaseBO.getCheckItemId(), rectifyListDAO.getStatus());
                request.setCheckRectifyBO(rectifyBO);
                return PhaseChainClient.getChainClient(checkRectify).execute(request);
            }
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查整改实现类:

@Service
@Slf4j
public class CheckRectify implements CheckRewardPunishmentPhaseActionInterface {
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        if (null == request) {
            return null;
        }

        CheckRectifyBO rectifyBO = request.getCheckRectifyBO();

        // 无整改事实 不处理
        if (null == rectifyBO) {
            return null;
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

6、责任链模式的适用场景

责任链模式在以下情况下特别有用:

  1. 当您希望将请求的发送者和接收者解耦时,可以使用责任链模式。发送者无需知道最终哪个对象会处理请求。
  2. 当您有多个对象可以处理请求,但您不确定哪个对象应该处理时,可以使用责任链模式。这种情况下,您可以将请求传递给责任链,直到有一个对象能够处理它。
  3. 当您希望动态配置处理对象的顺序或添加新的处理对象时,可以使用责任链模式。这使得系统更加灵活和可扩展。
  4. 在需要应用单一职责原则的情况下,责任链模式可以帮助将处理逻辑分散到不同的对象中,每个对象只负责一部分功能。

7、总结

责任链模式是一种有用的设计模式,可以帮助您构建灵活的、可扩展的系统。它通过将请求传递给一系列处理对象来解耦请求发送者和接收者,使得代码更容易维护和扩展。通过示例代码和解释,我们希望您现在更好地理解了责任链模式及其在Java中的实现方式。希望这篇分享文档有助于您在实际项目中应用责任链模式以解决类似的问题。

MySQL 连接为什么挂死了?

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

一、背景

近期由测试反馈的问题有点多,其中关于系统可靠性测试提出的问题令人感到头疼,一来这类问题有时候属于“偶发”现象,难以在环境上快速复现;二来则是可靠性问题的定位链条有时候变得很长,极端情况下可能要从 A 服务追踪到 Z 服务,或者是从应用代码追溯到硬件层面。

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

架构

首先,本系统以 MySQL 作为主要的数据存储部件。整一个是典型的微服务架构(SpringBoot + SpringCloud),持久层则采用了如下几个组件:

  • mybatis,实现 SQL <-> Method 的映射
  • hikaricp,实现数据库连接池
  • mariadb-java-client,实现 JDBC 驱动

在 MySQL 服务端部分,后端采用了双主架构,前端以 keepalived 结合浮动IP(VIP)做一层高可用。如下:

说明

  • MySQL 部署两台实例,设定为互为主备的关系。
  • 为每台 MySQL 实例部署一个 keepalived 进程,由 keepalived 提供 VIP 高可用的故障切换。

实际上,keepalived 和 MySQL 都实现了容器化,而 VIP 端口则映射到 VM 上的 nodePort 服务端口上。

  • 业务服务一律使用 VIP 进行数据库访问。

Keepalived 是基于 VRRP 协议实现了路由层转换的,在同一时刻,VIP 只会指向其中的一个虚拟机(master)。当主节点发生故障时,其他的 keepalived 会检测到问题并重新选举出新的 master,此后 VIP 将切换到另一个可用的 MySQL 实例节点上。这样一来,MySQL 数据库就拥有了基础的高可用能力。

另外一点,Keepalived 还会对 MySQL 实例进行定时的健康检查,一旦发现 MySQL 实例不可用会将自身进程杀死,进而再触发 VIP 的切换动作。

问题现象

本次的测试用例也是基于虚拟机故障的场景来设计的:

持续以较小的压力向业务服务发起访问,随后将其中一台 MySQL 的容器实例(master)重启。
按照原有的评估,业务可能会产生很小的抖动,但其中断时间应该保持在秒级。

然而经过多次的测试后发现,在重启 MySQL 主节点容器之后,有一定的概率会出现业务却再也无法访问的情况!

二、分析过程

在发生问题之后,开发同学的第一反应是 MySQL 的高可用机制出了问题。由于此前曾经出现过由于 keepalived 配置不当导致 VIP 未能及时切换的问题,因此对其已经有所戒备。

先是经过一通的排查,然后并没有找到 keepalived 任何配置上的毛病。

然后在没有办法的情况下,重新测试了几次,问题又复现了。

紧接着,我们提出了几个疑点:

1.Keepalived 会根据 MySQL 实例的可达性进行判断,会不会是健康检查出了问题?

但在本次测试场景中,MySQL 容器销毁会导致 keepalived 的端口探测产生失败,这同样会导致 keepalived 失效。如果 keepalived 也发生了中止,那么 VIP 应该能自动发生抢占。而通过对比两台虚拟机节点的信息后,发现 VIP 的确发生了切换。

2. 业务进程所在的容器是否发生了网络不可达的问题?

尝试进入容器,对当前发生切换后的浮动IP、端口执行 telnet 测试,发现仍然能访问成功。

连接池

在排查前面两个疑点之后,我们只能将目光转向了业务服务的DB客户端上。

从日志上看,在产生故障的时刻,业务侧的确出现了一些异常,如下:

这里提示的是业务操作获取连接超时了(超过了30秒)。那么,会不会是连接数不够用呢?

Unable to acquire JDBC Connection [n/a] java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms. at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:669) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.9.jar!/:?] ...

业务接入采用的是 hikariCP 连接池,这也是市面上流行度很高的一款组件了。

我们随即检查了当前的连接池配置,如下:

//最小空闲连接数 
spring.datasource.hikari.minimum-idle=10 
//连接池最大大小 
spring.datasource.hikari.maximum-pool-size=50 
//连接最大空闲时长 
spring.datasource.hikari.idle-timeout=60000 
//连接生命时长 
spring.datasource.hikari.max-lifetime=1800000 
//获取连接的超时时长 
spring.datasource.hikari.connection-timeout=30000

其中 注意到 hikari 连接池配置了 minimum-idle = 10,也就是说,就算在没有任何业务的情况下,连接池应该保证有 10 个连接。更何况当前的业务访问量极低,不应该存在连接数不够使用的情况。

除此之外,另外一种可能性则可能是出现了“僵尸连接”,也就是说在重启的过程中,连接池一直没有释放这些不可用的连接,最终造成没有可用连接的结果。

开发同学对”僵尸链接”的说法深信不疑,倾向性的认为这很可能是来自于 HikariCP 组件的某个 BUG…

于是开始走读 HikariCP 的源码,发现应用层向连接池请求连接的一处代码如下:

public class HikariPool{

   //获取连接对象入口
   public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         //使用预设的30s 超时时间
         long timeout = hardTimeout;
         do {
            //进入循环,在指定时间内获取可用连接
            //从 connectionBag 中获取连接
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            //连接对象被标记清除或不满足存活条件时,关闭该连接
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            //成功获得连接对象
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         //超时了,抛出异常
         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }
}

getConnection() 方法展示了获取连接的整个流程,其中 connectionBag 是用于存放连接对象的容器对象。如果从 connectionBag 获得的连接不再满足存活条件,那么会将其手动关闭,代码如下:

void closeConnection(final PoolEntry poolEntry, final String closureReason)
   {
      //移除连接对象
      if (connectionBag.remove(poolEntry)) {
         final Connection connection = poolEntry.close();
         //异步关闭连接
         closeConnectionExecutor.execute(() -> {
            quietlyCloseConnection(connection, closureReason);
            //由于可用连接变少,将触发填充连接池的任务
            if (poolState == POOL_NORMAL) {
               fillPool();
            }
         });
      }
   }

注意到,只有当连接满足下面条件中的其中一个时,会被执行 close。

  • isMarkedEvicted() 的返回结果是 true,即标记为清除

如果连接存活时间超出最大生存时间(maxLifeTime),或者距离上一次使用超过了idleTimeout,会被定时任务标记为清除状态,清除状态的连接在获取的时候才真正 close。

  • 500ms 内没有被使用,且连接已经不再存活,即 isConnectionAlive() 返回 false

由于我们把 idleTimeout 和 maxLifeTime 都设置得非常大,因此需重点检查 isConnectionAlive 方法中的判断,如下:

public class PoolBase{

   //判断连接是否存活
   boolean isConnectionAlive(final Connection connection)
   {
      try {
         try {
            //设置 JDBC 连接的执行超时
            setNetworkTimeout(connection, validationTimeout);

            final int validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000;

            //如果没有设置 TestQuery,使用 JDBC4 的校验接口
            if (isUseJdbc4Validation) {
               return connection.isValid(validationSeconds);
            }

            //使用 TestQuery(如 select 1)语句对连接进行探测
            try (Statement statement = connection.createStatement()) {
               if (isNetworkTimeoutSupported != TRUE) {
                  setQueryTimeout(statement, validationSeconds);
               }

               statement.execute(config.getConnectionTestQuery());
            }
         }
         finally {
            setNetworkTimeout(connection, networkTimeout);

            if (isIsolateInternalQueries && !isAutoCommit) {
               connection.rollback();
            }
         }

         return true;
      }
      catch (Exception e) {
         //发生异常时,将失败信息记录到上下文
         lastConnectionFailure.set(e);
         logger.warn("{} - Failed to validate connection {} ({}). Possibly consider using a shorter maxLifetime value.",
                     poolName, connection, e.getMessage());
         return false;
      }
   }

}

我们看到,在PoolBase.isConnectionAlive 方法中对连接执行了一系列的探测,如果发生异常还会将异常信息记录到当前的线程上下文中。随后,在 HikariPool 抛出异常时会将最后一次检测失败的异常也一同收集,如下:

private SQLException createTimeoutException(long startTime)
{
   logPoolState("Timeout failure ");
   metricsTracker.recordConnectionTimeout();

   String sqlState = null;
   //获取最后一次连接失败的异常
   final Throwable originalException = getLastConnectionFailure();
   if (originalException instanceof SQLException) {
      sqlState = ((SQLException) originalException).getSQLState();
   }
   //抛出异常
   final SQLException connectionException = new SQLTransientConnectionException(poolName + " - Connection is not available, request timed out after " + elapsedMillis(startTime) + "ms.", sqlState, originalException);
   if (originalException instanceof SQLException) {
      connectionException.setNextException((SQLException) originalException);
   }

   return connectionException;
}

这里的异常消息和我们在业务服务中看到的异常日志基本上是吻合的,即除了超时产生的 “Connection is not available, request timed out after xxxms” 消息之外,日志中还伴随输出了校验失败的信息:

Caused by: java.sql.SQLException: Connection.setNetworkTimeout cannot be called on a closed connection at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.getSqlException(ExceptionMapper.java:211) ~[mariadb-java-client-2.2.6.jar!/:?] at org.mariadb.jdbc.MariaDbConnection.setNetworkTimeout(MariaDbConnection.java:1632) ~[mariadb-java-client-2.2.6.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.setNetworkTimeout(PoolBase.java:541) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.isConnectionAlive(PoolBase.java:162) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:172) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.9.jar!/:?]

到这里,我们已经将应用获得连接的代码大致梳理了一遍,整个过程如下图所示:

从执行逻辑上看,连接池的处理并没有问题,相反其在许多细节上都考虑到位了。在对非存活连接执行 close 时,同样调用了 removeFromBag 动作将其从连接池中移除,因此也不应该存在僵尸连接对象的问题。

那么,我们之前的推测应该就是错误的!

陷入焦灼

在代码分析之余,开发同学也注意到当前使用的 hikariCP 版本为 3.4.5,而环境上出问题的业务服务却是 2.7.9 版本,这仿佛预示着什么… 让我们再次假设 hikariCP 2.7.9 版本存在某种未知的 BUG,导致了问题的产生。

为了进一步分析连接池对于服务端故障的行为处理,我们尝试在本地机器上进行模拟,这一次使用了 hikariCP 2.7.9 版本进行测试,并同时将 hikariCP 的日志级别设置为 DEBUG。

模拟场景中,会由 由本地应用程序连接本机的 MySQL 数据库进行操作,步骤如下:

1. 初始化数据源,此时连接池 min-idle 设置为 10;
2. 每隔50ms 执行一次SQL操作,查询当前的元数据表;
3. 将 MySQL 服务停止一段时间,观察业务表现;
4. 将 MySQL 服务重新启动,观察业务表现。

最终产生的日志如下:

//初始化过程,建立10个连接 
DEBUG -HikariPool.logPoolState - Pool stats (total=1, active=1, idle=0, waiting=0) DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@71ab7c09 DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7f6c9c4c DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7b531779 ... DEBUG -HikariPool.logPoolState- After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功 execute statement: true test time -------1 execute statement: true test time -------2 ... 
//停止MySQL ... 
//检测到无效连接 WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@9225652 ((conn=38652) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@71ab7c09 ((conn=38653) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. 
//释放连接 DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@9225652: (connection is dead) DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@71ab7c09: (connection is dead) 
//尝试创建连接失败 DEBUG -HikariPool.createPoolEntry - Cannot acquire connection from data source java.sql.SQLNonTransientConnectionException: Could not connect to address=(host=localhost)(port=3306)(type=master) : Socket fail to connect to host:localhost, port:3306. Connection refused: connect Caused by: java.sql.SQLNonTransientConnectionException: Socket fail to connect to host:localhost, port:3306. Connection refused: connect at internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73) ~[mariadb-java-client-2.6.0.jar:?] ... 
//持续失败.. 直到MySQL重启 //重启后,自动创建连接成功 DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@42c5503e DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@695a7435 
//连接池状态,重新建立10个连接 DEBUG -HikariPool.logPoolState(HikariPool.java:421) -After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功(已经自愈) execute statement: true

从日志上看,hikariCP 还是能成功检测到坏死的连接并将其踢出连接池,一旦 MySQL 重新启动,业务操作又能自动恢复成功了。根据这个结果,基于 hikariCP 版本问题的设想也再次落空,研发同学再次陷入焦灼。

拨开云雾见光明

多方面求证无果之后,我们最终尝试在业务服务所在的容器内进行抓包,看是否能发现一些蛛丝马迹。

进入故障容器,执行 tcpdump -i eth0 tcp port 30052 进行抓包,然后对业务接口发起访问。

此时令人诡异的事情发生了,没有任何网络包产生!而业务日志在 30s 之后也出现了获取连接失败的异常。

我们通过 netstat 命令检查网络连接,发现只有一个 ESTABLISHED 状态的 TCP 连接。

也就是说,当前业务实例和 MySQL 服务端是存在一个建好的连接的,但为什么业务还是报出可用连接呢?

推测可能原因有二:

  • 该连接被某个业务(如定时器)一直占用。
  • 该连接实际上还没有办法使用,可能处于某种僵死的状态。

对于原因一,很快就可以被推翻,一来当前服务并没有什么定时器任务,二来就算该连接被占用,按照连接池的原理,只要没有达到上限,新的业务请求应该会促使连接池进行新连接的建立,那么无论是从 netstat 命令检查还是 tcpdump 的结果来看,不应该一直是只有一个连接的状况。

那么,情况二的可能性就很大了。带着这个思路,继续分析 Java 进程的线程栈。

执行 kill -3 pid 将线程栈输出后分析,果不其然,在当前 thread stack 中发现了如下的条目:

"HikariPool-1 connection adder" #121 daemon prio=5 os_prio=0 tid=0x00007f1300021800 nid=0xad runnable [0x00007f12d82e5000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.io.FilterInputStream.read(FilterInputStream.java:133) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.fillBuffer(ReadAheadBufferedStream.java:129) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.read(ReadAheadBufferedStream.java:102) - locked <0x00000000d7f5b480> (a org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacketArray(StandardPacketInputStream.java:241) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacket(StandardPacketInputStream.java:212) at org.mariadb.jdbc.internal.com.read.ReadInitialHandShakePacket.<init>(ReadInitialHandShakePacket.java:90) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.createConnection(AbstractConnectProtocol.java:480) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1236) at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:610) at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:142) at org.mariadb.jdbc.Driver.connect(Driver.java:86) at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:138) at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:358) at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206) at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:477)

这里显示 HikariPool-1 connection adder 这个线程一直处于 socketRead 的可执行状态。从命名上看该线程应该是 HikariCP 连接池用于建立连接的任务线程,socket 读操作则来自于 MariaDbConnection.newConnection() 这个方法,即 mariadb-java-client 驱动层建立 MySQL 连接的一个操作,其中 ReadInitialHandShakePacket 初始化则属于 MySQL 建链协议中的一个环节。

简而言之,上面的线程刚好处于建链的一个过程态,关于 mariadb 驱动和 MySQL 建链的过程大致如下:

MySQL 建链首先是建立 TCP 连接(三次握手),客户端会读取 MySQL 协议的一个初始化握手消息包,内部包含 MySQL 版本号,鉴权算法等等信息,之后再进入身份鉴权的环节。

这里的问题就在于 ReadInitialHandShakePacket 初始化(读取握手消息包)一直处于 socket read 的一个状态。

如果此时 MySQL 远端主机故障了,那么该操作就会一直卡住。而此时的连接虽然已经建立(处于 ESTABLISHED 状态),但却一直没能完成协议握手和后面的身份鉴权流程,即该连接只能算一个半成品(无法进入 hikariCP 连接池的列表中)。从故障服务的 DEBUG 日志也可以看到,连接池持续是没有可用连接的,如下:DEBUG HikariPool.logPoolState –> Before cleanup stats (total=0, active=0, idle=0, waiting=3)

另一个需要解释的问题则是,这样一个 socket read 操作的阻塞是否就造成了整个连接池的阻塞呢?

经过代码走读,我们再次梳理了 hikariCP 建立连接的一个流程,其中涉及到几个模块:

  • HikariPool,连接池实例,由该对象连接的获取、释放以及连接的维护。
  • ConnectionBag,连接对象容器,存放当前的连接对象列表,用于提供可用连接。
  • AddConnectionExecutor,添加连接的执行器,命名如 “HikariPool-1 connection adder”,是一个单线程的线程池。
  • PoolEntryCreator,添加连接的任务,实现创建连接的具体逻辑。
  • HouseKeeper,内部定时器,用于实现连接的超时淘汰、连接池的补充等工作。

HouseKeeper 在连接池初始化后的 100ms 触发执行,其调用 fillPool() 方法完成连接池的填充,例如 min-idle 是10,那么初始化就会创建10个连接。ConnectionBag 维护了当前连接对象的列表,该模块还维护了请求连接者(waiters)的一个计数器,用于评估当前连接数的需求。

其中,borrow 方法的逻辑如下:

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // 尝试从 thread-local 中获取
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         ...
      }

      // 计算当前等待请求的任务
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               //如果获得了可用连接,会触发填充任务
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

         //没有可用连接,先触发填充任务
         listener.addBagItem(waiting);

         //在指定时间内等待可用连接进入
         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

注意到,无论是有没有可用连接,该方法都会触发一个 listener.addBagItem() 方法,HikariPool 对该接口的实现如下:

public void addBagItem(final int waiting)
   {
      final boolean shouldAdd = waiting - addConnectionQueueReadOnlyView.size() >= 0; // Yes, >= is intentional.
      if (shouldAdd) {
         //调用 AddConnectionExecutor 提交创建连接的任务
         addConnectionExecutor.submit(poolEntryCreator);
      }
      else {
         logger.debug("{} - Add connection elided, waiting {}, queue {}", poolName, waiting, addConnectionQueueReadOnlyView.size());
      }
   }
PoolEntryCreator 则实现了创建连接的具体逻辑,如下:
public class PoolEntryCreator{
     @Override
      public Boolean call()
      {
         long sleepBackoff = 250L;
         //判断是否需要建立连接
         while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
            //创建 MySQL 连接
            final PoolEntry poolEntry = createPoolEntry();
 
            if (poolEntry != null) {
               //建立连接成功,直接返回。
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
               if (loggingPrefix != null) {
                  logPoolState(loggingPrefix);
               }
               return Boolean.TRUE;
            }
            ...
         }

         // Pool is suspended or shutdown or at max size
         return Boolean.FALSE;
      }
}

由此可见,AddConnectionExecutor 采用了单线程的设计,当产生新连接需求时,会异步触发 PoolEntryCreator 任务进行补充。其中 PoolEntryCreator. createPoolEntry() 会完成 MySQL 驱动连接建立的所有事情,而我们的情况则恰恰是 MySQL 建链过程产生了永久性阻塞。因此无论后面怎么获取连接,新来的建链任务都会一直排队等待,这便导致了业务上一直没有连接可用。

下面这个图说明了 hikariCP 的建链过程:

好了,让我们在回顾一下前面关于可靠性测试的场景:

首先,MySQL 主实例发生故障,而紧接着 hikariCP 则检测到了坏的连接(connection is dead)并将其释放,在释放关闭连接的同时又发现连接数需要补充,进而立即触发了新的建链请求。
而问题就刚好出在这一次建链请求上,TCP 握手的部分是成功了(客户端和 MySQL VM 上 nodePort 完成连接),但在接下来由于当前的 MySQL 容器已经停止(此时 VIP 也切换到了另一台 MySQL 实例上),因此客户端再也无法获得原 MySQL 实例的握手包响应(该握手属于MySQL应用层的协议),此时便陷入了长时间的阻塞式 socketRead 操作。而建链请求任务恰恰好采用了单线程运作,进一步则导致了所有业务的阻塞。

三、解决方案

在了解了事情的来龙去脉之后,我们主要考虑从两方面进行优化:

  • 优化一,增加 HirakiPool 中 AddConnectionExecutor 线程的数量,这样即使第一个线程出现挂死,还有其他的线程能参与建链任务的分配。
  • 优化二,出问题的 socketRead 是一种同步阻塞式的调用,可通过 SO_TIMEOUT 来避免长时间挂死。

对于优化点一,我们一致认为用处并不大,如果连接出现了挂死那么相当于线程资源已经泄露,对服务后续的稳定运行十分不利,而且 hikariCP 在这里也已经将其写死了。因此关键的方案还是避免阻塞式的调用。

查阅了 mariadb-java-client 官方文档后,发现可以在 JDBC URL 中指定网络IO 的超时参数,如下:

具体参考:https://mariadb.com/kb/en/about-mariadb-connector-j/

如描述所说的,socketTimeout 可以设置 socket 的 SO_TIMEOUT 属性,从而达到控制超时时间的目的。默认是 0,即不超时。

我们在 MySQL JDBC URL 中加入了相关的参数,如下:

spring.datasource.url=jdbc:mysql://10.0.71.13:33052/appdb?socketTimeout=60000&connectTimeout=30000&serverTimezone=UTC

此后对 MySQL 可靠性场景进行多次验证,发现连接挂死的现象已经不再出现,此时问题得到解决。

四、小结

本次分享了一次关于 MySQL 连接挂死问题排查的心路历程,由于环境搭建的工作量巨大,而且该问题复现存在偶然性,整个分析过程还是有些坎坷的(其中也踩了坑)。的确,我们很容易被一些表面的现象所迷惑,而觉得问题很难解决时,更容易带着偏向性思维去处理问题。例如本例中曾一致认为连接池出现了问题,但实际上却是由于 MySQL JDBC 驱动(mariadb driver)的一个不严谨的配置所导致。

从原则上讲,应该避免一切可能导致资源挂死的行为。如果我们能在前期对代码及相关配置做好充分的排查工作,相信 996 就会离我们越来越远。

Oauth2介绍

oauth2是什么?

oauth2.0是一个标准的授权协议,它可以为第三方应用,如:web程序、桌面/移动客户端等提供授权,获取用户的数据,它的标准是RFC 6749 文件,该文件解释了oauth的定义:

OAuth 引入了一个授权层,用来分离两种不同的角色:客户端和资源所有者。资源所有者同意以后,资源服务器可以向客户端颁发令牌。客户端通过令牌,去请求数据

其核心可以理解为:服务端向客户端颁发一个令牌,客户端通过令牌来访问服务端的数据。

为什么要使用oauth2.0

举个通俗的例子:古代的皇宫是皇上居住的地方,普通人是不允许进出的,某一天来了一个人对着守卫说:我是皇上七舅老爷的外孙女,快让我进去。

正常情况下守卫肯定不让他进去,谁知道这个人是不是骗子,守卫一般会找人禀明皇上,皇上跑过来一看,果真是他七舅老爷的外孙女,此时守卫才会放他进去

但隔了几天守卫换了,这个人出去了一趟又要进宫,或者他要进皇宫其它的地方,是不是又得走一遍流程,这样搞了几次后,皇上也不耐烦了,每次我总这样跑来跑去,效率也太低了,要是明天八舅老爷的外孙女来了不得累死,好了,我给你们发一张令牌:“皇室亲属进去牌”,并备注:这个人的信息,及可以进出的门。

这样门卫看到这样令牌后,直接根据上面的备注信息确认是否让他进去就可以了

oauth2.0的四种授权方式

  • 授权码(Authorization Code)
  • 隐藏式(implicit)
  • 密码式(password)
  • 客户端凭证(client credentials)

授权码方式

先思考一个问题:比如我们要访问B网站,B网站依赖于A网站给的用户数据,该如何做?
授权码的方式,是先在第三方应用申请一个授权码,然后用该码获取令牌。
比如下面链接:

https://b.com/oauth/authorize?
  response_type=code&amp;
  client_id=app37482284&amp;
  redirect_uri=https%3A%2F%2Fa.com%2Fcallback&amp;
  scope=read&amp;state=xcoiv98y2kd22vusuye3kch

首先用户访问B网站,获取一个授权码,client_id:是让B知道是谁在访问,redirect_uri:B网站接受或拒绝后跳转的网址
scope:标识要求授权的范围,这里是只读
state:随机生成的一个字符串,表示客户端的当前状态,服务端授权后会原封不动的返回这个值,简单的来说:这个参数主要保证请求的授权的客户端和后面使用授权服务的客户端是一个,防止CSRF攻击,可参考:《OAuth2.0忽略state参数引发的CSRF漏洞》(https://blog.csdn.net/gjb724332682/article/details/54428808)

用户点击这个链接后,会跳转到一个授权页面,类型微信的授权页面,如下:

然后用户点击授权,B网站会生成一个授权码,然后将需要的用户信息通过:
授权码 -> 临时令牌 的关系保存下来,然后将授权码返给A网站,跳转之后的地址为:

https://a.com/callback?code=8x4d3N674g2&amp;state=xcoiv98y2kd22vusuye3kch

然后A网站通过授权码在后端调用B的接口就可以获取到令牌了,
注意:这里获取令牌是在后端进行,A调用B服务器接口还需要传入一个密钥。
这样就避免了攻击者拿到授权码后在其它服务器上窃取令牌。

https://b.com/oauth/token?
 client_id=app37482284&amp;
 client_secret=X9gK6uB20sD1myc8cRe&amp;
 grant_type=authorization_code&amp;
 code=AUTHORIZATION_CODE&amp;
 redirect_uri=CALLBACK_URL

然后B网站根据授权码拿到令牌(token),返回给A的回调地址,A收到token后,就可以通过token获取到用户信息。

HTTP/1.1 200 OK
Content-Type: application/json
Cache-Control: no-store
Pragma: no-cache

{
  "access_token":"MTQ0NjJkZmQ5OTM2NDE1ZTZjNGZmZjI3",
  "token_type":"bearer",
  "expires_in":3600,
  "refresh_token":"IwOGYzYTlmM2YxOTQ5MGE3YmNmMDFkNTVk",
  "scope":"create delete"
}

回顾上面的调用流程,主要分为两个步骤:

  • 用户在客户端确认授权,获取授权码
  • 根据授权码获取令牌

为什么要先获取一个授权码,然后再根据授权码获取令牌呢?
因为获取授权码往往是在客户端上发起,如果第一步就获取到了令牌,那么攻击者很容易就窃取到令牌,所以获取令牌往往是放到服务端来做,这样的安全性更高。

隐藏式

隐藏式实质是授权码方式的一种缩略版,即直接是第一步就通过客户端访问获取到令牌,以上也说过这种方式的安全性很低,极可能存在中间人攻击的风险。
第一步,同样是客户端授权

https://b.com/oauth/authorize?
  response_type=token&amp;
  client_id=app37482284&amp;
  redirect_uri=https%3A%2F%2Fa.com%2Fcallback&amp;
  scope=read&amp;state=xcoiv98y2kd22vusuye3kch

第二步,直接回调到A网站地址,并携带令牌

https://a.com/callback#token=ACCESS_TOKEN

因为这种方式安全很低,已经不推荐使用,之所以出现这种方式,主要是因为最原始的应用不支持浏览器访问外部主机链接,所以采取了这种妥协的方式,后面由于Cross-Origin Resource Sharing (CORS)技术的出现,解决了这个问题,目前对于web服务推荐使用授权码方式,具体可参考:
《Why you should stop using the OAuth implicit grant!》(https://medium.com/oauth-2/why-you-should-stop-using-the-oauth-implicit-grant-2436ced1c926)

《Is the OAuth 2.0 Implicit Flow Dead?》(https://developer.okta.com/blog/2019/05/01/is-the-oauth-implicit-flow-dead)

密码式

密码式是单点登录用到比较多的一种场景。即客户端直接通过用户名和密码请求获取令牌。

先说说单点登录,比如我们登录了taobao.com网站,准备买一些东西,可看了半天没看到合适的,准备到tmall.com看看,淘宝和天猫都是阿里系的,既然都是一家的,完全可以复用taobao网站的登录状态。授权给tmall使用。
所以这就可以用到oauth2的密码方式,用户登录了一次,服务端把生成一个令牌,保存用户的信息,taobao和tmall都可以共享。

POST /oauth/token HTTP/1.1
Host: authorization-server.com
Content-type: application/x-www-form-urlencoded

https://taobao/token?grant_type=password
&amp;username=guestuser1
&amp;password=123654ww
&amp;client_id=xxxxxxxxxx

返回令牌信息

{
  "access_token": "ece8ec32-5855-48c5-838b-0d7f20c64d1c",
  "token_type": "bearer",
  "expires_in": 3600,
  "scope": "create"
}

grant_type:授权方式,password表示:密码式
username:用户名
password:密码

在获取令牌的时候,服务端会将令牌->用户信息保存起来,后面客户端直接通过令牌就可以获取到用户信息。

客户端凭证

凭证式实质也是授权码的一种缩略版,可以理解为授权码方式的第二步,获取令牌都在服务端进行

https://b.com/token?
  grant_type=client_credentials&amp;
  client_id=app37482284&amp;
 client_secret=X9gK6uB20sD1myc8cR

这种方式适用于没有前端的应用,主要是用于第三方应用在后端访问传递数据。

Spring Boot启动出现死锁

Spring Boot启动出现死锁

最近公司的一个项目在启动后,在调用往mongoDB写数据时,一直写不进去,运行一段时间后,程序出现内存溢出,jstack导出了线程信息,如下:

CC6B8F5EF4392AAA4F0B393FFDA32AA6 DFB379909A737E20E4A894BFF683739A

mongoDB在插入数据时发生了堵塞,在等待一个监控对象,主线程在调用afterPropertiesSet方法,一直在等待中,表示spring的初始化工作一直没有完成,所以我们结合代码分析,查看afterPropertiesSet方法

    @Override
    public void afterPropertiesSet() throws Exception {
        scheduleRulePriorityCacheService.start();

        if (isTesting == false) {
            kafkaConsumerTask.start();
            executeService.execute();
        }

    }

 

在spring属性设置完成后,执行了一个kafka消费任务,这个kafka消费任务大致的流程就是从kafka中取出数据,然后放入一个队列中,之后执行了executeService.execute()从队列里面拿出数据然后进行业务处理,我们看看execute()方法

	public void execute() {
		while (true) {
			try {
				if (receiveQueue.isEmpty()) {
					Thread.sleep(200);
				} else {
					String msg = receiveQueue.poll();
					if (StringUtils.isBlank(msg)) {
						continue;
					}
					poolTaskExecutor.execute(() -> {
						scheduleProcessCenter.process(msg);
					});
				}
			} catch (Exception e) {
				logger.error("ExecuteService error,e:{}", e);
			}
		}
	}

这里就是一个无线循环操作,即如果队列有数据就消费,没有就等待200ms

我们知道,afterPropertiesSet方法是spring Bean的生命周期的一部分,这里发生了死锁,必定是锁住了一个资源,没有释放,而MongoDB又需要这个资源,我看到错误信息,在调用DefaultSingletonBeanRegistry.getSingleton方法时锁住了一个资源,我们查看spring的源代码,

7D5A37364ED4463F83CA98076DBEB6AD

可以看到锁住了singletonObjects对象,这个对象就是spring的单例容器,同样我们可以确定,在MongoDB调用注册事件的时候也需要这个对象,我们同样查看源代码

CECB8DBCEB5F05A176976D877FCEC3A1

可以看到MongoDB在注册事件的时候同时需要锁住retrievalMutex对象,那么retrievalMutex和singletonObjects有什么关系?我们接着看

ADE46D9008DA9E92021028F3E5582578 1F3CF43276456CBF7BF938AE5A1CC168

点进去查看getSingletonMutex方法,返回的就是singletonObjects对象,所有retrievalMutex实质和singletonObjects是同一个对象

那么就很容易得出结论了,在调用afterPropertiesSet方法时,singletonObjects对象一直没释放,而MongoDB又需要这个对象,所以产生了死锁。 解决方法,让afterPropertiesSet方法尽快释放singletonObjects对象,我们可以开启一个新线程来执行从队列中读取数据做业务处理的逻辑

	public void execute() {
		poolTaskExecutor.execute(this);
	}

	@Override
	public void run() {
		while (true) {
			try {
				if (receiveQueue.isEmpty()) {
					Thread.sleep(200);
				} else {
					String msg = receiveQueue.poll();
					if (StringUtils.isBlank(msg)) {
						continue;
					}
					poolTaskExecutor.execute(() -> {
						scheduleProcessCenter.process(msg);
					});
				}
			} catch (Exception e) {
				logger.error("ExecuteService error,e:{}", e);
			}
		}
	}

 

Spring Boot整合Mybatis配置多数据源

Spring Boot整合Mybatis配置多数据源

mybatis是目前比较流行的的orm框架了,在spring中,我们知道只需要通过构造器的方式在dao层注入slaveSqlSession就可以指定对应的数据库了,那么Spring Boot是怎么实现的呢?

我们现在有两个库,一个Dynamic,一个Moment,那么我们分别配置两个数据源

@Configuration
@MapperScan(basePackages = "com.huoli.trip.dao.dynamic", sqlSessionTemplateRef = "dynamicSqlSessionTemplate")
public class DynamicDataSourceConfig {
    //配置数据源
    @Bean(name = "dynamicDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql.dynamic")
    public DataSource dynamicDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "dynamicSqlSessionFactory")
    public SqlSessionFactory dynamicSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setTypeAliasesPackage("com.huoli.trip.entity.dynamic");
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/dynamic/*.xml"));
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

     //配置事务
    @Bean(name = "dynamicTransactionManger")
    public DataSourceTransactionManager dynamicTransactionManger(@Qualifier("dynamicDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "dynamicSqlSessionTemplate")
    public SqlSessionTemplate dynamicSqlSessionTemplate(@Qualifier("dynamicSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
@Configuration
@MapperScan(basePackages = "com.huoli.trip.dao.moment", sqlSessionTemplateRef = "momentSqlSessionTemplate")
public class MomentDataSourceConfig {
    private static final Logger logger = LoggerFactory.getLogger(MomentDataSourceConfig.class);

    //配置数据源
    @Bean(name = "momentDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql.moment")
    @Primary
    public DataSource momentDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "momentSqlSessionFactory")
    @Primary
    public SqlSessionFactory momentSqlSessionFactory(@Qualifier("momentDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setTypeAliasesPackage("com.huoli.trip.entity.moment");
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/moment/*.xml"));
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

     //配置事务
    @Bean(name = "momentTransactionManger")
    @Primary
    public DataSourceTransactionManager momentTransactionManger(@Qualifier("momentDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "momentSqlSessionTemplate")
    @Primary
    public SqlSessionTemplate momentSqlSessionTemplate(@Qualifier("momentSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}

这里我们看到配置模板、事务、数据源都相同,处理配置数据库工厂的时候指定的setMapperLocations不同,另外通过@MapperScan注解扫描不同包下面的Mapper文件并指定sqlSessionTemplateRef对应的数据库模板。 Dao及xml对应的文件

@Mapper
public interface AirportInfoMapper {

    List<Airport> selectAll();

    String findName(String airportCode);
}
public interface MomentMapper {
    int deleteByPrimaryKey(Integer id);

    int insert(Moment record);

    MomentVo selectByPrimaryKey(@Param("id") Integer id);
}

xml就不贴了,和spring里面的配置一样,写对应的方法及SQL语句,另外在application.yml中配置需要的数据源

spring:
  datasource:
    mysql:
      moment:
        driverClassName: com.mysql.jdbc.Driver
        jdbcUrl: jdbc:mysql://host:port/moment?useUnicode=true&characterEncoding=utf-8
        username: xxx
        password: xxxx
        type: com.zaxxer.hikari.HikariDataSource
      dynamic:
        driverClassName: com.mysql.jdbc.Driver
        jdbcUrl: jdbc:mysql://host:port/dynamic?useUnicode=true&characterEncoding=utf-8
        username: xxx
        password: xxxx
        type: com.zaxxer.hikari.HikariDataSource

在service层就可以通过AirportInfoMapper和MomentMapper访问不同的数据库了

 

Spring Boot自动配置原理

Spring Boot自动化配置原理

Spring Boot提供了很多”开箱即用“的依赖模块,这些模块已经帮我们做了自动配置,这些依赖模块都是以spring-boot-starter-xx作为命名的,比如spring-boot-starter-redis、spring-boot-starter-data-mongodb ,Spring Boot关于自动配置的源码在spring-boot-autoconfigure.jar内 在说spring的自动配置,我们先看看Spring Boot的运作原理,我们都知道,在Spring Boot的启动类上都会添加@SpringBootApplication注解,我们点击进去看看里面的源码,自动配置的核心注解是@EnableAutoConfiguration

 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = {
        @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
        @Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication 

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import(AutoConfigurationImportSelector.class)
public @interface EnableAutoConfiguration

这里的关键功能时@Import注解到的配置功能,AutoConfigurationImportSelector使用SpringFactoriesLoader.loadFacotryNames方法来扫描spring-boot-autoconfigure.jar包里面spring.factories文件,此文件声明了哪些类可以自动注入

# Initializers
org.springframework.context.ApplicationContextInitializer=\
org.springframework.boot.autoconfigure.SharedMetadataReaderFactoryContextInitializer,\
org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLoggingListener

# Application Listeners
org.springframework.context.ApplicationListener=\
org.springframework.boot.autoconfigure.BackgroundPreinitializer

# Auto Configuration Import Listeners
org.springframework.boot.autoconfigure.AutoConfigurationImportListener=\
org.springframework.boot.autoconfigure.condition.ConditionEvaluationReportAutoConfigurationImportListener

# Auto Configuration Import Filters
org.springframework.boot.autoconfigure.AutoConfigurationImportFilter=\
org.springframework.boot.autoconfigure.condition.OnClassCondition

# Auto Configure
org.springframework.boot.autoconfigure.web.servlet.HttpEncodingAutoConfiguration,\
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,\

平时我们知道在创建springWeb的时候,我们都需要在web.xml里面配置一个filter,设置Encode编码为UTF-8,但在spring boot中我们却没有,另外如果本地启动了Redis,我们也可以不需要配置Redis,我们点进去HttpEncodingAutoConfiguration、RedisAutoConfiguration里面看看里面的配置

@Configuration
@EnableConfigurationProperties(HttpEncodingProperties.class)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
@ConditionalOnClass(CharacterEncodingFilter.class)
@ConditionalOnProperty(prefix = "spring.http.encoding", value = "enabled", matchIfMissing = true)
public class HttpEncodingAutoConfiguration

@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration

可以看到注解上面都包含了ConditionalOnClass和EnableConfigurationProperties两个注解,我们深入ConditionalOnClass看它是如何实现的

    @Override
    public ConditionOutcome getMatchOutcome(ConditionContext context,
            AnnotatedTypeMetadata metadata) {
        ClassLoader classLoader = context.getClassLoader();
        ConditionMessage matchMessage = ConditionMessage.empty();
        List<String> onClasses = getCandidates(metadata, ConditionalOnClass.class);
        if (onClasses != null) {
            List<String> missing = getMatches(onClasses, MatchType.MISSING, classLoader);
            if (!missing.isEmpty()) {
                return ConditionOutcome
                        .noMatch(ConditionMessage.forCondition(ConditionalOnClass.class)
                                .didNotFind("required class", "required classes")
                                .items(Style.QUOTE, missing));
            }
            matchMessage = matchMessage.andCondition(ConditionalOnClass.class)
                    .found("required class", "required classes").items(Style.QUOTE,
                            getMatches(onClasses, MatchType.PRESENT, classLoader));
        }
        List<String> onMissingClasses = getCandidates(metadata,
                ConditionalOnMissingClass.class);
        if (onMissingClasses != null) {
            List<String> present = getMatches(onMissingClasses, MatchType.PRESENT,
                    classLoader);
            if (!present.isEmpty()) {
                return ConditionOutcome.noMatch(
                        ConditionMessage.forCondition(ConditionalOnMissingClass.class)
                                .found("unwanted class", "unwanted classes")
                                .items(Style.QUOTE, present));
            }
            matchMessage = matchMessage.andCondition(ConditionalOnMissingClass.class)
                    .didNotFind("unwanted class", "unwanted classes").items(Style.QUOTE,
                            getMatches(onMissingClasses, MatchType.MISSING, classLoader));
        }
        return ConditionOutcome.match(matchMessage);
    }

源码中会通过ConditionalOnClass中设置的参数,在classPath下查找是否存在,现在如果我们引入了web依赖,那么CharacterEncodingFilter这个类肯定是存在的,此时它就会通过EnableConfigurationProperties注解配置的参数使用里面的默认配置,我们点进去HttpEncodingProperties看看

@ConfigurationProperties(prefix = "spring.http.encoding")
public class HttpEncodingProperties {

    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

    /**
     * Charset of HTTP requests and responses. Added to the "Content-Type" header if not
     * set explicitly.
     */
    private Charset charset = DEFAULT_CHARSET;

果然这里面有一个参数DEFAULT_CHARSET已经默认设置为UTF-8了,这里我们也可以通过spring.http.encoding来修改它的默认编码
同样,我们看看Redis的配置源码。

@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean(name = "redisTemplate")
	public RedisTemplate<Object, Object> redisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		RedisTemplate<Object, Object> template = new RedisTemplate<>();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

	@Bean
	@ConditionalOnMissingBean
	public StringRedisTemplate stringRedisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		StringRedisTemplate template = new StringRedisTemplate();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

}

这里redis为我们自动注入了一个Bean,名字是redisTemplate,所以我们在项目中使用redisTemplate直接就可以用了,通过spring.redis可以修改默认的配置

所以可以总结spring boot自动配置的原理是:先在claspath下查找是否存在的依赖类,如果存在则触发自动配置,我们只要通过Maven添加依赖,这些依赖就会下载很多jar包到classpath中。

实现一个自动配置模块

通过上面的分析,我们发现spring的自动配置还是比较简单的,那我们也可以自己实现一个,创建一个Maven项目,添加autoconfigure依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
</dependencies>

定义一个配置文件,我们以chuanz开头,后面我们在application.properties中就可以直接以chuanz开头来配置了

@ConfigurationProperties(prefix = "chuanz")
public class ChuanzProperties {
    public static final String DEFAULT_NAME = "baichuan";
    public String name = DEFAULT_NAME;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

实现一个简单的服务类

public class ChuanzService {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

下面我们来实现最主要的自动配置类,注意配置完成后,需要将这个类添加到spring.factories,我们在src/main/resources/ META-INF/创建一个spring.factories文件,然后配置它

@Configuration
@ConditionalOnClass({ ChuanzService.class })
@EnableConfigurationProperties(ChuanzProperties.class)
public class ChuanzAutoConfiguration {

    @Autowired
    private ChuanzProperties chuanzProperties;

    @Bean
    @ConditionalOnMissingBean(ChuanzService.class)
    public ChuanzService chuanzService() {
        ChuanzService chuanzService = new ChuanzService();
        chuanzService.setName(chuanzProperties.getName());
        return chuanzService;
    }
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.chuanz.springboot.autoconfig.ChuanzAutoConfiguration

然后我们将项目打包成一个jar文件,这里我们可以把文件上传到本地私服上,然后再通过maven来引用

<dependency>
    <groupId>cn.chuanz</groupId>
    <artifactId>springboot-chuanz-autoconfig</artifactId>
    <version>0.1</version>
</dependency>

然后我们就可以在项目中通过@Autowired来注入ChuanzService使用了,通过chuanz前缀在application.properties中也可以配置name属性的值

 

网络释义
Autowired: 自动装配

Spring Boot读取配置的几种方式

Spring Boot读取配置几种方式

spring读取配置有三种方式,使用@Value,@ConfigurationProperties,@Environment,@PropertySource,除了@PropertySource不支持yml的文件读取,下面我们对这几种方式分别进行介绍

使用@Value

比如我们在application.properties或application.yml中定义了一些配置,如:dynamic.subscribeUrl,在项目中我们只需要使用@Value就可以获取到。

@Component
public class DynamicConfig {

    @Value("dynamic.subscribeUrl")
    private String subscribeUrl;

}

使用@ConfigurationProperties

上面的例子,我们也可以通过@ConfigurationProperties来实现,只需要指定一个前缀

@Component
@ConfigurationProperties(prefix = "dynamic")
public class DynamicConfig {

    private String subscribeUrl;

}

使用@Environment

@Environment可以获取所有加载的配置文件,我们只需要根据getProperty方法就可以获取

@Autowired
private Environment environment;

environment.getProperty(String key);

使用@PropertySource

我们获取config/flight.properties下所有的配置,前缀以dynamic开头的

@Component
@ConfigurationProperties(prefix = "dynamic")
@PropertySource("config/flight.properties")
public class DynamicConfig {

    private String subscribeUrl;

}

RxJava使用介绍

在说RxJava之前先说说ReactiveX。

ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言,RxJava则是java语言的实现。

Rx介绍:

1)扩展的观察者模式:通过订阅可观测对象的序列流然后做出反应。

2)迭代器模式:对对象序列进行迭代输出从而使订阅者可以依次对其处理。

3)函数式编程思想:简化问题的解决的步骤,让你的代码更优雅和简洁

为什么说是扩展的观察者模式?

观察者模式:被观察者发出事件,然后观察者(事件源)订阅然后进行处理。

图片 1

扩展:如果没有观察者,被观察者是不会发出任何事件的。另外知道事件何时结束,还有错误通知处理

迭代器模式

提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示

《RxJava Essentials》一书做的的对比:迭代器模式在事件处理上采用的是“同步/拉式”的方式,而被观察者采用的是“异步/推式”的方式,而对观察者而言,显然后者更灵活。

图片 1

函数式编程

//线程操作模式
new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
//函数模式 (Lambda)A->B ->C->D
Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

 

RxJava核心

Observable(被观察者,也就是事件源)和Subscriber(观察者)

//被观察者
Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                }
        );

//观察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
          @Override
          public void onCompleted() {}

          @Override
          public void onError(Throwable e) {}

          @Override
          public void onNext(String s) {
                log.info("基础写法:"+s);
            }
      };

 myObservable.subscribe(mySubscriber);
Observable<String> myObservable = Observable.just("Hello World!");

		Action1<String> onNextAction = new Action1<String>() {

			@Override
			public void call(String s) {
				logger.info("Action1简化后:"+s);
			}
		};

		myObservable.subscribe(onNextAction);

		/* 写成匿名函数*/
		Observable.just("Hello World!").subscribe(new Action1<String>() {
			@Override
			public void call(String s) {
				logger.info("匿名函数写法:"+s);
			}
		});

		/*用Java 8 lambdas(Retrolambda)表达式*/
		Observable.just("Hello World!").subscribe(s -> logger.info("lambdas表达式写法:"+s));

 

RxJava操作符

创建操作符:Create, Defer, From, Interval, Just, Range, Repeat, Timer等。

String[] strings = {"张三","李四","王五","赵六"};
Observable.from(strings)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("name",s);
            }
        });

变换操作:Map、FlatMap、ConcatMap等

public void showUserName(String userName){
	textView.setText(userName);
}

public void showUserName(String userName){
Observable.just(userName).subscribe( new  Action1<String>(){
         @Override
         public void call(String s){
            textView.setText(s);
        }
});  
}

如果需要在显示前对这个字符串做处理,然后再展示,比如加“张三,你好”

方法1:我们可以对字符串本身操作

方法2:我们可以放到Action1.call()方法里做处理

方法3:使用操作符做变换:map

public void showUserName(String userName){
Observable.just(userName).map(new Func1<String,String>(){
          public String call(String text){
             return handleUserName(text);   
           }
   }).subscribe( new Action1<String>(){
        public void call(String s){
        }
        });
}

打印出中国的所有省份名称:flatMap()

List<Province>  provinceList = …
Observable.from(provinceList)
.flatMap(new Func1<Province,String>(){
@Override
 public String call(Province province){
            return province.getName();
      }
}).subscribe(new Action1<String>(){
         @Override
         public void call(String s){
            Log.i(“省份名称”,s)
        }
});
List<Province>  provinceList = …
Observable.from(provinceList)
.subscribe(new Action1<Province>(){
         @Override
         public void call(Province province){
            List<City> cities = province.getCities();
            for (int i = 0; i < cities.size(); i++) {
                   City city = cities.get(i);
                   Log.i(“城市”, city.getName());
            }        
          }
});

显然第一种比第二种看着更简洁,清晰

异步

调度器Scheduler:

0b350c8350142aff

操作符:

subscribeOn():指定回调发生的线程,事件消费的线程,可以执行多次!

observeOn():订阅事件发生的线程,事件产生的线程,只允许执行一次。

其他操作符

01E17C8B-D4F3-4D92-859D-D36D72D5A07C

Debounce:“去抖”,只有在空闲了一段时间后才发射数据,过滤掉发射速率过快的数据项

Sample:“采样”,定期发射Observable最近发射的数据项

F77989EF-B666-47B5-9FCB-901E68BE64C3

String[] numbers = {"11", "2", "2", "13", "4", "5", "7"}
Observable
  .from(numbers)
  .map(s -> Integer.parseInt(s))
  .filter(s -> s < 10)
  .distinct()
  .takeLast(3)
  .reduce((number1, number2) -> 
     number1 + number2)
  )
  .subscribe(i -> System.out.println(i));//16

 

流式处理的优势

如需要从多个数据源获取数据内存、磁盘、网络依次获取。

Observable<String> memory= bservable.just("memory"); 
Observable<String> disk= Observable.just("disk");  
Observable<String> network=Observable.just("network");  

//依次检查memory、disk、network  
Observable.concat(memory, disk, network)  
.first()  
.subscribeOn(Schedulers.newThread())  
.subscribe(s -> {  
    memoryCache = "memory";  
    System.out.println("--------------subscribe: " + s);  
});

参考资料:

https://gank.io/post/560e15be2dca930e00da1083

http://www.jianshu.com/p/e0891032ee4d