PyIceberg

article/2025/8/16 3:37:09

在这里插入图片描述

本文翻译整理自:https://py.iceberg.apache.org/

文章目录

  • PyIceberg 入门指南
      • 相关链接资源
    • 安装
    • 连接到目录
    • 写入PyArrow数据框
      • 探索Iceberg数据与元数据文件
    • 更多详情
  • 配置
    • 配置值设置
    • 表配置
      • 写入选项
      • 表行为选项
    • FileIO
      • S3
      • HDFS
      • Azure Data Lake
      • Google Cloud Storage
      • 阿里云对象存储服务(OSS)
      • PyArrow
    • 位置提供器
      • 简单位置提供器
      • 对象存储位置提供器
        • 分区排除
      • 加载自定义位置提供器
    • 目录
      • REST Catalog
        • RESTCatalog中的请求头配置
      • SQL Catalog
      • 内存目录
      • Hive Catalog
      • Glue Catalog
      • DynamoDB Catalog
      • 自定义目录实现
    • 统一AWS凭证配置
    • 并发控制
    • 向后兼容性
    • 纳秒级支持
  • Python 命令行工具
  • Python API
    • 创建表
    • 加载表
      • 目录表
      • 静态表
    • 检查表是否存在
    • 写入支持
      • 部分覆盖
      • Upsert
    • 检查表结构
      • 时间旅行功能
      • 快照
      • 分区
      • 条目
      • 引用
      • 清单文件
      • 元数据日志条目
      • 历史
      • 文件
    • 添加文件
    • 模式演进
      • 按名称合并
      • 添加列
      • 重命名列
      • 移动列
      • 更新列
      • 删除列
    • 分区演进
      • 添加字段
      • 移除字段
      • 重命名字段
    • 表属性
    • 快照属性
    • 快照管理
    • 视图
      • 检查视图是否存在
    • 表统计信息管理
    • 查询数据
      • Apache Arrow
      • Pandas
        • 需求
      • DuckDB
      • Ray
      • Daft
      • Polars
        • 使用 Polars DataFrame 处理数据
        • 使用 Polars LazyFrame 工作


PyIceberg 入门指南

PyIceberg 是一个用于访问 Iceberg 表的 Python 实现,无需依赖 JVM 环境。


相关链接资源

  • Github:https://github.com/apache/iceberg-python
  • 官方文档:https://py.iceberg.apache.org/
  • Iceberg 社区:https://iceberg.apache.org/community/
  • 技术规范:https://iceberg.apache.org/spec/

安装

在安装 PyIceberg 之前,请确保您使用的是最新版本的 pip

pip install --upgrade pip

你可以从 PyPI 安装最新发布版本:

pip install "pyiceberg[s3fs,hive]"

您可以根据需求混合搭配可选依赖项:

键值描述
hive支持Hive元存储
hive-kerberos支持Kerberos环境下的Hive元存储
glue支持AWS Glue
dynamodb支持AWS DynamoDB
sql-postgres支持Postgresql作为后端的SQL目录
sql-sqlite支持SQLite作为后端的SQL目录
pyarrow使用PyArrow作为FileIO实现与对象存储交互
pandas同时安装PyArrow和Pandas
duckdb同时安装PyArrow和DuckDB
ray同时安装PyArrow、Pandas和Ray
daft安装Daft
polars安装Polars
s3fs使用S3FS作为FileIO实现与对象存储交互
adlfs使用ADLFS作为FileIO实现与对象存储交互
snappy支持snappy Avro压缩
gcsfs使用GCSFS作为FileIO实现与对象存储交互
rest-sigv4支持为REST目录生成AWS SIGv4认证头

您需要安装s3fsadlfsgcsfspyarrow才能从对象存储获取文件。


连接到目录

Iceberg利用目录作为组织表的集中管理位置。这可以是传统的Hive目录(用于将Iceberg表与其他表一起存储)、供应商解决方案(如AWS Glue目录),或是Iceberg自身REST协议的实现。请查看配置页面获取所有配置细节。

出于演示目的,我们将配置目录使用SqlCatalog实现,该实现会将信息存储在本地sqlite数据库中。同时我们会将目录配置为在本地文件系统而非对象存储中存储数据文件。由于可扩展性有限,此配置不应在生产环境中使用。

为Iceberg创建临时存储位置:

mkdir /tmp/warehouse

打开 Python 3 REPL 来设置目录:

from pyiceberg.catalog import load_catalogwarehouse_path = "/tmp/warehouse"
catalog = load_catalog("default",**{'type': 'sql',"uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db","warehouse": f"file://{warehouse_path}",},
)

sql 目录适用于本地测试,无需额外服务。如需尝试其他目录,请参阅配置说明。


写入PyArrow数据框

我们以出租车数据集为例,将其写入Iceberg表。

首先下载一个月的数据:

curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet

将其加载到您的 PyArrow 数据框中:

import pyarrow.parquet as pqdf = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")

创建新的 Iceberg 表:

catalog.create_namespace("default")table = catalog.create_table("default.taxi_dataset",schema=df.schema,
)

将数据框追加到表中:

table.append(df)
len(table.scan().to_arrow())

已向表中写入 3,066,766 行数据。

现在生成用于模型训练的每英里小费特征:

import pyarrow.compute as pcdf = df.append_column("tip_per_mile", pc.divide(df["tip_amount"], df["trip_distance"]))

将表结构演进为包含新列:

with table.update_schema() as update_schema:update_schema.union_by_name(df.schema)

现在我们可以将新的数据帧写入 Iceberg 表:

table.overwrite(df)
print(table.scan().to_arrow())

新列已成功添加:

taxi_dataset(1: VendorID: optional long,2: tpep_pickup_datetime: optional timestamp,3: tpep_dropoff_datetime: optional timestamp,4: passenger_count: optional double,5: trip_distance: optional double,6: RatecodeID: optional double,7: store_and_fwd_flag: optional string,8: PULocationID: optional long,9: DOLocationID: optional long,10: payment_type: optional long,11: fare_amount: optional double,12: extra: optional double,13: mta_tax: optional double,14: tip_amount: optional double,15: tolls_amount: optional double,16: improvement_surcharge: optional double,17: total_amount: optional double,18: congestion_surcharge: optional double,19: airport_fee: optional double,20: tip_per_mile: optional double
),

我们可以看到,有2,371,784行数据具有每英里小费记录:

df = table.scan(row_filter="tip_per_mile > 0").to_arrow()
len(df)

探索Iceberg数据与元数据文件

由于目录配置为使用本地文件系统,我们可以查看Iceberg如何存储上述操作产生的数据和元数据文件。


find /tmp/warehouse/

更多详情

具体细节请查看 CLI 或 Python API 页面。


配置


配置值设置

有三种方式传入配置:

  • 使用 .pyiceberg.yaml 配置文件(推荐方式)
  • 通过环境变量传递
  • 通过CLI或Python API直接传入凭证

配置文件可以存储在以下任一位置(按优先级排序):
1、PYICEBERG_HOME 环境变量指定的目录
2、用户主目录
3、当前工作目录

如需修改 .pyiceberg.yaml 的搜索路径,可以覆盖 PYICEBERG_HOME 环境变量。

另一种方式是通过环境变量进行配置:

export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
export PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID=username
export PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY=password

Iceberg 读取的环境变量以 PYICEBERG_ 开头,后接以下 yaml 结构,其中双下划线 __ 表示嵌套字段,而下划线 _ 会被转换为连字符 -

例如,PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID 会在 default 目录下设置 s3.access-key-id


表配置

Iceberg 表支持通过表属性来配置表的行为。


写入选项

选项默认值描述
write.parquet.compression-codec{uncompressed,zstd,gzip,snappy}zstd设置Parquet压缩编解码器。
write.parquet.compression-level整数nullParquet编解码器的压缩级别。若未设置,则由PyIceberg决定
write.parquet.row-group-limit行数1048576单个行组内条目数量的上限
write.parquet.page-size-bytes字节大小1MB为列块内数据页的近似编码大小设置目标阈值
write.parquet.page-row-limit行数20000为列块内最大行数设置目标阈值
write.parquet.dict-size-bytes字节大小2MB设置每个行组的字典页大小限制
write.metadata.previous-versions-max整数100提交后删除前保留的旧版本元数据文件最大数量。
write.metadata.delete-after-commit.enabled布尔值False是否在每次表提交后自动删除旧的已跟踪元数据文件。将保留最近的部分元数据文件,数量可通过属性write.metadata.previous-versions-max设置。
write.object-storage.enabled布尔值False启用ObjectStoreLocationProvider,为文件路径添加哈希组件。
write.object-storage.partitioned-paths布尔值True当启用对象存储时,控制分区值是否包含在文件路径中
write.py-location-provider.impl格式为module.ClassName的字符串null可选项,自定义LocationProvider实现
write.data.path指向位置的字符串{metadata.location}/data设置数据写入的位置。
write.metadata.path指向位置的字符串{metadata.location}/metadata设置元数据写入的位置。

表行为选项

键名选项默认值描述
commit.manifest.target-size-bytes字节大小8388608 (8MB)合并清单文件时的目标大小
commit.manifest.min-count-to-merge清单数量100合并清单文件时的最小数量阈值
commit.manifest-merge.enabled布尔值False控制写入时是否自动合并清单文件

快速追加模式

与Java实现不同,PyIceberg默认采用快速追加模式,因此commit.manifest-merge.enabled默认设置为False


FileIO

