kafka connect してみる
kafka connect の FileStreamSource を試した。
properties
これから使う config 内の propertiesファイルを確認する。トピックは connect-test。 入口が test.txt。出口がtest.sink.txt。オフセットを /tmp/connect.offsets に記録するんだろう。
connect-standalone.properties
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000
connect-file-source.properties
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test
connect-file-sink.properties
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test
file-source
トピック確認。まだない
cd C:\opt\kafka2 bin\windows\kafka-topics.bat --list --zookeeper=localhost:2181
スタンドアロンコネクタ起動。ファイルがないため warn。sleeping が出続ける。
bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties ↓↓↓ WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask)
トピック確認。まだできていない
bin\windows\kafka-topics.bat --list --zookeeper=localhost:2181
テキストファイル生成。「sleeping」が止まる
echo asdf>test.txt echo qwer>>test.txt echo zxcv>>test.txt
トピック確認。できている。 /tmp/connect.offsets
もできていた。
bin\windows\kafka-topics.bat --list --zookeeper=localhost:2181 ↓↓↓ connect-test
コンシューマー起動
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning ↓↓↓ {"schema":{"type":"string","optional":false},"payload":"asdf"} {"schema":{"type":"string","optional":false},"payload":"qwer"} {"schema":{"type":"string","optional":false},"payload":"zxcv"}
テキスト追記
echo a>>test.txt echo b>>test.txt echo c>>test.txt echo d>>test.txt echo e>>test.txt
コンシューマに↓が表示された。
{"schema":{"type":"string","optional":false},"payload":"a"} {"schema":{"type":"string","optional":false},"payload":"b"} {"schema":{"type":"string","optional":false},"payload":"c"} {"schema":{"type":"string","optional":false},"payload":"d"} {"schema":{"type":"string","optional":false},"payload":"e"}
file-sink
スタンドアロンコネクタ-sink起動。失敗
bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-sink.properties ↓↓↓ ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone) org.apache.kafka.connect.errors.ConnectException: Unable to initialize REST server at org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:177) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:85) Caused by: java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:8083
8083 が開いていた。
curl http://localhost:8083/connector-plugins | jq . ↓↓↓ [ { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "2.2.1" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "2.2.1" } ]
スタンドアロンコネクタ-source を止めると curl は接続失敗。
続けて sink 起動。こんどは成功。test.sink.txt ができた。
bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-sink.properties
test.sink.txt
asdf qwer zxcv a b c d e