tjtjtjのメモ

自分のためのメモです

kafka connect してみる

kafka connect の FileStreamSource を試した。

properties

これから使う config 内の propertiesファイルを確認する。トピックは connect-test。 入口が test.txt。出口がtest.sink.txt。オフセットを /tmp/connect.offsets に記録するんだろう。

connect-standalone.properties

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

file-source

トピック確認。まだない

cd C:\opt\kafka2
bin\windows\kafka-topics.bat --list  --zookeeper=localhost:2181

スタンドアロンコネクタ起動。ファイルがないため warn。sleeping が出続ける。

bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties
↓↓↓
WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask)

トピック確認。まだできていない

bin\windows\kafka-topics.bat --list  --zookeeper=localhost:2181

テキストファイル生成。「sleeping」が止まる

echo asdf>test.txt
echo qwer>>test.txt
echo zxcv>>test.txt

トピック確認。できている。 /tmp/connect.offsets もできていた。

bin\windows\kafka-topics.bat --list  --zookeeper=localhost:2181
↓↓↓
connect-test

コンシューマー起動

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning
↓↓↓
{"schema":{"type":"string","optional":false},"payload":"asdf"}
{"schema":{"type":"string","optional":false},"payload":"qwer"}
{"schema":{"type":"string","optional":false},"payload":"zxcv"}

テキスト追記

echo a>>test.txt
echo b>>test.txt
echo c>>test.txt
echo d>>test.txt
echo e>>test.txt

コンシューマに↓が表示された。

{"schema":{"type":"string","optional":false},"payload":"a"}
{"schema":{"type":"string","optional":false},"payload":"b"}
{"schema":{"type":"string","optional":false},"payload":"c"}
{"schema":{"type":"string","optional":false},"payload":"d"}
{"schema":{"type":"string","optional":false},"payload":"e"}

file-sink

スタンドアロンコネクタ-sink起動。失敗

bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-sink.properties
↓↓↓
ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone)
org.apache.kafka.connect.errors.ConnectException: Unable to initialize REST server
        at org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:177)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:85)
Caused by: java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:8083

8083 が開いていた。

curl http://localhost:8083/connector-plugins | jq .
↓↓↓
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.2.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.2.1"
  }
]

スタンドアロンコネクタ-source を止めると curl は接続失敗。

続けて sink 起動。こんどは成功。test.sink.txt ができた。

bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-sink.properties

test.sink.txt

asdf
qwer
zxcv
a
b
c
d
e