Sentinel流量控制介绍

1. 背景

  • 比如双11,秒杀等,系统访问量远远超出系统所能处理的并发数,需对系统进行保护。
  • 在微服务架构中,服务拆分粒度较细,会出现请求链路较长的情况,如果链路中某个服务因网络延迟或者请求超时等原因不可用,会导致当前请求阻塞,可能出现请求堆积从而导致雪崩效应。
  • 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、
  • 系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性

2. 常见的限流场景

●  在Nginx 层添加限流模块限制平均访问速度
●  通过设置数据库连接池,线程池的大小来限制总的并发数
●  通过guava 提供的Ratelimiter 限制接口的访问速度
●  TCP通信协议中的限流整形
 

3. 限流算法

3.1、 计数器算法

         计数器算法是一种比较简单的限流实现算法,在指定的周期内累加访问次数,当访问次数达到设定的阈值时,触发限流策略,当进入下一个时间周期时进行访问数的清零。

优点:实现简单
缺点:临界问题:如图所示,当在8-10秒和10-12秒内分别并发500,虽然没有超过阈值,但如果算8-12秒,则并发数高达1000,已经超过了原先定义的10秒内不超过500的并发量
 

3.2、 滑动窗口算法

为了解决计数器算法带来的临界问题,所以引入了滑动窗口算法。
简单来说,滑动窗口算法原理是在固定窗口汇总分割出多个小时间窗口,分别在每个小时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口,最终只需要统计滑动窗口时间范围内的所有小时窗口的总的计数即可。
如图所示,最终只需要统计每个小时间窗口不超过阈值/n && 在滑动窗口范围内的所有的小时间窗口总的计数不超过阈值即可

优点:实现相对简单,且没有计数器算法的临界问题
缺点:无法应对短时间高并发(突刺现象)
 

3.3、 令牌桶限流算法

令牌桶是网络流量整形和速率限制中最常使用的一种算法,对于每一个请求,都需要从令牌桶中获得一个令牌,如果没有获得令牌,则需要触发限流策略。

基本过程:

1)每进来一个请求,都在桶里获取一个令牌
2)如果有令牌,则拿着令牌通过
3)如果没有令牌,则不允许请求通过

几种情况:
请求速度 > 令牌生成速度:当令牌被取空后会被限流
请求速度 = 令牌生成速度:流量处于平稳状态
请求速度 < 令牌生成速度:说明此时并发数不高,请求能被正常处理
优点:可以像漏桶那样匀速,也可以像计数器那样处理突发流量
 

3.4、  漏桶限流算法

漏桶限流算法的主要作用是控制数据注入网络的速度,平滑网络上的突发流量
漏桶算法的原理:在漏桶内部同样维护一个容器,这个容器会以恒定的速度出水,不管上面的水流速度多快,漏桶水滴的流出速度始终保持不变,实际上消息中间件就使用了漏桶限流的思想。
 

漏桶算法中,有如下几种可能的情况:
●  请求速度大于漏桶流出速度,也就是请求数超出当前服务所能处理的极限,将会触发限流策略
●  请求速度小于或者等于漏桶流出的速度,也就是服务处理能力整合满足客户端的请求量,将正常执行。
不足:无法应对突发的并发流量,因为流出速率一直都是恒定的
优点:平滑系统流量
 

Sentinel介绍

       Sentinel是阿里开源的项目,是面向分布式架构的轻量级流量控制组件,主要以流量为切入点,从限流,流量整形,服务降级,系统负载保护等多个维度来帮助我们保障微服务的稳定性
Sentinel 分2 部分
核心库:不依赖任何框架/库,能够运行于所有的java 环境,对Dubbo ,Spring Cloud 等框架有较好的支持。
控制台:基于Spring boot 开发,打包后可以直接运行,不需要额外的tomcat 等应用部署。
 

5、Sentinel 流量控制

5.1、整体步骤

  • 定义资源
  • 定义限流规则
  • 检验规则是否生效
 

5.2、资源定义方式

方式1:抛出异常的方式定义资源
SphU 包含了 try-catch 风格的 API。用这种方式,当资源发生了限流之后会抛出 BlockException。这个时候可以捕捉异常,进行限流之后的逻辑处理。示例代码如下:
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
Entry entry = null;
try {
    // 资源名可使用任意有业务语义的字符串
    entry = SphU.entry("自定义资源名");
    // 被保护的业务逻辑
    // do something...
} catch (BlockException e1) {
    // 资源访问阻止,被限流或被降级
    // 进行相应的处理操作
} finally {
    if (entry != null) {
        entry.exit();
    }
}

注意: 
1、SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,否则会导致调用链记录异常,抛出 ErrorEntryFreeException 异常。
2、务必保证finally会被执行

方式2:注解方式定义资源
Sentinel 支持通过 @SentinelResource 注解定义资源并配置 blockHandler 和 fallback 函数来进行限流之后的处理。示例:

@SentinelResource(blockHandler = "blockHandlerForGetUser")
public User getUserById(String id) {
        throw new RuntimeException("getUserById command failed");
}
// blockHandler 函数,原方法调用被限流/降级/系统保护的时候调用
public User blockHandlerForGetUser(String id, BlockException ex) {
        return new User("admin");
}

注意
1、blockHandler 函数会在原方法被限流/降级/系统保护的时候调用,而 fallback 函数会针对所有类型的异常。请注意 blockHandler 和 fallback 函数的形式要求
2、需注意:blockHandler 所配置的值会在触发限流之后调用,这个方法定义必须和原始方法的返回值,参数保持一致,而且需要增加BlockException 参数。

5.3、限流具体实现

5.3.1. 引入 Sentinel 依赖

添加依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.4</version>
</dependency>

5.3.2. 定义资源

资源 是 Sentinel 中的核心概念之一。最常用的资源是我们代码中的 Java 方法,也可以更灵活的定义你的资源,例如,把需要控制流量的代码用 Sentinel API SphU.entry(“HelloWorld”) 和 entry.exit() 包围起来即可。

    在下面的例子中,我们将 System.out.println(“hello world”); 作为资源(被保护的逻辑),用 API 包装起来。参考代码如下:

public static void main(String[] args) {
    // 配置规则.
    initFlowRules();
    while (true) {
        try (Entry entry = SphU.entry("HelloWorld")) {
        // 被保护的逻辑
            System.out.println("hello world");
        } catch (BlockException ex) {
            // 处理被流控的逻辑
            System.out.println("blocked!");
        }
    }
}

完成以上两步后,代码端的改造就完成了。也可以通过我们提供的 注解,来定义我们的资源,类似于下面的代码:

@SentinelResource("HelloWorld")
public void helloWorld() {
    // 资源中的逻辑
    System.out.println("hello world");
}

这样,helloWorld() 方法就成了我们的一个资源。

5.3.3. 定义规则

通过流控规则来指定允许该资源通过的请求次数,例如下面的代码定义了资源 HelloWorld 每秒最多只能通过 20 个请求。

private static void initFlowRules(){
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule = new FlowRule();
    rule.setResource("HelloWorld");
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setCount(20);
    rules.add(rule);
    FlowRuleManager.loadRules(rules);
}

完成上面 3 步,Sentinel 就能够正常工作了。

5.3.4. 检查效果

Demo 运行之后,我们可以在日志 ~/logs/csp/${appName}-metrics.log.xxx 里看到下面的输出:

其中 p 代表通过的请求, block 代表被阻止的请求, s 代表成功执行完成的请求个数, e 代表用户自定义的异常, rt 代表平均响应时长。

可以看到,这个程序每秒稳定输出 “hello world” 20 次,和规则中预先设定的阈值是一样的。

6、Sentinel 熔断

6.1、背景

1、现代微服务架构都是分布式的,由非常多的服务组成,除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。
2、分布式系统中,不同服务之间相互调用,组成复杂的调用链路,复杂链路上的某一环不稳定,可能会层层级联导致整体的雪崩 。

6.2、熔断策略