Iceberg采用FileIO这一可插拔模块的概念来实现文件的读取、写入和删除操作。默认情况下,PyIceberg会根据文件路径协议(如s3://gs://等)自动尝试初始化匹配的FileIO实现,并优先使用已安装的第一个可用模块。支持以下协议及对应实现:

  • s3, s3a, s3n: PyArrowFileIO, FsspecFileIO
  • gs: PyArrowFileIO
  • file: PyArrowFileIO
  • hdfs: PyArrowFileIO
  • abfs, abfss: FsspecFileIO
  • oss: PyArrowFileIO

用户也可显式指定FileIO实现:

键名示例说明
py-io-implpyiceberg.io.fsspec.FsspecFileIO强制指定FileIO实现类,若无法加载则会明确报错

FileIO模块支持以下配置选项:

S3

键名示例描述
s3.endpointhttps://10.0.19.25/为FileIO配置访问S3服务的替代端点。可用于使S3FileIO兼容任何具有不同端点的S3兼容对象存储服务,或访问虚拟私有云中的私有S3端点。
s3.access-key-idadmin配置用于访问FileIO的静态访问密钥ID。
s3.secret-access-keypassword配置用于访问FileIO的静态秘密访问密钥。
s3.session-tokenAQoDYXdzEJr…配置用于访问FileIO的静态会话令牌。
s3.role-session-namesession为假定角色会话配置的可选标识符。
s3.role-arnarn:aws:…AWS角色ARN。如果提供此参数而非access_key和secret_key,将通过担任此角色获取临时凭证。
s3.signerbearer配置FileIO的签名版本。
s3.signer.urihttp://my.signer:8080/s3当远程签名URI与目录URI不同时进行配置。远程签名仅对FsspecFileIO实现。最终请求将发送至<s3.signer.uri>/<s3.signer.endpoint>
s3.signer.endpointv1/main/s3-sign配置远程签名端点。远程签名仅对FsspecFileIO实现。最终请求将发送至<s3.signer.uri>/<s3.signer.endpoint>(默认值:v1/aws/s3/sign)。
s3.regionus-west-2配置用于初始化S3FileSystem的默认区域。若未设置,PyArrowFileIO会尝试自动解析区域(仅支持AWS S3存储桶)。
s3.resolve-regionFalse仅支持PyArrowFileIO,启用时将始终尝试解析存储桶位置(仅支持AWS S3存储桶)。
s3.proxy-urihttp://my.proxy.com:8080配置FileIO使用的代理服务器。
s3.connect-timeout60.0配置套接字连接超时时间(单位:秒)。
s3.request-timeout60.0在Windows和macOS上配置套接字读取超时时间(单位:秒)。
s3.force-virtual-addressingFalse是否强制使用存储桶虚拟寻址。若为true,则始终启用虚拟寻址;若为false,则仅当endpoint_override为空时启用。适用于仅支持虚拟托管式访问的非AWS后端服务。

HDFS

键名示例描述
hdfs.hosthttps://10.0.19.25/配置要连接的HDFS主机地址
hdfs.port9000配置要连接的HDFS端口号
hdfs.useruser配置用于连接的HDFS用户名
hdfs.kerberos_ticketkerberos_ticket配置Kerberos票据缓存路径

Azure Data Lake

示例描述
adls.connection-stringAccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqF…;BlobEndpoint=http://localhost/连接字符串。可用于通过FileIO连接任何兼容ADLS的对象存储服务(如azurite),这些服务可能使用不同的端点。
adls.account-namedevstoreaccount1要连接的账户名称
adls.account-keyEby8vdM02xNOcqF…用于账户认证的密钥
adls.sas-tokenNuHOuuzdQN7VRM%2FOpOeqBlawRCA845IY05h9eu1Yte4%3D共享访问签名
adls.tenant-idad667be4-b811-11ed-afa1-0242ac120002租户ID
adls.client-idad667be4-b811-11ed-afa1-0242ac120002客户端ID
adls.client-secretoCA3R6P*ka#oa1Sms2J74z…客户端密钥

Google Cloud Storage

键名示例描述
gcs.project-idmy-gcp-project为GCS FileIO配置Google Cloud项目ID。
gcs.oauth2.tokenya29.dr.AfM…用于临时访问的令牌字符串。
gcs.oauth2.token-expires-at1690971805918配置基于访问令牌生成的凭据过期时间(自纪元起的毫秒数)。
gcs.accessread_only配置客户端访问权限,可选值为’read_only’、‘read_write’或’full_control’。
gcs.consistencymd5配置文件写入时的校验方式,可选值为’none’、‘size’或’md5’。
gcs.cache-timeout60配置对象元数据缓存的过期时间(秒)。
gcs.requester-paysFalse配置是否使用请求方付费模式。
gcs.session-kwargs{}配置传递给aiohttp.ClientSession的参数字典(可包含代理设置等)。
gcs.service.hosthttp://0.0.0.0:4443配置GCS FileIO的替代访问端点(格式:协议://主机:端口)。未设置时默认使用环境变量"STORAGE_EMULATOR_HOST"的值,若该变量也未设置则使用标准Google端点。
gcs.default-locationUS配置存储桶的默认创建位置(如’US’或’EUROPE-WEST3’)。
gcs.version-awareFalse配置是否支持GCS存储桶的对象版本控制功能。

阿里云对象存储服务(OSS)

PyIceberg使用S3FileSystem类连接OSS存储桶,因为该服务兼容S3 SDK,只要端点采用虚拟托管样式地址即可。

键名示例描述
s3.endpointhttps://s3.oss-your-bucket-region.aliyuncs.com/为FileIO配置OSS服务的访问端点。必须使用示例中给出的S3兼容端点。
s3.access-key-idadmin配置用于访问FileIO的静态访问密钥ID。
s3.secret-access-keypassword配置用于访问FileIO的静态密钥访问密码。
s3.session-tokenAQoDYXdzEJr…配置用于访问FileIO的静态会话令牌。
s3.force-virtual-addressingTrue是否使用存储桶的虚拟地址。必须设置为True,因为OSS只能通过虚拟托管样式地址访问。

PyArrow

键名示例描述
pyarrow.use-large-types-on-readTrue在表扫描时使用大型PyArrow类型,即large_string、large_binary和large_list字段类型。默认值为True。

位置提供器

Apache Iceberg 使用 LocationProvider 的概念来管理表数据文件的路径。在 PyIceberg 中,LocationProvider 模块设计为可插拔式,允许针对特定用例进行定制,并额外确定元数据文件的位置。表的 LocationProvider 可以通过表属性进行指定。

数据文件和元数据文件的位置均可通过配置表属性 write.data.pathwrite.metadata.path 分别进行自定义。

如需更细粒度的控制,可以重写 LocationProvidernew_data_locationnew_metadata_location 方法,以定义生成文件路径的自定义逻辑。详见 加载自定义位置提供器

PyIceberg 默认使用 SimpleLocationProvider 来管理文件路径。


简单位置提供器

SimpleLocationProvider 提供以 {location}/data/ 为前缀的路径,其中 location 来自表元数据。可以通过设置write.data.path 表配置来覆盖此默认行为。

例如,一个非分区表的数据文件可能具有如下路径:

s3://bucket/ns/table/data/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

当表被分区时,给定分区下的文件会被分组到一个子目录中,该分区键和值作为目录名称——这被称为Hive风格的分区路径格式。例如,按字符串列category分区的表可能有一个数据文件,其路径如下:

s3://bucket/ns/table/data/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

对象存储位置提供器

PyIceberg 提供了 ObjectStoreLocationProvider 以及可选的分区排除优化功能,专为存储在对象存储中的表设计。如需了解这些配置的更多背景和动机,请参阅Iceberg Java实现的文档。

当多个文件存储在相同前缀下时,S3等云对象存储通常会对前缀进行请求限流,导致性能下降。ObjectStoreLocationProvider 通过在文件路径中注入二进制目录形式的确定性哈希值来应对这一问题,从而将文件分散到更多对象存储前缀中。

路径以 {location}/data/ 为前缀,其中 location 来自表元数据,其方式类似于 SimpleLocationProvider。这可以通过设置write.data.path表配置来覆盖。

例如,一个按字符串列 category 分区的表可能具有如下位置的数据文件:(注意额外的二进制目录)


s3://bucket/ns/table/data/0101/0110/1001/10110010/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

为表启用 ObjectStoreLocationProvider 需要显式将其 write.object-storage.enabled 表属性设置为 True


分区排除

当使用 ObjectStoreLocationProvider 时,表属性 write.object-storage.partitioned-paths(默认为 True)可设置为 False,作为针对对象存储的额外优化。这将完全省略数据文件路径中的分区键和值,以进一步减小键大小。禁用该属性后,上述相同的数据文件将被写入以下路径:(注意缺少 category=orders


s3://bucket/ns/table/data/1101/0100/1011/00111010-00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

加载自定义位置提供器

与 FileIO 类似,可以通过具体继承抽象基类 LocationProvider 来为表提供自定义的 LocationProvider

需要将表属性 write.py-location-provider.impl 设置为自定义 LocationProvider 的完全限定名称(例如 mymodule.MyLocationProvider)。请注意,LocationProvider 是按表配置的,允许为不同表提供不同的位置分配方案。还需注意,Iceberg 的 Java 实现使用不同的表属性 write.location-provider.impl 来配置自定义 Java 实现。

下面展示了一个自定义 LocationProvider 的实现示例。


import uuidclass UUIDLocationProvider(LocationProvider):def __init__(self, table_location: str, table_properties: Properties):super().__init__(table_location, table_properties)def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:# Can use any custom method to generate a file path given the partitioning information and file nameprefix = f"{self.table_location}/{uuid.uuid4()}"return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"

目录

PyIceberg 目前原生支持 REST、SQL、Hive、Glue 和 DynamoDB 类型的目录。此外,您也可以直接设置目录实现:

键名示例描述
typerest目录类型,可选值为 restsqlhivegluedymamodb,默认为 rest
py-catalog-implmypackage.mymodule.MyCatalog显式设置目录实现类,若无法加载则会明确报错

REST Catalog


catalog:default:uri: http://rest-catalog/ws/credential: t-1234:secretdefault-mtls-secured-catalog:uri: https://rest-catalog/ws/ssl:client:cert: /absolute/path/to/client.crtkey: /absolute/path/to/client.keycabundle: /absolute/path/to/cabundle.pem

示例描述
urihttps://rest-catalog/ws标识REST服务器的URI
ugit-1234:secretHive客户端使用的Hadoop UGI凭证
credentialt-1234:secret初始化目录时用于OAuth2凭证流的认证信息
tokenFEW23.DFSDF.FSDF用于Authorization头的Bearer令牌值
scopeopenid offline corpds:ds:profile请求安全令牌的期望作用域(默认为catalog)
resourcerest_catalog.iceberg.com目标资源或服务的URI
audiencerest_catalog目标资源或服务的逻辑名称
rest.sigv4-enabledtrue使用AWS SigV4协议对REST服务器请求进行签名
rest.signing-regionus-east-1使用SigV4签名请求时指定的区域
rest.signing-nameexecute-api使用SigV4签名请求时指定的服务签名名称
oauth2-server-urihttps://auth-service/cc客户端凭证认证使用的认证URL(默认为uri + ‘v1/oauth/tokens’)

RESTCatalog中的请求头配置

要在RESTCatalog中配置自定义请求头,请在目录属性中添加以header.为前缀的配置项。这样可以确保所有发送至REST服务的HTTP请求都包含指定的请求头。


catalog:default:uri: http://rest-catalog/ws/credential: t-1234:secretheader.content-type: application/vnd.api+json

RESTCatalog规范定义的特定请求头包括:

键名选项默认值描述
header.X-Iceberg-Access-Delegation{vended-credentials,remote-signing}vended-credentials向服务器表明客户端支持通过逗号分隔的访问机制列表进行委托访问。服务器可以选择通过任意或完全不通过请求的机制来提供访问权限

SQL Catalog

SQL Catalog需要一个数据库作为其后端。PyIceberg通过psycopg2支持PostgreSQL和SQLite。必须使用uri属性配置数据库连接。init_catalog_tables是可选的,默认值为True。如果设置为False,初始化SQLCatalog时将不会创建目录表。详情请参阅SQLAlchemy的URL格式文档:

对于PostgreSQL:

catalog:default:type: sqluri: postgresql+psycopg2://username:password@localhost/mydatabaseinit_catalog_tables: false

对于SQLite的情况:

仅限开发用途

SQLite并非为并发场景设计,建议仅将此目录用于探索性或开发目的。


catalog:default:type: sqluri: sqlite:tmp/pyiceberg.dbinit_catalog_tables: false

键名示例默认值描述
uripostgresql+psycopg2://username:password@localhost/mydatabase目录数据库的SQLAlchemy后端URL(参见URL格式文档)
echotruefalseSQLAlchemy引擎的echo参数,用于将所有语句记录到默认日志处理器
pool_pre_pingtruefalseSQLAlchemy引擎的pool_pre_ping参数,用于在每次检出时测试连接活性

内存目录

内存目录基于SqlCatalog构建,并使用SQLite内存数据库作为其后端。

它适用于测试、演示和实验环境,但不适合生产环境,因为不支持并发访问。


catalog:default:type: in-memorywarehouse: /tmp/pyiceberg/warehouse

键名示例默认值描述
warehouse/tmp/pyiceberg/warehousefile:///tmp/iceberg/warehouse内存目录将存储其数据文件的目录路径。

Hive Catalog


catalog:default:uri: thrift://localhost:9083s3.endpoint: http://localhost:9000s3.access-key-id: admins3.secret-access-key: password

键名示例描述
hive.hive2-compatibletrue启用 Hive 2.x 兼容模式
hive.kerberos-authenticationtrue通过 Kerberos 进行认证

使用 Hive 2.x 时,请确保设置兼容性标志:

catalog:default:
...hive.hive2-compatible: true

Glue Catalog

您的AWS凭证可以直接通过Python API传递。否则,请参考如何配置AWS凭证在本地设置您的AWS账户凭证。


catalog:default:type: glueglue.access-key-id: <ACCESS_KEY_ID>glue.secret-access-key: <SECRET_ACCESS_KEY>glue.session-token: <SESSION_TOKEN>glue.region: <REGION_NAME>s3.endpoint: http://localhost:9000s3.access-key-id: admins3.secret-access-key: password
catalog:default:type: glueglue.profile-name: <PROFILE_NAME>glue.region: <REGION_NAME>s3.endpoint: http://localhost:9000s3.access-key-id: admins3.secret-access-key: password

客户端专属属性

glue.* 属性仅适用于 Glue Catalog。如需在 Glue Catalog 和 S3 FileIO 中使用相同的凭证,可设置 client.* 属性。详见统一AWS凭证章节。

键名示例值描述
glue.id111111111111配置Glue Catalog的12位数字ID
glue.skip-archivetrue配置是否跳过旧表版本的归档。默认为true
glue.endpointhttps://glue.us-east-1.amazonaws.com配置GlueCatalog访问Glue服务的备用端点
glue.profile-namedefault配置访问Glue Catalog使用的静态profile
glue.regionus-east-1设置Glue Catalog的区域
glue.access-key-idadmin配置访问Glue Catalog使用的静态访问密钥ID
glue.secret-access-keypassword配置访问Glue Catalog使用的静态密钥
glue.session-tokenAQoDYXdzEJr…配置访问Glue Catalog使用的静态会话令牌
glue.max-retries10配置Glue服务调用的最大重试次数
glue.retry-modestandard配置Glue服务的重试模式。默认为standard

已移除属性

profile_nameregion_nameaws_access_key_idaws_secret_access_keyaws_session_token 属性已在0.8.0版本弃用并移除


DynamoDB Catalog

若需使用AWS DynamoDB作为元数据目录,可通过以下两种方式配置pyiceberg,并参考如何配置AWS凭证在本地设置AWS账户凭证。如需为DynamoDB Catalog和S3 FileIO使用相同凭证,可设置client.*属性。


catalog:default:type: dynamodbtable-name: iceberg

如果您更倾向于显式传递凭据给客户端,而不是依赖环境变量,


catalog:default:type: dynamodbtable-name: icebergdynamodb.access-key-id: <ACCESS_KEY_ID>dynamodb.secret-access-key: <SECRET_ACCESS_KEY>dynamodb.session-token: <SESSION_TOKEN>dynamodb.region: <REGION_NAME>s3.endpoint: http://localhost:9000s3.access-key-id: admins3.secret-access-key: password

客户端专属属性

dynamodb.* 属性仅适用于 DynamoDB Catalog。如需在 DynamoDB Catalog 和 S3 FileIO 中使用相同凭证,可设置 client.* 属性。详见统一AWS凭证章节。

键名示例描述
dynamodb.profile-namedefault配置用于访问 DynamoDB Catalog 的静态凭证配置文件
dynamodb.regionus-east-1设置 DynamoDB Catalog 所在区域
dynamodb.access-key-idadmin配置用于访问 DynamoDB Catalog 的静态访问密钥ID
dynamodb.secret-access-keypassword配置用于访问 DynamoDB Catalog 的静态密钥访问密码
dynamodb.session-tokenAQoDYXdzEJr…配置用于访问 DynamoDB Catalog 的静态会话令牌

已移除属性

profile_nameregion_nameaws_access_key_idaws_secret_access_keyaws_session_token 属性已在 0.8.0 版本弃用并移除


自定义目录实现

如需加载任何自定义目录实现,可按如下方式设置目录配置:

catalog:default:py-catalog-impl: mypackage.mymodule.MyCatalogcustom-key1: value1custom-key2: value2

统一AWS凭证配置

您可以通过配置client.*属性来显式设置Glue/DynamoDB Catalog和S3 FileIO的AWS凭证。例如:

catalog:default:type: glueclient.access-key-id: <ACCESS_KEY_ID>client.secret-access-key: <SECRET_ACCESS_KEY>client.region: <REGION_NAME>

为Glue Catalog和S3 FileIO配置AWS凭证。

键名示例描述
client.regionus-east-1设置Glue/DynamoDB Catalog和S3 FileIO的区域
client.access-key-idadmin配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态访问密钥ID
client.secret-access-keypassword配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态秘密访问密钥
client.session-tokenAQoDYXdzEJr…配置用于访问Glue/DynamoDB Catalog和S3 FileIO的静态会话令牌
client.role-session-namesession可选参数,用于标识所承担角色的会话名称
client.role-arnarn:aws:…AWS角色ARN。如果提供此参数而非access_key和secret_key,将通过承担此角色来获取临时凭证

属性优先级

若设置了服务特定属性,client.*属性将被覆盖。例如,若client.region设为us-west-1s3.region设为us-east-1,则S3 FileIO将使用us-east-1作为区域。


并发控制

PyIceberg 采用多线程机制来并行化操作。您可以通过以下两种方式配置工作线程数量:
1、在配置文件中设置 max-workers 参数
2、通过环境变量 PYICEBERG_MAX_WORKERS 指定

默认线程数会根据系统硬件和 Python 版本自动确定,具体细节可参考 Python 官方文档。


向后兼容性

旧版Java实现(<1.4.0)错误地将current-snapshot-id这个可选属性假定为TableMetadata中的必填属性。这意味着如果元数据文件中缺少current-snapshot-id(例如建表时),应用程序会因无法加载表而抛出异常。该问题在较新的Iceberg版本中已修正。但用户仍可通过配置强制PyIceberg生成兼容旧版规范的元数据文件,只需在配置文件中将legacy-current-snapshot-id属性设为"True",或设置环境变量PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID。更多技术细节请参阅PR讨论。


纳秒级支持

PyIceberg 当前在 TimestampType 中仅支持微秒级精度。PyArrow 时间戳类型若为 ‘s’ 或 ‘ms’ 精度,在写入时会自动向上转换为 ‘us’ 精度时间戳。如果需要,‘ns’ 精度的时间戳在写入时也可自动向下转换。可通过在配置文件中设置 downcast-ns-timestamp-to-us-on-write 属性为 “True”,或设置环境变量 PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE 来启用此功能。关于纳秒级支持的长期规划详情,请参阅纳秒时间戳提案文档


Python 命令行工具

PyIceberg 提供了一个 CLI 工具,安装 pyiceberg 包后即可使用。

虽然可以通过 --uri--credential 参数传递 Catalog 路径,但建议按照 Catalog 章节所述配置 ~/.pyiceberg.yaml 文件。


➜  pyiceberg --help
Usage: pyiceberg [OPTIONS] COMMAND [ARGS]...Options:
--catalog TEXT
--verbose BOOLEAN
--output [text|json]
--ugi TEXT
--uri TEXT
--credential TEXT
--help                Show this message and exit.Commands:
describe    Describes a namespace xor table
drop        Operations to drop a namespace or table
list        Lists tables or namespaces
location    Returns the location of the table
properties  Properties on tables/namespaces
rename      Renames a table
schema      Gets the schema of the table
spec        Returns the partition spec of the table
uuid        Returns the UUID of the table

这个示例假设您已设置了默认目录。如果想加载其他目录(例如上文中的 rest 示例),则需要设置 --catalog rest


➜  pyiceberg list
default
nyc
➜  pyiceberg list nyc
nyc.taxis
➜  pyiceberg describe nyc.taxis
Table format version  1
Metadata location     file:/.../nyc.db/taxis/metadata/00000-aa3a3eac-ea08-4255-b890-383a64a94e42.metadata.json
Table UUID            6cdfda33-bfa3-48a7-a09e-7abb462e3460
Last Updated          1661783158061
Partition spec        []
Sort order            []
Current schema        Schema, id=0
├── 1: VendorID: optional long
├── 2: tpep_pickup_datetime: optional timestamptz
├── 3: tpep_dropoff_datetime: optional timestamptz
├── 4: passenger_count: optional double
├── 5: trip_distance: optional double
├── 6: RatecodeID: optional double
├── 7: store_and_fwd_flag: optional string
├── 8: PULocationID: optional long
├── 9: DOLocationID: optional long
├── 10: payment_type: optional long
├── 11: fare_amount: optional double
├── 12: extra: optional double
├── 13: mta_tax: optional double
├── 14: tip_amount: optional double
├── 15: tolls_amount: optional double
├── 16: improvement_surcharge: optional double
├── 17: total_amount: optional double
├── 18: congestion_surcharge: optional double
└── 19: airport_fee: optional double
Current snapshot      Operation.APPEND: id=5937117119577207079, schema_id=0
Snapshots             Snapshots
└── Snapshot 5937117119577207079, schema 0: file:/.../nyc.db/taxis/metadata/snap-5937117119577207079-1-94656c4f-4c66-4600-a4ca-f30377300527.avro
Properties            owner                 root
write.format.default  parquet

或者输出为JSON格式以便自动化处理:

➜  pyiceberg --output json describe nyc.taxis | jq
{"identifier": ["nyc","taxis"],"metadata_location": "file:/.../nyc.db/taxis/metadata/00000-aa3a3eac-ea08-4255-b890-383a64a94e42.metadata.json","metadata": {"location": "file:/.../nyc.db/taxis","table-uuid": "6cdfda33-bfa3-48a7-a09e-7abb462e3460","last-updated-ms": 1661783158061,"last-column-id": 19,"schemas": [{"type": "struct","fields": [{"id": 1,"name": "VendorID","type": "long","required": false},
...{"id": 19,"name": "airport_fee","type": "double","required": false}],"schema-id": 0,"identifier-field-ids": []}],"current-schema-id": 0,"partition-specs": [{"spec-id": 0,"fields": []}],"default-spec-id": 0,"last-partition-id": 999,"properties": {"owner": "root","write.format.default": "parquet"},"current-snapshot-id": 5937117119577207000,"snapshots": [{"snapshot-id": 5937117119577207000,"timestamp-ms": 1661783158061,"manifest-list": "file:/.../nyc.db/taxis/metadata/snap-5937117119577207079-1-94656c4f-4c66-4600-a4ca-f30377300527.avro","summary": {"operation": "append","spark.app.id": "local-1661783139151","added-data-files": "1","added-records": "2979431","added-files-size": "46600777","changed-partition-count": "1","total-records": "2979431","total-files-size": "46600777","total-data-files": "1","total-delete-files": "0","total-position-deletes": "0","total-equality-deletes": "0"},"schema-id": 0}],"snapshot-log": [{"snapshot-id": "5937117119577207079","timestamp-ms": 1661783158061}],"metadata-log": [],"sort-orders": [{"order-id": 0,"fields": []}],"default-sort-order-id": 0,"refs": {"main": {"snapshot-id": 5937117119577207000,"type": "branch"}},"format-version": 1,"schema": {"type": "struct","fields": [{"id": 1,"name": "VendorID","type": "long","required": false},
...{"id": 19,"name": "airport_fee","type": "double","required": false}],"schema-id": 0,"identifier-field-ids": []},"partition-spec": []}
}

Python API

PyIceberg 围绕目录(catalog)机制来加载表。第一步是实例化一个用于加载表的目录。我们使用以下配置来定义一个名为 prod 的目录:

catalog:prod:uri: http://rest-catalog/ws/credential: t-1234:secret

请注意,可以在同一个 .pyiceberg.yaml 文件中定义多个目录。


catalog:hive:uri: thrift://127.0.0.1:9083s3.endpoint: http://127.0.0.1:9000s3.access-key-id: admins3.secret-access-key: passwordrest:uri: https://rest-server:8181/warehouse: my-warehouse

通过调用 load_catalog(name="hive")load_catalog(name="rest") 在 Python 中加载。

这些配置信息必须放置在名为 .pyiceberg.yaml 的文件中,该文件可以位于以下目录之一:

  • $HOME%USERPROFILE% 目录(根据操作系统是 Unix 类或 Windows 类而定)
  • 当前工作目录
  • $PYICEBERG_HOME 目录(如果设置了对应的环境变量)

有关配置选项的更多细节,请参阅专用页面。

然后加载名为 prod 的 catalog:

from pyiceberg.catalog import load_catalogcatalog = load_catalog("docs",**{"uri": "http://127.0.0.1:8181","s3.endpoint": "http://127.0.0.1:9000","py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO","s3.access-key-id": "admin","s3.secret-access-key": "password",}
)

让我们创建一个命名空间:

catalog.create_namespace("docs_example")

然后列出它们:

ns = catalog.list_namespaces()assert ns == [("docs_example",)]

然后列出命名空间中的表:

catalog.list_tables("docs_example")

创建表

要通过目录创建表:

from pyiceberg.schema import Schema
from pyiceberg.types import (TimestampType,FloatType,DoubleType,StringType,NestedField,StructType,
)schema = Schema(NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),NestedField(field_id=5,name="details",field_type=StructType(NestedField(field_id=4, name="created_by", field_type=StringType(), required=False),),required=False,),
)from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransformpartition_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day")
)from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))catalog.create_table(identifier="docs_example.bids",schema=schema,location="s3://pyiceberg",partition_spec=partition_spec,sort_order=sort_order,
)

