Apache Kafka

History

  • Open-sourced in early 2011
  • Graduated from the Apache Incubator on 23 October 2012

Installation

Download

cd ~/download
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz
cd /usr/local
tar xzvf ~/download/kafka_2.13-3.0.0.tgz
mv kafka_2.13-3.0.0 kafka

Start

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Run in the background

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
tail -f logs/zookeeper.out

bin/kafka-server-start.sh -daemon config/server.properties
tail -f logs/server.log

Stop

bin/zookeeper-server-stop.sh
bin/kafka-server-stop.sh

Notes

  • ZooKeeper's start script defaults the JVM heap to 512 MB
  • Kafka's start script defaults the JVM heap to 1 GB
  • Both defaults apply only when KAFKA_HEAP_OPTS is unset, so exporting it before starting overrides them

Configuration

Edit config/server.properties

listeners=PLAINTEXT://internal-ip:9092
advertised.listeners=PLAINTEXT://public-ip:9092

advertised.listeners must be set: it is the address the broker publishes to clients in metadata responses, so clients outside the host's network cannot connect without it.

Reading and writing topics

Create a topic

bin/kafka-topics.sh --create --topic foo --bootstrap-server ip:9092 --partitions 1 --replication-factor 1
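Topics can also be created programmatically. A minimal sketch using the AdminClient API, assuming the same broker address and topic settings as the command above:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092"); // placeholder broker address
try (AdminClient admin = AdminClient.create(props)) {
    // topic "foo" with 1 partition and replication factor 1, as in the CLI command
    admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 1))).all().get();
}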

Write to a topic

bin/kafka-console-producer.sh --topic foo --bootstrap-server ip:9092

# Use the settings in producer.properties
bin/kafka-console-producer.sh --topic foo --producer.config=config/producer.properties --bootstrap-server ip:9092

Read from a topic

# Random (anonymous) group id; consume from the beginning
bin/kafka-console-consumer.sh --topic foo --from-beginning --bootstrap-server ip:9092

# Specify a group id
bin/kafka-console-consumer.sh --topic foo --consumer-property group.id=test1 --bootstrap-server ip:9092

# Use the settings in consumer.properties
bin/kafka-console-consumer.sh --topic foo --consumer.config=config/consumer.properties --bootstrap-server ip:9092

Checking status

Topic list

bin/kafka-topics.sh --bootstrap-server ip:9092 --list

Topic details

bin/kafka-topics.sh --describe --topic foo --bootstrap-server ip:9092

Consumer group list

bin/kafka-consumer-groups.sh --list --bootstrap-server ip:9092

Consumer group details

bin/kafka-consumer-groups.sh --describe --group test --bootstrap-server ip:9092

Consumer group members

bin/kafka-consumer-groups.sh --describe --group test --members --bootstrap-server ip:9092
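The same information is available programmatically through the AdminClient; a hedged sketch, reusing an AdminClient built as in the topic-creation example above:

// List topic names
System.out.println(admin.listTopics().names().get());

// Describe the "test" consumer group, including its members and assignments
System.out.println(admin.describeConsumerGroups(Collections.singletonList("test")).all().get());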

Troubleshooting

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

If a consumer group keeps failing to commit during local testing, the quickstart-style reset is to terminate the Kafka and ZooKeeper processes, then delete all local data, including any events created along the way:

rm -rf /tmp/kafka-logs /tmp/zookeeper
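Note that this error usually means the consumer spent longer than max.poll.interval.ms between polls, so the coordinator rebalanced the group before the offsets could be committed. Before resorting to deleting data, it is often enough to fetch fewer records per poll or allow more processing time. A sketch of the relevant consumer settings (props as in the Java consumer examples later in this document; values are illustrative):

// Fetch fewer records per poll so each batch is processed quickly (default 500)
props.put("max.poll.records", "100");
// Allow more time between polls before the group rebalances (default 300000 ms = 5 min)
props.put("max.poll.interval.ms", "600000");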

Authentication

Edit server.properties

listeners=SASL_PLAINTEXT://ip:9092
advertised.listeners=SASL_PLAINTEXT://ip:9092

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

Edit zookeeper.properties

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

Edit producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
compression.type=none

Edit consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Edit zookeeper_jaas.conf

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="xiongjiaxuan"
    user_admin="xiongjiaxuan";
};

Edit kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="xiongjiaxuan"
    user_admin="xiongjiaxuan"
    user_cms="cms";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="xiongjiaxuan";
};

Starting with authentication

Before the ZooKeeper start command, or inside zookeeper-server-start.sh, add:

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/zookeeper_jaas.conf"

Before the Kafka start command, or inside kafka-server-start.sh, add:

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"

Client testing

Edit kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="xiongjiaxuan";
};

Test reading and writing

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"

bin/kafka-console-producer.sh --topic foo --producer.config=config/producer.properties --bootstrap-server ip:9092
bin/kafka-console-consumer.sh --topic foo --consumer.config=config/consumer.properties --bootstrap-server ip:9092
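A Java client can supply the same credentials inline through sasl.jaas.config instead of an external JAAS file plus KAFKA_OPTS; a minimal sketch using the account defined above:

Properties props = new Properties();
props.put("bootstrap.servers", "ip:9092"); // placeholder broker address
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// Inline equivalent of kafka_client_jaas.conf
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"admin\" password=\"xiongjiaxuan\";");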

Status queries with authentication

Edit config/config.properties:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

bin/kafka-topics.sh --list --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties
bin/kafka-topics.sh --describe --topic cms-site --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties

bin/kafka-consumer-groups.sh --list --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties
bin/kafka-consumer-groups.sh --describe --group cms-article-consumer-dev --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties
bin/kafka-consumer-groups.sh --describe --group cms-article-consumer-dev --members --bootstrap-server 8.210.141.55:9092 --command-config config/config.properties

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 8.210.141.55:9092 --topic foo --command-config config/config.properties

Advanced topics

  • Optimize
  • Kafka Rebalancing

Java integration: producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
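send() is asynchronous and only enqueues the record; to observe delivery failures, pass a callback (or call get() on the returned Future). A short sketch against the same producer:

producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // delivery failed after any configured retries
    } else {
        System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
    }
});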

Enabling transactions

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    }
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();
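Transactional writes only pay off if consumers skip uncommitted data; by default consumers read everything. A one-line sketch of the consumer-side setting:

// Read only messages from committed transactions (default is read_uncommitted)
props.put("isolation.level", "read_committed");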


Java integration: consumer

Automatic offset commit

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll(long) was removed in Kafka 3.0; use the Duration overload
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
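The loop above never terminates. For a clean shutdown, the documented pattern is to call wakeup() from another thread, which makes a blocking poll() throw WakeupException; a sketch:

// From a shutdown hook or another thread:
consumer.wakeup();

// In the polling thread:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // process records ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown
} finally {
    consumer.close();
}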

Manual offset commit

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer); // placeholder for the actual persistence step
        consumer.commitSync(); // commit only after the batch is safely stored
        buffer.clear();
    }
}
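commitSync() with no arguments commits the offsets returned by the most recent poll. To commit as each partition finishes, offsets can be passed explicitly; a sketch in the style of the KafkaConsumer javadoc (TopicPartition is in org.apache.kafka.common, OffsetAndMetadata in org.apache.kafka.clients.consumer):

for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // The committed offset is the position of the next record to read, hence + 1
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}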
