iceberg-catalog-hadoop-unofficial

Crates.io: iceberg-catalog-hadoop-unofficial
lib.rs: iceberg-catalog-hadoop-unofficial
version: 0.6.0-alpha.6
created_at: 2025-08-15 03:27:39.159824+00
updated_at: 2025-08-22 06:55:29.259742+00
description: Apache Iceberg Rust Hadoop Catalog
homepage: https://rust.iceberg.apache.org/
repository: https://github.com/awol2005ex/iceberg-rust/tree/awol2005ex-my
id: 1796153
size: 206,376
owner: awol2005ex

README

Apache Iceberg Hadoop Catalog Unofficial Native Rust Implementation

This crate provides an unofficial native Rust implementation of the Apache Iceberg Hadoop Catalog.

See the API documentation for examples and the full API.

Usage

Single NameNode

To connect to a cluster with a single NameNode, point fs.defaultFS at it directly:

use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain a Kerberos ticket before touching HDFS (Windows shown here;
    // on Linux, run kinit directly instead of through cmd /c).
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()
        .map_err(|e| e.to_string())?;
    println!("kinit output: {:?}", &o);
    // Point the catalog's file IO at the single NameNode.
    let properties = HashMap::from([(
        "fs.defaultFS".to_string(),
        "hdfs://nn1:8020".to_string(),
    )]);

    // The warehouse path is the root directory under which the Hadoop
    // catalog lays out namespaces and tables.
    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nn1:8020/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // Enumerate the top-level namespaces.
    for namespace in catalog.list_namespaces(None).await? {
        println!("namespace: {:?}", namespace);
    }

    // Load the existing table ods.test1.
    let table_ident = TableIdent::new(
        NamespaceIdent::new("ods".to_owned()),
        "test1".to_owned(),
    );
    let table = catalog.load_table(&table_ident).await?;
    println!("table: {:?}", table.metadata());

    // Expose the Iceberg table to DataFusion and query it with SQL.
    let table_provider = IcebergTableProvider::try_new_from_table(table.clone()).await?;

    let ctx = SessionContext::new();
    ctx.register_table("test1", Arc::new(table_provider))?;

    let df = ctx.sql("SELECT * FROM test1 LIMIT 100").await?;
    df.show().await?;
    Ok(())
}
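
Creating a table works through the same catalog handle. Below is a minimal sketch, assuming this crate mirrors iceberg-rust's Catalog::create_table and schema-builder APIs; the events table and its fields are hypothetical.

use iceberg_unofficial::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg_unofficial::{Catalog, NamespaceIdent, TableCreation};
use iceberg_catalog_hadoop_unofficial::HadoopCatalog;

// Sketch only: `catalog` is a connected HadoopCatalog as in the example
// above, and the API is assumed to match the iceberg-rust Catalog trait.
async fn create_example_table(catalog: &HadoopCatalog) -> Result<(), Box<dyn std::error::Error>> {
    // Two-column schema: a required id and an optional payload string.
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::optional(2, "payload", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;

    // "events" is a hypothetical table name used for illustration.
    let creation = TableCreation::builder()
        .name("events".to_string())
        .schema(schema)
        .build();

    let namespace = NamespaceIdent::new("ods".to_owned());
    let table = catalog.create_table(&namespace, creation).await?;
    println!("created table: {:?}", table.identifier());
    Ok(())
}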

NameNode HA

For an HA cluster, target the logical nameservice and supply the same client-side HA properties that would appear in hdfs-site.xml:

use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain a Kerberos ticket before touching HDFS (Windows shown here;
    // on Linux, run kinit directly instead of through cmd /c).
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()
        .map_err(|e| e.to_string())?;
    println!("kinit output: {:?}", &o);
    // Mirror the client-side HA settings from hdfs-site.xml: a logical
    // nameservice that resolves to the two NameNode RPC addresses.
    let properties = HashMap::from([
        ("fs.defaultFS".to_string(), "hdfs://nameservice1".to_string()),
        ("dfs.nameservices".to_string(), "nameservice1".to_string()),
        (
            "dfs.ha.namenodes.nameservice1".to_string(),
            "nn1,nn2".to_string(),
        ),
        (
            "dfs.namenode.rpc-address.nameservice1.nn1".to_string(),
            "nn1:8020".to_string(),
        ),
        (
            "dfs.namenode.rpc-address.nameservice1.nn2".to_string(),
            "nn2:8020".to_string(),
        ),
    ]);

    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nameservice1/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // Enumerate the top-level namespaces.
    for namespace in catalog.list_namespaces(None).await? {
        println!("namespace: {:?}", namespace);
    }

    // Load the existing table ods.test1.
    let table_ident = TableIdent::new(
        NamespaceIdent::new("ods".to_owned()),
        "test1".to_owned(),
    );
    let table = catalog.load_table(&table_ident).await?;
    println!("table: {:?}", table.metadata());

    // Expose the Iceberg table to DataFusion and query it with SQL.
    let table_provider = IcebergTableProvider::try_new_from_table(table.clone()).await?;

    let ctx = SessionContext::new();
    ctx.register_table("test1", Arc::new(table_provider))?;

    let df = ctx.sql("SELECT * FROM test1 LIMIT 100").await?;
    df.show().await?;
    Ok(())
}
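
Beyond loading tables, the catalog exposes the usual management calls. A minimal sketch, assuming this crate mirrors the iceberg-rust Catalog trait's list_tables, table_exists, and drop_table signatures:

use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};
use iceberg_catalog_hadoop_unofficial::HadoopCatalog;

// Sketch only: `catalog` is a connected HadoopCatalog as in the examples above.
async fn housekeeping(catalog: &HadoopCatalog) -> Result<(), Box<dyn std::error::Error>> {
    let namespace = NamespaceIdent::new("ods".to_owned());

    // List every table registered under the namespace.
    for ident in catalog.list_tables(&namespace).await? {
        println!("table: {:?}", ident);
    }

    // Check for a specific table before dropping it.
    let ident = TableIdent::new(namespace, "test1".to_owned());
    if catalog.table_exists(&ident).await? {
        catalog.drop_table(&ident).await?;
    }
    Ok(())
}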
