ElasticSearch高并发场景写入优化

ElasticSearch高并发场景写入优化

ElasticSearch号称分布式全文搜索引擎,但使用不当依然会很慢,特别是在高并发写入时,会存在写入超时的问题。

在公司内部,基本所有的日志都会放入到ElasticSearch,比如接口访问时间日志、动态/计划数据审核日志、动态/计划抓取报文日志、动态报文更新日志等,每天的写入数据量巨大,最初我们基于ElasticSearch封装了一层,使用内部RestHighLevelClient获取数据,部分代码如下:

@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client");
        this.client = client;
    }

    /**
     * 根据id获取文档
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

            return getResponse;

        } catch (Exception e) {
            log.error("获取文档失败", e);
        }
        return null;
    }

    /**
     * 插入文档, id可以为空,id如果为空,就自动生成id
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;

        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("创建文档失败", e);
        }
        return null;
    }
}

然而在放到线上生成环境下,经常会出现写入失败数据丢失的情况,发现是数据量写入太大,调用接口超时,直接返回错误了。RestHighLevelClient实质就是通过httpclient请求接口发送数据,是基于http协议。

查看Elasticsearch文档,发现Elasticsearch同样支持使用Transport传输,其内部使用netty长连接通信,并且可以设置处理连接的数量。

新建一个TransportClientFactory用来创建TransportClient,这里连接需要使用密钥,具体填写自己Elasticsearch的账号密码。

public class TransportClientFactory {
    // 默认处理核心数,cpu核心数
    private static int DEFAULT_PROCESSORS = 8;
    // 配置核心数
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword, String trustStore, String trustPassword) throws Exception {

        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("缺少证书或密码");
        }

        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }

        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings
                .builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true).put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false).put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword).put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword).put("processors", processors).put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax).build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);

        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }

        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }

}

修改ElasticsearchService

@Slf4j
public class ElasticsearchService {

    /**
     * 默认10秒超时
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;

        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;

        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        if (StringUtils.isBlank(id)) {
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }

}

另外为了提升写入效率,可以批量异步一次性写入,使用bulk方法,如下:

  /**
     * 批量操作,listener可空,支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
        if (requests == null || requests.size() == 0)
            return;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return;

        this.client.bulk(request, listener);
    }

    /**
     * 支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
        BulkRequestBuilder builder = bulkRequestBuilder();

        for (DocWriteRequest<?> request : requests) {
            if (request instanceof IndexRequest) {
                builder.add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                builder.add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                builder.add((DeleteRequest) request);
            }
        }

        if (builder.numberOfActions() == 0) {
            return null;
        }
        return builder.request();
    }

在dao层,我们可以一次性写入多条数据

    /**
     * 异步写入动态日志
     * 
     * @param flightRecords
     */
    public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
        try {
            if (CollectionUtils.isEmpty(flightRecords)) {
                return;
            }
            ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
            List<DocWriteRequest<?>> requests = new ArrayList(flightRecords.size());
            for (FlightRecord flightRecord : flightRecords) {
                // index以航班日期为准
                String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
                requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));

            }
            if (CollectionUtils.isEmpty(requests)) {
                return;
            }
            elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("flightRecord写入ES失败", e);
                }
            });
        } catch (Exception e) {
            log.error("insertFlightRecodeListAsync Exception", e);
        }
    }

