Elasticsearch排序(三)

Elasticsearch排序(三)

Elasticsearch中排序是通过设置score字段来进行排序,score是一个浮点类型,所以我们对靠前的数据设置一个较大的值,然后根据倒序排列。

同样对于航班号查询,有时候需要根据航班号,日期查询出所有的航班号,用户会输入185,2018-12-01,这时需要返回所有相关航司的航班号。
比如匹配的航班有:CA1858,B6185,QF185,ZH1858,对于精确匹配,B6185、QF185要排在CA1858、ZH1858之前,对于非精确排序,CA航司要排在ZH航司前面

创建一个索引

之前我们对航班号创建了一个索引,用户可以输入CA1,出现CA1*的结果,现在用户输入185,是纯数字型的,所以我们需要单独一个字段(flightNum)保存这个航班数字,然后对它创建索引

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAndNumAnalyzer": {
          "tokenizer": "flightNoAndNumTokenizer"
        }
      },
      "tokenizer": {
        "flightNoAndNumTokenizer": {
          "type": "edge_ngram",
          "min_gram": 3,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              },
              "flightNum": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              }
          }
      }
  }

}

如果是航班数字查询,我们只需要根据航司来设定优先级,同时设定航班数字全匹配的优先级最高,然后查询的时候再通过constant_score统一来设置boost参数

GET flight/dynamic/_search
{
  "query": {
    "bool": {
      "must": [
        {"prefix": {
          "flightNum": {
            "value": "185"
          }
        }}
      ], 
      "should": [
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "CA"
              }
            },
            "boost": 4
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "ZH"
              }
            },
            "boost": 3
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "B6"
              }
            },
            "boost": 2
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "QF"
              }
            },
            "boost": 1
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "flightNum": "185"
              }
            },
            "boost": 10
          }
        }
      ]
      }
    }
}

可以看到CA航司比B6高,但是当查询flightNum为185的时候,评分设置为最高,所以B6186会优先显示,其次是QF185,最后是CA1858和ZH1858 具体代码实现:

//航班数字优先级设置
BoolQueryBuilder qb = QueryBuilders.boolQuery()
                    .must(prefixQuery("flightNum", keyword))
                    .should(constantScoreQuery(matchQuery("flightNum", keyword)).boost(10f))

//航司优先级设置
airNumer.put("CA", 4f);
airNumer.put("ZH", 3f);
airNumer.put("B6", 4f);
airNumer.put("QF", 1f);
for (String airline : airNumer.keySet()) {
    queryBuilder.should(constantScoreQuery(matchQuery("airline", airline)).boost(airNumer.get(airline)));
}

 

 

 

Elasticsearch创建索引及数据(二)

Elasticsearch创建索引及数据(二)

第一篇介绍了Elasticsearch安装及基本用法,下面我们自己来创建一个索引,并写入一些数据

创建索引

基本语法

PUT /my_index
{
    "settings": { ... any settings ... },
    "mappings": {
        "type_one": { ... any mappings ... },
        "type_two": { ... any mappings ... },
        ...
    }

在创建索引的时候,我们需要设置索引被存放的分片数量、分析器、类型设置,分片数量、分析器通过settings来设置,类型映射通过mappings来设置,Elasticsearch默认创建索引是5个分片,1个副本,可通过settings来修改它,这里我们就遵循默认值不动,如下:

PUT flight
{
    "settings" : {
        "index" : {
            "number_of_shards" : 5, 
            "number_of_replicas" : 1 
        }
    }
}
{
  "acknowledged": true,
  "shards_acknowledged": true,
  "index": "flight"
}

创建一个flight索引,大括号后面为默认配置,如果不想修改它,都可以去掉,然后在新建一个type为dynamic,并写入数据

PUT /flight/dynamic/1
{
    "flightNo":"CA1858",
    "flightDate":"2018-12-01",
    "depCode":"SHA",
    "arrCode":"PEK",
    "state": "到达",
    "subState": "",
    "depPlanTime":"2018-12-01 07:45",
    "arrPlanTime":"2018-12-01 10:10",
    "depReadyTime":"2018-12-01 07:45",
    "arrReadyTime":"2018-12-01 09:45",
    "depTime":"2018-12-01 07:54",
    "arrTime":"2018-12-01 07:46",
    "distance":1076, 
    "tailNo":"B2487", 
    "depTerm":"T2", 
    "arrTerm":"T3", 
    "gate":"48", 
    "luggage":"32"
}

创建一条数据后,ES会默认会对所有字段添加索引,当然也可以不指定ID,ES会默认生成一个ID,自动生成的ID有22个字符长,类似:wM0OSFhDQXGZAWDf0-drSA

{
  "_index": "flight",
  "_type": "dynamic",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

下面我们来查询数据,ES默认情况下是禁用了source,即我们查询时,不会返回source里面的内容,只会返回数据ID,我们可以指定需要返回的_srouce字段

GET flight/dynamic/_search
{
    "query":   { "match_all": {}},
    "_source": [ "flightNo", "flightDate"]
}
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "flight",
        "_type": "dynamic",
        "_id": "1",
        "_score": 1,
        "_source": {
          "flightNo": "CA1858",
          "flightDate": "2018-12-01"
        }
      }
    ]
  }
}

配置分析器

分析器介绍

分析器主要是将一块文本分成适合于倒排索引的独立 词条,通过这些词条我们可以搜索到指定的文档

分析器主要有以下几个功能:
1,字符过滤:通过整理字符串,如去掉HTML,将&转换为and
2,分词器:将字符串分成单个词条,比如通过空格或者标点符号来拆分
3,Token过滤:比如将Quick这种词条统一转换为小写,删除a,and,the这些无用词,增加近义词条(如:jump和leap这种同义词)

比如我们有一个航班号CA1858,我们需要通过输入CA18就可以返回对应的结果,那么我们就需要对其进行分词,如果直接使用下面的查询是获取不到数据的

GET flight/dynamic/_search
{
  "query": {
    "match": {
      "flightNo": "CA18"
    }
  }
}

