tjtjtjのメモ

自分のためのメモです

kafka-connect-jdbc sink を試した

source の次は sink も試します。

データベース準備

mysql> create database myjdbc2;
Query OK, 1 row affected (0.00 sec)

mysql> use myjdbc2;
Database changed
mysql> show tables;
Empty set (0.00 sec)

コンシューマーでメッセージ確認

>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myjdbctopic-authors --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":1,"name":"qwer"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":2,"name":"asdf"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":3,"name":"zxcv"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":4,"name":"aaaaa"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":5,"name":"bbbb"}}
Processed a total of 5 messages

connect jdbc source 起動

connect-standalone-plugin.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# plugin.path=C:\opt\kafka2\plugins

connect-jdbc-sink.properties

name=myjdbcconnect-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topic=myjdbctopic-authors
topics.regex=myjdbctopic-(.*)
connection.url=jdbc:mysql://localhost:3306/myjdbc2
connection.user=
connection.password=
auto.create=true

topics.regex

途中 topics.regex 付けろと怒られた。

org.apache.kafka.common.config.ConfigException: Must configure one of topics or topics.regex

topics.regextable.name.format 付けろと。

コネクタ起動

cd C:\opt\kafka2
bin\windows\connect-standalone.bat config\connect-standalone-plugin.properties config\connect-jdbc-sink.properties

db確認

mysql> show tables;
+---------------------+
| Tables_in_myjdbc2   |
+---------------------+
| myjdbctopic-authors |
+---------------------+
1 row in set (0.00 sec)

mysql> desc `myjdbctopic-authors`;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| name  | varchar(256) | YES  |     | NULL    |       |
| id    | int(11)      | NO   |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
2 rows in set (0.01 sec)

mysql> show index from `myjdbctopic-authors`;
Empty set (0.02 sec)

mysql> select * from `myjdbctopic-authors`;
+-------+----+
| name  | id |
+-------+----+
| qwer  |  1 |
| asdf  |  2 |
| zxcv  |  3 |
| aaaaa |  4 |
| bbbb  |  5 |
+-------+----+
5 rows in set (0.00 sec)

参考

docs.confluent.io

docs.confluent.io

docs.confluent.io

stackoverflow.com

↑の stackoverflow は transforms 使ってテーブル名をいい感じにしているっぽい。

docs.confluent.io