创建表时,模式中的所有ID都会被重新分配以确保唯一性。

要通过pyarrow模式创建表:

import pyarrow as paschema = pa.schema([pa.field("foo", pa.string(), nullable=True),pa.field("bar", pa.int32(), nullable=False),pa.field("baz", pa.bool_(), nullable=True),]
)catalog.create_table(identifier="docs_example.bids",schema=schema,
)

要以事务方式原子化地创建表并进行后续修改:

with catalog.create_table_transaction(identifier="docs_example.bids",schema=schema,location="s3://pyiceberg",partition_spec=partition_spec,sort_order=sort_order,
) as txn:with txn.update_schema() as update_schema:update_schema.add_column(path="new_column", field_type=StringType())with txn.update_spec() as update_spec:update_spec.add_identity("symbol")txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")

加载表


目录表

加载 bids 表:

table = catalog.load_table("docs_example.bids")
# Equivalent to:
table = catalog.load_table(("docs_example", "bids"))
# The tuple syntax can be used if the namespace or table contains a dot.

这将返回一个表示 Iceberg 表的 Table 对象,该表可被查询和修改。


静态表

要直接从元数据文件加载表(即使用目录),可以按如下方式使用 StaticTable

from pyiceberg.table import StaticTablestatic_table = StaticTable.from_metadata("s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)

静态表被视为只读。


检查表是否存在

要检查 bids 表是否存在:

catalog.table_exists("docs_example.bids")

如果表已存在,则返回 True


写入支持

从 PyIceberg 0.6.0 版本开始,通过 Arrow 实现了写入支持功能。让我们来看一个 Arrow 表的示例:

import pyarrow as padf = pa.Table.from_pylist([{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},{"city": "Drachten", "lat": 53.11254, "long": 6.0989},{"city": "Paris", "lat": 48.864716, "long": 2.349014},],
)

接下来,根据该模式创建表:

from pyiceberg.catalog import load_catalogcatalog = load_catalog("default")from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleTypeschema = Schema(NestedField(1, "city", StringType(), required=False),NestedField(2, "lat", DoubleType(), required=False),NestedField(3, "long", DoubleType(), required=False),
)tbl = catalog.create_table("default.cities", schema=schema)

现在将数据写入表:

快速追加

PyIceberg 默认采用快速追加方式以最小化写入数据量。这种方式能实现快速写入,降低冲突可能性。快速追加的缺点是会比普通提交产生更多元数据。计划实现压缩功能,当达到阈值时将自动重写所有元数据,以保持读取性能。


tbl.append(df)# ortbl.overwrite(df)

数据被写入表中,当使用 tbl.scan().to_arrow() 读取表时:

pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]

