ElasticSearch高并发场景写入优化

ElasticSearch高并发场景写入优化

ElasticSearch号称分布式全文搜索引擎,但使用不当依然会很慢,特别是在高并发写入时,会存在写入超时的问题。

在公司内部,基本所有的日志都会放入到ElasticSearch,比如接口访问时间日志、动态/计划数据审核日志、动态/计划抓取报文日志、动态报文更新日志等,每天的写入数据量巨大,最初我们基于ElasticSearch封装了一层,使用内部RestHighLevelClient获取数据,部分代码如下:

@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client");
        this.client = client;
    }

    /**
     * 根据id获取文档
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

            return getResponse;

        } catch (Exception e) {
            log.error("获取文档失败", e);
        }
        return null;
    }

    /**
     * 插入文档, id可以为空,id如果为空,就自动生成id
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;

        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("创建文档失败", e);
        }
        return null;
    }
}

然而在放到线上生成环境下,经常会出现写入失败数据丢失的情况,发现是数据量写入太大,调用接口超时,直接返回错误了。RestHighLevelClient实质就是通过httpclient请求接口发送数据,是基于http协议。

查看Elasticsearch文档,发现Elasticsearch同样支持使用Transport传输,其内部使用netty长连接通信,并且可以设置处理连接的数量。

新建一个TransportClientFactory用来创建TransportClient,这里连接需要使用密钥,具体填写自己Elasticsearch的账号密码。

public class TransportClientFactory {
    // 默认处理核心数,cpu核心数
    private static int DEFAULT_PROCESSORS = 8;
    // 配置核心数
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword, String trustStore, String trustPassword) throws Exception {

        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("缺少证书或密码");
        }

        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }

        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings
                .builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true).put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false).put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword).put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword).put("processors", processors).put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax).build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);

        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }

        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }

}

修改ElasticsearchService

@Slf4j
public class ElasticsearchService {

    /**
     * 默认10秒超时
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;

        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;

        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        if (StringUtils.isBlank(id)) {
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }

}

另外为了提升写入效率,可以批量异步一次性写入,使用bulk方法,如下:

  /**
     * 批量操作,listener可空,支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
        if (requests == null || requests.size() == 0)
            return;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return;

        this.client.bulk(request, listener);
    }

    /**
     * 支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
        BulkRequestBuilder builder = bulkRequestBuilder();

        for (DocWriteRequest<?> request : requests) {
            if (request instanceof IndexRequest) {
                builder.add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                builder.add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                builder.add((DeleteRequest) request);
            }
        }

        if (builder.numberOfActions() == 0) {
            return null;
        }
        return builder.request();
    }

在dao层,我们可以一次性写入多条数据

    /**
     * 异步写入动态日志
     * 
     * @param flightRecords
     */
    public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
        try {
            if (CollectionUtils.isEmpty(flightRecords)) {
                return;
            }
            ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
            List<DocWriteRequest<?>> requests = new ArrayList(flightRecords.size());
            for (FlightRecord flightRecord : flightRecords) {
                // index以航班日期为准
                String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
                requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));

            }
            if (CollectionUtils.isEmpty(requests)) {
                return;
            }
            elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("flightRecord写入ES失败", e);
                }
            });
        } catch (Exception e) {
            log.error("insertFlightRecodeListAsync Exception", e);
        }
    }

其它写入性能优化:

  1. 去掉不必要的字段分词和索引,ElasticSearch会默认对所有字段进行分词,一般查询我们都是根据航班号、航班日期、起飞机场、到达机场查询;所以没必要的字段,我们可以使用keyword类型,不进行分词。
    PUT my_index
    {
      "mappings": {
        "my_type": {
          "properties": {
            "tail_no": { 
              "type": "keyword",
              "index": false
            }
          }
        }
      }
    }
  2. 对于一些普通的日志,比如动态/计划抓取网页日志,数据丢失也无所谓,可以禁用掉refresh和replia,即index.refreshinterval设置为-1,将index.numberof_replicas设置为0即可。
    PUT my_index
    {
      "settings": {
        "index": {
            "refresh_interval" : "-1",
            "number_of_replicas" : 0
        }
      }
    }
  3. 使用ElasticSearch自增ID,如果我们要手动给ElasticSearch document设置一个id,那么ElasticSearch需要每次都去确认一下那个id是否存在,这个过程是比较耗费时间的。如果我们使用自动生成的id,那么ElasticSearch就可以跳过这个步骤,写入性能会更好。
  4. 使用多线程写入ElasticSearch,单线程发送bulk请求是无法最大化ElasticSearch集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。为了更好的利用集群的资源,这样多线程并发写入,可以减少每次底层磁盘fsync的次数和开销。一样,可以对单个ElasticSearch节点的单个shard做压测,比如说,先是2个线程,然后是4个线程,然后是8个线程,16个,每次线程数量倍增。一旦发现es返回了TOOMANYREQUESTS的错误,JavaClient也就是EsRejectedExecutionException,此时那么就说明ElasticSearch是说已经到了一个并发写入的最大瓶颈了,此时我们就知道最多只能支撑这么高的并发写入了。
  5. 适当增加index buffer的大小,如果我们要进行非常重的高并发写入操作,那么最好将index buffer调大一些,indices.memory.indexbuffersize,这个可以调节大一些,设置的这个index buffer大小,是所有的shard公用的,但是如果除以shard数量以后,算出来平均每个shard可以使用的内存大小,一般建议,但是对于每个shard来说,最多给512mb,因为再大性能就没什么提升了。ElasticSearch会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个buffer。默认这个参数的值是10%,也就是jvm heap的10%,如果我们给jvm heap分配10gb内存,那么这个index buffer就有1gb,对于两个shard共享来说,是足够的了。

