分布式一致性协议

分布式一致性协议

在分布式系统中为了解决数据的一致性,主要有二阶段提交协议、三阶段提交协议、Poxos算法、TTC协议、Rraft协议、ZAB协议(zookeeper的协议),下面就分别介绍这几种协议

二阶段提交协议

二阶段提交协议主要思想是:有一个协调者,多个参与者,协调者负责发送命令给参与者,确保数据能同时更新到所有节点上,主要步骤如下:

Two-phase_commit

1)协调者将事务的请求发送给所有参与者,询问是否可以提交事务,参与者锁住自己的资源,并写undo/redo日志,如果参与者都准备成功,则向协调者回应“可以提交”

2)协调者所有参与者都会有“可以提交”,此时向参与者发送“正式提交”命令,所有参与者开始提交自己的事务

如上述1)2)有任何不成功,所有参与者都将回滚自己的事务

二阶段提交原理比较简单,实现起来也比较方便,但问题也比较明显,如:

1,同步堵塞:所有参与者在没有收到协调者发送过来的“正式提交”命令,都将锁住资源,其他线程如果要获取资源只能等待

2,单点问题:所有参与者都依靠一个协调者,如果协调者在2)操作突然失去联系,这个时候所有的参与者都不知道如何处理,是否提交事务

3,数据不一致性:在2)操作中,可能由于网络原因,有些节点收到提交命令,有些没有,从而照成数据不一致的问题

三阶段提交协议

Three-phase_commit_diagram

三阶段相比二阶段的思想是在操作1)中多了一步,首先询问:是否可以锁资源,如果所有人同意了才开始锁资源,后面的步骤就童二阶段提交了,它相比二阶段协议,解决了如果协调者突然失去联系,参与者仍可以提交他们的事务

但三阶段实现起来还是比较复杂,而且由于参与者最后还是会提交自己的事务,也会造成数据不一致行

TTC协议

为了减少资源被堵塞的时间,产生了TTC协议,可以这样理解TTC是针对SOA服务的锁,2PC是针对数据库的锁

比如我们需要从A账户转100到B账户,必然会涉及到如下几个步骤 1,读取A账户金额,-100 2,读取B账户金额,+100

2PC就会先锁住A账户和B账号的数据,那么任何其它线程想要读取这个账号数据,都将堵塞

TCC则先会从A账号-100元,这时线程也可以读取A账号的数据了,如果转入B账号发生失败,那么就会调用回滚接口,再将A账号+100元,这实质是一种补偿机制

Poxos算法

理解Poxos算法,先弄清楚几个角色

1,proposers 投票人,可以理解为收集人民意见的人

2,acceptors 接受投票者,可以理解为人大代表

3,learners 学习者,可以理解为记录员,将处理的结果记录下来

608A9B29BB2FE2E9787DDB3A3876268E

你可以这样理解,首先投票人接收到人民的意见,就开始提出一个法案,并把这个法案告诉人大代表,说现在人民提了一个法案,我们来开始讨论这个法案吧,人大代表如果同意了(这里的同意指的是有半数以上的人同意了),那么就开始讨论这个法案,如果人大代表都同意(半数以上人同意)通过这个法案,那么大家就会确定下来,交给记录人员将这个法案记录下来,并告诉人民,现在已经确定了一个新法案

poxos算法也有一个问题,即如果有两个投票人接到了人民的法案,我们现在假设为投票人P1和投票人P2,他们同时接受到两个法案A1,A2,A2的版本要大于A1的版本,这里说明下人大代表会接受法案版本大的那个,如果有下面这种情况 1)投票人P1发送了法案A1,并且人大代表都接受到了这个法案,所以决定开始讨论这个法案,并告诉所有人

2)投票人P2发送了法案A2,人大代表又接收到了法案A2,发现A2的版本大于A1的,所以放弃了A1,开始讨论A2的

3)投票者P1发现自己的法案被放弃了,就又会提出一个新的法案A3,A3的版本大于A2,并会发给人大代表,人大代表又会放弃A2,开始讨论A3

4) 投票者P2发现自己的法案被放弃了,就又会提出一个新的法案A4,A4的版本大于A3,并会发给人大代表,人大代表又会放弃A3,开始讨论A4

这样依次反复,就会出现不断更换法案的问题,所以我们就需要只有一个投票人来发送法案,如果投票人退休了,就再新选一个投票人,下面我们来讲讲ZAB协议

ZAB协议

角色定义:

1,leader,主节点,对应上面的投票人,只能有一个

2,follower,从节点,对应上面的人大代表,必须为多个