由于目前还没有数据,你们都可以使用 append(df)overwrite(df)。如果想添加更多数据,可以再次使用 .append()

df = pa.Table.from_pylist([{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)tbl.append(df)

读取表 tbl.scan().to_arrow() 时,可以看到 Groningen 现在也包含在表中:

pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]

嵌套列表表示不同的 Arrow 缓冲区,其中首次写入会生成一个缓冲区,而第二次追加操作会写入另一个独立缓冲区。这是预期行为,因为系统需要读取两个 Parquet 文件。

为避免写入过程中出现类型错误,您可以使用 Iceberg 表模式来强制指定 PyArrow 表的类型:

from pyiceberg.catalog import load_catalog
import pyarrow as pacatalog = load_catalog("default")
table = catalog.load_table("default.cities")
schema = table.schema().as_arrow()df = pa.Table.from_pylist([{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=schema
)table.append(df)

您可以通过调用 tbl.delete() 并指定所需的 delete_filter 来删除表中的部分数据。


tbl.delete(delete_filter="city == 'Paris'")

在上面的例子中,所有城市字段值等于Paris的记录都将被删除。运行tbl.scan().to_arrow()现在会返回:

pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]

部分覆盖

使用 overwrite API 时,可以通过 overwrite_filter 先删除表中符合过滤条件的数据,然后再追加新数据到表中。

例如,对于以下方式创建的 Iceberg 表:

from pyiceberg.catalog import load_catalogcatalog = load_catalog("default")from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleTypeschema = Schema(NestedField(1, "city", StringType(), required=False),NestedField(2, "lat", DoubleType(), required=False),NestedField(3, "long", DoubleType(), required=False),
)tbl = catalog.create_table("default.cities", schema=schema)

随着初始数据填充到表中:

import pyarrow as pa
df = pa.Table.from_pylist([{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},{"city": "Drachten", "lat": 53.11254, "long": 6.0989},{"city": "Paris", "lat": 48.864716, "long": 2.349014},],
)
tbl.append(df)

你可以用 New York 的记录覆盖 Paris 的记录:

from pyiceberg.expressions import EqualTo
df = pa.Table.from_pylist([{"city": "New York", "lat": 40.7128, "long": 74.0060},]
)
tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))

这将通过 tbl.scan().to_arrow() 产生如下结果:

pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
lat: [[40.7128],[52.371807,37.773972,53.11254]]
long: [[74.006],[4.896029,-122.431297,6.0989]]

