mysql到paimon并基于starrocks做异步分区物化视图全流程
activity_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '促销金额',`original_total_amount` decimal(16,2) DEFAULT NULL COMMENT '原价金额',`coupon_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '优惠券',`cou
1,需要的组件: mysql 5.7+,flink 1.18.1,paimon (paimon-flink-1.18-0.8),hdfs(2.7+,推荐3.0+),starrocks(3.2,推荐3.3)
2,可选 :dinky (基于flink的实时计算平台)
通过dinky平台基于flink sql / flink cdc读取mysql数据(全量阶段按主键排序分片读表 + 增量阶段读取binlog,要求binlog_format=ROW),写入paimon数据湖中做初步处理;starrocks基于paimon catalog创建异步分区物化视图以加速查询
3,表结构
(1:mysql table:
-- Source OLTP order table in MySQL (5.7+).
-- The primary key on `id` matters downstream: the Flink mysql-cdc snapshot
-- reads this table ordered by its primary key.
-- NOTE(review): CHARSET=utf8 is MySQL's 3-byte utf8mb3 and cannot store 4-byte
-- characters (e.g. emoji) in free-text columns such as `consignee` or
-- `delivery_address`; consider utf8mb4 for new deployments.
CREATE TABLE `order_info` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
    `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
    `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
    `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
    `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
    `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
    `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
    `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
    `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
    `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
    `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '操作时间',
    `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
    `process_status` varchar(20) DEFAULT NULL COMMENT '进度状态',
    `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
    `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
    `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
    `province_id` int(20) DEFAULT NULL COMMENT '地区',
    `activity_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '促销金额',
    `coupon_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '优惠券',
    `original_total_amount` decimal(16,2) DEFAULT NULL COMMENT '原价金额',
    `feight_fee` decimal(16,2) DEFAULT NULL COMMENT '运费',
    `feight_fee_reduce` decimal(16,2) DEFAULT NULL COMMENT '运费减免',
    `refundable_time` datetime DEFAULT NULL COMMENT '可退款日期(签收后30天)',
    PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='订单表 订单表';
(2:flink external table:
-- Flink SQL source table backed by the mysql-cdc connector; it mirrors
-- gmall.order_info column-for-column with Flink types.
-- scan.startup.mode = 'initial' takes a full snapshot of the table first and
-- then switches to reading the binlog (full + incremental). The previous value,
-- 'earliest-offset', skips the snapshot and replays only the binlog still
-- retained on the server, silently losing rows whose binlog was purged.
-- NOTE(review): credentials are inline placeholders; supply real ones through
-- the platform's secret handling rather than committing them.
CREATE TABLE order_info_full_mq (
    `id` bigint NOT NULL COMMENT '编号',
    `consignee` STRING NULL COMMENT '收货人',
    `consignee_tel` STRING NULL COMMENT '收件人电话',
    `total_amount` decimal(10,2) NULL COMMENT '总金额',
    `order_status` STRING NULL COMMENT '订单状态',
    `user_id` bigint NULL COMMENT '用户id',
    `payment_way` STRING NULL COMMENT '付款方式',
    `delivery_address` STRING NULL COMMENT '送货地址',
    `order_comment` STRING NULL COMMENT '订单备注',
    `out_trade_no` STRING NULL COMMENT '订单交易编号(第三方支付用)',
    `trade_body` STRING NULL COMMENT '订单描述(第三方支付用)',
    `create_time` timestamp(3) NOT NULL COMMENT '创建时间',
    `operate_time` timestamp(3) NOT NULL COMMENT '操作时间',
    `expire_time` timestamp(3) NULL COMMENT '失效时间',
    `process_status` STRING NULL COMMENT '进度状态',
    `tracking_no` STRING NULL COMMENT '物流单编号',
    `parent_order_id` bigint NULL COMMENT '父订单编号',
    `img_url` STRING NULL COMMENT '图片路径',
    `province_id` int NULL COMMENT '地区',
    `activity_reduce_amount` decimal(16,2) NULL COMMENT '促销金额',
    `coupon_reduce_amount` decimal(16,2) NULL COMMENT '优惠券',
    `original_total_amount` decimal(16,2) NULL COMMENT '原价金额',
    `feight_fee` decimal(16,2) NULL COMMENT '运费',
    `feight_fee_reduce` decimal(16,2) NULL COMMENT '运费减免',
    `refundable_time` timestamp(3) NULL COMMENT '可退款日期(签收后30天)',
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'scan.startup.mode' = 'initial',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root_pwd',
    'database-name' = 'gmall',
    'table-name' = 'order_info',
    'server-time-zone' = 'Asia/Shanghai'
);
(3:paimon table (on hdfs):
-- Register a Paimon catalog whose warehouse is on HDFS, make it the current
-- catalog for the statements that follow, and ensure the `ods` database exists.
CREATE CATALOG catalog_paimon WITH (
    'type'      = 'paimon',
    'warehouse' = 'hdfs://ip/data/paimon/warehouse'
);
USE CATALOG catalog_paimon;
CREATE DATABASE IF NOT EXISTS ods;
-- (Re)create the Paimon ODS order table, partitioned by order creation date.
-- WARNING: the DROP discards any data already in the table; IF EXISTS keeps
-- the script runnable on a fresh warehouse where the table does not exist yet.
DROP TABLE IF EXISTS ods.ods_order_info_full2;
CREATE TABLE IF NOT EXISTS ods.ods_order_info_full2(
    `id` bigint NOT NULL COMMENT '编号',
    `ctime` date COMMENT '分区字段',
    `consignee` STRING NULL COMMENT '收货人',
    `consignee_tel` STRING NULL COMMENT '收件人电话',
    `total_amount` decimal(10,2) NULL COMMENT '总金额',
    `order_status` STRING NULL COMMENT '订单状态',
    `user_id` bigint NULL COMMENT '用户id',
    `payment_way` STRING NULL COMMENT '付款方式',
    `delivery_address` STRING NULL COMMENT '送货地址',
    `order_comment` STRING NULL COMMENT '订单备注',
    `out_trade_no` STRING NULL COMMENT '订单交易编号(第三方支付用)',
    `trade_body` STRING NULL COMMENT '订单描述(第三方支付用)',
    `create_time` timestamp(3) NOT NULL COMMENT '创建时间',
    `operate_time` timestamp(3) NOT NULL COMMENT '操作时间',
    `expire_time` timestamp(3) NULL COMMENT '失效时间',
    `process_status` STRING NULL COMMENT '进度状态',
    `tracking_no` STRING NULL COMMENT '物流单编号',
    `parent_order_id` bigint NULL COMMENT '父订单编号',
    `img_url` STRING NULL COMMENT '图片路径',
    `province_id` int NULL COMMENT '地区',
    `activity_reduce_amount` decimal(16,2) NULL COMMENT '促销金额',
    `coupon_reduce_amount` decimal(16,2) NULL COMMENT '优惠券',
    `original_total_amount` decimal(16,2) NULL COMMENT '原价金额',
    `feight_fee` decimal(16,2) NULL COMMENT '运费',
    `feight_fee_reduce` decimal(16,2) NULL COMMENT '运费减免',
    `refundable_time` timestamp(3) NULL COMMENT '可退款日期(签收后30天)',
    -- A partitioned primary-key table must include the partition column in its key.
    PRIMARY KEY (`id`, `ctime`) NOT ENFORCED
) PARTITIONED BY (`ctime`) WITH (
    'connector' = 'paimon',
    -- NOTE(review): per the Paimon docs this only affects Hive-metastore
    -- catalogs; the catalog used here sets no metastore, so it is likely a no-op.
    'metastore.partitioned-table' = 'true',
    'file.format' = 'parquet',
    'write-buffer-size' = '512mb',
    'write-buffer-spillable' = 'true',
    -- NOTE(review): partitions whose `ctime` is older than 1 day are expired
    -- (deleted), so this "_full" table keeps only about one day of orders.
    -- Confirm this retention is intended.
    'partition.expiration-time' = '1 d',
    'partition.expiration-check-interval' = '1 h',
    -- Tells partition expiration how to read a timestamp out of the partition value.
    'partition.timestamp-formatter' = 'yyyy-MM-dd',
    'partition.timestamp-pattern' = '$ctime'
);
(4:flink sql mysql2paimon job
-- Streaming job: copy every CDC change from the MySQL source table (registered
-- in Flink's built-in default catalog) into the Paimon ODS table. The partition
-- column `ctime` is derived as the calendar date of `create_time`.
INSERT INTO ods.ods_order_info_full2 (
    `id`,
    `ctime`,
    `consignee`,
    `consignee_tel`,
    `total_amount`,
    `order_status`,
    `user_id`,
    `payment_way`,
    `delivery_address`,
    `order_comment`,
    `out_trade_no`,
    `trade_body`,
    `create_time`,
    `operate_time`,
    `expire_time`,
    `process_status`,
    `tracking_no`,
    `parent_order_id`,
    `img_url`,
    `province_id`,
    `activity_reduce_amount`,
    `coupon_reduce_amount`,
    `original_total_amount`,
    `feight_fee`,
    `feight_fee_reduce`,
    `refundable_time`
)
SELECT
    src.`id`,
    CAST(src.`create_time` AS DATE) AS `ctime`,
    src.`consignee`,
    src.`consignee_tel`,
    src.`total_amount`,
    src.`order_status`,
    src.`user_id`,
    src.`payment_way`,
    src.`delivery_address`,
    src.`order_comment`,
    src.`out_trade_no`,
    src.`trade_body`,
    src.`create_time`,
    src.`operate_time`,
    src.`expire_time`,
    src.`process_status`,
    src.`tracking_no`,
    src.`parent_order_id`,
    src.`img_url`,
    src.`province_id`,
    src.`activity_reduce_amount`,
    src.`coupon_reduce_amount`,
    src.`original_total_amount`,
    src.`feight_fee`,
    src.`feight_fee_reduce`,
    src.`refundable_time`
FROM default_catalog.default_database.order_info_full_mq AS src
-- Rows without a creation time could not be assigned a partition.
WHERE src.`create_time` IS NOT NULL;
(5:starrocks paimon catalog
-- StarRocks external catalog that reads the Paimon warehouse directly from HDFS
-- (filesystem catalog type, no metastore). The warehouse path must match the
-- one used by the Flink job's Paimon catalog.
CREATE EXTERNAL CATALOG `paimon_fs_catalog`
PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "hdfs://ip/data/paimon/warehouse"
);
(6:starrocks materialized view
-- Asynchronously refreshed, day-partitioned StarRocks materialized view over the
-- Paimon ODS order table. Columns are listed explicitly instead of SELECT * so
-- the MV schema stays pinned if columns are later added to the lake table.
CREATE MATERIALIZED VIEW `dwd_order_info_full`
PARTITION BY (date_trunc('day', `ctime`))
DISTRIBUTED BY HASH(`id`) BUCKETS 3
REFRESH ASYNC START("2024-08-17 08:30:00") EVERY(INTERVAL 1 DAY)
PROPERTIES (
    -- At most the 7 most recent partitions are refreshed automatically.
    "auto_refresh_partitions_limit" = "7",
    "replicated_storage" = "true",
    "storage_cooldown_time" = "9999-12-31 23:59:59",
    -- Partitions are refreshed in batches of 2 per refresh task.
    "partition_refresh_number" = "2",
    "storage_medium" = "SSD",
    "partition_ttl_number" = "-1",
    "replication_num" = "3"
)
AS
SELECT
    `id`,
    `ctime`,
    `consignee`,
    `consignee_tel`,
    `total_amount`,
    `order_status`,
    `user_id`,
    `payment_way`,
    `delivery_address`,
    `order_comment`,
    `out_trade_no`,
    `trade_body`,
    `create_time`,
    `operate_time`,
    `expire_time`,
    `process_status`,
    `tracking_no`,
    `parent_order_id`,
    `img_url`,
    `province_id`,
    `activity_reduce_amount`,
    `coupon_reduce_amount`,
    `original_total_amount`,
    `feight_fee`,
    `feight_fee_reduce`,
    `refundable_time`
FROM paimon_fs_catalog.ods.ods_order_info_full2;
(7:刷新视图:
-- Manually refresh a range of MV partitions. Empty strings are not a valid
-- range, so replace the example dates with the partitions you need to refresh.
REFRESH MATERIALIZED VIEW dwd_order_info_full PARTITION START ("2024-08-17") END ("2024-08-18");

(8:结果:

更多推荐


所有评论(0)