我们可以查看ES对flightNo的分词情况

POST flight/_analyze
{
  "field": "flightNo",
  "text": "CA1858"
}
{
  "tokens": [
    {
      "token": "ca1858",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    }
  ]
}

可以看到ES是对航班号转换为小写后直接进行了倒排索引,没有进行分词,直接查询CA18肯定搜索不到,下面我们自定义一个分析器

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAnalyzer": {
          "tokenizer": "flightNoTokenizer"
        }
      },
      "tokenizer": {
        "flightNoTokenizer": {
          "type": "edge_ngram",
          "min_gram": 4,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAnalyzer"                          
              }
          }
      }
  }

}

edgengram为ES自带的分词器,ES自带了8种分析器,具体可以查看官方文档,我们自定义了一个analyzer,使用edgengram来进行分词,字符长度从4到8,然后将自定义的分析器映射到flightNo字段上,之后我们使用查询CA18就可以获取到结果了

 

Elasticsearch安装及介绍(一)

Elasticsearch安装及介绍

安装Elasticsearch

Elasticsearch的安装很简单,直接下载官方包,运行即可

curl -L -O http://download.elasticsearch.org/PATH/TO/VERSION.zip 
unzip elasticsearch-$VERSION.zip
cd  elasticsearch-$VERSION
./bin/elasticsearch

使用curl ‘http://localhost:9200/?pretty’获取数据

{
  "name" : "node-0",
  "cluster_name" : "dongtai-es",
  "cluster_uuid" : "aIah5IcqS1y9qiJ4LphAbA",
  "version" : {
    "number" : "6.1.2",
    "build_hash" : "5b1fea5",
    "build_date" : "2018-01-10T02:35:59.208Z",
    "build_snapshot" : false,
    "lucene_version" : "7.1.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

基本概念介绍

Elasticsearch本质是一个分布式数据库,每个服务器上称为一个Elastic实例,这一组实例构成了ES集群

Document

ES中存储数据记录称为Document,Document使用JSON格式表示,同一个 Index 里面的Document没有要求有相同的结构,但最好一致,这样能提高效率,如下:

{
  "first_name": "Jane2",
  "last_name": "Smith",
  "age": 32,
  "about": "I like to collect rock albums",
  "interests": "music"
}

Index

ES文档的索引,定义了文档放在哪里,你可以理解为对于数据库名

Type

文档表示的对象类别,可以理解为数据库中的表,比如我们有一个机票数据,可以按舱位来分组(头等舱,经济舱),根据规则,Elastic 6.x 版只允许每个 Index 包含一个 Type,7.x 版将会彻底移除 Type

ID

文档的唯一标识,可以理解为表中的一条记录,根据index、type、_ID可以确定一个文档

基本用法

添加一个文档

PUT index/employee/6
{
  "first_name": "chuan",
  "last_name": "zhang",
  "age": 32,
  "about": "hi I'm hai na bai chuan",
  "interests": "read book"
}

上面PUT后面分别对应 index、Type、Id

获取文档

获取所有文档

ES默认会返回10条,可以指定size来改变

GET index/employee/_search
{
  "size": 3
}
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 9,
    "max_score": 1,
    "hits": [
      {
        "_index": "index",
        "_type": "employee",
        "_id": "5",
        "_score": 1,
        "_source": {
          "first_name": "Jane1",
          "last_name": "Smith",
          "age": 32,
          "about": "I like to collect rock albums",
          "interests": "music"
        }
      },
      {
        "_index": "index",
        "_type": "employee",
        "_id": "10",
        "_score": 1,
        "_source": {
          "first_name": "chuan",
          "last_name": "Smith",
          "age": 32,
          "about": "I like to collect rock albums",
          "interests": "music"
        }
      }
    ]
  }
}

按条件匹配

比如我需要查找firstname是’chuan‘的所有文档,如果要搜索firstname中多个关键字,中间用空格隔开就可以了,比如”firstname”: “chuan smith” 表示查询firstname包含chuan或smith的

GET index/employee/_search
{
  "query": {
    "match": {
            "first_name": "chuan"
        }
  }
}
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 0.6931472,
    "hits": [
      {
        "_index": "index",
        "_type": "employee",
        "_id": "6",
        "_score": 0.6931472,
        "_source": {
          "first_name": "chuan",
          "last_name": "zhang",
          "age": 32,
          "about": "hi I'm hai na bai chuan",
          "interests": "read book"
        }
      }
    ]
  }
}

如果要使用and查询,比如我要查询firstname为chuan,lastname为chuan的文档记录

GET index/employee/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "first_name": "chuan" } },
        { "match": { "last_name": "zhang" } }
      ]
    }
  }
}

如果要根据范围查询或过滤,类似MySQL中’>’,或 ‘!=’操作,比如需要查询上面条件中,年龄大于30的文档

{
  "query": {
    "bool": {
      "must": [
        { "match": { "first_name": "chuan" } },
        { "match": { "last_name": "zhang" } }
      ]
      "filter": {
        "range" : {
            "age" : { "gt" : 30 } 
        }
      }
    }
  }
}

更新文档

更新记录使用PUT请求,重新发送一次数据,ES会根据id来修改,如果我需要更新id为6的about字段

PUT index/employee/6
{
  "first_name": "chuan",
  "last_name": "zhang",
  "age": 32,
  "about": "i modify my about",
  "interests": "read book"
}
{
  "_index": "index",
  "_type": "employee",
  "_id": "6",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 12,
  "_primary_term": 2
}

_version为版本号,由原来的1变成了2,result为”update“,以上为全量更新,如果需要局部更新,可以使用POST请求

POST index/employee/6/_update
{
  "doc":{
    "about": "i modify my about"
  }
}

删除文档

如下,我需要删除id为6的文档

DELETE index/employee/6
{
  "found" :    true,
  "_index" :   "index",
  "_type" :    "employee",
  "_id" :      "6",
  "_version" : 3
}

可以看到version也增加了1,如果对于的id没有找到,found将返回false

数据分析