685547DB334887C0A05B4A68827CBBC1

这里有一个leader,所以最开始必须有一个选举的过程,确定谁来当leader, 所有节点都向各个节点发送一条消息,表示我要当leader,这里由于节点发送的时间不同,每个命令发给其他节点的时间也不同,每个节点最开始收到请求后,都会同意这个命令,如果某个节点收到半数以上的节点回应“同意”,那么它就会将自己设置为leader,并告诉其它节点,现在我是leader,你们需要听从我的,其它节点就会把自己设置为follower

在zookeeper中,最开始的选举是会将myid最大的节点设为leader节点,之后会根据节点zxid,来确定谁来当下一个leader节点,当确定leader后,具体处理流程如下:

1)收到客户端的一个事务请求,可能是leader或follower,follower收到请求后也会转发给leader

2)leader收到follower的事务请求,开始把事务处理请求发给所有follower,follower收到事务请求开始锁资源,处理,将redo/undo写入日志,如果执行成功告诉leader

3)leader收到半数以上的follower回应成功,则再次发送一个命令给follower,说可以提交事务了

4)follower接受到命令,提交自己的事务,并将结果返给leader

5)leader将结果发给客户端

Raft协议

Raft协议同ZAB协议,只不过ZAB是follower节点向主节点发送心跳,确保主节点是否存活,而Raft是Leader节点向follower节点发送心跳,以下这个动画很生动的讲解了分布式系统下一致性的保证:

http://thesecretlivesofdata.com/raft/

你也可以模拟分布式系统不同场景各节点的情况:

https://raft.github.io/

参考文章:

https://coolshell.cn/articles/10910.html 参考书籍

《从POXOS到Zookeeper分布式原理一致性与实践》

SpringCloud链路追踪

Spring Cloud链路追踪

接着上一篇的文章,今天讲讲spring cloud在分布式系统中的链路跟踪,主要使用的是zipkin框架实现的,上篇文章写道了有一个注册中心Eureka,和两个服务方,一个消费方,我们的消费方也可以做了一个服务,注册到Eureka中,所以我们对消费方也添加EurekaClient和zipkin的maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

启动类添加@EurekaClient注解,同样服务方也要添加zipkin的maven依赖

zipkin介绍

Zipkin 是一个开放源代码分布式的跟踪系统,由Twitter公司开源,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现,架构如下:

5ec1521a-26b4-11e7-9679-8c429afdbe0c

每个服务向zipkin报告计时数据,zipkin会根据调用关系通过Zipkin UI生成依赖关系图,显示了多少跟踪请求通过每个服务,该系统让开发者可通过一个 Web 前端轻松的收集和分析数据,例如用户每次请求服务的处理时间等,可方便的监测系统中存在的瓶颈。

Zipkin提供了可插拔数据存储方式:In-Memory、MySql、Cassandra以及Elasticsearch。Zipkin默认是使用http+内存传输和收集,在并发量比较大会影响效率,下面我们我们通过Kafka+ElasticSearch实现服务的传输与收集

创建ZipKin服务

新建一个模块,我们称为zipkinserver,添加下面的依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-server</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-autoconfigure-ui</artifactId>
</dependency>

在启动类,添加如下注解:

@SpringBootApplication
@EnableEurekaClient
@EnableZipkinServer
public class ZipkinServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ZipkinServerApplication, args);
    }

}

修改application.yml配置文件,添加kafka收集和ElasticSearch存储,

zipkin:
  storage:
    type: elasticsearch
    elasticsearch:
      hosts: localhost:9300
      index: zipkin

  collector:
    kafka:
      zookeeper: localhost:2181
      topic: zipkin
      groupId: zipkin

然后启动服务,zipkin的默认端口是9494,访问地址:http://localhost:9494

修改服务方和消费方的application.yml,添加zipkin的地址,kafka收集地址

spring: 
  zipkin:
    base-url: http://localhost:9411
    kafka:
      topic: zipkin
  kafka:
    bootstrap-servers: localhost:9092

  sleuth:
    sampler:
      percentage: 1.0

zipkin只有在接口调用后,才会产生数据的调用情况,所以我们先访问消费方的接口,然后再打开zipkin的界面,可以看到dynamic-service和feign的调用关系及耗时情况

31DADE5B71CF7F9EE33D80AE6B097E57 64043E75E8489933DFB3E2FA03A5AF9A

SpringCloud服务注册与发现

Spring Cloud服务注册与发现

Spring Cloud集成了搭建分布式服务一系列框架,如服务注册与发现Eureka,熔断器Hystrix,路由网关Zuul,链路追踪zipkin,今天主要讲解Eureka的使用。

