基于RBAC模型构建权限系统

什么是权限

当我们打开页面的时候经常会碰到下面的提示,比如一个页面/文件夹被禁止访问,一个button被禁止点击

当权限作为名词的时候,可以理解为对API、页面、功能点的权限控制,当权限作为动词时,可以理解为对某个功能可操作或不可操作,以下是对权限的整体划分:

如果要用一句话来概括:控制Who对what进行How的操作

RBAC模型

用户 是发起操作的主体,按类型分可分为2B和2C用户,可以是后台管理系统的用户,可以是OA系统的内部员工,也可以是面向C端的用户,比如阿里云的用户。

角色 起到了桥梁的作用,连接了用户和权限的关系,每个角色可以关联多个权限,同时一个用户关联多个角色,那么这个用户就有了多个角色的多个权限。

有人会问了为什么用户不直接关联权限呢?在用户基数小的系统,比如20个人的小系统,管理员可以直接把用户和权限关联,工作量并不大,选择一个用户勾选下需要的权限就完事了。

但是在实际企业系统中,用户基数比较大,其中很多人的权限都是一样的,就是个普通访问权限,如果管理员给100人甚至更多授权,工作量巨大。

这就引入了 “角色(Role)” 概念,一个角色可以与多个用户关联,管理员只需要把该角色赋予用户,那么用户就有了该角色下的所有权限,这样设计既提升了效率,也有很大的拓展性。

RBAC-1模型

此模型引入了角色继承(Hierarchical Role)概念,即角色具有上下级的关系,角色间的继承关系可分为一般继承关系和受限继承关系。
一般继承关系仅要求角色继承关系是一个绝对偏序关系,允许角色间的多继承。
而受限继承关系则进一步要求角色继承关系是一个树结构,实现角色间的单继承。这种设计可以给角色分组和分层,一定程度简化了权限管理工作。

RBAC-2模型

基于核心模型的基础上,进行了角色的约束控制,RBAC2模型中添加了责任分离关系。
其规定了权限被赋予角色时,或角色被赋予用户时,以及当用户在某一时刻激活一个角色时所应遵循的强制性规则。责任分离包括静态责任分离和动态责任分离。
主要包括以下约束:
互斥角色: 同一用户只能分配到一组互斥角色集合中至多一个角色,支持责任分离的原则。互斥角色是指各自权限互相制约的两个角色。比如财务部有会计和审核员两个角色,他们是互斥角色,那么用户不能同时拥有这两个角色,体现了职责分离原则
基数约束: 一个角色被分配的用户数量受限;一个用户可拥有的角色数目受限;同样一个角色对应的访问权限数目也应受限,以控制高级权限在系统中的分配
先决条件角色: 即用户想获得某上级角色,必须先获得其下一级的角色
动态限制用户角色:如果一个用户可以拥有两个角色,但运行时只能激活一个角色

RBAC-3模型

即最全面的权限管理,它是基于RBAC-0,将RBAC-1和RBAC-2进行了整合。

RBAC延展-用户组

当平台用户基数增大,角色类型增多时,而且有一部分人具有相同的属性,比如财务部的所有员工,如果直接给用户分配角色,管理员的工作量就会很大。
如果把相同属性的用户归类到某用户组,那么管理员直接给用户组分配角色,用户组里的每个用户即可拥有该角色,以后其他用户加入用户组后,即可自动获取用户组的所有角色,退出用户组,同时也撤销了用户组下的角色,无须管理员手动管理角色。
根据用户组是否有上下级关系,可以分为有上下级的用户组和普通用户组:

  • 具有上下级关系的用户组: 最典型的例子就是部门和职位,可能多数人没有把部门职位和用户组关联起来吧。当然用户组是可以拓展的,部门和职位常用于内部的管理系统,如果是面向C端的系统。比如淘宝网的商家,商家自身也有一套组织架构,比如采购部,销售部,客服部,后勤部等,有些人拥有客服权限,有些人拥有上架权限等等,所以用户组是可以拓展的
  • 普通用户组: 即没有上下级关系,和组织架构,职位都没有关系,也就是说可以跨部门,跨职位。举个例子,某电商后台管理系统,有拼团活动管理角色,我们可以设置一个拼团用户组,该组可以包括研发部的后台开发人员,运营部的运营人员,采购部的人员等等。

每个公司都会涉及到到组织和职位,下面就重点介绍这两个。

组织

常见的组织架构如
我们可以把组织与角色进行关联,用户加入组织后,就会自动获得该组织的全部角色,无须管理员手动授予,大大减少工作量,同时用户在调岗时,只需调整组织,角色即可批量调整。
组织的另外一个作用是控制数据权限,把角色关联到组织,那么该角色只能看到该组织下的数据权限。

职位

假设财务部的职位,每个组织部门下都会有多个职位,比如财务部有总监,会计,出纳等职位,虽然都在同一部门,但是每个职位的权限是不同的,职位高的拥有更多的权限。
总监拥有所有权限,会计和出纳拥有部分权限。特殊情况下,一个人可能身兼多职。

含有组织/职位/用户组的模型

根据以上场景,新的权限模型就可以设计出来了,如下图:

组织/职位/用户组

根据系统的复杂度不同,其中的多对多关系和一对一关系可能会有变化

  • 在单系统且用户类型单一的情况下,用户和组织是一对一关系,组织和职位是一对多关系,用户和职位是一对一关系,组织和角色是一对一关系,职位和角色是一对一关系,用户和用户组是多对对关系,用户组和角色是一对一关系,当然这些关系也可以根据具体业务进行调整。模型设计并不是死的,如果小系统不需要用户组,这块是可以去掉的。
  • 分布式系统且用户类型单一的情况下,到这里权限系统就会变得很复杂,这里就要引入了一个”系统”概念。此时系统架构是个分布式系统,权限系统独立出来,负责所有的系统的权限控制,其他业务系统比如商品中心,订单中心,用户中心,每个系统都有自己的角色和权限,那么权限系统就可以配置其他系统的角色和权限。
  • 分布式系统且用户类型多个的情况下,比如淘宝网,它的用户类型包括内部用户,商家,普通用户,内部用户登录多个后台管理系统,商家登录商家中心

数据权限

数据权限就是用户在同一页面看到的数据是不同的,比如财务部只能看到其部门下的用户数据,采购部只看采购部的数据。在一些大型的公司,全国有很多城市和分公司,比如杭州用户登录系统只能看到杭州的数据,上海用户只能看到上海的数据,解决方案一般是把数据和具体的组织架构关联起来。

举个例子,再给用户授权的时候,用户选择某个角色同时绑定组织如财务部或者合肥分公司,那么该用户就有了该角色下财务部或合肥分公司下的的数据权限

数据库表设计

权限框架

目前比较流行的是Apache Shiro和Spring Security

Apache Shiro

Spring Security

逆波兰算法在规则引擎中的运用

前言

逆波兰表示法(Reverse Polish notation,RPN,或逆波兰记法),是一种是由波兰数学家扬·武卡谢维奇1920年引入的数学表达式方式,在逆波兰记法中,所有操作符置于操作数的后面,因此也被称为后缀表示法。逆波兰记法不需要括号来标识操作符的优先级,比如:3 – 4 + 5,转换为波兰表达式为:3 4 – 5 +

场景

以工单系统为例,比如发起了一个审批流程,通过规则引擎配置了如下规则:

$合同类型$ = ‘商务合同’ || ( $合同总金额$ > 1000000 && $合同总金额$ < 2000000 )

满足上面表达式会走审批1,否则走审批2逻辑。
对于上面的表达式,我们都知道首先应该先比较 || 两边的表达式,||右边的表达式因为是用括号连接,所以需要放在一起做&&比较,最后左右两边做 || 操作

但这是人思维方式,机器并不这样识别,机器可不知道哪个先比较,哪个后比较,所以可以将其转换为如下格式,逆波兰(后缀)表达式

合同类型、商务合同、=、合同总金额、1000000、>、合同总金额、2000000、<、&&、||

上面的表达式,机器就比较好识别了,一般利用栈来输出结果:
当为数值时压栈,当遇到表达式,从栈里面拿出两个数进行比较(运算)

算法流程介绍