ES同样有类似MySQL group by的用法,ES称为聚合,比如我想统计employee中最受欢迎的兴趣,注意:ES在5.x之后对排序、聚合操作用单独的数据结构(fielddata)缓存到内存里了,需要单独开启。

PUT index/_mapping/employee/
{
  "properties": {
    "interests": { 
      "type":     "text",
      "fielddata": true
    }
  }
}

GET index/employee/_search
{
  "aggs": {
    "all_interests": {
      "terms": { "field": "interests" }
    }
  }
}
"aggregations": {
    "all_interests": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "music",
          "doc_count": 3
        },
        {
          "key": "forestry",
          "doc_count": 1
        },
        {
          "key": "read",
          "doc_count": 1
        }
      ]
    }
  }

Spring Boot集成Quartz

Spring Boot集成Quartz

我们常用的定时任务调度有如下几种:
1. 使用JDK自带的类库实现

  • 通过继承TimeTask,使用Time来调度任务
  • 使用ScheduledExecutorService来实现任务调度

2,Spring 自带了任务调度功能,通过使用@Schedule()注解来实现
3,使用Quartz框架,Quartz可以用于在分布式环境下的任务调度

Quartz的基本概念:

1,Job 表示一个工作,要执行的具体内容。此接口中只有一个方法,如下:

void execute(JobExecutionContext context)

2,JobDetail 表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。
3,Trigger 代表一个调度参数的配置,什么时候去调。
4,Scheduler 代表一个调度容器,一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。

配置环境依赖

添加Maven依赖:

    org.springframework.boot
    spring-boot-starter-quartz

修改application.yml文件,添加:

spring:
  datasource:
    name: schedule
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/schedule?useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456
  quartz:
    job-store-type: jdbc

quartz支持两种存储方式,一个使用数据库,也就是job-store-type=jdbc,需要你下载quartz提供的表,并导入到你本地,然后配置你的数据库连接,官方下载地址, 另一种是内存方式,job-store-type设置为momery

配置任务

创建一个任务配置,这里的数据可以对应数据库一条记录

public class ScheduleTaskConfig {

    private Integer id;
    private String name;
    private String groupName;
    private String describe;
    private String cron;
    private String classPath;
    private Character isEnabled;
    private String createTime;
    private String updateTime;

    public String getClassName() {
        try {
            Class cls = Class.forName(classPath);
            return cls.getSimpleName();
        } catch (ClassNotFoundException e) {
            log.error("获取Class失败",e);
        }
        return null;
    }

}

创建用于操作Quartz任务创建、暂停、恢复方法

Service
@Slf4j
public class QuartzService {

    @Autowired
    private Scheduler scheduler;

    public void createJob(ScheduleTaskConfig config) {
        Class cls = null;
        try {
            cls = Class.forName(config.getClassPath());
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put("config", config);
            JobDetail jobDetail = JobBuilder.newJob(cls)
                    .withIdentity(config.getName(), config.getGroupName())
                    .withDescription(config.getDescribe())
                    .setJobData(jobDataMap)
                    .storeDurably()
                    .build();

            Trigger trigger = TriggerBuilder.newTrigger()
                    .forJob(jobDetail)
                    .withIdentity(jobDetail.getKey().getName(), GROUP_NAME)
                    .withSchedule(CronScheduleBuilder.cronSchedule(config.getCron()))
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (Exception e) {
            log.error("创建任务失败", e);
        }
    }

    /**
     * 获取任务状态
     * @return
     */
    public String getJobState(ScheduleTaskConfig config) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(config.getName(), config.getGroupName());
            Trigger.TriggerState state = scheduler.getTriggerState(triggerKey);
            return state.name();
        } catch (Exception e) {
            log.error("创建任务失败", e);
        }
        return null;
    }

    /**
     * 判断Job是否存在
     * @param config
     * @return
     */
    public boolean isExistJob(ScheduleTaskConfig config) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(config.getName(), config.getGroupName());
            CronTrigger trigger = (CronTrigger)scheduler.getTrigger(triggerKey);
            if (trigger != null) {
                return true;
            }
        } catch (Exception e) {
            log.error("获取任务trigger失败", e);
        }

        return false;
    }

    /**
     * 更新Job任务
     * @param config
     */
    public void updateJob(ScheduleTaskConfig config) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(config.getName(), config.getGroupName());
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(config.getCron());

            //按新的cronExpression表达式重新构建trigger
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
            // 忽略状态为PAUSED的任务,解决集群环境中在其他机器设置定时任务为PAUSED状态后,集群环境启动另一台主机时定时任务全被唤醒的bug
            if(!triggerState.name().equalsIgnoreCase("PAUSED")){
                //按新的trigger重新设置job执行
                scheduler.rescheduleJob(triggerKey, trigger);
            }
        } catch (Exception e) {
            log.error("更新任务trigger失败", e);
        }

    }

    /**
     * 停止任务
     * @param jobName
     * @param groupName
     * @return
     */
    public boolean pauseJob(String jobName) {
        try {
            JobKey jobKey = JobKey.jobKey(config.getName(), config.getGroupName());
            scheduler.pauseJob(jobKey);
            return true;
        } catch (SchedulerException e) {
            log.error("停止任务失败", e);
        }
        return  false;
    }

    /**
     * 恢复任务
     * @param jobName
     * @param groupName
     * @return
     */
    public boolean resumeJob(String jobName) {
        try {
            JobKey jobKey = JobKey.jobKey(config.getName(), config.getGroupName());
            scheduler.resumeJob(jobKey);
            return true;
        } catch (SchedulerException e) {
            log.error("恢复任务失败", e);
        }
        return  false;
    }

    /**
     * 删除任务
     * @param jobName
     * @param groupName
     * @return
     */
    public boolean deleteJob(String jobName) {
        try {
            JobKey jobKey = JobKey.jobKey(config.getName(), config.getGroupName());
            scheduler.deleteJob(jobKey);
            return true;
        } catch (SchedulerException e) {
            log.error("删除任务失败", e);
        }
        return  false;
    }

