Sentinel流量控制介绍

1. 背景

  • 比如双11,秒杀等,系统访问量远远超出系统所能处理的并发数,需对系统进行保护。
  • 在微服务架构中,服务拆分粒度较细,会出现请求链路较长的情况,如果链路中某个服务因网络延迟或者请求超时等原因不可用,会导致当前请求阻塞,可能出现请求堆积从而导致雪崩效应。
  • 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、
  • 系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性

2. 常见的限流场景

●  在Nginx 层添加限流模块限制平均访问速度
●  通过设置数据库连接池,线程池的大小来限制总的并发数
●  通过guava 提供的Ratelimiter 限制接口的访问速度
●  TCP通信协议中的限流整形
 

3. 限流算法

3.1、 计数器算法

         计数器算法是一种比较简单的限流实现算法,在指定的周期内累加访问次数,当访问次数达到设定的阈值时,触发限流策略,当进入下一个时间周期时进行访问数的清零。

优点:实现简单
缺点:临界问题:如图所示,当在8-10秒和10-12秒内分别并发500,虽然没有超过阈值,但如果算8-12秒,则并发数高达1000,已经超过了原先定义的10秒内不超过500的并发量
 

3.2、 滑动窗口算法

为了解决计数器算法带来的临界问题,所以引入了滑动窗口算法。
简单来说,滑动窗口算法原理是在固定窗口汇总分割出多个小时间窗口,分别在每个小时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口,最终只需要统计滑动窗口时间范围内的所有小时窗口的总的计数即可。
如图所示,最终只需要统计每个小时间窗口不超过阈值/n && 在滑动窗口范围内的所有的小时间窗口总的计数不超过阈值即可

优点:实现相对简单,且没有计数器算法的临界问题
缺点:无法应对短时间高并发(突刺现象)
 

3.3、 令牌桶限流算法

令牌桶是网络流量整形和速率限制中最常使用的一种算法,对于每一个请求,都需要从令牌桶中获得一个令牌,如果没有获得令牌,则需要触发限流策略。

基本过程:

1)每进来一个请求,都在桶里获取一个令牌
2)如果有令牌,则拿着令牌通过
3)如果没有令牌,则不允许请求通过

几种情况:
请求速度 > 令牌生成速度:当令牌被取空后会被限流
请求速度 = 令牌生成速度:流量处于平稳状态
请求速度 < 令牌生成速度:说明此时并发数不高,请求能被正常处理
优点:可以像漏桶那样匀速,也可以像计数器那样处理突发流量
 

3.4、  漏桶限流算法

漏桶限流算法的主要作用是控制数据注入网络的速度,平滑网络上的突发流量
漏桶算法的原理:在漏桶内部同样维护一个容器,这个容器会以恒定的速度出水,不管上面的水流速度多快,漏桶水滴的流出速度始终保持不变,实际上消息中间件就使用了漏桶限流的思想。
 

漏桶算法中,有如下几种可能的情况:
●  请求速度大于漏桶流出速度,也就是请求数超出当前服务所能处理的极限,将会触发限流策略
●  请求速度小于或者等于漏桶流出的速度,也就是服务处理能力整合满足客户端的请求量,将正常执行。
不足:无法应对突发的并发流量,因为流出速率一直都是恒定的
优点:平滑系统流量
 

Sentinel介绍

       Sentinel是阿里开源的项目,是面向分布式架构的轻量级流量控制组件,主要以流量为切入点,从限流,流量整形,服务降级,系统负载保护等多个维度来帮助我们保障微服务的稳定性
Sentinel 分2 部分
核心库:不依赖任何框架/库,能够运行于所有的java 环境,对Dubbo ,Spring Cloud 等框架有较好的支持。
控制台:基于Spring boot 开发,打包后可以直接运行,不需要额外的tomcat 等应用部署。
 

5、Sentinel 流量控制

5.1、整体步骤

  • 定义资源
  • 定义限流规则
  • 检验规则是否生效
 

5.2、资源定义方式

方式1:抛出异常的方式定义资源
SphU 包含了 try-catch 风格的 API。用这种方式,当资源发生了限流之后会抛出 BlockException。这个时候可以捕捉异常,进行限流之后的逻辑处理。示例代码如下:
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
Entry entry = null;
try {
    // 资源名可使用任意有业务语义的字符串
    entry = SphU.entry("自定义资源名");
    // 被保护的业务逻辑
    // do something...
} catch (BlockException e1) {
    // 资源访问阻止,被限流或被降级
    // 进行相应的处理操作
} finally {
    if (entry != null) {
        entry.exit();
    }
}

