Sentinel流量控制介绍

1. 背景

  • 比如双11,秒杀等,系统访问量远远超出系统所能处理的并发数,需对系统进行保护。
  • 在微服务架构中,服务拆分粒度较细,会出现请求链路较长的情况,如果链路中某个服务因网络延迟或者请求超时等原因不可用,会导致当前请求阻塞,可能出现请求堆积从而导致雪崩效应。
  • 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、
  • 系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性

2. 常见的限流场景

●  在Nginx 层添加限流模块限制平均访问速度
●  通过设置数据库连接池,线程池的大小来限制总的并发数
●  通过guava 提供的Ratelimiter 限制接口的访问速度
●  TCP通信协议中的限流整形
 

3. 限流算法

3.1、 计数器算法

         计数器算法是一种比较简单的限流实现算法,在指定的周期内累加访问次数,当访问次数达到设定的阈值时,触发限流策略,当进入下一个时间周期时进行访问数的清零。

优点:实现简单
缺点:临界问题:如图所示,当在8-10秒和10-12秒内分别并发500,虽然没有超过阈值,但如果算8-12秒,则并发数高达1000,已经超过了原先定义的10秒内不超过500的并发量
 

3.2、 滑动窗口算法

为了解决计数器算法带来的临界问题,所以引入了滑动窗口算法。
简单来说,滑动窗口算法原理是在固定窗口汇总分割出多个小时间窗口,分别在每个小时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口,最终只需要统计滑动窗口时间范围内的所有小时窗口的总的计数即可。
如图所示,最终只需要统计每个小时间窗口不超过阈值/n && 在滑动窗口范围内的所有的小时间窗口总的计数不超过阈值即可

优点:实现相对简单,且没有计数器算法的临界问题
缺点:无法应对短时间高并发(突刺现象)
 

3.3、 令牌桶限流算法

令牌桶是网络流量整形和速率限制中最常使用的一种算法,对于每一个请求,都需要从令牌桶中获得一个令牌,如果没有获得令牌,则需要触发限流策略。

基本过程:

1)每进来一个请求,都在桶里获取一个令牌
2)如果有令牌,则拿着令牌通过
3)如果没有令牌,则不允许请求通过

几种情况:
请求速度 > 令牌生成速度:当令牌被取空后会被限流
请求速度 = 令牌生成速度:流量处于平稳状态
请求速度 < 令牌生成速度:说明此时并发数不高,请求能被正常处理
优点:可以像漏桶那样匀速,也可以像计数器那样处理突发流量
 

3.4、  漏桶限流算法

漏桶限流算法的主要作用是控制数据注入网络的速度,平滑网络上的突发流量
漏桶算法的原理:在漏桶内部同样维护一个容器,这个容器会以恒定的速度出水,不管上面的水流速度多快,漏桶水滴的流出速度始终保持不变,实际上消息中间件就使用了漏桶限流的思想。
 

漏桶算法中,有如下几种可能的情况:
●  请求速度大于漏桶流出速度,也就是请求数超出当前服务所能处理的极限,将会触发限流策略
●  请求速度小于或者等于漏桶流出的速度,也就是服务处理能力整合满足客户端的请求量,将正常执行。
不足:无法应对突发的并发流量,因为流出速率一直都是恒定的
优点:平滑系统流量
 

Sentinel介绍

       Sentinel是阿里开源的项目,是面向分布式架构的轻量级流量控制组件,主要以流量为切入点,从限流,流量整形,服务降级,系统负载保护等多个维度来帮助我们保障微服务的稳定性
Sentinel 分2 部分
核心库:不依赖任何框架/库,能够运行于所有的java 环境,对Dubbo ,Spring Cloud 等框架有较好的支持。
控制台:基于Spring boot 开发,打包后可以直接运行,不需要额外的tomcat 等应用部署。
 

5、Sentinel 流量控制

5.1、整体步骤

  • 定义资源
  • 定义限流规则
  • 检验规则是否生效
 

5.2、资源定义方式

方式1:抛出异常的方式定义资源
SphU 包含了 try-catch 风格的 API。用这种方式,当资源发生了限流之后会抛出 BlockException。这个时候可以捕捉异常,进行限流之后的逻辑处理。示例代码如下:
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
Entry entry = null;
try {
    // 资源名可使用任意有业务语义的字符串
    entry = SphU.entry("自定义资源名");
    // 被保护的业务逻辑
    // do something...
} catch (BlockException e1) {
    // 资源访问阻止,被限流或被降级
    // 进行相应的处理操作
} finally {
    if (entry != null) {
        entry.exit();
    }
}