Sentinel 提供以下几种熔断策略:

  • 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
  • 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% – 100%。
  • 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。

注意
 1、@SentinelResource 注解会自动统计业务异常,无需手动调用。
2、异常降级仅针对业务异常,对 Sentinel 限流降级本身的异常(BlockException)不生效。为了统计异常比例或异常数,需要通过 Tracer.trace(ex) 记录业务异常。示例:

Entry entry = null;
try {
    entry = SphU.entry(resource);

} catch (Throwable t) {
    if (!BlockException.isBlockException(t)) {
        Tracer.trace(t);
    }
} finally {
    if (entry != null) {
        entry.exit();
    }
}

熔断降级规则

熔断降级规则(DegradeRule)包含下面几个重要的属性:

Field说明默认值
resource资源名,即规则的作用对象
grade熔断策略,支持慢调用比例/异常比例/异常数策略慢调用比例
count慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值
timeWindow熔断时长,单位为 s
minRequestAmount熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入)5
statIntervalMs统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入)1000 ms
slowRatioThreshold慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入)

分布式事务seata的AT模式介绍

seata是阿里开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,本文主要介绍AT模式的使用。

seata安装

下载seata服务,官方地址:https://github.com/seata/seata/releases
在Linux下,下载完成后,直接解压,通过命令安装即可:

sh ./bin/seata-server.sh

支持的启动参数

参数 全写 作用 备注
-h –host 指定在注册中心注册的 IP 不指定时获取当前的 IP,外部访问部署在云环境和容器中的 server 建议指定
-p –port 指定 server 启动的端口 默认为 8091
-m –storeMode 事务日志存储方式 支持file和db,默认为 file
-n –serverNode 用于指定seata-server节点ID ,如 1,2,3…, 默认为 1
-e –seataEnv 指定 seata-server 运行环境 如 dev, test 等, 服务启动时会使用 registry-dev.conf 这样的配置

如:

sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file

seata的AT模式介绍

AT模式实质是两阶段提交协议的演变,具体如下:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
  • 二阶段:
    提交异步化,非常快速地完成。
    回滚通过一阶段的回滚日志进行反向补偿。

业务背景:
用户调用系统A的store服务,store服务调用系统B的company服务,company服务会新增一条数据,然后把companyId返回系统A,然后系统A通过companyId再新增一条store数据。

一般如果store服务执行失败了,直接抛异常了,所以company服务也不会执行,
但如果store服务执行成功了,已经写了一条数据到数据库,执行company服务时失败了,就会产生数据不一致的问题。

使用seata的AT模式,主要分为下面几个步骤:

  • 配置seata服务及创建事务表
  • 调用方配置(对应上面的store服务)
  • 服务提供方配置(对应上面的company服务)

配置seata服务及创建事务表

配置conf/file.conf文件