注意: 
1、SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,否则会导致调用链记录异常,抛出 ErrorEntryFreeException 异常。
2、务必保证finally会被执行

方式2:注解方式定义资源
Sentinel 支持通过 @SentinelResource 注解定义资源并配置 blockHandler 和 fallback 函数来进行限流之后的处理。示例:

@SentinelResource(blockHandler = "blockHandlerForGetUser")
public User getUserById(String id) {
        throw new RuntimeException("getUserById command failed");
}
// blockHandler 函数,原方法调用被限流/降级/系统保护的时候调用
public User blockHandlerForGetUser(String id, BlockException ex) {
        return new User("admin");
}

注意
1、blockHandler 函数会在原方法被限流/降级/系统保护的时候调用,而 fallback 函数会针对所有类型的异常。请注意 blockHandler 和 fallback 函数的形式要求
2、需注意:blockHandler 所配置的值会在触发限流之后调用,这个方法定义必须和原始方法的返回值,参数保持一致,而且需要增加BlockException 参数。

5.3、限流具体实现

5.3.1. 引入 Sentinel 依赖

添加依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.4</version>
</dependency>

5.3.2. 定义资源

资源 是 Sentinel 中的核心概念之一。最常用的资源是我们代码中的 Java 方法,也可以更灵活的定义你的资源,例如,把需要控制流量的代码用 Sentinel API SphU.entry(“HelloWorld”) 和 entry.exit() 包围起来即可。

    在下面的例子中,我们将 System.out.println(“hello world”); 作为资源(被保护的逻辑),用 API 包装起来。参考代码如下:

public static void main(String[] args) {
    // 配置规则.
    initFlowRules();
    while (true) {
        try (Entry entry = SphU.entry("HelloWorld")) {
        // 被保护的逻辑
            System.out.println("hello world");
        } catch (BlockException ex) {
            // 处理被流控的逻辑
            System.out.println("blocked!");
        }
    }
}

完成以上两步后,代码端的改造就完成了。也可以通过我们提供的 注解,来定义我们的资源,类似于下面的代码:

@SentinelResource("HelloWorld")
public void helloWorld() {
    // 资源中的逻辑
    System.out.println("hello world");
}

这样,helloWorld() 方法就成了我们的一个资源。

5.3.3. 定义规则

通过流控规则来指定允许该资源通过的请求次数,例如下面的代码定义了资源 HelloWorld 每秒最多只能通过 20 个请求。

private static void initFlowRules(){
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule = new FlowRule();
    rule.setResource("HelloWorld");
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setCount(20);
    rules.add(rule);
    FlowRuleManager.loadRules(rules);
}

完成上面 3 步,Sentinel 就能够正常工作了。

5.3.4. 检查效果

Demo 运行之后,我们可以在日志 ~/logs/csp/${appName}-metrics.log.xxx 里看到下面的输出:

其中 p 代表通过的请求, block 代表被阻止的请求, s 代表成功执行完成的请求个数, e 代表用户自定义的异常, rt 代表平均响应时长。

可以看到,这个程序每秒稳定输出 “hello world” 20 次,和规则中预先设定的阈值是一样的。

6、Sentinel 熔断

6.1、背景

1、现代微服务架构都是分布式的,由非常多的服务组成,除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。
2、分布式系统中,不同服务之间相互调用,组成复杂的调用链路,复杂链路上的某一环不稳定,可能会层层级联导致整体的雪崩 。

6.2、熔断策略

Sentinel 提供以下几种熔断策略:

  • 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
  • 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% – 100%。
  • 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。

注意
 1、@SentinelResource 注解会自动统计业务异常,无需手动调用。
2、异常降级仅针对业务异常,对 Sentinel 限流降级本身的异常(BlockException)不生效。为了统计异常比例或异常数,需要通过 Tracer.trace(ex) 记录业务异常。示例:

Entry entry = null;
try {
    entry = SphU.entry(resource);

} catch (Throwable t) {
    if (!BlockException.isBlockException(t)) {
        Tracer.trace(t);
    }
} finally {
    if (entry != null) {
        entry.exit();
    }
}

熔断降级规则

熔断降级规则(DegradeRule)包含下面几个重要的属性:

Field说明默认值
resource资源名,即规则的作用对象
grade熔断策略,支持慢调用比例/异常比例/异常数策略慢调用比例
count慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值
timeWindow熔断时长,单位为 s
minRequestAmount熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入)5
statIntervalMs统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入)1000 ms
slowRatioThreshold慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入)

