Iceberg
为什么需要Iceberg/为什么需要Lakehouse
数据湖的最主要目的是为了解决Hive速度慢的问题, 利用表格式和索引实现细粒度的数据过滤. 数据湖可以将整个链路变为分钟级, 从离线链路转化为近实时链路并且提升查询速度. 当然, 数据湖还解决了一些其他问题, 如ACID, Schema Evolution, Partition Evolution, Time Travel等.
文件布局
具体查询流程就是从Catalog -> Table Metadata File -> Snapshot -> Manifest List -> Manifest File -> Data File. 如图所示
Catalog
Catalog 本质就是一个维护表元数据文件的目录(个人理解), 有多种实现方式:
HiveCatalog
Hive Metastore
表属性中key值为metadata_location的键值对
HadoopCatalog
文件系统
version-hint.text
JDBC Catalog
关系型数据库
专门的表: 例如jdbc_catalog
REST Catalog
独立的 Web 服务
API端点返回的json
表的元数据文件 ,记录了表的完整定义和历史快照。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 { "format-version" : 1 , "table-uuid" : "f7m1a7b4-c111-407a-a6e1-433a233a1e12" , "location" : "s3://my-bucket/warehouse/db/events" , "last-updated-ms" : 1672531200000 , "last-column-id" : 4 , "schemas" : [ { "type" : "struct" , "schema-id" : 0 , "fields" : [ { "id" : 1 , "name" : "event_ts" , "required" : true , "type" : "timestamptz" , "doc" : "Event timestamp with timezone" } , { "id" : 2 , "name" : "level" , "required" : true , "type" : "string" } , { "id" : 3 , "name" : "message" , "required" : false , "type" : "string" } , { "id" : 4 , "name" : "extra_info" , "required" : false , "type" : { "type" : "map" , "key-id" : 5 , "value-id" : 6 , "value-required" : false } } ] } ] , "current-schema-id" : 0 , "partition-specs" : [ { "spec-id" : 0 , "fields" : [ { "name" : "day" , "transform" : "day" , "source-id" : 1 , "field-id" : 1000 } ] } ] , "default-spec-id" : 0 , "properties" : { "write.format.default" : "parquet" , "commit.retry.num-retries" : "2" } , "current-snapshot-id" : 3051729675574597004 , "snapshots" : [ { "snapshot-id" : 3051729675574597004 , "parent-snapshot-id" : null , "timestamp-ms" : 1672531200000 , "summary" : { "operation" : "append" , "spark.app.id" : "local-1599119293123" , "added-data-files" : "5" , "added-records" : "10550" , "added-files-size" : "34201" , "changed-partition-count" : "1" , "total-records" : "10550" , "total-files-size" : "34201" , "total-data-files" : "5" , "total-delete-files" : "0" , "total-position-deletes" : "0" , "total-equality-deletes" : "0" } , "manifest-list" : "s3://my-bucket/warehouse/db/events/metadata/snap-3051729675574597004-1-2945e143-5152-4091-8869-711e86098059.avro" , "schema-id" : 0 } ] , "snapshot-log" : [ { "timestamp-ms" : 1672531200000 , "snapshot-id" : 3051729675574597004 } ] }
Manifest List (snap---.avro)
列出了所有的Manifest File及相关信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 [ { "manifest_path" : "s3://warehouse/db/table/metadata/e852441a-5c3c-42a2-b2d9-e31a19a9d701-m0.avro" , "manifest_length" : 6224 , "partition_spec_id" : 0 , "content" : 0 , "sequence_number" : 2 , "min_sequence_number" : 2 , "added_snapshot_id" : 874657636349826781 , "added_files_count" : 4 , "existing_files_count" : 0 , "deleted_files_count" : 0 , "added_rows_count" : 4000 , "existing_rows_count" : 0 , "deleted_rows_count" : 0 , "partitions" : [ { "contains_null" : false , "contains_nan" : false , "lower_bound" : "2025-07-31T10:00:00.000Z" , "upper_bound" : "2025-07-31T12:00:00.000Z" } ] , "key_metadata" : null } , { "manifest_path" : "s3://warehouse/db/table/metadata/f334a123-cee3-4733-ac49-bd24a5a176d1-m1.avro" , "manifest_length" : 1056 , "partition_spec_id" : 0 , "content" : 1 , "sequence_number" : 2 , "min_sequence_number" : 2 , "added_snapshot_id" : 874657636349826781 , "added_files_count" : 1 , "existing_files_count" : 0 , "deleted_files_count" : 0 , "added_rows_count" : 50 , "existing_rows_count" : 0 , "deleted_rows_count" : 0 , "partitions" : [ { "contains_null" : false , "contains_nan" : false , "lower_bound" : "2025-07-31T11:00:00.000Z" , "upper_bound" : "2025-07-31T11:00:00.000Z" } ] , "key_metadata" : null } ]
Manifest File (.avro)
包含Data File及相关信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 [ { "status" : 1 , "snapshot_id" : 4876662349891823142 , "sequence_number" : 1 , "file_path" : "s3://my-bucket/db/logs/data/event_date=2025-07-30/00000-1-....parquet" , "file_format" : "PARQUET" , "partition" : { "event_date" : "2025-07-30" } , "record_count" : 4800 , "file_size_in_bytes" : 5242880 , "column_sizes" : { "1" : 76800 , "2" : 24000 , "3" : 4234880 } , "value_counts" : { "1" : 4800 , "2" : 4800 , "3" : 4800 } , "null_value_counts" : { "1" : 0 , "2" : 0 , "3" : 0 } , "nan_value_counts" : { } , "lower_bounds" : { "1" : "2025-07-30T00:00:00.000Z" , "2" : "INFO" } , "upper_bounds" : { "1" : "2025-07-30T11:59:59.999Z" , "2" : "WARN" } } , { "status" : 1 , "snapshot_id" : 4876662349891823142 , "sequence_number" : 1 , "file_path" : "s3://my-bucket/db/logs/data/event_date=2025-07-30/00001-2-....parquet" , "file_format" : "PARQUET" , "partition" : { "event_date" : "2025-07-30" } , "record_count" : 5200 , "file_size_in_bytes" : 5872025 , "column_sizes" : { "1" : 83200 , "2" : 26000 , "3" : 4962825 } , "value_counts" : { "1" : 5200 , "2" : 5200 , "3" : 5200 } , "null_value_counts" : { "1" : 0 , "2" : 0 , "3" : 0 } , "nan_value_counts" : { } , "lower_bounds" : { "1" : "2025-07-30T12:00:00.000Z" , "2" : "INFO" } , "upper_bounds" : { "1" : "2025-07-30T23:59:59.999Z" , "2" : "INFO" } } ]
一个例子
建表
1 2 3 4 5 6 7 8 9 10 CREATE TABLE ice_spark.hdfs.t_user1 ( id int , name string, ts string ) USING iceberg; t_user1/ ├── metadata │ └── v1.metadata.json └── version- hint.text
version-hint.text存的就是当前matadata file的版本信息, 让系统知道是vN.metadata.json
插入一条数据
1 2 3 4 5 6 7 8 9 10 11 INSERT INTO ice_spark.hdfs.ns.t_user1 VALUES (1 , 'hlink' , 20250709 );t_user1/ ├── data │ └── 00000 -5 - ba90a31f- d65c-4e40 -9321 -3 faa8c184547-0 -00001. parquet ├── metadata │ ├── 44 a2bbda- e252-4 d77- b958- b3cf35708b14- m0.avro │ ├── snap-8289612053519823397 -1 -44 a2bbda- e252-4 d77- b958- b3cf35708b14.avro │ ├── v1.metadata.json │ └── v2.metadata.json └── version- hint.text
多了数据文件00000-5-ba90a31f-d65c-4e40-9321-3faa8c184547-0-00001.parquet, version-hint.text更新到版本2, 新增v2.metadata.json, 新增manifest list文件snap-8289612053519823397-1-44a2bbda-e252-4d77-b958-b3cf35708b14.avro, 新增了manifeat file文件44a2bbda-e252-4d77-b958-b3cf35708b14-m0.avro.
再插入一条数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 INSERT INTO ice_spark.hdfs.ns.t_user1 VALUES (2 , 'iceberg' , 20250709 );t_user1/ ├── data │ ├── 00000 -5 - ba90a31f- d65c-4e40 -9321 -3 faa8c184547-0 -00001. parquet │ └── 00000 -7 -906189 d5-3 aba-4 a96-9 f8b-008 d5af2f94d-0 -00001. parquet ├── metadata │ ├── 28173 fec-5 f71-444 c-9 abd-543468e78636 - m0.avro │ ├── 44 a2bbda- e252-4 d77- b958- b3cf35708b14- m0.avro │ ├── snap-4152532609887149918 -1 -28173 fec-5 f71-444 c-9 abd-543468e78636 .avro │ ├── snap-8289612053519823397 -1 -44 a2bbda- e252-4 d77- b958- b3cf35708b14.avro │ ├── v1.metadata.json │ ├── v2.metadata.json │ └── v3.metadata.json └── version- hint.text
与之前的插入操作是同样的道理
Schema Evolution
Schema Evolution简单来说就是对表格式的变更, Schema Evolution只改变元数据, 不改变底层的数据文件, 支持的变更有:
添加字段 (Add) :可以向 schema 中增加新的字段。新增的字段会被分配一个新的、唯一的字段ID。
删除字段 (Delete) :可以从当前 schema 中移除一个字段。
重命名字段 (Rename) :可以更改一个已存在字段的名称,但其字段ID不会改变。
重排字段顺序 (Reorder) :可以调整已存在字段在 schema 中的顺序。
类型提升 (Type Promotion) :可以将原生类型(primitive type)提升为另一个兼容的类型。
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 ALTER TABLE ice_spark_hdfs.t_user ADD COLUMN age int ;ALTER TABLE ice_spark_hdfs.t_user ADD COLUMN sex int AFTER id;ALTER TABLE ice_spark_hdfs.t_user RENAME COLUMN sex TO sex1;ALTER TABLE ice_spark_hdfs.t_user ALTER COLUMN sex1 TYPE bigint ;ALTER TABLE ice_spark_hdfs.t_user ALTER COLUMN sex1 COMMENT 'table sex' ;ALTER TABLE ice_spark_hdfs.t_user DROP COLUMN sex1;
如果新增一个列, 那原来的数据没有这个列, 查询怎么办? 首先Iceberg会意识到原来的Schema没有这个字段, 对于这个没有的字段查询是不会落实的. 这里就要提到**“Column Projection” (列投影)这个机制.** 当查询数据时, Iceberg 使用的是表的**当前 schema(包含了新增字段), 但数据文件本身是用 旧的 schema(**不包含新增字段)写入的. 当 Iceberg 在读取旧数据文件前, 在查询schema时发现查询需要的一个 field-id
在文件中并不存在, 它会遵循一套明确的规则来解析这个值:
从分区数据中获取 :如果该字段存在一个 identity
(恒等)分区转换,并且分区值存在于清单文件(manifest)的 data_file
对象的 partition
结构中,那么就直接使用这个分区值。这主要用于从 Hive 表迁移等场景。
使用名称映射 (Name Mapping) :如果表配置了 schema.name-mapping.default
属性,Iceberg 会尝试使用这个映射从没有 field-id
的旧文件中按名称找到对应的列。
使用 initial-default
(初始默认值) :如果该字段的 schema 定义中包含一个 initial-default
值,那么 Iceberg 就会返回这个预设的默认值。
返回 null
:如果以上所有规则都不适用,那么 Iceberg 将为这个字段返回 null
Partitioning(分区)
核心是将数据按照特定规则(如相同日期、类别的记录)物理存储在一起 (文件夹 dt=20250213),减少查询时扫描的数据量,提升性能。是通过数据存储优化来提升查询性能的一种技术.
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 CREATE TABLE ice_spark.hdfs.ns.t_user1 ( id INT , name STRING, dt DATE ) USING icebergPARTITIONED BY (days(dt)); INSERT INTO ice_spark.hdfs.ns.t_user1 VALUES (1 , 'hlink' , 20250709 );t_user1/ ├── data └── dt= 20250709 │ └── 00000 -5 - ba90a31f- d65c-4e40 -9321 -3 faa8c184547-0 -00001. parquet ├── metadata │ ├── 44 a2bbda- e252-4 d77- b958- b3cf35708b14- m0.avro │ ├── snap-8289612053519823397 -1 -44 a2bbda- e252-4 d77- b958- b3cf35708b14.avro │ ├── v1.metadata.json │ └── v2.metadata.json └── version- hint.text
但不一定是这种布局, 这种的类似, 因为Iceberg的分区不一定要放在一个统一的目录下, Iceberg有隐藏分区设计, 通过 逻辑与物理分离 解决了传统分区(如 Hive 分区)的痛点,分区列维护成本高、分区策略无法灵活变更等问题。几个好处:
分区转换函数帮助自动进行分区
查询时不需指明分区
分区策略动态演化
Partition Evolution
修改分区, 和Schema Evolution类似, 只改变元数据**, 不改变底层的数据文件,** 不会修改旧的分区规范,而是会创建一个全新的分区规范 (Partition Spec)
新建一个包含多个分区的表:
1 2 3 4 5 6 7 CREATE TABLE ice_spark_hdfs.t_user_part_hidden ( id bigint , name String, ts timestamp , dt string ) USING iceberg PARTITIONED BY (days(ts), bucket(3 , id));
删除一个分区, 修改一个分区:
1 2 3 ALTER TABLE ice_spark_hdfs.t_user_part_hidden DROP PARTITION FIELD bucket(3 , id);ALTER TABLE ice_spark_hdfs.t_user_part_hidden REPLACE PARTITION FIELD days(ts) WITH months(ts) AS month ;
查询时旧数据按旧分区查, 新数据按新分区查:
遍历当前快照中的每一个清单文件(查Manifest List)
对于每一个清单文件,它会读取该文件元数据中记录的 partition-spec-id
从表元数据的 partition-specs
列表中找到与该 ID 对应的那个历史版本 的分区规范
使用这个与数据匹配的历史分区规范 ,来转换用户查询中的过滤条件,从而对该清单文件下的数据文件进行有效的分区裁剪
Time Travel
Iceberg 的 时间旅行 功能允许用户查询表在 特定时间点 或 特定版本 的历史数据快照,无需手动备份或迁移数据。
时间旅行的核心能力:
按时间戳查询 :指定具体时间(如 '2023-10-01 10:00:00'
),查询该时刻的数据状态
按快照ID查询 :通过唯一快照 ID(如 10963874102873
)定位数据版本。
使用示例:
1 2 SELECT * FROM ice_spark_hdfs.t_user_part_hidden VERSION AS OF 807038129550906544 ;SELECT * FROM ice_spark_hdfs.t_user_part_hidden TIMESTAMP AS OF '2025-02-15 00:39:01.33' ;
如何实现:
遍历snapshot-log, 查询在时间戳正好在指定的时间点之前或之时的最后一个快照ID或者指定的快照ID, 然后就是正常的查询流程
具体参数的设计可以看下面的文档:
spec
声明 : 本文给出的例子不全是真实数据, 只是为了方便理解🥰