可以看到Quartz创建任务就是通过JobDetail和Trigger来实现定时任务的,JobDetail声明了任务的一些信息,比如名称、分组、描述、需要执行类(例如:com.chuanz.task.CheckExecutor)需要的数据(通过调用setJobData方法) Trigger设置了任务的执行周期及触发时间,这里通过cron表达式来设置触发,另外Trigger还有另外一种触发方式,通过SimpleScheduleBuilder来实现,比如:

.withSchedule(SimpleScheduleBuilder.repeatMinutelyForTotalCount(10, 5))

我需要重复执行10次,每隔5分钟执行一次

创建我们需要执行的任务,实现Job接口,并重写execute方法

@Slf4j
@Service
public class CheckExecutor implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        log.info("执行计划冲突类型审核开始");
        // 获取数据
        ScheduleTaskConfig config = (ScheduleTaskConfig)context.get("config");

        /**
         * 处理逻辑
         */
        log.info("执行计划冲突类型审核结束");
    }
}

我们可以将quartz创建、删除、停止、恢复任务都封装成一个个接口,通过前端页面来管理quartz的任务,具体前端页面代码我就不贴了,基本就是调用QuartzService里面的方法了,另外说说获取quartz任务列表的时候,我们可以获取Trigger里面响应的执行状态,通过Trigger里面的方法来获取,比如:

JobKey jobKey = new JobKey(config.getName(), config.getGroupName());
List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
if (triggers.size() > 0) {
    Trigger trigger = triggers.get(0);
    //开始执行时间
    trigger.getStartTime();
    //上一次执行时间
    trigger.getPreviousFireTime();
    //下一次执行时间
    trigger.getNextFireTime();
}

trigger有很多实现,比如我们这里使用的是cron表达式,所以相对应的就是CronTriggerImpl,常用的触发器有四种。

触发器介绍

SimpleTrigger:简单的触发器

指定从某一个时间开始,以一定的时间间隔,如秒、分、小时重复执行的任务
比如:从10:00 开始,每隔1小时/1分钟/1秒钟执行一次,执行10次。
对应的方法如下:

CalendarIntervalTrigger:日历触发器

类似于SimpleTrigger,指定从某一个时间开始,以一定的时间间隔执行的任务。 但是不同的是SimpleTrigger指定的时间间隔为毫秒,没办法指定每隔一个月执行一次(每月的时间间隔不是固定值),而CalendarIntervalTrigger支持的间隔单位有秒,分钟,小时,天,月,年,星期。
对应的方法如下:

DailyTimeIntervalTrigger:日期触发器

指定每天的某个时间段内,以一定的时间间隔执行任务。并且它可以支持指定星期。 它适合的任务类似于:指定每天9:00 至 18:00 ,每隔70秒执行一次,并且只要周一至周五执行。 比如如下代码:

DailyTimeIntervalTrigger trigger = dailyTimeIntervalSchedule()
    .startingDailyAt(TimeOfDay.hourAndMinuteOfDay(10, 0)) //从10:00点开始
    .endingDailyAt(TimeOfDay.hourAndMinuteOfDay(22, 0)) //22:00点结束
    .onDaysOfTheWeek(MONDAY,TUESDAY,WEDNESDAY,THURSDAY,FRIDAY) //周一到周五执行
    .withIntervalInHours(1) //每间隔1小时执行一次
    .withRepeatCount(50) //最多重复50次(实际执行50+1次)
    .build();

CronTrigger:Cron表达式触发器

适合于更复杂的任务,它支持类型于Linux Cron的语法(并且更强大)。基本上它覆盖了以上三个Trigger的绝大部分能力
属性只有一个就是cron表达式

我们可以根据对应的触发器取到任务执行的数据,如之前用SimpleScheduleBuilder.repeatMinutelyForTotalCount(10, 5),那么对应的就是SimpleTrigger,我们可以获取执行的次数,剩余的次数等。 触发器的介绍可以参考官方文档

 

Redis分布式锁使用

最近项目中遇到同时有两个线程同时更新一行记录导致后面一条语句执行失败的问题,由于项目是部署在不同的服务器上,这里要控制两个线程的执行顺序,自然想到了使用Redis的锁,废话不多说,下面给出具体实现

/**
 * 核查四要素相同报文是否正在处理,如果有实例正在处理四要素相同报文pass,否则线程等待
 * 
 * @param processData
 */
public void checkPacketProcessRepeat(ProcessData processData) {
	try {
		// 四要素key
		String repeatKey = REPEATKEYSTART + processData.getReviseFlight().getKey();
		while (true) {
			// 设置nx锁,如果nx锁设置成功跳出去,继续执行报文后续处理流程
			if (setNX(repeatKey, EXIST, 3)) {
				log.info("FlightPreProcess-checkPacketProcessRepeat,报文处理拿到NX锁,直接pass,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
						.getSourceId());
				return;
			}
			// 如果有当前航班有nx锁、或者nx锁设置失败,则需要等待3秒,等待其他实例处理完成
			log.info("FlightPreProcess-checkPacketProcessRepeat,报文多实例并发处理,需要等待3秒,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
					.getSourceId());
			Thread.sleep(3000);
		}
	} catch (Exception e) {
		log.error("FlightPreProcess-checkPacketProcessRepeat,异常,key:{},e:{}", processData.getReviseFlight().getKey(), e);
	}
}

这里根据报文的四要素确定唯一条记录,先调用setNX获取redis锁,如果获取到了就执行后面的逻辑,如果没有获取到则等待3s再重试,下面是setNX方法的实现

/** 设置锁 */
private boolean setNX(final String key, String value, final int exp) {
	return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
		@Override
		public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
			byte[] serializeKey = redisTemplate.getStringSerializer().serialize(key);
			Boolean acquire = connection.incr(serializeKey) == 1;
			// 如果设值成功,则设置过期时间
			if (acquire) {
				connection.expire(serializeKey, exp);
			}
			return acquire;
		}
	});
}

这里使用了redis的incr命令,它是一个原子操作,如果key不存在,那么key的值将初始化为0,然后执行INCR操作,这里判断如果设置成功,则对key设置过期时间,相当于了一个带有时间的锁。

