1. 需要的组件:MySQL 5.7+、Flink 1.18.1、Paimon(paimon-flink-1.18-0.8)、HDFS(2.7+,推荐 3.0+)、StarRocks(3.2,推荐 3.3)

2,可选 :dinky (基于flink的实时计算平台)

通过 Dinky 平台,基于 Flink SQL / Flink CDC 读取 MySQL 数据(全量阶段按主键排序读表,增量阶段读取 ROW 模式的 binlog),写入 Paimon 数据湖中做初步处理;StarRocks 再基于 Paimon catalog 创建异步刷新的分区物化视图,以加速查询。

3,表结构

(1:mysql table:

-- MySQL source table `order_info` (table COMMENT: order table).
-- `id` is the auto-increment primary key; the monetary columns are exact DECIMALs.
-- Column COMMENT strings are table metadata and are reproduced verbatim.
CREATE TABLE `order_info` (
    `id`                     BIGINT(20)    NOT NULL AUTO_INCREMENT              COMMENT '编号',
    `consignee`              VARCHAR(100)  DEFAULT NULL                         COMMENT '收货人',
    `consignee_tel`          VARCHAR(20)   DEFAULT NULL                         COMMENT '收件人电话',
    `total_amount`           DECIMAL(10,2) DEFAULT NULL                         COMMENT '总金额',
    `order_status`           VARCHAR(20)   DEFAULT NULL                         COMMENT '订单状态',
    `user_id`                BIGINT(20)    DEFAULT NULL                         COMMENT '用户id',
    `payment_way`            VARCHAR(20)   DEFAULT NULL                         COMMENT '付款方式',
    `delivery_address`       VARCHAR(1000) DEFAULT NULL                         COMMENT '送货地址',
    `order_comment`          VARCHAR(200)  DEFAULT NULL                         COMMENT '订单备注',
    `out_trade_no`           VARCHAR(50)   DEFAULT NULL                         COMMENT '订单交易编号(第三方支付用)',
    `trade_body`             VARCHAR(200)  DEFAULT NULL                         COMMENT '订单描述(第三方支付用)',
    -- Both timestamps default to the insert time; neither declares ON UPDATE.
    `create_time`            DATETIME      NOT NULL DEFAULT CURRENT_TIMESTAMP   COMMENT '创建时间',
    `operate_time`           DATETIME      NOT NULL DEFAULT CURRENT_TIMESTAMP   COMMENT '操作时间',
    `expire_time`            DATETIME      DEFAULT NULL                         COMMENT '失效时间',
    `process_status`         VARCHAR(20)   DEFAULT NULL                         COMMENT '进度状态',
    `tracking_no`            VARCHAR(100)  DEFAULT NULL                         COMMENT '物流单编号',
    `parent_order_id`        BIGINT(20)    DEFAULT NULL                         COMMENT '父订单编号',
    `img_url`                VARCHAR(200)  DEFAULT NULL                         COMMENT '图片路径',
    `province_id`            INT(20)       DEFAULT NULL                         COMMENT '地区',
    `activity_reduce_amount` DECIMAL(16,2) DEFAULT NULL                         COMMENT '促销金额',
    `coupon_reduce_amount`   DECIMAL(16,2) DEFAULT NULL                         COMMENT '优惠券',
    `original_total_amount`  DECIMAL(16,2) DEFAULT NULL                         COMMENT '原价金额',
    `feight_fee`             DECIMAL(16,2) DEFAULT NULL                         COMMENT '运费',
    `feight_fee_reduce`      DECIMAL(16,2) DEFAULT NULL                         COMMENT '运费减免',
    `refundable_time`        DATETIME      DEFAULT NULL                         COMMENT '可退款日期(签收后30天)',
    PRIMARY KEY (`id`) USING BTREE
)
    ENGINE = InnoDB
    AUTO_INCREMENT = 19
    DEFAULT CHARSET = utf8
    ROW_FORMAT = DYNAMIC
    COMMENT = '订单表 订单表'

(2:flink external table:

-- Flink SQL source table that reads MySQL `gmall.order_info` through the mysql-cdc connector.
-- The column list mirrors the MySQL table; MySQL DATETIME columns are declared as TIMESTAMP(3).
-- Replace the 'hostname' / 'username' / 'password' placeholders before running.
CREATE TABLE order_info_full_mq (
    `id`                     BIGINT        NOT NULL COMMENT '编号',
    `consignee`              STRING        NULL     COMMENT '收货人',
    `consignee_tel`          STRING        NULL     COMMENT '收件人电话',
    `total_amount`           DECIMAL(10,2) NULL     COMMENT '总金额',
    `order_status`           STRING        NULL     COMMENT '订单状态',
    `user_id`                BIGINT        NULL     COMMENT '用户id',
    `payment_way`            STRING        NULL     COMMENT '付款方式',
    `delivery_address`       STRING        NULL     COMMENT '送货地址',
    `order_comment`          STRING        NULL     COMMENT '订单备注',
    `out_trade_no`           STRING        NULL     COMMENT '订单交易编号(第三方支付用)',
    `trade_body`             STRING        NULL     COMMENT '订单描述(第三方支付用)',
    `create_time`            TIMESTAMP(3)  NOT NULL COMMENT '创建时间',
    `operate_time`           TIMESTAMP(3)  NOT NULL COMMENT '操作时间',
    `expire_time`            TIMESTAMP(3)  NULL     COMMENT '失效时间',
    `process_status`         STRING        NULL     COMMENT '进度状态',
    `tracking_no`            STRING        NULL     COMMENT '物流单编号',
    `parent_order_id`        BIGINT        NULL     COMMENT '父订单编号',
    `img_url`                STRING        NULL     COMMENT '图片路径',
    `province_id`            INT           NULL     COMMENT '地区',
    `activity_reduce_amount` DECIMAL(16,2) NULL     COMMENT '促销金额',
    `coupon_reduce_amount`   DECIMAL(16,2) NULL     COMMENT '优惠券',
    `original_total_amount`  DECIMAL(16,2) NULL     COMMENT '原价金额',
    `feight_fee`             DECIMAL(16,2) NULL     COMMENT '运费',
    `feight_fee_reduce`      DECIMAL(16,2) NULL     COMMENT '运费减免',
    `refundable_time`        TIMESTAMP(3)  NULL     COMMENT '可退款日期(签收后30天)',
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    -- 'initial' takes a full snapshot of the table first and then continues from the binlog.
    -- The previous value 'earliest-offset' skips the snapshot phase, so rows whose binlog
    -- has already been purged would never reach the sink.
    'scan.startup.mode' = 'initial',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root_pwd',
    'database-name' = 'gmall',
    'table-name' = 'order_info',
    'server-time-zone' = 'Asia/Shanghai'
);

(3:paimon table (on hdfs):

-- Register a Paimon catalog whose warehouse lives on HDFS, switch to it,
-- and make sure the `ods` database exists.
CREATE CATALOG catalog_paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://ip/data/paimon/warehouse'
);

USE CATALOG catalog_paimon;

CREATE DATABASE IF NOT EXISTS ods;

-- WARNING: destructive. This discards the existing table so the DDL that follows
-- recreates it from scratch. IF EXISTS keeps the script runnable on a fresh
-- warehouse where the table has never been created.
DROP TABLE IF EXISTS ods.ods_order_info_full2;

-- Paimon primary-key table for the ODS layer, partitioned by the DATE column `ctime`.
-- The partition column is part of the primary key (`id`, `ctime`).
-- All other columns mirror the CDC source table.
CREATE TABLE IF NOT EXISTS ods.ods_order_info_full2 (
    -- Column comment corrected: this is the order id ('编号' in the source tables),
    -- not a coupon id.
    `id`                     BIGINT        NOT NULL COMMENT '编号',
    `ctime`                  DATE                   COMMENT '分区字段',
    `consignee`              STRING        NULL     COMMENT '收货人',
    `consignee_tel`          STRING        NULL     COMMENT '收件人电话',
    `total_amount`           DECIMAL(10,2) NULL     COMMENT '总金额',
    `order_status`           STRING        NULL     COMMENT '订单状态',
    `user_id`                BIGINT        NULL     COMMENT '用户id',
    `payment_way`            STRING        NULL     COMMENT '付款方式',
    `delivery_address`       STRING        NULL     COMMENT '送货地址',
    `order_comment`          STRING        NULL     COMMENT '订单备注',
    `out_trade_no`           STRING        NULL     COMMENT '订单交易编号(第三方支付用)',
    `trade_body`             STRING        NULL     COMMENT '订单描述(第三方支付用)',
    `create_time`            TIMESTAMP(3)  NOT NULL COMMENT '创建时间',
    `operate_time`           TIMESTAMP(3)  NOT NULL COMMENT '操作时间',
    `expire_time`            TIMESTAMP(3)  NULL     COMMENT '失效时间',
    `process_status`         STRING        NULL     COMMENT '进度状态',
    `tracking_no`            STRING        NULL     COMMENT '物流单编号',
    `parent_order_id`        BIGINT        NULL     COMMENT '父订单编号',
    `img_url`                STRING        NULL     COMMENT '图片路径',
    `province_id`            INT           NULL     COMMENT '地区',
    `activity_reduce_amount` DECIMAL(16,2) NULL     COMMENT '促销金额',
    `coupon_reduce_amount`   DECIMAL(16,2) NULL     COMMENT '优惠券',
    `original_total_amount`  DECIMAL(16,2) NULL     COMMENT '原价金额',
    `feight_fee`             DECIMAL(16,2) NULL     COMMENT '运费',
    `feight_fee_reduce`      DECIMAL(16,2) NULL     COMMENT '运费减免',
    `refundable_time`        TIMESTAMP(3)  NULL     COMMENT '可退款日期(签收后30天)',
    PRIMARY KEY (`id`, `ctime`) NOT ENFORCED
) PARTITIONED BY (`ctime`) WITH (
    'connector' = 'paimon',
    -- NOTE(review): presumably only relevant when the catalog is backed by a Hive
    -- metastore; the catalog in this article only sets a filesystem warehouse. Confirm.
    'metastore.partitioned-table' = 'true',
    'file.format' = 'parquet',
    'write-buffer-size' = '512mb',
    'write-buffer-spillable' = 'true',
    -- NOTE(review): partitions are keyed by order creation date, so a 1-day expiration
    -- would presumably drop almost all historical partitions shortly after a full load.
    -- Confirm this retention is intended before using it outside a demo.
    'partition.expiration-time' = '1 d',
    'partition.expiration-check-interval' = '1 h',
    -- The partition value `ctime` is interpreted as a yyyy-MM-dd date for expiration.
    'partition.timestamp-formatter' = 'yyyy-MM-dd',
    'partition.timestamp-pattern' = '$ctime'
);

(4:flink sql  mysql2paimon job

-- Streaming job: copy every row from the MySQL CDC source table into the Paimon ODS table.
-- The target is resolved against the current catalog, so this must run after
-- `USE CATALOG catalog_paimon`; the source is fully qualified.
INSERT INTO ods.ods_order_info_full2 (
    `id`,
    `ctime`,
    `consignee`,
    `consignee_tel`,
    `total_amount`,
    `order_status`,
    `user_id`,
    `payment_way`,
    `delivery_address`,
    `order_comment`,
    `out_trade_no`,
    `trade_body`,
    `create_time`,
    `operate_time`,
    `expire_time`,
    `process_status`,
    `tracking_no`,
    `parent_order_id`,
    `img_url`,
    `province_id`,
    `activity_reduce_amount`,
    `coupon_reduce_amount`,
    `original_total_amount`,
    `feight_fee`,
    `feight_fee_reduce`,
    `refundable_time`
)
SELECT
    `id`,
    -- Partition key: the calendar date of the order's creation time. A direct cast
    -- replaces the earlier timestamp -> 'yyyy-MM-dd' string -> date round trip.
    CAST(`create_time` AS DATE) AS `ctime`,
    `consignee`,
    `consignee_tel`,
    `total_amount`,
    `order_status`,
    `user_id`,
    `payment_way`,
    `delivery_address`,
    `order_comment`,
    `out_trade_no`,
    `trade_body`,
    `create_time`,
    `operate_time`,
    `expire_time`,
    `process_status`,
    `tracking_no`,
    `parent_order_id`,
    `img_url`,
    `province_id`,
    `activity_reduce_amount`,
    `coupon_reduce_amount`,
    `original_total_amount`,
    `feight_fee`,
    `feight_fee_reduce`,
    `refundable_time`
FROM default_catalog.default_database.order_info_full_mq
-- Defensive guard: `ctime` is derived from `create_time` and is part of the sink's primary key.
WHERE `create_time` IS NOT NULL;

(5:starrocks paimon catalog

-- StarRocks external catalog over the same HDFS warehouse path that the Flink
-- Paimon catalog writes to, using the filesystem catalog type.
CREATE EXTERNAL CATALOG `paimon_fs_catalog`
PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "hdfs://ip/data/paimon/warehouse"
)

(6:starrocks materialized view

-- Asynchronous, day-partitioned materialized view over the Paimon ODS table.
-- It is partitioned on `ctime`, hash-distributed on `id`, and scheduled to refresh
-- once a day starting from 2024-08-17 08:30:00.
-- The explicit column list (same columns, same order as the base table) replaces
-- `select *`, so a later schema change on the Paimon table cannot silently alter
-- the view's shape.
CREATE MATERIALIZED VIEW `dwd_order_info_full`
PARTITION BY (date_trunc('day', `ctime`))
DISTRIBUTED BY HASH(`id`) BUCKETS 3
REFRESH ASYNC START("2024-08-17 08:30:00") EVERY(INTERVAL 1 DAY)
PROPERTIES (
    "auto_refresh_partitions_limit" = "7",
    "replicated_storage" = "true",
    "storage_cooldown_time" = "9999-12-31 23:59:59",
    "partition_refresh_number" = "2",
    "storage_medium" = "SSD",
    "partition_ttl_number" = "-1",
    -- NOTE(review): presumably requires at least three backend nodes; lower it on a
    -- smaller test cluster. Confirm against the deployment.
    "replication_num" = "3"
)
AS
SELECT
    `id`,
    `ctime`,
    `consignee`,
    `consignee_tel`,
    `total_amount`,
    `order_status`,
    `user_id`,
    `payment_way`,
    `delivery_address`,
    `order_comment`,
    `out_trade_no`,
    `trade_body`,
    `create_time`,
    `operate_time`,
    `expire_time`,
    `process_status`,
    `tracking_no`,
    `parent_order_id`,
    `img_url`,
    `province_id`,
    `activity_reduce_amount`,
    `coupon_reduce_amount`,
    `original_total_amount`,
    `feight_fee`,
    `feight_fee_reduce`,
    `refundable_time`
FROM paimon_fs_catalog.ods.ods_order_info_full2;

(7:刷新视图:

-- Manually refresh a range of day partitions of the materialized view.
-- The dates below are example values (the original had empty bounds, which cannot run);
-- replace them with the partition range to rebuild.
-- NOTE(review): the END bound is presumably exclusive — confirm in the StarRocks docs.
REFRESH MATERIALIZED VIEW dwd_order_info_full
PARTITION START ("2024-08-17") END ("2024-08-18");

(8:结果:

Logo

电影级数字人,免显卡端渲染SDK,十行代码即可调用,工业级demo免费开源下载!

更多推荐