其它写入性能优化:

  1. 去掉不必要的字段分词和索引,ElasticSearch会默认对所有字段进行分词,一般查询我们都是根据航班号、航班日期、起飞机场、到达机场查询;所以没必要的字段,我们可以使用keyword类型,不进行分词。
    PUT my_index
    {
      "mappings": {
        "my_type": {
          "properties": {
            "tail_no": { 
              "type": "keyword",
              "index": false
            }
          }
        }
      }
    }
  2. 对于一些普通的日志,比如动态/计划抓取网页日志,数据丢失也无所谓,可以禁用掉refresh和replia,即index.refreshinterval设置为-1,将index.numberof_replicas设置为0即可。
    PUT my_index
    {
      "settings": {
        "index": {
            "refresh_interval" : "-1",
            "number_of_replicas" : 0
        }
      }
    }
  3. 使用ElasticSearch自增ID,如果我们要手动给ElasticSearch document设置一个id,那么ElasticSearch需要每次都去确认一下那个id是否存在,这个过程是比较耗费时间的。如果我们使用自动生成的id,那么ElasticSearch就可以跳过这个步骤,写入性能会更好。
  4. 使用多线程写入ElasticSearch,单线程发送bulk请求是无法最大化ElasticSearch集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。为了更好的利用集群的资源,这样多线程并发写入,可以减少每次底层磁盘fsync的次数和开销。一样,可以对单个ElasticSearch节点的单个shard做压测,比如说,先是2个线程,然后是4个线程,然后是8个线程,16个,每次线程数量倍增。一旦发现es返回了TOOMANYREQUESTS的错误,JavaClient也就是EsRejectedExecutionException,此时那么就说明ElasticSearch是说已经到了一个并发写入的最大瓶颈了,此时我们就知道最多只能支撑这么高的并发写入了。
  5. 适当增加index buffer的大小,如果我们要进行非常重的高并发写入操作,那么最好将index buffer调大一些,indices.memory.indexbuffersize,这个可以调节大一些,设置的这个index buffer大小,是所有的shard公用的,但是如果除以shard数量以后,算出来平均每个shard可以使用的内存大小,一般建议,但是对于每个shard来说,最多给512mb,因为再大性能就没什么提升了。ElasticSearch会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个buffer。默认这个参数的值是10%,也就是jvm heap的10%,如果我们给jvm heap分配10gb内存,那么这个index buffer就有1gb,对于两个shard共享来说,是足够的了。

Elasticsearch排序(三)

Elasticsearch排序(三)

Elasticsearch中排序是通过设置score字段来进行排序,score是一个浮点类型,所以我们对靠前的数据设置一个较大的值,然后根据倒序排列。

同样对于航班号查询,有时候需要根据航班号,日期查询出所有的航班号,用户会输入185,2018-12-01,这时需要返回所有相关航司的航班号。
比如匹配的航班有:CA1858,B6185,QF185,ZH1858,对于精确匹配,B6185、QF185要排在CA1858、ZH1858之前,对于非精确排序,CA航司要排在ZH航司前面

创建一个索引

之前我们对航班号创建了一个索引,用户可以输入CA1,出现CA1*的结果,现在用户输入185,是纯数字型的,所以我们需要单独一个字段(flightNum)保存这个航班数字,然后对它创建索引

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAndNumAnalyzer": {
          "tokenizer": "flightNoAndNumTokenizer"
        }
      },
      "tokenizer": {
        "flightNoAndNumTokenizer": {
          "type": "edge_ngram",
          "min_gram": 3,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              },
              "flightNum": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              }
          }
      }
  }

}

如果是航班数字查询,我们只需要根据航司来设定优先级,同时设定航班数字全匹配的优先级最高,然后查询的时候再通过constant_score统一来设置boost参数

GET flight/dynamic/_search
{
  "query": {
    "bool": {
      "must": [
        {"prefix": {
          "flightNum": {
            "value": "185"
          }
        }}
      ], 
      "should": [
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "CA"
              }
            },
            "boost": 4
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "ZH"
              }
            },
            "boost": 3
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "B6"
              }
            },
            "boost": 2
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "QF"
              }
            },
            "boost": 1
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "flightNum": "185"
              }
            },
            "boost": 10
          }
        }
      ]
      }
    }
}

可以看到CA航司比B6高,但是当查询flightNum为185的时候,评分设置为最高,所以B6186会优先显示,其次是QF185,最后是CA1858和ZH1858 具体代码实现:

//航班数字优先级设置
BoolQueryBuilder qb = QueryBuilders.boolQuery()
                    .must(prefixQuery("flightNum", keyword))
                    .should(constantScoreQuery(matchQuery("flightNum", keyword)).boost(10f))

//航司优先级设置
airNumer.put("CA", 4f);
airNumer.put("ZH", 3f);
airNumer.put("B6", 4f);
airNumer.put("QF", 1f);
for (String airline : airNumer.keySet()) {
    queryBuilder.should(constantScoreQuery(matchQuery("airline", airline)).boost(airNumer.get(airline)));
}

 

 

 

Elasticsearch创建索引及数据(二)

Elasticsearch创建索引及数据(二)

