| Crates.io | inbq |
| lib.rs | inbq |
| version | 0.12.0 |
| created_at | 2025-05-24 13:54:25.874963+00 |
| updated_at | 2026-01-15 14:06:16.160427+00 |
| description | A library for parsing BigQuery queries and extracting schema-aware, column-level lineage. |
| homepage | https://github.com/lpraat/inbq |
| repository | https://github.com/lpraat/inbq |
| max_upload_size | |
| id | 1687449 |
| size | 812,670 |
A library for parsing BigQuery queries and extracting schema-aware, column-level lineage.
pip install inbq
import inbq
catalog = {"schema_objects": []}
def add_table(name: str, columns: list[tuple[str, str]]) -> None:
catalog["schema_objects"].append({
"name": name,
"kind": {
"table": {
"columns": [{"name": name, "dtype": dtype} for name, dtype in columns]
}
}
})
add_table("project.dataset.out", [("id", "int64"), ("val", "float64")])
add_table("project.dataset.t1", [("id", "int64"), ("x", "float64")])
add_table("project.dataset.t2", [("id", "int64"), ("s", "struct<source string, x float64>")])
query = """
declare default_val float64 default (select min(val) from project.dataset.out);
insert into `project.dataset.out`
select
id,
if(x is null or s.x is null, default_val, x + s.x)
from `project.dataset.t1` inner join `project.dataset.t2` using (id)
where s.source = "baz";
"""
pipeline = (
inbq.Pipeline()
.config(raise_exception_on_error=False, parallel=True)
.parse()
.extract_lineage(catalog=catalog, include_raw=False)
)
pipeline_output = inbq.run_pipeline(sqls=[query], pipeline=pipeline)
for ast, output_lineage in zip(pipeline_output.asts, pipeline_output.lineages):
print(f"{ast=}")
print("\nLineage:")
for object in output_lineage.lineage.objects:
print("Inputs:")
for node in object.nodes:
print(
f"{object.name}->{node.name} <- {[f'{input_node.obj_name}->{input_node.node_name}' for input_node in node.inputs]}"
)
print("\nSide inputs:")
for node in object.nodes:
print(
f"{object.name}->{node.name} <- {[f'{input_node.obj_name}->{input_node.node_name}' for input_node in node.side_inputs]}"
)
print("\nReferenced columns:")
for object in output_lineage.referenced_columns.objects:
for node in object.nodes:
print(f"{object.name}->{node.name} referenced in {node.referenced_in}")
# Prints:
# ast=Ast(...)
# Lineage:
# Inputs:
# project.dataset.out->id <- ['project.dataset.t2->id', 'project.dataset.t1->id']
# project.dataset.out->val <- ['project.dataset.t2->s.x', 'project.dataset.t1->x', 'project.dataset.out->val']
#
# Side inputs:
# project.dataset.out->id <- ['project.dataset.t2->s.source']
# project.dataset.out->val <- ['project.dataset.t2->s.source']
#
# Referenced columns:
# project.dataset.out->val referenced in ['default_var', 'select']
# project.dataset.t1->id referenced in ['join', 'select']
# project.dataset.t1->x referenced in ['select']
# project.dataset.t2->id referenced in ['join', 'select']
# project.dataset.t2->s.x referenced in ['select']
# project.dataset.t2->s.source referenced in ['where']
cargo add inbq
use inbq::{
lineage::{
catalog::{Catalog, Column, SchemaObject, SchemaObjectKind},
extract_lineage,
},
parser::Parser,
scanner::Scanner,
};
fn column(name: &str, dtype: &str) -> Column {
Column {
name: name.to_owned(),
dtype: dtype.to_owned(),
}
}
fn main() -> anyhow::Result<()> {
env_logger::init();
let sql = r#"
declare default_val float64 default (select min(val) from project.dataset.out);
insert into `project.dataset.out`
select
id,
if(x is null or s.x is null, default_val, x + s.x)
from `project.dataset.t1` inner join `project.dataset.t2` using (id)
where s.source = "baz";
"#;
let mut scanner = Scanner::new(sql);
scanner.scan()?;
let mut parser = Parser::new(scanner.tokens());
let ast = parser.parse()?;
println!("Syntax Tree: {:?}", ast);
let data_catalog = Catalog {
schema_objects: vec![
SchemaObject {
name: "project.dataset.out".to_owned(),
kind: SchemaObjectKind::Table {
columns: vec![column("id", "int64"), column("val", "int64")],
},
},
SchemaObject {
name: "project.dataset.t1".to_owned(),
kind: SchemaObjectKind::Table {
columns: vec![column("id", "int64"), column("x", "float64")],
},
},
SchemaObject {
name: "project.dataset.t2".to_owned(),
kind: SchemaObjectKind::Table {
columns: vec![
column("id", "int64"),
column("s", "struct<source string, x float64>"),
],
},
},
],
};
let lineage = extract_lineage(&[&ast], &data_catalog, false, true)
.pop()
.unwrap()?;
println!("\nLineage: {:?}", lineage.lineage);
println!("\nReferenced columns: {:?}", lineage.referenced_columns);
Ok(())
}
cargo install inbq
Prepare your data catalog: create a JSON file (e.g., catalog.json) that defines the schema for all tables and views referenced in your SQL queries.
Run inbq: pass the catalog file and your SQL file(s) to the inbq lineage command.
inbq extract-lineage \
--pretty \
--catalog ./examples/lineage/catalog.json \
./examples/lineage/query.sql
The output is written to stdout.