在Redis2.6.12版本后,使用set命令也可以实现分布式锁,具体代码如下:

public static Boolean setNX(final String key, final String value, final int exp) {
        return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
			@Override
			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
				Jedis jedis = (Jedis) connection.getNativeConnection();
				String result = jedis.set(key, value, "NX", "EX", exp);
				if ("OK".equals(result)) {
					return Boolean.TRUE;
				}
				return Boolean.FALSE;
			}
		});
    }

这里重点说说第3个和第4个参数,这里填的是NX,意思是当key不存在时,我们进行set操作,若key已经存在则不进行任何操作,第4个表示我们要给key设置一个过期时间,具体时间由第5个参数决定。

另外我们这里保存了key对应的value值,所以线程可以根据value值来释放锁,这里的value值可以是线程的ID,比如我们线程后面的逻辑执行失败了,我们可以通过这个value值来尽快释放锁,减少其它线程的等待时间,我们可以使用Lua脚本来实现

private static final Long RELEASE_SUCCESS = 1L;
private static final RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

public static Boolean releaseLock(final String key, final String value) {
			return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
			@Override
			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
				Jedis jedis = (Jedis) connection.getNativeConnection();
				Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key),
						Collections.singletonList(value));
				if (RELEASE_SUCCESS.equals(result)) {
					return Boolean.TRUE;
				}
				return Boolean.FALSE;
			}
		});
    }

通过Lua脚本获取对应key的value值,如果value值和给定的一样,则释放锁

 

分布式一致性协议

分布式一致性协议

在分布式系统中为了解决数据的一致性,主要有二阶段提交协议、三阶段提交协议、Poxos算法、TTC协议、Rraft协议、ZAB协议(zookeeper的协议),下面就分别介绍这几种协议

二阶段提交协议

二阶段提交协议主要思想是:有一个协调者,多个参与者,协调者负责发送命令给参与者,确保数据能同时更新到所有节点上,主要步骤如下:

Two-phase_commit

1)协调者将事务的请求发送给所有参与者,询问是否可以提交事务,参与者锁住自己的资源,并写undo/redo日志,如果参与者都准备成功,则向协调者回应“可以提交”

2)协调者所有参与者都会有“可以提交”,此时向参与者发送“正式提交”命令,所有参与者开始提交自己的事务

如上述1)2)有任何不成功,所有参与者都将回滚自己的事务

二阶段提交原理比较简单,实现起来也比较方便,但问题也比较明显,如:

1,同步堵塞:所有参与者在没有收到协调者发送过来的“正式提交”命令,都将锁住资源,其他线程如果要获取资源只能等待

2,单点问题:所有参与者都依靠一个协调者,如果协调者在2)操作突然失去联系,这个时候所有的参与者都不知道如何处理,是否提交事务

3,数据不一致性:在2)操作中,可能由于网络原因,有些节点收到提交命令,有些没有,从而照成数据不一致的问题

三阶段提交协议

Three-phase_commit_diagram

三阶段相比二阶段的思想是在操作1)中多了一步,首先询问:是否可以锁资源,如果所有人同意了才开始锁资源,后面的步骤就童二阶段提交了,它相比二阶段协议,解决了如果协调者突然失去联系,参与者仍可以提交他们的事务

但三阶段实现起来还是比较复杂,而且由于参与者最后还是会提交自己的事务,也会造成数据不一致行

TTC协议

为了减少资源被堵塞的时间,产生了TTC协议,可以这样理解TTC是针对SOA服务的锁,2PC是针对数据库的锁

比如我们需要从A账户转100到B账户,必然会涉及到如下几个步骤 1,读取A账户金额,-100 2,读取B账户金额,+100

2PC就会先锁住A账户和B账号的数据,那么任何其它线程想要读取这个账号数据,都将堵塞

TCC则先会从A账号-100元,这时线程也可以读取A账号的数据了,如果转入B账号发生失败,那么就会调用回滚接口,再将A账号+100元,这实质是一种补偿机制

Poxos算法

理解Poxos算法,先弄清楚几个角色

1,proposers 投票人,可以理解为收集人民意见的人

2,acceptors 接受投票者,可以理解为人大代表

3,learners 学习者,可以理解为记录员,将处理的结果记录下来

608A9B29BB2FE2E9787DDB3A3876268E

你可以这样理解,首先投票人接收到人民的意见,就开始提出一个法案,并把这个法案告诉人大代表,说现在人民提了一个法案,我们来开始讨论这个法案吧,人大代表如果同意了(这里的同意指的是有半数以上的人同意了),那么就开始讨论这个法案,如果人大代表都同意(半数以上人同意)通过这个法案,那么大家就会确定下来,交给记录人员将这个法案记录下来,并告诉人民,现在已经确定了一个新法案

poxos算法也有一个问题,即如果有两个投票人接到了人民的法案,我们现在假设为投票人P1和投票人P2,他们同时接受到两个法案A1,A2,A2的版本要大于A1的版本,这里说明下人大代表会接受法案版本大的那个,如果有下面这种情况 1)投票人P1发送了法案A1,并且人大代表都接受到了这个法案,所以决定开始讨论这个法案,并告诉所有人

2)投票人P2发送了法案A2,人大代表又接收到了法案A2,发现A2的版本大于A1的,所以放弃了A1,开始讨论A2的

3)投票者P1发现自己的法案被放弃了,就又会提出一个新的法案A3,A3的版本大于A2,并会发给人大代表,人大代表又会放弃A2,开始讨论A3

4) 投票者P2发现自己的法案被放弃了,就又会提出一个新的法案A4,A4的版本大于A3,并会发给人大代表,人大代表又会放弃A3,开始讨论A4

这样依次反复,就会出现不断更换法案的问题,所以我们就需要只有一个投票人来发送法案,如果投票人退休了,就再新选一个投票人,下面我们来讲讲ZAB协议

ZAB协议

角色定义:

1,leader,主节点,对应上面的投票人,只能有一个

2,follower,从节点,对应上面的人大代表,必须为多个