第一篇介绍了Elasticsearch安装及基本用法,下面我们自己来创建一个索引,并写入一些数据

创建索引

基本语法

PUT /my_index
{
    "settings": { ... any settings ... },
    "mappings": {
        "type_one": { ... any mappings ... },
        "type_two": { ... any mappings ... },
        ...
    }

在创建索引的时候,我们需要设置索引被存放的分片数量、分析器、类型设置,分片数量、分析器通过settings来设置,类型映射通过mappings来设置,Elasticsearch默认创建索引是5个分片,1个副本,可通过settings来修改它,这里我们就遵循默认值不动,如下:

PUT flight
{
    "settings" : {
        "index" : {
            "number_of_shards" : 5, 
            "number_of_replicas" : 1 
        }
    }
}
{
  "acknowledged": true,
  "shards_acknowledged": true,
  "index": "flight"
}

创建一个flight索引,大括号后面为默认配置,如果不想修改它,都可以去掉,然后在新建一个type为dynamic,并写入数据

PUT /flight/dynamic/1
{
    "flightNo":"CA1858",
    "flightDate":"2018-12-01",
    "depCode":"SHA",
    "arrCode":"PEK",
    "state": "到达",
    "subState": "",
    "depPlanTime":"2018-12-01 07:45",
    "arrPlanTime":"2018-12-01 10:10",
    "depReadyTime":"2018-12-01 07:45",
    "arrReadyTime":"2018-12-01 09:45",
    "depTime":"2018-12-01 07:54",
    "arrTime":"2018-12-01 07:46",
    "distance":1076, 
    "tailNo":"B2487", 
    "depTerm":"T2", 
    "arrTerm":"T3", 
    "gate":"48", 
    "luggage":"32"
}

创建一条数据后,ES会默认会对所有字段添加索引,当然也可以不指定ID,ES会默认生成一个ID,自动生成的ID有22个字符长,类似:wM0OSFhDQXGZAWDf0-drSA

{
  "_index": "flight",
  "_type": "dynamic",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

下面我们来查询数据,ES默认情况下是禁用了source,即我们查询时,不会返回source里面的内容,只会返回数据ID,我们可以指定需要返回的_srouce字段

GET flight/dynamic/_search
{
    "query":   { "match_all": {}},
    "_source": [ "flightNo", "flightDate"]
}
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "flight",
        "_type": "dynamic",
        "_id": "1",
        "_score": 1,
        "_source": {
          "flightNo": "CA1858",
          "flightDate": "2018-12-01"
        }
      }
    ]
  }
}

配置分析器

分析器介绍

分析器主要是将一块文本分成适合于倒排索引的独立 词条,通过这些词条我们可以搜索到指定的文档

分析器主要有以下几个功能:
1,字符过滤:通过整理字符串,如去掉HTML,将&转换为and
2,分词器:将字符串分成单个词条,比如通过空格或者标点符号来拆分
3,Token过滤:比如将Quick这种词条统一转换为小写,删除a,and,the这些无用词,增加近义词条(如:jump和leap这种同义词)

比如我们有一个航班号CA1858,我们需要通过输入CA18就可以返回对应的结果,那么我们就需要对其进行分词,如果直接使用下面的查询是获取不到数据的

GET flight/dynamic/_search
{
  "query": {
    "match": {
      "flightNo": "CA18"
    }
  }
}

我们可以查看ES对flightNo的分词情况

POST flight/_analyze
{
  "field": "flightNo",
  "text": "CA1858"
}
{
  "tokens": [
    {
      "token": "ca1858",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    }
  ]
}

可以看到ES是对航班号转换为小写后直接进行了倒排索引,没有进行分词,直接查询CA18肯定搜索不到,下面我们自定义一个分析器

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAnalyzer": {
          "tokenizer": "flightNoTokenizer"
        }
      },
      "tokenizer": {
        "flightNoTokenizer": {
          "type": "edge_ngram",
          "min_gram": 4,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAnalyzer"                          
              }
          }
      }
  }

}

edgengram为ES自带的分词器,ES自带了8种分析器,具体可以查看官方文档,我们自定义了一个analyzer,使用edgengram来进行分词,字符长度从4到8,然后将自定义的分析器映射到flightNo字段上,之后我们使用查询CA18就可以获取到结果了