<code>## transaction log store, only used in server side
store {
  ## store mode: file、db
  mode = "db" //修改为db模式,标识事务信息用db存储
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.234.1:3306/seata?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false&amp;&amp;serverTimezone=UTC" //修改数据库连接
    user = "seata" //修改数据库账号
    password = "123456" //修改数据库密码
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
  }
}
## server configuration, only used in server side
service {
  #vgroup-&gt;rgroup
  vgroup_mapping.chuanzh_tx_group = "default" //chuanzh_tx_group为自定义的事务组名称,要和客户端配置保持一致
  #only support single node
  default.grouplist = "192.168.234.128:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

</code>

上面配置共修改了3个地方:

  1. 存储模式改为db模式,需要创建3张事务表,如下:
    <code>-- the table to store GlobalSession data
     CREATE TABLE IF NOT EXISTS `global_table`
     (
         `xid`                       VARCHAR(128) NOT NULL,
         `transaction_id`            BIGINT,
         `status`                    TINYINT      NOT NULL,
         `application_id`            VARCHAR(32),
         `transaction_service_group` VARCHAR(32),
         `transaction_name`          VARCHAR(128),
         `timeout`                   INT,
         `begin_time`                BIGINT,
         `application_data`          VARCHAR(2000),
         `gmt_create`                DATETIME,
         `gmt_modified`              DATETIME,
         PRIMARY KEY (`xid`),
         KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
         KEY `idx_transaction_id` (`transaction_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store BranchSession data
     CREATE TABLE IF NOT EXISTS `branch_table`
     (
         `branch_id`         BIGINT       NOT NULL,
         `xid`               VARCHAR(128) NOT NULL,
         `transaction_id`    BIGINT,
         `resource_group_id` VARCHAR(32),
         `resource_id`       VARCHAR(256),
         `branch_type`       VARCHAR(8),
         `status`            TINYINT,
         `client_id`         VARCHAR(64),
         `application_data`  VARCHAR(2000),
         `gmt_create`        DATETIME(6),
         `gmt_modified`      DATETIME(6),
         PRIMARY KEY (`branch_id`),
         KEY `idx_xid` (`xid`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store lock data
     CREATE TABLE IF NOT EXISTS `lock_table`
     (
         `row_key`        VARCHAR(128) NOT NULL,
         `xid`            VARCHAR(96),
         `transaction_id` BIGINT,
         `branch_id`      BIGINT       NOT NULL,
         `resource_id`    VARCHAR(256),
         `table_name`     VARCHAR(32),
         `pk`             VARCHAR(36),
         `gmt_create`     DATETIME,
         `gmt_modified`   DATETIME,
         PRIMARY KEY (`row_key`),
         KEY `idx_branch_id` (`branch_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
    </code>
  2. 修改数据库连接,注意如果你安装的是MySQL8,则需要修改MySQL8的驱动:driverClassName = “com.mysql.cj.jdbc.Driver”,不然会出现启动报错的问题,详细请参考:seata启动MySQL报错 #359(https://github.com/seata/seata-samples/issues/359)。
  3. 修改事务的组名,你也可以不修改,我这里使用的是:chuanzh_tx_group
  4. 创建业务事务表,记录业务需要回滚的数据,在分布式事务中,每个参与的业务数据库都需要添加对应的表
    <code>CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    </code>

配置conf/registry.conf文件

<code>registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"  修改注册方式,微服务调用使用的是Eureka

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://192.168.234.1:8081/eureka"  //修改Eureka地址
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}
</code>

以上修改了使用Eureka方式注册,并配置了Eureka地址,启动MySQL、Eureka服务后,就可以启动seata服务了。

调用方配置(store-server)

maven配置,使用seata-spring-boot-starter,自动配置的方式,不需要再添加file.conf和register.conf文件

<code>    &lt;!--druid--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.alibaba&lt;/groupId&gt;
        &lt;artifactId&gt;druid-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;${druid-spring-boot-starter.version}&lt;/version&gt;
    &lt;/dependency&gt;

    &lt;!--seata--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;io.seata&lt;/groupId&gt;
        &lt;artifactId&gt;seata-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;1.2.0&lt;/version&gt;
    &lt;/dependency&gt;
</code>

application.properties配置:

<code>server.port=9090
spring.application.name=store-server

mybatis.type-aliases-package=com.chuanzh.model
mybatis.mapper-locations=classpath:mapper/*.xml

spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

#注意这里的事务组配置要和服务端一致
seata.tx-service-group=chuanzh_tx_group
seata.service.vgroup-mapping.chuanzh_tx_group=default
seata.service.grouplist.default=192.168.234.128:8091

logging.level.io.seata=DEBUG
## eureka
eureka.client.serviceUrl.defaultZone= http://localhost:8081/eureka/

</code>

数据源配置,因为seata是对数据库的datasource进行了接管和代理,所以在每个参与分布式事务的数据源都要进行如下配置:

<code>@Configuration
public class DataSourceConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy)throws Exception{
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:/mapper/*.xml"));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}
</code>

注意配置了数据源后,启动会出现循环依赖的问题,如下,

还需要在启动类排除dataSource自动配置,其它的解决方法,可以参考:集成fescar数据源循环依赖错误解决方案(https://blog.csdn.net/kangsa998/article/details/90042406)

<code>@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
</code>

配置请求拦截器,生成一个请求事务ID,用于在微服务中传递

<code>@Configuration
public class SeataRequestInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            //构建请求头
            requestTemplate.header("TX_XID", xid);
        }
    }
}
</code>

服务提供方配置(company-server)

maven、application.properties、数据源配置同调用方配置,区别主要是拦截器的配置,如下:

<code>@Slf4j
@Component
public class SeataHandlerInterceptor implements HandlerInterceptor {

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader("TX_XID");
        //获取全局事务编号
        if(log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }
        if(xid == null &amp;&amp; rpcXid != null) {
            //设置全局事务编号
            RootContext.bind(rpcXid);
            if(log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }
        return true;
    }
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
        String rpcXid = request.getHeader("TX_XID");
        if(!StringUtils.isEmpty(rpcXid)) {
            String unbindXid = RootContext.unbind();
            if(log.isDebugEnabled()) {
                log.debug("unbind {} from RootContext", unbindXid);
            }

            if(!rpcXid.equalsIgnoreCase(unbindXid)) {
                log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                if(unbindXid != null) {
                    RootContext.bind(unbindXid);
                    log.warn("bind {} back to RootContext", unbindXid);
                }
            }

        }
    }

}
</code>
<code>@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {

    @Autowired
    private SeataHandlerInterceptor seataHandlerInterceptor;

    public void addInterceptors(InterceptorRegistry registry) {
        //注册HandlerInterceptor,拦截所有请求
        registry.addInterceptor(seataHandlerInterceptor).addPathPatterns(new String[]{"/**"});
    }

}
</code>

添加全局事务注解

在服务调用方的方法上添加@GlobalTransactional注解,下面模拟了一种场景,如果companyId为偶数,则会抛异常。

<code>    @GlobalTransactional(rollbackFor = Exception.class)
    public void create(StoreEntity storeEntity) throws Exception {
        CompanyEntity companyEntity = new CompanyEntity();
        companyEntity.setName(storeEntity.getName());
        companyEntity = companyFeign.createCompany(companyEntity);

        /**
         * 模拟异常
         */
        if (companyEntity.getId() % 2 == 0) {
            throw new Exception();
        }

        /** 写入store数据 */
        storeEntity.setCompanyId(companyEntity.getId());
        storeMapper.insert(storeEntity);
    }
</code>

经过测试,companyFeign.createCompany服务调用后会先向数据库写一条数据,当create方法执行抛异常,就会事务回滚,删除掉原先的company数据

ElasticSearch高并发场景写入优化

ElasticSearch高并发场景写入优化

ElasticSearch号称分布式全文搜索引擎,但使用不当依然会很慢,特别是在高并发写入时,会存在写入超时的问题。

在公司内部,基本所有的日志都会放入到ElasticSearch,比如接口访问时间日志、动态/计划数据审核日志、动态/计划抓取报文日志、动态报文更新日志等,每天的写入数据量巨大,最初我们基于ElasticSearch封装了一层,使用内部RestHighLevelClient获取数据,部分代码如下:

@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client");
        this.client = client;
    }

    /**
     * 根据id获取文档
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

            return getResponse;

        } catch (Exception e) {
            log.error("获取文档失败", e);
        }
        return null;
    }

    /**
     * 插入文档, id可以为空,id如果为空,就自动生成id
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;

        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("创建文档失败", e);
        }
        return null;
    }
}

然而在放到线上生成环境下,经常会出现写入失败数据丢失的情况,发现是数据量写入太大,调用接口超时,直接返回错误了。RestHighLevelClient实质就是通过httpclient请求接口发送数据,是基于http协议。

查看Elasticsearch文档,发现Elasticsearch同样支持使用Transport传输,其内部使用netty长连接通信,并且可以设置处理连接的数量。

新建一个TransportClientFactory用来创建TransportClient,这里连接需要使用密钥,具体填写自己Elasticsearch的账号密码。

public class TransportClientFactory {
    // 默认处理核心数,cpu核心数
    private static int DEFAULT_PROCESSORS = 8;
    // 配置核心数
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword, String trustStore, String trustPassword) throws Exception {

        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("缺少证书或密码");
        }

        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }

        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings
                .builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true).put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false).put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword).put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword).put("processors", processors).put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax).build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);

        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }

        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }

}

修改ElasticsearchService

@Slf4j
public class ElasticsearchService {

    /**
     * 默认10秒超时
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;

        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;

        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        if (StringUtils.isBlank(id)) {
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }

}

另外为了提升写入效率,可以批量异步一次性写入,使用bulk方法,如下:

  /**
     * 批量操作,listener可空,支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
        if (requests == null || requests.size() == 0)
            return;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return;

        this.client.bulk(request, listener);
    }

    /**
     * 支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
        BulkRequestBuilder builder = bulkRequestBuilder();

        for (DocWriteRequest<?> request : requests) {
            if (request instanceof IndexRequest) {
                builder.add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                builder.add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                builder.add((DeleteRequest) request);
            }
        }

        if (builder.numberOfActions() == 0) {
            return null;
        }
        return builder.request();
    }

在dao层,我们可以一次性写入多条数据

    /**
     * 异步写入动态日志
     * 
     * @param flightRecords
     */
    public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
        try {
            if (CollectionUtils.isEmpty(flightRecords)) {
                return;
            }
            ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
            List<DocWriteRequest<?>> requests = new ArrayList(flightRecords.size());
            for (FlightRecord flightRecord : flightRecords) {
                // index以航班日期为准
                String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
                requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));

            }
            if (CollectionUtils.isEmpty(requests)) {
                return;
            }
            elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("flightRecord写入ES失败", e);
                }
            });
        } catch (Exception e) {
            log.error("insertFlightRecodeListAsync Exception", e);
        }
    }

其它写入性能优化:

  1. 去掉不必要的字段分词和索引,ElasticSearch会默认对所有字段进行分词,一般查询我们都是根据航班号、航班日期、起飞机场、到达机场查询;所以没必要的字段,我们可以使用keyword类型,不进行分词。
    PUT my_index
    {
      "mappings": {
        "my_type": {
          "properties": {
            "tail_no": { 
              "type": "keyword",
              "index": false
            }
          }
        }
      }
    }
  2. 对于一些普通的日志,比如动态/计划抓取网页日志,数据丢失也无所谓,可以禁用掉refresh和replia,即index.refreshinterval设置为-1,将index.numberof_replicas设置为0即可。
    PUT my_index
    {
      "settings": {
        "index": {
            "refresh_interval" : "-1",
            "number_of_replicas" : 0
        }
      }
    }
  3. 使用ElasticSearch自增ID,如果我们要手动给ElasticSearch document设置一个id,那么ElasticSearch需要每次都去确认一下那个id是否存在,这个过程是比较耗费时间的。如果我们使用自动生成的id,那么ElasticSearch就可以跳过这个步骤,写入性能会更好。
  4. 使用多线程写入ElasticSearch,单线程发送bulk请求是无法最大化ElasticSearch集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。为了更好的利用集群的资源,这样多线程并发写入,可以减少每次底层磁盘fsync的次数和开销。一样,可以对单个ElasticSearch节点的单个shard做压测,比如说,先是2个线程,然后是4个线程,然后是8个线程,16个,每次线程数量倍增。一旦发现es返回了TOOMANYREQUESTS的错误,JavaClient也就是EsRejectedExecutionException,此时那么就说明ElasticSearch是说已经到了一个并发写入的最大瓶颈了,此时我们就知道最多只能支撑这么高的并发写入了。
  5. 适当增加index buffer的大小,如果我们要进行非常重的高并发写入操作,那么最好将index buffer调大一些,indices.memory.indexbuffersize,这个可以调节大一些,设置的这个index buffer大小,是所有的shard公用的,但是如果除以shard数量以后,算出来平均每个shard可以使用的内存大小,一般建议,但是对于每个shard来说,最多给512mb,因为再大性能就没什么提升了。ElasticSearch会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个buffer。默认这个参数的值是10%,也就是jvm heap的10%,如果我们给jvm heap分配10gb内存,那么这个index buffer就有1gb,对于两个shard共享来说,是足够的了。

