Sentinel流量控制介绍

1. 背景

  • 比如双11,秒杀等,系统访问量远远超出系统所能处理的并发数,需对系统进行保护。
  • 在微服务架构中,服务拆分粒度较细,会出现请求链路较长的情况,如果链路中某个服务因网络延迟或者请求超时等原因不可用,会导致当前请求阻塞,可能出现请求堆积从而导致雪崩效应。
  • 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、
  • 系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性

2. 常见的限流场景

●  在Nginx 层添加限流模块限制平均访问速度
●  通过设置数据库连接池,线程池的大小来限制总的并发数
●  通过guava 提供的Ratelimiter 限制接口的访问速度
●  TCP通信协议中的限流整形
 

3. 限流算法

3.1、 计数器算法

         计数器算法是一种比较简单的限流实现算法,在指定的周期内累加访问次数,当访问次数达到设定的阈值时,触发限流策略,当进入下一个时间周期时进行访问数的清零。

优点:实现简单
缺点:临界问题:如图所示,当在8-10秒和10-12秒内分别并发500,虽然没有超过阈值,但如果算8-12秒,则并发数高达1000,已经超过了原先定义的10秒内不超过500的并发量
 

3.2、 滑动窗口算法

为了解决计数器算法带来的临界问题,所以引入了滑动窗口算法。
简单来说,滑动窗口算法原理是在固定窗口汇总分割出多个小时间窗口,分别在每个小时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口,最终只需要统计滑动窗口时间范围内的所有小时窗口的总的计数即可。
如图所示,最终只需要统计每个小时间窗口不超过阈值/n && 在滑动窗口范围内的所有的小时间窗口总的计数不超过阈值即可

优点:实现相对简单,且没有计数器算法的临界问题
缺点:无法应对短时间高并发(突刺现象)
 

3.3、 令牌桶限流算法

令牌桶是网络流量整形和速率限制中最常使用的一种算法,对于每一个请求,都需要从令牌桶中获得一个令牌,如果没有获得令牌,则需要触发限流策略。

基本过程:

1)每进来一个请求,都在桶里获取一个令牌
2)如果有令牌,则拿着令牌通过
3)如果没有令牌,则不允许请求通过

几种情况:
请求速度 > 令牌生成速度:当令牌被取空后会被限流
请求速度 = 令牌生成速度:流量处于平稳状态
请求速度 < 令牌生成速度:说明此时并发数不高,请求能被正常处理
优点:可以像漏桶那样匀速,也可以像计数器那样处理突发流量
 

3.4、  漏桶限流算法

漏桶限流算法的主要作用是控制数据注入网络的速度,平滑网络上的突发流量
漏桶算法的原理:在漏桶内部同样维护一个容器,这个容器会以恒定的速度出水,不管上面的水流速度多快,漏桶水滴的流出速度始终保持不变,实际上消息中间件就使用了漏桶限流的思想。
 

漏桶算法中,有如下几种可能的情况:
●  请求速度大于漏桶流出速度,也就是请求数超出当前服务所能处理的极限,将会触发限流策略
●  请求速度小于或者等于漏桶流出的速度,也就是服务处理能力整合满足客户端的请求量,将正常执行。
不足:无法应对突发的并发流量,因为流出速率一直都是恒定的
优点:平滑系统流量
 

Sentinel介绍

       Sentinel是阿里开源的项目,是面向分布式架构的轻量级流量控制组件,主要以流量为切入点,从限流,流量整形,服务降级,系统负载保护等多个维度来帮助我们保障微服务的稳定性
Sentinel 分2 部分
核心库:不依赖任何框架/库,能够运行于所有的java 环境,对Dubbo ,Spring Cloud 等框架有较好的支持。
控制台:基于Spring boot 开发,打包后可以直接运行,不需要额外的tomcat 等应用部署。
 

5、Sentinel 流量控制

5.1、整体步骤

  • 定义资源
  • 定义限流规则
  • 检验规则是否生效
 

5.2、资源定义方式

方式1:抛出异常的方式定义资源
SphU 包含了 try-catch 风格的 API。用这种方式,当资源发生了限流之后会抛出 BlockException。这个时候可以捕捉异常,进行限流之后的逻辑处理。示例代码如下:
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
Entry entry = null;
try {
    // 资源名可使用任意有业务语义的字符串
    entry = SphU.entry("自定义资源名");
    // 被保护的业务逻辑
    // do something...
} catch (BlockException e1) {
    // 资源访问阻止,被限流或被降级
    // 进行相应的处理操作
} finally {
    if (entry != null) {
        entry.exit();
    }
}

注意: 
1、SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,否则会导致调用链记录异常,抛出 ErrorEntryFreeException 异常。
2、务必保证finally会被执行

方式2:注解方式定义资源
Sentinel 支持通过 @SentinelResource 注解定义资源并配置 blockHandler 和 fallback 函数来进行限流之后的处理。示例:

@SentinelResource(blockHandler = "blockHandlerForGetUser")
public User getUserById(String id) {
        throw new RuntimeException("getUserById command failed");
}
// blockHandler 函数,原方法调用被限流/降级/系统保护的时候调用
public User blockHandlerForGetUser(String id, BlockException ex) {
        return new User("admin");
}

注意
1、blockHandler 函数会在原方法被限流/降级/系统保护的时候调用,而 fallback 函数会针对所有类型的异常。请注意 blockHandler 和 fallback 函数的形式要求
2、需注意:blockHandler 所配置的值会在触发限流之后调用,这个方法定义必须和原始方法的返回值,参数保持一致,而且需要增加BlockException 参数。

5.3、限流具体实现

5.3.1. 引入 Sentinel 依赖

添加依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.4</version>
</dependency>

5.3.2. 定义资源

资源 是 Sentinel 中的核心概念之一。最常用的资源是我们代码中的 Java 方法,也可以更灵活的定义你的资源,例如,把需要控制流量的代码用 Sentinel API SphU.entry(“HelloWorld”) 和 entry.exit() 包围起来即可。

    在下面的例子中,我们将 System.out.println(“hello world”); 作为资源(被保护的逻辑),用 API 包装起来。参考代码如下:

public static void main(String[] args) {
    // 配置规则.
    initFlowRules();
    while (true) {
        try (Entry entry = SphU.entry("HelloWorld")) {
        // 被保护的逻辑
            System.out.println("hello world");
        } catch (BlockException ex) {
            // 处理被流控的逻辑
            System.out.println("blocked!");
        }
    }
}

完成以上两步后,代码端的改造就完成了。也可以通过我们提供的 注解,来定义我们的资源,类似于下面的代码:

@SentinelResource("HelloWorld")
public void helloWorld() {
    // 资源中的逻辑
    System.out.println("hello world");
}

这样,helloWorld() 方法就成了我们的一个资源。

5.3.3. 定义规则

通过流控规则来指定允许该资源通过的请求次数,例如下面的代码定义了资源 HelloWorld 每秒最多只能通过 20 个请求。

private static void initFlowRules(){
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule = new FlowRule();
    rule.setResource("HelloWorld");
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setCount(20);
    rules.add(rule);
    FlowRuleManager.loadRules(rules);
}

完成上面 3 步,Sentinel 就能够正常工作了。

5.3.4. 检查效果

Demo 运行之后,我们可以在日志 ~/logs/csp/${appName}-metrics.log.xxx 里看到下面的输出:

其中 p 代表通过的请求, block 代表被阻止的请求, s 代表成功执行完成的请求个数, e 代表用户自定义的异常, rt 代表平均响应时长。

可以看到,这个程序每秒稳定输出 “hello world” 20 次,和规则中预先设定的阈值是一样的。

6、Sentinel 熔断

6.1、背景

1、现代微服务架构都是分布式的,由非常多的服务组成,除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。
2、分布式系统中,不同服务之间相互调用,组成复杂的调用链路,复杂链路上的某一环不稳定,可能会层层级联导致整体的雪崩 。

6.2、熔断策略

Sentinel 提供以下几种熔断策略:

  • 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
  • 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% – 100%。
  • 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。

注意
 1、@SentinelResource 注解会自动统计业务异常,无需手动调用。
2、异常降级仅针对业务异常,对 Sentinel 限流降级本身的异常(BlockException)不生效。为了统计异常比例或异常数,需要通过 Tracer.trace(ex) 记录业务异常。示例:

Entry entry = null;
try {
    entry = SphU.entry(resource);

} catch (Throwable t) {
    if (!BlockException.isBlockException(t)) {
        Tracer.trace(t);
    }
} finally {
    if (entry != null) {
        entry.exit();
    }
}

熔断降级规则

熔断降级规则(DegradeRule)包含下面几个重要的属性:

Field说明默认值
resource资源名,即规则的作用对象
grade熔断策略,支持慢调用比例/异常比例/异常数策略慢调用比例
count慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值
timeWindow熔断时长,单位为 s
minRequestAmount熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入)5
statIntervalMs统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入)1000 ms
slowRatioThreshold慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入)

责任链模式实践

1、前言

       当今软件开发中,设计模式是一种重要的方法论,它们帮助开发人员解决了各种常见的问题,并提高了代码的可维护性和可扩展性。本文将重点介绍责任链模式(Chain of Responsibility Pattern),并使用Java语言来解释它的概念、用途以及如何实现。

2、责任链模式简介

       责任链模式是一种行为型设计模式,它允许你将请求沿着处理链传递,直到有一个对象处理它为止。这种模式非常适合处理请求的分发和处理,特别是当您不知道哪个对象能够处理请求时。

       在责任链模式中,通常有多个处理对象(处理器),每个处理对象都有一个处理请求的方法。当一个请求进入责任链时,它从链的起始点开始传递,每个处理对象检查是否能够处理该请求。如果可以处理,则该请求被处理,否则它会被传递给链中的下一个处理对象,直到找到一个合适的处理器。

3、责任链模式的优点

责任链模式有以下几个优点:

  1. 解耦性: 责任链模式将请求发送者和接收者解耦,发送者无需知道请求最终由哪个接收者处理。
  2. 可扩展性: 可以轻松地添加新的处理对象或修改处理顺序,而不会影响其他部分的代码。
  3. 灵活性: 可以动态配置责任链,根据需要添加、删除或修改处理对象。
  4. 单一职责原则: 每个处理对象只需关心自己是否能够处理请求,符合单一职责原则。

4、责任链模式的实现

让我们使用Java语言来实现一个简单的责任链模式示例。假设我们有一个报销审批系统,根据报销金额不同,有三个处理对象:经理、财务部门和CEO。让我们一步一步实现这个示例。

首先,我们创建一个请求类表示报销请求:

public class ReimbursementRequest {
    private double amount;

    public ReimbursementRequest(double amount) {
        this.amount = amount;
    }

    public double getAmount() {
        return amount;
    }
}

接下来,我们创建一个处理接口,表示处理请求的方法:

public interface ReimbursementHandler {
    void handleRequest(ReimbursementRequest request);
    void setNextHandler(ReimbursementHandler nextHandler);
}

然后,我们创建三个具体的处理对象:经理、财务部门和CEO,它们实现了ReimbursementHandler接口:

