Kafka CLI 工具快速指南 | AI生成和翻译

Home 2026.02

Kafka 提供了一些命令行工具用于管理和与集群交互。这是一份关于主要工具及其选项的全面指南。

Core Tools

kafka-topics.sh

管理 topic 的创建、删除和配置。

常见操作:

创建一个 topic:

kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2 \
  --config retention.ms=86400000

列出所有 topic:

kafka-topics.sh --bootstrap-server localhost:9092 --list

查看 topic 详情:

kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe \
  --topic my-topic

删除一个 topic:

kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete \
  --topic my-topic

修改 partitions 数量:

kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter \
  --topic my-topic \
  --partitions 5

关键选项:

kafka-console-producer.sh

从命令行发送消息。

基本用法:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic my-topic

携带 key-value 对:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic my-topic \
  --property "parse.key=true" \
  --property "key.separator=:"

从文件输入:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic my-topic < input.txt

关键选项:

kafka-console-consumer.sh

从 topic 消费消息。

基本用法:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning

使用 consumer group:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic \
  --group my-consumer-group

显示 key 和时间戳:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic \
  --property print.key=true \
  --property print.timestamp=true \
  --property key.separator="-"

关键选项:

kafka-consumer-groups.sh

管理 consumer groups 和 offsets。

列出所有 consumer group:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看 group 详情:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --describe

重置 offsets:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --topic my-topic \
  --reset-offsets \
  --to-earliest \
  --execute

重置 offset 选项:

删除 consumer group:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --delete

kafka-configs.sh

管理 topic、broker 和 client 的动态配置。

添加/修改 topic 配置:

kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config retention.ms=172800000

查看 topic 配置:

kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --describe

删除配置:

kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --delete-config retention.ms

实体类型:

kafka-log-dirs.sh

查看日志目录信息。

kafka-log-dirs.sh --bootstrap-server localhost:9092 \
  --describe \
  --broker-list 0,1,2 \
  --topic-list my-topic

kafka-reassign-partitions.sh

在 broker 之间重新分配 partition。

生成重分配计划:

kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "0,1,2" \
  --generate

执行重分配:

kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json \
  --execute

验证重分配:

kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json \
  --verify

kafka-preferred-replica-election.sh

触发首选副本(preferred replica)的 leader 选举。

kafka-preferred-replica-election.sh --bootstrap-server localhost:9092 \
  --all-topic-partitions

kafka-broker-api-versions.sh

检查 broker 支持的 API 版本。

kafka-broker-api-versions.sh --bootstrap-server localhost:9092

kafka-dump-log.sh

打印日志段(log segment)内容。

kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.log \
  --print-data-log

选项:

kafka-verifiable-producer.sh / kafka-verifiable-consumer.sh

用于测试和验证,发送/接收带编号的消息。

kafka-verifiable-producer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --max-messages 100
kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --group-id test-group

Common Patterns

检查集群健康状况:

kafka-broker-api-versions.sh --bootstrap-server localhost:9092

监控 consumer 积压(lag):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --describe

在 topic 之间流转消息:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic source-topic \
  --from-beginning | \
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic dest-topic

计算 topic 中的消息数量:

kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic my-topic \
  --time -1

所有这些工具都位于 Kafka 安装目录的 bin/ 目录下,并有对应的 Windows .bat 文件。大多数工具支持 --help 以获取详细的选项信息。


Back Donate