转换为逆波兰表达式的基本思路是:
1、需要两个栈来存放“操作数”(如:合同类型)、运算符(如:||)
2、每个操作符都有自己的优先级
3、如果是操作数,则放入操作数栈
4、如果是运算符(不包括括号),则与运算符栈顶元素进行比较,如果优先级大于栈顶的元素,则直接入栈,如果小于,则将栈顶的元素取出,入操作数栈
5、如果运算符是“(”,则直接放入运算符栈顶,如果运算符是“)”,则从运算符依次取出元素放入操作数栈,直到遇到“(”

下面是我画的一个操作流程:

逆波兰算法

代码实现

public class ExpressionEvaluator {

    private static String leftBraces = "(";
    private static String rightBraces = ")";
    private static Map<String, Integer> operatorPriorityMap = new HashMap<>();
    static {
        operatorPriorityMap.put("(",-1);
        operatorPriorityMap.put(")",-1);

        operatorPriorityMap.put("||",1);
        operatorPriorityMap.put("&&",1);
        operatorPriorityMap.put("=",2);
        operatorPriorityMap.put(">",2);
        operatorPriorityMap.put("<",2);

//        operatorPriorityMap.put("+",1);
//        operatorPriorityMap.put("-",1);
//        operatorPriorityMap.put("*",2);
//        operatorPriorityMap.put("/",2);
    }
    /**
     * 中缀表达式转化为后缀表达式
     * @param expression
     * @return
     */
    public Queue<String> parseExpression(String expression) {
        String[] expressionArr = expression.split(" ");
        Stack<String> operatorStack = new Stack();
        Queue<String> operandQueue = new LinkedBlockingQueue<>();
        for(int i=0;i<expressionArr.length;i++) {
            if (operatorPriorityMap.containsKey(expressionArr[i])) {
                //操作符
                if (expressionArr[i].equals(leftBraces)) {
                    //遇到左括号
                    operatorStack.add(expressionArr[i]);
                }else if (expressionArr[i].equals(rightBraces)) {
                    //遇到右括号
                    appendLeftBracesOperator(operatorStack, operandQueue);
                } else {
                    if (operatorStack.isEmpty()) {
                        operatorStack.add(expressionArr[i]);
                        continue;
                    }
                    String topOperate = operatorStack.peek();
                    if (comparePriority(expressionArr[i], topOperate) >= 0) {
                        //优先级大于栈顶的元素
                        operatorStack.add(expressionArr[i]);
                    } else {
                        //优先级小于栈顶的元素
                        appendLowPriority(operatorStack, operandQueue, expressionArr[i]);
                    }
                }
            } else {
                //操作数
                operandQueue.add(expressionArr[i]);
            }
        }
        while (!operatorStack.isEmpty()) {
            operandQueue.add(operatorStack.pop());
        }
        return operandQueue;
    }

    /**
     * 将操作符栈低优先级的元素移到操作数队列中
     * @param operatorStack
     * @param operandStack
     * @param operator
     */
    private void appendLowPriority(Stack<String> operatorStack, Queue<String> operandStack, String operator) {
        while (!operatorStack.isEmpty() && comparePriority(operator, operatorStack.peek()) < 0) {
            String topOperate = operatorStack.pop();
            operandStack.add(topOperate);
        }
        operatorStack.add(operator);
    }

    /**
     * 将左括号之前的操作符元素移动到操作数栈中
     * @param operatorStack
     * @param operandStack
     */
    private void appendLeftBracesOperator(Stack<String> operatorStack, Queue<String> operandStack) {
        String topOperator = operatorStack.pop();
        while (!topOperator.equals(leftBraces)) {
            operandStack.add(topOperator);
            topOperator = operatorStack.pop();
        }
      }

      public static void main(String[] args) {
        //String s = "1 + ( 2 + 3 * 4 ) * 2 / 4 - 5";
        String s = "商务合同1 = 商务合同 || ( 1100000 > 1000000 && 1100000 < 2000000 )";
        ExpressionEvaluator ee = new ExpressionEvaluator();
        Queue<String> queue = ee.parseExpression(s);
        System.out.println(queue.toString());
      }
    }

逻辑运算

通过将表达式转换后,通过栈就可以实现对表达式的计算,具体思路:

1、遍历队列,如果是操作数,则入栈

2、如果是操作符,则从栈顶取出两个操作数进行运算操作

3、将最终结果返回

代码实现

    /**
     * 计算后缀表达式结果
     * @param queue
     * @return
     */
    public boolean calcExpression(Queue<String> queue) {
        if(Objects.isNull(queue)) {
            throw new RuntimeException("表达式为空");
        }

        String s = queue.poll();
        Stack<Object> stack = new Stack();
        while (Objects.nonNull(s)) {
            if (operatorPriorityMap.containsKey(s)) {
                Object o1 = stack.pop();
                Object o2 = stack.pop();
                if (">".equals(s)) {
                    stack.push(Double.parseDouble(o2.toString()) > Double.parseDouble(o1.toString()));
                } else if ("<".equals(s)) {
                    stack.push(Double.parseDouble(o2.toString()) < Double.parseDouble(o1.toString()));
                } else if ("=".equals(s)) {
                    stack.push(o2.equals(o1));
                } else if ("||".equals(s)) {
                    stack.push(Boolean.parseBoolean(o2.toString()) || Boolean.parseBoolean(o1.toString()));
                } else if ("&&".equals(s)){
                    stack.push(Boolean.parseBoolean(o2.toString()) && Boolean.parseBoolean(o1.toString()));
                }
            } else {
                stack.push(s);
            }
            s = queue.poll();
        }

        return Boolean.parseBoolean(stack.pop().toString());
    }

Oauth2介绍

oauth2是什么?

oauth2.0是一个标准的授权协议,它可以为第三方应用,如:web程序、桌面/移动客户端等提供授权,获取用户的数据,它的标准是RFC 6749 文件,该文件解释了oauth的定义:

OAuth 引入了一个授权层,用来分离两种不同的角色:客户端和资源所有者。资源所有者同意以后,资源服务器可以向客户端颁发令牌。客户端通过令牌,去请求数据

其核心可以理解为:服务端向客户端颁发一个令牌,客户端通过令牌来访问服务端的数据。

为什么要使用oauth2.0

举个通俗的例子:古代的皇宫是皇上居住的地方,普通人是不允许进出的,某一天来了一个人对着守卫说:我是皇上七舅老爷的外孙女,快让我进去。

正常情况下守卫肯定不让他进去,谁知道这个人是不是骗子,守卫一般会找人禀明皇上,皇上跑过来一看,果真是他七舅老爷的外孙女,此时守卫才会放他进去

但隔了几天守卫换了,这个人出去了一趟又要进宫,或者他要进皇宫其它的地方,是不是又得走一遍流程,这样搞了几次后,皇上也不耐烦了,每次我总这样跑来跑去,效率也太低了,要是明天八舅老爷的外孙女来了不得累死,好了,我给你们发一张令牌:“皇室亲属进去牌”,并备注:这个人的信息,及可以进出的门。

这样门卫看到这样令牌后,直接根据上面的备注信息确认是否让他进去就可以了

oauth2.0的四种授权方式

  • 授权码(Authorization Code)
  • 隐藏式(implicit)
  • 密码式(password)
  • 客户端凭证(client credentials)

授权码方式

先思考一个问题:比如我们要访问B网站,B网站依赖于A网站给的用户数据,该如何做?
授权码的方式,是先在第三方应用申请一个授权码,然后用该码获取令牌。
比如下面链接:

https://b.com/oauth/authorize?
  response_type=code&amp;
  client_id=app37482284&amp;
  redirect_uri=https%3A%2F%2Fa.com%2Fcallback&amp;
  scope=read&amp;state=xcoiv98y2kd22vusuye3kch

首先用户访问B网站,获取一个授权码,client_id:是让B知道是谁在访问,redirect_uri:B网站接受或拒绝后跳转的网址
scope:标识要求授权的范围,这里是只读
state:随机生成的一个字符串,表示客户端的当前状态,服务端授权后会原封不动的返回这个值,简单的来说:这个参数主要保证请求的授权的客户端和后面使用授权服务的客户端是一个,防止CSRF攻击,可参考:《OAuth2.0忽略state参数引发的CSRF漏洞》(https://blog.csdn.net/gjb724332682/article/details/54428808)

用户点击这个链接后,会跳转到一个授权页面,类型微信的授权页面,如下:

然后用户点击授权,B网站会生成一个授权码,然后将需要的用户信息通过:
授权码 -> 临时令牌 的关系保存下来,然后将授权码返给A网站,跳转之后的地址为:

https://a.com/callback?code=8x4d3N674g2&amp;state=xcoiv98y2kd22vusuye3kch

然后A网站通过授权码在后端调用B的接口就可以获取到令牌了,
注意:这里获取令牌是在后端进行,A调用B服务器接口还需要传入一个密钥。
这样就避免了攻击者拿到授权码后在其它服务器上窃取令牌。

https://b.com/oauth/token?
 client_id=app37482284&amp;
 client_secret=X9gK6uB20sD1myc8cRe&amp;
 grant_type=authorization_code&amp;
 code=AUTHORIZATION_CODE&amp;
 redirect_uri=CALLBACK_URL

然后B网站根据授权码拿到令牌(token),返回给A的回调地址,A收到token后,就可以通过token获取到用户信息。

HTTP/1.1 200 OK
Content-Type: application/json
Cache-Control: no-store
Pragma: no-cache

{
  "access_token":"MTQ0NjJkZmQ5OTM2NDE1ZTZjNGZmZjI3",
  "token_type":"bearer",
  "expires_in":3600,
  "refresh_token":"IwOGYzYTlmM2YxOTQ5MGE3YmNmMDFkNTVk",
  "scope":"create delete"
}

回顾上面的调用流程,主要分为两个步骤:

  • 用户在客户端确认授权,获取授权码
  • 根据授权码获取令牌

为什么要先获取一个授权码,然后再根据授权码获取令牌呢?
因为获取授权码往往是在客户端上发起,如果第一步就获取到了令牌,那么攻击者很容易就窃取到令牌,所以获取令牌往往是放到服务端来做,这样的安全性更高。

隐藏式

隐藏式实质是授权码方式的一种缩略版,即直接是第一步就通过客户端访问获取到令牌,以上也说过这种方式的安全性很低,极可能存在中间人攻击的风险。
第一步,同样是客户端授权

https://b.com/oauth/authorize?
  response_type=token&amp;
  client_id=app37482284&amp;
  redirect_uri=https%3A%2F%2Fa.com%2Fcallback&amp;
  scope=read&amp;state=xcoiv98y2kd22vusuye3kch

第二步,直接回调到A网站地址,并携带令牌

https://a.com/callback#token=ACCESS_TOKEN

因为这种方式安全很低,已经不推荐使用,之所以出现这种方式,主要是因为最原始的应用不支持浏览器访问外部主机链接,所以采取了这种妥协的方式,后面由于Cross-Origin Resource Sharing (CORS)技术的出现,解决了这个问题,目前对于web服务推荐使用授权码方式,具体可参考:
《Why you should stop using the OAuth implicit grant!》(https://medium.com/oauth-2/why-you-should-stop-using-the-oauth-implicit-grant-2436ced1c926)

《Is the OAuth 2.0 Implicit Flow Dead?》(https://developer.okta.com/blog/2019/05/01/is-the-oauth-implicit-flow-dead)

密码式

密码式是单点登录用到比较多的一种场景。即客户端直接通过用户名和密码请求获取令牌。

先说说单点登录,比如我们登录了taobao.com网站,准备买一些东西,可看了半天没看到合适的,准备到tmall.com看看,淘宝和天猫都是阿里系的,既然都是一家的,完全可以复用taobao网站的登录状态。授权给tmall使用。
所以这就可以用到oauth2的密码方式,用户登录了一次,服务端把生成一个令牌,保存用户的信息,taobao和tmall都可以共享。

POST /oauth/token HTTP/1.1
Host: authorization-server.com
Content-type: application/x-www-form-urlencoded

https://taobao/token?grant_type=password

&amp;username=guestuser1
&amp;password=123654ww
&amp;client_id=xxxxxxxxxx

返回令牌信息

{
  "access_token": "ece8ec32-5855-48c5-838b-0d7f20c64d1c",
  "token_type": "bearer",
  "expires_in": 3600,
  "scope": "create"
}

grant_type:授权方式,password表示:密码式
username:用户名
password:密码

在获取令牌的时候,服务端会将令牌->用户信息保存起来,后面客户端直接通过令牌就可以获取到用户信息。

客户端凭证

凭证式实质也是授权码的一种缩略版,可以理解为授权码方式的第二步,获取令牌都在服务端进行

https://b.com/token?
  grant_type=client_credentials&amp;
  client_id=app37482284&amp;
 client_secret=X9gK6uB20sD1myc8cR

这种方式适用于没有前端的应用,主要是用于第三方应用在后端访问传递数据。

MySQL事务隔离级别

对于数据库的隔离级别之前一直没有做详细整理,最近项目运行中发现了一个问题,所以抽时间对这块认真研究了下

业务场景:
服务A在处理流程中,会调用外部服务B,然后写入一条数据,服务B执行完成后,会回调服务C的接口更新服务A写入的数据。
问题:
在服务B回调服务C的时候总是找不到服务A写入的数据,在服务C中添加延时重试,问题依然存在,但此时查看数据库,对应的数据是已经存在。

先说原因吧,是因为MySQL的事务默认隔离级别是:可重复读。
在服务A调用服务B后,还没有写入数据到数据库,服务B就已经回调服务C了,服务C此时肯定是找不到对应的数据的,由于MySQL默认隔离级别是可重复读(即在一个事务中,对于同一份数据读取都是一样的),所以即使服务A已经写入了数据,服务C依然读取不到。

解决方案:
在服务C中的查询,不要放到一个事务里面,单独提取一个方法,后面的更新逻辑放到同一个事务中

什么是事务?

数据库事务是数据库管理系统执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成。
比如:某人在商店购买100商品,其中包括两个操作:
1.该人账户减少100元
2.商店账户增加100元
这两个操作要么同时执行成功,要么同时执行失败。

数据库事务的ACID性质

  • Atomic:原子性,将所有SQL作为原子工作单元执行,要么全部执行,要么全部不执行;
  • Consistent:一致性,事务完成后,所有数据的状态都是一致的,即该人的账户减少了100,商店的账户必须增加100
  • Isolation:隔离性,比如两个人从同一个账户取款,这两个事务对数据的修改必须相互隔离,具体隔离策略后面具体讲解。
  • Duration:持久性,事务完成后,对数据库数据的修改必须被持久化存储。

数据库的隔离级别

在单个事务中,不需要做隔离,所谓数据库隔离级别是针对在并发事务的情况下,解决导致的一系列问题,这些问题包括:脏读、不可重复读、幻读,具体隔离级别如下图:

隔离级别 脏读 不可重复读 幻读
SERIALIZABLE(串行化) 避免 避免 避免
REPEATABLE READ(可重复读) 避免 避免 允许
READ COMMITED(读已提交) 避免 允许 允许
READ UNCOMMITED(读未提交) 允许 允许 允许

SERIALIZABLE(串行化)

当两个事务同时操作数据库中相同数据时,如果第一个事务已经在访问该数据,第二个事务只能停下来等待,必须等到第一个事务结束后才能恢复运行。因此这两个事务实际上是串行化方式运行。

REPEATABLE READ(可重复读)

一个事务在执行过程中可以看到其他事务已经提交的新插入的记录,但是不能看到其他事务对已有记录的更新。

READ COMMITTED(读已提交数据)

一个事务在执行过程中可以看到其他事务已经提交的新插入的记录,而且还能看到其他事务已经提交的对已有记录的更新。

READ UNCOMMITTED(读未提交数据)

一个事务在执行过程中可以看到其他事务没有提交的新插入的记录,而且还能看到其他事务没有提交的对已有记录的更新。

没有事务隔离级别导致的问题

如果没有数据库的隔离级别,数据库的数据是实时变化的,即每个事务都可以读到其它事务修改后的数据,下面结合实例介绍每个场景的问题。

脏读

定义:读到未提交更新的数据

时间点 事务1 事务2
T1 开始事务
T2 开始事务
T3 查询账户余额为1000
T4 取出100后金额为900
T5 查询账户金额为900(脏读)
T6 撤销事务,余额恢复为1000
T7 存入100元后,金额变为1000
T8 提交事务

如上事务1取出金额100后又回滚了,即啥都没做,但事务2存入了100,但最终的金额确还是1000,正确应该是1100。
在T5时间节点出现了脏读,如果数据库配置了隔离级别为SERIALIZABLE、REPEATABLE READ、READ COMMITTED,在事务1没有提交的时候,事务2读取的都是原来的值就不会出现问题。

不可重复读

定义:在同一个数据中,两次读取到的数据不一致,读到了其他数据提交更新的数据

时间点 事务1 事务2
T1 开始事务
T2 开始事务
T3 查询账户余额为1000
T4 查询账户余额为1000
T5 取出100后金额为900
T6 提交事务
T7 查询账户余额为900(与T4读取的不一致)

事务2的两次读取到的数据不一致,第二次读取到了事务1提交的数据

幻读

定义:读取到另一个事务已提交插入或删除的数据。

时间点 事务1 事务2
T1 开始事务
T2 开始事务
T3 统计一年级1班所有的学生人数为40人
T4 一年级1班新增一名学生
T5 提交事务
T6 再次统计一年级1班的所有学生人数为41人

事务2第一次统计人数为40人,第二次统计为41人,两次统计的结果不一致,同样如果T4时间节点转走一名学生,也会出现不一致

不可重复读和幻读看起来比较类似,都是一个事务里面读取到两次不同的结果
本质的区别是:不可重复读是由于数据更新导致数据不一致导致,幻读是由于插入或删除了数据导致的。

分布式事务seata的AT模式介绍

seata是阿里开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,本文主要介绍AT模式的使用。

seata安装

下载seata服务,官方地址:https://github.com/seata/seata/releases
在Linux下,下载完成后,直接解压,通过命令安装即可:

sh ./bin/seata-server.sh

支持的启动参数

参数 全写 作用 备注
-h –host 指定在注册中心注册的 IP 不指定时获取当前的 IP,外部访问部署在云环境和容器中的 server 建议指定
-p –port 指定 server 启动的端口 默认为 8091
-m –storeMode 事务日志存储方式 支持file和db,默认为 file
-n –serverNode 用于指定seata-server节点ID ,如 1,2,3…, 默认为 1
-e –seataEnv 指定 seata-server 运行环境 如 dev, test 等, 服务启动时会使用 registry-dev.conf 这样的配置

如:

sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file

seata的AT模式介绍

AT模式实质是两阶段提交协议的演变,具体如下:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
  • 二阶段:
    提交异步化,非常快速地完成。
    回滚通过一阶段的回滚日志进行反向补偿。

业务背景:
用户调用系统A的store服务,store服务调用系统B的company服务,company服务会新增一条数据,然后把companyId返回系统A,然后系统A通过companyId再新增一条store数据。

一般如果store服务执行失败了,直接抛异常了,所以company服务也不会执行,
但如果store服务执行成功了,已经写了一条数据到数据库,执行company服务时失败了,就会产生数据不一致的问题。

使用seata的AT模式,主要分为下面几个步骤:

  • 配置seata服务及创建事务表
  • 调用方配置(对应上面的store服务)
  • 服务提供方配置(对应上面的company服务)

配置seata服务及创建事务表

配置conf/file.conf文件

<code>## transaction log store, only used in server side
store {
  ## store mode: file、db
  mode = "db" //修改为db模式,标识事务信息用db存储
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.234.1:3306/seata?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false&amp;&amp;serverTimezone=UTC" //修改数据库连接
    user = "seata" //修改数据库账号
    password = "123456" //修改数据库密码
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
  }
}
## server configuration, only used in server side
service {
  #vgroup-&gt;rgroup
  vgroup_mapping.chuanzh_tx_group = "default" //chuanzh_tx_group为自定义的事务组名称,要和客户端配置保持一致
  #only support single node
  default.grouplist = "192.168.234.128:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

</code>

上面配置共修改了3个地方:

  1. 存储模式改为db模式,需要创建3张事务表,如下:
    <code>-- the table to store GlobalSession data
     CREATE TABLE IF NOT EXISTS `global_table`
     (
         `xid`                       VARCHAR(128) NOT NULL,
         `transaction_id`            BIGINT,
         `status`                    TINYINT      NOT NULL,
         `application_id`            VARCHAR(32),
         `transaction_service_group` VARCHAR(32),
         `transaction_name`          VARCHAR(128),
         `timeout`                   INT,
         `begin_time`                BIGINT,
         `application_data`          VARCHAR(2000),
         `gmt_create`                DATETIME,
         `gmt_modified`              DATETIME,
         PRIMARY KEY (`xid`),
         KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
         KEY `idx_transaction_id` (`transaction_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store BranchSession data
     CREATE TABLE IF NOT EXISTS `branch_table`
     (
         `branch_id`         BIGINT       NOT NULL,
         `xid`               VARCHAR(128) NOT NULL,
         `transaction_id`    BIGINT,
         `resource_group_id` VARCHAR(32),
         `resource_id`       VARCHAR(256),
         `branch_type`       VARCHAR(8),
         `status`            TINYINT,
         `client_id`         VARCHAR(64),
         `application_data`  VARCHAR(2000),
         `gmt_create`        DATETIME(6),
         `gmt_modified`      DATETIME(6),
         PRIMARY KEY (`branch_id`),
         KEY `idx_xid` (`xid`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
     -- the table to store lock data
     CREATE TABLE IF NOT EXISTS `lock_table`
     (
         `row_key`        VARCHAR(128) NOT NULL,
         `xid`            VARCHAR(96),
         `transaction_id` BIGINT,
         `branch_id`      BIGINT       NOT NULL,
         `resource_id`    VARCHAR(256),
         `table_name`     VARCHAR(32),
         `pk`             VARCHAR(36),
         `gmt_create`     DATETIME,
         `gmt_modified`   DATETIME,
         PRIMARY KEY (`row_key`),
         KEY `idx_branch_id` (`branch_id`)
     ) ENGINE = InnoDB
       DEFAULT CHARSET = utf8;
    
    </code>
  2. 修改数据库连接,注意如果你安装的是MySQL8,则需要修改MySQL8的驱动:driverClassName = “com.mysql.cj.jdbc.Driver”,不然会出现启动报错的问题,详细请参考:seata启动MySQL报错 #359(https://github.com/seata/seata-samples/issues/359)。
  3. 修改事务的组名,你也可以不修改,我这里使用的是:chuanzh_tx_group
  4. 创建业务事务表,记录业务需要回滚的数据,在分布式事务中,每个参与的业务数据库都需要添加对应的表
    <code>CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    </code>

配置conf/registry.conf文件

<code>registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"  修改注册方式,微服务调用使用的是Eureka

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://192.168.234.1:8081/eureka"  //修改Eureka地址
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}
</code>

以上修改了使用Eureka方式注册,并配置了Eureka地址,启动MySQL、Eureka服务后,就可以启动seata服务了。

调用方配置(store-server)

maven配置,使用seata-spring-boot-starter,自动配置的方式,不需要再添加file.conf和register.conf文件

<code>    &lt;!--druid--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.alibaba&lt;/groupId&gt;
        &lt;artifactId&gt;druid-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;${druid-spring-boot-starter.version}&lt;/version&gt;
    &lt;/dependency&gt;

    &lt;!--seata--&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;io.seata&lt;/groupId&gt;
        &lt;artifactId&gt;seata-spring-boot-starter&lt;/artifactId&gt;
        &lt;version&gt;1.2.0&lt;/version&gt;
    &lt;/dependency&gt;
</code>

application.properties配置:

<code>server.port=9090
spring.application.name=store-server

mybatis.type-aliases-package=com.chuanzh.model
mybatis.mapper-locations=classpath:mapper/*.xml

spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

#注意这里的事务组配置要和服务端一致
seata.tx-service-group=chuanzh_tx_group
seata.service.vgroup-mapping.chuanzh_tx_group=default
seata.service.grouplist.default=192.168.234.128:8091

logging.level.io.seata=DEBUG
## eureka
eureka.client.serviceUrl.defaultZone= http://localhost:8081/eureka/

</code>

数据源配置,因为seata是对数据库的datasource进行了接管和代理,所以在每个参与分布式事务的数据源都要进行如下配置:

<code>@Configuration
public class DataSourceConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy)throws Exception{
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:/mapper/*.xml"));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}
</code>

注意配置了数据源后,启动会出现循环依赖的问题,如下,

还需要在启动类排除dataSource自动配置,其它的解决方法,可以参考:集成fescar数据源循环依赖错误解决方案(https://blog.csdn.net/kangsa998/article/details/90042406)

<code>@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
</code>

配置请求拦截器,生成一个请求事务ID,用于在微服务中传递

<code>@Configuration
public class SeataRequestInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            //构建请求头
            requestTemplate.header("TX_XID", xid);
        }
    }
}
</code>

服务提供方配置(company-server)

maven、application.properties、数据源配置同调用方配置,区别主要是拦截器的配置,如下:

<code>@Slf4j
@Component
public class SeataHandlerInterceptor implements HandlerInterceptor {

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader("TX_XID");
        //获取全局事务编号
        if(log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }
        if(xid == null &amp;&amp; rpcXid != null) {
            //设置全局事务编号
            RootContext.bind(rpcXid);
            if(log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }
        return true;
    }
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
        String rpcXid = request.getHeader("TX_XID");
        if(!StringUtils.isEmpty(rpcXid)) {
            String unbindXid = RootContext.unbind();
            if(log.isDebugEnabled()) {
                log.debug("unbind {} from RootContext", unbindXid);
            }

            if(!rpcXid.equalsIgnoreCase(unbindXid)) {
                log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                if(unbindXid != null) {
                    RootContext.bind(unbindXid);
                    log.warn("bind {} back to RootContext", unbindXid);
                }
            }

        }
    }

}
</code>
<code>@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {

    @Autowired
    private SeataHandlerInterceptor seataHandlerInterceptor;

    public void addInterceptors(InterceptorRegistry registry) {
        //注册HandlerInterceptor,拦截所有请求
        registry.addInterceptor(seataHandlerInterceptor).addPathPatterns(new String[]{"/**"});
    }

}
</code>

添加全局事务注解

在服务调用方的方法上添加@GlobalTransactional注解,下面模拟了一种场景,如果companyId为偶数,则会抛异常。

<code>    @GlobalTransactional(rollbackFor = Exception.class)
    public void create(StoreEntity storeEntity) throws Exception {
        CompanyEntity companyEntity = new CompanyEntity();
        companyEntity.setName(storeEntity.getName());
        companyEntity = companyFeign.createCompany(companyEntity);

        /**
         * 模拟异常
         */
        if (companyEntity.getId() % 2 == 0) {
            throw new Exception();
        }

        /** 写入store数据 */
        storeEntity.setCompanyId(companyEntity.getId());
        storeMapper.insert(storeEntity);
    }
</code>

经过测试,companyFeign.createCompany服务调用后会先向数据库写一条数据,当create方法执行抛异常,就会事务回滚,删除掉原先的company数据

ElasticSearch高并发场景写入优化

ElasticSearch高并发场景写入优化

ElasticSearch号称分布式全文搜索引擎,但使用不当依然会很慢,特别是在高并发写入时,会存在写入超时的问题。

在公司内部,基本所有的日志都会放入到ElasticSearch,比如接口访问时间日志、动态/计划数据审核日志、动态/计划抓取报文日志、动态报文更新日志等,每天的写入数据量巨大,最初我们基于ElasticSearch封装了一层,使用内部RestHighLevelClient获取数据,部分代码如下:

@Slf4j
public class ElasticsearchService {

    private RestHighLevelClient client;

    public ElasticsearchService(RestHighLevelClient client) {
        if (client == null)
            throw new IllegalArgumentException("client");
        this.client = client;
    }

    /**
     * 根据id获取文档
     **/
    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

            return getResponse;

        } catch (Exception e) {
            log.error("获取文档失败", e);
        }
        return null;
    }

    /**
     * 插入文档, id可以为空,id如果为空,就自动生成id
     **/
    public IndexResponse insertDocument(String index, String type, String id, String json) {
        if (StringUtils.isAnyBlank(index, type, json))
            return null;

        try {
            IndexRequest indexRequest = generateInsertRequest(index, type, id, json);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            return indexResponse;
        } catch (Exception e) {
            log.error("创建文档失败", e);
        }
        return null;
    }
}

然而在放到线上生成环境下,经常会出现写入失败数据丢失的情况,发现是数据量写入太大,调用接口超时,直接返回错误了。RestHighLevelClient实质就是通过httpclient请求接口发送数据,是基于http协议。

查看Elasticsearch文档,发现Elasticsearch同样支持使用Transport传输,其内部使用netty长连接通信,并且可以设置处理连接的数量。

新建一个TransportClientFactory用来创建TransportClient,这里连接需要使用密钥,具体填写自己Elasticsearch的账号密码。

public class TransportClientFactory {
    // 默认处理核心数,cpu核心数
    private static int DEFAULT_PROCESSORS = 8;
    // 配置核心数
    private static int processors = 8;

    public static TransportClient create(String addresses, int port, String keyStore, String keyPassword, String trustStore, String trustPassword) throws Exception {

        if (StringUtils.isBlank(addresses)) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (StringUtils.isAnyBlank(keyStore, keyPassword, trustStore, trustPassword)) {
            throw new IllegalArgumentException("缺少证书或密码");
        }

        String[] addrs = addresses.split(",");
        if (addrs.length == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        List<String> validAddrs = new ArrayList<>();
        for (String addr : addrs) {
            if (StringUtils.isNotBlank(addr)) {
                validAddrs.add(addr.trim());
            }
        }

        if (validAddrs.size() == 0) {
            throw new IllegalArgumentException("请输入ES的地址,多个地址之间用逗号分隔");
        }

        if (processors < DEFAULT_PROCESSORS) {
            processors = DEFAULT_PROCESSORS;
        }
        if (processors > 32) {
            processors = 32;
        }
        int threadPoolCore = processors * 2;
        int threadPoolMax = processors * 2;
        Settings settings = Settings
                .builder()
                .put("path.home", ".")
                // .put("cluster.name", clusterName)
                .put("client.transport.ignore_cluster_name", true).put("searchguard.ssl.transport.enabled", true)
                .put("searchguard.ssl.transport.enforce_hostname_verification", false).put("searchguard.ssl.transport.keystore_filepath", keyStore)
                .put("searchguard.ssl.transport.keystore_password", keyPassword).put("searchguard.ssl.transport.truststore_filepath", trustStore)
                .put("searchguard.ssl.transport.truststore_password", trustPassword).put("processors", processors).put("thread_pool.flush.core", threadPoolCore)
                .put("thread_pool.flush.max", threadPoolMax).build();

        TransportClient client = new PreBuiltTransportClient(settings, SearchGuardPlugin.class);

        for (String validAddr : validAddrs) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(validAddr), port));
        }

        return client;
    }

    public static int getProcessors() {
        return processors;
    }

    public static void setProcessors(int processors) {
        TransportClientFactory.processors = processors;
    }

}

修改ElasticsearchService

@Slf4j
public class ElasticsearchService {

    /**
     * 默认10秒超时
     **/
    private long bulkTimeoutSecond = 10;

    private TransportClient client;

    public ElasticsearchService(TransportClient client) {
        this.client = client;
    }

    public ElasticsearchService(TransportClient client, int asyncTimeoutSecond) {
        this.client = client;
        this.bulkTimeoutSecond = asyncTimeoutSecond;
    }

    public GetResponse getDocument(String index, String type, String id) {
        if (StringUtils.isAnyBlank(index, type, id))
            return null;

        return this.client.prepareGet(index, type, id).get();
    }

    public IndexResponse createDocument(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        IndexRequestBuilder builder = createDocumentRequestBuilder(index, type, id, msg);
        if (builder == null)
            return null;

        return builder.get();
    }

    private IndexRequestBuilder createDocumentRequestBuilder(String index, String type, String id, String msg) {
        if (StringUtils.isAnyBlank(index, type, msg))
            return null;

        if (StringUtils.isBlank(id)) {
            return this.client.prepareIndex(index, type).setSource(msg, XContentType.JSON);
        } else {
            return this.client.prepareIndex(index, type, id).setSource(msg, XContentType.JSON);
        }
    }

}

另外为了提升写入效率,可以批量异步一次性写入,使用bulk方法,如下:

  /**
     * 批量操作,listener可空,支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    public void bulkDocumentOperationAsync(List<DocWriteRequest<?>> requests, ActionListener<BulkResponse> listener) {
        if (requests == null || requests.size() == 0)
            return;

        BulkRequest request = prepareBulkRequests(requests);
        if (request == null)
            return;

        this.client.bulk(request, listener);
    }

    /**
     * 支持 IndexRequest, UpdateRequest, DeleteRequest
     **/
    private BulkRequest prepareBulkRequests(List<DocWriteRequest<?>> requests) {
        BulkRequestBuilder builder = bulkRequestBuilder();

        for (DocWriteRequest<?> request : requests) {
            if (request instanceof IndexRequest) {
                builder.add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                builder.add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                builder.add((DeleteRequest) request);
            }
        }

        if (builder.numberOfActions() == 0) {
            return null;
        }
        return builder.request();
    }

在dao层,我们可以一次性写入多条数据

    /**
     * 异步写入动态日志
     * 
     * @param flightRecords
     */
    public void insertFlightRecodeListAsync(List<FlightRecord> flightRecords) {
        try {
            if (CollectionUtils.isEmpty(flightRecords)) {
                return;
            }
            ElasticsearchService elasticsearchService = elasticsearchProvider.getElasticsearchService();
            List<DocWriteRequest<?>> requests = new ArrayList(flightRecords.size());
            for (FlightRecord flightRecord : flightRecords) {
                // index以航班日期为准
                String index = elasticsearchProvider.FLIGHT_RECORD_INDEX_PREFIX + flightRecord.getLocalDate().replace("-", ".");
                requests.add(elasticsearchService.createDocumentRequest(index, elasticsearchProvider.FLIGHT_RECORD_TYPE, null, JSON.toJSONString(flightRecord)));

            }
            if (CollectionUtils.isEmpty(requests)) {
                return;
            }
            elasticsearchService.bulkDocumentOperationAsync(requests, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("flightRecord写入ES失败", e);
                }
            });
        } catch (Exception e) {
            log.error("insertFlightRecodeListAsync Exception", e);
        }
    }

其它写入性能优化:

  1. 去掉不必要的字段分词和索引,ElasticSearch会默认对所有字段进行分词,一般查询我们都是根据航班号、航班日期、起飞机场、到达机场查询;所以没必要的字段,我们可以使用keyword类型,不进行分词。
    PUT my_index
    {
      "mappings": {
        "my_type": {
          "properties": {
            "tail_no": { 
              "type": "keyword",
              "index": false
            }
          }
        }
      }
    }
  2. 对于一些普通的日志,比如动态/计划抓取网页日志,数据丢失也无所谓,可以禁用掉refresh和replia,即index.refreshinterval设置为-1,将index.numberof_replicas设置为0即可。
    PUT my_index
    {
      "settings": {
        "index": {
            "refresh_interval" : "-1",
            "number_of_replicas" : 0
        }
      }
    }
  3. 使用ElasticSearch自增ID,如果我们要手动给ElasticSearch document设置一个id,那么ElasticSearch需要每次都去确认一下那个id是否存在,这个过程是比较耗费时间的。如果我们使用自动生成的id,那么ElasticSearch就可以跳过这个步骤,写入性能会更好。
  4. 使用多线程写入ElasticSearch,单线程发送bulk请求是无法最大化ElasticSearch集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。为了更好的利用集群的资源,这样多线程并发写入,可以减少每次底层磁盘fsync的次数和开销。一样,可以对单个ElasticSearch节点的单个shard做压测,比如说,先是2个线程,然后是4个线程,然后是8个线程,16个,每次线程数量倍增。一旦发现es返回了TOOMANYREQUESTS的错误,JavaClient也就是EsRejectedExecutionException,此时那么就说明ElasticSearch是说已经到了一个并发写入的最大瓶颈了,此时我们就知道最多只能支撑这么高的并发写入了。
  5. 适当增加index buffer的大小,如果我们要进行非常重的高并发写入操作,那么最好将index buffer调大一些,indices.memory.indexbuffersize,这个可以调节大一些,设置的这个index buffer大小,是所有的shard公用的,但是如果除以shard数量以后,算出来平均每个shard可以使用的内存大小,一般建议,但是对于每个shard来说,最多给512mb,因为再大性能就没什么提升了。ElasticSearch会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个buffer。默认这个参数的值是10%,也就是jvm heap的10%,如果我们给jvm heap分配10gb内存,那么这个index buffer就有1gb,对于两个shard共享来说,是足够的了。

Kafka分区分配策略

Kafka分区分配策略

我们知道每个Topic会分配为很多partitions,Producers会将数据分配到每个partitions中,然后消费者Consumers从partitions中获取数据消费,那么Producers是如何将数据分到partitions中?Consumers又怎么知道从哪个partitions中消费数据?

生产者往Topic写数据

我们从product.send方法入手,看看里面的具体实现,可以看到在调用send方法时,其内部是调用了doSend方法,在doSend方法中有一个获取partitions的方法

int partition = partition(record, serializedKey, serializedValue, cluster);

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

从上面代码中,首先先选择配置的分区,如果没有配置则使用默认的分区,即使用了DefaultPartitioner中的partition方法

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // 没有可用的分区,则给一个不可用分区
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // 根据key的hash值和分区数取模
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

上面代码中先会根据topic获取所有的分区
1,如果key为null,则通过先产生随机数,之后在该数上自增的方式产生一个数nextValue,如果存在可用分区,将nextValue转为正数之后对可用分区进行取模操作,如果不存在可用分区,则将nextValue对总分区数进行取模操作

2,如果key不为空,就先获取key的hash值,然后和分区数进行取模操作

消费者从Topic读数据

kafka默认对消费分区指定了两种策略,分别为Range策略(org.apache.kafka.clients.consumer.RangeAssignor)和RoundRobin策略(org.apache.kafka.clients.consumer.RoundRobinAssignor),它们都实现了PartitionAssignor接口

Range策略

比如有10个分区,分别为P1、P2、P3、P4、P5、P6、P7、P8、P9、P10,三个消费者C1、C2、C3,消费如下图:

C4F4C4E974548380CE4D5274D30F20B9

我们来看看源代码:

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    // 得到topic和订阅的消费者集合信息,例如{t1:[c1,c2,c3], t2:[c1,c2,c3,c4]}                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    // 将consumersPerTopic信息转换为assignment,memberId就是消费者client.id+uuid(kafka在client.id上追加的)
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    // 遍历每个Topic,获取所有的订阅消费者
    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<String> consumersForTopic = topicEntry.getValue();

        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        // 如果Topic没有分区,则调过
        if (numPartitionsForTopic == null)
            continue;

         // 将Topic的订阅者根据字典排序
        Collections.sort(consumersForTopic);
         // 总分区数/订阅者的数量 得到每个订阅者应该分配分区数
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        // 无法整除的剩余分区数量
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        //遍历所有的消费者
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
              //分配到的分区的开始位置
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            // 分配到的分区数量(整除分配到的分区数量,加上1个无法整除分配到的分区--如果有资格分配到这个分区的话。判断是否有资格分配到这个分区:如果整除后余数为m,那么排序后的消费者集合中前m个消费者都能分配到一个额外的分区)
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            //给消费者分配分区
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

上面的代码添加了注释很清楚的展现了range的实现,对应上面的例子,如果有4个消费者C1、C2、C3、C4,那么根据上面的算法:

C1 -> [P1,P2,P3] ,C2 -> [P4,P5,P6] ,C3 -> [P7,P8] C4 -> [P9,P10] 。取余多出来的两个分区,由最前n个消费者来消费

RoundRobin策略

将主题的所有分区依次分配给消费者,比如有两个Topic:T1[P1,P2,P3,P4],T2[P5,P6,P7,P8,P9,P10],若C1、C2订阅了T1,C2、C3订阅了T2,那么C1将消费T1[P1,P3],C2将消费T1[P2,P4,P6,P8,P10],C3将消费T2[P5,P7,P9],如下图:

A8E30AE2B2B3245CCEB5102A9F1B8470

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());
    // 将消费集合先按字典排序,构建成一个环形迭代器
    CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
   // 按Topic的名称排序,得到Topic下的所有分区
    for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
        final String topic = partition.topic();

        while (!subscriptions.get(assigner.peek()).topics().contains(topic))
            assigner.next();
        // 给消费者分配分区,并轮询到下一个消费者
        assignment.get(assigner.next()).add(partition);
    }
    return assignment;
}

/**
 * 根据消费者得到订阅的Topic下的所有分区
 * Topic按名称字典排序
 */
public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    SortedSet<String> topics = new TreeSet<>();
    for (Subscription subscription : subscriptions.values())
        topics.addAll(subscription.topics());

    List<TopicPartition> allPartitions = new ArrayList<>();
    for (String topic : topics) {
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic != null)
            allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
    }
    return allPartitions;
}

 

使用Sharing-JDBC实现分表

使用Sharing-JDBC实现分表

Sharing-JDBC介绍

在创建数据库时,我们最先考虑的是按模块对数据库进行划为,但即使这样,单表数据量还是出现大数量的情况,Sharing-JDBC可以对表进行水平切分,将数据均分到不同表中

通过日期水平切分

目前我们航班动态数据全球每天航班20多万,考虑到我们业务场景,用户都是通过航班号+日期来查询一个航班,所以我们采取使用日期来水平分表

添加依赖

<dependency>
	<groupId>com.dangdang</groupId>
	<artifactId>sharding-jdbc-core</artifactId>
	<version>1.5.4.1</version>
</dependency>
<dependency>
	<groupId>com.dangdang</groupId>
	<artifactId>sharding-jdbc-config-spring</artifactId>
	<version>1.5.4.1</version>
</dependency>
<!--mysql flight数据源 -->
<bean id="mysqlDataSource" class="com.zaxxer.hikari.HikariDataSource" destroy-method="close">
...
</bean>

<!--shared jdbc -->

<rdb:strategy id="tableShardingStrategy" sharding-columns="local_date" algorithm-class="com.huoli.songshan.sharding.FlyDateTableShardingAlgorithm" />
<rdb:data-source id="shardingDataSource">
	<rdb:sharding-rule data-sources="mysqlDataSource" default-data-source="mysqlDataSource">
		<rdb:table-rules>
			<rdb:table-rule logic-table="flight_info" actual-tables="flight_info_${2012..2020}${['01','02','03','04','05','06','07','08','09','10','11','12']}${0..3}${0..9}"
				table-strategy="tableShardingStrategy" />
		</rdb:table-rules>
		<rdb:default-database-strategy sharding-columns="none" algorithm-class="com.dangdang.ddframe.rdb.sharding.api.strategy.database.NoneDatabaseShardingAlgorithm" />
	</rdb:sharding-rule>
	<rdb:props>
		<prop key="sql.show">false</prop>
	</rdb:props>
</rdb:data-source>

先配置一个dataSource数据源,这里我们使用的是hikari,再通过shardingDataSource对dataSource进行包装,按照表中localdate字段对表进行拆分,即表名依次为flightinfo20120101、flightinfo20120102,到flightinfo20201231,下面我们实现根据日期localdate映射到对应表上

public final class FlyDateTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Date> {
    private DateTimeFormatter dt = DateTimeFormat.forPattern("yyyyMMdd");

    @Override
    public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<Date> shardingValue) {
        DateTime datetime = new DateTime(shardingValue.getValue());
        String flydate = datetime.toString(dt);
        for (String each : availableTargetNames) {
            if (each.endsWith(flydate)) {
                return each;
            }
        }
        throw new IllegalArgumentException();
    }

    @Override
    public Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<Date> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        for (Date value : shardingValue.getValues()) {
            DateTime datetime = new DateTime(value);
            String flydate = datetime.toString(dt);
            for (String tableName : availableTargetNames) {
                if (tableName.endsWith(flydate)) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }

    @Override
    public Collection<String> doBetweenSharding(Collection<String> availableTargetNames,
            ShardingValue<Date> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        Range<Date> range = (Range<Date>) shardingValue.getValueRange();
        for (Date value = range.lowerEndpoint(); value.before(range.upperEndpoint()) || value.equals(range.upperEndpoint()); value = addDays(value, 1)) {
            DateTime datetime = new DateTime(value);
            String flydate = datetime.toString(dt);
            for (String each : availableTargetNames) {
                if (each.endsWith(flydate)) {
                    result.add(each);
                }
            }
        }
        return result;
    }

    private Date addDays(Date date, int days) {
        return new Date(new DateTime(new DateTime(date).plusDays(days)).toDate().getTime());
    }
}

这里实现了SQL的equal、in、between方法,即通过使用select * from flight_info where local_date=’2019-01-01′,会自动映射flight_info_20190101表中查询。

通过用户ID哈希取模进行拆分

在用户订阅航班动态数据后,我们需要保存用户的订阅数据,以便后期推送动态消息给用户,用户可以在登陆的情况下订阅航班,也可以在未登陆的情况下订阅航班,用户登陆后可以拿到用户的phoneId(设备ID)和userId(用户ID),用户未登陆只能获取到用户的phoneId(设备ID),所以我们区分两种情况,使用两种表来存放用户信息trip_subscribe_nologin和trip_subscribe_login,用户未登陆根据phoneId来分表,用户登陆的情况根据userId来分表

和之前一样,我们需要先配置一个分表策略

<!-- trip_subscribe_nonlogin分表策略 -->
<rdb:strategy id="subscribeNonLoginTableShardingStrategy" sharding-columns="uid" algorithm-class="com.huoli.trip.dao.sharding.SubscribeNonLoginTableShardingAlgorithm" />
<!-- trip_subscribe_login分表策略 -->
<rdb:strategy id="subscribeLoginTableShardingStrategy" sharding-columns="user_id" algorithm-class="com.huoli.trip.dao.sharding.SubscribeLoginTableShardingAlgorithm" />

<!-- 分表配置配置 -->
<rdb:data-source id="shardingDataSource">
	<rdb:sharding-rule data-sources="mysqlDataSource">
		<rdb:table-rules>
			<rdb:table-rule logic-table="trip_subscribe_nonlogin" table-strategy="subscribeNonLoginTableShardingStrategy" actual-tables="trip_subscribe_nonlogin_${0..63}">
				<rdb:generate-key-column column-name="id" column-key-generator-class="com.huoli.trip.dao.sharding.TripKeyGenerator" />
			</rdb:table-rule>
			<rdb:table-rule logic-table="trip_subscribe_login" table-strategy="subscribeLoginTableShardingStrategy" actual-tables="trip_subscribe_login_${0..63}">
				<rdb:generate-key-column column-name="id" column-key-generator-class="com.huoli.trip.dao.sharding.TripKeyGenerator" />
			</rdb:table-rule>
		</rdb:table-rules>
		<rdb:default-database-strategy sharding-columns="none" algorithm-class="com.dangdang.ddframe.rdb.sharding.api.strategy.database.NoneDatabaseShardingAlgorithm" />
	</rdb:sharding-rule>
	<rdb:props>
		<prop key="sql.show">true</prop>
	</rdb:props>
</rdb:data-source>

这里是根据phoneId、userId分别分成了64张表,即trip_subscribe_nologin有64张表(trip_subscribe_nologin_1,trip_subscribe_nologin_2 …),trip_subscribe_login有64张表(trip_subscribe_login_1,trip_subscribe_login_2 …),具体代码实现

public interface Shard<T> {

    /** 根据参数计算分片标识 */
    public T calculateShard(Object... args);
}

public class SubscribeNonLoginTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<String>, Shard<String> {

    /** 分片数量 */
    private final int shardNum = 64;

    public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<String> shardingValue) {
        String shard = calculateShard(shardingValue.getValue());
        for (String tableName : availableTargetNames) {
            if (tableName.endsWith("_" + shard)) {
                return tableName;
            }
        }

        throw new IllegalArgumentException("未找到该表:trip_subscribe_nonlogin_" + shard);
    }

    public Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<String> shardingValue) {
        Collection<String> result = new LinkedHashSet<String>(availableTargetNames.size());
        for (String value : shardingValue.getValues()) {
            String shard = calculateShard(value);
            for (String tableName : availableTargetNames) {
                if (tableName.endsWith("_" + shard)) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }

    public Collection<String> doBetweenSharding(Collection<String> availableTargetNames, ShardingValue<String> shardingValue) {
        // 不会出现between两个uid之间的查询需求,所以无需实现该方法
        return new LinkedHashSet<String>(availableTargetNames.size());
    }

    public String calculateShard(Object... args) {
        String phoneId = (String) args[0];
        return "" + (phoneId.hashCode() & 0x7fffffff) % shardNum;
    }
}

public class SubscribeLoginTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<String>, Shard<String> {

    /** 分片数量 */
    private final int shardNum = 64;

    /**
     * doEqualSharding/doInSharding同SubscribeNonLoginTableShardingAlgorithm
     */

    public String calculateShard(Object... args) {
        String userId = (String) args[0];
        return "" + (userId.hashCode() & 0x7fffffff) % shardNum;
    }
}

这里主要介绍下calculateShard方法,首先是先拿到了phoneId或userId,获取hashCode,然后和0x7fffffff进行&运算,0x7fffffff表示long的最大值,这一步是为了保证得到的index的第一位为0,也就是为了得到一个正数。因为有符号数第一位0代表正数,1代表负数,然后和分片数shardNum,使表数据分布在shardNum内

另外这里还用到了id的生成策略,即生成一个全局的自增ID,sharing-jdbc自带了DefaultKeyGenerator生成器可以实现

public class TripKeyGenerator implements KeyGenerator {
    private static Logger logger = LoggerFactory.getLogger(TripKeyGenerator.class);

    private DefaultKeyGenerator defaultKeyGenerator;

    public TripKeyGenerator() {
        defaultKeyGenerator = new DefaultKeyGenerator();
    }

    static {
        /** 从配置中获取workId */
        DefaultKeyGenerator.setWorkerId(workerId);
    }

    @Override
    public synchronized Number generateKey() {
        return defaultKeyGenerator.generateKey();
    }

}

通过twitter的snowlflake也可以生成唯一ID,具体可以参考:这里

 

Elasticsearch排序(三)

Elasticsearch排序(三)

Elasticsearch中排序是通过设置score字段来进行排序,score是一个浮点类型,所以我们对靠前的数据设置一个较大的值,然后根据倒序排列。

同样对于航班号查询,有时候需要根据航班号,日期查询出所有的航班号,用户会输入185,2018-12-01,这时需要返回所有相关航司的航班号。
比如匹配的航班有:CA1858,B6185,QF185,ZH1858,对于精确匹配,B6185、QF185要排在CA1858、ZH1858之前,对于非精确排序,CA航司要排在ZH航司前面

创建一个索引

之前我们对航班号创建了一个索引,用户可以输入CA1,出现CA1*的结果,现在用户输入185,是纯数字型的,所以我们需要单独一个字段(flightNum)保存这个航班数字,然后对它创建索引

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAndNumAnalyzer": {
          "tokenizer": "flightNoAndNumTokenizer"
        }
      },
      "tokenizer": {
        "flightNoAndNumTokenizer": {
          "type": "edge_ngram",
          "min_gram": 3,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              },
              "flightNum": {
                 "type": "text",
                 "analyzer" : "flightNoAndNumAnalyzer"                          
              }
          }
      }
  }

}

如果是航班数字查询,我们只需要根据航司来设定优先级,同时设定航班数字全匹配的优先级最高,然后查询的时候再通过constant_score统一来设置boost参数

GET flight/dynamic/_search
{
  "query": {
    "bool": {
      "must": [
        {"prefix": {
          "flightNum": {
            "value": "185"
          }
        }}
      ], 
      "should": [
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "CA"
              }
            },
            "boost": 4
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "ZH"
              }
            },
            "boost": 3
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "B6"
              }
            },
            "boost": 2
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "airline": "QF"
              }
            },
            "boost": 1
          }
        },
        {
          "constant_score": {
            "filter": {
              "term": {
                "flightNum": "185"
              }
            },
            "boost": 10
          }
        }
      ]
      }
    }
}

可以看到CA航司比B6高,但是当查询flightNum为185的时候,评分设置为最高,所以B6186会优先显示,其次是QF185,最后是CA1858和ZH1858 具体代码实现:

//航班数字优先级设置
BoolQueryBuilder qb = QueryBuilders.boolQuery()
                    .must(prefixQuery("flightNum", keyword))
                    .should(constantScoreQuery(matchQuery("flightNum", keyword)).boost(10f))

//航司优先级设置
airNumer.put("CA", 4f);
airNumer.put("ZH", 3f);
airNumer.put("B6", 4f);
airNumer.put("QF", 1f);
for (String airline : airNumer.keySet()) {
    queryBuilder.should(constantScoreQuery(matchQuery("airline", airline)).boost(airNumer.get(airline)));
}

 

 

 

Elasticsearch创建索引及数据(二)

Elasticsearch创建索引及数据(二)

第一篇介绍了Elasticsearch安装及基本用法,下面我们自己来创建一个索引,并写入一些数据

创建索引

基本语法

PUT /my_index
{
    "settings": { ... any settings ... },
    "mappings": {
        "type_one": { ... any mappings ... },
        "type_two": { ... any mappings ... },
        ...
    }

在创建索引的时候,我们需要设置索引被存放的分片数量、分析器、类型设置,分片数量、分析器通过settings来设置,类型映射通过mappings来设置,Elasticsearch默认创建索引是5个分片,1个副本,可通过settings来修改它,这里我们就遵循默认值不动,如下:

PUT flight
{
    "settings" : {
        "index" : {
            "number_of_shards" : 5, 
            "number_of_replicas" : 1 
        }
    }
}
{
  "acknowledged": true,
  "shards_acknowledged": true,
  "index": "flight"
}

创建一个flight索引,大括号后面为默认配置,如果不想修改它,都可以去掉,然后在新建一个type为dynamic,并写入数据

PUT /flight/dynamic/1
{
    "flightNo":"CA1858",
    "flightDate":"2018-12-01",
    "depCode":"SHA",
    "arrCode":"PEK",
    "state": "到达",
    "subState": "",
    "depPlanTime":"2018-12-01 07:45",
    "arrPlanTime":"2018-12-01 10:10",
    "depReadyTime":"2018-12-01 07:45",
    "arrReadyTime":"2018-12-01 09:45",
    "depTime":"2018-12-01 07:54",
    "arrTime":"2018-12-01 07:46",
    "distance":1076, 
    "tailNo":"B2487", 
    "depTerm":"T2", 
    "arrTerm":"T3", 
    "gate":"48", 
    "luggage":"32"
}

创建一条数据后,ES会默认会对所有字段添加索引,当然也可以不指定ID,ES会默认生成一个ID,自动生成的ID有22个字符长,类似:wM0OSFhDQXGZAWDf0-drSA

{
  "_index": "flight",
  "_type": "dynamic",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

下面我们来查询数据,ES默认情况下是禁用了source,即我们查询时,不会返回source里面的内容,只会返回数据ID,我们可以指定需要返回的_srouce字段

GET flight/dynamic/_search
{
    "query":   { "match_all": {}},
    "_source": [ "flightNo", "flightDate"]
}
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "flight",
        "_type": "dynamic",
        "_id": "1",
        "_score": 1,
        "_source": {
          "flightNo": "CA1858",
          "flightDate": "2018-12-01"
        }
      }
    ]
  }
}

配置分析器

分析器介绍

分析器主要是将一块文本分成适合于倒排索引的独立 词条,通过这些词条我们可以搜索到指定的文档

分析器主要有以下几个功能:
1,字符过滤:通过整理字符串,如去掉HTML,将&转换为and
2,分词器:将字符串分成单个词条,比如通过空格或者标点符号来拆分
3,Token过滤:比如将Quick这种词条统一转换为小写,删除a,and,the这些无用词,增加近义词条(如:jump和leap这种同义词)

比如我们有一个航班号CA1858,我们需要通过输入CA18就可以返回对应的结果,那么我们就需要对其进行分词,如果直接使用下面的查询是获取不到数据的

GET flight/dynamic/_search
{
  "query": {
    "match": {
      "flightNo": "CA18"
    }
  }
}

我们可以查看ES对flightNo的分词情况

POST flight/_analyze
{
  "field": "flightNo",
  "text": "CA1858"
}
{
  "tokens": [
    {
      "token": "ca1858",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    }
  ]
}

可以看到ES是对航班号转换为小写后直接进行了倒排索引,没有进行分词,直接查询CA18肯定搜索不到,下面我们自定义一个分析器

PUT flight
{
  "settings": {
    "analysis": {
      "analyzer": {
        "flightNoAnalyzer": {
          "tokenizer": "flightNoTokenizer"
        }
      },
      "tokenizer": {
        "flightNoTokenizer": {
          "type": "edge_ngram",
          "min_gram": 4,
          "max_gram": 8,
          "token_chars": ["letter","digit"]
        }
      }
    }
  },
  "mappings": {       
      "dynamic": {           
          "properties": {   
              "flightNo": {
                 "type": "text",
                 "analyzer" : "flightNoAnalyzer"                          
              }
          }
      }
  }

}

edgengram为ES自带的分词器,ES自带了8种分析器,具体可以查看官方文档,我们自定义了一个analyzer,使用edgengram来进行分词,字符长度从4到8,然后将自定义的分析器映射到flightNo字段上,之后我们使用查询CA18就可以获取到结果了