tjtjtjのメモ

自分のためのメモです

kafka quickstart

https://kafka.apache.org/quickstart

Step 1: Download the code

> wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
> tar -xzf kafka_2.12-2.2.0.tgz
> cd kafka_2.12-2.2.0

Step 2: Start the server

zookeeper を起動

$ bin/zookeeper-server-start.sh config/zookeeper.properties

kafka を起動

$ bin/kafka-server-start.sh config/server.properties

Step 3: Create a topic

トピックを作成前の確認。なにもない。

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092

トピック:test を作成と確認

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

Step 4: Send some messages

プロデューサーでテキトーなメッセージを入力

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> msg1
> msg2

Step 5: Start a consumer

コンシューマにテキトーがメッセージが表示される

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
msg1
msg2

コンシューマーを停止->起動するともう一度メッセージが表示された。これは --from-beginning がついているためか

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
msg1
msg2

--from-beginning なしコンシューマーを起動し、プロデューサーでテキトーなメッセージを入力すると、続きが表示された。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
msg3
msg4

トピックtest の詳細を確認

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Step 6: Setting up a multi-broker cluster

プロパティファイルを複製して

$ cp config/server.properties config/server-1.properties
$ cp config/server.properties config/server-2.properties

編集。1vm上に3コkafkaを立ち上げるため、バッティングしないように調整。

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

zookeeperと1コ目のkafkaを起動したまま、2コ目,3コ目の kafka を起動。

$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties

トピック:my-replicated-topic を作成。レプリケーションパーティションを指定している。

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

トピックの確認。「__consumer_offsets」とは?

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test

トピック:my-replicated-topic を確認。リーダーがnode2になってるみたい。

「リーダー」は、指定されたパーティションのすべての読み取りおよび書き込みを担当するノードです。各ノードは、パーティションのランダムに選択された部分のリーダーになります。 「レプリカ」は、それらがリーダーであるかどうかにかかわらず、このパーティションのログを複製するノードのリストです。 「isr」は「同期」レプリカのセットです。これは現在生きているリーダーのリストに追いついているレプリカリストのサブセットです。

$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:segment.bytes=1073741824
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

zookeeper にも聞いてみた

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

node:0 のトピック一覧

# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
my-replicated-topic
test

node:1 のトピック一覧

# bin/kafka-topics.sh --list --bootstrap-server localhost:9093
__consumer_offsets
my-replicated-topic
test

node:2 のトピック一覧。すべてにtestがいるということは、これはノード毎のトピック一覧ではない?kafkaのコンセプトを理解するとわかるかもしれない。__consumer_offsets についても。

# bin/kafka-topics.sh --list --bootstrap-server localhost:9094
__consumer_offsets
my-replicated-topic
test

いよいよクラスタにメッセージ登録

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>meeeeeeeeeeeeeeeeeeeeeeesg1
>meeeeeeeeeeeeeeeeeeeeeeesg2

コンシューム

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
meeeeeeeeeeeeeeeeeeeeeeesg1
meeeeeeeeeeeeeeeeeeeeeeesg2

リーダーnode:2 をkill して、トピック確認。node:1がリーダーになっている。

$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3    Configs:segment.bytes=1073741824
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 2,1,0 Isr: 1,0

続けてメッセージ登録すると、コンシュームしている。

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
meeeeeeeeeeeeeeeeeeeeeeesg1
meeeeeeeeeeeeeeeeeeeeeeesg2
[2019-05-08 20:44:24,061] WARN [Consumer clientId=consumer-1, groupId=console-consumer-11148] Connection to node 2 (sw1-01/163.43.114.218:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
meeeeeeeeeeeeeeeeeeeeeeesg3