注意: 
1、SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,否则会导致调用链记录异常,抛出 ErrorEntryFreeException 异常。
2、务必保证finally会被执行

方式2:注解方式定义资源
Sentinel 支持通过 @SentinelResource 注解定义资源并配置 blockHandler 和 fallback 函数来进行限流之后的处理。示例:

@SentinelResource(blockHandler = "blockHandlerForGetUser")
public User getUserById(String id) {
        throw new RuntimeException("getUserById command failed");
}
// blockHandler 函数,原方法调用被限流/降级/系统保护的时候调用
public User blockHandlerForGetUser(String id, BlockException ex) {
        return new User("admin");
}

注意
1、blockHandler 函数会在原方法被限流/降级/系统保护的时候调用,而 fallback 函数会针对所有类型的异常。请注意 blockHandler 和 fallback 函数的形式要求
2、需注意:blockHandler 所配置的值会在触发限流之后调用,这个方法定义必须和原始方法的返回值,参数保持一致,而且需要增加BlockException 参数。

5.3、限流具体实现

5.3.1. 引入 Sentinel 依赖

添加依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.4</version>
</dependency>

5.3.2. 定义资源

资源 是 Sentinel 中的核心概念之一。最常用的资源是我们代码中的 Java 方法,也可以更灵活的定义你的资源,例如,把需要控制流量的代码用 Sentinel API SphU.entry(“HelloWorld”) 和 entry.exit() 包围起来即可。

    在下面的例子中,我们将 System.out.println(“hello world”); 作为资源(被保护的逻辑),用 API 包装起来。参考代码如下:

public static void main(String[] args) {
    // 配置规则.
    initFlowRules();
    while (true) {
        try (Entry entry = SphU.entry("HelloWorld")) {
        // 被保护的逻辑
            System.out.println("hello world");
        } catch (BlockException ex) {
            // 处理被流控的逻辑
            System.out.println("blocked!");
        }
    }
}

完成以上两步后,代码端的改造就完成了。也可以通过我们提供的 注解,来定义我们的资源,类似于下面的代码:

@SentinelResource("HelloWorld")
public void helloWorld() {
    // 资源中的逻辑
    System.out.println("hello world");
}

这样,helloWorld() 方法就成了我们的一个资源。

5.3.3. 定义规则

通过流控规则来指定允许该资源通过的请求次数,例如下面的代码定义了资源 HelloWorld 每秒最多只能通过 20 个请求。

private static void initFlowRules(){
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule = new FlowRule();
    rule.setResource("HelloWorld");
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setCount(20);
    rules.add(rule);
    FlowRuleManager.loadRules(rules);
}

完成上面 3 步,Sentinel 就能够正常工作了。

5.3.4. 检查效果

Demo 运行之后,我们可以在日志 ~/logs/csp/${appName}-metrics.log.xxx 里看到下面的输出:

其中 p 代表通过的请求, block 代表被阻止的请求, s 代表成功执行完成的请求个数, e 代表用户自定义的异常, rt 代表平均响应时长。

可以看到,这个程序每秒稳定输出 “hello world” 20 次,和规则中预先设定的阈值是一样的。

6、Sentinel 熔断

6.1、背景

1、现代微服务架构都是分布式的,由非常多的服务组成,除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。
2、分布式系统中,不同服务之间相互调用,组成复杂的调用链路,复杂链路上的某一环不稳定,可能会层层级联导致整体的雪崩 。

6.2、熔断策略

Sentinel 提供以下几种熔断策略:

  • 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
  • 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% – 100%。
  • 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。

注意
 1、@SentinelResource 注解会自动统计业务异常,无需手动调用。
2、异常降级仅针对业务异常,对 Sentinel 限流降级本身的异常(BlockException)不生效。为了统计异常比例或异常数,需要通过 Tracer.trace(ex) 记录业务异常。示例:

Entry entry = null;
try {
    entry = SphU.entry(resource);

} catch (Throwable t) {
    if (!BlockException.isBlockException(t)) {
        Tracer.trace(t);
    }
} finally {
    if (entry != null) {
        entry.exit();
    }
}