Kafka分区分配策略

Kafka分区分配策略

我们知道每个Topic会分配为很多partitions,Producers会将数据分配到每个partitions中,然后消费者Consumers从partitions中获取数据消费,那么Producers是如何将数据分到partitions中?Consumers又怎么知道从哪个partitions中消费数据?

生产者往Topic写数据

我们从product.send方法入手,看看里面的具体实现,可以看到在调用send方法时,其内部是调用了doSend方法,在doSend方法中有一个获取partitions的方法

int partition = partition(record, serializedKey, serializedValue, cluster);

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

从上面代码中,首先先选择配置的分区,如果没有配置则使用默认的分区,即使用了DefaultPartitioner中的partition方法

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // 没有可用的分区,则给一个不可用分区
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // 根据key的hash值和分区数取模
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

上面代码中先会根据topic获取所有的分区
1,如果key为null,则通过先产生随机数,之后在该数上自增的方式产生一个数nextValue,如果存在可用分区,将nextValue转为正数之后对可用分区进行取模操作,如果不存在可用分区,则将nextValue对总分区数进行取模操作

2,如果key不为空,就先获取key的hash值,然后和分区数进行取模操作

消费者从Topic读数据

kafka默认对消费分区指定了两种策略,分别为Range策略(org.apache.kafka.clients.consumer.RangeAssignor)和RoundRobin策略(org.apache.kafka.clients.consumer.RoundRobinAssignor),它们都实现了PartitionAssignor接口

Range策略

比如有10个分区,分别为P1、P2、P3、P4、P5、P6、P7、P8、P9、P10,三个消费者C1、C2、C3,消费如下图:

C4F4C4E974548380CE4D5274D30F20B9

我们来看看源代码:

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    // 得到topic和订阅的消费者集合信息,例如{t1:[c1,c2,c3], t2:[c1,c2,c3,c4]}                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    // 将consumersPerTopic信息转换为assignment,memberId就是消费者client.id+uuid(kafka在client.id上追加的)
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    // 遍历每个Topic,获取所有的订阅消费者
    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<String> consumersForTopic = topicEntry.getValue();

        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        // 如果Topic没有分区,则调过
        if (numPartitionsForTopic == null)
            continue;

         // 将Topic的订阅者根据字典排序
        Collections.sort(consumersForTopic);
         // 总分区数/订阅者的数量 得到每个订阅者应该分配分区数
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        // 无法整除的剩余分区数量
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        //遍历所有的消费者
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
              //分配到的分区的开始位置
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            // 分配到的分区数量(整除分配到的分区数量,加上1个无法整除分配到的分区--如果有资格分配到这个分区的话。判断是否有资格分配到这个分区:如果整除后余数为m,那么排序后的消费者集合中前m个消费者都能分配到一个额外的分区)
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            //给消费者分配分区
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

上面的代码添加了注释很清楚的展现了range的实现,对应上面的例子,如果有4个消费者C1、C2、C3、C4,那么根据上面的算法:

C1 -> [P1,P2,P3] ,C2 -> [P4,P5,P6] ,C3 -> [P7,P8] C4 -> [P9,P10] 。取余多出来的两个分区,由最前n个消费者来消费

RoundRobin策略

将主题的所有分区依次分配给消费者,比如有两个Topic:T1[P1,P2,P3,P4],T2[P5,P6,P7,P8,P9,P10],若C1、C2订阅了T1,C2、C3订阅了T2,那么C1将消费T1[P1,P3],C2将消费T1[P2,P4,P6,P8,P10],C3将消费T2[P5,P7,P9],如下图:

A8E30AE2B2B3245CCEB5102A9F1B8470

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());
    // 将消费集合先按字典排序,构建成一个环形迭代器
    CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
   // 按Topic的名称排序,得到Topic下的所有分区
    for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
        final String topic = partition.topic();

        while (!subscriptions.get(assigner.peek()).topics().contains(topic))
            assigner.next();
        // 给消费者分配分区,并轮询到下一个消费者
        assignment.get(assigner.next()).add(partition);
    }
    return assignment;
}

/**
 * 根据消费者得到订阅的Topic下的所有分区
 * Topic按名称字典排序
 */
public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    SortedSet<String> topics = new TreeSet<>();
    for (Subscription subscription : subscriptions.values())
        topics.addAll(subscription.topics());

    List<TopicPartition> allPartitions = new ArrayList<>();
    for (String topic : topics) {
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic != null)
            allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
    }
    return allPartitions;
}

 

使用Sharing-JDBC实现分表

使用Sharing-JDBC实现分表

Sharing-JDBC介绍

在创建数据库时,我们最先考虑的是按模块对数据库进行划为,但即使这样,单表数据量还是出现大数量的情况,Sharing-JDBC可以对表进行水平切分,将数据均分到不同表中

通过日期水平切分

目前我们航班动态数据全球每天航班20多万,考虑到我们业务场景,用户都是通过航班号+日期来查询一个航班,所以我们采取使用日期来水平分表

添加依赖

<dependency>
	<groupId>com.dangdang</groupId>
	<artifactId>sharding-jdbc-core</artifactId>
	<version>1.5.4.1</version>
</dependency>
<dependency>
	<groupId>com.dangdang</groupId>
	<artifactId>sharding-jdbc-config-spring</artifactId>
	<version>1.5.4.1</version>
</dependency>
<!--mysql flight数据源 -->
<bean id="mysqlDataSource" class="com.zaxxer.hikari.HikariDataSource" destroy-method="close">
...
</bean>

<!--shared jdbc -->

<rdb:strategy id="tableShardingStrategy" sharding-columns="local_date" algorithm-class="com.huoli.songshan.sharding.FlyDateTableShardingAlgorithm" />
<rdb:data-source id="shardingDataSource">
	<rdb:sharding-rule data-sources="mysqlDataSource" default-data-source="mysqlDataSource">
		<rdb:table-rules>
			<rdb:table-rule logic-table="flight_info" actual-tables="flight_info_${2012..2020}${['01','02','03','04','05','06','07','08','09','10','11','12']}${0..3}${0..9}"
				table-strategy="tableShardingStrategy" />
		</rdb:table-rules>
		<rdb:default-database-strategy sharding-columns="none" algorithm-class="com.dangdang.ddframe.rdb.sharding.api.strategy.database.NoneDatabaseShardingAlgorithm" />
	</rdb:sharding-rule>
	<rdb:props>
		<prop key="sql.show">false</prop>
	</rdb:props>
</rdb:data-source>

先配置一个dataSource数据源,这里我们使用的是hikari,再通过shardingDataSource对dataSource进行包装,按照表中localdate字段对表进行拆分,即表名依次为flightinfo20120101、flightinfo20120102,到flightinfo20201231,下面我们实现根据日期localdate映射到对应表上

public final class FlyDateTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Date> {
    private DateTimeFormatter dt = DateTimeFormat.forPattern("yyyyMMdd");

    @Override
    public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<Date> shardingValue) {
        DateTime datetime = new DateTime(shardingValue.getValue());
        String flydate = datetime.toString(dt);
        for (String each : availableTargetNames) {
            if (each.endsWith(flydate)) {
                return each;
            }
        }
        throw new IllegalArgumentException();
    }

    @Override
    public Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<Date> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        for (Date value : shardingValue.getValues()) {
            DateTime datetime = new DateTime(value);
            String flydate = datetime.toString(dt);
            for (String tableName : availableTargetNames) {
                if (tableName.endsWith(flydate)) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }

    @Override
    public Collection<String> doBetweenSharding(Collection<String> availableTargetNames,
            ShardingValue<Date> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        Range<Date> range = (Range<Date>) shardingValue.getValueRange();
        for (Date value = range.lowerEndpoint(); value.before(range.upperEndpoint()) || value.equals(range.upperEndpoint()); value = addDays(value, 1)) {
            DateTime datetime = new DateTime(value);
            String flydate = datetime.toString(dt);
            for (String each : availableTargetNames) {
                if (each.endsWith(flydate)) {
                    result.add(each);
                }
            }
        }
        return result;
    }

    private Date addDays(Date date, int days) {
        return new Date(new DateTime(new DateTime(date).plusDays(days)).toDate().getTime());
    }
}