数据库和缓存双写一致性问题

缓存由于其高并发和高性能的特性,已经在项目中被广泛使用。在读取缓存方面,没啥疑问,都是按照下图的流程来进行业务操作。

但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存。又或者是先删除缓存,再更新数据库,其实大家存在很大的争议。

先做一个说明,从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。这种方案下,我们可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。因此,接下来讨论的思路不依赖于给缓存设置过期时间这个方案。

在这里,我们讨论三种更新策略:

  1. 先更新数据库,再更新缓存
  2. 先删除缓存,再更新数据库
  3. 先更新数据库,再删除缓存

应该没人问我,为什么没有先更新缓存,再更新数据库这种策略。

1、先更新数据库,再更新缓存

这套方案,大家是普遍反对的。为什么呢?有如下两点原因。

  • 原因一(线程安全角度)

同时有请求A和请求B进行更新操作,那么会出现

(1)线程A更新了数据库
(2)线程B更新了数据库
(3)线程B更新了缓存
(4)线程A更新了缓存

这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

  • 原因二(业务场景角度)

有如下两点:

(1)如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。
(2)如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

接下来讨论的就是争议最大的,先删缓存,再更新数据库。还是先更新数据库,再删缓存的问题。

2、先删缓存,再更新数据库

该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库

上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

那么,如何解决呢?采用延时双删策略

伪代码如下

public void write(String key,Object data){ 
    redis.delKey(key); 
    db.updateData(data); 
    Thread.sleep(1000); 
    redis.delKey(key); 
}

转化为中文描述就是

(1)先淘汰缓存
(2)再写数据库(这两步和原来一样)
(3)休眠1秒,再次淘汰缓存

这么做,可以将1秒内所造成的缓存脏数据,再次删除。那么,这个1秒怎么确定的,具体该休眠多久呢?

针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

如果你用了mysql的读写分离架构怎么办?

ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。