public class Manager implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 1000) {
            System.out.println("经理审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class FinanceDepartment implements ReimbursementHandler {
    private ReimbursementHandler nextHandler;

    @Override
    public void handleRequest(ReimbursementRequest request) {
        if (request.getAmount() <= 5000) {
            System.out.println("财务部门审批通过,报销金额:" + request.getAmount());
        } else {
            if (nextHandler != null) {
                nextHandler.handleRequest(request);
            } else {
                System.out.println("无法处理该报销申请");
            }
        }
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        this.nextHandler = nextHandler;
    }
}

public class CEO implements ReimbursementHandler {
    @Override
    public void handleRequest(ReimbursementRequest request) {
        System.out.println("CEO审批通过,报销金额:" + request.getAmount());
    }

    @Override
    public void setNextHandler(ReimbursementHandler nextHandler) {
        // CEO是责任链中的最后一个处理对象,不需要设置下一个处理对象
    }
}

最后,我们可以创建一个责任链并测试它:

public class Main {
    public static void main(String[] args) {
        ReimbursementHandler manager = new Manager();
        ReimbursementHandler finance = new FinanceDepartment();
        ReimbursementHandler ceo = new CEO();

        // 构建责任链
        manager.setNextHandler(finance);
        finance.setNextHandler(ceo);

        // 创建报销请求
        ReimbursementRequest request1 = new ReimbursementRequest(800);
        ReimbursementRequest request2 = new ReimbursementRequest(3500);
        ReimbursementRequest request3 = new ReimbursementRequest(10000);

        // 发送请求
        manager.handleRequest(request1);
        manager.handleRequest(request2);
        manager.handleRequest(request3);
    }
}

运行以上代码,你会看到不同金额的报销请求会被不同的处理对象处理。

5、责任链模式在检查系统中的运用

检查系统具有以下检查-申诉-整改-奖惩链路,在该链路中我们可以使用责任链模式,使代码变得更容易维护和扩展。

首先我们创建一个包含各个链路中需要用到的请求类对象。

/**
 * @author: guojiangtao
 * @description:
 * @param: 
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CheckRewardPunishmentRequest implements Serializable {

    private static final long serialVersionUID = 8676996408255323152L;

    /**
     * 检查结果ID
     **/
    private Long checkListResultId;

    /**
     * 检查基本事实
     **/
    CheckBaseBO checkBaseBO;
    /**
     * 检查整改事实
     **/
    CheckRectifyBO checkRectifyBO;
    /**
     * 检查申诉事实
     **/
    CheckAppealBO checkAppealBO;

}

我们创建一个处理接口,表示处理请求的方法。

/**
 * @author: guojiangtao
 * @description: 处理请求
 * @param:
 **/
public interface CheckRewardPunishmentPhaseActionInterface {

    CheckRewardPunishmentPhaseResult proceed(PhaseChain chain);

    /**
     * @author: guojiangtao
     * @description: 获取request以及转发请求
     * @param: 
     **/
    interface PhaseChain {
        // 获取当前Request
        CheckRewardPunishmentRequest request();
        // 转发Request
        CheckRewardPunishmentPhaseResult dispatch(CheckRewardPunishmentRequest request);
    }

}

创建一个客户端类来维护处理的实现类。

/**
 * @author guojiangtao
 */
@Data
public class PhaseChainClient {

    public static final List<CheckRewardPunishmentPhaseActionInterface> phaseActionInterfaceList = Lists.newArrayList();
    private static void addPhaseActionInterface(CheckRewardPunishmentPhaseActionInterface phaseActionInterface) {
        phaseActionInterfaceList.add(phaseActionInterface);
    }

    public static PhaseChainClient getChainClient(CheckRewardPunishmentPhaseActionInterface ... interfaces) {

        List<CheckRewardPunishmentPhaseActionInterface> interfaceList = Lists.newArrayList(interfaces);

        if (CollectionUtil.isNullOrEmpty(interfaceList)) {
            throw new BizException(BizErrorCodeEnum.ILLEGAL_OPERATION, "getChainClient exception");
        }

        interfaceList.forEach(PhaseChainClient::addPhaseActionInterface);

        return new PhaseChainClient();
    }

    public CheckRewardPunishmentPhaseResult execute (CheckRewardPunishmentRequest request) {

        RealPhaseChain realChain = new RealPhaseChain(request, phaseActionInterfaceList, 0);

        return realChain.dispatch(request);
    }

}

检查提交实现类。

/**
 * @author: guojiangtao
 * @description: 检查提交
 * @param:
 **/
@Service
@Slf4j
public class CheckSubmit implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckAppeal checkAppeal;
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            log.warn("CheckSubmit proceed with null request");
            return null;
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();

        if (null == checkBaseBO) {
            log.warn("CheckSubmit proceed with null checkBaseBO");
            return null;
        }

        // 检查单配置
        if (AppealTypeEnum.OPEN_APPEAL.getCode().equals(checkBaseBO.getAppealType())) {
            // 构建检查申诉事实实例
            CheckAppealBO appealBO = new CheckAppealBO(checkBaseBO.getCheckItemId(), checkBaseBO.getCheckItemResultId(), 1);
            request.setCheckAppealBO(appealBO);
            return PhaseChainClient.getChainClient(checkAppeal).execute(request);
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查申诉实现类:

/**
 * @author: guojiangtao
 * @description: 检查申诉
 * @param:
 **/
@Service
@Slf4j
public class CheckAppeal implements CheckRewardPunishmentPhaseActionInterface {

    @Autowired
    private CheckRpProducer checkRpProducer;
    @Autowired
    private CheckResultRectifyListRepository checkResultRectifyListRepository;
    @Autowired
    private CheckRectifyTemplateItemRelationRepository rectifyTemplateItemRelationRepository;
    @Autowired
    private CheckRectify checkRectify;

    @Override
    public CheckRewardPunishmentPhaseResult proceed (PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        // 参数校验
        if (null == request) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        CheckBaseBO checkBaseBO = request.getCheckBaseBO();
        CheckAppealBO appealBO = request.getCheckAppealBO();

        if (null == checkBaseBO || null == appealBO) {
            return new CheckRewardPunishmentPhaseResult(Boolean.FALSE, "");
        }

        // 根据检查项ID查询 检查项与整改任务模板 1:1
        CheckRectifyTemplateItemRelationDAO templateItemRelationDAO = rectifyTemplateItemRelationRepository.queryByItemId(checkBaseBO.getCheckItemId());

        if (null != templateItemRelationDAO) {
            // 整改单模板ID
            Long rectifyTemplateId = templateItemRelationDAO.getRectifyTemplateId();
            // 整改单
            CheckResultRectifyListDAO rectifyListDAO = checkResultRectifyListRepository.getByCheckResultIdAndTemplateId(checkBaseBO.getCheckListResultId(), rectifyTemplateId);
            if (null != rectifyListDAO && Objects.equals(RectifyResultListStatusEnum.FINISH.getCode(), rectifyListDAO.getStatus())) {
                // 构建检查整改事实实例
                CheckRectifyBO rectifyBO = new CheckRectifyBO(checkBaseBO.getCheckItemId(), rectifyListDAO.getStatus());
                request.setCheckRectifyBO(rectifyBO);
                return PhaseChainClient.getChainClient(checkRectify).execute(request);
            }
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

}

检查整改实现类:

@Service
@Slf4j
public class CheckRectify implements CheckRewardPunishmentPhaseActionInterface {
    @Autowired
    private CheckRpProducer checkRpProducer;

    @Override
    public CheckRewardPunishmentPhaseResult proceed(PhaseChain chain) {

        CheckRewardPunishmentRequest request = chain.request();

        if (null == request) {
            return null;
        }

        CheckRectifyBO rectifyBO = request.getCheckRectifyBO();

        // 无整改事实 不处理
        if (null == rectifyBO) {
            return null;
        }

        checkRpProducer.sendMessage(request);

        return new CheckRewardPunishmentPhaseResult(Boolean.TRUE, "");
    }

6、责任链模式的适用场景

责任链模式在以下情况下特别有用:

  1. 当您希望将请求的发送者和接收者解耦时,可以使用责任链模式。发送者无需知道最终哪个对象会处理请求。
  2. 当您有多个对象可以处理请求,但您不确定哪个对象应该处理时,可以使用责任链模式。这种情况下,您可以将请求传递给责任链,直到有一个对象能够处理它。
  3. 当您希望动态配置处理对象的顺序或添加新的处理对象时,可以使用责任链模式。这使得系统更加灵活和可扩展。
  4. 在需要应用单一职责原则的情况下,责任链模式可以帮助将处理逻辑分散到不同的对象中,每个对象只负责一部分功能。

7、总结

责任链模式是一种有用的设计模式,可以帮助您构建灵活的、可扩展的系统。它通过将请求传递给一系列处理对象来解耦请求发送者和接收者,使得代码更容易维护和扩展。通过示例代码和解释,我们希望您现在更好地理解了责任链模式及其在Java中的实现方式。希望这篇分享文档有助于您在实际项目中应用责任链模式以解决类似的问题。

MySQL 连接为什么挂死了?

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

一、背景

近期由测试反馈的问题有点多,其中关于系统可靠性测试提出的问题令人感到头疼,一来这类问题有时候属于“偶发”现象,难以在环境上快速复现;二来则是可靠性问题的定位链条有时候变得很长,极端情况下可能要从 A 服务追踪到 Z 服务,或者是从应用代码追溯到硬件层面。

本次分享的是一次关于 MySQL 高可用问题的定位过程,其中曲折颇多但问题本身却比较有些代表性,遂将其记录以供参考。

架构

首先,本系统以 MySQL 作为主要的数据存储部件。整一个是典型的微服务架构(SpringBoot + SpringCloud),持久层则采用了如下几个组件:

  • mybatis,实现 SQL <-> Method 的映射
  • hikaricp,实现数据库连接池
  • mariadb-java-client,实现 JDBC 驱动

在 MySQL 服务端部分,后端采用了双主架构,前端以 keepalived 结合浮动IP(VIP)做一层高可用。如下:

说明

  • MySQL 部署两台实例,设定为互为主备的关系。
  • 为每台 MySQL 实例部署一个 keepalived 进程,由 keepalived 提供 VIP 高可用的故障切换。

实际上,keepalived 和 MySQL 都实现了容器化,而 VIP 端口则映射到 VM 上的 nodePort 服务端口上。

  • 业务服务一律使用 VIP 进行数据库访问。

Keepalived 是基于 VRRP 协议实现了路由层转换的,在同一时刻,VIP 只会指向其中的一个虚拟机(master)。当主节点发生故障时,其他的 keepalived 会检测到问题并重新选举出新的 master,此后 VIP 将切换到另一个可用的 MySQL 实例节点上。这样一来,MySQL 数据库就拥有了基础的高可用能力。

另外一点,Keepalived 还会对 MySQL 实例进行定时的健康检查,一旦发现 MySQL 实例不可用会将自身进程杀死,进而再触发 VIP 的切换动作。

问题现象

本次的测试用例也是基于虚拟机故障的场景来设计的:

持续以较小的压力向业务服务发起访问,随后将其中一台 MySQL 的容器实例(master)重启。
按照原有的评估,业务可能会产生很小的抖动,但其中断时间应该保持在秒级。

然而经过多次的测试后发现,在重启 MySQL 主节点容器之后,有一定的概率会出现业务却再也无法访问的情况!

二、分析过程

在发生问题之后,开发同学的第一反应是 MySQL 的高可用机制出了问题。由于此前曾经出现过由于 keepalived 配置不当导致 VIP 未能及时切换的问题,因此对其已经有所戒备。

先是经过一通的排查,然后并没有找到 keepalived 任何配置上的毛病。

然后在没有办法的情况下,重新测试了几次,问题又复现了。

紧接着,我们提出了几个疑点:

1.Keepalived 会根据 MySQL 实例的可达性进行判断,会不会是健康检查出了问题?

但在本次测试场景中,MySQL 容器销毁会导致 keepalived 的端口探测产生失败,这同样会导致 keepalived 失效。如果 keepalived 也发生了中止,那么 VIP 应该能自动发生抢占。而通过对比两台虚拟机节点的信息后,发现 VIP 的确发生了切换。

2. 业务进程所在的容器是否发生了网络不可达的问题?

尝试进入容器,对当前发生切换后的浮动IP、端口执行 telnet 测试,发现仍然能访问成功。

连接池

在排查前面两个疑点之后,我们只能将目光转向了业务服务的DB客户端上。

从日志上看,在产生故障的时刻,业务侧的确出现了一些异常,如下:

这里提示的是业务操作获取连接超时了(超过了30秒)。那么,会不会是连接数不够用呢?

Unable to acquire JDBC Connection [n/a] java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms. at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:669) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.9.jar!/:?] ...

业务接入采用的是 hikariCP 连接池,这也是市面上流行度很高的一款组件了。

我们随即检查了当前的连接池配置,如下:

//最小空闲连接数 
spring.datasource.hikari.minimum-idle=10 
//连接池最大大小 
spring.datasource.hikari.maximum-pool-size=50 
//连接最大空闲时长 
spring.datasource.hikari.idle-timeout=60000 
//连接生命时长 
spring.datasource.hikari.max-lifetime=1800000 
//获取连接的超时时长 
spring.datasource.hikari.connection-timeout=30000

其中 注意到 hikari 连接池配置了 minimum-idle = 10,也就是说,就算在没有任何业务的情况下,连接池应该保证有 10 个连接。更何况当前的业务访问量极低,不应该存在连接数不够使用的情况。

除此之外,另外一种可能性则可能是出现了“僵尸连接”,也就是说在重启的过程中,连接池一直没有释放这些不可用的连接,最终造成没有可用连接的结果。

开发同学对”僵尸链接”的说法深信不疑,倾向性的认为这很可能是来自于 HikariCP 组件的某个 BUG…

于是开始走读 HikariCP 的源码,发现应用层向连接池请求连接的一处代码如下:

public class HikariPool{

   //获取连接对象入口
   public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         //使用预设的30s 超时时间
         long timeout = hardTimeout;
         do {
            //进入循环,在指定时间内获取可用连接
            //从 connectionBag 中获取连接
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            //连接对象被标记清除或不满足存活条件时,关闭该连接
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            //成功获得连接对象
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         //超时了,抛出异常
         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }
}

getConnection() 方法展示了获取连接的整个流程,其中 connectionBag 是用于存放连接对象的容器对象。如果从 connectionBag 获得的连接不再满足存活条件,那么会将其手动关闭,代码如下:

void closeConnection(final PoolEntry poolEntry, final String closureReason)
   {
      //移除连接对象
      if (connectionBag.remove(poolEntry)) {
         final Connection connection = poolEntry.close();
         //异步关闭连接
         closeConnectionExecutor.execute(() -> {
            quietlyCloseConnection(connection, closureReason);
            //由于可用连接变少,将触发填充连接池的任务
            if (poolState == POOL_NORMAL) {
               fillPool();
            }
         });
      }
   }

注意到,只有当连接满足下面条件中的其中一个时,会被执行 close。

  • isMarkedEvicted() 的返回结果是 true,即标记为清除

如果连接存活时间超出最大生存时间(maxLifeTime),或者距离上一次使用超过了idleTimeout,会被定时任务标记为清除状态,清除状态的连接在获取的时候才真正 close。

  • 500ms 内没有被使用,且连接已经不再存活,即 isConnectionAlive() 返回 false

由于我们把 idleTimeout 和 maxLifeTime 都设置得非常大,因此需重点检查 isConnectionAlive 方法中的判断,如下:

public class PoolBase{

   //判断连接是否存活
   boolean isConnectionAlive(final Connection connection)
   {
      try {
         try {
            //设置 JDBC 连接的执行超时
            setNetworkTimeout(connection, validationTimeout);

            final int validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000;

            //如果没有设置 TestQuery,使用 JDBC4 的校验接口
            if (isUseJdbc4Validation) {
               return connection.isValid(validationSeconds);
            }

            //使用 TestQuery(如 select 1)语句对连接进行探测
            try (Statement statement = connection.createStatement()) {
               if (isNetworkTimeoutSupported != TRUE) {
                  setQueryTimeout(statement, validationSeconds);
               }

               statement.execute(config.getConnectionTestQuery());
            }
         }
         finally {
            setNetworkTimeout(connection, networkTimeout);

            if (isIsolateInternalQueries && !isAutoCommit) {
               connection.rollback();
            }
         }

         return true;
      }
      catch (Exception e) {
         //发生异常时,将失败信息记录到上下文
         lastConnectionFailure.set(e);
         logger.warn("{} - Failed to validate connection {} ({}). Possibly consider using a shorter maxLifetime value.",
                     poolName, connection, e.getMessage());
         return false;
      }
   }

}

我们看到,在PoolBase.isConnectionAlive 方法中对连接执行了一系列的探测,如果发生异常还会将异常信息记录到当前的线程上下文中。随后,在 HikariPool 抛出异常时会将最后一次检测失败的异常也一同收集,如下:

private SQLException createTimeoutException(long startTime)
{
   logPoolState("Timeout failure ");
   metricsTracker.recordConnectionTimeout();

   String sqlState = null;
   //获取最后一次连接失败的异常
   final Throwable originalException = getLastConnectionFailure();
   if (originalException instanceof SQLException) {
      sqlState = ((SQLException) originalException).getSQLState();
   }
   //抛出异常
   final SQLException connectionException = new SQLTransientConnectionException(poolName + " - Connection is not available, request timed out after " + elapsedMillis(startTime) + "ms.", sqlState, originalException);
   if (originalException instanceof SQLException) {
      connectionException.setNextException((SQLException) originalException);
   }

   return connectionException;
}

这里的异常消息和我们在业务服务中看到的异常日志基本上是吻合的,即除了超时产生的 “Connection is not available, request timed out after xxxms” 消息之外,日志中还伴随输出了校验失败的信息:

Caused by: java.sql.SQLException: Connection.setNetworkTimeout cannot be called on a closed connection at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.getSqlException(ExceptionMapper.java:211) ~[mariadb-java-client-2.2.6.jar!/:?] at org.mariadb.jdbc.MariaDbConnection.setNetworkTimeout(MariaDbConnection.java:1632) ~[mariadb-java-client-2.2.6.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.setNetworkTimeout(PoolBase.java:541) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.PoolBase.isConnectionAlive(PoolBase.java:162) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:172) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.9.jar!/:?] at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.9.jar!/:?]

到这里,我们已经将应用获得连接的代码大致梳理了一遍,整个过程如下图所示:

从执行逻辑上看,连接池的处理并没有问题,相反其在许多细节上都考虑到位了。在对非存活连接执行 close 时,同样调用了 removeFromBag 动作将其从连接池中移除,因此也不应该存在僵尸连接对象的问题。

那么,我们之前的推测应该就是错误的!

陷入焦灼

在代码分析之余,开发同学也注意到当前使用的 hikariCP 版本为 3.4.5,而环境上出问题的业务服务却是 2.7.9 版本,这仿佛预示着什么… 让我们再次假设 hikariCP 2.7.9 版本存在某种未知的 BUG,导致了问题的产生。

为了进一步分析连接池对于服务端故障的行为处理,我们尝试在本地机器上进行模拟,这一次使用了 hikariCP 2.7.9 版本进行测试,并同时将 hikariCP 的日志级别设置为 DEBUG。

模拟场景中,会由 由本地应用程序连接本机的 MySQL 数据库进行操作,步骤如下:

1. 初始化数据源,此时连接池 min-idle 设置为 10;
2. 每隔50ms 执行一次SQL操作,查询当前的元数据表;
3. 将 MySQL 服务停止一段时间,观察业务表现;
4. 将 MySQL 服务重新启动,观察业务表现。

最终产生的日志如下:

//初始化过程,建立10个连接 
DEBUG -HikariPool.logPoolState - Pool stats (total=1, active=1, idle=0, waiting=0) DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@71ab7c09 DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7f6c9c4c DEBUG -HikariPool$PoolEntryCreator.call- Added connection MariaDbConnection@7b531779 ... DEBUG -HikariPool.logPoolState- After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功 execute statement: true test time -------1 execute statement: true test time -------2 ... 
//停止MySQL ... 
//检测到无效连接 WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@9225652 ((conn=38652) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. WARN -PoolBase.isConnectionAlive - Failed to validate connection MariaDbConnection@71ab7c09 ((conn=38653) Connection.setNetworkTimeout cannot be called on a closed connection). Possibly consider using a shorter maxLifetime value. 
//释放连接 DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@9225652: (connection is dead) DEBUG -PoolBase.quietlyCloseConnection(PoolBase.java:134) - Closing connection MariaDbConnection@71ab7c09: (connection is dead) 
//尝试创建连接失败 DEBUG -HikariPool.createPoolEntry - Cannot acquire connection from data source java.sql.SQLNonTransientConnectionException: Could not connect to address=(host=localhost)(port=3306)(type=master) : Socket fail to connect to host:localhost, port:3306. Connection refused: connect Caused by: java.sql.SQLNonTransientConnectionException: Socket fail to connect to host:localhost, port:3306. Connection refused: connect at internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73) ~[mariadb-java-client-2.6.0.jar:?] ... 
//持续失败.. 直到MySQL重启 //重启后,自动创建连接成功 DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@42c5503e DEBUG -HikariPool$PoolEntryCreator.call -Added connection MariaDbConnection@695a7435 
//连接池状态,重新建立10个连接 DEBUG -HikariPool.logPoolState(HikariPool.java:421) -After adding stats (total=10, active=1, idle=9, waiting=0) 
//执行业务操作,成功(已经自愈) execute statement: true

从日志上看,hikariCP 还是能成功检测到坏死的连接并将其踢出连接池,一旦 MySQL 重新启动,业务操作又能自动恢复成功了。根据这个结果,基于 hikariCP 版本问题的设想也再次落空,研发同学再次陷入焦灼。

拨开云雾见光明

多方面求证无果之后,我们最终尝试在业务服务所在的容器内进行抓包,看是否能发现一些蛛丝马迹。

进入故障容器,执行 tcpdump -i eth0 tcp port 30052 进行抓包,然后对业务接口发起访问。