685547DB334887C0A05B4A68827CBBC1

这里有一个leader,所以最开始必须有一个选举的过程,确定谁来当leader, 所有节点都向各个节点发送一条消息,表示我要当leader,这里由于节点发送的时间不同,每个命令发给其他节点的时间也不同,每个节点最开始收到请求后,都会同意这个命令,如果某个节点收到半数以上的节点回应“同意”,那么它就会将自己设置为leader,并告诉其它节点,现在我是leader,你们需要听从我的,其它节点就会把自己设置为follower

在zookeeper中,最开始的选举是会将myid最大的节点设为leader节点,之后会根据节点zxid,来确定谁来当下一个leader节点,当确定leader后,具体处理流程如下:

1)收到客户端的一个事务请求,可能是leader或follower,follower收到请求后也会转发给leader

2)leader收到follower的事务请求,开始把事务处理请求发给所有follower,follower收到事务请求开始锁资源,处理,将redo/undo写入日志,如果执行成功告诉leader

3)leader收到半数以上的follower回应成功,则再次发送一个命令给follower,说可以提交事务了

4)follower接受到命令,提交自己的事务,并将结果返给leader

5)leader将结果发给客户端

Raft协议

Raft协议同ZAB协议,只不过ZAB是follower节点向主节点发送心跳,确保主节点是否存活,而Raft是Leader节点向follower节点发送心跳,以下这个动画很生动的讲解了分布式系统下一致性的保证:

http://thesecretlivesofdata.com/raft/

你也可以模拟分布式系统不同场景各节点的情况:

https://raft.github.io/

参考文章:

https://coolshell.cn/articles/10910.html 参考书籍

《从POXOS到Zookeeper分布式原理一致性与实践》

SpringCloud链路追踪

Spring Cloud链路追踪

接着上一篇的文章,今天讲讲spring cloud在分布式系统中的链路跟踪,主要使用的是zipkin框架实现的,上篇文章写道了有一个注册中心Eureka,和两个服务方,一个消费方,我们的消费方也可以做了一个服务,注册到Eureka中,所以我们对消费方也添加EurekaClient和zipkin的maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

启动类添加@EurekaClient注解,同样服务方也要添加zipkin的maven依赖

zipkin介绍

Zipkin 是一个开放源代码分布式的跟踪系统,由Twitter公司开源,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现,架构如下:

5ec1521a-26b4-11e7-9679-8c429afdbe0c

每个服务向zipkin报告计时数据,zipkin会根据调用关系通过Zipkin UI生成依赖关系图,显示了多少跟踪请求通过每个服务,该系统让开发者可通过一个 Web 前端轻松的收集和分析数据,例如用户每次请求服务的处理时间等,可方便的监测系统中存在的瓶颈。

Zipkin提供了可插拔数据存储方式:In-Memory、MySql、Cassandra以及Elasticsearch。Zipkin默认是使用http+内存传输和收集,在并发量比较大会影响效率,下面我们我们通过Kafka+ElasticSearch实现服务的传输与收集

创建ZipKin服务

新建一个模块,我们称为zipkinserver,添加下面的依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-server</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-autoconfigure-ui</artifactId>
</dependency>

在启动类,添加如下注解:

@SpringBootApplication
@EnableEurekaClient
@EnableZipkinServer
public class ZipkinServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ZipkinServerApplication, args);
    }

}

修改application.yml配置文件,添加kafka收集和ElasticSearch存储,

zipkin:
  storage:
    type: elasticsearch
    elasticsearch:
      hosts: localhost:9300
      index: zipkin

  collector:
    kafka:
      zookeeper: localhost:2181
      topic: zipkin
      groupId: zipkin

然后启动服务,zipkin的默认端口是9494,访问地址:http://localhost:9494

修改服务方和消费方的application.yml,添加zipkin的地址,kafka收集地址

spring: 
  zipkin:
    base-url: http://localhost:9411
    kafka:
      topic: zipkin
  kafka:
    bootstrap-servers: localhost:9092

  sleuth:
    sampler:
      percentage: 1.0

zipkin只有在接口调用后,才会产生数据的调用情况,所以我们先访问消费方的接口,然后再打开zipkin的界面,可以看到dynamic-service和feign的调用关系及耗时情况

31DADE5B71CF7F9EE33D80AE6B097E57 64043E75E8489933DFB3E2FA03A5AF9A

SpringCloud服务注册与发现

Spring Cloud服务注册与发现

Spring Cloud集成了搭建分布式服务一系列框架,如服务注册与发现Eureka,熔断器Hystrix,路由网关Zuul,链路追踪zipkin,今天主要讲解Eureka的使用。

Eureka是什么?

Eureka是Netflix开源的一款提供服务注册和发现的产品,它提供了完整的Service Registry和Service Discovery实现。也是springcloud体系中最重要最核心的组件之一,我们通过下面这样图就可以了解

48EB8D2E311BF36563200EF5B0015EB6

1)服务提供方向Eureka注册自己的服务,

2)消费者向Eureka获取自己需要的服务,和提供方建立连接

3) 如果服务方出现故障,Eureka会自动将服务方从注册列表中删除

搭建项目

创建Eureka服务

首先创建一个Maven项目,指定spring boot,spring cloud 版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Finchley.SR2</spring-cloud.version>
</properties>

创建一个模块,我们称为EurekaServer,使用Eureka只需要引入maven包,然后启动项目就可以了,很方面,如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

配置application.yml文件

server:
  port: 8081

eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

spring:
  application:
    name: eurka-server

添加注解@EnableEurekaServer,并启动EurekaServer

@SpringBootApplication
@EnableEurekaServer
public class EurakaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurakaServerApplication.class, args);
    }
}

启动EurekaServer,地址为:http://localhost:8081/eureka

创建提供方服务

添加maven依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

创建服务接口

@RestController
public class AirportController {

    @Autowired
    private AirportService airportService;

    @RequestMapping("/getAirport")
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportService.getAirport(threeCode);
    }

}

@Service
public class AirportService {

    @Value("${server.port}")
    private int port;

