kafka和zookeeper集群安装
上期讲了kafka的作用及应用场景,今天我们来自己搭建一套kafka集群,由于kafka目前的安装包已经自带了zookeeper,所以在搭建zookeeper集群直接使用它内置的即可
准备工作:3台服务器,当然你可以使用自己的虚拟机尝试,如果安装虚拟机,可以查看我的这篇博客。
我的三台服务器的IP分别是:192.168.101.116、192.168.101.115、192.168.102.215
安装步骤
下载安装包:
地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzf kafka_2.11-1.0.0.tgz cd kafka_2.11-1.0.0
配置zookeeper
配置kafka的路径、心跳检测时间 初始化连接时follower和leader最长连接时间 follower和leader数据最长同步时间
dataDir=/data/kafka/zookeeper tickTime=2000 initLimit=5 syncLimit=2 server.0=192.168.101.116:2888:3888 server.1=192.168.101.115:2888:3888 server.2=192.168.102.215:2888:3888
在每个服务器上注册zookeeper
/data/kafka/zookeeper目录下面touch myid文件 192.168.101.116上执行 echo "0" > /data/kafka/zookeeper/myid 192.168.101.115上执行 echo "1" > /data/kafka/zookeeper/myid 192.168.102.215上执行 echo "2" > /data/kafka/zookeeper/myid
配置kafka
192.168.101.116上配置 advertised.listeners=PLAINTEXT://zc1:9092 broker.id=0 192.168.101.115上配置 advertised.listeners=PLAINTEXT://zc2:9092 broker.id=1 192.168.102.215上配置 advertised.listeners=PLAINTEXT://zc3:9092 broker.id=2 通用配置 log.dirs=/data/kafka/kafka-logs zookeeper.connect=192.168.101.116:2181,192.168.101.115:2181,192.168.102.215:2181
配置hosts
在每个服务器/etc/hosts中添加如下配置
192.168.101.116 zc1 192.168.101.115 zc2 192.168.102.215 zc3
开放防火墙端口
由于zookeeper和kafka监听了2181、9092、2888、3888,所有我们还要把这些端口添加到防火墙中,编辑/etc/sysconfig/iptables,添加:
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT -A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT -A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
启动zookeeper和kafka
先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
再启动kafka
bin/kafka-server-start.sh config/server.properties &
测试数据写入和读取
使用命令测试
生产者
[root@zc1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh –broker-list 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic
>hello
>word
消费者
[root@zc2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh –bootstrap-server 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic
hello
word
代码测试
生产者
public class ProducerTest {
private final static String TOPIC_NAME = "firsttopic";
private static Producer<String, String> producer;
private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);
public ProducerTest() {
/** 生产者*/
InputStream in_proc = this.getClass().getClassLoader().getResourceAsStream("kafka_test_producer.properties");
Properties prop_proc = new Properties();
try {
prop_proc.load(in_proc);
} catch (IOException e) {
logger.error("加载kafkacore_test_producer配置文件失败", e);
}
producer = new KafkaProducer<String, String>(prop_proc);
}
public void execute() {
while(true) {
try {
String key = "CA1234";
String message = System.currentTimeMillis()+" CA1234,PEK,SHA,2018-02-01";
this.sendToTestServer(key, message);
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("任务执行异常",e);
}
}
}
private void sendToTestServer(String key, String message) {
logger.info("发送消息:"+message);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key, message);
producer.send(producerRecord);
}
public static void main(String[] args) {
new ProducerTest().execute();
}
}
kafka_test_producer.properties
kafka.topic=firsttopic group.id=chuanzh bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 retries=5 request.timeout.ms=10000 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
消费者
public class ConsumerTest {
private final static String TOPIC_NAME = "firsttopic";
private static Consumer<String, String> consumer;
private final static Logger logger = LoggerFactory.getLogger(DynamicPushTestTask.class);
public ConsumerTest() {
/** 消费者*/
InputStream ksin = this.getClass().getClassLoader().getResourceAsStream("kafka_test_consumer.properties");
Properties props = new Properties();
try {
props.load(ksin);
} catch (IOException e) {
logger.error("加载kafkacore_test_consumer配置文件失败", e);
}
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
}
public void execute() {
while(true) {
try {
ConsumerRecords<String, String> records = consumer.poll(2);
logger.info("读取kafka,取到消息数量:" + records.count());
for (ConsumerRecord<String, String> record : records) {
logger.info("value:{}", record.value());
logger.info("offset:{}", record.offset());
}
Thread.sleep(5000);
} catch (InterruptedException e) {
logger.error("任务执行异常",e);
}
}
}
public static void main(String[] args) {
new ConsumerTest().execute();
}
}
kafka_test_consumer.properties
kafka.topic=flightcore group.id=flightPacketPush bootstrap.servers=43.241.208.208:9092,43.241.228.39:9092,43.241.234.89:9092 metadata.max.age.ms=60000 max.poll.records=50 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer