tjtjtj's notes

These are notes to myself.

Trying out the kafka-connect-jdbc source connector

Plugin setup

mysql-connector setup

MySQL setup

Create the database, table, and records

create database myjdbc;
use myjdbc
create table authors (
  id int(8) not null auto_increment,
  name varchar(20),
  primary key (id)
);
insert into authors (name) values
('qwer'),('asdf'),('zxcv');

Check

mysql> select * from authors;
+----+------+
| id | name |
+----+------+
|  1 | qwer |
|  2 | asdf |
|  3 | zxcv |
+----+------+
3 rows in set (0.00 sec)

Start ZooKeeper

cd C:\opt\kafka2
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Start Kafka

cd C:\opt\kafka2
bin\windows\kafka-server-start.bat config\server.properties

Start the Connect JDBC source

connect-standalone-plugin.properties

bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#plugin.path=C:\opt\kafka2\plugins

connect-jdbc-source.properties

name=myjdbcconnect
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
topic.prefix=myjdbctopic-
connection.url=jdbc:mysql://localhost:3306/myjdbc
connection.user=
connection.password=
mode=incrementing
incrementing.column.name=id
table.whitelist=authors
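
With mode=incrementing and incrementing.column.name=id, the connector remembers the largest id it has seen and on each poll fetches only rows with a larger id. The following is a minimal sketch of that idea, not the connector's actual implementation. It assumes sqlite3 as a stand-in for MySQL so that it runs without a server, and poll is a hypothetical helper name.

```python
import sqlite3

# Assumption: an in-memory sqlite3 database stands in for the MySQL myjdbc database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table authors (id integer primary key autoincrement, name varchar(20))"
)
conn.executemany(
    "insert into authors (name) values (?)", [("qwer",), ("asdf",), ("zxcv",)]
)


def poll(conn, last_id):
    """Hypothetical helper: return rows with id > last_id and the updated offset."""
    rows = conn.execute(
        "select id, name from authors where id > ? order by id asc", (last_id,)
    ).fetchall()
    # Keep the previous offset when nothing new was found.
    new_last_id = rows[-1][0] if rows else last_id
    return rows, new_last_id


# First poll: no stored offset yet, so every existing row is returned.
first_rows, offset = poll(conn, -1)

# A row inserted afterwards is the only one returned by the next poll.
conn.execute("insert into authors (name) values ('aaaaa')")
second_rows, offset = poll(conn, offset)

print(first_rows)
print(second_rows)
print(offset)
```

Because only ids larger than the stored offset are fetched, this mode picks up new rows but would not notice updates or deletes on existing rows.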

Start the connector

cd C:\opt\kafka2
bin\windows\connect-standalone.bat config\connect-standalone-plugin.properties config\connect-jdbc-source.properties

Check the topics

> bin\windows\kafka-topics.bat --list --zookeeper=localhost:2181
__consumer_offsets
connect-test
myjdbctopic-authors

Start a consumer

> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myjdbctopic-authors --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":1,"name":"qwer"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":2,"name":"asdf"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":3,"name":"zxcv"}}
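
Each message has a schema part and a payload part because value.converter.schemas.enable=true makes JsonConverter wrap every record in that envelope. As a rough sketch, a line copied from the output above can be taken apart like this (the variable names are only for illustration):

```python
import json

# One line copied from the console consumer output above.
line = (
    '{"schema":{"type":"struct","fields":['
    '{"type":"int32","optional":false,"field":"id"},'
    '{"type":"string","optional":true,"field":"name"}],'
    '"optional":false,"name":"authors"},'
    '"payload":{"id":1,"name":"qwer"}}'
)

message = json.loads(line)

# Map each column name to the Connect type declared in the schema part.
field_types = {f["field"]: f["type"] for f in message["schema"]["fields"]}

# The actual row data lives under "payload".
payload = message["payload"]

print(field_types)
print(payload)
```

A downstream consumer that only needs the row can read message["payload"] and ignore the schema part.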

Add a record

insert into authors (name) values
('aaaaa');

Check the consumer

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"authors"},"payload":{"id":4,"name":"aaaaa"}}

Check C:\tmp\connect.offsets

Part of the binary file

["myjdbcconnect",{"protocol":"1","table":"myjdbc.authors"}]uq........{"incrementing":4}
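
Since the offsets file is binary with JSON fragments embedded in it, one rough way to read the stored offset is to scan the raw bytes for those fragments. This is only a sketch. The sample bytes are modeled on the excerpt above, with the unprintable part replaced by placeholder zero bytes, and find_keys and find_incrementing_offsets are hypothetical helper names.

```python
import json
import re

# Assumption: sample bytes modeled on the excerpt above. The bytes between the
# key and the value in a real file are serialization data and will differ.
sample = (
    b'["myjdbcconnect",{"protocol":"1","table":"myjdbc.authors"}]'
    b"uq\x00\x00\x00\x00"
    b'{"incrementing":4}'
)


def find_keys(data):
    """Hypothetical helper: decode every [connector name, partition] JSON key."""
    return [json.loads(m) for m in re.findall(rb'\["[^"]+",\{[^}]*\}\]', data)]


def find_incrementing_offsets(data):
    """Hypothetical helper: return every stored incrementing offset as an int."""
    return [int(m) for m in re.findall(rb'\{"incrementing":(\d+)\}', data)]


keys = find_keys(sample)
offsets = find_incrementing_offsets(sample)

print(keys)
print(offsets)
```

To try it on the real file, the bytes can be read with open(r"C:\tmp\connect.offsets", "rb").read() and passed to the same functions. The value 4 matches the id of the last row that was sent to the topic.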

References

https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html