    public AirportBean getAirport(String threeCode) {
        AirportBean bean = new AirportBean();
        bean.setName("北京首都国际机场");
        bean.setThreeCode(threeCode);
        bean.setPort(port);
        return bean;
    }

}

public class AirportBean {

    private String threeCode;
    private String name;
    private int port;
}

修改application.yml文件

<code>server:
  port: 8082

spring:
  application:
    name: dynamic-service

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8081/eureka/</code>

添加@EnableEurekaClient注解,这里我们为了方便演示负载均衡,同时也启动了两个实例,端口分别为8082,8083

@SpringBootApplication
@EnableEurekaClient
public class DynamicServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(DynamicServiceApplication.class, args);
    }
}

创建服务消费方

我们再项目下再新建一个模块,称为springcloudclient,添加maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

这里我们使用了feign的服务调用方式,Spring cloud有两种服务调用方式,一种是ribbon+restTemplate,另一种是feign,ribbon类似一种rest风格的API调用方式,而feign整合了ribbon,具有负载均衡的能力,通过注解的方式,使代码看起来更加简洁,另外feign整合了Hystrix,具有熔断的能力

调用服务方的接口

@RestController
public class AirportFeignController {

    @Autowired
    private AirportFeignService airportFeignService;

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportFeignService.getAirport(threeCode);
    }

}

@FeignClient(value = "dynamic-service", fallback = AirportFeignFallbackService.class)
public interface AirportFeignService {

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode);

}

// 服务失败后熔断,调用的方法
public class AirportFeignFallbackService implements AirportFeignService {
    @Override
    public AirportBean getAirport(String threeCode) {
        return null;
    }
}

public class AirportBean {
    private String threeCode;
    private String name;
    private int port;
}

配置application.yml文件

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8084
spring:
  application:
    name: service-feign

添加@ EnableEurekaClient,@EnableDiscoveryClient, @EnableFeignClients注解,端口为8084,

@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
public class SpringCloudServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudServerApplication.class, args);
    }
}

好了下面可以演示springcloud的服务注册与发现了,通过上面的例子,我们启动了Eureka服务,分别为:8081,同时启动了两个服务提供方,注册到Eureka中,端口分别为8082和8083,接着我们启动了一个服务消费方,端口为8084,我们分别启动他们
打开Eureka的服务页面:http://localhost:8081

55AD7F4965A098E135257B0B04BBF3B6

可以发现有两个服务方已经注册上了,我们调用消费方的接口,发现消费方会使用负载均衡的方式分别访问服务方

 

有道词典

org.springframe …

详细X

  org.springframework.boot   spring-boot-starter-parent   2.1.1.RELEASE      utf – 8   utf – 8   1.8   Finchley.SR2

kafka和zookeeper集群安装

kafka和zookeeper集群安装

上期讲了kafka的作用及应用场景,今天我们来自己搭建一套kafka集群,由于kafka目前的安装包已经自带了zookeeper,所以在搭建zookeeper集群直接使用它内置的即可

准备工作:3台服务器,当然你可以使用自己的虚拟机尝试,如果安装虚拟机,可以查看我的这篇博客
我的三台服务器的IP分别是:192.168.101.116、192.168.101.115、192.168.102.215

安装步骤

下载安装包:

地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

配置zookeeper

配置kafka的路径、心跳检测时间 初始化连接时follower和leader最长连接时间 follower和leader数据最长同步时间

 dataDir=/data/kafka/zookeeper
 tickTime=2000
 initLimit=5
 syncLimit=2
 server.0=192.168.101.116:2888:3888
 server.1=192.168.101.115:2888:3888
 server.2=192.168.102.215:2888:3888

在每个服务器上注册zookeeper

/data/kafka/zookeeper目录下面touch myid文件
192.168.101.116上执行
echo "0" > /data/kafka/zookeeper/myid
192.168.101.115上执行
echo "1" > /data/kafka/zookeeper/myid
192.168.102.215上执行
echo "2" > /data/kafka/zookeeper/myid

配置kafka

 192.168.101.116上配置
 advertised.listeners=PLAINTEXT://zc1:9092
 broker.id=0

 192.168.101.115上配置
 advertised.listeners=PLAINTEXT://zc2:9092
 broker.id=1

 192.168.102.215上配置
 advertised.listeners=PLAINTEXT://zc3:9092
 broker.id=2

 通用配置
 log.dirs=/data/kafka/kafka-logs
 zookeeper.connect=192.168.101.116:2181,192.168.101.115:2181,192.168.102.215:2181

配置hosts

在每个服务器/etc/hosts中添加如下配置

192.168.101.116 zc1
192.168.101.115 zc2
192.168.102.215 zc3

开放防火墙端口

由于zookeeper和kafka监听了2181、9092、2888、3888,所有我们还要把这些端口添加到防火墙中,编辑/etc/sysconfig/iptables,添加:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

启动zookeeper和kafka

先启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

再启动kafka

bin/kafka-server-start.sh config/server.properties &

测试数据写入和读取

使用命令测试

