| Crates.io | iceberg-catalog-hadoop-unofficial |
| lib.rs | iceberg-catalog-hadoop-unofficial |
| version | 0.6.0-alpha.6 |
| created_at | 2025-08-15 03:27:39.159824+00 |
| updated_at | 2025-08-22 06:55:29.259742+00 |
| description | Apache Iceberg Rust Hadoop Catalog |
| homepage | https://rust.iceberg.apache.org/ |
| repository | https://github.com/awol2005ex/iceberg-rust/tree/awol2005ex-my |
| max_upload_size | |
| id | 1796153 |
| size | 206,376 |
This crate contains an unofficial native Rust implementation of the Apache Iceberg Hadoop Catalog.
See the API documentation for examples and the full API.
Single NameNode

Connect to an HDFS cluster with a single NameNode by setting `fs.defaultFS` to that NameNode's RPC address:
```rust
use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain a Kerberos ticket before talking to the secured cluster (Windows example).
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()
        .map_err(|e| e.to_string())?;
    println!("kinit output: {:?}", &o);

    // Point the catalog at a single NameNode.
    let properties = HashMap::from([(
        "fs.defaultFS".to_string(),
        "hdfs://nn1:8020".to_string(),
    )]);
    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nn1:8020/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // List all namespaces under the warehouse.
    catalog
        .list_namespaces(None)
        .await
        .unwrap()
        .iter()
        .for_each(|namespace| {
            println!("namespace: {:?}", namespace);
        });

    // Load an existing table and query it with DataFusion.
    let table_ident = TableIdent::new(
        NamespaceIdent::new("ods".to_owned()),
        "test1".to_owned(),
    );
    let table = catalog.load_table(&table_ident).await?;
    println!("table: {:?}", table.metadata());

    let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
        .await
        .unwrap();
    let ctx = SessionContext::new();
    ctx.register_table("test1", Arc::new(table_provider))
        .unwrap();
    let df = ctx
        .sql("SELECT * FROM test1 LIMIT 100")
        .await
        .unwrap();
    df.show().await.unwrap();
    Ok(())
}
```
NameNode HA

For an HDFS cluster with NameNode high availability, describe the nameservice and the per-NameNode RPC addresses through the catalog properties:
```rust
use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain a Kerberos ticket before talking to the secured cluster (Windows example).
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()
        .map_err(|e| e.to_string())?;
    println!("kinit output: {:?}", &o);

    // Describe the HA nameservice and its NameNodes so the client can fail over.
    let properties = HashMap::from([
        (
            "fs.defaultFS".to_string(),
            "hdfs://nameservice1".to_string(),
        ),
        (
            "dfs.nameservices".to_string(),
            "nameservice1".to_string(),
        ),
        (
            "dfs.ha.namenodes.nameservice1".to_string(),
            "nn1,nn2".to_string(),
        ),
        (
            "dfs.namenode.rpc-address.nameservice1.nn1".to_string(),
            "nn1:8020".to_string(),
        ),
        (
            "dfs.namenode.rpc-address.nameservice1.nn2".to_string(),
            "nn2:8020".to_string(),
        ),
    ]);
    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nameservice1/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // List all namespaces under the warehouse.
    catalog
        .list_namespaces(None)
        .await
        .unwrap()
        .iter()
        .for_each(|namespace| {
            println!("namespace: {:?}", namespace);
        });

    // Load an existing table and query it with DataFusion.
    let table_ident = TableIdent::new(
        NamespaceIdent::new("ods".to_owned()),
        "test1".to_owned(),
    );
    let table = catalog.load_table(&table_ident).await?;
    println!("table: {:?}", table.metadata());

    let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
        .await
        .unwrap();
    let ctx = SessionContext::new();
    ctx.register_table("test1", Arc::new(table_provider))
        .unwrap();
    let df = ctx
        .sql("SELECT * FROM test1 LIMIT 100")
        .await
        .unwrap();
    df.show().await.unwrap();
    Ok(())
}
```
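
The examples above only load existing tables. Creating a namespace and a table goes through the same `Catalog` trait; the sketch below is a minimal illustration, assuming this fork mirrors the upstream iceberg-rust `Catalog`, `Schema`, and `TableCreation` APIs (the "ods" namespace, "test2" table name, and column layout are illustrative, not part of this crate):

```rust
use std::collections::HashMap;

use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_unofficial::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg_unofficial::{Catalog, NamespaceIdent, TableCreation};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nn1:8020/user/hive/iceberg".to_owned())
        .properties(HashMap::from([(
            "fs.defaultFS".to_string(),
            "hdfs://nn1:8020".to_string(),
        )]))
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // Create the namespace if it does not exist yet ("ods" is illustrative).
    let namespace = NamespaceIdent::new("ods".to_owned());
    if !catalog.namespace_exists(&namespace).await? {
        catalog.create_namespace(&namespace, HashMap::new()).await?;
    }

    // A two-column schema: a required id and an optional name.
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;

    // Register the new table in the catalog ("test2" is illustrative).
    let creation = TableCreation::builder()
        .name("test2".to_owned())
        .schema(schema)
        .build();
    let table = catalog.create_table(&namespace, creation).await?;
    println!("created table: {:?}", table.identifier());
    Ok(())
}
```

With a Hadoop-style catalog the table metadata is typically written directly under the warehouse path on HDFS, so no external metastore service is involved.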