这里实现了SQL的equal、in、between方法,即通过使用select * from flight_info where local_date=’2019-01-01′,会自动映射flight_info_20190101表中查询。

通过用户ID哈希取模进行拆分

在用户订阅航班动态数据后,我们需要保存用户的订阅数据,以便后期推送动态消息给用户,用户可以在登陆的情况下订阅航班,也可以在未登陆的情况下订阅航班,用户登陆后可以拿到用户的phoneId(设备ID)和userId(用户ID),用户未登陆只能获取到用户的phoneId(设备ID),所以我们区分两种情况,使用两种表来存放用户信息trip_subscribe_nologin和trip_subscribe_login,用户未登陆根据phoneId来分表,用户登陆的情况根据userId来分表

和之前一样,我们需要先配置一个分表策略

<!-- trip_subscribe_nonlogin分表策略 -->
<rdb:strategy id="subscribeNonLoginTableShardingStrategy" sharding-columns="uid" algorithm-class="com.huoli.trip.dao.sharding.SubscribeNonLoginTableShardingAlgorithm" />
<!-- trip_subscribe_login分表策略 -->
<rdb:strategy id="subscribeLoginTableShardingStrategy" sharding-columns="user_id" algorithm-class="com.huoli.trip.dao.sharding.SubscribeLoginTableShardingAlgorithm" />

<!-- 分表配置配置 -->
<rdb:data-source id="shardingDataSource">
	<rdb:sharding-rule data-sources="mysqlDataSource">
		<rdb:table-rules>
			<rdb:table-rule logic-table="trip_subscribe_nonlogin" table-strategy="subscribeNonLoginTableShardingStrategy" actual-tables="trip_subscribe_nonlogin_${0..63}">
				<rdb:generate-key-column column-name="id" column-key-generator-class="com.huoli.trip.dao.sharding.TripKeyGenerator" />
			</rdb:table-rule>
			<rdb:table-rule logic-table="trip_subscribe_login" table-strategy="subscribeLoginTableShardingStrategy" actual-tables="trip_subscribe_login_${0..63}">
				<rdb:generate-key-column column-name="id" column-key-generator-class="com.huoli.trip.dao.sharding.TripKeyGenerator" />
			</rdb:table-rule>
		</rdb:table-rules>
		<rdb:default-database-strategy sharding-columns="none" algorithm-class="com.dangdang.ddframe.rdb.sharding.api.strategy.database.NoneDatabaseShardingAlgorithm" />
	</rdb:sharding-rule>
	<rdb:props>
		<prop key="sql.show">true</prop>
	</rdb:props>
</rdb:data-source>

这里是根据phoneId、userId分别分成了64张表,即trip_subscribe_nologin有64张表(trip_subscribe_nologin_1,trip_subscribe_nologin_2 …),trip_subscribe_login有64张表(trip_subscribe_login_1,trip_subscribe_login_2 …),具体代码实现

public interface Shard<T> {

    /** 根据参数计算分片标识 */
    public T calculateShard(Object... args);
}

public class SubscribeNonLoginTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<String>, Shard<String> {

    /** 分片数量 */
    private final int shardNum = 64;

    public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<String> shardingValue) {
        String shard = calculateShard(shardingValue.getValue());
        for (String tableName : availableTargetNames) {
            if (tableName.endsWith("_" + shard)) {
                return tableName;
            }
        }

        throw new IllegalArgumentException("未找到该表:trip_subscribe_nonlogin_" + shard);
    }

    public Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<String> shardingValue) {
        Collection<String> result = new LinkedHashSet<String>(availableTargetNames.size());
        for (String value : shardingValue.getValues()) {
            String shard = calculateShard(value);
            for (String tableName : availableTargetNames) {
                if (tableName.endsWith("_" + shard)) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }

    public Collection<String> doBetweenSharding(Collection<String> availableTargetNames, ShardingValue<String> shardingValue) {
        // 不会出现between两个uid之间的查询需求,所以无需实现该方法
        return new LinkedHashSet<String>(availableTargetNames.size());
    }

    public String calculateShard(Object... args) {
        String phoneId = (String) args[0];
        return "" + (phoneId.hashCode() & 0x7fffffff) % shardNum;
    }
}

public class SubscribeLoginTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<String>, Shard<String> {

    /** 分片数量 */
    private final int shardNum = 64;

    /**
     * doEqualSharding/doInSharding同SubscribeNonLoginTableShardingAlgorithm
     */

    public String calculateShard(Object... args) {
        String userId = (String) args[0];
        return "" + (userId.hashCode() & 0x7fffffff) % shardNum;
    }
}

这里主要介绍下calculateShard方法,首先是先拿到了phoneId或userId,获取hashCode,然后和0x7fffffff进行&运算,0x7fffffff表示long的最大值,这一步是为了保证得到的index的第一位为0,也就是为了得到一个正数。因为有符号数第一位0代表正数,1代表负数,然后和分片数shardNum,使表数据分布在shardNum内

另外这里还用到了id的生成策略,即生成一个全局的自增ID,sharing-jdbc自带了DefaultKeyGenerator生成器可以实现

public class TripKeyGenerator implements KeyGenerator {
    private static Logger logger = LoggerFactory.getLogger(TripKeyGenerator.class);

    private DefaultKeyGenerator defaultKeyGenerator;

    public TripKeyGenerator() {
        defaultKeyGenerator = new DefaultKeyGenerator();
    }

    static {
        /** 从配置中获取workId */
        DefaultKeyGenerator.setWorkerId(workerId);
    }

    @Override
    public synchronized Number generateKey() {
        return defaultKeyGenerator.generateKey();
    }

}

通过twitter的snowlflake也可以生成唯一ID,具体可以参考:这里

 

Elasticsearch排序(三)

Elasticsearch排序(三)

Elasticsearch中排序是通过设置score字段来进行排序,score是一个浮点类型,所以我们对靠前的数据设置一个较大的值,然后根据倒序排列。

同样对于航班号查询,有时候需要根据航班号,日期查询出所有的航班号,用户会输入185,2018-12-01,这时需要返回所有相关航司的航班号。
比如匹配的航班有:CA1858,B6185,QF185,ZH1858,对于精确匹配,B6185、QF185要排在CA1858、ZH1858之前,对于非精确排序,CA航司要排在ZH航司前面

创建一个索引

之前我们对航班号创建了一个索引,用户可以输入CA1,出现CA1*的结果,现在用户输入185,是纯数字型的,所以我们需要单独一个字段(flightNum)保存这个航班数字,然后对它创建索引

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAndNumAnalyzer": {
          "tokenizer": "flightNoAndNumTokenizer"
        }
      },
      "tokenizer": {
        "flightNoAndNumTokenizer": {
          "type": "edge_ngram",
          "min_gram": 3,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              },
              "flightNum": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              }
          }
      }
  }

}

如果是航班数字查询,我们只需要根据航司来设定优先级,同时设定航班数字全匹配的优先级最高,然后查询的时候再通过constant_score统一来设置boost参数

GET flight/dynamic/_search
{
  "query": {
    "bool": {
      "must": [
        {"prefix": {
          "flightNum": {
            "value": "185"
          }
        }}
      ], 
      "should": [
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "CA"
              }
            },
            "boost": 4
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "ZH"
              }
            },
            "boost": 3
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "B6"
              }
            },
            "boost": 2
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "QF"
              }
            },
            "boost": 1
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "flightNum": "185"
              }
            },
            "boost": 10
          }
        }
      ]
      }
    }
}