如果 PyIceberg 表已分区,您可以使用 tbl.dynamic_partition_overwrite(df) 来替换现有分区为数据框中提供的新分区。系统会自动从提供的 Arrow 表中检测需要替换的分区。例如,对于一个在 "city" 字段上指定了分区的 Iceberg 表:

from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, NestedField, StringTypeschema = Schema(NestedField(1, "city", StringType(), required=False),NestedField(2, "lat", DoubleType(), required=False),NestedField(3, "long", DoubleType(), required=False),
)tbl = catalog.create_table("default.cities",schema=schema,partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="city_identity"))
)

我们想要覆盖 "Paris" 分区的数据:

import pyarrow as padf = pa.Table.from_pylist([{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},{"city": "Drachten", "lat": 53.11254, "long": 6.0989},{"city": "Paris", "lat": -48.864716, "long": -2.349014},],
)
tbl.append(df)

然后我们可以用这个 arrow 表调用 dynamic_partition_overwrite

df_corrected = pa.Table.from_pylist([{"city": "Paris", "lat": 48.864716, "long": 2.349014}
])
tbl.dynamic_partition_overwrite(df_corrected)

这将通过 tbl.scan().to_arrow() 产生以下结果:

pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Paris"],["Amsterdam"],["Drachten"],["San Francisco"]]
lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]

Upsert

PyIceberg 支持 upsert 操作,这意味着它能够将 Arrow 表合并到 Iceberg 表中。系统会根据标识字段来判断行是否相同。如果表中已存在某行数据,则会更新该行;如果找不到匹配的行,则会插入新行。

假设有以下包含数据的表:

from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringTypeimport pyarrow as paschema = Schema(NestedField(1, "city", StringType(), required=True),NestedField(2, "inhabitants", IntegerType(), required=True),# Mark City as the identifier field, also known as the primary-keyidentifier_field_ids=[1]
)tbl = catalog.create_table("default.cities", schema=schema)arrow_schema = pa.schema([pa.field("city", pa.string(), nullable=False),pa.field("inhabitants", pa.int32(), nullable=False),]
)# Write some data
df = pa.Table.from_pylist([{"city": "Amsterdam", "inhabitants": 921402},{"city": "San Francisco", "inhabitants": 808988},{"city": "Drachten", "inhabitants": 45019},{"city": "Paris", "inhabitants": 2103000},],schema=arrow_schema
)
tbl.append(df)

接下来,我们将向 Iceberg 表执行 upsert 操作:

df = pa.Table.from_pylist([# Will be updated, the inhabitants has been updated{"city": "Drachten", "inhabitants": 45505},# New row, will be inserted{"city": "Berlin", "inhabitants": 3432000},# Ignored, already exists in the table{"city": "Paris", "inhabitants": 2103000},],schema=arrow_schema
)
upd = tbl.upsert(df)assert upd.rows_updated == 1
assert upd.rows_inserted == 1

PyIceberg 会自动检测哪些行需要更新、插入或可以直接忽略。


检查表结构

可以通过检查表来查看表的元数据信息。


时间旅行功能

要使用时间旅行功能检查表的元数据,请调用inspect table方法并传入snapshot_id参数。该功能支持除snapshotsrefs之外的所有元数据表。


table.inspect.entries(snapshot_id=805611270568163028)

快照

检查表的快照:

table.inspect.snapshots()
pyarrow.Table
committed_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
operation: string
manifest_list: string not null
summary: map<string, string>child 0, entries: struct<key: string not null, value: string> not nullchild 0, key: string not nullchild 1, value: string
----
committed_at: [[2024-03-15 15:01:25.682,2024-03-15 15:01:25.730,2024-03-15 15:01:25.772]]
snapshot_id: [[805611270568163028,3679426539959220963,5588071473139865870]]
parent_id: [[null,805611270568163028,3679426539959220963]]
operation: [["append","overwrite","append"]]
manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-805611270568163028-0-43637daf-ea4b-4ceb-b096-a60c25481eb5.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-3679426539959220963-0-8be81019-adf1-4bb6-a127-e15217bd50b3.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-5588071473139865870-0-1382dd7e-5fbc-4c51-9776-a832d7d0984e.avro"]]
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]

分区

查看表的分区情况:

table.inspect.partitions()
pyarrow.Table
partition: struct<dt_month: int32, dt_day: date32[day]> not nullchild 0, dt_month: int32child 1, dt_day: date32[day]
spec_id: int32 not null
record_count: int64 not null
file_count: int32 not null
total_data_file_size_in_bytes: int64 not null
position_delete_record_count: int64 not null
position_delete_file_count: int32 not null
equality_delete_record_count: int64 not null
equality_delete_file_count: int32 not null
last_updated_at: timestamp[ms]
last_updated_snapshot_id: int64
----
partition: [-- is_valid: all not null-- child 0 type: int32
[null,null,612]-- child 1 type: date32[day]
[null,2021-02-01,null]]
spec_id: [[2,1,0]]
record_count: [[1,1,2]]
file_count: [[1,1,2]]
total_data_file_size_in_bytes: [[641,641,1260]]
position_delete_record_count: [[0,0,0]]
position_delete_file_count: [[0,0,0]]
equality_delete_record_count: [[0,0,0]]
equality_delete_file_count: [[0,0,0]]
last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]

条目

用于显示表中当前数据文件和删除文件的所有清单条目。


table.inspect.entries()
pyarrow.Table
status: int8 not null
snapshot_id: int64 not null
sequence_number: int64 not null
file_sequence_number: int64 not null
data_file: struct<content: int8 not null, file_path: string not null, file_format: string not null, partition: struct<> not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: list<item: int64>, equality_ids: list<item: int32>, sort_order_id: int32> not nullchild 0, content: int8 not nullchild 1, file_path: string not nullchild 2, file_format: string not nullchild 3, partition: struct<> not nullchild 4, record_count: int64 not nullchild 5, file_size_in_bytes: int64 not nullchild 6, column_sizes: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64child 7, value_counts: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64child 8, null_value_counts: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64child 9, nan_value_counts: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64child 10, lower_bounds: map<int32, binary>child 0, entries: struct<key: int32 not null, value: binary> not nullchild 0, key: int32 not nullchild 1, value: binarychild 11, upper_bounds: map<int32, binary>child 0, entries: struct<key: int32 not null, value: binary> not nullchild 0, key: int32 not nullchild 1, value: binarychild 12, key_metadata: binarychild 13, split_offsets: list<item: int64>child 0, item: int64child 14, equality_ids: list<item: int32>child 0, item: int32child 15, sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not nullchild 0, column_size: int64child 1, value_count: int64child 2, null_value_count: int64child 3, nan_value_count: int64child 4, lower_bound: stringchild 5, upper_bound: stringchild 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not nullchild 0, column_size: int64child 1, value_count: int64child 2, null_value_count: int64child 3, nan_value_count: int64child 4, lower_bound: doublechild 5, upper_bound: doublechild 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not nullchild 0, column_size: int64child 1, value_count: int64child 2, null_value_count: int64child 3, nan_value_count: int64child 4, lower_bound: doublechild 5, upper_bound: double
----
status: [[1]]
snapshot_id: [[6245626162224016531]]
sequence_number: [[1]]
file_sequence_number: [[1]]
data_file: [-- is_valid: all not null-- child 0 type: int8
[0]-- child 1 type: string
["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]-- child 2 type: string
["PARQUET"]-- child 3 type: struct<>-- is_valid: all not null-- child 4 type: int64
[4]-- child 5 type: int64
[1656]-- child 6 type: map<int32, int64>
[keys:[1,2,3]values:[140,135,135]]-- child 7 type: map<int32, int64>
[keys:[1,2,3]values:[4,4,4]]-- child 8 type: map<int32, int64>
[keys:[1,2,3]values:[0,0,0]]-- child 9 type: map<int32, int64>
[keys:[]values:[]]-- child 10 type: map<int32, binary>
[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]-- child 11 type: map<int32, binary>
[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]-- child 12 type: binary
[null]-- child 13 type: list<item: int64>
[[4]]-- child 14 type: list<item: int32>
[null]-- child 15 type: int32
[null]]
readable_metrics: [-- is_valid: all not null-- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>-- is_valid: all not null-- child 0 type: int64
[140]-- child 1 type: int64
[4]-- child 2 type: int64
[0]-- child 3 type: int64
[null]-- child 4 type: string
["Amsterdam"]-- child 5 type: string
["San Francisco"]-- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>-- is_valid: all not null-- child 0 type: int64
[135]-- child 1 type: int64
[4]-- child 2 type: int64
[0]-- child 3 type: int64
[null]-- child 4 type: double
[37.773972]-- child 5 type: double
[53.11254]-- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>-- is_valid: all not null-- child 0 type: int64
[135]-- child 1 type: int64
[4]-- child 2 type: int64
[0]-- child 3 type: int64
[null]-- child 4 type: double
[-122.431297]-- child 5 type: double
[6.0989]]

引用

用于显示数据表已知的快照引用:

table.inspect.refs()
pyarrow.Table
name: string not null
type: string not null
snapshot_id: int64 not null
max_reference_age_in_ms: int64
min_snapshots_to_keep: int32
max_snapshot_age_in_ms: int64
----
name: [["main","testTag"]]
type: [["BRANCH","TAG"]]
snapshot_id: [[2278002651076891950,2278002651076891950]]
max_reference_age_in_ms: [[null,604800000]]
min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]

清单文件

要显示表的当前文件清单:

table.inspect.manifests()
pyarrow.Table
content: int8 not null
path: string not null
length: int64 not null
partition_spec_id: int32 not null
added_snapshot_id: int64 not null
added_data_files_count: int32 not null
existing_data_files_count: int32 not null
deleted_data_files_count: int32 not null
added_delete_files_count: int32 not null
existing_delete_files_count: int32 not null
deleted_delete_files_count: int32 not null
partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not nullchild 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>child 0, contains_null: bool not nullchild 1, contains_nan: boolchild 2, lower_bound: stringchild 3, upper_bound: string
----
content: [[0]]
path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
length: [[6886]]
partition_spec_id: [[0]]
added_snapshot_id: [[3815834705531553721]]
added_data_files_count: [[1]]
existing_data_files_count: [[0]]
deleted_data_files_count: [[0]]
added_delete_files_count: [[0]]
existing_delete_files_count: [[0]]
deleted_delete_files_count: [[0]]
partition_summaries: [[    -- is_valid: all not null-- child 0 type: bool
[false]-- child 1 type: bool
[false]-- child 2 type: string
["test"]-- child 3 type: string
["test"]]]