此时令人诡异的事情发生了,没有任何网络包产生!而业务日志在 30s 之后也出现了获取连接失败的异常。

我们通过 netstat 命令检查网络连接,发现只有一个 ESTABLISHED 状态的 TCP 连接。

也就是说,当前业务实例和 MySQL 服务端是存在一个建好的连接的,但为什么业务还是报出可用连接呢?

推测可能原因有二:

  • 该连接被某个业务(如定时器)一直占用。
  • 该连接实际上还没有办法使用,可能处于某种僵死的状态。

对于原因一,很快就可以被推翻,一来当前服务并没有什么定时器任务,二来就算该连接被占用,按照连接池的原理,只要没有达到上限,新的业务请求应该会促使连接池进行新连接的建立,那么无论是从 netstat 命令检查还是 tcpdump 的结果来看,不应该一直是只有一个连接的状况。

那么,情况二的可能性就很大了。带着这个思路,继续分析 Java 进程的线程栈。

执行 kill -3 pid 将线程栈输出后分析,果不其然,在当前 thread stack 中发现了如下的条目:

"HikariPool-1 connection adder" #121 daemon prio=5 os_prio=0 tid=0x00007f1300021800 nid=0xad runnable [0x00007f12d82e5000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.io.FilterInputStream.read(FilterInputStream.java:133) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.fillBuffer(ReadAheadBufferedStream.java:129) at org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream.read(ReadAheadBufferedStream.java:102) - locked <0x00000000d7f5b480> (a org.mariadb.jdbc.internal.io.input.ReadAheadBufferedStream) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacketArray(StandardPacketInputStream.java:241) at org.mariadb.jdbc.internal.io.input.StandardPacketInputStream.getPacket(StandardPacketInputStream.java:212) at org.mariadb.jdbc.internal.com.read.ReadInitialHandShakePacket.<init>(ReadInitialHandShakePacket.java:90) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.createConnection(AbstractConnectProtocol.java:480) at org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1236) at org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:610) at org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:142) at org.mariadb.jdbc.Driver.connect(Driver.java:86) at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:138) at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:358) at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206) at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:477)

这里显示 HikariPool-1 connection adder 这个线程一直处于 socketRead 的可执行状态。从命名上看该线程应该是 HikariCP 连接池用于建立连接的任务线程,socket 读操作则来自于 MariaDbConnection.newConnection() 这个方法,即 mariadb-java-client 驱动层建立 MySQL 连接的一个操作,其中 ReadInitialHandShakePacket 初始化则属于 MySQL 建链协议中的一个环节。

简而言之,上面的线程刚好处于建链的一个过程态,关于 mariadb 驱动和 MySQL 建链的过程大致如下:

MySQL 建链首先是建立 TCP 连接(三次握手),客户端会读取 MySQL 协议的一个初始化握手消息包,内部包含 MySQL 版本号,鉴权算法等等信息,之后再进入身份鉴权的环节。

这里的问题就在于 ReadInitialHandShakePacket 初始化(读取握手消息包)一直处于 socket read 的一个状态。

如果此时 MySQL 远端主机故障了,那么该操作就会一直卡住。而此时的连接虽然已经建立(处于 ESTABLISHED 状态),但却一直没能完成协议握手和后面的身份鉴权流程,即该连接只能算一个半成品(无法进入 hikariCP 连接池的列表中)。从故障服务的 DEBUG 日志也可以看到,连接池持续是没有可用连接的,如下:DEBUG HikariPool.logPoolState –> Before cleanup stats (total=0, active=0, idle=0, waiting=3)

另一个需要解释的问题则是,这样一个 socket read 操作的阻塞是否就造成了整个连接池的阻塞呢?

经过代码走读,我们再次梳理了 hikariCP 建立连接的一个流程,其中涉及到几个模块:

  • HikariPool,连接池实例,由该对象连接的获取、释放以及连接的维护。
  • ConnectionBag,连接对象容器,存放当前的连接对象列表,用于提供可用连接。
  • AddConnectionExecutor,添加连接的执行器,命名如 “HikariPool-1 connection adder”,是一个单线程的线程池。
  • PoolEntryCreator,添加连接的任务,实现创建连接的具体逻辑。
  • HouseKeeper,内部定时器,用于实现连接的超时淘汰、连接池的补充等工作。

HouseKeeper 在连接池初始化后的 100ms 触发执行,其调用 fillPool() 方法完成连接池的填充,例如 min-idle 是10,那么初始化就会创建10个连接。ConnectionBag 维护了当前连接对象的列表,该模块还维护了请求连接者(waiters)的一个计数器,用于评估当前连接数的需求。

其中,borrow 方法的逻辑如下:

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // 尝试从 thread-local 中获取
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         ...
      }

      // 计算当前等待请求的任务
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               //如果获得了可用连接,会触发填充任务
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

         //没有可用连接,先触发填充任务
         listener.addBagItem(waiting);

         //在指定时间内等待可用连接进入
         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

注意到,无论是有没有可用连接,该方法都会触发一个 listener.addBagItem() 方法,HikariPool 对该接口的实现如下:

public void addBagItem(final int waiting)
   {
      final boolean shouldAdd = waiting - addConnectionQueueReadOnlyView.size() >= 0; // Yes, >= is intentional.
      if (shouldAdd) {
         //调用 AddConnectionExecutor 提交创建连接的任务
         addConnectionExecutor.submit(poolEntryCreator);
      }
      else {
         logger.debug("{} - Add connection elided, waiting {}, queue {}", poolName, waiting, addConnectionQueueReadOnlyView.size());
      }
   }
PoolEntryCreator 则实现了创建连接的具体逻辑,如下:
public class PoolEntryCreator{
     @Override
      public Boolean call()
      {
         long sleepBackoff = 250L;
         //判断是否需要建立连接
         while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
            //创建 MySQL 连接
            final PoolEntry poolEntry = createPoolEntry();
 
            if (poolEntry != null) {
               //建立连接成功,直接返回。
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
               if (loggingPrefix != null) {
                  logPoolState(loggingPrefix);
               }
               return Boolean.TRUE;
            }
            ...
         }

         // Pool is suspended or shutdown or at max size
         return Boolean.FALSE;
      }
}

由此可见,AddConnectionExecutor 采用了单线程的设计,当产生新连接需求时,会异步触发 PoolEntryCreator 任务进行补充。其中 PoolEntryCreator. createPoolEntry() 会完成 MySQL 驱动连接建立的所有事情,而我们的情况则恰恰是 MySQL 建链过程产生了永久性阻塞。因此无论后面怎么获取连接,新来的建链任务都会一直排队等待,这便导致了业务上一直没有连接可用。

下面这个图说明了 hikariCP 的建链过程:

好了,让我们在回顾一下前面关于可靠性测试的场景:

首先,MySQL 主实例发生故障,而紧接着 hikariCP 则检测到了坏的连接(connection is dead)并将其释放,在释放关闭连接的同时又发现连接数需要补充,进而立即触发了新的建链请求。
而问题就刚好出在这一次建链请求上,TCP 握手的部分是成功了(客户端和 MySQL VM 上 nodePort 完成连接),但在接下来由于当前的 MySQL 容器已经停止(此时 VIP 也切换到了另一台 MySQL 实例上),因此客户端再也无法获得原 MySQL 实例的握手包响应(该握手属于MySQL应用层的协议),此时便陷入了长时间的阻塞式 socketRead 操作。而建链请求任务恰恰好采用了单线程运作,进一步则导致了所有业务的阻塞。

三、解决方案

在了解了事情的来龙去脉之后,我们主要考虑从两方面进行优化:

  • 优化一,增加 HirakiPool 中 AddConnectionExecutor 线程的数量,这样即使第一个线程出现挂死,还有其他的线程能参与建链任务的分配。
  • 优化二,出问题的 socketRead 是一种同步阻塞式的调用,可通过 SO_TIMEOUT 来避免长时间挂死。

对于优化点一,我们一致认为用处并不大,如果连接出现了挂死那么相当于线程资源已经泄露,对服务后续的稳定运行十分不利,而且 hikariCP 在这里也已经将其写死了。因此关键的方案还是避免阻塞式的调用。

查阅了 mariadb-java-client 官方文档后,发现可以在 JDBC URL 中指定网络IO 的超时参数,如下:

具体参考:https://mariadb.com/kb/en/about-mariadb-connector-j/

如描述所说的,socketTimeout 可以设置 socket 的 SO_TIMEOUT 属性,从而达到控制超时时间的目的。默认是 0,即不超时。

我们在 MySQL JDBC URL 中加入了相关的参数,如下:

spring.datasource.url=jdbc:mysql://10.0.71.13:33052/appdb?socketTimeout=60000&connectTimeout=30000&serverTimezone=UTC

此后对 MySQL 可靠性场景进行多次验证,发现连接挂死的现象已经不再出现,此时问题得到解决。

四、小结

本次分享了一次关于 MySQL 连接挂死问题排查的心路历程,由于环境搭建的工作量巨大,而且该问题复现存在偶然性,整个分析过程还是有些坎坷的(其中也踩了坑)。的确,我们很容易被一些表面的现象所迷惑,而觉得问题很难解决时,更容易带着偏向性思维去处理问题。例如本例中曾一致认为连接池出现了问题,但实际上却是由于 MySQL JDBC 驱动(mariadb driver)的一个不严谨的配置所导致。

从原则上讲,应该避免一切可能导致资源挂死的行为。如果我们能在前期对代码及相关配置做好充分的排查工作,相信 996 就会离我们越来越远。

谈谈开发如何做好自测

一、为什么要做好开发自测(why)

   1. 避免把线下bug暴露到线上,争取在开发阶段就全部扫清所有显性和隐藏的问题;
   2. 个人能力和价值的体现,也是逐步提升个人技术水平,积累业务经验的过程;
   3.  减少线下bug能提升整个团队的开发效率,提升整体的交付质量和用户口碑;
   4. 满足公司对开发质量规范的要求,为团队争取更多的荣誉和奖励。

二、开发自测方法论

1. 思想意识上,提升对自测的重视程度
  • 开发阶段不仅是代码开发完成,编译通过,更重要的是自测通过;
  • 自测工作必须覆盖全面的自测场景:正向、逆向、正常、异常、并发性能等等;
  • 自测是开发阶段最重要的一环,如果不重视自测,测试阶段可能会产生大量的Bug、提测被打回、直接影响研发、测试进度;
  • 自测直接决定了产品的质量。
2. 日常开发中保持良好的心情和积极的心态,不带情绪做事
  •  迭代开发中允许有争论、有分歧、有权利发表自己的观点,沟通中要学会调整自己的心态,确保对话氛围安全,避免情绪失控;
  •  有争议情况下的对话氛围尽量控制主观想法、臆断,主动回到客观事实;
  •  学会征询对方的观点,切记独断专行,多站在对方的立场考虑问题,虚心接受和采纳正确的观点和建议;
  • 不赌气,多沟通,以团队的利益和出发点考虑问题,少计较个人得失。