生产者
[root@zc1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh –broker-list 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

>hello
>word

消费者
[root@zc2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh –bootstrap-server 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

hello
word

代码测试

生产者

public class ProducerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Producer&lt;String, String&gt; producer;
    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    public ProducerTest() {
        /** 生产者*/
        InputStream in_proc = this.getClass().getClassLoader().getResourceAsStream("kafka_test_producer.properties");
        Properties prop_proc = new Properties();
        try {
            prop_proc.load(in_proc);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_producer配置文件失败", e);
        }
        producer = new KafkaProducer&lt;String, String&gt;(prop_proc);
    }

    public void execute() {
        while(true) {
            try {
                String key = "CA1234";
                String message = System.currentTimeMillis()+" CA1234,PEK,SHA,2018-02-01";
                this.sendToTestServer(key, message);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    private void sendToTestServer(String key, String message) {
        logger.info("发送消息:"+message);
        ProducerRecord&lt;String, String&gt; producerRecord = new ProducerRecord&lt;String, String&gt;(TOPIC_NAME, key, message);
        producer.send(producerRecord);
    }

    public static void main(String[] args) {
        new ProducerTest().execute();
    }

}

kafka_test_producer.properties

kafka.topic=firsttopic
group.id=chuanzh
bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092
retries=5
request.timeout.ms=10000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

消费者

public class ConsumerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Consumer&lt;String, String&gt; consumer;
    private final static Logger logger = LoggerFactory.getLogger(DynamicPushTestTask.class);

    public ConsumerTest() {
        /** 消费者*/
        InputStream ksin = this.getClass().getClassLoader().getResourceAsStream("kafka_test_consumer.properties");
        Properties props = new Properties();
        try {
            props.load(ksin);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_consumer配置文件失败", e);
        }
        consumer = new KafkaConsumer&lt;String, String&gt;(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    }

    public void execute() {
        while(true) {
            try {
                ConsumerRecords&lt;String, String&gt; records = consumer.poll(2);
                logger.info("读取kafka,取到消息数量:" + records.count());
                for (ConsumerRecord&lt;String, String&gt; record : records) {
                    logger.info("value:{}", record.value());
                    logger.info("offset:{}", record.offset());
                }

                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    public static void main(String[] args) {
        new ConsumerTest().execute();
    }

}

kafka_test_consumer.properties

kafka.topic=flightcore
group.id=flightPacketPush
bootstrap.servers=43.241.208.208:9092,43.241.228.39:9092,43.241.234.89:9092
metadata.max.age.ms=60000
max.poll.records=50
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

Kafka使用介绍

Kafka使用介绍

kafka是什么?

kafka是一个分布式消息数据流平台,为处理实时数据提供一个统一、高吞吐、低延迟的平台。其本质是一个提供大规模分布式发布/订阅的消息队列,此外kafka还可以通过kafka connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams–一个Java的流式处理库。

它的2个大的应用: 1,构建实时的流数据管道,确保系统和应用之间数据传输的可靠 2,对应用之间的数据流进行转换和反应。

基本概念

  • Topic kafka将消息分门别类,每一类消息称之为一个主题(Topic) 每个消息(也叫记录recode)是由一个key,一个value和时间戳构成
  • Producer 发布消息的对象称之为主题生产者。
  • Consumer 订阅消息的对象称之为主题消费者
  • Broker 已发布的消息保存在一组服务器中,称之为Kafka集群,集群中每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些发布的消息。

kafka的核心API

  • Producer API:允许应用发布一个或多个kafka主题
  • Consumer API:允许应用订阅一个或多个kafka主题
  • Streams API:转换数据流,将一个或多个主题的数据流转换为一个或多个主题的数据流输出
  • Connector API: 允许构建可以重复使用的生产者和消费者,将topic连接到现有的应用或数据系统上,例如,一个建立在数据库上的连接可以捕获表的每一次修改。

kafka的主题(Topic)和日志

topic可理解是一类消息的集合,在kafka中,topic是可以被多个订阅者来消费的 每个topic,kafka包含了一组分区数据,叫partition,如下:

每一个partition是一个有序、不可变的序列记录,每个记录都有一个唯一的序列ID叫offset。 当消息记录被Consumer消费后,这些记录不会被删除,kafka提供了一些配置,比如:按日期、按空间来定时删除记录。 kafka分区有两个目的,一是它便于扩展,不受单个服务器的限制,二是,它可以并行接受和处理多个任务。

分布式

每一个partition日志被分布在kafka的集群服务器上,并且可配置每个parition可重复的份数。 每个partition有一个leader,零个或多个follower,正常情况下leader负责所有的读写请求,而follower负责同步leader上的数据,当leader不可用时,follower会选举一个新的leader替代原来老的leader。

生产者

生产者负责选择一个topic发布消息,同时指定发布到哪一个partition上,最简单的方式是按照partition轮询,也可指定按权重指定。

消费者

消费者有叫一个组(group)的概念,比如多个消费者属于同一个组,那么他们将一起消费这个topic的数据,如下图:

一个kafka集群有两台服务器,4个partition,有两个分组A和B,A有2个消费者,B有4个消费者, 每个partition可以保证数据记录的顺序性,但客户端如果是并行处理,如groupA,C1同时消费P0、P3就可能照成数据顺序错乱的问题,如果要保证数据的一致性,那么顺序处理一个Topic的所有消息,那么就只提供一个分区。

kafka保证

  • 生产者发送到topic的消息会按照他们发送的顺序保存,如果消息M1、M2被同一个producer发送,当M1被先发送,那么它的offset值将会小于M2的
  • 消费者看到的数据也是根据他们保存的顺序
  • 如果一个topic配置了复制因数N,kafka集群将最大允许N-1台服务器同步失败。

Kafka和传统的消息系统之间的区别

  • 结合传统的优点:传统的消息系统分:队列和发布订阅两种模式,队列可以允许多个消费者同时瓜分数据,而发布订阅模式,会将消息通知到每一消费者。kafka结合了这两个模式的优点,当在kafka中,多个消费者的组ID设置为一样时,那么将采用队列的模式,如果组ID不同,则采用发布订阅模式。
  • 更强的顺序性保证:kafka中引入分区功能,一个topic可有多个分区,分区中保证了顺序的一致性,如果启动多个消费者,kafka保证每个消费者只会读取一个分区中的数据,当有多于分区数的消费者,那么这个消费者将一直处于空等待,不会收到任何消息

kafka的存储性能

kafka作为一个消息存储器,他会将消息写入到磁盘,并通过复制镜像,来保证容错。kafka允许所有的写入操作完成后再继续操作。因为kafka中保持了一个指针的方式,在存储50KB和50TB,其性能都是一样的。kafka通过这种指针读取数据,所以数据的大小,不会影响其读写性能。

kafka的流处理

kafka不仅提供了读、写、存储,还提供了对数据流进行处理,比如:一个零售APP,kafka可以从输入topic读取数据,然后使用StreamAPI统计数量,调整价格然后输出到topic中,类似的操作还包括聚合计算、数据流连接等。