元数据日志条目

用于显示表元数据日志条目:

table.inspect.metadata_log_entries()
pyarrow.Table
timestamp: timestamp[ms] not null
file: string not null
latest_snapshot_id: int64
latest_schema_id: int32
latest_sequence_number: int64
----
timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]]
file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]]
latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]]
latest_schema_id: [[null,0,0,0]]
latest_sequence_number: [[null,0,0,0]]

历史

要查看表的历史记录:

table.inspect.history()
pyarrow.Table
made_current_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
is_current_ancestor: bool not null
----
made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
parent_id: [[null,4358109269873137077,null,4358109269873137077]]
is_current_ancestor: [[true,false,true,true]]

文件

查看表当前快照中的数据文件:

table.inspect.files()
pyarrow.Table
content: int8 not null
file_path: string not null
file_format: dictionary<values=string, indices=int32, ordered=0> not null
spec_id: int32 not null
record_count: int64 not null
file_size_in_bytes: int64 not null
column_sizes: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64
value_counts: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64
null_value_counts: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64
nan_value_counts: map<int32, int64>child 0, entries: struct<key: int32 not null, value: int64> not nullchild 0, key: int32 not nullchild 1, value: int64
lower_bounds: map<int32, binary>child 0, entries: struct<key: int32 not null, value: binary> not nullchild 0, key: int32 not nullchild 1, value: binary
upper_bounds: map<int32, binary>child 0, entries: struct<key: int32 not null, value: binary> not nullchild 0, key: int32 not nullchild 1, value: binary
key_metadata: binary
split_offsets: list<item: int64>child 0, item: int64
equality_ids: list<item: int32>child 0, item: int32
sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not nullchild 0, column_size: int64child 1, value_count: int64child 2, null_value_count: int64child 3, nan_value_count: int64child 4, lower_bound: large_stringchild 5, upper_bound: large_stringchild 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not nullchild 0, column_size: int64child 1, value_count: int64child 2, null_value_count: int64child 3, nan_value_count: int64child 4, lower_bound: doublechild 5, upper_bound: doublechild 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not nullchild 0, column_size: int64child 1, value_count: int64child 2, null_value_count: int64child 3, nan_value_count: int64child 4, lower_bound: doublechild 5, upper_bound: double
----
content: [[0,0]]
file_path: [["s3://warehouse/default/table_metadata_files/data/00000-0-9ea7d222-6457-467f-bad5-6fb125c9aa5f.parquet","s3://warehouse/default/table_metadata_files/data/00000-0-afa8893c-de71-4710-97c9-6b01590d0c44.parquet"]]
file_format: [["PARQUET","PARQUET"]]
spec_id: [[0,0]]
record_count: [[3,3]]
file_size_in_bytes: [[5459,5459]]
column_sizes: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109]]]
value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3]]]
null_value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1]]]
nan_value_counts: [[keys:[]values:[],keys:[]values:[]]]
lower_bounds: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
upper_bounds:[[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
key_metadata: [[0100,0100]]
split_offsets:[[[],[]]]
equality_ids:[[[],[]]]
sort_order_id:[[[],[]]]
readable_metrics: [-- is_valid: all not null-- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string>-- is_valid: all not null-- child 0 type: int64
[140]-- child 1 type: int64
[4]-- child 2 type: int64
[0]-- child 3 type: int64
[null]-- child 4 type: large_string
["Amsterdam"]-- child 5 type: large_string
["San Francisco"]-- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>-- is_valid: all not null-- child 0 type: int64
[135]-- child 1 type: int64
[4]-- child 2 type: int64
[0]-- child 3 type: int64
[null]-- child 4 type: double
[37.773972]-- child 5 type: double
[53.11254]-- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>-- is_valid: all not null-- child 0 type: int64
[135]-- child 1 type: int64
[4]-- child 2 type: int64
[0]-- child 3 type: int64
[null]-- child 4 type: double
[-122.431297]-- child 5 type: double
[6.0989]]

信息

内容指数据文件存储的内容类型:0表示数据1表示位置删除2表示等值删除

若仅查看当前快照中的数据文件或删除文件,请分别使用table.inspect.data_files()table.inspect.delete_files()方法。


添加文件

高级 Iceberg 用户可以选择将现有的 Parquet 文件作为数据文件提交到 Iceberg 表中,而无需重写这些文件。


# Given that these parquet files have schema consistent with the Iceberg tablefile_paths = ["s3a://warehouse/default/existing-1.parquet","s3a://warehouse/default/existing-2.parquet",
]# They can be added to the table without rewriting themtbl.add_files(file_paths=file_paths)# A new snapshot is committed to the table with manifests pointing to the existing parquet files

名称映射

由于add_files直接使用现有文件而不生成新的感知Iceberg模式的Parquet文件,因此要求Iceberg表必须配置名称映射(该映射将Parquet文件中的字段名与Iceberg字段ID对应)。因此,add_files要求Parquet文件的元数据中不能包含字段ID,若表未配置名称映射,则会基于当前表模式自动创建新的名称映射。

分区处理

add_files仅需通过读取现有Parquet文件的元数据页脚来推断每个文件的分区值。该实现还支持将文件添加到采用MonthTransformTruncateTransform等分区转换的Iceberg表(任何preserves_order属性设为True的转换都受支持)。请注意,若PartitionField源列的统计信息未存在于Parquet元数据中,分区值将被推断为None

维护操作影响

由于add_files将现有Parquet文件作为普通数据文件提交至Iceberg表,因此过期快照等破坏性维护操作会移除这些文件。


模式演进

PyIceberg 通过 Python API 提供了完整的模式演进支持。它会自动处理字段 ID 的设置,并确保只进行非破坏性变更(可手动覆盖此限制)。

在以下示例中,.update_schema() 方法直接从表对象调用。


with table.update_schema() as update:update.add_column("some_field", IntegerType(), "doc")

如果您需要进行的更改不仅限于模式演进,还可以启动一个事务:

with table.transaction() as transaction:with transaction.update_schema() as update_schema:update.add_column("some_other_field", IntegerType(), "doc")# ... Update properties etc

按名称合并

通过使用 .union_by_name() 方法,您可以将另一个模式合并到现有模式中,而无需担心字段ID的问题。


from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType, LongTypecatalog = load_catalog()schema = Schema(NestedField(1, "city", StringType(), required=False),NestedField(2, "lat", DoubleType(), required=False),NestedField(3, "long", DoubleType(), required=False),
)table = catalog.create_table("default.locations", schema)new_schema = Schema(NestedField(1, "city", StringType(), required=False),NestedField(2, "lat", DoubleType(), required=False),NestedField(3, "long", DoubleType(), required=False),NestedField(10, "population", LongType(), required=False),
)with table.update_schema() as update:update.union_by_name(new_schema)

现在该表已合并了两个模式print(table.schema())

table {1: city: optional string2: lat: optional double3: long: optional double4: population: optional long
}

添加列

使用 add_column 可以轻松添加列,无需担心字段ID问题。


with table.update_schema() as update:update.add_column("retries", IntegerType(), "Number of retries to place the bid")# In a structupdate.add_column("details", StructType())with table.update_schema() as update:update.add_column(("details", "confirmed_by"), StringType(), "Name of the exchange")

在向复合类型添加列之前,该复合类型必须已存在。复合类型中的字段以元组形式添加。


重命名列

在Iceberg表中重命名字段非常简单:

with table.update_schema() as update:update.rename_column("retries", "num_retries")# This will rename `confirmed_by` to `processed_by` in the `details` structupdate.rename_column(("details", "confirmed_by"), "processed_by")

移动列

调整字段顺序:

with table.update_schema() as update:update.move_first("symbol")# This will move `bid` after `ask`update.move_after("bid", "ask")# This will move `confirmed_by` before `exchange` in the `details` structupdate.move_before(("details", "confirmed_by"), ("details", "exchange"))

更新列

更新字段的类型、描述或必填属性。


with table.update_schema() as update:# Promote a float to a doubleupdate.update_column("bid", field_type=DoubleType())# Make a field optionalupdate.update_column("symbol", required=False)# Update the documentationupdate.update_column("symbol", doc="Name of the share on the exchange")

请注意,某些操作并不兼容,但您仍可通过设置 allow_incompatible_changes 自行承担风险来执行。


with table.update_schema(allow_incompatible_changes=True) as update:# Incompatible change, cannot require an optional fieldupdate.update_column("symbol", required=True)

删除列

删除一个字段,注意这是不兼容的变更(读取器/写入器可能会依赖该字段):

with table.update_schema(allow_incompatible_changes=True) as update:update.delete_column("some_field")# In a structupdate.delete_column(("details", "confirmed_by"))

分区演进

PyIceberg 支持分区演进功能。更多细节请参阅分区演进规范。

在演进分区时,需要使用表上的 update_spec API。


with table.update_spec() as update:update.add_field("id", BucketTransform(16), "bucketed_id")update.add_field("event_ts", DayTransform(), "day_ts")

更新分区规格也可以作为与其他操作一起的事务的一部分来完成。


with table.transaction() as transaction:with transaction.update_spec() as update_spec:update_spec.add_field("id", BucketTransform(16), "bucketed_id")update_spec.add_field("event_ts", DayTransform(), "day_ts")# ... Update properties etc

添加字段

可以通过 add_field API 添加新的分区字段,该接口接收要分区的字段名称、分区转换函数以及可选的分区名称。如果未指定分区名称,系统会自动生成一个。


with table.update_spec() as update:update.add_field("id", BucketTransform(16), "bucketed_id")update.add_field("event_ts", DayTransform(), "day_ts")# identity is a shortcut API for adding an IdentityTransformupdate.identity("some_field")

移除字段

如果某些字段不再适合作为分区依据,也可以通过 remove_field API 来移除这些分区字段。


with table.update_spec() as update:# Remove the partition field with the nameupdate.remove_field("some_partition_name")

重命名字段

可以通过rename_field API来重命名分区字段。


with table.update_spec() as update:# Rename the partition field with the name bucketed_id to sharded_idupdate.rename_field("bucketed_id", "sharded_id")

表属性

通过Transaction API设置和移除属性:

with table.transaction() as transaction:transaction.set_properties(abc="def")assert table.properties == {"abc": "def"}with table.transaction() as transaction:transaction.remove_properties("abc")assert table.properties == {}

或者,不使用上下文管理器:

table = table.transaction().set_properties(abc="def").commit_transaction()assert table.properties == {"abc": "def"}table = table.transaction().remove_properties("abc").commit_transaction()assert table.properties == {}

快照属性

在使用appendoverwrite API写入表时,可以选择性地设置快照属性:

tbl.append(df, snapshot_properties={"abc": "def"})# ortbl.overwrite(df, snapshot_properties={"abc": "def"})assert tbl.metadata.snapshots[-1].summary["abc"] == "def"

快照管理

通过 Table API 管理快照操作:

# To run a specific operation
table.manage_snapshots().create_tag(snapshot_id, "tag123").commit()
# To run multiple operations
table.manage_snapshots().create_tag(snapshot_id1, "tag123").create_tag(snapshot_id2, "tag456").commit()
# Operations are applied on commit.

你也可以使用上下文管理器来进行更多修改:

with table.manage_snapshots() as ms:ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")

视图

PyIceberg 支持视图操作。


检查视图是否存在


from pyiceberg.catalog import load_catalogcatalog = load_catalog("default")
catalog.view_exists("default.bar")

表统计信息管理

通过Table API管理表统计信息的操作:

# To run a specific operation
table.update_statistics().set_statistics(statistics_file=statistics_file).commit()
# To run multiple operations
table.update_statistics().set_statistics(statistics_file1).remove_statistics(snapshot_id2).commit()
# Operations are applied on commit.

你也可以使用上下文管理器来进行更多修改:

with table.update_statistics() as update:update.set_statistics(statistics_file)update.remove_statistics(snapshot_id2)

查询数据

要查询表数据,需要进行表扫描操作。表扫描可以接收以下参数:过滤器条件、列名、可选的记录条数限制以及快照ID。


from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqualcatalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")scan = table.scan(row_filter=GreaterThanOrEqual("trip_distance", 10.0),selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),limit=100,
)# Or filter using a string predicate
scan = table.scan(row_filter="trip_distance > 10.0",
)[task.file.file_path for task in scan.plan_files()]