Eureka是什么?

Eureka是Netflix开源的一款提供服务注册和发现的产品,它提供了完整的Service Registry和Service Discovery实现。也是springcloud体系中最重要最核心的组件之一,我们通过下面这样图就可以了解

48EB8D2E311BF36563200EF5B0015EB6

1)服务提供方向Eureka注册自己的服务,

2)消费者向Eureka获取自己需要的服务,和提供方建立连接

3) 如果服务方出现故障,Eureka会自动将服务方从注册列表中删除

搭建项目

创建Eureka服务

首先创建一个Maven项目,指定spring boot,spring cloud 版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Finchley.SR2</spring-cloud.version>
</properties>

创建一个模块,我们称为EurekaServer,使用Eureka只需要引入maven包,然后启动项目就可以了,很方面,如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

配置application.yml文件

server:
  port: 8081

eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

spring:
  application:
    name: eurka-server

添加注解@EnableEurekaServer,并启动EurekaServer

@SpringBootApplication
@EnableEurekaServer
public class EurakaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurakaServerApplication.class, args);
    }
}

启动EurekaServer,地址为:http://localhost:8081/eureka

创建提供方服务

添加maven依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

创建服务接口

@RestController
public class AirportController {

    @Autowired
    private AirportService airportService;

    @RequestMapping("/getAirport")
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportService.getAirport(threeCode);
    }

}

@Service
public class AirportService {

    @Value("${server.port}")
    private int port;

    public AirportBean getAirport(String threeCode) {
        AirportBean bean = new AirportBean();
        bean.setName("北京首都国际机场");
        bean.setThreeCode(threeCode);
        bean.setPort(port);
        return bean;
    }

}

public class AirportBean {

    private String threeCode;
    private String name;
    private int port;
}

修改application.yml文件

<code>server:
  port: 8082

spring:
  application:
    name: dynamic-service

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8081/eureka/</code>

添加@EnableEurekaClient注解,这里我们为了方便演示负载均衡,同时也启动了两个实例,端口分别为8082,8083

@SpringBootApplication
@EnableEurekaClient
public class DynamicServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(DynamicServiceApplication.class, args);
    }
}

创建服务消费方

我们再项目下再新建一个模块,称为springcloudclient,添加maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

这里我们使用了feign的服务调用方式,Spring cloud有两种服务调用方式,一种是ribbon+restTemplate,另一种是feign,ribbon类似一种rest风格的API调用方式,而feign整合了ribbon,具有负载均衡的能力,通过注解的方式,使代码看起来更加简洁,另外feign整合了Hystrix,具有熔断的能力

调用服务方的接口

@RestController
public class AirportFeignController {

    @Autowired
    private AirportFeignService airportFeignService;

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportFeignService.getAirport(threeCode);
    }

}

@FeignClient(value = "dynamic-service", fallback = AirportFeignFallbackService.class)
public interface AirportFeignService {

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode);

}

// 服务失败后熔断,调用的方法
public class AirportFeignFallbackService implements AirportFeignService {
    @Override
    public AirportBean getAirport(String threeCode) {
        return null;
    }
}

public class AirportBean {
    private String threeCode;
    private String name;
    private int port;
}

配置application.yml文件

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8084
spring:
  application:
    name: service-feign

添加@ EnableEurekaClient,@EnableDiscoveryClient, @EnableFeignClients注解,端口为8084,

@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
public class SpringCloudServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudServerApplication.class, args);
    }
}

好了下面可以演示springcloud的服务注册与发现了,通过上面的例子,我们启动了Eureka服务,分别为:8081,同时启动了两个服务提供方,注册到Eureka中,端口分别为8082和8083,接着我们启动了一个服务消费方,端口为8084,我们分别启动他们
打开Eureka的服务页面:http://localhost:8081

55AD7F4965A098E135257B0B04BBF3B6

可以发现有两个服务方已经注册上了,我们调用消费方的接口,发现消费方会使用负载均衡的方式分别访问服务方

 

有道词典

org.springframe …

详细X

  org.springframework.boot   spring-boot-starter-parent   2.1.1.RELEASE      utf – 8   utf – 8   1.8   Finchley.SR2

kafka和zookeeper集群安装

kafka和zookeeper集群安装

上期讲了kafka的作用及应用场景,今天我们来自己搭建一套kafka集群,由于kafka目前的安装包已经自带了zookeeper,所以在搭建zookeeper集群直接使用它内置的即可