(1)请求A进行写操作,删除缓存
(2)请求A将数据写入数据库了,
(3)请求B查询缓存发现,缓存没有值
(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
(5)请求B将旧值写入缓存
(6)数据库完成主从同步,从库变为新值

上述情形,就是数据不一致的原因。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。

采用这种同步淘汰策略,吞吐量降低怎么办?

那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。

第二次删除,如果删除失败怎么办?

这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:

(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库
(6)请求A试图去删除请求B写入对缓存值,结果失败了。

ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。

如何解决呢?

具体解决方案,且看博主对第(3)种更新策略的解析。

3、先更新数据库,再删缓存

首先,先说一下。老外提出了一个缓存更新套路,名为《Cache-Aside pattern》。其中就指出

  • 失效:应用程序先从cache取数据,没有得到,则从数据库中取数据,成功后,放到缓存中。
  • 命中:应用程序从cache中取数据,取到后返回。
  • 更新:先把数据存到数据库中,成功后,再让缓存失效。

另外,知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。

这种情况不存在并发问题么?

不是的。假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

(1)缓存刚好失效
(2)请求A查询数据库,得一个旧值
(3)请求B将新值写入数据库
(4)请求B删除缓存
(5)请求A将查到的旧值写入缓存

ok,如果发生上述情况,确实是会发生脏数据。

然而,发生这种情况的概率又有多少呢?

发生上述情况有一个先天性条件,就是步骤(3)的写数据库操作比步骤(2)的读数据库操作耗时更短,才有可能使得步骤(4)先于步骤(5)。可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(3)耗时比步骤(2)更短,这一情形很难出现。

假设,有人非要抬杠,有强迫症,一定要解决怎么办?

如何解决上述并发问题?

首先,给缓存设有效时间是一种方案。其次,采用策略(2)里给出的异步延时删除策略,保证读请求完成以后,再进行删除操作。

还有其他造成不一致的原因么?

有的,这也是缓存更新策略(2)和缓存更新策略(3)都存在的一个问题,如果删缓存失败了怎么办,那不是会有不一致的情况出现么。比如一个写数据请求,然后写入数据库了,删缓存失败了,这会就出现不一致的情况了。这也是缓存更新策略(2)里留下的最后一个疑问。

如何解决?提供一个保障的重试机制即可,这里给出两套方案。

方案一,如下图所示:

​(1)更新数据库数据;
(2)缓存因为种种问题删除失败
(3)将需要删除的key发送至消息队列
(4)自己消费消息,获得需要删除的key
(5)继续重试删除操作,直到成功

然而,该方案有一个缺点,对业务线代码造成大量的侵入。于是有了方案二,在方案二中,启动一个订阅程序去订阅数据库的binlog,获得需要操作的数据。在应用程序中,另起一段程序,获得这个订阅程序传来的信息,进行删除缓存操作。

方案二,流程如下图所示:

​(1)更新数据库数据
(2)数据库会将操作信息写入binlog日志当中
(3)订阅程序提取出所需要的数据以及key
(4)另起一段非业务代码,获得该信息
(5)尝试删除缓存操作,发现删除失败
(6)将这些信息发送至消息队列
(7)重新从消息队列中获得该数据,重试操作。

备注说明:上述的订阅binlog程序在mysql中有现成的中间件叫canal,可以完成订阅binlog日志的功能。至于oracle中,博主目前不知道有没有现成中间件可以使用。另外,重试机制,博主是采用的是消息队列的方式。如果对一致性要求不是很高,直接在程序中另起一个线程,每隔一段时间去重试即可,这些大家可以灵活自由发挥,只是提供一个思路。

总结
对目前互联网中已有的一致性方案,进行了一个总结。对于先删缓存,再更新数据库的更新策略,还有方案提出维护一个内存队列的方式,觉得实现异常复杂,没有必要,因此没有必要在文中给出。最后,希望大家有所收获。

ElasticSearch高并发场景写入优化

ElasticSearch高并发场景写入优化

ElasticSearch号称分布式全文搜索引擎,但使用不当依然会很慢,特别是在高并发写入时,会存在写入超时的问题。

在公司内部,基本所有的日志都会放入到ElasticSearch,比如接口访问时间日志、动态/计划数据审核日志、动态/计划抓取报文日志、动态报文更新日志等,每天的写入数据量巨大,最初我们基于ElasticSearch封装了一层,使用内部RestHighLevelClient获取数据,部分代码如下:

@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client");
        this.client = client;
    }

    /**
     * 根据id获取文档
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

            return getResponse;

        } catch (Exception e) {
            log.error("获取文档失败", e);
        }
        return null;
    }

    /**
     * 插入文档, id可以为空,id如果为空,就自动生成id
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;

        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("创建文档失败", e);
        }
        return null;
    }
}

然而在放到线上生成环境下,经常会出现写入失败数据丢失的情况,发现是数据量写入太大,调用接口超时,直接返回错误了。RestHighLevelClient实质就是通过httpclient请求接口发送数据,是基于http协议。

查看Elasticsearch文档,发现Elasticsearch同样支持使用Transport传输,其内部使用netty长连接通信,并且可以设置处理连接的数量。

新建一个TransportClientFactory用来创建TransportClient,这里连接需要使用密钥,具体填写自己Elasticsearch的账号密码。

public class TransportClientFactory {
    // 默认处理核心数,cpu核心数
    private static int DEFAULT_PROCESSORS = 8;
    // 配置核心数
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword, String trustStore, String trustPassword) throws Exception {

        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("缺少证书或密码");
        }

        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }

        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings
                .builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true).put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false).put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword).put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword).put("processors", processors).put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax).build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);

        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }

        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }

}

修改ElasticsearchService

@Slf4j
public class ElasticsearchService {

    /**
     * 默认10秒超时
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;

        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;

        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        if (StringUtils.isBlank(id)) {
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }

}

另外为了提升写入效率,可以批量异步一次性写入,使用bulk方法,如下:

  /**
     * 批量操作,listener可空,支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
        if (requests == null || requests.size() == 0)
            return;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return;

        this.client.bulk(request, listener);
    }

    /**
     * 支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
        BulkRequestBuilder builder = bulkRequestBuilder();

        for (DocWriteRequest<?> request : requests) {
            if (request instanceof IndexRequest) {
                builder.add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                builder.add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                builder.add((DeleteRequest) request);
            }
        }

        if (builder.numberOfActions() == 0) {
            return null;
        }
        return builder.request();
    }

在dao层,我们可以一次性写入多条数据

    /**
     * 异步写入动态日志
     * 
     * @param flightRecords
     */
    public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
        try {
            if (CollectionUtils.isEmpty(flightRecords)) {
                return;
            }
            ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
            List<DocWriteRequest<?>> requests = new ArrayList(flightRecords.size());
            for (FlightRecord flightRecord : flightRecords) {
                // index以航班日期为准
                String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
                requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));

            }
            if (CollectionUtils.isEmpty(requests)) {
                return;
            }
            elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("flightRecord写入ES失败", e);
                }
            });
        } catch (Exception e) {
            log.error("insertFlightRecodeListAsync Exception", e);
        }
    }

