ETJava Beta | Java    注册   登录
  • 搜索:
  • kettle从入门到精通 第六十九课 ETL之kettle kettle cdc mysql,轻松实现实时增量同步

    发表于      阅读(21)     博客类别:Crawler     转自:https://www.cnblogs.com/zjBoy/p/18246490
    如有侵权 请联系我们删除  (页面底部联系我们)  

     1、之前kettle cdc mysql的时候使用的方案是canal+kafka+kettle,今天我们一起学习下使用kettle的插件Debezium直接cdc mysql。

     注:CDC (Change Data Capture) 是一种技术,用于捕获和同步数据库中的更改。

    1)Debezium步骤解析mysql binlog日志。

    2)json input步骤解析json字符串。

    3)switch-case 根据op字段进行路由。

    4)create、delete、update、ddl是写日志步骤,模拟后续操作。

    2、Debezium步骤配置如下(只输出dml),更多属性配置参考Debezium官网。如下图所示:

     3、switch-case配置如下,不做过多介绍。

     4、binlog解析之后的dml语句对应的json字符串结构

    CREATE

    {
        "ts_ms": 1718273338000,
        "db": "test",
        "table": "t1",
        "op": "CREATE",
        "after": "{\"id\":6,\"name\":\"金刚葫芦娃\",\"createtime\":1718297698000}",
        "pk": "{\"id\":6}"
    }

    UPDATE

    {
        "ts_ms": 1718273345000,
        "db": "test",
        "table": "t1",
        "op": "UPDATE",
        "before": "{\"id\":6,\"name\":\"金刚葫芦娃\",\"createtime\":1718297698000}",
        "after": "{\"id\":6,\"name\":\"金刚葫芦娃plus\",\"createtime\":1718297698000}",
        "pk": "{\"id\":6}"
    }

    DELETE

    {
        "ts_ms": 1718273369000,
        "db": "test",
        "table": "t1",
        "op": "DELETE",
        "before": "{\"id\":6,\"name\":\"金刚葫芦娃plus\",\"createtime\":1718297698000}",
        "pk": "{\"id\":6}"
    }

     

    5、Debezium步骤配置中设置ddl为true(输出dml和ddl),更多属性配置参考Debezium官网。如下图所示:

     6、binlog解析之后的ddl语句对应的json字符串结构

     

    DROP TABLE
    {
        "ts_ms": 1718274150072,
        "db": "test",
        "table": "t3",
        "op": "ddl",
        "sql": "DROP TABLE `test`.`t3`"
    }
    CREATE TABLE
     
    {
        "ts_ms": 1718274165789,
        "db": "test",
        "table": "t3",
        "op": "ddl",
        "ddl_type": "CREATE",
        "sql": "CREATE TABLE `t3` (\n `id` bigint NOT NULL AUTO_INCREMENT,\n `name` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,\n `createtime` datetime DEFAULT NULL,\n PRIMARY KEY (`id`)\n ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci"
    }