准备工作:3台服务器,当然你可以使用自己的虚拟机尝试,如果安装虚拟机,可以查看我的这篇博客
我的三台服务器的IP分别是:192.168.101.116、192.168.101.115、192.168.102.215

安装步骤

下载安装包:

地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

配置zookeeper

配置kafka的路径、心跳检测时间 初始化连接时follower和leader最长连接时间 follower和leader数据最长同步时间

 dataDir=/data/kafka/zookeeper
 tickTime=2000
 initLimit=5
 syncLimit=2
 server.0=192.168.101.116:2888:3888
 server.1=192.168.101.115:2888:3888
 server.2=192.168.102.215:2888:3888

在每个服务器上注册zookeeper

/data/kafka/zookeeper目录下面touch myid文件
192.168.101.116上执行
echo "0" > /data/kafka/zookeeper/myid
192.168.101.115上执行
echo "1" > /data/kafka/zookeeper/myid
192.168.102.215上执行
echo "2" > /data/kafka/zookeeper/myid

配置kafka

 192.168.101.116上配置
 advertised.listeners=PLAINTEXT://zc1:9092
 broker.id=0

 192.168.101.115上配置
 advertised.listeners=PLAINTEXT://zc2:9092
 broker.id=1

 192.168.102.215上配置
 advertised.listeners=PLAINTEXT://zc3:9092
 broker.id=2

 通用配置
 log.dirs=/data/kafka/kafka-logs
 zookeeper.connect=192.168.101.116:2181,192.168.101.115:2181,192.168.102.215:2181

配置hosts

在每个服务器/etc/hosts中添加如下配置

192.168.101.116 zc1
192.168.101.115 zc2
192.168.102.215 zc3

开放防火墙端口

由于zookeeper和kafka监听了2181、9092、2888、3888,所有我们还要把这些端口添加到防火墙中,编辑/etc/sysconfig/iptables,添加:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

启动zookeeper和kafka

先启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

再启动kafka

bin/kafka-server-start.sh config/server.properties &

测试数据写入和读取

使用命令测试