熔断降级规则

熔断降级规则(DegradeRule)包含下面几个重要的属性:

Field说明默认值
resource资源名,即规则的作用对象
grade熔断策略,支持慢调用比例/异常比例/异常数策略慢调用比例
count慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值
timeWindow熔断时长,单位为 s
minRequestAmount熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入)5
statIntervalMs统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入)1000 ms
slowRatioThreshold慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入)

分布式事务seata的AT模式介绍

seata是阿里开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,本文主要介绍AT模式的使用。

seata安装

下载seata服务,官方地址:https://github.com/seata/seata/releases
在Linux下,下载完成后,直接解压,通过命令安装即可:

sh ./bin/seata-server.sh

支持的启动参数

参数 全写 作用 备注
-h –host 指定在注册中心注册的 IP 不指定时获取当前的 IP,外部访问部署在云环境和容器中的 server 建议指定
-p –port 指定 server 启动的端口 默认为 8091
-m –storeMode 事务日志存储方式 支持file和db,默认为 file
-n –serverNode 用于指定seata-server节点ID ,如 1,2,3…, 默认为 1
-e –seataEnv 指定 seata-server 运行环境 如 dev, test 等, 服务启动时会使用 registry-dev.conf 这样的配置

如:

sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file

seata的AT模式介绍

AT模式实质是两阶段提交协议的演变,具体如下:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
  • 二阶段:
    提交异步化,非常快速地完成。
    回滚通过一阶段的回滚日志进行反向补偿。

业务背景:
用户调用系统A的store服务,store服务调用系统B的company服务,company服务会新增一条数据,然后把companyId返回系统A,然后系统A通过companyId再新增一条store数据。

一般如果store服务执行失败了,直接抛异常了,所以company服务也不会执行,
但如果store服务执行成功了,已经写了一条数据到数据库,执行company服务时失败了,就会产生数据不一致的问题。

使用seata的AT模式,主要分为下面几个步骤:

  • 配置seata服务及创建事务表
  • 调用方配置(对应上面的store服务)
  • 服务提供方配置(对应上面的company服务)

配置seata服务及创建事务表

配置conf/file.conf文件

