Optimizing Elasticsearch Writes for High-Concurrency Scenarios

Elasticsearch bills itself as a distributed full-text search engine, but used carelessly it can still be slow; under high-concurrency writes in particular, requests can time out.

Inside the company, essentially all logs end up in Elasticsearch: API response-time logs, flight status/schedule data audit logs, flight status/schedule crawl logs, status message update logs, and so on, so the daily write volume is huge. We initially wrapped Elasticsearch in a thin layer of our own and accessed it with RestHighLevelClient; part of that code is shown below:

@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client must not be null");
        this.client = client;
    }

    /**
     * Fetch a document by id.
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

            return getResponse;

        } catch (Exception e) {
            log.error("获取文档失败", e);
        }
        return null;
    }

    /**
     * Insert a document; if id is null or blank, Elasticsearch generates one automatically.
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;

        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("创建文档失败", e);
        }
        return null;
    }
}

Once this went to the production environment, however, writes frequently failed and data was lost: the write volume was so large that the calls timed out and simply returned errors. RestHighLevelClient essentially posts data to the REST endpoints through an HTTP client, so everything rides on the HTTP protocol.

Reading the Elasticsearch documentation, we found that Elasticsearch also supports the Transport protocol: the TransportClient talks to the cluster over long-lived Netty connections, and the number of threads handling those connections is configurable.

So we built a TransportClientFactory for creating TransportClient instances. The connection is secured with certificates; fill in the keystore/truststore paths and passwords of your own Elasticsearch cluster.

public class TransportClientFactory {
    // Default number of processors (CPU cores)
    private static int DEFAULT_PROCESSORS = 8;
    // Configured number of processors
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword, String trustStore, String trustPassword) throws Exception {

        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("Please provide the ES addresses, separated by commas");
        }

        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("Missing certificate or password");
        }

        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("Please provide the ES addresses, separated by commas");
        }

        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }

        if (validAddrs.isEmpty()) {
            throw new IllegalArgumentException("Please provide the ES addresses, separated by commas");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings.builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true)
                .put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false)
                .put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword)
                .put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword)
                .put("processors", processors)
                .put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax)
                .build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);

        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }

        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }

}

Next, update ElasticsearchService to use the TransportClient:

@Slf4j
public class ElasticsearchService {

    /**
     * Default bulk timeout: 10 seconds.
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;

        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;

        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        if (StringUtils.isBlank(id)) {
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }
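
    /**
     * Builds an IndexRequest without executing it, so it can be added to a bulk
     * request (referenced by the DAO example later in this post). This method is
     * an assumed addition; it is used but not shown in the original snippet.
     **/
    public IndexRequest createDocumentRequest(String index, String type, String id, String msg) {
        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        return builder == null ? null : builder.request();
    }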

}
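
Wiring the two together might look roughly like the following sketch; the addresses, port and certificate paths are placeholders for your own configuration, and the helper method name is ours, not from the original code:

public ElasticsearchService buildElasticsearchService() throws Exception {
    // Build the TransportClient and wrap it in our service.
    TransportClientFactory.setProcessors(Runtime.getRuntime().availableProcessors());
    TransportClient transportClient = TransportClientFactory.create(
            "10.0.0.1,10.0.0.2", 9300,                     // placeholder addresses / transport port
            "/path/to/keystore.jks", "keyStorePassword",   // placeholder keystore
            "/path/to/truststore.jks", "trustPassword");   // placeholder truststore
    return new ElasticsearchService(transportClient, 10);  // 10-second bulk timeout
}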

In addition, to improve write throughput, documents can be written in batches asynchronously using the bulk API:

    /**
     * Asynchronous bulk operation; listener is optional. Supports IndexRequest, UpdateRequest and DeleteRequest.
     **/
    public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
        if (requests == null || requests.size() == 0)
            return;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return;

        this.client.bulk(request, listener);
    }

    /**
     * Supports IndexRequest, UpdateRequest and DeleteRequest.
     **/
    private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
        BulkRequestBuilder builder = this.client.prepareBulk();

        for (DocWriteRequest<?> request : requests) {
            if (request instanceof IndexRequest) {
                builder.add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                builder.add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                builder.add((DeleteRequest) request);
            }
        }

        if (builder.numberOfActions() == 0) {
            return null;
        }
        return builder.request();
    }
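
The bulkTimeoutSecond field declared earlier is not used by the asynchronous path above; a synchronous bulk variant that applies it might look like the following sketch (it assumes the same client field and prepareBulkRequests helper shown above, and needs java.util.concurrent.TimeUnit):

    /**
     * Synchronous bulk write; blocks until the bulk completes or the timeout elapses.
     **/
    public BulkResponse bulkDocumentOperation(List<DocWriteRequest<?>> requests) {
        if (requests == null || requests.isEmpty())
            return null;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return null;

        // actionGet throws if the bulk has not finished within bulkTimeoutSecond seconds.
        return this.client.bulk(request).actionGet(bulkTimeoutSecond, TimeUnit.SECONDS);
    }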

In the DAO layer we can then submit multiple records in a single call:

    /**
     * Asynchronously write flight status logs.
     * 
     * @param flightRecords
     */
    public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
        try {
            if (CollectionUtils.isEmpty(flightRecords)) {
                return;
            }
            ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
            List<DocWriteRequest<?>> requests = new ArrayList<>(flightRecords.size());
            for (FlightRecord flightRecord : flightRecords) {
                // The index name is based on the flight date
                String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
                requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));

            }
            if (CollectionUtils.isEmpty(requests)) {
                return;
            }
            elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("flightRecord写入ES失败", e);
                }
            });
        } catch (Exception e) {
            log.error("insertFlightRecodeListAsync Exception", e);
        }
    }

Other write-performance optimizations:

  1. Drop unnecessary analysis and indexing. By default Elasticsearch analyzes (tokenizes) every string field, but our queries almost always filter on flight number, flight date, departure airport and arrival airport. Fields that don't need full-text search can be mapped as keyword so they are not analyzed, and fields we never query can have indexing disabled, for example:
    PUT my_index
    {
      "mappings": {
        "my_type": {
          "properties": {
            "tail_no": { 
              "type": "keyword",
              "index": false
            }
          }
        }
      }
    }
  2. For ordinary logs, such as status/schedule crawl logs, losing a little data is tolerable, so refresh and replicas can be disabled: set index.refresh_interval to -1 and index.number_of_replicas to 0.
    PUT my_index
    {
      "settings": {
        "index": {
            "refresh_interval" : "-1",
            "number_of_replicas" : 0
        }
      }
    }
  3. Let Elasticsearch generate document IDs. If we set a document id ourselves, Elasticsearch has to check on every write whether that id already exists, which is relatively expensive; with auto-generated ids that check is skipped and write performance is better.
  4. Write to Elasticsearch from multiple threads. A single thread sending bulk requests cannot saturate the cluster's write throughput; bulk-writing concurrently from several threads uses the cluster's resources much better and reduces the number and cost of disk fsyncs. As usual, benchmark a single shard on a single node and double the thread count each round (2, 4, 8, 16 threads). Once Elasticsearch starts returning TOO_MANY_REQUESTS errors (EsRejectedExecutionException in the Java client), the cluster has hit its concurrent-write ceiling, and that tells us the maximum write concurrency it can sustain. A sketch of such a multi-threaded bulk writer follows this list.
  5. Increase the index buffer where appropriate. For very heavy concurrent indexing it helps to raise indices.memory.index_buffer_size. The buffer is shared by all shards on the node, so divide it by the number of actively indexing shards to estimate each shard's share; beyond roughly 512 MB per shard there is no further gain. The default is 10% of the JVM heap, so with a 10 GB heap the index buffer is about 1 GB, which is plenty for two active shards (see the elasticsearch.yml example after this list).
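
As promised in point 4, here is a minimal sketch of multi-threaded bulk writing. It assumes the insertFlightRecodeListAsync method shown earlier on a hypothetical flightRecordDao, the flightRecords list from the DAO example, and java.util.concurrent.ExecutorService/Executors; the thread count and batch size are illustrative and should be found by benchmarking:

    // Partition the records into batches and bulk-write them from a fixed thread pool.
    int threadCount = 8;      // double it each benchmark round: 2, 4, 8, 16...
    int batchSize = 1000;     // illustrative batch size
    ExecutorService pool = Executors.newFixedThreadPool(threadCount);

    for (int from = 0; from < flightRecords.size(); from += batchSize) {
        List<FlightRecord> batch =
                flightRecords.subList(from, Math.min(from + batchSize, flightRecords.size()));
        // Back off once ES starts rejecting with TOO_MANY_REQUESTS / EsRejectedExecutionException.
        pool.submit(() -> flightRecordDao.insertFlightRecodeListAsync(batch));
    }
    pool.shutdown();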
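
For point 5, indices.memory.index_buffer_size is a static node-level setting configured in elasticsearch.yml rather than per index; the value below is only an example (the default is 10% of the heap):

    # elasticsearch.yml
    indices.memory.index_buffer_size: 20%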