[kafka] Fixing Kafka Connect timestamp mode missing records inserted within the same second

I. The problem

Insert several records that share the same timestamp at the same time:

INSERT INTO "ABANK" VALUES ('1', 'CH', '00211', 'UBS Switzerland AG', 'UBSWCHZH31A', ' ', TO_DATE('2015-06-13 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('2017-07-12 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_TIMESTAMP('2021-04-20 08:38:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), TO_TIMESTAMP('2021-04-20 08:39:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), HEXTORAW('0F6970309627DC499210DE76B46ACEF7'));
INSERT INTO "ABANK" VALUES ('1', 'CH', '00213', 'UBS Switzerland AG', 'UBSWCHZH94N', ' ', TO_DATE('2015-06-13 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('2017-07-12 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_TIMESTAMP('2021-04-20 08:38:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), TO_TIMESTAMP('2021-04-20 08:39:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), HEXTORAW('AAEA53BE96A18144BBC16D3016642122'));

Both rows are inserted into the source table as expected.

However, only one of them reaches the topic.
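
The post does not state why the second row is lost. A likely explanation, offered here as an assumption, is that timestamp mode selects rows whose timestamp is strictly greater than the last stored offset and lower than the current database time minus `timestamp.delay.interval.ms` (default 0). If a poll runs after the first row is committed but before the second one is, the offset moves to that timestamp and the second row can no longer match. The simulation below is a minimal sketch of that behaviour, not the connector's code; the `poll` and `run` helpers and the poll timings are hypothetical.

```python
from datetime import datetime, timedelta


def poll(committed_rows, last_offset, db_now, delay_ms=0):
    """Rows a timestamp-mode poll would return, oldest first.

    Mimics a query of the form:
    WHERE ts > last_offset AND ts < db_now - delay ORDER BY ts
    """
    upper = db_now - timedelta(milliseconds=delay_ms)
    matching = [r for r in committed_rows if last_offset < r["ts"] < upper]
    return sorted(matching, key=lambda r: r["ts"])


def run(delay_ms):
    """Replay two polls around two inserts that share one timestamp."""
    ts = datetime(2021, 4, 20, 8, 38, 59)
    row_a = {"id": "00211", "ts": ts}
    row_b = {"id": "00213", "ts": ts}
    offset = datetime(2021, 4, 20, 8, 0, 0)  # last timestamp already delivered

    # Hypothetical timing: the first poll lands between the two commits,
    # the second poll runs two seconds later when both rows are visible.
    polls = [
        (ts + timedelta(milliseconds=100), [row_a]),
        (ts + timedelta(milliseconds=2100), [row_a, row_b]),
    ]

    delivered = []
    for db_now, committed in polls:
        batch = poll(committed, offset, db_now, delay_ms)
        delivered.extend(r["id"] for r in batch)
        if batch:
            offset = batch[-1]["ts"]  # offset advances to the newest row seen
    return delivered


print(run(delay_ms=0))    # only the first row is delivered
print(run(delay_ms=500))  # both rows are delivered together
```

With no delay the first poll delivers `00211` and advances the offset, so `00213` is skipped. With a 500 ms delay the first poll returns nothing, and the second poll picks up both rows in one batch.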

II. Solution: modify the source connector configuration

1. Original connector configuration

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"timestamp.column.name": "UPDDATTIM_0",
"topic.prefix": "connector_topic_",
"connection.password": "system",
"connection.user": "system",
"db.timezone": "Asia/Shanghai",
"name": "source_connector_docker_oracle_ABANK",
"connection.url": "jdbc:oracle:thin:@//92.168.0.2:1521/helowin",
"table.whitelist": "TEST.ABANK"
}

2. Updated connector configuration

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "UPDDATTIM_0",
"connection.password": "system",
"batch.max.rows": "1000",
"timestamp.delay.interval.ms": "500",
"table.poll.interval.ms": "10000",
"table.whitelist": "TEST.ABANK",
"mode": "timestamp",
"topic.prefix": "connector_topic_",
"connection.user": "system",
"db.timezone": "Asia/Shanghai",
"poll.interval.ms": "2000",
"name": "source_connector_docker_oracle_ABANK",
"connection.url": "jdbc:oracle:thin:@//172.16.5.162:1521/helowin"
}
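
One way to apply the updated settings to a running connector is the Kafka Connect REST API, whose `PUT /connectors/{name}/config` endpoint replaces a connector's configuration. The sketch below only illustrates that step: the worker address `http://localhost:8083` is an assumption, and the helper names (`merge_config`, `build_put_request`, `send`) are hypothetical. Because the endpoint expects the complete configuration, the four added settings are merged into the existing one before sending.

```python
import json
import urllib.request

# Assumption: address of the Connect worker's REST interface.
CONNECT_URL = "http://localhost:8083"
CONNECTOR = "source_connector_docker_oracle_ABANK"

# The four settings that the updated configuration adds.
ADDED_SETTINGS = {
    "batch.max.rows": "1000",
    "timestamp.delay.interval.ms": "500",
    "table.poll.interval.ms": "10000",
    "poll.interval.ms": "2000",
}


def merge_config(current, added):
    """Return a copy of the current config with the added settings applied."""
    merged = dict(current)
    merged.update(added)
    return merged


def build_put_request(base_url, name, config):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def send(request):
    """Send a request to the worker and return the parsed JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage against a live worker (not executed here):
# current = send(urllib.request.Request(
#     f"{CONNECT_URL}/connectors/{CONNECTOR}/config"))
# send(build_put_request(CONNECT_URL, CONNECTOR,
#                        merge_config(current, ADDED_SETTINGS)))
```

Reading the current configuration first and merging into it keeps unrelated keys, such as the connection settings, unchanged.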

III. Verification

1. Run the insert statements

INSERT INTO "ABANK" VALUES ('1', 'CH', '00251', 'UBS Switzerland AG', 'UBSWCHZH80H', ' ', TO_DATE('2015-06-13 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('2017-07-12 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_TIMESTAMP('2021-04-20 08:49:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), TO_TIMESTAMP('2021-04-20 08:49:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), HEXTORAW('2EA8FEBE29D9FA4CA0C89FBC7C217B0F'));
INSERT INTO "ABANK" VALUES ('1', 'CH', '00252', 'UBS Switzerland AG', 'UBSWCHZH94A', ' ', TO_DATE('2015-06-13 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), TO_DATE('2017-07-12 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), 'ADMIN', TO_TIMESTAMP('2021-04-20 08:49:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), TO_TIMESTAMP('2021-04-20 08:49:59.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), HEXTORAW('54825F81FFA4FB4EB60DF172BB739457'));

2. Check the topic

3. Check whether the rows reached the target table

(1) Source table

(2) Target table

Author: 哥们要飞