kafka-connect-jdbc sink を試した
source の次は sink も試します。
データベース準備
mysql> create database myjdbc2; Query OK, 1 row affected (0.00 sec) mysql> use myjdbc2; Database changed mysql> show tables; Empty set (0.00 sec)
コンシューマーでメッセージ確認
>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myjdbctopic-authors --from-beginning {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":1,"name":"qwer"}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":2,"name":"asdf"}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":3,"name":"zxcv"}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":4,"name":"aaaaa"}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":5,"name":"bbbb"}} Processed a total of 5 messages
connect jdbc source 起動
connect-standalone-plugin.properties
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 # plugin.path=C:\opt\kafka2\plugins
connect-jdbc-sink.properties
name=myjdbcconnect-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topic=myjdbctopic-authors topics.regex=myjdbctopic-(.*) connection.url=jdbc:mysql://localhost:3306/myjdbc2 connection.user= connection.password= auto.create=true
topics.regex
途中 topics.regex
付けろと怒られた。
org.apache.kafka.common.config.ConfigException: Must configure one of topics or topics.regex
topics.regex
か table.name.format
付けろと。
コネクタ起動
cd C:\opt\kafka2 bin\windows\connect-standalone.bat config\connect-standalone-plugin.properties config\connect-jdbc-sink.properties
db確認
mysql> show tables; +---------------------+ | Tables_in_myjdbc2 | +---------------------+ | myjdbctopic-authors | +---------------------+ 1 row in set (0.00 sec) mysql> desc `myjdbctopic-authors`; +-------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+--------------+------+-----+---------+-------+ | name | varchar(256) | YES | | NULL | | | id | int(11) | NO | | NULL | | +-------+--------------+------+-----+---------+-------+ 2 rows in set (0.01 sec) mysql> show index from `myjdbctopic-authors`; Empty set (0.02 sec) mysql> select * from `myjdbctopic-authors`; +-------+----+ | name | id | +-------+----+ | qwer | 1 | | asdf | 2 | | zxcv | 3 | | aaaaa | 4 | | bbbb | 5 | +-------+----+ 5 rows in set (0.00 sec)
参考
↑の stackoverflow は transforms 使ってテーブル名をいい感じにしているっぽい。