可以看到CA航司比B6高,但是当查询flightNum为185的时候,评分设置为最高,所以B6186会优先显示,其次是QF185,最后是CA1858和ZH1858 具体代码实现:

//航班数字优先级设置
BoolQueryBuilder qb = QueryBuilders.boolQuery()
                    .must(prefixQuery("flightNum", keyword))
                    .should(constantScoreQuery(matchQuery("flightNum", keyword)).boost(10f))

//航司优先级设置
airNumer.put("CA", 4f);
airNumer.put("ZH", 3f);
airNumer.put("B6", 4f);
airNumer.put("QF", 1f);
for (String airline : airNumer.keySet()) {
    queryBuilder.should(constantScoreQuery(matchQuery("airline", airline)).boost(airNumer.get(airline)));
}

 

 

 

Elasticsearch创建索引及数据(二)

Elasticsearch创建索引及数据(二)

第一篇介绍了Elasticsearch安装及基本用法,下面我们自己来创建一个索引,并写入一些数据

创建索引

基本语法

PUT /my_index
{
    "settings": { ... any settings ... },
    "mappings": {
        "type_one": { ... any mappings ... },
        "type_two": { ... any mappings ... },
        ...
    }

在创建索引的时候,我们需要设置索引被存放的分片数量、分析器、类型设置,分片数量、分析器通过settings来设置,类型映射通过mappings来设置,Elasticsearch默认创建索引是5个分片,1个副本,可通过settings来修改它,这里我们就遵循默认值不动,如下:

PUT flight
{
    "settings" : {
        "index" : {
            "number_of_shards" : 5, 
            "number_of_replicas" : 1 
        }
    }
}
{
  "acknowledged": true,
  "shards_acknowledged": true,
  "index": "flight"
}

创建一个flight索引,大括号后面为默认配置,如果不想修改它,都可以去掉,然后在新建一个type为dynamic,并写入数据

PUT /flight/dynamic/1
{
    "flightNo":"CA1858",
    "flightDate":"2018-12-01",
    "depCode":"SHA",
    "arrCode":"PEK",
    "state": "到达",
    "subState": "",
    "depPlanTime":"2018-12-01 07:45",
    "arrPlanTime":"2018-12-01 10:10",
    "depReadyTime":"2018-12-01 07:45",
    "arrReadyTime":"2018-12-01 09:45",
    "depTime":"2018-12-01 07:54",
    "arrTime":"2018-12-01 07:46",
    "distance":1076, 
    "tailNo":"B2487", 
    "depTerm":"T2", 
    "arrTerm":"T3", 
    "gate":"48", 
    "luggage":"32"
}

创建一条数据后,ES会默认会对所有字段添加索引,当然也可以不指定ID,ES会默认生成一个ID,自动生成的ID有22个字符长,类似:wM0OSFhDQXGZAWDf0-drSA

{
  "_index": "flight",
  "_type": "dynamic",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

下面我们来查询数据,ES默认情况下是禁用了source,即我们查询时,不会返回source里面的内容,只会返回数据ID,我们可以指定需要返回的_srouce字段

GET flight/dynamic/_search
{
    "query":   { "match_all": {}},
    "_source": [ "flightNo", "flightDate"]
}
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "flight",
        "_type": "dynamic",
        "_id": "1",
        "_score": 1,
        "_source": {
          "flightNo": "CA1858",
          "flightDate": "2018-12-01"
        }
      }
    ]
  }
}

配置分析器

分析器介绍

分析器主要是将一块文本分成适合于倒排索引的独立 词条,通过这些词条我们可以搜索到指定的文档

分析器主要有以下几个功能:
1,字符过滤:通过整理字符串,如去掉HTML,将&转换为and
2,分词器:将字符串分成单个词条,比如通过空格或者标点符号来拆分
3,Token过滤:比如将Quick这种词条统一转换为小写,删除a,and,the这些无用词,增加近义词条(如:jump和leap这种同义词)

比如我们有一个航班号CA1858,我们需要通过输入CA18就可以返回对应的结果,那么我们就需要对其进行分词,如果直接使用下面的查询是获取不到数据的

GET flight/dynamic/_search
{
  "query": {
    "match": {
      "flightNo": "CA18"
    }
  }
}

我们可以查看ES对flightNo的分词情况

POST flight/_analyze
{
  "field": "flightNo",
  "text": "CA1858"
}
{
  "tokens": [
    {
      "token": "ca1858",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    }
  ]
}

可以看到ES是对航班号转换为小写后直接进行了倒排索引,没有进行分词,直接查询CA18肯定搜索不到,下面我们自定义一个分析器

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAnalyzer": {
          "tokenizer": "flightNoTokenizer"
        }
      },
      "tokenizer": {
        "flightNoTokenizer": {
          "type": "edge_ngram",
          "min_gram": 4,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAnalyzer"                          
              }
          }
      }
  }

}

edgengram为ES自带的分词器,ES自带了8种分析器,具体可以查看官方文档,我们自定义了一个analyzer,使用edgengram来进行分词,字符长度从4到8,然后将自定义的分析器映射到flightNo字段上,之后我们使用查询CA18就可以获取到结果了

 

Elasticsearch安装及介绍(一)

Elasticsearch安装及介绍

安装Elasticsearch

Elasticsearch的安装很简单,直接下载官方包,运行即可

curl -L -O http://download.elasticsearch.org/PATH/TO/VERSION.zip 
unzip elasticsearch-$VERSION.zip
cd  elasticsearch-$VERSION
./bin/elasticsearch

使用curl ‘http://localhost:9200/?pretty’获取数据

{
  "name" : "node-0",
  "cluster_name" : "dongtai-es",
  "cluster_uuid" : "aIah5IcqS1y9qiJ4LphAbA",
  "version" : {
    "number" : "6.1.2",
    "build_hash" : "5b1fea5",
    "build_date" : "2018-01-10T02:35:59.208Z",
    "build_snapshot" : false,
    "lucene_version" : "7.1.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

基本概念介绍

Elasticsearch本质是一个分布式数据库,每个服务器上称为一个Elastic实例,这一组实例构成了ES集群

Document

ES中存储数据记录称为Document,Document使用JSON格式表示,同一个 Index 里面的Document没有要求有相同的结构,但最好一致,这样能提高效率,如下:

{
  "first_name": "Jane2",
  "last_name": "Smith",
  "age": 32,
  "about": "I like to collect rock albums",
  "interests": "music"
}

Index

ES文档的索引,定义了文档放在哪里,你可以理解为对于数据库名

Type

文档表示的对象类别,可以理解为数据库中的表,比如我们有一个机票数据,可以按舱位来分组(头等舱,经济舱),根据规则,Elastic 6.x 版只允许每个 Index 包含一个 Type,7.x 版将会彻底移除 Type

ID

文档的唯一标识,可以理解为表中的一条记录,根据index、type、_ID可以确定一个文档

基本用法

添加一个文档

PUT index/employee/6
{
  "first_name": "chuan",
  "last_name": "zhang",
  "age": 32,
  "about": "hi I'm hai na bai chuan",
  "interests": "read book"
}

上面PUT后面分别对应 index、Type、Id

获取文档

获取所有文档

ES默认会返回10条,可以指定size来改变

GET index/employee/_search
{
  "size": 3
}
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 9,
    "max_score": 1,
    "hits": [
      {
        "_index": "index",
        "_type": "employee",
        "_id": "5",
        "_score": 1,
        "_source": {
          "first_name": "Jane1",
          "last_name": "Smith",
          "age": 32,
          "about": "I like to collect rock albums",
          "interests": "music"
        }
      },
      {
        "_index": "index",
        "_type": "employee",
        "_id": "10",
        "_score": 1,
        "_source": {
          "first_name": "chuan",
          "last_name": "Smith",
          "age": 32,
          "about": "I like to collect rock albums",
          "interests": "music"
        }
      }
    ]
  }
}

按条件匹配

