Optimizing ElasticSearch Writes Under High Concurrency
ElasticSearch is billed as a distributed full-text search engine, but used improperly it can still be slow; under high-concurrency writes in particular, requests may time out.
Inside our company, essentially all logs end up in ElasticSearch: interface response-time logs, flight dynamic/schedule data audit logs, dynamic/schedule crawl message logs, dynamic message update logs, and so on. The daily write volume is huge. Initially we built a thin wrapper around ElasticSearch that used RestHighLevelClient internally to access the data; part of that code is shown below:
```java
@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null) {
            throw new IllegalArgumentException("client");
        }
        this.client = client;
    }

    /**
     * Fetch a document by id.
     */
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id)) {
            return null;
        }
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
            return getResponse;
        } catch (Exception e) {
            log.error("Failed to get document", e);
        }
        return null;
    }

    /**
     * Insert a document. The id may be null; if it is, an id is generated automatically.
     */
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json)) {
            return null;
        }
        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("Failed to create document", e);
        }
        return null;
    }
}
```
However, once this ran in the production environment, writes frequently failed and data was lost: the write volume was too large, the calls timed out, and errors were returned. RestHighLevelClient essentially sends data to the REST endpoints through httpclient, i.e. over the HTTP protocol.
Checking the Elasticsearch documentation, we found that Elasticsearch also supports the Transport protocol, which communicates over long-lived Netty connections internally and lets you configure the number of threads handling those connections.
Create a TransportClientFactory for building TransportClient instances. The connection here requires keys; fill in your own Elasticsearch credentials (keystore/truststore paths and passwords).
```java
public class TransportClientFactory {

    // Default number of processors (CPU cores)
    private static int DEFAULT_PROCESSORS = 8;

    // Configured number of processors
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword,
                                         String trustStore, String trustPassword) throws Exception {
        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("Please provide the ES addresses, separated by commas");
        }
        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("Missing certificate or password");
        }
        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("Please provide the ES addresses, separated by commas");
        }
        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }
        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("Please provide the ES addresses, separated by commas");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;

        Settings settings = Settings.builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true)
                .put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false)
                .put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword)
                .put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword)
                .put("processors", processors)
                .put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax)
                .build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);
        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }
        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }
}
```
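A minimal usage sketch of the factory; the addresses, port, and keystore paths below are placeholders to replace with your own cluster's values, and exception handling is omitted:

```java
// Placeholder values; replace with your own node addresses and SearchGuard keystore/truststore.
TransportClientFactory.setProcessors(Runtime.getRuntime().availableProcessors());
TransportClient transportClient = TransportClientFactory.create(
        "10.0.0.1,10.0.0.2",              // comma-separated ES node addresses (placeholder)
        9300,                             // transport port, 9300 by default
        "/path/to/keystore.jks", "keystorePassword",
        "/path/to/truststore.jks", "truststorePassword");
```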
Then modify ElasticsearchService to use the TransportClient:
```java
@Slf4j
public class ElasticsearchService {

    /**
     * Default timeout of 10 seconds.
     */
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id)) {
            return null;
        }
        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg)) {
            return null;
        }
        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null) {
            return null;
        }
        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg)) {
            return null;
        }
        if (StringUtils.isBlank(id)) {
            // No id given: let ElasticSearch generate one
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }
}
```
In addition, to improve write throughput, documents can be written in batches and asynchronously using the bulk API, as follows:
```java
/**
 * Bulk operation; the listener may be null. Supports IndexRequest, UpdateRequest and DeleteRequest.
 */
public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
    if (requests == null || requests.size() == 0) {
        return;
    }
    BulkRequest request = prepareBulkRequests(requests);
    if (request == null) {
        return;
    }
    this.client.bulk(request, listener);
}

/**
 * Supports IndexRequest, UpdateRequest and DeleteRequest.
 */
private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
    BulkRequestBuilder builder = bulkRequestBuilder();
    for (DocWriteRequest<?> request : requests) {
        if (request instanceof IndexRequest) {
            builder.add((IndexRequest) request);
        } else if (request instanceof UpdateRequest) {
            builder.add((UpdateRequest) request);
        } else if (request instanceof DeleteRequest) {
            builder.add((DeleteRequest) request);
        }
    }
    if (builder.numberOfActions() == 0) {
        return null;
    }
    return builder.request();
}
```
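The snippet above calls `bulkRequestBuilder()`, and the dao code in the next snippet calls `createDocumentRequest(...)`; neither helper appears in the original excerpt, so the following is only a sketch of their likely shape, assuming they merely build requests without executing them:

```java
// Assumed helpers (not part of the original excerpt): thin wrappers around the TransportClient builders.
private BulkRequestBuilder bulkRequestBuilder() {
    return this.client.prepareBulk();
}

public IndexRequest createDocumentRequest(String index, String type, String id, String msg) {
    if (StringUtils.isAnyBlank(index, type, msg)) {
        return null;
    }
    IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
    return builder == null ? null : builder.request();
}
```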
In the dao layer we can then write multiple records in one call:
```java
/**
 * Asynchronously write flight-record logs.
 *
 * @param flightRecords
 */
public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
    try {
        if (CollectionUtils.isEmpty(flightRecords)) {
            return;
        }
        ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
        List<DocWriteRequest<?>> requests = new ArrayList<>(flightRecords.size());
        for (FlightRecord flightRecord : flightRecords) {
            // The index name is based on the flight date
            String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX
                    + flightRecord.getLocalDate().replace("-", ".");
            requests.add(elasticsearchService.createDocumentRequest(index,
                    elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));
        }
        if (CollectionUtils.isEmpty(requests)) {
            return;
        }
        elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {

            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
            }

            @Override
            public void onFailure(Exception e) {
                log.error("Failed to write flightRecord to ES", e);
            }
        });
    } catch (Exception e) {
        log.error("insertFlightRecodeListAsync Exception", e);
    }
}
```
Other write-performance optimizations:
- Drop analysis and indexing for fields that don't need them. By default ElasticSearch analyzes every string field, but our queries almost always filter by flight number, flight date, departure airport, and arrival airport; fields that are never searched that way can be mapped as keyword (no analysis), or even have indexing disabled entirely, for example:
PUT my_index { "mappings": { "my_type": { "properties": { "tail_no": { "type": "keyword", "index": false } } } } }
- For ordinary logs such as the dynamic/schedule web-crawl logs, where losing some data is acceptable, refresh and replicas can be turned off: set index.refresh_interval to -1 and index.number_of_replicas to 0.
PUT my_index { "settings": { "index": { "refresh_interval" : "-1", "number_of_replicas" : 0 } } }
- Use ElasticSearch auto-generated IDs. If we assign a document id ourselves, ElasticSearch must check on every write whether that id already exists, which takes time. With auto-generated ids, ElasticSearch can skip that check and writes are faster, as in the sketch below.
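A minimal sketch contrasting the two styles with the TransportClient API used above; the index, type, and id values are hypothetical placeholders:

```java
// Explicit id: ES must first check whether this id already exists before indexing.
IndexResponse withId = client.prepareIndex("flight.record.2019.06.01", "flight_record", "CA1234-20190601")
        .setSource(json, XContentType.JSON)
        .get();

// No id: ES generates one and skips the existence check, so the write is cheaper.
IndexResponse autoId = client.prepareIndex("flight.record.2019.06.01", "flight_record")
        .setSource(json, XContentType.JSON)
        .get();
```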
- Write to ElasticSearch from multiple threads. A single thread sending bulk requests cannot saturate the cluster's write throughput; to use all of the cluster's resources, send bulk requests concurrently from multiple threads, which also reduces the number and cost of disk fsyncs per document. As before, load-test a single shard on a single ElasticSearch node: start with 2 threads, then 4, then 8, then 16, doubling each time. Once ElasticSearch returns a TOO_MANY_REQUESTS error (surfaced in the Java client as EsRejectedExecutionException), the cluster has hit its concurrent-write ceiling, and that tells you the maximum write concurrency it can sustain. A minimal sketch of such a concurrent bulk write follows this list item.
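A sketch only, not the production code: it assumes `client` is the TransportClient created earlier and `batches` is a pre-built list of BulkRequest objects; the thread count is the knob to double between test runs:

```java
// Send pre-built bulk requests from a fixed pool of worker threads and watch for rejections.
public static void bulkWriteConcurrently(TransportClient client, List<BulkRequest> batches, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (BulkRequest bulkRequest : batches) {
        pool.submit(() -> {
            try {
                BulkResponse response = client.bulk(bulkRequest).actionGet();
                if (response.hasFailures()) {
                    log.warn("bulk completed with failures: {}", response.buildFailureMessage());
                }
            } catch (EsRejectedExecutionException e) {
                // TOO_MANY_REQUESTS: the cluster has reached its concurrent-write ceiling at this thread count
                log.warn("bulk request rejected, back off or lower the thread count", e);
            }
        });
    }
    pool.shutdown();
}
```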
- Increase the index buffer where appropriate. For very heavy concurrent writes, raise indices.memory.index_buffer_size. ElasticSearch treats this setting as an index buffer shared by all shards on the node, with particularly active shards using more of it; divide it by the number of shards to estimate how much each shard gets, and note that beyond roughly 512 MB per shard there is little further performance gain. The default is 10% of the JVM heap, so with a 10 GB heap the index buffer is 1 GB, which is plenty for two shards sharing it. See the elasticsearch.yml snippet below.
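For reference, this is a node-level setting in elasticsearch.yml; the 20% value below is only an illustration, not a blanket recommendation:

```yaml
# elasticsearch.yml -- index buffer shared by all shards on the node (default: 10% of heap)
indices.memory.index_buffer_size: 20%
```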