81. 超哥分享Flink CDC(二)

Flink CDC MySQL Docker Java Debezium Data Streaming
本文介绍了如何使用Flink CDC进行MySQL数据变更捕获(CDC)。首先,通过Docker启动MySQL环境并创建测试表。接着,在Java项目中导入Flink CDC相关依赖,并编写代码配置MySQL数据源,设置捕获的数据库和表。代码中启用了检查点机制,并将捕获的数据转换为JSON格式输出。此外,文章还提供了日志配置示例,并详细解释了Debezium标准CDC事件格式,包括操作类型、前后数据值及元数据信息。最后,推荐了Flink CDC的生产案例教程链接,帮助用户快速上手。
文章内容
思维导图
常见问题
社交分享

Flink CDC使用

  1. 在本地启动一个MySQL的Docker环境
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4

  1. 创建表
create database cdc_test;
use cdc_test;

create table cdc_table (
    id int primary key auto_increment,
    name varchar(1000),
    age int
);

  1. 在idea中新建一个Java项目
  2. 导入依赖:
2.4.2
1.16.3
1.2.7


    com.ververica
    flink-connector-mysql-cdc
    ${flink-cdc.version}



    org.apache.flink
    flink-connector-base
    ${flink.version}



    org.apache.flink
    flink-clients
    ${flink.version}


    org.apache.flink
    flink-table-runtime
    ${flink.version}


    org.apache.flink
    flink-runtime-web
    ${flink.version}



    ch.qos.logback
    logback-classic
    ${logback.version}


  1. 编写代码
public class FlinkCDCApplication {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.enableCheckpointing(60000L);

        MySqlSource mySqlSource = MySqlSource.builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("cdc_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
            .tableList("cdc_test.cdc_table") // set captured table
            .username("root")
            .password("debezium")
            .includeSchemaChanges(true)
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
            .print();
        env.execute();
    }
}

  1. 添加日志配置



       
          
             %d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n
          
       

       
          
       

Debezium标准CDC Event格式详解

{
    "before": null,
    "after": {
        "id": 1,
        "name": "xing.yu",
        "age": 26,
        "new_column": "dewu"
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1702723640000,
        "snapshot": "false",
        "db": "cdc_test",
        "sequence": null,
        "table": "cdc_table",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2394,
        "row": 0,
        "thread": 39,
        "query": null
    },
    "op": "c",
    "ts_ms": 1702723640483,
    "transaction": null
}
{
    // 表数据更新前的值,update/delete
    "before": {},
    // 表数据更新后的值,create/update
    "after": {},
    // 元数据信息
    "source": {},
    // 操作类型 c/d/u
    "op": "",
    // 记录解析时间
    "ts_ms": "",
    "transaction": ""
}

Flink CDC生产case

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

思维导图生成中,请稍候...

问题 1: 如何在本地启动一个MySQL的Docker环境?
回答: 使用以下命令启动MySQL的Docker环境:
bash docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4


**问题 2:** 如何创建用于CDC测试的MySQL表?  
**回答:** 首先创建数据库`cdc_test`,然后在该数据库中创建表`cdc_table`,表结构如下:  
```sql
create table cdc_table (
    id int primary key auto_increment,
    name varchar(1000),
    age int
);

问题 3: 在Java项目中需要导入哪些依赖来使用Flink CDC?
回答: 需要导入以下依赖:


    com.ververica
    flink-connector-mysql-cdc
    ${flink-cdc.version}


    org.apache.flink
    flink-connector-base
    ${flink.version}


    org.apache.flink
    flink-clients
    ${flink.version}


    org.apache.flink
    flink-table-runtime
    ${flink.version}


    org.apache.flink
    flink-runtime-web
    ${flink.version}


    ch.qos.logback
    logback-classic
    ${logback.version}

问题 4: 如何编写Flink CDC的Java代码来捕获MySQL数据变更?
回答: 使用以下代码示例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60000L);

MySqlSource mySqlSource = MySqlSource.builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("cdc_test")
    .tableList("cdc_test.cdc_table")
    .username("root")
    .password("debezium")
    .includeSchemaChanges(true)
    .startupOptions(StartupOptions.latest())
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
    .print();
env.execute();

问题 5: Debezium的CDC事件格式包含哪些关键字段?
回答: Debezium的CDC事件格式包含以下关键字段:

  • before:表数据更新前的值(适用于update/delete操作)。
  • after:表数据更新后的值(适用于create/update操作)。
  • source:元数据信息,如数据库、表、连接器等。
  • op:操作类型(c/d/u分别代表create/delete/update)。
  • ts_ms:记录解析时间。
  • transaction:事务信息。

问题 6: 如何配置日志以输出Flink CDC的运行信息?
回答: 使用以下日志配置:


    
        
            %d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n
        
    
    
        
    

问题 7: 如何同步整个数据库而不是单个表?
回答:MySqlSourcetableList参数中使用".*"来同步整个数据库:

.tableList(".*")

问题 8: 在哪里可以找到更多关于Flink CDC的生产案例?
回答: 可以参考以下链接获取更多生产案例:
https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html