比如我需要查找firstname是’chuan‘的所有文档,如果要搜索firstname中多个关键字,中间用空格隔开就可以了,比如”firstname”: “chuan smith” 表示查询firstname包含chuan或smith的

GET index/employee/_search
{
  "query": {
    "match": {
            "first_name": "chuan"
        }
  }
}
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 0.6931472,
    "hits": [
      {
        "_index": "index",
        "_type": "employee",
        "_id": "6",
        "_score": 0.6931472,
        "_source": {
          "first_name": "chuan",
          "last_name": "zhang",
          "age": 32,
          "about": "hi I'm hai na bai chuan",
          "interests": "read book"
        }
      }
    ]
  }
}

如果要使用and查询,比如我要查询firstname为chuan,lastname为chuan的文档记录

GET index/employee/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "first_name": "chuan" } },
        { "match": { "last_name": "zhang" } }
      ]
    }
  }
}

如果要根据范围查询或过滤,类似MySQL中’>’,或 ‘!=’操作,比如需要查询上面条件中,年龄大于30的文档

{
  "query": {
    "bool": {
      "must": [
        { "match": { "first_name": "chuan" } },
        { "match": { "last_name": "zhang" } }
      ]
      "filter": {
        "range" : {
            "age" : { "gt" : 30 } 
        }
      }
    }
  }
}

更新文档

更新记录使用PUT请求,重新发送一次数据,ES会根据id来修改,如果我需要更新id为6的about字段

PUT index/employee/6
{
  "first_name": "chuan",
  "last_name": "zhang",
  "age": 32,
  "about": "i modify my about",
  "interests": "read book"
}
{
  "_index": "index",
  "_type": "employee",
  "_id": "6",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 12,
  "_primary_term": 2
}

_version为版本号,由原来的1变成了2,result为”update“,以上为全量更新,如果需要局部更新,可以使用POST请求

POST index/employee/6/_update
{
  "doc":{
    "about": "i modify my about"
  }
}

删除文档

如下,我需要删除id为6的文档

DELETE index/employee/6
{
  "found" :    true,
  "_index" :   "index",
  "_type" :    "employee",
  "_id" :      "6",
  "_version" : 3
}

可以看到version也增加了1,如果对于的id没有找到,found将返回false

数据分析

ES同样有类似MySQL group by的用法,ES称为聚合,比如我想统计employee中最受欢迎的兴趣,注意:ES在5.x之后对排序、聚合操作用单独的数据结构(fielddata)缓存到内存里了,需要单独开启。

PUT index/_mapping/employee/
{
  "properties": {
    "interests": { 
      "type":     "text",
      "fielddata": true
    }
  }
}

GET index/employee/_search
{
  "aggs": {
    "all_interests": {
      "terms": { "field": "interests" }
    }
  }
}
"aggregations": {
    "all_interests": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "music",
          "doc_count": 3
        },
        {
          "key": "forestry",
          "doc_count": 1
        },
        {
          "key": "read",
          "doc_count": 1
        }
      ]
    }
  }

Spring Boot集成Quartz

Spring Boot集成Quartz

我们常用的定时任务调度有如下几种:
1. 使用JDK自带的类库实现

  • 通过继承TimeTask,使用Time来调度任务
  • 使用ScheduledExecutorService来实现任务调度

2,Spring 自带了任务调度功能,通过使用@Schedule()注解来实现
3,使用Quartz框架,Quartz可以用于在分布式环境下的任务调度

Quartz的基本概念:

1,Job 表示一个工作,要执行的具体内容。此接口中只有一个方法,如下:

void execute(JobExecutionContext context)

2,JobDetail 表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。
3,Trigger 代表一个调度参数的配置,什么时候去调。
4,Scheduler 代表一个调度容器,一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。

配置环境依赖

添加Maven依赖:

    org.springframework.boot
    spring-boot-starter-quartz

修改application.yml文件,添加:

spring:
  datasource:
    name: schedule
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/schedule?useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456
  quartz:
    job-store-type: jdbc

quartz支持两种存储方式,一个使用数据库,也就是job-store-type=jdbc,需要你下载quartz提供的表,并导入到你本地,然后配置你的数据库连接,官方下载地址, 另一种是内存方式,job-store-type设置为momery

配置任务

创建一个任务配置,这里的数据可以对应数据库一条记录

public class ScheduleTaskConfig {

    private Integer id;
    private String name;
    private String groupName;
    private String describe;
    private String cron;
    private String classPath;
    private Character isEnabled;
    private String createTime;
    private String updateTime;

    public String getClassName() {
        try {
            Class cls = Class.forName(classPath);
            return cls.getSimpleName();
        } catch (ClassNotFoundException e) {
            log.error("获取Class失败",e);
        }
        return null;
    }

}

创建用于操作Quartz任务创建、暂停、恢复方法

Service
@Slf4j
public class QuartzService {

    @Autowired
    private Scheduler scheduler;

    public void createJob(ScheduleTaskConfig config) {
        Class cls = null;
        try {
            cls = Class.forName(config.getClassPath());
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put("config", config);
            JobDetail jobDetail = JobBuilder.newJob(cls)
                    .withIdentity(config.getName(), config.getGroupName())
                    .withDescription(config.getDescribe())
                    .setJobData(jobDataMap)
                    .storeDurably()
                    .build();

            Trigger trigger = TriggerBuilder.newTrigger()
                    .forJob(jobDetail)
                    .withIdentity(jobDetail.getKey().getName(), GROUP_NAME)
                    .withSchedule(CronScheduleBuilder.cronSchedule(config.getCron()))
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (Exception e) {
            log.error("创建任务失败", e);
        }
    }

    /**
     * 获取任务状态
     * @return
     */
    public String getJobState(ScheduleTaskConfig config) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(config.getName(), config.getGroupName());
            Trigger.TriggerState state = scheduler.getTriggerState(triggerKey);
            return state.name();
        } catch (Exception e) {
            log.error("创建任务失败", e);
        }
        return null;
    }

    /**
     * 判断Job是否存在
     * @param config
     * @return
     */
    public boolean isExistJob(ScheduleTaskConfig config) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(config.getName(), config.getGroupName());
            CronTrigger trigger = (CronTrigger)scheduler.getTrigger(triggerKey);
            if (trigger != null) {
                return true;
            }
        } catch (Exception e) {
            log.error("获取任务trigger失败", e);
        }

        return false;
    }

    /**
     * 更新Job任务
     * @param config
     */
    public void updateJob(ScheduleTaskConfig config) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(config.getName(), config.getGroupName());
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(config.getCron());

            //按新的cronExpression表达式重新构建trigger
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
            // 忽略状态为PAUSED的任务,解决集群环境中在其他机器设置定时任务为PAUSED状态后,集群环境启动另一台主机时定时任务全被唤醒的bug
            if(!triggerState.name().equalsIgnoreCase("PAUSED")){
                //按新的trigger重新设置job执行
                scheduler.rescheduleJob(triggerKey, trigger);
            }
        } catch (Exception e) {
            log.error("更新任务trigger失败", e);
        }

    }

    /**
     * 停止任务
     * @param jobName
     * @param groupName
     * @return
     */
    public boolean pauseJob(String jobName) {
        try {
            JobKey jobKey = JobKey.jobKey(config.getName(), config.getGroupName());
            scheduler.pauseJob(jobKey);
            return true;
        } catch (SchedulerException e) {
            log.error("停止任务失败", e);
        }
        return  false;
    }

    /**
     * 恢复任务
     * @param jobName
     * @param groupName
     * @return
     */
    public boolean resumeJob(String jobName) {
        try {
            JobKey jobKey = JobKey.jobKey(config.getName(), config.getGroupName());
            scheduler.resumeJob(jobKey);
            return true;
        } catch (SchedulerException e) {
            log.error("恢复任务失败", e);
        }
        return  false;
    }

    /**
     * 删除任务
     * @param jobName
     * @param groupName
     * @return
     */
    public boolean deleteJob(String jobName) {
        try {
            JobKey jobKey = JobKey.jobKey(config.getName(), config.getGroupName());
            scheduler.deleteJob(jobKey);
            return true;
        } catch (SchedulerException e) {
            log.error("删除任务失败", e);
        }
        return  false;
    }