底层 API plan_files 方法返回一组任务,这些任务提供可能包含匹配行的文件:

["s3://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet"
]

在这种情况下,引擎需要自行过滤文件本身。以下 to_arrow()to_duckdb() 方法已为您实现了这一功能。


Apache Arrow

要求

需要安装pyarrow

使用PyIceberg可以从大表中筛选数据并提取到PyArrow表中:

table.scan(row_filter=GreaterThanOrEqual("trip_distance", 10.0),selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow()

这将返回一个 PyArrow 表:

pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
----
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]

这将仅拉取可能包含匹配行的文件。

如果更倾向于一次读取一个记录批次,也可以返回一个PyArrow RecordBatchReader:

table.scan(row_filter=GreaterThanOrEqual("trip_distance", 10.0),selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader()

Pandas


需求

使用此功能需要安装pandas

PyIceberg 能轻松从海量表中筛选数据并加载到本地的 Pandas 数据框中。该操作仅会获取查询所需的 Parquet 文件并应用过滤条件,从而减少 IO 操作,提升性能并降低成本。


table.scan(row_filter="trip_distance >= 10.0",selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_pandas()

这将返回一个 Pandas 数据框:


***
VendorID      tpep_pickup_datetime     tpep_dropoff_datetime
0              2 2021-04-01 00:28:05+00:00 2021-04-01 00:47:59+00:00
1              1 2021-04-01 00:39:01+00:00 2021-04-01 00:57:39+00:00
2              2 2021-04-01 00:14:42+00:00 2021-04-01 00:42:59+00:00
3              1 2021-04-01 00:17:17+00:00 2021-04-01 00:43:38+00:00
4              1 2021-04-01 00:24:04+00:00 2021-04-01 00:56:20+00:00
...          ...                       ...                       ...
116976         2 2021-04-30 23:56:18+00:00 2021-05-01 00:29:13+00:00
116977         2 2021-04-30 23:07:41+00:00 2021-04-30 23:37:18+00:00
116978         2 2021-04-30 23:38:28+00:00 2021-05-01 00:12:04+00:00
116979         2 2021-04-30 23:33:00+00:00 2021-04-30 23:59:00+00:00
116980         2 2021-04-30 23:44:25+00:00 2021-05-01 00:14:47+00:00[116981 rows x 3 columns]

建议使用 Pandas 2 或更高版本,因为它将数据存储在 Apache Arrow 后端 中,从而避免了数据拷贝。


DuckDB

要求

需要安装DuckDB。

表扫描结果也可以转换为内存中的DuckDB表:

con = table.scan(row_filter=GreaterThanOrEqual("trip_distance", 10.0),selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")

我们可以使用游标在DuckDB表上运行查询:

print(con.execute("SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4").fetchall()
)
[(datetime.timedelta(seconds=1194),),(datetime.timedelta(seconds=1118),),(datetime.timedelta(seconds=1697),),(datetime.timedelta(seconds=1581),),
]

Ray

要求

这需要安装Ray。

表扫描也可以转换为Ray数据集:

ray_dataset = table.scan(row_filter=GreaterThanOrEqual("trip_distance", 10.0),selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_ray()

这将返回一个 Ray 数据集:


***
Dataset(num_blocks=1,num_rows=1168798,schema={VendorID: int64,tpep_pickup_datetime: timestamp[us, tz=UTC],tpep_dropoff_datetime: timestamp[us, tz=UTC]}
)

使用 Ray Dataset API 与数据集交互:

print(ray_dataset.take(2))
[{"VendorID": 2,"tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50),"tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31),},{"VendorID": 2,"tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3),"tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18),},
]

Daft

PyIceberg 与 Daft Dataframes 紧密集成(参见:Daft 与 Iceberg 的集成),它在 PyIceberg 表之上提供了一个完整的惰性优化查询引擎接口。

要求

这需要安装 Daft。

可以轻松将表读取到 Daft Dataframe 中:

df = table.to_daft()  # equivalent to `daft.read_iceberg(table)`
df = df.where(df["trip_distance"] >= 10.0)
df = df.select("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime")

这将返回一个延迟物化的 Daft Dataframe。打印 df 会显示其结构模式:

╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime          ┆ tpep_dropoff_datetime         │
│ ---      ┆ ---                           ┆ ---                           │
│ Int64    ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╰──────────┴───────────────────────────────┴───────────────────────────────╯(No data to display: Dataframe not materialized)

我们可以使用 df.show() 执行 Dataframe 来预览查询的前几行数据。

该操作已正确优化,能够充分利用 Iceberg 的特性,如隐藏分区和文件级统计信息,以实现高效读取。


df.show(2)
╭──────────┬───────────────────────────────┬───────────────────────────────╮
│ VendorID ┆ tpep_pickup_datetime          ┆ tpep_dropoff_datetime         │
│ ---------                           │
│ Int64    ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │
╞══════════╪═══════════════════════════════╪═══════════════════════════════╡
│ 22008-12-31T23:23:50.0000002009-01-01T00:34:31.000000    │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 22008-12-31T23:05:03.0000002009-01-01T16:10:18.000000    │
╰──────────┴───────────────────────────────┴───────────────────────────────╯(Showing first 2 rows)

Polars

PyIceberg 与 Polars 的 Dataframe 和 LazyFrame 紧密集成,在 PyIceberg 表之上提供了完整的惰性优化查询引擎接口。

要求

需要安装 polars


pip install pyiceberg['polars']

可以通过 DataFrame 或 LazyFrame 在 Polars 中分析和访问 PyIceberg 数据。如果您的代码使用 Apache Iceberg 数据扫描和检索 API,然后在 Polars 中分析结果 DataFrame,请使用 table.scan().to_polars() API。如果目的是利用 Polars 的高性能过滤和检索功能,请使用从 Iceberg 表导出的 LazyFrame 并配合 table.to_polars() API。


# Get LazyFrame
iceberg_table.to_polars()# Get Data Frame
iceberg_table.scan().to_polars()

使用 Polars DataFrame 处理数据

PyIceberg 可以轻松地从海量表中筛选数据,并将其加载到本地的 Polars 数据框中。该操作仅会获取查询所需的 Parquet 文件并应用过滤条件,从而减少 I/O 操作,提升性能并降低成本。


schema = Schema(NestedField(field_id=1, name='ticket_id', field_type=LongType(), required=True),NestedField(field_id=2, name='customer_id', field_type=LongType(), required=True),NestedField(field_id=3, name='issue', field_type=StringType(), required=False),NestedField(field_id=4, name='created_at', field_type=TimestampType(), required=True),required=True
)iceberg_table = catalog.create_table(identifier='default.product_support_issues',schema=schema
)pa_table_data = pa.Table.from_pylist([{'ticket_id': 1, 'customer_id': 546, 'issue': 'User Login issue', 'created_at': 1650020000000000},{'ticket_id': 2, 'customer_id': 547, 'issue': 'Payment not going through', 'created_at': 1650028640000000},{'ticket_id': 3, 'customer_id': 548, 'issue': 'Error on checkout', 'created_at': 1650037280000000},{'ticket_id': 4, 'customer_id': 549, 'issue': 'Unable to reset password', 'created_at': 1650045920000000},{'ticket_id': 5, 'customer_id': 550, 'issue': 'Account locked', 'created_at': 1650054560000000},{'ticket_id': 6, 'customer_id': 551, 'issue': 'Order not received', 'created_at': 1650063200000000},{'ticket_id': 7, 'customer_id': 552, 'issue': 'Refund not processed', 'created_at': 1650071840000000},{'ticket_id': 8, 'customer_id': 553, 'issue': 'Shipping address issue', 'created_at': 1650080480000000},{'ticket_id': 9, 'customer_id': 554, 'issue': 'Product damaged', 'created_at': 1650089120000000},{'ticket_id': 10, 'customer_id': 555, 'issue': 'Unable to apply discount code', 'created_at': 1650097760000000},{'ticket_id': 11, 'customer_id': 556, 'issue': 'Website not loading', 'created_at': 1650106400000000},{'ticket_id': 12, 'customer_id': 557, 'issue': 'Incorrect order received', 'created_at': 1650115040000000},{'ticket_id': 13, 'customer_id': 558, 'issue': 'Unable to track order', 'created_at': 1650123680000000},{'ticket_id': 14, 'customer_id': 559, 'issue': 'Order delayed', 'created_at': 1650132320000000},{'ticket_id': 15, 'customer_id': 560, 'issue': 'Product not as described', 'created_at': 1650140960000000},{'ticket_id': 16, 'customer_id': 561, 'issue': 'Unable to contact support', 'created_at': 1650149600000000},{'ticket_id': 17, 'customer_id': 562, 'issue': 'Duplicate charge', 'created_at': 1650158240000000},{'ticket_id': 18, 'customer_id': 563, 'issue': 'Unable to update profile', 'created_at': 1650166880000000},{'ticket_id': 19, 'customer_id': 564, 'issue': 'App crashing', 'created_at': 1650175520000000},{'ticket_id': 20, 'customer_id': 565, 'issue': 'Unable to download invoice', 'created_at': 1650184160000000},{'ticket_id': 21, 'customer_id': 566, 'issue': 'Incorrect billing amount', 'created_at': 1650192800000000},], schema=iceberg_table.schema().as_arrow()
)iceberg_table.append(df=pa_table_data
)table.scan(row_filter="ticket_id > 10",
).to_polars()

这将返回一个 Polars DataFrame:

shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘

使用 Polars LazyFrame 工作

PyIceberg 支持基于 Iceberg 表创建 Polars LazyFrame。

使用上述代码示例:

lf = iceberg_table.to_polars().filter(pl.col("ticket_id") > 10)
print(lf.collect())

上述代码片段返回一个 Polars LazyFrame,并定义了一个将由 Polars 执行的过滤器:

shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘

2025-05-28(三)


http://www.hkcw.cn/article/SZxgRhHGIH.shtml

相关文章

女子新装修房子被陌生人拆光 一场离奇的误会

最近,家住浦东新区“芳草苑”小区的张女士遇到了一件烦心事。她家刚刚完成了老房新装,原本计划第二天从出租屋搬回去。但前一天回家查看收尾进度时,发现厨房和卫生间的设施设备被三个陌生人拆了。这些设施和装修材料都是全新的、一线品牌的定制款,这让张女士非常心疼。经过…

男性心梗概率是女性的近两倍 呼吁大家要保持优良生活习惯

在最新一期《医起问》节目中,葛均波院士强调,男性心血管疾病概率是女性的近2倍,心梗风险尤为突出。女性在绝经前因雌激素作用而风险较低,但绝经后概率也趋于一致。不管男女,现代人的诸多不良习惯其实都导致心梗年轻化,院士呼吁大家要保持优良生活习惯,关注心血管的健康!…

涉高考违法有害信息被查处 三部门联合整治

2025年高考临近,为营造良好的全国高考网络环境,教育部联合中央网信办、公安部开展查处涉高考违法有害信息等工作。相关部门提醒公众,互联网并非法外之地,个人和机构需对自己的言论负责。对于编造、传播谣言,故意扰乱公共秩序的行为,将依法受到严厉打击。网络平台应加强内…

福特因软件问题在美召回超百万辆车 后视摄像头故障引关注

美国政府汽车监管机构宣布,福特汽车公司因软件问题在全美召回超过100万辆汽车。福特公司在本月早些时候向美国国家公路交通安全管理局报告了这一问题。该软件故障可能导致车辆在倒车时后视摄像头影像出现延迟、卡顿或不显示,增加了事故风险。这个问题将通过经销商维修或远程软…

哈佛大学校长在毕业典礼上嘲讽特朗普 巧妙暗讽获热烈掌声

哈佛大学校长在毕业典礼上嘲讽特朗普 巧妙暗讽获热烈掌声!当地时间5月29日,美国法官暂停了特朗普政府此前宣布的取消哈佛大学招收外国学生资质的政策,为哈佛大学赢得了暂时的胜利。同一天,在哈佛大学毕业典礼上,校长艾伦加伯巧妙地讽刺了特朗普政府对大学的打压,获得了全…

为什么韩国的生育率开始上升 积极迹象显现

韩国统计厅28日发布的数据显示,2024年7月至今年3月,韩国单月出生人口连续9个月保持增长势头,2025年第一季度生育率也高于去年同期。这对面临人口困境的韩国来说是一个积极迹象。今年3月,韩国出生人口为2.1万,同比增加6.8%。第一季度出生人口总计6.5万,达到三年来最高水平…

A股开盘跳水,是谁引发?主力资金持续撤退

A股开盘跳水,是谁引发?主力资金持续撤退!2025年5月的某个普通交易日,A股市场像战场一样,牛熊双方一开始就互不相让。盘面上红绿交错,投资者的心情像过山车。每个分时线的波动,都牵动着无数账户的盈亏。早上九点半,开盘钟声一响,指数低开了一点点。大家本以为要继续下滑…

【数据集信息整理】道路病害(缺陷)检测数据集归纳

参考 https://www.cnblogs.com/laugh12321/p/17874752.html Machine Learning Datasets | Papers With Code 数据集 paperwithcode上查找 paperwithcode搜索“pavement”关键词可以找到下面8个数据集 博客整理 参考链接1里列出了包括了RDD2022在内的数据集&#xff1a; 需…

【Unity博客节选】Timeline 静态结构分析

注&#xff1a;软件版本Unity 6.0 Timeline 1.8.7 作者&#xff1a;CSDN RingleaderWang 原文&#xff1a;《Unity第25期——Timeline结构及其源码浅析》 文章首发Github&#x1f44d;&#xff1a;《Timeline结构及其源码浅析》 Bilibili 视频版&#x1f44d;&#x1f44d;&a…

男子救助轻生女友时意外溺亡 悲剧引发深思

男子救助轻生女友时意外溺亡!近日,江苏南通发生了一起悲剧。一名女子在社交媒体上发布视频称,她跳江轻生后获救,但救援的男友却不幸溺亡。据参与救援打捞的江苏省启东市飞龙救援队队员金先生介绍,这名女子是自杀,跳江前曾打电话给男友。男友赶到江边时,女子已经跳入江中…

各地龙舟赛又来整活了!2025年端午龙舟赛事地图:25省份“水上竞速”

端午将至,作为上半年最后一个小长假,端午节不仅是中国首个入选世界非物质文化遗产的传统节日,也承载着华夏人民祈福禳灾、爱国强民的美好愿望。随着大家的生活方式日渐多元化,端午节从传统民俗节日升级为热门文旅符号,民俗体验成为假期消费新趋势。龙舟竞渡自古是端午节的…

宝马5系裸车价最低跌至26万元 终端优惠探底

5月29日,北京一家宝马4S店的销售专员对新款宝马5系给出了裸车报价,最低价为26.3万元,但需要选择贷款购车方案。据计算,这款“2025款 525Li 豪华套装”5系,现在落地价仅约32.5万元。另一家北京的宝马4S店则给出了约33万元的落地价。今年4月,“宝马5系跌破29万元”曾冲上热…

白宫幕僚长身份“被盗” FBI介入 幕后操纵者待查

美国联邦政府正在调查一起冒充白宫幕僚长苏珊怀尔斯的事件。一名身份不明人员假冒怀尔斯,联系了多名美国共和党人士和企业高管。目前,FBI与白宫正调查该行为幕后操纵者及其动机。近几周来,一些参议员、美国企业高管以及其他知名人士都收到来自自称是白宫幕僚长的人的短信和电…

幼童走失42小时被警犬搜寻找到 生死救援中的关键角色

5月27日13时许,经过42小时的持续搜救,重庆市公安局刑侦总队警犬追踪小组成功找到走失的2岁男童兵兵。在这场与时间赛跑的救援中,警犬技术发挥了关键作用。5月25日20时21分,綦江区公安局接到报警称石角镇下湾村一名2岁男童兵兵走失。警方立即启动应急预案,组织民警和当地村…

2025电动牙刷哪个牌子质量好?六大品牌质量解析

当消费者搜索2025年电动牙刷排行时,最核心的诉求便是寻找真正经得起考验的口腔护理伙伴;而一份靠谱的电动牙刷推荐清单,必须建立在临床实证与持久耐用的双重标准之上。在专业机构与15万用户的共同验证中,Usmile笑容加Y30以突破性的技术创新交出了答卷——这款获得瑞士SGS安全认…

上传图片转成3D VR效果 / VR效果在项目中落地实践 / 应用到了用photo-sphere-viewer + A-Frame +Threejs 通过不同的技术分别实现了3D VR效果

系统简介 : 该系统为 react TS tailwindcss 响应式系统 , 上传图片后可实现手动旋转 3D 图片,还包含了 6 贴图立方体展示和 6 贴图动态展示 项目亮点 : 包含主流3D VR库 , 可根据具体需求选择具体的技术栈 全部页面概览 这是单面VR页面的代码(gif展示页面) import React, { …

鸿蒙NEXT应用加固工具哪家更好?国内主流的6款对比

随着鸿蒙NEXT系统的推进&#xff0c;越来越多企业将目光投向鸿蒙生态下的应用部署与数据安全。尤其是在核心业务App逐步上架鸿蒙原生平台的当下&#xff0c;如何实现高效、可靠的鸿蒙NEXT应用安全加固&#xff0c;已成为企业技术选型的关键环节。本文将对市面上6款主流的鸿蒙NE…

包粽子日赚500元 传统手艺撬动就业新机遇

包粽子日赚500元 传统手艺撬动就业新机遇!临近端午,粽香四溢的背后,一场关于传统手艺与市场需求的“双向奔赴”正在上演。掌握包粽技艺的年轻人正以“一叶一技”撬动就业新机遇。非遗手艺不仅成为节令经济的热门选择,还催生了产业链条上的多元岗位,比如有外语能力的大学生…

英伟达被曝将在华新建设施 引发美议员安全担忧

英伟达被曝将在华新建设施 引发美议员安全担忧。美国半导体巨头英伟达CEO黄仁勋多次强调,美国对华芯片管制不仅没有成功,反而促使中国加速自主研发芯片的进程。然而,美国议员对此并未采纳,反而针对英伟达在中国的新布局提出质疑。据彭博社和《华尔街日报》报道,英伟达计划…

高盛总裁:债市对美国债务更担心 财政恶化风险加剧

高盛总裁:债市对美国债务更担心 财政恶化风险加剧!高盛总裁John Waldron表示,目前宏观层面的最大风险并不是关税。尽管所有注意力都集中在关税上,但债券市场的关注点正转向美国减税方案和财政状况,这令人担忧。有分析人士指出,债券投资者正在为财政恶化的更高风险定价,他…