<code>## transaction log store, only used in server side
store {
  ## store mode: file、db
  mode = "db" //修改为db模式,标识事务信息用db存储
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.234.1:3306/seata?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false&amp;&amp;serverTimezone=UTC" //修改数据库连接
    user = "seata" //修改数据库账号
    password = "123456" //修改数据库密码
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
  }
}
## server configuration, only used in server side
service {
  #vgroup-&gt;rgroup
  vgroup_mapping.chuanzh_tx_group = "default" //chuanzh_tx_group为自定义的事务组名称,要和客户端配置保持一致
  #only support single node
  default.grouplist = "192.168.234.128:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

</code>

上面配置共修改了3个地方:

  1. 存储模式改为db模式,需要创建3张事务表,如下:
    <code>-- the table to store GlobalSession data
     CREATE TABLE IF NOT EXISTS `global_table`
     (
         `xid`                       VARCHAR(128) NOT NULL,
         `transaction_id`            BIGINT,
         `status`                    TINYINT      NOT NULL,
         `application_id`            VARCHAR(32),
         `transaction_service_group` VARCHAR(32),
         `transaction_name`          VARCHAR(128),
         `timeout`                   INT,
         `begin_time`                BIGINT,
         `application_data`          VARCHAR(2000),
         `gmt_create`                DATETIME,
         `gmt_modified`              DATETIME,
         PRIMARY KEY (`xid`),
         KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
         KEY `idx_transaction_id` (`transaction_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store BranchSession data
     CREATE TABLE IF NOT EXISTS `branch_table`
     (
         `branch_id`         BIGINT       NOT NULL,
         `xid`               VARCHAR(128) NOT NULL,
         `transaction_id`    BIGINT,
         `resource_group_id` VARCHAR(32),
         `resource_id`       VARCHAR(256),
         `branch_type`       VARCHAR(8),
         `status`            TINYINT,
         `client_id`         VARCHAR(64),
         `application_data`  VARCHAR(2000),
         `gmt_create`        DATETIME(6),
         `gmt_modified`      DATETIME(6),
         PRIMARY KEY (`branch_id`),
         KEY `idx_xid` (`xid`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store lock data
     CREATE TABLE IF NOT EXISTS `lock_table`
     (
         `row_key`        VARCHAR(128) NOT NULL,
         `xid`            VARCHAR(96),
         `transaction_id` BIGINT,
         `branch_id`      BIGINT       NOT NULL,
         `resource_id`    VARCHAR(256),
         `table_name`     VARCHAR(32),
         `pk`             VARCHAR(36),
         `gmt_create`     DATETIME,
         `gmt_modified`   DATETIME,
         PRIMARY KEY (`row_key`),
         KEY `idx_branch_id` (`branch_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
    </code>
  2. 修改数据库连接,注意如果你安装的是MySQL8,则需要修改MySQL8的驱动:driverClassName = “com.mysql.cj.jdbc.Driver”,不然会出现启动报错的问题,详细请参考:seata启动MySQL报错 #359(https://github.com/seata/seata-samples/issues/359)。
  3. 修改事务的组名,你也可以不修改,我这里使用的是:chuanzh_tx_group
  4. 创建业务事务表,记录业务需要回滚的数据,在分布式事务中,每个参与的业务数据库都需要添加对应的表
    <code>CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    </code>

配置conf/registry.conf文件

<code>registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"  修改注册方式,微服务调用使用的是Eureka

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://192.168.234.1:8081/eureka"  //修改Eureka地址
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}
</code>

以上修改了使用Eureka方式注册,并配置了Eureka地址,启动MySQL、Eureka服务后,就可以启动seata服务了。

调用方配置(store-server)

maven配置,使用seata-spring-boot-starter,自动配置的方式,不需要再添加file.conf和register.conf文件

<code>    &lt;!--druid--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.alibaba&lt;/groupId&gt;
        &lt;artifactId&gt;druid-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;${druid-spring-boot-starter.version}&lt;/version&gt;
    &lt;/dependency&gt;

    &lt;!--seata--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;io.seata&lt;/groupId&gt;
        &lt;artifactId&gt;seata-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;1.2.0&lt;/version&gt;
    &lt;/dependency&gt;
</code>

application.properties配置:

<code>server.port=9090
spring.application.name=store-server

mybatis.type-aliases-package=com.chuanzh.model
mybatis.mapper-locations=classpath:mapper/*.xml

spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

#注意这里的事务组配置要和服务端一致
seata.tx-service-group=chuanzh_tx_group
seata.service.vgroup-mapping.chuanzh_tx_group=default
seata.service.grouplist.default=192.168.234.128:8091

logging.level.io.seata=DEBUG
## eureka
eureka.client.serviceUrl.defaultZone= http://localhost:8081/eureka/

</code>

数据源配置,因为seata是对数据库的datasource进行了接管和代理,所以在每个参与分布式事务的数据源都要进行如下配置:

<code>@Configuration
public class DataSourceConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy)throws Exception{
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:/mapper/*.xml"));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}
</code>

注意配置了数据源后,启动会出现循环依赖的问题,如下,

还需要在启动类排除dataSource自动配置,其它的解决方法,可以参考:集成fescar数据源循环依赖错误解决方案(https://blog.csdn.net/kangsa998/article/details/90042406)

<code>@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
</code>

配置请求拦截器,生成一个请求事务ID,用于在微服务中传递

<code>@Configuration
public class SeataRequestInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            //构建请求头
            requestTemplate.header("TX_XID", xid);
        }
    }
}
</code>

服务提供方配置(company-server)

maven、application.properties、数据源配置同调用方配置,区别主要是拦截器的配置,如下:

<code>@Slf4j
@Component
public class SeataHandlerInterceptor implements HandlerInterceptor {

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader("TX_XID");
        //获取全局事务编号
        if(log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }
        if(xid == null &amp;&amp; rpcXid != null) {
            //设置全局事务编号
            RootContext.bind(rpcXid);
            if(log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }
        return true;
    }
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
        String rpcXid = request.getHeader("TX_XID");
        if(!StringUtils.isEmpty(rpcXid)) {
            String unbindXid = RootContext.unbind();
            if(log.isDebugEnabled()) {
                log.debug("unbind {} from RootContext", unbindXid);
            }

            if(!rpcXid.equalsIgnoreCase(unbindXid)) {
                log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                if(unbindXid != null) {
                    RootContext.bind(unbindXid);
                    log.warn("bind {} back to RootContext", unbindXid);
                }
            }

        }
    }

}
</code>
<code>@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {

    @Autowired
    private SeataHandlerInterceptor seataHandlerInterceptor;

    public void addInterceptors(InterceptorRegistry registry) {
        //注册HandlerInterceptor,拦截所有请求
        registry.addInterceptor(seataHandlerInterceptor).addPathPatterns(new String[]{"/**"});
    }

}
</code>

添加全局事务注解

在服务调用方的方法上添加@GlobalTransactional注解,下面模拟了一种场景,如果companyId为偶数,则会抛异常。

<code>    @GlobalTransactional(rollbackFor = Exception.class)
    public void create(StoreEntity storeEntity) throws Exception {
        CompanyEntity companyEntity = new CompanyEntity();
        companyEntity.setName(storeEntity.getName());
        companyEntity = companyFeign.createCompany(companyEntity);

        /**
         * 模拟异常
         */
        if (companyEntity.getId() % 2 == 0) {
            throw new Exception();
        }

        /** 写入store数据 */
        storeEntity.setCompanyId(companyEntity.getId());
        storeMapper.insert(storeEntity);
    }
</code>

经过测试,companyFeign.createCompany服务调用后会先向数据库写一条数据,当create方法执行抛异常,就会事务回滚,删除掉原先的company数据

SpringCloud链路追踪

Spring Cloud链路追踪

接着上一篇的文章,今天讲讲spring cloud在分布式系统中的链路跟踪,主要使用的是zipkin框架实现的,上篇文章写道了有一个注册中心Eureka,和两个服务方,一个消费方,我们的消费方也可以做了一个服务,注册到Eureka中,所以我们对消费方也添加EurekaClient和zipkin的maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

启动类添加@EurekaClient注解,同样服务方也要添加zipkin的maven依赖

zipkin介绍

Zipkin 是一个开放源代码分布式的跟踪系统,由Twitter公司开源,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现,架构如下:

5ec1521a-26b4-11e7-9679-8c429afdbe0c

每个服务向zipkin报告计时数据,zipkin会根据调用关系通过Zipkin UI生成依赖关系图,显示了多少跟踪请求通过每个服务,该系统让开发者可通过一个 Web 前端轻松的收集和分析数据,例如用户每次请求服务的处理时间等,可方便的监测系统中存在的瓶颈。

Zipkin提供了可插拔数据存储方式:In-Memory、MySql、Cassandra以及Elasticsearch。Zipkin默认是使用http+内存传输和收集,在并发量比较大会影响效率,下面我们我们通过Kafka+ElasticSearch实现服务的传输与收集

创建ZipKin服务

新建一个模块,我们称为zipkinserver,添加下面的依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-server</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-autoconfigure-ui</artifactId>
</dependency>

在启动类,添加如下注解:

@SpringBootApplication
@EnableEurekaClient
@EnableZipkinServer
public class ZipkinServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ZipkinServerApplication, args);
    }

}