生产者
[root@zc1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh –broker-list 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

>hello
>word

消费者
[root@zc2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh –bootstrap-server 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

hello
word

代码测试

生产者

public class ProducerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Producer&lt;String, String&gt; producer;
    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    public ProducerTest() {
        /** 生产者*/
        InputStream in_proc = this.getClass().getClassLoader().getResourceAsStream("kafka_test_producer.properties");
        Properties prop_proc = new Properties();
        try {
            prop_proc.load(in_proc);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_producer配置文件失败", e);
        }
        producer = new KafkaProducer&lt;String, String&gt;(prop_proc);
    }

    public void execute() {
        while(true) {
            try {
                String key = "CA1234";
                String message = System.currentTimeMillis()+" CA1234,PEK,SHA,2018-02-01";
                this.sendToTestServer(key, message);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    private void sendToTestServer(String key, String message) {
        logger.info("发送消息:"+message);
        ProducerRecord&lt;String, String&gt; producerRecord = new ProducerRecord&lt;String, String&gt;(TOPIC_NAME, key, message);
        producer.send(producerRecord);
    }

    public static void main(String[] args) {
        new ProducerTest().execute();
    }

}

kafka_test_producer.properties

kafka.topic=firsttopic
group.id=chuanzh
bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092
retries=5
request.timeout.ms=10000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

消费者

public class ConsumerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Consumer&lt;String, String&gt; consumer;
    private final static Logger logger = LoggerFactory.getLogger(DynamicPushTestTask.class);

    public ConsumerTest() {
        /** 消费者*/
        InputStream ksin = this.getClass().getClassLoader().getResourceAsStream("kafka_test_consumer.properties");
        Properties props = new Properties();
        try {
            props.load(ksin);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_consumer配置文件失败", e);
        }
        consumer = new KafkaConsumer&lt;String, String&gt;(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    }

    public void execute() {
        while(true) {
            try {
                ConsumerRecords&lt;String, String&gt; records = consumer.poll(2);
                logger.info("读取kafka,取到消息数量:" + records.count());
                for (ConsumerRecord&lt;String, String&gt; record : records) {
                    logger.info("value:{}", record.value());
                    logger.info("offset:{}", record.offset());
                }

                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    public static void main(String[] args) {
        new ConsumerTest().execute();
    }

}

kafka_test_consumer.properties

kafka.topic=flightcore
group.id=flightPacketPush
bootstrap.servers=43.241.208.208:9092,43.241.228.39:9092,43.241.234.89:9092
metadata.max.age.ms=60000
max.poll.records=50
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

Kafka使用介绍

Kafka使用介绍

kafka是什么?

kafka是一个分布式消息数据流平台,为处理实时数据提供一个统一、高吞吐、低延迟的平台。其本质是一个提供大规模分布式发布/订阅的消息队列,此外kafka还可以通过kafka connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams–一个Java的流式处理库。

它的2个大的应用: 1,构建实时的流数据管道,确保系统和应用之间数据传输的可靠 2,对应用之间的数据流进行转换和反应。

基本概念

  • Topic kafka将消息分门别类,每一类消息称之为一个主题(Topic) 每个消息(也叫记录recode)是由一个key,一个value和时间戳构成
  • Producer 发布消息的对象称之为主题生产者。
  • Consumer 订阅消息的对象称之为主题消费者
  • Broker 已发布的消息保存在一组服务器中,称之为Kafka集群,集群中每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些发布的消息。

kafka的核心API

  • Producer API:允许应用发布一个或多个kafka主题
  • Consumer API:允许应用订阅一个或多个kafka主题
  • Streams API:转换数据流,将一个或多个主题的数据流转换为一个或多个主题的数据流输出
  • Connector API: 允许构建可以重复使用的生产者和消费者,将topic连接到现有的应用或数据系统上,例如,一个建立在数据库上的连接可以捕获表的每一次修改。

kafka的主题(Topic)和日志

topic可理解是一类消息的集合,在kafka中,topic是可以被多个订阅者来消费的 每个topic,kafka包含了一组分区数据,叫partition,如下:

每一个partition是一个有序、不可变的序列记录,每个记录都有一个唯一的序列ID叫offset。 当消息记录被Consumer消费后,这些记录不会被删除,kafka提供了一些配置,比如:按日期、按空间来定时删除记录。 kafka分区有两个目的,一是它便于扩展,不受单个服务器的限制,二是,它可以并行接受和处理多个任务。

分布式

每一个partition日志被分布在kafka的集群服务器上,并且可配置每个parition可重复的份数。 每个partition有一个leader,零个或多个follower,正常情况下leader负责所有的读写请求,而follower负责同步leader上的数据,当leader不可用时,follower会选举一个新的leader替代原来老的leader。

生产者

生产者负责选择一个topic发布消息,同时指定发布到哪一个partition上,最简单的方式是按照partition轮询,也可指定按权重指定。

消费者

消费者有叫一个组(group)的概念,比如多个消费者属于同一个组,那么他们将一起消费这个topic的数据,如下图:

一个kafka集群有两台服务器,4个partition,有两个分组A和B,A有2个消费者,B有4个消费者, 每个partition可以保证数据记录的顺序性,但客户端如果是并行处理,如groupA,C1同时消费P0、P3就可能照成数据顺序错乱的问题,如果要保证数据的一致性,那么顺序处理一个Topic的所有消息,那么就只提供一个分区。

kafka保证

  • 生产者发送到topic的消息会按照他们发送的顺序保存,如果消息M1、M2被同一个producer发送,当M1被先发送,那么它的offset值将会小于M2的
  • 消费者看到的数据也是根据他们保存的顺序
  • 如果一个topic配置了复制因数N,kafka集群将最大允许N-1台服务器同步失败。

Kafka和传统的消息系统之间的区别

  • 结合传统的优点:传统的消息系统分:队列和发布订阅两种模式,队列可以允许多个消费者同时瓜分数据,而发布订阅模式,会将消息通知到每一消费者。kafka结合了这两个模式的优点,当在kafka中,多个消费者的组ID设置为一样时,那么将采用队列的模式,如果组ID不同,则采用发布订阅模式。
  • 更强的顺序性保证:kafka中引入分区功能,一个topic可有多个分区,分区中保证了顺序的一致性,如果启动多个消费者,kafka保证每个消费者只会读取一个分区中的数据,当有多于分区数的消费者,那么这个消费者将一直处于空等待,不会收到任何消息

kafka的存储性能

kafka作为一个消息存储器,他会将消息写入到磁盘,并通过复制镜像,来保证容错。kafka允许所有的写入操作完成后再继续操作。因为kafka中保持了一个指针的方式,在存储50KB和50TB,其性能都是一样的。kafka通过这种指针读取数据,所以数据的大小,不会影响其读写性能。

kafka的流处理

kafka不仅提供了读、写、存储,还提供了对数据流进行处理,比如:一个零售APP,kafka可以从输入topic读取数据,然后使用StreamAPI统计数量,调整价格然后输出到topic中,类似的操作还包括聚合计算、数据流连接等。