将项目发布到Maven仓库

我的开源包eweb已经发布到maven中心库了,使用maven配置就可以引入:

    <dependency>
        <groupId>com.github.chuanzh</groupId>
        <artifactId>eweb</artifactId>
        <version>1.0.1</version>
    </dependency>

下面讲讲如何发布到中心库,查了网上的步骤,都说的比较复制,但其实也很简单,这里分下面几个步骤:

一,Sonatype账号注册&创建一个Issue

二,使用GPG生成秘钥

三,修改项目的maven配置文件

四,上传构件到OSS&发布

看懂了之后其实很好理解,其实就类似于申请接入腾讯,微博的APP应用,

第一步,你得成为一个开发者吧,也就是要注册一个开发者账号,接下来就是要创建一个应用,等待腾讯、微博服务商审核

第二步,审核通过后,腾讯、微博会给你一对秘钥,但maven是要自己生成的,也就是用GPG

第三步,在你的项目中配置密钥和其他配置。

第四部,就是上传你的项目到腾讯、微博的服务平台,然后再等待审核,审核通过,那你就可以在他们的平台使用APP了

下面就从这4个方面说明下。

一,Sonatype账号注册&创建一个Issue

注册地址: Sign up for JIRA

记住你的用户名和密码,之后要用到

创建一个Issue:https://issues.sonatype.org/secure/CreateIssue.jspa?issuetype=21&pid=10134

v2-404b13199782405587859b6e73c02854_b

填写项目的基本信息:简单描述,详细描述,项目地址

重点说明下:Group Id为你项目的groupid,一般为公司或个人域名,如果你没有,就不要随便写了(他们在审核时会要求你提供一些证明的),写github的项目地址就可以了

好了,现在你就可以静静的等待maven审核了,一般为1~2天

二,使用GPG生成秘钥

首先需要下载GPG软件,Mac下载地址:GPG Suite,Windows用户下载地址:Secure email and file encryption with GnuPG for Windows,我的是Mac,所以具体说说Mac下的生成方式,Window方式请参考这篇博客

Mac下生成很简单,如下几个步骤:

1,新建一个密钥

v2-4fe9a772f939f0a8ef4fd2bfd103f97d_b

2,点击右键,将公钥发送至公钥服务器

v2-30789dafea5f96661c8e22b9c58ac165_b

到此密钥就生成了,记住你的口令,后面要用到

三,修改项目的maven配置文件

1,修改maven的setting配置

<settings>

    ...

    <servers>
        <server>
            <id>oss</id>
            <username>用户名</username>
            <password>密码</password>
        </server>
    </servers>

    ...

</settings>

用户名和密码就是你注册Sonatype的账号

2,配置项目pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.github.chuanzh</groupId>
  <artifactId>eweb</artifactId>
  <version>1.0.1</version>
  <packaging>jar</packaging>

  <name>eweb</name>
  <description>轻量web开发框架</description>
  <url>https://github.com/chuanzh/eweb</url>

  <licenses>
    <license>
      <name>The Apache Software License, Version 2.0</name>
      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
    </license>
  </licenses>

  <developers>
    <developer>
      <name>chuan.zhang</name>
      <email>zhangchuan0305@gmail.com</email>
    </developer>
  </developers>

  <scm>
    <connection>scm:git@github.com:chuanzh/eweb.git</connection>
    <developerConnection>scm:git@github.com:chuanzh/eweb.git</developerConnection>
    <url>git@github.com:chuanzh/eweb.git</url>
  </scm>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>utf-8</project.reporting.outputEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <id>release</id>
      <distributionManagement>
        <snapshotRepository>
          <id>oss</id>
          <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
        </snapshotRepository>
        <repository>
          <id>oss</id>
          <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
        </repository>
      </distributionManagement>
      <build>
        <plugins>
          <!-- Source -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-source-plugin</artifactId>
            <version>3.0.1</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>jar-no-fork</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <!-- Javadoc -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-javadoc-plugin</artifactId>
            <version>2.10.4</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>jar</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <!-- Gpg Signature -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-gpg-plugin</artifactId>
            <version>1.6</version>
            <executions>
              <execution>
                <id>sign-artifacts</id>
                <phase>verify</phase>
                <goals>
                  <goal>sign</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>

</project>

pom.xml必须配置的有:name,description,licenses,developers,scm。

snapshotRepository和repository在Issure初审通过后会给你,另外id要和setting.xml中保持一致

6EC7FDB0-E87F-4A66-ABCD-D7DC14EC17D1

四,上传构件到OSS&发布

1,使用下面命令,会提示要求你输入GPG的密钥,就是申请时候填写的,此过程比较慢,需要耐心等待

mvn clean deploy -P release

2,在OSS中发布构件

使用sonatype账号登陆https://oss.sonatype.org ,通过模糊查找,选择Staging Repositories,如下图,该构件的状态此时应该为open,勾选它,点击close按钮,sonatype会先做检验,通过后,就可以点击Release了,如果审核不通过,尝试删掉后重新上传

49767CB8-C939-456A-B314-1DA45F606BFB

A521FDDB-E66E-4116-B604-DF098982EA20

3,通知sonatype已经发布构建

也就是在Issure中回复,再次等待,也是需要审核1~2天,若审核通过,会回复你如下的信息

C86DFA4C-BC11-4A82-8CEB-FD3F3D9BF75C

等待10分钟,你就可以在maven库中搜索到你的jar包了

3CB6AF70-C0F5-4F63-8F3C-14D22E8C3CA8

 

若下次修改了本地的代码,只需要重新执行第四步的命令即可