History
Open sourced in early 2011
Graduation from the Apache Incubator occurred on 23 October 2012
Installation
Download
    cd ~/download
    wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz
    cd /usr/local
    tar xzvf ~/download/kafka_2.13-3.0.0.tgz
    mv kafka_2.13-3.0.0 kafka
Start
Both commands stay in the foreground, so run each one in its own terminal.
    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
Run in the background
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    tail -f logs/zookeeper.out

    bin/kafka-server-start.sh -daemon config/server.properties
    tail -f logs/server.log
Stop
    # Stop the broker before ZooKeeper so the broker can shut down cleanly
    bin/kafka-server-stop.sh
    bin/zookeeper-server-stop.sh
Notes
The ZooKeeper start script sets a default JVM heap of 512 MB.
The Kafka start script sets a default JVM heap of 1 GB.
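These defaults come from the start scripts, which set KAFKA_HEAP_OPTS only when the variable is empty, so a different heap size can be supplied per process. The following is a minimal sketch, assuming the /usr/local/kafka install location used earlier. The KAFKA_HOME variable and the 256M and 512M values are illustrative assumptions, not recommendations.

```shell
# KAFKA_HOME is a hypothetical convenience variable; adjust it to your install path.
KAFKA_HOME="${KAFKA_HOME:-/usr/local/kafka}"

# Heap sizes below are illustrative assumptions, not tuned values.
ZK_HEAP="-Xmx256M -Xms256M"
BROKER_HEAP="-Xmx512M -Xms512M"

if [ -x "$KAFKA_HOME/bin/kafka-server-start.sh" ]; then
  cd "$KAFKA_HOME" || exit 1
  # Prefixing the assignment scopes it to that single command.
  KAFKA_HEAP_OPTS="$ZK_HEAP" bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  KAFKA_HEAP_OPTS="$BROKER_HEAP" bin/kafka-server-start.sh -daemon config/server.properties
else
  echo "Kafka not found under $KAFKA_HOME; nothing started" >&2
fi
```

Setting the variable per command rather than with a global export keeps the two processes from sharing one heap setting.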
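Configuration
Edit config/server.properties
    listeners=PLAINTEXT://internal-ip:9092
    advertised.listeners=PLAINTEXT://external-ip:9092
advertised.listeners must be set: it is the address the broker returns to clients, so it has to be reachable from wherever the clients run.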
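Because a missing advertised.listeners entry tends to show up only later as client connection failures, it can help to check the file before starting the broker. This is a small sketch; check_advertised_listeners is a hypothetical helper name, not part of Kafka.

```shell
# check_advertised_listeners is a hypothetical helper, not a Kafka tool.
# It prints the configured value and succeeds only if an uncommented
# advertised.listeners line exists in the given properties file.
check_advertised_listeners() {
  conf_file="$1"
  # Strip the key and "=" from matching lines; keep the last match.
  value=$(sed -n 's/^[[:space:]]*advertised\.listeners[[:space:]]*=[[:space:]]*//p' "$conf_file" | tail -n 1)
  if [ -z "$value" ]; then
    echo "advertised.listeners is not set in $conf_file" >&2
    return 1
  fi
  printf '%s\n' "$value"
}
```

A possible use is `check_advertised_listeners config/server.properties && bin/kafka-server-start.sh -daemon config/server.properties`, so the broker starts only when the setting is present.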
Reading and writing topics
Create a topic
    bin/kafka-topics.sh --create --topic foo --bootstrap-server ip:9092 --partitions 1 --replication-factor 1
Write to a topic
    bin/kafka-console-producer.sh --topic foo --bootstrap-server ip:9092

    # Use the settings in producer.properties
    bin/kafka-console-producer.sh --topic foo --producer.config=config/producer.properties --bootstrap-server ip:9092
Read from a topic
    # Consume from the beginning with an auto-generated group id
    bin/kafka-console-consumer.sh --topic foo --from-beginning --bootstrap-server ip:9092

    # Specify a group id
    bin/kafka-console-consumer.sh --topic foo --consumer-property group.id=test1 --bootstrap-server ip:9092

    # Use the settings in consumer.properties
    bin/kafka-console-consumer.sh --topic foo --consumer.config=config/consumer.properties --bootstrap-server ip:9092
Checking status
List topics
    bin/kafka-topics.sh --bootstrap-server ip:9092 --list
Describe a topic
    bin/kafka-topics.sh --describe --topic foo --bootstrap-server ip:9092
List consumer groups
    bin/kafka-consumer-groups.sh --list --bootstrap-server ip:9092
Describe a consumer group
    bin/kafka-consumer-groups.sh --describe --group test --bootstrap-server ip:9092
List consumer group members
    bin/kafka-consumer-groups.sh --describe --group test --members --bootstrap-server ip:9092
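The group description lists offsets per partition, and it is sometimes handy to reduce it to a single total lag figure. The sketch below assumes that the --describe output (without --members) has a header row containing a column named LAG. That is an assumption about the output layout, so verify it against your Kafka version. sum_lag is a hypothetical helper name.

```shell
# sum_lag is a hypothetical helper: it reads consumer-group --describe output
# on stdin and prints the sum of the LAG column.
sum_lag() {
  awk '
    # Until the header row is seen, look for the column titled LAG.
    lag_col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i; next }
    # Add only numeric lag values; rows showing "-" are skipped.
    $lag_col ~ /^[0-9]+$/ { total += $lag_col }
    END { print total + 0 }
  '
}
```

It could be used as `bin/kafka-consumer-groups.sh --describe --group test --bootstrap-server ip:9092 | sum_lag`.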
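Troubleshooting
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This error usually means the consumer spent longer than max.poll.interval.ms between poll() calls, so the group rebalanced without it. To reset a local test environment:
Terminate the Kafka environment.
Delete any data of your local Kafka environment, including any events you have created along the way:
    rm -rf /tmp/kafka-logs /tmp/zookeeper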
Account authentication
Edit server.properties
    listeners=SASL_PLAINTEXT://ip:9092
    advertised.listeners=SASL_PLAINTEXT://ip:9092

    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
Edit zookeeper.properties
    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
Edit producer.properties
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    compression.type=none
Edit consumer.properties
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
Edit zookeeper_jaas.conf
    Server {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="xiongjiaxuan"
        user_admin="xiongjiaxuan";
    };
Edit kafka_server_jaas.conf
    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="xiongjiaxuan"
        user_admin="xiongjiaxuan"
        user_cms="cms";
    };

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="xiongjiaxuan";
    };
Starting with authentication
Add the following before the ZooKeeper start command, or inside zookeeper-server-start.sh:
    export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/zookeeper_jaas.conf"
Add the following before the Kafka start command, or inside kafka-server-start.sh:
    export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
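When both services are started from the same shell, a second export of KAFKA_OPTS replaces the first, so it is easy to launch a process with the wrong JAAS file. One way around this is to set the variable for a single command only. This is a sketch; run_with_jaas is a hypothetical helper name, and it replaces any KAFKA_OPTS value already set for that command.

```shell
# run_with_jaas is a hypothetical helper, not part of Kafka.
# It runs one command with KAFKA_OPTS pointing at the given JAAS file,
# without exporting the variable to the rest of the shell session.
run_with_jaas() {
  jaas_file="$1"
  shift
  KAFKA_OPTS="-Djava.security.auth.login.config=$jaas_file" "$@"
}

# Usage, assuming the paths used in this article:
# run_with_jaas /usr/local/kafka/config/zookeeper_jaas.conf bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# run_with_jaas /usr/local/kafka/config/kafka_server_jaas.conf bin/kafka-server-start.sh -daemon config/server.properties
```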
Client test
Edit kafka_client_jaas.conf
    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="xiongjiaxuan";
    };
Test reading and writing
    export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"

    bin/kafka-console-producer.sh --topic foo --producer.config=config/producer.properties --bootstrap-server ip:9092
    bin/kafka-console-consumer.sh --topic foo --consumer.config=config/consumer.properties --bootstrap-server ip:9092
List topics
    # Edit config/config.properties
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN

    bin/kafka-topics.sh --list --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties
    bin/kafka-topics.sh --describe --topic cms-site --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties

    bin/kafka-consumer-groups.sh --list --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties
    bin/kafka-consumer-groups.sh --describe --group cms-article-consumer-dev --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties
    bin/kafka-consumer-groups.sh --describe --group cms-article-consumer-dev --members --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 8.210.141.55:9092 --topic foo --command-config config/config.properties
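Since this properties file carries a password in clear text, it may be worth generating it with owner-only permissions. The following is a minimal sketch, with write_client_config as a hypothetical helper name. It assumes the user name and password contain no double quotes or backslashes, because no escaping is attempted.

```shell
# write_client_config is a hypothetical helper, not part of Kafka.
# It writes a SASL/PLAIN client properties file readable by the owner only.
# Assumption: user and password contain no double quotes or backslashes.
write_client_config() {
  out_file="$1"
  user="$2"
  password="$3"
  (
    umask 077   # a newly created file gets owner-only permissions
    cat > "$out_file" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$user" password="$password";
EOF
  )
  chmod 600 "$out_file"   # also tighten a file that already existed
}
```

For example, `write_client_config config/config.properties USER PASSWORD` produces a file that can be passed to `--command-config`.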
Advanced usage
Optimize
Kafka Rebalancing
Java integration: producer side
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all"); // wait for all in-sync replicas to acknowledge
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));

            producer.close(); // flushes buffered records before returning
        }
    }
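Enable transactions
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("transactional.id", "my-transactional-id");
            Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

            producer.initTransactions();

            try {
                producer.beginTransaction();
                for (int i = 0; i < 100; i++)
                    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            } catch (KafkaException e) {
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            producer.close();
        }
    }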
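Java integration: consumer side
Automatic offset commit
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AutoCommitConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("foo", "bar"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }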
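Manual offset commit
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ManualCommitConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("foo", "bar"));
            final int minBatchSize = 200;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    insertIntoDb(buffer);
                    consumer.commitSync(); // commit only after the batch has been stored
                    buffer.clear();
                }
            }
        }

        // Placeholder: replace with the code that persists the batch.
        private static void insertIntoDb(List<ConsumerRecord<String, String>> records) {
        }
    }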