Flink CDC使用
- 在本地启动一个MySQL的Docker环境
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4
- 创建数据库和表
create database cdc_test;
use cdc_test;
create table cdc_table (
id int primary key auto_increment,
name varchar(1000),
age int
);
- 在 IntelliJ IDEA 中新建一个 Maven Java 项目
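- 在 pom.xml 中导入依赖(版本号统一在 properties 中管理):
<properties>
    <flink-cdc.version>2.4.2</flink-cdc.version>
    <flink.version>1.16.3</flink.version>
    <logback.version>1.2.7</logback.version>
</properties>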
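<dependencies>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
</dependencies>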
- 编写代码
/**
 * Minimal Flink CDC demo: captures row-level changes from the MySQL table
 * {@code cdc_test.cdc_table} and prints every change event to stdout as a JSON string.
 */
public class FlinkCDCApplication {

    /**
     * Builds the MySQL CDC source, attaches a print sink and runs the job.
     *
     * @param args unused
     * @throws Exception if the Flink job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance.
        env.setParallelism(1);
        // Checkpoint every 60 s so the source's read position is persisted in Flink state.
        env.enableCheckpointing(60000L);

        // Typed as <String> because JsonDebeziumDeserializationSchema emits JSON strings;
        // the raw type would lose the element type and trigger unchecked warnings.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                // Set captured database. To synchronize the whole database, set tableList to ".*".
                .databaseList("cdc_test")
                // Set captured table(s), qualified as "database.table".
                .tableList("cdc_test.cdc_table")
                // Demo credentials: must match MYSQL_ROOT_PASSWORD of the MySQL container.
                .username("root")
                .password("debezium")
                // Also emit schema-change (DDL) events, not only row changes.
                .includeSchemaChanges(true)
                // Skip the initial snapshot; read only changes made after the job starts.
                .startupOptions(StartupOptions.latest())
                // Converts SourceRecord to a JSON String.
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
                .print();

        env.execute();
    }
}
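- 添加日志配置:在 src/main/resources 下创建 logback.xml
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>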
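Debezium 标准 CDC Event 格式详解
下面是向 cdc_table 插入一行数据后程序打印的事件示例(op 为 "c")。其中 new_column 不在建表语句中,是之后通过 ALTER TABLE 新增的列;由于开启了 includeSchemaChanges,这类 DDL 变更也会被捕获。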
{
"before": null,
"after": {
"id": 1,
"name": "xing.yu",
"age": 26,
"new_column": "dewu"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1702723640000,
"snapshot": "false",
"db": "cdc_test",
"sequence": null,
"table": "cdc_table",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 2394,
"row": 0,
"thread": 39,
"query": null
},
"op": "c",
"ts_ms": 1702723640483,
"transaction": null
}
{
// 表数据更新前的值,update/delete
"before": {},
// 表数据更新后的值,create/update
"after": {},
// 元数据信息:连接器版本、数据库名/表名、binlog 文件名与位置、是否为快照等
"source": {},
// 操作类型:c(插入)/u(更新)/d(删除)/r(快照读取)
"op": "",
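// connector 处理该事件的时间(毫秒时间戳);变更在数据库中实际发生的时间见 source.ts_ms
"ts_ms": "",
// 事务元数据:仅在开启 Debezium 的 provide.transaction.metadata 时才有值,默认为 null
"transaction": ""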
}
Flink CDC 生产案例参考(官方中文教程):
https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html
🚀 超哥分享Flink CDC(二) 🚀
想要快速上手Flink CDC?这篇教程带你从零开始,轻松掌握!
🔧 本地启动MySQL Docker环境
💻 创建表并配置Flink项目
📝 编写代码,实现数据同步
📊 深入解析Debezium标准CDC Event格式
🔗 附上生产案例链接,助你快速实践!
👉 点击了解更多: Flink CDC快速上手教程
#Flink #CDC #大数据 #技术分享 #超哥教程