可以看到Quartz创建任务就是通过JobDetail和Trigger来实现定时任务的,JobDetail声明了任务的一些信息,比如名称、分组、描述、需要执行类(例如:com.chuanz.task.CheckExecutor)需要的数据(通过调用setJobData方法) Trigger设置了任务的执行周期及触发时间,这里通过cron表达式来设置触发,另外Trigger还有另外一种触发方式,通过SimpleScheduleBuilder来实现,比如:

.withSchedule(SimpleScheduleBuilder.repeatMinutelyForTotalCount(10, 5))

我需要重复执行10次,每隔5分钟执行一次

创建我们需要执行的任务,实现Job接口,并重写execute方法

@Slf4j
@Service
public class CheckExecutor implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        log.info("执行计划冲突类型审核开始");
        // 获取数据
        ScheduleTaskConfig config = (ScheduleTaskConfig)context.get("config");

        /**
         * 处理逻辑
         */
        log.info("执行计划冲突类型审核结束");
    }
}

我们可以将quartz创建、删除、停止、恢复任务都封装成一个个接口,通过前端页面来管理quartz的任务,具体前端页面代码我就不贴了,基本就是调用QuartzService里面的方法了,另外说说获取quartz任务列表的时候,我们可以获取Trigger里面响应的执行状态,通过Trigger里面的方法来获取,比如:

JobKey jobKey = new JobKey(config.getName(), config.getGroupName());
List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
if (triggers.size() > 0) {
    Trigger trigger = triggers.get(0);
    //开始执行时间
    trigger.getStartTime();
    //上一次执行时间
    trigger.getPreviousFireTime();
    //下一次执行时间
    trigger.getNextFireTime();
}

trigger有很多实现,比如我们这里使用的是cron表达式,所以相对应的就是CronTriggerImpl,常用的触发器有四种。

触发器介绍

SimpleTrigger:简单的触发器

指定从某一个时间开始,以一定的时间间隔,如秒、分、小时重复执行的任务
比如:从10:00 开始,每隔1小时/1分钟/1秒钟执行一次,执行10次。
对应的方法如下:

8A76CC1568BDC509456138BBB5327AB8

CalendarIntervalTrigger:日历触发器

类似于SimpleTrigger,指定从某一个时间开始,以一定的时间间隔执行的任务。 但是不同的是SimpleTrigger指定的时间间隔为毫秒,没办法指定每隔一个月执行一次(每月的时间间隔不是固定值),而CalendarIntervalTrigger支持的间隔单位有秒,分钟,小时,天,月,年,星期。
对应的方法如下:

0D2A2B1FFAA7D216938E00C9D6E3DAAC

DailyTimeIntervalTrigger:日期触发器

指定每天的某个时间段内,以一定的时间间隔执行任务。并且它可以支持指定星期。 它适合的任务类似于:指定每天9:00 至 18:00 ,每隔70秒执行一次,并且只要周一至周五执行。 比如如下代码:

DailyTimeIntervalTrigger trigger = dailyTimeIntervalSchedule()
    .startingDailyAt(TimeOfDay.hourAndMinuteOfDay(10, 0)) //从10:00点开始
    .endingDailyAt(TimeOfDay.hourAndMinuteOfDay(22, 0)) //22:00点结束
    .onDaysOfTheWeek(MONDAY,TUESDAY,WEDNESDAY,THURSDAY,FRIDAY) //周一到周五执行
    .withIntervalInHours(1) //每间隔1小时执行一次
    .withRepeatCount(50) //最多重复50次(实际执行50+1次)
    .build();

CronTrigger:Cron表达式触发器

适合于更复杂的任务,它支持类型于Linux Cron的语法(并且更强大)。基本上它覆盖了以上三个Trigger的绝大部分能力
属性只有一个就是cron表达式

我们可以根据对应的触发器取到任务执行的数据,如之前用SimpleScheduleBuilder.repeatMinutelyForTotalCount(10, 5),那么对应的就是SimpleTrigger,我们可以获取执行的次数,剩余的次数等。 触发器的介绍可以参考官方文档

 

 

 

Redis分布式锁使用

最近项目中遇到同时有两个线程同时更新一行记录导致后面一条语句执行失败的问题,由于项目是部署在不同的服务器上,这里要控制两个线程的执行顺序,自然想到了使用Redis的锁,废话不多说,下面给出具体实现

/**
 * 核查四要素相同报文是否正在处理,如果有实例正在处理四要素相同报文pass,否则线程等待
 * 
 * @param processData
 */
public void checkPacketProcessRepeat(ProcessData processData) {
	try {
		// 四要素key
		String repeatKey = REPEATKEYSTART + processData.getReviseFlight().getKey();
		while (true) {
			// 设置nx锁,如果nx锁设置成功跳出去,继续执行报文后续处理流程
			if (setNX(repeatKey, EXIST, 3)) {
				log.info("FlightPreProcess-checkPacketProcessRepeat,报文处理拿到NX锁,直接pass,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
						.getSourceId());
				return;
			}
			// 如果有当前航班有nx锁、或者nx锁设置失败,则需要等待3秒,等待其他实例处理完成
			log.info("FlightPreProcess-checkPacketProcessRepeat,报文多实例并发处理,需要等待3秒,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
					.getSourceId());
			Thread.sleep(3000);
		}
	} catch (Exception e) {
		log.error("FlightPreProcess-checkPacketProcessRepeat,异常,key:{},e:{}", processData.getReviseFlight().getKey(), e);
	}
}

这里根据报文的四要素确定唯一条记录,先调用setNX获取redis锁,如果获取到了就执行后面的逻辑,如果没有获取到则等待3s再重试,下面是setNX方法的实现

/** 设置锁 */
private boolean setNX(final String key, String value, final int exp) {
	return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
		@Override
		public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
			byte[] serializeKey = redisTemplate.getStringSerializer().serialize(key);
			Boolean acquire = connection.incr(serializeKey) == 1;
			// 如果设值成功,则设置过期时间
			if (acquire) {
				connection.expire(serializeKey, exp);
			}
			return acquire;
		}
	});
}

这里使用了redis的incr命令,它是一个原子操作,如果key不存在,那么key的值将初始化为0,然后执行INCR操作,这里判断如果设置成功,则对key设置过期时间,相当于了一个带有时间的锁。

在Redis2.6.12版本后,使用set命令也可以实现分布式锁,具体代码如下:

public static Boolean setNX(final String key, final String value, final int exp) {
        return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
			@Override
			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
				Jedis jedis = (Jedis) connection.getNativeConnection();
				String result = jedis.set(key, value, "NX", "EX", exp);
				if ("OK".equals(result)) {
					return Boolean.TRUE;
				}
				return Boolean.FALSE;
			}
		});
    }

这里重点说说第3个和第4个参数,这里填的是NX,意思是当key不存在时,我们进行set操作,若key已经存在则不进行任何操作,第4个表示我们要给key设置一个过期时间,具体时间由第5个参数决定。

另外我们这里保存了key对应的value值,所以线程可以根据value值来释放锁,这里的value值可以是线程的ID,比如我们线程后面的逻辑执行失败了,我们可以通过这个value值来尽快释放锁,减少其它线程的等待时间,我们可以使用Lua脚本来实现

private static final Long RELEASE_SUCCESS = 1L;
private static final RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

public static Boolean releaseLock(final String key, final String value) {
			return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
			@Override
			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
				Jedis jedis = (Jedis) connection.getNativeConnection();
				Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key),
						Collections.singletonList(value));
				if (RELEASE_SUCCESS.equals(result)) {
					return Boolean.TRUE;
				}
				return Boolean.FALSE;
			}
		});
    }

通过Lua脚本获取对应key的value值,如果value值和给定的一样,则释放锁