修改application.yml配置文件,添加kafka收集和ElasticSearch存储,

zipkin:
  storage:
    type: elasticsearch
    elasticsearch:
      hosts: localhost:9300
      index: zipkin

  collector:
    kafka:
      zookeeper: localhost:2181
      topic: zipkin
      groupId: zipkin

然后启动服务,zipkin的默认端口是9494,访问地址:http://localhost:9494

修改服务方和消费方的application.yml,添加zipkin的地址,kafka收集地址

spring: 
  zipkin:
    base-url: http://localhost:9411
    kafka:
      topic: zipkin
  kafka:
    bootstrap-servers: localhost:9092

  sleuth:
    sampler:
      percentage: 1.0

zipkin只有在接口调用后,才会产生数据的调用情况,所以我们先访问消费方的接口,然后再打开zipkin的界面,可以看到dynamic-service和feign的调用关系及耗时情况

31DADE5B71CF7F9EE33D80AE6B097E57 64043E75E8489933DFB3E2FA03A5AF9A

SpringCloud服务注册与发现

Spring Cloud服务注册与发现

Spring Cloud集成了搭建分布式服务一系列框架,如服务注册与发现Eureka,熔断器Hystrix,路由网关Zuul,链路追踪zipkin,今天主要讲解Eureka的使用。