其它写入性能优化:

  1. 去掉不必要的字段分词和索引,ElasticSearch会默认对所有字段进行分词,一般查询我们都是根据航班号、航班日期、起飞机场、到达机场查询;所以没必要的字段,我们可以使用keyword类型,不进行分词。
    PUT my_index
    {
      "mappings": {
        "my_type": {
          "properties": {
            "tail_no": { 
              "type": "keyword",
              "index": false
            }
          }
        }
      }
    }
  2. 对于一些普通的日志,比如动态/计划抓取网页日志,数据丢失也无所谓,可以禁用掉refresh和replia,即index.refreshinterval设置为-1,将index.numberof_replicas设置为0即可。
    PUT my_index
    {
      "settings": {
        "index": {
            "refresh_interval" : "-1",
            "number_of_replicas" : 0
        }
      }
    }
  3. 使用ElasticSearch自增ID,如果我们要手动给ElasticSearch document设置一个id,那么ElasticSearch需要每次都去确认一下那个id是否存在,这个过程是比较耗费时间的。如果我们使用自动生成的id,那么ElasticSearch就可以跳过这个步骤,写入性能会更好。
  4. 使用多线程写入ElasticSearch,单线程发送bulk请求是无法最大化ElasticSearch集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。为了更好的利用集群的资源,这样多线程并发写入,可以减少每次底层磁盘fsync的次数和开销。一样,可以对单个ElasticSearch节点的单个shard做压测,比如说,先是2个线程,然后是4个线程,然后是8个线程,16个,每次线程数量倍增。一旦发现es返回了TOOMANYREQUESTS的错误,JavaClient也就是EsRejectedExecutionException,此时那么就说明ElasticSearch是说已经到了一个并发写入的最大瓶颈了,此时我们就知道最多只能支撑这么高的并发写入了。
  5. 适当增加index buffer的大小,如果我们要进行非常重的高并发写入操作,那么最好将index buffer调大一些,indices.memory.indexbuffersize,这个可以调节大一些,设置的这个index buffer大小,是所有的shard公用的,但是如果除以shard数量以后,算出来平均每个shard可以使用的内存大小,一般建议,但是对于每个shard来说,最多给512mb,因为再大性能就没什么提升了。ElasticSearch会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个buffer。默认这个参数的值是10%,也就是jvm heap的10%,如果我们给jvm heap分配10gb内存,那么这个index buffer就有1gb,对于两个shard共享来说,是足够的了。

Redis导致接口变慢故障排查

Redis导致接口变慢故障排查

近段时间我们一个接口总是隔断时间出现一次访问很慢的情况,如下图,这是我们通过kibana统计的接口响应时间

WechatIMG12-1

最开始我们想到是不是并发量太大导致后端数据库压力太大了,所以多开了一个实例,并且数据读写采用了读写分离,但情况依旧,最后也打印出了操作数据库、Redis的耗时情况,如下图

WechatIMG13

发现MySQL并不是瓶颈,Redis读取的时候耗时比MySQL更严重,我们知道Redis是单线程直接操作内存的,一定是有某些操作阻碍了主线程的执行,查看了Redis执行日志

WechatIMG14

发现在耗时那个点,Redis正在做持久化操作,而且使用的是RDB全量快照的方式,介绍下RDB的持久化功能:
Redis默认是使用RDB的持久化策略,可以配置周期性将数据保存到磁盘,比如可配置在1分钟内发生1000次写操作,就保存一次,这里的保存是全量保存,即Redis会fork一个子进程来循环所有的数据,然后将数据写入到RDB文件中,如果在某个时间段有频繁的写请求过来,那么Redis就不不断的fork子进程来处理数据库快照操作,但fork操作会发生堵塞,所以那段时间就会发生客户端的读写请求比较卡的情况,Redis的持久化策略流程如下图:

3084708676-5b70e0fd04072_articlex

解决,使用AOP持久化策略或者我们可以配置不使用Redis持久化策略,因为根据接口的业务情况,发现即使数据丢失,也不会造成太大影响,可以直接再去读数据库获取,具体配置只要在最后一行加上:save “”,即可禁用RDB

参考:

http://www.cnblogs.com/zhoujinyi/archive/2013/05/26/3098508.html

https://segmentfault.com/a/1190000015983518