在大数据处理中,实时数据分析是一个重要的需求。随着数据量的不断增长,对于实时分析的挑战也在不断加大,传统的批处理方式已经不能满足实时数据处理的需求,需要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种技术,提供了高效的实时数据仓库管理功能。
本文将介绍袋鼠云基于 Hudi 构建数据湖的整体方案架构及其在实时数据仓库处理方面的特点,并且为大家展示一个使用 Apache Hudi 的简单示例,便于新手上路。
Apache Hudi 介绍
Apache Hudi 是一个开源的数据湖存储系统,可以在 Hadoop 生态系统中提供实时数据仓库处理功能。Hudi 最早由 Uber 开发,后来成为 Apache 顶级项目。
Hudi 主要特性
· 支持快速插入和更新操作,以便在数据仓库中实时处理数据;
· 提供增量查询功能,可有效提高数据分析效率;
· 支持时间点查询,以便查看数据在某一时刻的状态;
· 与 Apache Spark、Hive 等大数据分析工具兼容。
Hudi 架构
Apache Hudi 的架构包括以下几个主要组件:
· Hudi 数据存储:Hudi 数据存储是 Hudi 的核心组件,负责存储数据,数据存储有两种类型:Copy-On-Write(COW)和 Merge-On-Read(MOR);
· Copy-On-Write:COW 存储类型会在对数据进行更新时,创建一个新的数据文件副本,将更新的数据写入副本中,之后,新的数据文件副本会替换原始数据文件;
· Merge-On-Read:MOR 存储类型会在查询时,将更新的数据与原始数据进行合并,这种方式可以减少数据存储的写入延迟,但会增加查询的计算量;
· Hudi 索引:Hudi 索引用于维护数据记录的位置信息,索引有两种类型:内置索引(如 Bloom 过滤器)和外部索引(如 HBase 索引);
· Hudi 查询引擎:Hudi 查询引擎负责处理查询请求,Hudi 支持多种查询引擎,如 Spark SQL、Hive、Presto 等。
Hudi 的使用场景
Apache Hudi 可以帮助企业和组织实现实时数据处理和分析。实时数据处理需要快速地处理和查询数据,同时还需要保证数据的一致性和可靠性。
Apache Hudi 的增量数据处理、ACID 事务性保证、写时合并等技术特性可以帮助企业更好地实现实时数据处理和分析,基于 Hudi 的特性可以在一定程度上在实时数仓的构建过程中承担上下游数据链路的对接(类似 Kafka 的角色)。既能实现增量的数据处理,也能为批流一体的处理提供存储基础。
Hudi 的优势和劣势
● 优势
· 高效处理大规模数据集;
· 支持实时数据更新和查询;
· 实现了增量写入机制,提高了数据访问效率;
· Hudi 可以与流处理管道集成;
· Hudi 提供了时间旅行功能,允许回溯数据的历史版本。
● 劣势
· 在读写数据时需要付出额外的代价;
· 操作比较复杂,需要使用专业的编程语言和工具。
Hudi 在袋鼠云数据湖平台上的实践
Hudi 在袋鼠云数据湖的技术架构
· 元数据的接入,让用户可以快速的对表进行管理;
· 数据快速接入,包括对符合条件的原有表数据进行转换,快速搭建数据湖能力;
· 湖表的管理,监控小文件定期进行合并,提升表的查询性能,内在丰富的表操作功能,包括 time travel ,孤儿文件清理,过期快照清理等;
· 索引构建,提供多种索引包括 bloom filter,zorder 等,提升计算引擎的查询性能。
Hudi 使用示例
在介绍了 Hudi 的基本信息和袋鼠云数据湖平台的结构之后,我们来看一个使用示例,替换 Flink 在内存中的 join 过程。
在 Flink 中对多流 join 往往是比较头疼的场景,需要考虑 state ttl 时间设置,设置太小数据经常关联不上,设置太大内存又需要很高才能保留,我们通过 Hudi 的方式来换个思路实现。
● 构建 catalog
public String createCatalog(){
String createCatalog = "CREATE CATALOG hudi_catalog WITH (n" +
" 'type' = 'hudi',n" +
" 'mode' = 'hms',n" +
" 'default-database' = 'default',n" +
" 'hive.conf.dir' = '/hive_conf_dir',n" +
" 'table.external' = 'true'n" +
")";
return createCatalog;
}
● 创建 hudi 表
public String createHudiTable(){
String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (n" +
" id int ,n" +
" name VARCHAR(10),n" +
" age int ,n" +
" address VARCHAR(10),n" +
" dt VARCHAR(10),n" +
" primary key(id) not enforcedn" +
")n" +
"PARTITIONED BY (dt)n" +
"WITH (n" +
" 'connector' = 'hudi',n" +
" 'table.type' = 'MERGE_ON_READ',n" +
" 'changelog.enabled' = 'true',n" +
" 'index.type' = 'BUCKET',n" +
" 'hoodie.bucket.index.num.buckets' = '2',n" +
String.format(" '%s' = '%s',n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +
" 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "'n" +
");";
return createTable;
}
● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列
01 从 kafka 中读取 topic1
public String createKafkaTable1(){
String kafkaSource1 = "CREATE TABLE source1n" +
"(n" +
" id INT,n" +
" name STRING,n" +
" age INT,n" +
" dt String,n" +
" PROCTIME AS PROCTIME()n" +
") WITH (n" +
" 'connector' = 'kafka'n" +
" ,'topic' = 'join_topic1'n" +
" ,'properties.bootstrap.servers' = 'localhost:9092'n" +
" ,'scan.startup.mode' = 'earliest-offset'n" +
" ,'format' = 'json'n" +
" ,'json.timestamp-format.standard' = 'SQL'n" +
" )";
return kafkaSource1;
}
02 从 kafka 中读取 topic2
public String createKafkaTable2(){
String kafkaSource2 = "CREATE TABLE source2n" +
"(n" +
" id INT,n" +
" name STRING,n" +
" address string,n" +
" dt String,n" +
" PROCTIME AS PROCTIME()n" +
") WITH (n" +
" 'connector' = 'kafka'n" +
" ,'topic' = 'join_topic2'n" +
" ,'properties.bootstrap.servers' = 'localhost:9092'n" +
" ,'scan.startup.mode' = 'earliest-offset'n" +
" ,'format' = 'json'n" +
" ,'json.timestamp-format.standard' = 'SQL'n" +
" )";
return kafkaSource2;
}
● 执行插入逻辑1
String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) " +
"select id, name,age,dt from source1";
● 通过 spark 查询数据
20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1
20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1
● 执行插入逻辑2
String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) " +
"select id, name, address,dt from source2";
● 运行成功
运行成功后在 spark 中查询对应的表数据:
20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1
20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1
可以发现在第二次数据运行之后,表数据的对应字段 address 已经更新,达到了类似在 Flink 中直接执行 join 的效果。
`insert into hudi_catalog.flink_db.test_hudi_flink_join_2
select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id `
《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szbky
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack
文章来源: 博客园
- 还没有人评论,欢迎说说您的想法!