Kafka and ZooKeeper Cluster Installation
The previous post covered what Kafka does and where it fits; today we will build a Kafka cluster of our own. Since the current Kafka distribution already bundles ZooKeeper, we can simply use the built-in copy when setting up the ZooKeeper cluster.
Prerequisites: three servers. You can of course experiment with your own virtual machines; if you need to set up a VM, see my earlier blog post on that.
The IPs of my three servers are 192.168.101.116, 192.168.101.115, and 192.168.102.215.
Installation Steps
Download the package:
URL: https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0
Configure ZooKeeper
In config/zookeeper.properties (identical on all three servers), set the data directory, the heartbeat interval (tickTime), the maximum time a follower may take to connect to the leader during initialization (initLimit), and the maximum time a follower may lag when syncing data with the leader (syncLimit):
dataDir=/data/kafka/zookeeper
tickTime=2000
initLimit=5
syncLimit=2
server.0=192.168.101.116:2888:3888
server.1=192.168.101.115:2888:3888
server.2=192.168.102.215:2888:3888
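The myid step below writes into this data directory, so create it first on each of the three servers:
mkdir -p /data/kafka/zookeeper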
Register each server in the ZooKeeper ensemble
Create a myid file under /data/kafka/zookeeper on each server; its value must match the N in that host's server.N entry above:
On 192.168.101.116: echo "0" > /data/kafka/zookeeper/myid
On 192.168.101.115: echo "1" > /data/kafka/zookeeper/myid
On 192.168.102.215: echo "2" > /data/kafka/zookeeper/myid
Configure Kafka
Edit config/server.properties on each server (the zc1/zc2/zc3 hostnames are mapped in /etc/hosts in the next step):
On 192.168.101.116:
advertised.listeners=PLAINTEXT://zc1:9092
broker.id=0
On 192.168.101.115:
advertised.listeners=PLAINTEXT://zc2:9092
broker.id=1
On 192.168.102.215:
advertised.listeners=PLAINTEXT://zc3:9092
broker.id=2
Common to all three servers:
log.dirs=/data/kafka/kafka-logs
zookeeper.connect=192.168.101.116:2181,192.168.101.115:2181,192.168.102.215:2181
Configure hosts
Add the following entries to /etc/hosts on every server:
192.168.101.116 zc1
192.168.101.115 zc2
192.168.102.215 zc3
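To sanity-check that the hostnames resolve, a quick test from any of the servers (assuming ping is available):
ping -c 1 zc1
ping -c 1 zc2
ping -c 1 zc3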
Open firewall ports
Since ZooKeeper and Kafka listen on ports 2181, 9092, 2888, and 3888, we also need to open these ports in the firewall. Edit /etc/sysconfig/iptables and add:
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
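The new rules only take effect once iptables is reloaded. On a CentOS 6-style system (an assumption here, implied by the /etc/sysconfig/iptables path), that would be:
service iptables restart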
Start ZooKeeper and Kafka
Start ZooKeeper first, on all three servers:
bin/zookeeper-server-start.sh config/zookeeper.properties &
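To verify each ZooKeeper node came up, you can send it the built-in ruok four-letter command (assuming nc is installed); a healthy node answers imok:
echo ruok | nc 192.168.101.116 2181
imok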
Then start Kafka, also on all three servers:
bin/kafka-server-start.sh config/server.properties &
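To confirm that all three brokers registered with ZooKeeper, list their IDs with the zookeeper-shell tool that ships with Kafka; if everything started cleanly it should print something like [0, 1, 2]:
bin/zookeeper-shell.sh 192.168.101.116:2181 ls /brokers/ids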
Test Writing and Reading Data
Testing from the command line
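Kafka 1.0.0 auto-creates topics by default, but creating firsttopic explicitly lets us pick the replication factor; a sketch (the partition count of 3 is an arbitrary choice here):
bin/kafka-topics.sh --create --zookeeper 192.168.101.116:2181 --replication-factor 3 --partitions 3 --topic firsttopic
bin/kafka-topics.sh --describe --zookeeper 192.168.101.116:2181 --topic firsttopic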
Producer
[root@zc1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 --topic firsttopic
>hello
>word
Consumer
[root@zc2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 --topic firsttopic
hello
word
Testing from code
Producer
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    private static Producer<String, String> producer;

    public ProducerTest() {
        // Load producer settings from the classpath and create the producer
        InputStream in = this.getClass().getClassLoader().getResourceAsStream("kafka_test_producer.properties");
        Properties props = new Properties();
        try {
            props.load(in);
        } catch (IOException e) {
            logger.error("Failed to load kafka_test_producer.properties", e);
        }
        producer = new KafkaProducer<String, String>(props);
    }

    public void execute() {
        // Send a test message every 500 ms
        while (true) {
            try {
                String key = "CA1234";
                String message = System.currentTimeMillis() + " CA1234,PEK,SHA,2018-02-01";
                this.sendToTestServer(key, message);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("Task interrupted", e);
            }
        }
    }

    private void sendToTestServer(String key, String message) {
        logger.info("Sending message: " + message);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, key, message);
        producer.send(record);
    }

    public static void main(String[] args) {
        new ProducerTest().execute();
    }
}
kafka_test_producer.properties
kafka.topic=firsttopic
group.id=chuanzh
bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092
retries=5
request.timeout.ms=10000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
Consumer
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private final static Logger logger = LoggerFactory.getLogger(ConsumerTest.class);

    private static Consumer<String, String> consumer;

    public ConsumerTest() {
        // Load consumer settings from the classpath, create the consumer, and subscribe to the topic
        InputStream in = this.getClass().getClassLoader().getResourceAsStream("kafka_test_consumer.properties");
        Properties props = new Properties();
        try {
            props.load(in);
        } catch (IOException e) {
            logger.error("Failed to load kafka_test_consumer.properties", e);
        }
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    }

    public void execute() {
        // Poll for new records (2 ms timeout), log them, then wait 5 seconds
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(2);
                logger.info("Polled Kafka, records received: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("value: {}", record.value());
                    logger.info("offset: {}", record.offset());
                }
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("Task interrupted", e);
            }
        }
    }

    public static void main(String[] args) {
        new ConsumerTest().execute();
    }
}
kafka_test_consumer.properties
kafka.topic=firsttopic
group.id=flightPacketPush
bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092
metadata.max.age.ms=60000
max.poll.records=50
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer