Kafka分区分配策略
我们知道每个Topic会分配为很多partitions,Producers会将数据分配到每个partitions中,然后消费者Consumers从partitions中获取数据消费,那么Producers是如何将数据分到partitions中?Consumers又怎么知道从哪个partitions中消费数据?
生产者往Topic写数据
我们从product.send方法入手,看看里面的具体实现,可以看到在调用send方法时,其内部是调用了doSend方法,在doSend方法中有一个获取partitions的方法
int partition = partition(record, serializedKey, serializedValue, cluster); private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
从上面代码中,首先先选择配置的分区,如果没有配置则使用默认的分区,即使用了DefaultPartitioner中的partition方法
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // 没有可用的分区,则给一个不可用分区 return Utils.toPositive(nextValue) % numPartitions; } } else { // 根据key的hash值和分区数取模 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
上面代码中先会根据topic获取所有的分区
1,如果key为null,则通过先产生随机数,之后在该数上自增的方式产生一个数nextValue,如果存在可用分区,将nextValue转为正数之后对可用分区进行取模操作,如果不存在可用分区,则将nextValue对总分区数进行取模操作
2,如果key不为空,就先获取key的hash值,然后和分区数进行取模操作
消费者从Topic读数据
kafka默认对消费分区指定了两种策略,分别为Range策略(org.apache.kafka.clients.consumer.RangeAssignor)和RoundRobin策略(org.apache.kafka.clients.consumer.RoundRobinAssignor),它们都实现了PartitionAssignor接口
Range策略
比如有10个分区,分别为P1、P2、P3、P4、P5、P6、P7、P8、P9、P10,三个消费者C1、C2、C3,消费如下图:
我们来看看源代码:
@Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { // 得到topic和订阅的消费者集合信息,例如{t1:[c1,c2,c3], t2:[c1,c2,c3,c4]} Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); Map<String, List<TopicPartition>> assignment = new HashMap<>(); // 将consumersPerTopic信息转换为assignment,memberId就是消费者client.id+uuid(kafka在client.id上追加的) for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); // 遍历每个Topic,获取所有的订阅消费者 for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List<String> consumersForTopic = topicEntry.getValue(); Integer numPartitionsForTopic = partitionsPerTopic.get(topic); // 如果Topic没有分区,则调过 if (numPartitionsForTopic == null) continue; // 将Topic的订阅者根据字典排序 Collections.sort(consumersForTopic); // 总分区数/订阅者的数量 得到每个订阅者应该分配分区数 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); // 无法整除的剩余分区数量 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); //遍历所有的消费者 for (int i = 0, n = consumersForTopic.size(); i < n; i++) { //分配到的分区的开始位置 int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); // 分配到的分区数量(整除分配到的分区数量,加上1个无法整除分配到的分区--如果有资格分配到这个分区的话。判断是否有资格分配到这个分区:如果整除后余数为m,那么排序后的消费者集合中前m个消费者都能分配到一个额外的分区) int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); //给消费者分配分区 assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; }
上面的代码添加了注释很清楚的展现了range的实现,对应上面的例子,如果有4个消费者C1、C2、C3、C4,那么根据上面的算法:
C1 -> [P1,P2,P3] ,C2 -> [P4,P5,P6] ,C3 -> [P7,P8] C4 -> [P9,P10] 。取余多出来的两个分区,由最前n个消费者来消费
RoundRobin策略
将主题的所有分区依次分配给消费者,比如有两个Topic:T1[P1,P2,P3,P4],T2[P5,P6,P7,P8,P9,P10],若C1、C2订阅了T1,C2、C3订阅了T2,那么C1将消费T1[P1,P3],C2将消费T1[P2,P4,P6,P8,P10],C3将消费T2[P5,P7,P9],如下图:
@Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); // 将消费集合先按字典排序,构建成一个环形迭代器 CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet())); // 按Topic的名称排序,得到Topic下的所有分区 for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { final String topic = partition.topic(); while (!subscriptions.get(assigner.peek()).topics().contains(topic)) assigner.next(); // 给消费者分配分区,并轮询到下一个消费者 assignment.get(assigner.next()).add(partition); } return assignment; } /** * 根据消费者得到订阅的Topic下的所有分区 * Topic按名称字典排序 */ public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { SortedSet<String> topics = new TreeSet<>(); for (Subscription subscription : subscriptions.values()) topics.addAll(subscription.topics()); List<TopicPartition> allPartitions = new ArrayList<>(); for (String topic : topics) { Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic != null) allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic)); } return allPartitions; }