Crates.io | ballista |
lib.rs | ballista |
version | 0.12.0 |
source | src |
created_at | 2019-07-04 20:18:09.309714 |
updated_at | 2024-02-07 00:25:41.258248 |
description | Ballista Distributed Compute |
homepage | https://github.com/apache/arrow-ballista |
repository | https://github.com/apache/arrow-ballista |
id | 145997 |
size | 54,735 |
Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.
This crate is tested with the latest stable version of Rust. We do not currently test against older versions of the Rust compiler.
There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the deployment section of the Ballista User Guide.
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
With these crates installed, it is now possible to start a scheduler process.
RUST_LOG=info ballista-scheduler
The scheduler will bind to port 50050 by default.
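Before starting executors, it can be handy to confirm that something is actually listening on the scheduler port. The following is a minimal sketch using only the Rust standard library; the helper name `is_listening` and the use of `127.0.0.1` are assumptions for illustration, and a successful TCP connection only shows that the port is open, not that the scheduler is healthy.

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Returns true if a TCP connection to `addr` succeeds within `timeout`.
/// (Hypothetical helper, not part of Ballista.)
fn is_listening(addr: &SocketAddr, timeout: Duration) -> bool {
    TcpStream::connect_timeout(addr, timeout).is_ok()
}

fn main() {
    // 50050 is the scheduler's default port; the loopback host is an assumption.
    let addr: SocketAddr = "127.0.0.1:50050".parse().expect("valid socket address");

    if is_listening(&addr, Duration::from_secs(1)) {
        println!("port {} is accepting connections", addr.port());
    } else {
        println!("nothing is listening on {addr} yet");
    }
}
```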
Next, start an executor process in a new terminal session with the specified concurrency level.
RUST_LOG=info ballista-executor -c 4
The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
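When several local executors are needed, the command lines above can be generated rather than typed by hand. The sketch below uses only the flags shown in this section (`--bind-port` and `-c`); the helper names and the choice of consecutive port numbers are assumptions for illustration, and it is up to you to check that those ports are free on your machine.

```rust
/// Build the argument list for one `ballista-executor` process.
/// (Hypothetical helper; only uses the flags shown in the text above.)
fn executor_args(bind_port: u16, concurrency: usize) -> Vec<String> {
    vec![
        "--bind-port".to_string(),
        bind_port.to_string(),
        "-c".to_string(),
        concurrency.to_string(),
    ]
}

/// Produce one shell command line per executor, assigning consecutive
/// ports starting at `first_port` (an assumption made for this sketch).
fn executor_commands(first_port: u16, count: u16, concurrency: usize) -> Vec<String> {
    (0..count)
        .map(|i| {
            let args = executor_args(first_port + i, concurrency);
            format!("RUST_LOG=info ballista-executor {}", args.join(" "))
        })
        .collect()
}

fn main() {
    // Print the commands for three executors with four concurrent tasks each.
    for line in executor_commands(50051, 3, 4) {
        println!("{line}");
    }
}
```

The same argument list could be passed to `std::process::Command::new("ballista-executor").args(...)` to spawn the processes directly instead of printing them.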
Ballista provides a BallistaContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.
To build a simple Ballista example, add the following dependencies to your Cargo.toml file:
[dependencies]
ballista = "0.11"
datafusion = "28.0.0"
tokio = "1.0"
The following example runs a simple aggregate query against a Parquet file (yellow_tripdata_2022-01.parquet) from the New York Taxi and Limousine Commission data set. Download the file and add it to the testdata folder before running the example.
use ballista::prelude::*;
use datafusion::prelude::{avg, col, max, min, sum, ParquetReadOptions};

#[tokio::main]
async fn main() -> Result<()> {
    // create configuration
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // connect to Ballista scheduler
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    let filename = "testdata/yellow_tripdata_2022-01.parquet";

    // define the query using the DataFrame API
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["passenger_count", "fare_amount"])?
        .aggregate(
            vec![col("passenger_count")],
            vec![
                min(col("fare_amount")),
                max(col("fare_amount")),
                avg(col("fare_amount")),
                sum(col("fare_amount")),
            ],
        )?
        // sort ascending, with nulls first
        .sort(vec![col("passenger_count").sort(true, true)])?;

    // this is equivalent to the following SQL
    // SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
    // FROM tripdata
    // GROUP BY passenger_count
    // ORDER BY passenger_count

    // print the results
    df.show().await?;

    Ok(())
}
The output should look similar to the following table.
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| | -159.5 | 285.2 | 17.60577640099004 | 1258865.829999991 |
| 0 | -115 | 500 | 11.794859107585335 | 614052.1600000001 |
| 1 | -480 | 401092.32 | 12.61028389876563 | 22623542.879999973 |
| 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 |
| 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 |
| 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 |
| 5 | -52 | 668 | 12.160378472086954 | 624289.51 |
| 6 | -52 | 252.5 | 12.576583325529857 | 402916 |
| 7 | 7 | 79 | 61.77777777777778 | 556 |
| 8 | 8.3 | 115 | 79.9125 | 639.3 |
| 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
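To make the semantics of the query concrete, the following plain-Rust sketch computes the same four aggregates per group on a handful of in-memory rows. It does not use Ballista or DataFusion and says nothing about how they execute the query; the `FareStats` type, the `aggregate` function, and the sample rows are made up for illustration. `None` models a NULL passenger_count, and because `None` orders before `Some` in a `BTreeMap`, the NULL group comes first, as in the blank first row of the table above.

```rust
use std::collections::BTreeMap;

/// Running aggregates for one passenger_count group (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
struct FareStats {
    min: f64,
    max: f64,
    sum: f64,
    count: u64,
}

impl FareStats {
    /// Average fare for the group, derived from the running sum and count.
    fn avg(&self) -> f64 {
        self.sum / self.count as f64
    }
}

/// Group rows of (passenger_count, fare_amount) and fold each group
/// into MIN, MAX, SUM, and a row count. `None` stands in for NULL.
fn aggregate(rows: &[(Option<i64>, f64)]) -> BTreeMap<Option<i64>, FareStats> {
    let mut groups: BTreeMap<Option<i64>, FareStats> = BTreeMap::new();
    for &(passengers, fare) in rows {
        groups
            .entry(passengers)
            .and_modify(|s| {
                s.min = s.min.min(fare);
                s.max = s.max.max(fare);
                s.sum += fare;
                s.count += 1;
            })
            .or_insert(FareStats { min: fare, max: fare, sum: fare, count: 1 });
    }
    groups
}

fn main() {
    // Made-up rows for illustration only; not taken from the taxi data set.
    let rows = [(Some(1), 10.0), (None, 7.5), (Some(2), 20.0), (Some(1), 30.0)];

    // BTreeMap iterates in key order, which mirrors ORDER BY passenger_count.
    for (passengers, s) in aggregate(&rows) {
        println!(
            "{passengers:?}: min={} max={} avg={} sum={}",
            s.min,
            s.max,
            s.avg(),
            s.sum
        );
    }
}
```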
More examples can be found in the arrow-ballista repository.