kafka quickstart
https://kafka.apache.org/quickstart
Step 1: Download the code
> wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz > tar -xzf kafka_2.12-2.2.0.tgz > cd kafka_2.12-2.2.0
Step 2: Start the server
zookeeper を起動
$ bin/zookeeper-server-start.sh config/zookeeper.properties
kafka を起動
$ bin/kafka-server-start.sh config/server.properties
Step 3: Create a topic
トピックを作成前の確認。なにもない。
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
トピック:test を作成と確認
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092 test
Step 4: Send some messages
プロデューサーでテキトーなメッセージを入力
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > msg1 > msg2
Step 5: Start a consumer
コンシューマにテキトーがメッセージが表示される
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning msg1 msg2
コンシューマーを停止->起動するともう一度メッセージが表示された。これは --from-beginning がついているためか
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning msg1 msg2
--from-beginning なしコンシューマーを起動し、プロデューサーでテキトーなメッセージを入力すると、続きが表示された。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test msg3 msg4
トピックtest の詳細を確認
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Step 6: Setting up a multi-broker cluster
プロパティファイルを複製して
$ cp config/server.properties config/server-1.properties $ cp config/server.properties config/server-2.properties
編集。1vm上に3コkafkaを立ち上げるため、バッティングしないように調整。
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2
zookeeperと1コ目のkafkaを起動したまま、2コ目,3コ目の kafka を起動。
$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties
トピック:my-replicated-topic を作成。レプリケーション、パーティションを指定している。
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
トピックの確認。「__consumer_offsets」とは?
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092 __consumer_offsets test
トピック:my-replicated-topic を確認。リーダーがnode2になってるみたい。
「リーダー」は、指定されたパーティションのすべての読み取りおよび書き込みを担当するノードです。各ノードは、パーティションのランダムに選択された部分のリーダーになります。 「レプリカ」は、それらがリーダーであるかどうかにかかわらず、このパーティションのログを複製するノードのリストです。 「isr」は「同期」レプリカのセットです。これは現在生きているリーダーのリストに追いついているレプリカリストのサブセットです。
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 $ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824 Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
zookeeper にも聞いてみた
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
node:0 のトピック一覧
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092 __consumer_offsets my-replicated-topic test
node:1 のトピック一覧
# bin/kafka-topics.sh --list --bootstrap-server localhost:9093 __consumer_offsets my-replicated-topic test
node:2 のトピック一覧。すべてにtestがいるということは、これはノード毎のトピック一覧ではない?kafkaのコンセプトを理解するとわかるかもしれない。__consumer_offsets についても。
# bin/kafka-topics.sh --list --bootstrap-server localhost:9094 __consumer_offsets my-replicated-topic test
いよいよクラスタにメッセージ登録
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic >meeeeeeeeeeeeeeeeeeeeeeesg1 >meeeeeeeeeeeeeeeeeeeeeeesg2
コンシューム
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic meeeeeeeeeeeeeeeeeeeeeeesg1 meeeeeeeeeeeeeeeeeeeeeeesg2
リーダーnode:2 をkill して、トピック確認。node:1がリーダーになっている。
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
続けてメッセージ登録すると、コンシュームしている。
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic meeeeeeeeeeeeeeeeeeeeeeesg1 meeeeeeeeeeeeeeeeeeeeeeesg2 [2019-05-08 20:44:24,061] WARN [Consumer clientId=consumer-1, groupId=console-consumer-11148] Connection to node 2 (sw1-01/163.43.114.218:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) meeeeeeeeeeeeeeeeeeeeeeesg3