Eureka是什么?

Eureka是Netflix开源的一款提供服务注册和发现的产品,它提供了完整的Service Registry和Service Discovery实现。也是springcloud体系中最重要最核心的组件之一,我们通过下面这样图就可以了解

48EB8D2E311BF36563200EF5B0015EB6

1)服务提供方向Eureka注册自己的服务,

2)消费者向Eureka获取自己需要的服务,和提供方建立连接

3) 如果服务方出现故障,Eureka会自动将服务方从注册列表中删除

搭建项目

创建Eureka服务

首先创建一个Maven项目,指定spring boot,spring cloud 版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Finchley.SR2</spring-cloud.version>
</properties>

创建一个模块,我们称为EurekaServer,使用Eureka只需要引入maven包,然后启动项目就可以了,很方面,如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

配置application.yml文件

server:
  port: 8081

eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

spring:
  application:
    name: eurka-server

添加注解@EnableEurekaServer,并启动EurekaServer

@SpringBootApplication
@EnableEurekaServer
public class EurakaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurakaServerApplication.class, args);
    }
}

启动EurekaServer,地址为:http://localhost:8081/eureka

创建提供方服务

添加maven依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

创建服务接口

@RestController
public class AirportController {

    @Autowired
    private AirportService airportService;

    @RequestMapping("/getAirport")
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportService.getAirport(threeCode);
    }

}

@Service
public class AirportService {

    @Value("${server.port}")
    private int port;

    public AirportBean getAirport(String threeCode) {
        AirportBean bean = new AirportBean();
        bean.setName("北京首都国际机场");
        bean.setThreeCode(threeCode);
        bean.setPort(port);
        return bean;
    }

}

public class AirportBean {

    private String threeCode;
    private String name;
    private int port;
}

修改application.yml文件

<code>server:
  port: 8082

spring:
  application:
    name: dynamic-service

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8081/eureka/</code>

添加@EnableEurekaClient注解,这里我们为了方便演示负载均衡,同时也启动了两个实例,端口分别为8082,8083

@SpringBootApplication
@EnableEurekaClient
public class DynamicServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(DynamicServiceApplication.class, args);
    }
}

创建服务消费方

我们再项目下再新建一个模块,称为springcloudclient,添加maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

这里我们使用了feign的服务调用方式,Spring cloud有两种服务调用方式,一种是ribbon+restTemplate,另一种是feign,ribbon类似一种rest风格的API调用方式,而feign整合了ribbon,具有负载均衡的能力,通过注解的方式,使代码看起来更加简洁,另外feign整合了Hystrix,具有熔断的能力

调用服务方的接口

@RestController
public class AirportFeignController {

    @Autowired
    private AirportFeignService airportFeignService;

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportFeignService.getAirport(threeCode);
    }

}

@FeignClient(value = "dynamic-service", fallback = AirportFeignFallbackService.class)
public interface AirportFeignService {

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode);

}

// 服务失败后熔断,调用的方法
public class AirportFeignFallbackService implements AirportFeignService {
    @Override
    public AirportBean getAirport(String threeCode) {
        return null;
    }
}

public class AirportBean {
    private String threeCode;
    private String name;
    private int port;
}

配置application.yml文件

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8084
spring:
  application:
    name: service-feign

添加@ EnableEurekaClient,@EnableDiscoveryClient, @EnableFeignClients注解,端口为8084,

@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
public class SpringCloudServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudServerApplication.class, args);
    }
}

好了下面可以演示springcloud的服务注册与发现了,通过上面的例子,我们启动了Eureka服务,分别为:8081,同时启动了两个服务提供方,注册到Eureka中,端口分别为8082和8083,接着我们启动了一个服务消费方,端口为8084,我们分别启动他们
打开Eureka的服务页面:http://localhost:8081

55AD7F4965A098E135257B0B04BBF3B6

可以发现有两个服务方已经注册上了,我们调用消费方的接口,发现消费方会使用负载均衡的方式分别访问服务方

 

有道词典

org.springframe …

详细X

  org.springframework.boot   spring-boot-starter-parent   2.1.1.RELEASE      utf – 8   utf – 8   1.8   Finchley.SR2