| field | value |
|---|---|
| Crates.io | pyspark-arrow-rs |
| lib.rs | pyspark-arrow-rs |
| version | 0.3.0 |
| source | src |
| created_at | 2024-07-05 07:16:57.216084 |
| updated_at | 2024-08-16 03:12:50.14031 |
| description | Derive macros that add helper functions to Rust structs to make them usable in PySpark's mapInArrow |
| homepage | |
| repository | https://github.com/jlloh/pyspark-arrow-rs |
| max_upload_size | |
| id | 1292429 |
| size | 26,074 |
This crate is intended to be used together with PyO3 (via maturin), arrow-rs, and serde_arrow to generate Python-callable code that can be used in Spark ETL jobs.
The naive way to extend Spark from Python is to use Spark UDFs or pandas UDFs. Spark is natively written for the JVM and interoperates with Python through Py4J, and the conversion overhead from a Java RDD into something a Python UDF can process is quite high.
The way around this is Apache Arrow. PySpark already uses Arrow for interop between the JVM and Python, and by working with the Arrow format directly we can skip the additional conversion overhead and read the Arrow data straight from a Rust function:
RDD -> Spark (Java) -> Arrow -> Pyspark -> Rust
We can load Arrow data directly into a Rust function wrapped in Python (through maturin) using the `pyspark.sql.DataFrame.mapInArrow` API. This API expects two arguments:

- a function that consumes and yields Arrow record batches — we can write a Rust function returning an arrow-rs `RecordBatch` that the Python function yields;
- the expected output schema — which we can derive from a Rust struct using serde_arrow and a macro provided by this crate.
This crate provides macros that make it easier to work with `mapInArrow`, as well as some recipes for leveraging Rust in PySpark. See the `tests` folder for a real example. The basic idea: given a struct, you can generate a Spark DDL string to pass to `mapInArrow` on a PySpark DataFrame:

```rust
use pyspark_arrow_rs::HasArrowSparkSchema;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, HasArrowSparkSchema)]
pub(crate) struct TestStruct {
    pub(crate) col: String,
    pub(crate) int_col: i64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_macro() {
        let _a = TestStruct {
            col: "bla".to_string(),
            int_col: 1000,
        };
        let ddl = TestStruct::get_spark_ddl();
        assert_eq!("`col` STRING, `int_col` BIGINT", ddl.unwrap());
    }
}
```
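To make the macro's effect concrete, here is a hand-written sketch of roughly what a `get_spark_ddl` for `TestStruct` could produce. This is an illustration in plain Rust (no derive, no external crates), not the crate's actual macro expansion: the field-to-type table is written out by hand, mapping `String` to `STRING` and `i64` to `BIGINT` as in the test above.

```rust
// Hand-written illustration of a get_spark_ddl() for TestStruct.
// The real crate derives this via the HasArrowSparkSchema macro.
#[allow(dead_code)]
struct TestStruct {
    col: String,
    int_col: i64,
}

impl TestStruct {
    /// Map each field to its Spark SQL type and join them into a DDL string.
    fn get_spark_ddl() -> Result<String, String> {
        // (field name, Spark DDL type): String -> STRING, i64 -> BIGINT.
        let fields = [("col", "STRING"), ("int_col", "BIGINT")];
        let ddl = fields
            .iter()
            .map(|(name, ty)| format!("`{name}` {ty}"))
            .collect::<Vec<_>>()
            .join(", ");
        Ok(ddl)
    }
}

fn main() {
    let ddl = TestStruct::get_spark_ddl().unwrap();
    println!("{ddl}"); // `col` STRING, `int_col` BIGINT
}
```

The resulting string is exactly what `mapInArrow` accepts as its `schema` argument on the Python side.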