Trying out the kafka-connect-jdbc source connector
Preparing the plugin
- Download it from here: https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
- Place kafka-connect-jdbc-5.3.1.jar in C:\opt\kafka2\libs
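Preparing mysql-connector
- Download it from here: https://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.47
- Place mysql-connector-java-5.1.48.jar in C:\opt\kafka2\libs (the link above is the 5.1.47 page, so use the file name of the version you actually downloaded)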
Preparing MySQL
Create the database, table, and records
    create database myjdbc;
    use myjdbc;
    create table authors (
      id int(8) not null auto_increment,
      name varchar(20),
      primary key (id)
    );
    insert into authors (name) values ('qwer'),('asdf'),('zxcv');
Check
    mysql> select * from authors;
    +----+------+
    | id | name |
    +----+------+
    |  1 | qwer |
    |  2 | asdf |
    |  3 | zxcv |
    +----+------+
    3 rows in set (0.00 sec)
Start ZooKeeper
    cd C:\opt\kafka2
    bin\windows\zookeeper-server-start.bat config\zookeeper.properties
Start Kafka
    cd C:\opt\kafka2
    bin\windows\kafka-server-start.bat config\server.properties
Starting the Connect JDBC source
connect-standalone-plugin.properties
    bootstrap.servers=localhost:9092
    offset.storage.file.filename=/tmp/connect.offsets
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    #plugin.path=C:\opt\kafka2\plugins
connect-jdbc-source.properties
    name=myjdbcconnect
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    # The topic name is topic.prefix followed directly by the table name,
    # so the trailing hyphen is what yields the topic myjdbctopic-authors.
    topic.prefix=myjdbctopic-
    connection.url=jdbc:mysql://localhost:3306/myjdbc
    connection.user=
    connection.password=
    mode=incrementing
    incrementing.column.name=id
    table.whitelist=authors
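With mode=incrementing and incrementing.column.name=id, the connector remembers the largest id it has read and, on each poll, fetches only rows with a greater id. The following is a minimal sketch of that idea, not the connector's actual code. It assumes an in-memory SQLite database as a stand-in for MySQL, and the function name poll_new_rows and the starting offset of -1 are hypothetical.

```python
import sqlite3


def poll_new_rows(conn, last_id):
    """Return rows whose id is greater than last_id, oldest first."""
    cur = conn.execute(
        "SELECT id, name FROM authors WHERE id > ? ORDER BY id ASC",
        (last_id,),
    )
    return cur.fetchall()


# In-memory SQLite stands in for the MySQL database (assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE authors (id INTEGER PRIMARY KEY AUTOINCREMENT, name VARCHAR(20))"
)
conn.executemany(
    "INSERT INTO authors (name) VALUES (?)", [("qwer",), ("asdf",), ("zxcv",)]
)

offset = -1  # hypothetical starting offset: nothing has been read yet

# First poll: every existing row is new.
first_batch = poll_new_rows(conn, offset)
if first_batch:
    offset = first_batch[-1][0]  # remember the largest id seen

# A row added later is picked up by the next poll only.
conn.execute("INSERT INTO authors (name) VALUES ('aaaaa')")
second_batch = poll_new_rows(conn, offset)
if second_batch:
    offset = second_batch[-1][0]

print(first_batch)
print(second_batch)
print(offset)
```

Because only rows with a larger id are selected, this mode notices inserted rows but not updates or deletes of rows that were already read.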
Start the connector
    cd C:\opt\kafka2
    bin\windows\connect-standalone.bat config\connect-standalone-plugin.properties config\connect-jdbc-source.properties
Check the topics
    > bin\windows\kafka-topics.bat --list --zookeeper=localhost:2181
    __consumer_offsets
    connect-test
    myjdbctopic-authors
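As far as I understand it, the JDBC source connector builds the topic name by joining topic.prefix and the table name with no separator in between, so a topic called myjdbctopic-authors implies a prefix ending in a hyphen. A tiny sketch of that rule follows. The function name topic_name is hypothetical and is not part of the connector.

```python
def topic_name(topic_prefix: str, table: str) -> str:
    """Join prefix and table name exactly as given, adding no separator."""
    return topic_prefix + table


# A prefix with a trailing hyphen gives the topic seen in the listing.
print(topic_name("myjdbctopic-", "authors"))
# Without the hyphen the two parts run together.
print(topic_name("myjdbctopic", "authors"))
```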
Start a consumer
    > bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myjdbctopic-authors --from-beginning
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":1,"name":"qwer"}}
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":2,"name":"asdf"}}
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":3,"name":"zxcv"}}
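Because the worker uses JsonConverter with schemas.enable=true, each message is a JSON envelope with a "schema" part and a "payload" part. The sketch below shows one way a downstream program might split the two. It uses the first message above as input, and the function name decode is hypothetical.

```python
import json

# First message printed by the console consumer above.
message = (
    '{"schema":{"type":"struct","fields":['
    '{"type":"int32","optional":false,"field":"id"},'
    '{"type":"string","optional":true,"field":"name"}],'
    '"optional":false,"name":"authors"},'
    '"payload":{"id":1,"name":"qwer"}}'
)


def decode(raw):
    """Return (field name -> type mapping, payload dict) from one envelope."""
    envelope = json.loads(raw)
    fields = {f["field"]: f["type"] for f in envelope["schema"]["fields"]}
    return fields, envelope["payload"]


fields, payload = decode(message)
print(fields)
print(payload)
```

The schema part is repeated in every message, which is why each record is so much longer than the row it carries.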
Add a record
    insert into authors (name) values ('aaaaa');
Check the consumer
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":4,"name":"aaaaa"}}
Check C:\tmp\connect.offsets
Part of the binary file
["myjdbcconnect",{"protocol":"1","table":"myjdbc.authors"}]uq........{"incrementing":4}
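The offsets file is binary, but the stored offset itself is readable JSON inside it. The sketch below pulls that value out by scanning the raw bytes with a regular expression instead of parsing the file format properly. The function name last_incrementing_offset is hypothetical, and the sample bytes are rebuilt from the excerpt above with zero bytes standing in for the unprintable part, so it is an illustration only.

```python
import json
import re


def last_incrementing_offset(raw: bytes):
    """Return the last {"incrementing": N} value found in raw, or None."""
    matches = re.findall(rb'\{"incrementing":\d+\}', raw)
    if not matches:
        return None
    return json.loads(matches[-1].decode("utf-8"))["incrementing"]


# Sample rebuilt from the excerpt; the zero bytes are placeholders (assumption).
sample = (
    b'["myjdbcconnect",{"protocol":"1","table":"myjdbc.authors"}]uq'
    + b"\x00" * 8
    + b'{"incrementing":4}'
)

print(last_incrementing_offset(sample))
```

To try it on the real file, read the bytes with open(r"C:\tmp\connect.offsets", "rb").read() and pass them to the function. If the file holds offsets for several tables, this simple scan only reports the last match.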
References
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html