Optimizing Elasticsearch Writes in High-Concurrency Scenarios
Elasticsearch bills itself as a distributed full-text search engine, but used carelessly it can still be slow; under high-concurrency writes in particular, requests can time out.
Inside our company, essentially all logs go into Elasticsearch: API response-time logs, flight-status/schedule audit logs, flight-status/schedule crawl payload logs, flight-status message update logs, and so on, so the daily write volume is huge. We initially wrapped Elasticsearch in a thin service layer that used RestHighLevelClient internally to move data; part of that code:
@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client must not be null");
        this.client = client;
    }

    /**
     * Fetch a document by id.
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            return client.get(getRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("failed to get document", e);
        }
        return null;
    }

    /**
     * Insert a document; if id is blank, Elasticsearch auto-generates one.
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;
        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);
            return client.index(indexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("failed to index document", e);
        }
        return null;
    }
}
Once this reached the production environment, however, writes frequently failed and data was lost: the write volume was so large that calls timed out and simply returned errors. RestHighLevelClient is ultimately an HttpClient sending requests over HTTP.
Reading the Elasticsearch documentation, we found that Elasticsearch also supports the Transport protocol, which communicates over persistent Netty connections internally and lets you configure how many threads handle connections. (Note that TransportClient is deprecated as of Elasticsearch 7.x and removed in 8.0, so this approach applies to older clusters.)
Create a TransportClientFactory for building TransportClient instances. The connection here requires certificates; fill in your own Elasticsearch keystore/truststore paths and passwords.
public class TransportClientFactory {

    // default number of processors (CPU core count)
    private static int DEFAULT_PROCESSORS = 8;
    // configured number of processors
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword,
            String trustStore, String trustPassword) throws Exception {
        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("ES address required; separate multiple addresses with commas");
        }
        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("missing certificate or password");
        }
        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("ES address required; separate multiple addresses with commas");
        }
        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }
        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("ES address required; separate multiple addresses with commas");
        }
        // clamp processors to [DEFAULT_PROCESSORS, 32]
        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings.builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true)
                .put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false)
                .put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword)
                .put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword)
                .put("processors", processors)
                .put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax)
                .build();
        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);
        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }
        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }
}
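The processor clamping and address validation in the factory are plain string and arithmetic logic, so they can be sanity-checked without any Elasticsearch dependency. A minimal standalone sketch (class and method names here are illustrative, not part of the factory):

```java
import java.util.ArrayList;
import java.util.List;

public class TransportClientFactoryLogic {

    // mirror of the factory's bounds: at least 8, at most 32 processors
    static int clampProcessors(int processors) {
        if (processors < 8) return 8;
        if (processors > 32) return 32;
        return processors;
    }

    // mirror of the factory's address handling: split on commas, trim, drop blanks
    static List<String> parseAddresses(String addresses) {
        List<String> valid = new ArrayList<>();
        for (String addr : addresses.split(",")) {
            if (!addr.trim().isEmpty()) {
                valid.add(addr.trim());
            }
        }
        if (valid.isEmpty()) {
            throw new IllegalArgumentException("ES addresses required, comma-separated");
        }
        return valid;
    }

    public static void main(String[] args) {
        System.out.println(clampProcessors(4));   // below minimum -> 8
        System.out.println(clampProcessors(64));  // above maximum -> 32
        System.out.println(parseAddresses("10.0.0.1, 10.0.0.2,")); // [10.0.0.1, 10.0.0.2]
    }
}
```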
Then rework ElasticsearchService to use the TransportClient:
@Slf4j
public class ElasticsearchService {

    /**
     * Default timeout: 10 seconds.
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;
        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;
        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;
        if (StringUtils.isBlank(id)) {
            // blank id: let Elasticsearch auto-generate one
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }
}
To further improve write throughput, documents can be written asynchronously in batches using the bulk API:
/**
 * Bulk operation; listener may be null. Supports IndexRequest, UpdateRequest, DeleteRequest.
 **/
public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
    if (requests == null || requests.isEmpty())
        return;
    BulkRequest request = prepareBulkRequests(requests);
    if (request == null)
        return;
    this.client.bulk(request, listener);
}

/**
 * Supports IndexRequest, UpdateRequest, DeleteRequest.
 **/
private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
    BulkRequestBuilder builder = this.client.prepareBulk()
            .setTimeout(TimeValue.timeValueSeconds(bulkTimeoutSecond));
    for (DocWriteRequest<?> request : requests) {
        if (request instanceof IndexRequest) {
            builder.add((IndexRequest) request);
        } else if (request instanceof UpdateRequest) {
            builder.add((UpdateRequest) request);
        } else if (request instanceof DeleteRequest) {
            builder.add((DeleteRequest) request);
        }
    }
    if (builder.numberOfActions() == 0) {
        return null;
    }
    return builder.request();
}
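A single bulk request should not grow without bound, or it will hit the same timeouts as before. A common pattern (not in the original code; sketched here as a plain helper with no Elasticsearch dependency) is to split a large request list into fixed-size batches and call bulkDocumentOperationAsync once per batch:

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBatching {

    // split a list into consecutive batches of at most batchSize elements
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> requests = new ArrayList<>();
        for (int i = 0; i < 10; i++) requests.add(i);
        // 10 requests in batches of 4 -> batch sizes 4, 4, 2
        for (List<Integer> batch : partition(requests, 4)) {
            System.out.println(batch.size());
        }
    }
}
```

Each batch would then be handed to bulkDocumentOperationAsync; a few thousand documents or a few megabytes per bulk request is a common starting point before benchmarking.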
At the DAO layer we can then write many records in a single call:
/**
 * Asynchronously write a batch of flight-status logs.
 *
 * @param flightRecords records to index
 */
public void insertFlightRecordListAsync(List<FlightRecord> flightRecords) {
    try {
        if (CollectionUtils.isEmpty(flightRecords)) {
            return;
        }
        ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
        List<DocWriteRequest<?>> requests = new ArrayList<>(flightRecords.size());
        for (FlightRecord flightRecord : flightRecords) {
            // index name is based on the flight date
            String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
            // createDocumentRequest (helper not shown above) returns an IndexRequest
            requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));
        }
        if (CollectionUtils.isEmpty(requests)) {
            return;
        }
        elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
            }

            @Override
            public void onFailure(Exception e) {
                log.error("failed to bulk-index flightRecord into ES", e);
            }
        });
    } catch (Exception e) {
        log.error("insertFlightRecordListAsync Exception", e);
    }
}
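The per-day index naming in the loop above is simple string manipulation: the record's date with `-` replaced by `.` is appended to a prefix (the actual prefix constant lives in elasticsearchProvider; `flight_record-` below is an assumed example value). A standalone sketch:

```java
public class IndexNaming {

    // mirrors the DAO: prefix + flight date with '-' replaced by '.'
    static String dailyIndex(String prefix, String localDate) {
        return prefix + localDate.replace("-", ".");
    }

    public static void main(String[] args) {
        System.out.println(dailyIndex("flight_record-", "2019-07-01")); // flight_record-2019.07.01
    }
}
```

Writing to per-day indices also makes retention cheap: expiring old logs means deleting whole indices rather than deleting individual documents.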
Other write-performance optimizations:
- Drop unnecessary analysis and indexing. Elasticsearch analyzes every text field by default, but our queries only filter on flight number, flight date, departure airport, and arrival airport. Fields that don't need full-text search can be mapped as keyword (no analysis), and fields that are never queried can skip indexing entirely:

  PUT my_index
  {
    "mappings": {
      "my_type": {
        "properties": {
          "tail_no": {
            "type": "keyword",
            "index": false
          }
        }
      }
    }
  }

- For low-value logs, such as the status/schedule crawl payload logs where occasional data loss is acceptable, disable refresh and replication: set index.refresh_interval to -1 and index.number_of_replicas to 0:

  PUT my_index
  {
    "settings": {
      "index": {
        "refresh_interval": "-1",
        "number_of_replicas": 0
      }
    }
  }

- Use Elasticsearch auto-generated IDs. If we assign a document id ourselves, Elasticsearch must first check whether that id already exists, which costs time on every write. With auto-generated ids it can skip that check, so writes are faster.
- Write with multiple threads. A single thread sending bulk requests cannot saturate the cluster's write throughput; concurrent bulk writes use the cluster's resources better and amortize the cost of each underlying disk fsync across more documents. Benchmark against a single shard on a single node, doubling the thread count each round (2, 4, 8, 16, ...). Once Elasticsearch starts returning TOO_MANY_REQUESTS errors (surfaced in the Java client as EsRejectedExecutionException), you have found the cluster's concurrent-write ceiling.
- Increase the index buffer. For very heavy concurrent indexing, raise indices.memory.index_buffer_size. The buffer is shared by all shards on the node, so divide it by the shard count to estimate each shard's share; up to about 512 MB per shard is useful, beyond which performance no longer improves. Particularly active shards naturally consume more of the shared buffer. The default is 10% of the JVM heap, so with a 10 GB heap the index buffer is 1 GB, which is plenty for two shards.
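For reference, the index buffer is a node-level setting, so it belongs in elasticsearch.yml rather than in an index's settings. A sketch (the 20% value is an example, not a recommendation for every cluster):

```yaml
# elasticsearch.yml (node-level setting; requires a node restart)
indices.memory.index_buffer_size: 20%
```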