3. 做好前期计划,并合理估计投入时间,给自测预留充足的时间和资源
  • 在规定的时间内尽量放慢开发的速度,做到慎之又慎,切记追求速度,需求上来就干 ;
  • 合理地规划好时间,尽量避免多线程做事,多迭代并行开发;
  • 对于复杂功能要梳理出开发顺序、先做什么后做什么,制定阶段性的开发目标和时间排期表,切记打乱仗。
 4. 做好高效率、直接有效的沟通,及时暴露和反馈问题
  • 对于不明确的需求、场景要主动、提前确认,确认后要告知相关的参与者;
  • 需求问题确认最好当面或语音沟通、尽量做到产品、前后端开发、测试对需求的理解能达成一致;
  • 不隐藏问题,尽早发现并提出问题,力所能及地加以解决。
 5. 评估、衡量自测的质量:以关键结果为导向
  • 做好全流程的自测,用户操作—前端页面—-后台接口—–数据库数据check
  • 核心方法是否都通过了单元测试;
  • 交叉Review已实现的功能,发现更多的Bug,完善到自测场景中。

三、具体如何做(do—执行)

1.需求立项、评审阶段
  •  需求评审前,我们应该先仔细阅读prd及交互文档,先形成自己对产品的思考,通过脑图的方式列出对产品设计的疑问点, 从用户或者从行业角度找出产品设计缺陷点;
  •  需求评审不能流于形式,要把业务问题都理解吃透,确保产品设计的准确性以及合理性;带着自己的疑问点向产品、开发沟通自己对产品的疑惑和质疑点,多提几个为什么?如何实现?数据获取来源?超出预期的数据怎么处理?缓存处理机制如何?数据保存何处?逻辑由前端处理还是后端服务?后端服务逻辑是否跟第三方关联?
  •  评审后要主动、反复确认跟进不明确的需求和待确认的问题,特别是一句话的需求。
2.设计、开发阶段
  •  开发前要做功能设计,特别是核心复杂的功能或流程,首先是确立最优的技术方案或架构设计;
  •  对于不熟悉的新功能要想办法梳理出整体流程,再有针对性地钻研突破难点和瓶颈问题;对于熟悉的业务功能要梳理出这次改动的影响点和之前的遇到过的bug;
  •  详细、完善的功能设计要包含正常场景和异常场景的覆盖;
  • 考虑并发、性能相关问题的出现场景及相对应的技术解决方案;
  • 开发阶段尽量考虑代码的公用性、拓展性及可维护性,多学习优秀的代码结构和设计思想。
3. 联调、自测阶段
  • 一定要在测试环境自测流程, 确保基本的核心流程不出错,不出低级bug;
  • 如时间充裕,组织组内用例评审也是非常必须的。特别是一些经验老道或者业务熟悉的老司机们,可以在用例评审上快速的帮忙指出用例的遗漏点,有助于测试人员打开思路,尽可能多的覆盖用户场景,值得注意的是用例评审上遇到不确定的,应立即记录下来,结束后及时找相关人员确认,避免猜测。开发人员对照达成一致的测试用例完成自测,特别留意异常场景和核心流程的测试;
  • 多人协作开发的功能要注意提前界定好开发边界,确保整个功能的完整性,自测时要相互交叉测试;
  • 和外部系统对接、联调要充分预计到对方系统、接口可能存在的问题,采取适当的补偿、降级和异常处理策略。
其实还有很多的方法和途径来提升开发自测,提升自测的质量是一个不断完善和改进的过程。

数据库和缓存双写一致性问题

缓存由于其高并发和高性能的特性,已经在项目中被广泛使用。在读取缓存方面,没啥疑问,都是按照下图的流程来进行业务操作。

但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存。又或者是先删除缓存,再更新数据库,其实大家存在很大的争议。

先做一个说明,从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。这种方案下,我们可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。因此,接下来讨论的思路不依赖于给缓存设置过期时间这个方案。

在这里,我们讨论三种更新策略:

  1. 先更新数据库,再更新缓存
  2. 先删除缓存,再更新数据库
  3. 先更新数据库,再删除缓存

应该没人问我,为什么没有先更新缓存,再更新数据库这种策略。

1、先更新数据库,再更新缓存

这套方案,大家是普遍反对的。为什么呢?有如下两点原因。

  • 原因一(线程安全角度)

同时有请求A和请求B进行更新操作,那么会出现

(1)线程A更新了数据库
(2)线程B更新了数据库
(3)线程B更新了缓存
(4)线程A更新了缓存

这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

  • 原因二(业务场景角度)

有如下两点:

(1)如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。
(2)如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

接下来讨论的就是争议最大的,先删缓存,再更新数据库。还是先更新数据库,再删缓存的问题。

2、先删缓存,再更新数据库

该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库

上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

那么,如何解决呢?采用延时双删策略

伪代码如下

public void write(String key,Object data){ 
    redis.delKey(key); 
    db.updateData(data); 
    Thread.sleep(1000); 
    redis.delKey(key); 
}

转化为中文描述就是

(1)先淘汰缓存
(2)再写数据库(这两步和原来一样)
(3)休眠1秒,再次淘汰缓存

这么做,可以将1秒内所造成的缓存脏数据,再次删除。那么,这个1秒怎么确定的,具体该休眠多久呢?

针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

如果你用了mysql的读写分离架构怎么办?

ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。

