tjtjtjのメモ

自分のためのメモです

kafka コンシューマグループ

コンシューマグループを試した。

あるトピックに複数のコンシューマを接続すると、各コンシューマーが同じメッセージを受信する。 コンシューマグループを使うと同一グループのコンシューマは、同じメッセージを受信しない。同一グループのコンシューマは別パーティションを購読しようとするため。

トピックのパーティション確認

# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic partitioned-topic
Topic:partitioned-topic PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: partitioned-topic        Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: partitioned-topic        Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: partitioned-topic        Partition: 2    Leader: 3       Replicas: 3     Isr: 3

コンシューマ1コ目

# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.21:9092 --topic partitioned-topic --group g1

コンシューマグループ確認。89a2a がパーティション0,1,2 を購読している。

# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.21:9092 --group g1 --describe

TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
partitioned-topic 0          17              17              0               consumer-1-89a2acb0-b36c-4e92-87b3-abc2a0d27ebe /192.168.0.21   consumer-1
partitioned-topic 1          14              14              0               consumer-1-89a2acb0-b36c-4e92-87b3-abc2a0d27ebe /192.168.0.21   consumer-1
partitioned-topic 2          19              19              0               consumer-1-89a2acb0-b36c-4e92-87b3-abc2a0d27ebe /192.168.0.21   consumer-1

コンシューマ2コ目

# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.22:9092 --topic partitioned-topic --group g1

コンシューマグループ確認。89a2a がパーティション0,1、ecadがパーティション2を購読している。

# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.21:9092 --group g1 --describe

TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
partitioned-topic 0          17              17              0               consumer-1-89a2acb0-b36c-4e92-87b3-abc2a0d27ebe /192.168.0.21   consumer-1
partitioned-topic 1          14              14              0               consumer-1-89a2acb0-b36c-4e92-87b3-abc2a0d27ebe /192.168.0.21   consumer-1
partitioned-topic 2          19              19              0               consumer-1-ecad08be-1624-4a48-be03-980b3d5b8ee0 /192.168.0.21   consumer-1

メッセージ送信

この状態で6コのメッセージ送信する。メッセージがラウンドロビンパーティション配送されるなら 89a2が4つ、ecadが2つ受信するだろう。

# bin/kafka-console-producer.sh --broker-list 192.168.0.21:9092 --topic partitioned-topic
>a
>b
>c
>d
>e
>f

コンシューマ1

# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.21:9092 --topic partitioned-topic --group g1
b
c
e
f

コンシューマ2。想定通りの結果

# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.22:9092 --topic partitioned-topic --group g1
a
d

パーティション毎のメッセージ確認

# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.21:9092 --topic partitioned-topic --from-beginning --partition 0
b
e
^CProcessed a total of 2 messages
# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.21:9092 --topic partitioned-topic --from-beginning --partition 1
c
f
^CProcessed a total of 2 messages
# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.21:9092 --topic partitioned-topic --from-beginning --partition 2
a
d
^CProcessed a total of 2 messages