Crates.io | vineyard-datafusion |
lib.rs | vineyard-datafusion |
version | 0.17.2 |
source | src |
created_at | 2023-09-04 02:39:22.24686 |
updated_at | 2023-09-20 02:58:42.105864 |
description | Vineyard Rust SDK: arrow datafusion integration for DataFrame |
homepage | https://v6d.io |
repository | https://github.com/v6d-io/v6d.git |
max_upload_size | |
id | 962571 |
size | 15,491 |
Note: Rust nightly is required. The vineyard Rust SDK is still under development, and its API may change in the future.
Resolve the UNIX-domain socket from the environment variable `VINEYARD_IPC_SOCKET`:
use vineyard::client::*;
let mut client = vineyard::default().unwrap();
Or, pass the socket path explicitly:
use vineyard::client::*;
let mut client = vineyard::connect("/var/run/vineyard.sock").unwrap();
Create a blob:
let mut blob_writer = client.create_blob(N)?;
Get an object:
let mut meta_writer = client.get::<DataFrame>(object_id)?;
numpy.ndarray
Python:
import numpy as np
import vineyard
client = vineyard.connect()
np_array = np.random.rand(10, 20).astype(np.int32)
object_id = int(client.put(np_array))
Rust:
let mut client = IPCClient::default()?;
let tensor = client.get::<Int32Tensor>(object_id)?;
assert_that!(tensor.shape().to_vec()).is_equal_to(vec![10, 20]);
pandas.DataFrame
Python
import pandas as pd
import vineyard
client = vineyard.connect()
df = pd.DataFrame({'a': ["1", "2", "3", "4"], 'b': ["5", "6", "7", "8"]})
object_id = int(client.put(df))
Rust
let mut client = IPCClient::default()?;
let dataframe = client.get::<DataFrame>(object_id)?;
assert_that!(dataframe.num_columns()).is_equal_to(2);
assert_that!(dataframe.names().to_vec()).is_equal_to(vec!["a".into(), "b".into()]);
for index in 0..dataframe.num_columns() {
let column = dataframe.column(index);
assert_that!(column.len()).is_equal_to(4);
}
pyarrow.RecordBatch
Python
import pandas as pd
import pyarrow as pa
import vineyard
client = vineyard.connect()
arrays = [
pa.array([1, 2, 3, 4]),
pa.array(["foo", "bar", "baz", "qux"]),
pa.array([3.0, 5.0, 7.0, 9.0]),
]
batch = pa.RecordBatch.from_arrays(arrays, ["f0", "f1", "f2"])
object_id = int(client.put(batch))
Rust
let batch = client.get::<RecordBatch>(object_id)?;
assert_that!(batch.num_columns()).is_equal_to(3);
assert_that!(batch.num_rows()).is_equal_to(4);
let schema = batch.schema();
let names = ["f0", "f1", "f2"];
let recordbatch = batch.as_ref().as_ref();
pyarrow.Table
Python
batches = [batch] * 5
table = pa.Table.from_batches(batches)
object_id = int(client.put(table))
Rust
let mut client = IPCClient::default()?;
let table = client.get::<Table>(object_id)?;
assert_that!(table.num_batches()).is_equal_to(5);
for batch in table.batches().iter() {
// ...
}
polars.DataFrame
Python
import polars
dataframe = polars.DataFrame(table)
object_id = int(client.put(dataframe))
Rust
use vineyard_polars::ds::dataframe::DataFrame;
let mut client = IPCClient::default()?;
let dataframe = client.get::<DataFrame>(object_id)?;
let dataframe = dataframe.as_ref().as_ref();
assert_that!(dataframe.width()).is_equal_to(3);
for column in dataframe.get_columns() {
// ...
}
datafusion.DataFrame
Python
batches = [batch] * 5
table = pa.Table.from_batches(batches)
object_id = int(client.put(table))
Rust
use vineyard_datafusion::ds::dataframe::DataFrame;
let mut client = IPCClient::default()?;
let dataframe = client.get::<DataFrame>(object_id)?;
let ctx = SessionContext::new();
let table = ctx.read_table(dataframe.table_provider()).unwrap();
assert_that!(block_on(table.count()).unwrap()).is_equal_to(1000);