(1)请求A进行写操作,删除缓存
(2)请求A将数据写入数据库了,
(3)请求B查询缓存发现,缓存没有值
(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
(5)请求B将旧值写入缓存
(6)数据库完成主从同步,从库变为新值

上述情形,就是数据不一致的原因。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。

采用这种同步淘汰策略,吞吐量降低怎么办?

那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。

第二次删除,如果删除失败怎么办?

这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:

(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库
(6)请求A试图去删除请求B写入对缓存值,结果失败了。

ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。

如何解决呢?

具体解决方案,且看博主对第(3)种更新策略的解析。

3、先更新数据库,再删缓存

首先,先说一下。老外提出了一个缓存更新套路,名为《Cache-Aside pattern》。其中就指出

  • 失效:应用程序先从cache取数据,没有得到,则从数据库中取数据,成功后,放到缓存中。
  • 命中:应用程序从cache中取数据,取到后返回。
  • 更新:先把数据存到数据库中,成功后,再让缓存失效。

另外,知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。

这种情况不存在并发问题么?

不是的。假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

(1)缓存刚好失效
(2)请求A查询数据库,得一个旧值
(3)请求B将新值写入数据库
(4)请求B删除缓存
(5)请求A将查到的旧值写入缓存

ok,如果发生上述情况,确实是会发生脏数据。

然而,发生这种情况的概率又有多少呢?

发生上述情况有一个先天性条件,就是步骤(3)的写数据库操作比步骤(2)的读数据库操作耗时更短,才有可能使得步骤(4)先于步骤(5)。可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(3)耗时比步骤(2)更短,这一情形很难出现。

假设,有人非要抬杠,有强迫症,一定要解决怎么办?

如何解决上述并发问题?

首先,给缓存设有效时间是一种方案。其次,采用策略(2)里给出的异步延时删除策略,保证读请求完成以后,再进行删除操作。

还有其他造成不一致的原因么?

有的,这也是缓存更新策略(2)和缓存更新策略(3)都存在的一个问题,如果删缓存失败了怎么办,那不是会有不一致的情况出现么。比如一个写数据请求,然后写入数据库了,删缓存失败了,这会就出现不一致的情况了。这也是缓存更新策略(2)里留下的最后一个疑问。

如何解决?提供一个保障的重试机制即可,这里给出两套方案。

方案一,如下图所示:

​(1)更新数据库数据;
(2)缓存因为种种问题删除失败
(3)将需要删除的key发送至消息队列
(4)自己消费消息,获得需要删除的key
(5)继续重试删除操作,直到成功

然而,该方案有一个缺点,对业务线代码造成大量的侵入。于是有了方案二,在方案二中,启动一个订阅程序去订阅数据库的binlog,获得需要操作的数据。在应用程序中,另起一段程序,获得这个订阅程序传来的信息,进行删除缓存操作。

方案二,流程如下图所示:

​(1)更新数据库数据
(2)数据库会将操作信息写入binlog日志当中
(3)订阅程序提取出所需要的数据以及key
(4)另起一段非业务代码,获得该信息
(5)尝试删除缓存操作,发现删除失败
(6)将这些信息发送至消息队列
(7)重新从消息队列中获得该数据,重试操作。

备注说明:上述的订阅binlog程序在mysql中有现成的中间件叫canal,可以完成订阅binlog日志的功能。至于oracle中,博主目前不知道有没有现成中间件可以使用。另外,重试机制,博主是采用的是消息队列的方式。如果对一致性要求不是很高,直接在程序中另起一个线程,每隔一段时间去重试即可,这些大家可以灵活自由发挥,只是提供一个思路。

总结
对目前互联网中已有的一致性方案,进行了一个总结。对于先删缓存,再更新数据库的更新策略,还有方案提出维护一个内存队列的方式,觉得实现异常复杂,没有必要,因此没有必要在文中给出。最后,希望大家有所收获。

基于RBAC模型构建权限系统

什么是权限

当我们打开页面的时候经常会碰到下面的提示,比如一个页面/文件夹被禁止访问,一个button被禁止点击

1

当权限作为名词的时候,可以理解为对API、页面、功能点的权限控制,当权限作为动词时,可以理解为对某个功能可操作或不可操作,以下是对权限的整体划分:

3

如果要用一句话来概括:控制Who对what进行How的操作

RBAC模型

4

用户 是发起操作的主体,按类型分可分为2B和2C用户,可以是后台管理系统的用户,可以是OA系统的内部员工,也可以是面向C端的用户,比如阿里云的用户。

角色 起到了桥梁的作用,连接了用户和权限的关系,每个角色可以关联多个权限,同时一个用户关联多个角色,那么这个用户就有了多个角色的多个权限。

有人会问了为什么用户不直接关联权限呢?在用户基数小的系统,比如20个人的小系统,管理员可以直接把用户和权限关联,工作量并不大,选择一个用户勾选下需要的权限就完事了。

但是在实际企业系统中,用户基数比较大,其中很多人的权限都是一样的,就是个普通访问权限,如果管理员给100人甚至更多授权,工作量巨大。

这就引入了 “角色(Role)” 概念,一个角色可以与多个用户关联,管理员只需要把该角色赋予用户,那么用户就有了该角色下的所有权限,这样设计既提升了效率,也有很大的拓展性。

RBAC-1模型

5

此模型引入了角色继承(Hierarchical Role)概念,即角色具有上下级的关系,角色间的继承关系可分为一般继承关系和受限继承关系。
一般继承关系仅要求角色继承关系是一个绝对偏序关系,允许角色间的多继承。
而受限继承关系则进一步要求角色继承关系是一个树结构,实现角色间的单继承。这种设计可以给角色分组和分层,一定程度简化了权限管理工作。

RBAC-2模型

基于核心模型的基础上,进行了角色的约束控制,RBAC2模型中添加了责任分离关系。

其规定了权限被赋予角色时,或角色被赋予用户时,以及当用户在某一时刻激活一个角色时所应遵循的强制性规则。责任分离包括静态责任分离和动态责任分离。
主要包括以下约束:
互斥角色: 同一用户只能分配到一组互斥角色集合中至多一个角色,支持责任分离的原则。互斥角色是指各自权限互相制约的两个角色。比如财务部有会计和审核员两个角色,他们是互斥角色,那么用户不能同时拥有这两个角色,体现了职责分离原则
基数约束: 一个角色被分配的用户数量受限;一个用户可拥有的角色数目受限;同样一个角色对应的访问权限数目也应受限,以控制高级权限在系统中的分配
先决条件角色: 即用户想获得某上级角色,必须先获得其下一级的角色
动态限制用户角色:如果一个用户可以拥有两个角色,但运行时只能激活一个角色

RBAC-3模型

即最全面的权限管理,它是基于RBAC-0,将RBAC-1和RBAC-2进行了整合。

RBAC延展-用户组

当平台用户基数增大,角色类型增多时,而且有一部分人具有相同的属性,比如财务部的所有员工,如果直接给用户分配角色,管理员的工作量就会很大。
如果把相同属性的用户归类到某用户组,那么管理员直接给用户组分配角色,用户组里的每个用户即可拥有该角色,以后其他用户加入用户组后,即可自动获取用户组的所有角色,退出用户组,同时也撤销了用户组下的角色,无须管理员手动管理角色。
根据用户组是否有上下级关系,可以分为有上下级的用户组和普通用户组:

  • 具有上下级关系的用户组: 最典型的例子就是部门和职位,可能多数人没有把部门职位和用户组关联起来吧。当然用户组是可以拓展的,部门和职位常用于内部的管理系统,如果是面向C端的系统。比如淘宝网的商家,商家自身也有一套组织架构,比如采购部,销售部,客服部,后勤部等,有些人拥有客服权限,有些人拥有上架权限等等,所以用户组是可以拓展的
  • 普通用户组: 即没有上下级关系,和组织架构,职位都没有关系,也就是说可以跨部门,跨职位。举个例子,某电商后台管理系统,有拼团活动管理角色,我们可以设置一个拼团用户组,该组可以包括研发部的后台开发人员,运营部的运营人员,采购部的人员等等。

每个公司都会涉及到到组织和职位,下面就重点介绍这两个。

组织

6

常见的组织架构如
我们可以把组织与角色进行关联,用户加入组织后,就会自动获得该组织的全部角色,无须管理员手动授予,大大减少工作量,同时用户在调岗时,只需调整组织,角色即可批量调整。
组织的另外一个作用是控制数据权限,把角色关联到组织,那么该角色只能看到该组织下的数据权限。

职位

7

假设财务部的职位,每个组织部门下都会有多个职位,比如财务部有总监,会计,出纳等职位,虽然都在同一部门,但是每个职位的权限是不同的,职位高的拥有更多的权限。
总监拥有所有权限,会计和出纳拥有部分权限。特殊情况下,一个人可能身兼多职。

含有组织/职位/用户组的模型

根据以上场景,新的权限模型就可以设计出来了,如下图:

8

组织/职位/用户组

根据系统的复杂度不同,其中的多对多关系和一对一关系可能会有变化

  • 在单系统且用户类型单一的情况下,用户和组织是一对一关系,组织和职位是一对多关系,用户和职位是一对一关系,组织和角色是一对一关系,职位和角色是一对一关系,用户和用户组是多对对关系,用户组和角色是一对一关系,当然这些关系也可以根据具体业务进行调整。模型设计并不是死的,如果小系统不需要用户组,这块是可以去掉的。
  • 分布式系统且用户类型单一的情况下,到这里权限系统就会变得很复杂,这里就要引入了一个”系统”概念。此时系统架构是个分布式系统,权限系统独立出来,负责所有的系统的权限控制,其他业务系统比如商品中心,订单中心,用户中心,每个系统都有自己的角色和权限,那么权限系统就可以配置其他系统的角色和权限。
  • 分布式系统且用户类型多个的情况下,比如淘宝网,它的用户类型包括内部用户,商家,普通用户,内部用户登录多个后台管理系统,商家登录商家中心

数据权限

9

数据权限就是用户在同一页面看到的数据是不同的,比如财务部只能看到其部门下的用户数据,采购部只看采购部的数据。在一些大型的公司,全国有很多城市和分公司,比如杭州用户登录系统只能看到杭州的数据,上海用户只能看到上海的数据,解决方案一般是把数据和具体的组织架构关联起来。

举个例子,再给用户授权的时候,用户选择某个角色同时绑定组织如财务部或者合肥分公司,那么该用户就有了该角色下财务部或合肥分公司下的的数据权限

数据库表设计

10

权限框架

目前比较流行的是Apache Shiro和Spring Security

11

Apache Shiro

12

Spring Security

13

 

逆波兰算法在规则引擎中的运用

前言

逆波兰表示法(Reverse Polish notation,RPN,或逆波兰记法),是一种是由波兰数学家扬·武卡谢维奇1920年引入的数学表达式方式,在逆波兰记法中,所有操作符置于操作数的后面,因此也被称为后缀表示法。逆波兰记法不需要括号来标识操作符的优先级,比如:3 – 4 + 5,转换为波兰表达式为:3 4 – 5 +

场景

以工单系统为例,比如发起了一个审批流程,通过规则引擎配置了如下规则:

$合同类型$ = ‘商务合同’ || ( $合同总金额$ > 1000000 && $合同总金额$ < 2000000 )

满足上面表达式会走审批1,否则走审批2逻辑。
对于上面的表达式,我们都知道首先应该先比较 || 两边的表达式,||右边的表达式因为是用括号连接,所以需要放在一起做&&比较,最后左右两边做 || 操作

但这是人思维方式,机器并不这样识别,机器可不知道哪个先比较,哪个后比较,所以可以将其转换为如下格式,逆波兰(后缀)表达式

合同类型、商务合同、=、合同总金额、1000000、>、合同总金额、2000000、<、&&、||

上面的表达式,机器就比较好识别了,一般利用栈来输出结果:
当为数值时压栈,当遇到表达式,从栈里面拿出两个数进行比较(运算)

算法流程介绍

转换为逆波兰表达式的基本思路是:
1、需要两个栈来存放“操作数”(如:合同类型)、运算符(如:||)
2、每个操作符都有自己的优先级
3、如果是操作数,则放入操作数栈
4、如果是运算符(不包括括号),则与运算符栈顶元素进行比较,如果优先级大于栈顶的元素,则直接入栈,如果小于,则将栈顶的元素取出,入操作数栈
5、如果运算符是“(”,则直接放入运算符栈顶,如果运算符是“)”,则从运算符依次取出元素放入操作数栈,直到遇到“(”

下面是我画的一个操作流程:

逆波兰算法

代码实现

public class ExpressionEvaluator {

    private static String leftBraces = "(";
    private static String rightBraces = ")";
    private static Map<String, Integer> operatorPriorityMap = new HashMap<>();
    static {
        operatorPriorityMap.put("(",-1);
        operatorPriorityMap.put(")",-1);

        operatorPriorityMap.put("||",1);
        operatorPriorityMap.put("&&",1);
        operatorPriorityMap.put("=",2);
        operatorPriorityMap.put(">",2);
        operatorPriorityMap.put("<",2);

//        operatorPriorityMap.put("+",1);
//        operatorPriorityMap.put("-",1);
//        operatorPriorityMap.put("*",2);
//        operatorPriorityMap.put("/",2);
    }
    /**
     * 中缀表达式转化为后缀表达式
     * @param expression
     * @return
     */
    public Queue<String> parseExpression(String expression) {
        String[] expressionArr = expression.split(" ");
        Stack<String> operatorStack = new Stack();
        Queue<String> operandQueue = new LinkedBlockingQueue<>();
        for(int i=0;i<expressionArr.length;i++) {
            if (operatorPriorityMap.containsKey(expressionArr[i])) {
                //操作符
                if (expressionArr[i].equals(leftBraces)) {
                    //遇到左括号
                    operatorStack.add(expressionArr[i]);
                }else if (expressionArr[i].equals(rightBraces)) {
                    //遇到右括号
                    appendLeftBracesOperator(operatorStack, operandQueue);
                } else {
                    if (operatorStack.isEmpty()) {
                        operatorStack.add(expressionArr[i]);
                        continue;
                    }
                    String topOperate = operatorStack.peek();
                    if (comparePriority(expressionArr[i], topOperate) >= 0) {
                        //优先级大于栈顶的元素
                        operatorStack.add(expressionArr[i]);
                    } else {
                        //优先级小于栈顶的元素
                        appendLowPriority(operatorStack, operandQueue, expressionArr[i]);
                    }
                }
            } else {
                //操作数
                operandQueue.add(expressionArr[i]);
            }
        }
        while (!operatorStack.isEmpty()) {
            operandQueue.add(operatorStack.pop());
        }
        return operandQueue;
    }

    /**
     * 将操作符栈低优先级的元素移到操作数队列中
     * @param operatorStack
     * @param operandStack
     * @param operator
     */
    private void appendLowPriority(Stack<String> operatorStack, Queue<String> operandStack, String operator) {
        while (!operatorStack.isEmpty() && comparePriority(operator, operatorStack.peek()) < 0) {
            String topOperate = operatorStack.pop();
            operandStack.add(topOperate);
        }
        operatorStack.add(operator);
    }

    /**
     * 将左括号之前的操作符元素移动到操作数栈中
     * @param operatorStack
     * @param operandStack
     */
    private void appendLeftBracesOperator(Stack<String> operatorStack, Queue<String> operandStack) {
        String topOperator = operatorStack.pop();
        while (!topOperator.equals(leftBraces)) {
            operandStack.add(topOperator);
            topOperator = operatorStack.pop();
        }
      }

      public static void main(String[] args) {
        //String s = "1 + ( 2 + 3 * 4 ) * 2 / 4 - 5";
        String s = "商务合同1 = 商务合同 || ( 1100000 > 1000000 && 1100000 < 2000000 )";
        ExpressionEvaluator ee = new ExpressionEvaluator();
        Queue<String> queue = ee.parseExpression(s);
        System.out.println(queue.toString());
      }
    }

逻辑运算

通过将表达式转换后,通过栈就可以实现对表达式的计算,具体思路:

1、遍历队列,如果是操作数,则入栈

2、如果是操作符,则从栈顶取出两个操作数进行运算操作

3、将最终结果返回

代码实现

    /**
     * 计算后缀表达式结果
     * @param queue
     * @return
     */
    public boolean calcExpression(Queue<String> queue) {
        if(Objects.isNull(queue)) {
            throw new RuntimeException("表达式为空");
        }

        String s = queue.poll();
        Stack<Object> stack = new Stack();
        while (Objects.nonNull(s)) {
            if (operatorPriorityMap.containsKey(s)) {
                Object o1 = stack.pop();
                Object o2 = stack.pop();
                if (">".equals(s)) {
                    stack.push(Double.parseDouble(o2.toString()) > Double.parseDouble(o1.toString()));
                } else if ("<".equals(s)) {
                    stack.push(Double.parseDouble(o2.toString()) < Double.parseDouble(o1.toString()));
                } else if ("=".equals(s)) {
                    stack.push(o2.equals(o1));
                } else if ("||".equals(s)) {
                    stack.push(Boolean.parseBoolean(o2.toString()) || Boolean.parseBoolean(o1.toString()));
                } else if ("&&".equals(s)){
                    stack.push(Boolean.parseBoolean(o2.toString()) && Boolean.parseBoolean(o1.toString()));
                }
            } else {
                stack.push(s);
            }
            s = queue.poll();
        }

        return Boolean.parseBoolean(stack.pop().toString());
    }

Oauth2介绍

oauth2是什么?

oauth2.0是一个标准的授权协议,它可以为第三方应用,如:web程序、桌面/移动客户端等提供授权,获取用户的数据,它的标准是RFC 6749 文件,该文件解释了oauth的定义:

OAuth 引入了一个授权层,用来分离两种不同的角色:客户端和资源所有者。资源所有者同意以后,资源服务器可以向客户端颁发令牌。客户端通过令牌,去请求数据

其核心可以理解为:服务端向客户端颁发一个令牌,客户端通过令牌来访问服务端的数据。

为什么要使用oauth2.0

举个通俗的例子:古代的皇宫是皇上居住的地方,普通人是不允许进出的,某一天来了一个人对着守卫说:我是皇上七舅老爷的外孙女,快让我进去。

正常情况下守卫肯定不让他进去,谁知道这个人是不是骗子,守卫一般会找人禀明皇上,皇上跑过来一看,果真是他七舅老爷的外孙女,此时守卫才会放他进去

但隔了几天守卫换了,这个人出去了一趟又要进宫,或者他要进皇宫其它的地方,是不是又得走一遍流程,这样搞了几次后,皇上也不耐烦了,每次我总这样跑来跑去,效率也太低了,要是明天八舅老爷的外孙女来了不得累死,好了,我给你们发一张令牌:“皇室亲属进去牌”,并备注:这个人的信息,及可以进出的门。

这样门卫看到这样令牌后,直接根据上面的备注信息确认是否让他进去就可以了

oauth2.0的四种授权方式

  • 授权码(Authorization Code)
  • 隐藏式(implicit)
  • 密码式(password)
  • 客户端凭证(client credentials)

授权码方式

先思考一个问题:比如我们要访问B网站,B网站依赖于A网站给的用户数据,该如何做?
授权码的方式,是先在第三方应用申请一个授权码,然后用该码获取令牌。
比如下面链接:

https://b.com/oauth/authorize?
  response_type=code&amp;
  client_id=app37482284&amp;
  redirect_uri=https%3A%2F%2Fa.com%2Fcallback&amp;
  scope=read&amp;state=xcoiv98y2kd22vusuye3kch

首先用户访问B网站,获取一个授权码,client_id:是让B知道是谁在访问,redirect_uri:B网站接受或拒绝后跳转的网址
scope:标识要求授权的范围,这里是只读
state:随机生成的一个字符串,表示客户端的当前状态,服务端授权后会原封不动的返回这个值,简单的来说:这个参数主要保证请求的授权的客户端和后面使用授权服务的客户端是一个,防止CSRF攻击,可参考:《OAuth2.0忽略state参数引发的CSRF漏洞》(https://blog.csdn.net/gjb724332682/article/details/54428808)

用户点击这个链接后,会跳转到一个授权页面,类型微信的授权页面,如下:

然后用户点击授权,B网站会生成一个授权码,然后将需要的用户信息通过:
授权码 -> 临时令牌 的关系保存下来,然后将授权码返给A网站,跳转之后的地址为:

https://a.com/callback?code=8x4d3N674g2&amp;state=xcoiv98y2kd22vusuye3kch

然后A网站通过授权码在后端调用B的接口就可以获取到令牌了,
注意:这里获取令牌是在后端进行,A调用B服务器接口还需要传入一个密钥。
这样就避免了攻击者拿到授权码后在其它服务器上窃取令牌。

https://b.com/oauth/token?
 client_id=app37482284&amp;
 client_secret=X9gK6uB20sD1myc8cRe&amp;
 grant_type=authorization_code&amp;
 code=AUTHORIZATION_CODE&amp;
 redirect_uri=CALLBACK_URL

然后B网站根据授权码拿到令牌(token),返回给A的回调地址,A收到token后,就可以通过token获取到用户信息。

HTTP/1.1 200 OK
Content-Type: application/json
Cache-Control: no-store
Pragma: no-cache

{
  "access_token":"MTQ0NjJkZmQ5OTM2NDE1ZTZjNGZmZjI3",
  "token_type":"bearer",
  "expires_in":3600,
  "refresh_token":"IwOGYzYTlmM2YxOTQ5MGE3YmNmMDFkNTVk",
  "scope":"create delete"
}

回顾上面的调用流程,主要分为两个步骤:

  • 用户在客户端确认授权,获取授权码
  • 根据授权码获取令牌

为什么要先获取一个授权码,然后再根据授权码获取令牌呢?
因为获取授权码往往是在客户端上发起,如果第一步就获取到了令牌,那么攻击者很容易就窃取到令牌,所以获取令牌往往是放到服务端来做,这样的安全性更高。

隐藏式

隐藏式实质是授权码方式的一种缩略版,即直接是第一步就通过客户端访问获取到令牌,以上也说过这种方式的安全性很低,极可能存在中间人攻击的风险。
第一步,同样是客户端授权

https://b.com/oauth/authorize?
  response_type=token&amp;
  client_id=app37482284&amp;
  redirect_uri=https%3A%2F%2Fa.com%2Fcallback&amp;
  scope=read&amp;state=xcoiv98y2kd22vusuye3kch

第二步,直接回调到A网站地址,并携带令牌

https://a.com/callback#token=ACCESS_TOKEN

因为这种方式安全很低,已经不推荐使用,之所以出现这种方式,主要是因为最原始的应用不支持浏览器访问外部主机链接,所以采取了这种妥协的方式,后面由于Cross-Origin Resource Sharing (CORS)技术的出现,解决了这个问题,目前对于web服务推荐使用授权码方式,具体可参考:
《Why you should stop using the OAuth implicit grant!》(https://medium.com/oauth-2/why-you-should-stop-using-the-oauth-implicit-grant-2436ced1c926)

《Is the OAuth 2.0 Implicit Flow Dead?》(https://developer.okta.com/blog/2019/05/01/is-the-oauth-implicit-flow-dead)

密码式

密码式是单点登录用到比较多的一种场景。即客户端直接通过用户名和密码请求获取令牌。

先说说单点登录,比如我们登录了taobao.com网站,准备买一些东西,可看了半天没看到合适的,准备到tmall.com看看,淘宝和天猫都是阿里系的,既然都是一家的,完全可以复用taobao网站的登录状态。授权给tmall使用。
所以这就可以用到oauth2的密码方式,用户登录了一次,服务端把生成一个令牌,保存用户的信息,taobao和tmall都可以共享。

POST /oauth/token HTTP/1.1
Host: authorization-server.com
Content-type: application/x-www-form-urlencoded

https://taobao/token?grant_type=password
&amp;username=guestuser1
&amp;password=123654ww
&amp;client_id=xxxxxxxxxx

返回令牌信息

{
  "access_token": "ece8ec32-5855-48c5-838b-0d7f20c64d1c",
  "token_type": "bearer",
  "expires_in": 3600,
  "scope": "create"
}

grant_type:授权方式,password表示:密码式
username:用户名
password:密码

在获取令牌的时候,服务端会将令牌->用户信息保存起来,后面客户端直接通过令牌就可以获取到用户信息。

客户端凭证

凭证式实质也是授权码的一种缩略版,可以理解为授权码方式的第二步,获取令牌都在服务端进行

https://b.com/token?
  grant_type=client_credentials&amp;
  client_id=app37482284&amp;
 client_secret=X9gK6uB20sD1myc8cR

这种方式适用于没有前端的应用,主要是用于第三方应用在后端访问传递数据。

分布式事务seata的AT模式介绍

seata是阿里开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,本文主要介绍AT模式的使用。

seata安装

下载seata服务,官方地址:https://github.com/seata/seata/releases
在Linux下,下载完成后,直接解压,通过命令安装即可:

sh ./bin/seata-server.sh

支持的启动参数

参数 全写 作用 备注
-h –host 指定在注册中心注册的 IP 不指定时获取当前的 IP,外部访问部署在云环境和容器中的 server 建议指定
-p –port 指定 server 启动的端口 默认为 8091
-m –storeMode 事务日志存储方式 支持file和db,默认为 file
-n –serverNode 用于指定seata-server节点ID ,如 1,2,3…, 默认为 1
-e –seataEnv 指定 seata-server 运行环境 如 dev, test 等, 服务启动时会使用 registry-dev.conf 这样的配置

如:

sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file

seata的AT模式介绍

AT模式实质是两阶段提交协议的演变,具体如下:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
  • 二阶段:
    提交异步化,非常快速地完成。
    回滚通过一阶段的回滚日志进行反向补偿。

业务背景:
用户调用系统A的store服务,store服务调用系统B的company服务,company服务会新增一条数据,然后把companyId返回系统A,然后系统A通过companyId再新增一条store数据。

一般如果store服务执行失败了,直接抛异常了,所以company服务也不会执行,
但如果store服务执行成功了,已经写了一条数据到数据库,执行company服务时失败了,就会产生数据不一致的问题。

使用seata的AT模式,主要分为下面几个步骤:

  • 配置seata服务及创建事务表
  • 调用方配置(对应上面的store服务)
  • 服务提供方配置(对应上面的company服务)

配置seata服务及创建事务表

配置conf/file.conf文件

<code>## transaction log store, only used in server side
store {
  ## store mode: file、db
  mode = "db" //修改为db模式,标识事务信息用db存储
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.234.1:3306/seata?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false&amp;&amp;serverTimezone=UTC" //修改数据库连接
    user = "seata" //修改数据库账号
    password = "123456" //修改数据库密码
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
  }
}
## server configuration, only used in server side
service {
  #vgroup-&gt;rgroup
  vgroup_mapping.chuanzh_tx_group = "default" //chuanzh_tx_group为自定义的事务组名称,要和客户端配置保持一致
  #only support single node
  default.grouplist = "192.168.234.128:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

</code>

上面配置共修改了3个地方:

  1. 存储模式改为db模式,需要创建3张事务表,如下:
    <code>-- the table to store GlobalSession data
     CREATE TABLE IF NOT EXISTS `global_table`
     (
         `xid`                       VARCHAR(128) NOT NULL,
         `transaction_id`            BIGINT,
         `status`                    TINYINT      NOT NULL,
         `application_id`            VARCHAR(32),
         `transaction_service_group` VARCHAR(32),
         `transaction_name`          VARCHAR(128),
         `timeout`                   INT,
         `begin_time`                BIGINT,
         `application_data`          VARCHAR(2000),
         `gmt_create`                DATETIME,
         `gmt_modified`              DATETIME,
         PRIMARY KEY (`xid`),
         KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
         KEY `idx_transaction_id` (`transaction_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store BranchSession data
     CREATE TABLE IF NOT EXISTS `branch_table`
     (
         `branch_id`         BIGINT       NOT NULL,
         `xid`               VARCHAR(128) NOT NULL,
         `transaction_id`    BIGINT,
         `resource_group_id` VARCHAR(32),
         `resource_id`       VARCHAR(256),
         `branch_type`       VARCHAR(8),
         `status`            TINYINT,
         `client_id`         VARCHAR(64),
         `application_data`  VARCHAR(2000),
         `gmt_create`        DATETIME(6),
         `gmt_modified`      DATETIME(6),
         PRIMARY KEY (`branch_id`),
         KEY `idx_xid` (`xid`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store lock data
     CREATE TABLE IF NOT EXISTS `lock_table`
     (
         `row_key`        VARCHAR(128) NOT NULL,
         `xid`            VARCHAR(96),
         `transaction_id` BIGINT,
         `branch_id`      BIGINT       NOT NULL,
         `resource_id`    VARCHAR(256),
         `table_name`     VARCHAR(32),
         `pk`             VARCHAR(36),
         `gmt_create`     DATETIME,
         `gmt_modified`   DATETIME,
         PRIMARY KEY (`row_key`),
         KEY `idx_branch_id` (`branch_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
    </code>
  2. 修改数据库连接,注意如果你安装的是MySQL8,则需要修改MySQL8的驱动:driverClassName = “com.mysql.cj.jdbc.Driver”,不然会出现启动报错的问题,详细请参考:seata启动MySQL报错 #359(https://github.com/seata/seata-samples/issues/359)。
  3. 修改事务的组名,你也可以不修改,我这里使用的是:chuanzh_tx_group
  4. 创建业务事务表,记录业务需要回滚的数据,在分布式事务中,每个参与的业务数据库都需要添加对应的表
    <code>CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    </code>

配置conf/registry.conf文件

<code>registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"  修改注册方式,微服务调用使用的是Eureka

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://192.168.234.1:8081/eureka"  //修改Eureka地址
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}
</code>

以上修改了使用Eureka方式注册,并配置了Eureka地址,启动MySQL、Eureka服务后,就可以启动seata服务了。

调用方配置(store-server)

maven配置,使用seata-spring-boot-starter,自动配置的方式,不需要再添加file.conf和register.conf文件

<code>    &lt;!--druid--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.alibaba&lt;/groupId&gt;
        &lt;artifactId&gt;druid-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;${druid-spring-boot-starter.version}&lt;/version&gt;
    &lt;/dependency&gt;

    &lt;!--seata--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;io.seata&lt;/groupId&gt;
        &lt;artifactId&gt;seata-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;1.2.0&lt;/version&gt;
    &lt;/dependency&gt;
</code>

application.properties配置:

<code>server.port=9090
spring.application.name=store-server

mybatis.type-aliases-package=com.chuanzh.model
mybatis.mapper-locations=classpath:mapper/*.xml

spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

#注意这里的事务组配置要和服务端一致
seata.tx-service-group=chuanzh_tx_group
seata.service.vgroup-mapping.chuanzh_tx_group=default
seata.service.grouplist.default=192.168.234.128:8091

logging.level.io.seata=DEBUG
## eureka
eureka.client.serviceUrl.defaultZone= http://localhost:8081/eureka/

</code>

数据源配置,因为seata是对数据库的datasource进行了接管和代理,所以在每个参与分布式事务的数据源都要进行如下配置:

<code>@Configuration
public class DataSourceConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy)throws Exception{
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:/mapper/*.xml"));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}
</code>

注意配置了数据源后,启动会出现循环依赖的问题,如下,

还需要在启动类排除dataSource自动配置,其它的解决方法,可以参考:集成fescar数据源循环依赖错误解决方案(https://blog.csdn.net/kangsa998/article/details/90042406)

<code>@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
</code>

配置请求拦截器,生成一个请求事务ID,用于在微服务中传递

<code>@Configuration
public class SeataRequestInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            //构建请求头
            requestTemplate.header("TX_XID", xid);
        }
    }
}
</code>

服务提供方配置(company-server)

maven、application.properties、数据源配置同调用方配置,区别主要是拦截器的配置,如下:

<code>@Slf4j
@Component
public class SeataHandlerInterceptor implements HandlerInterceptor {

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader("TX_XID");
        //获取全局事务编号
        if(log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }
        if(xid == null &amp;&amp; rpcXid != null) {
            //设置全局事务编号
            RootContext.bind(rpcXid);
            if(log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }
        return true;
    }
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
        String rpcXid = request.getHeader("TX_XID");
        if(!StringUtils.isEmpty(rpcXid)) {
            String unbindXid = RootContext.unbind();
            if(log.isDebugEnabled()) {
                log.debug("unbind {} from RootContext", unbindXid);
            }

            if(!rpcXid.equalsIgnoreCase(unbindXid)) {
                log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                if(unbindXid != null) {
                    RootContext.bind(unbindXid);
                    log.warn("bind {} back to RootContext", unbindXid);
                }
            }

        }
    }

}
</code>
<code>@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {

    @Autowired
    private SeataHandlerInterceptor seataHandlerInterceptor;

    public void addInterceptors(InterceptorRegistry registry) {
        //注册HandlerInterceptor,拦截所有请求
        registry.addInterceptor(seataHandlerInterceptor).addPathPatterns(new String[]{"/**"});
    }

}
</code>

添加全局事务注解

在服务调用方的方法上添加@GlobalTransactional注解,下面模拟了一种场景,如果companyId为偶数,则会抛异常。

<code>    @GlobalTransactional(rollbackFor = Exception.class)
    public void create(StoreEntity storeEntity) throws Exception {
        CompanyEntity companyEntity = new CompanyEntity();
        companyEntity.setName(storeEntity.getName());
        companyEntity = companyFeign.createCompany(companyEntity);

        /**
         * 模拟异常
         */
        if (companyEntity.getId() % 2 == 0) {
            throw new Exception();
        }

        /** 写入store数据 */
        storeEntity.setCompanyId(companyEntity.getId());
        storeMapper.insert(storeEntity);
    }
</code>

经过测试,companyFeign.createCompany服务调用后会先向数据库写一条数据,当create方法执行抛异常,就会事务回滚,删除掉原先的company数据

MySQL事务隔离级别

对于数据库的隔离级别之前一直没有做详细整理,最近项目运行中发现了一个问题,所以抽时间对这块认真研究了下

业务场景:
服务A在处理流程中,会调用外部服务B,然后写入一条数据,服务B执行完成后,会回调服务C的接口更新服务A写入的数据。
问题:
在服务B回调服务C的时候总是找不到服务A写入的数据,在服务C中添加延时重试,问题依然存在,但此时查看数据库,对应的数据是已经存在。

先说原因吧,是因为MySQL的事务默认隔离级别是:可重复读。
在服务A调用服务B后,还没有写入数据到数据库,服务B就已经回调服务C了,服务C此时肯定是找不到对应的数据的,由于MySQL默认隔离级别是可重复读(即在一个事务中,对于同一份数据读取都是一样的),所以即使服务A已经写入了数据,服务C依然读取不到。

解决方案:
在服务C中的查询,不要放到一个事务里面,单独提取一个方法,后面的更新逻辑放到同一个事务中

什么是事务?

数据库事务是数据库管理系统执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成。
比如:某人在商店购买100商品,其中包括两个操作:
1.该人账户减少100元
2.商店账户增加100元
这两个操作要么同时执行成功,要么同时执行失败。

数据库事务的ACID性质

  • Atomic:原子性,将所有SQL作为原子工作单元执行,要么全部执行,要么全部不执行;
  • Consistent:一致性,事务完成后,所有数据的状态都是一致的,即该人的账户减少了100,商店的账户必须增加100
  • Isolation:隔离性,比如两个人从同一个账户取款,这两个事务对数据的修改必须相互隔离,具体隔离策略后面具体讲解。
  • Duration:持久性,事务完成后,对数据库数据的修改必须被持久化存储。

数据库的隔离级别

在单个事务中,不需要做隔离,所谓数据库隔离级别是针对在并发事务的情况下,解决导致的一系列问题,这些问题包括:脏读、不可重复读、幻读,具体隔离级别如下图:

隔离级别 脏读 不可重复读 幻读
SERIALIZABLE(串行化) 避免 避免 避免
REPEATABLE READ(可重复读) 避免 避免 允许
READ COMMITED(读已提交) 避免 允许 允许
READ UNCOMMITED(读未提交) 允许 允许 允许

SERIALIZABLE(串行化)

当两个事务同时操作数据库中相同数据时,如果第一个事务已经在访问该数据,第二个事务只能停下来等待,必须等到第一个事务结束后才能恢复运行。因此这两个事务实际上是串行化方式运行。

REPEATABLE READ(可重复读)

一个事务在执行过程中可以看到其他事务已经提交的新插入的记录,但是不能看到其他事务对已有记录的更新。

READ COMMITTED(读已提交数据)

一个事务在执行过程中可以看到其他事务已经提交的新插入的记录,而且还能看到其他事务已经提交的对已有记录的更新。

READ UNCOMMITTED(读未提交数据)

一个事务在执行过程中可以看到其他事务没有提交的新插入的记录,而且还能看到其他事务没有提交的对已有记录的更新。

没有事务隔离级别导致的问题

如果没有数据库的隔离级别,数据库的数据是实时变化的,即每个事务都可以读到其它事务修改后的数据,下面结合实例介绍每个场景的问题。

脏读

定义:读到未提交更新的数据

时间点 事务1 事务2
T1 开始事务
T2 开始事务
T3 查询账户余额为1000
T4 取出100后金额为900
T5 查询账户金额为900(脏读)
T6 撤销事务,余额恢复为1000
T7 存入100元后,金额变为1000
T8 提交事务

如上事务1取出金额100后又回滚了,即啥都没做,但事务2存入了100,但最终的金额确还是1000,正确应该是1100。
在T5时间节点出现了脏读,如果数据库配置了隔离级别为SERIALIZABLE、REPEATABLE READ、READ COMMITTED,在事务1没有提交的时候,事务2读取的都是原来的值就不会出现问题。

不可重复读

定义:在同一个数据中,两次读取到的数据不一致,读到了其他数据提交更新的数据

时间点 事务1 事务2
T1 开始事务
T2 开始事务
T3 查询账户余额为1000
T4 查询账户余额为1000
T5 取出100后金额为900
T6 提交事务
T7 查询账户余额为900(与T4读取的不一致)

事务2的两次读取到的数据不一致,第二次读取到了事务1提交的数据

幻读

定义:读取到另一个事务已提交插入或删除的数据。

时间点 事务1 事务2
T1 开始事务
T2 开始事务
T3 统计一年级1班所有的学生人数为40人
T4 一年级1班新增一名学生
T5 提交事务
T6 再次统计一年级1班的所有学生人数为41人

事务2第一次统计人数为40人,第二次统计为41人,两次统计的结果不一致,同样如果T4时间节点转走一名学生,也会出现不一致

不可重复读和幻读看起来比较类似,都是一个事务里面读取到两次不同的结果
本质的区别是:不可重复读是由于数据更新导致数据不一致导致,幻读是由于插入或删除了数据导致的。