| Field | Value |
|---|---|
| Crate | amadeus |
| Version | 0.4.3 |
| Created | 2018-08-17 |
| Updated | 2021-05-20 |
| Description | Harmonious distributed data processing & analysis in Rust. parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common crawl |
| Homepage | https://github.com/constellation-rs/amadeus |
| Repository | https://github.com/constellation-rs/amadeus |
| Size | 276,894 |
Harmonious distributed data processing & analysis in Rust
Docs | Home | Chat
Amadeus is a batteries-included, low-level reusable building block for the Rust Distributed Computing and Big Data ecosystems.
By design, Amadeus encourages you to write clean and reusable code that works, regardless of data scale, locally or distributed across a cluster. Write once, run at any data scale.
We aim to create a community that is welcoming and helpful to anyone who is interested! Come join us on our Zulip chat.
Amadeus has deep, pluggable, integration with various file formats, databases and interfaces:
| Data format | Source | Destination |
|---|---|---|
| CSV | ✔ | ✔ |
| JSON | ✔ | ✔ |
| XML | 💬 | |
| Parquet | ✔ | 🚧 |
| Avro | 🚧 | |
| PostgreSQL | ✔ | 🚧 |
| HDF5 | 💬 | |
| Redshift | 💬 | |
| CloudFront Logs | ✔ | ✔ |
| Common Crawl | ✔ | ✔ |
| S3 | ✔ | 🚧 |
| HDFS | 💬 | 💬 |

✔ = Working
🚧 = Work in Progress
💬 = Requested: check out the issue for how to help!
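Because the integrations are pluggable, a dependent crate can compile in only what it uses. A sketch of what that might look like in a downstream Cargo.toml (the exact feature names shown are assumptions, not taken from this document; check the amadeus Cargo.toml for the authoritative list):

```toml
# Hypothetical excerpt from a downstream Cargo.toml: enable only the
# integrations you need via feature flags. The feature names shown are
# assumptions; verify them against the amadeus Cargo.toml.
[dependencies]
amadeus = { version = "0.4", features = ["parquet", "postgres", "aws"] }
```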
Amadeus is routinely benchmarked against the official `parquet` crate, and provisional results are very promising.

Amadeus is a library that can be used on its own as a parallel threadpool, or with Constellation as a distributed cluster.
Constellation is a framework for process distribution and communication, and has backends for a bare cluster (Linux or macOS), a managed Kubernetes cluster, and more in the pipeline.
This will read the Parquet partitions from the S3 bucket, deserialize each row into the typed `LogLine` struct, and print the top 100 URLs by number of distinct IPs logged.
```rust
use amadeus::prelude::*;
use amadeus::data::IpAddr;
use std::error::Error;

#[derive(Data, Clone, PartialEq, Debug)]
struct LogLine {
    uri: Option<String>,
    requestip: Option<IpAddr>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = ThreadPool::new(None, None)?;
    let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
        AwsRegion::UsEast1,
        "us-east-1.data-analytics",
        "cflogworkshop/optimized/cf-accesslogs/",
        AwsCredentials::Anonymous,
    )))
    .await?;
    let top_pages = rows
        .par_stream()
        .map(|row: Result<LogLine, _>| {
            let row = row.unwrap();
            (row.uri, row.requestip)
        })
        .most_distinct(&pool, 100, 0.99, 0.002, 0.0808)
        .await;
    println!("{:#?}", top_pages);
    Ok(())
}
```
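`most_distinct` computes its answer approximately, in a single streaming pass; the numeric arguments tune the underlying probabilistic sketches. For comparison, here is an exact, in-memory equivalent of "top-k keys by distinct values" in plain `std` Rust (the function name is hypothetical, for illustration only):

```rust
use std::collections::{HashMap, HashSet};

// Exact in-memory equivalent of the approximate `most_distinct`:
// count distinct values of `b` per key `a`, then take the top-k keys.
fn top_k_by_distinct(pairs: &[(String, String)], k: usize) -> Vec<(String, usize)> {
    let mut sets: HashMap<&str, HashSet<&str>> = HashMap::new();
    for (a, b) in pairs {
        // A HashSet per key deduplicates values, so each is counted once.
        sets.entry(a.as_str()).or_default().insert(b.as_str());
    }
    let mut counts: Vec<(String, usize)> = sets
        .into_iter()
        .map(|(a, s)| (a.to_owned(), s.len()))
        .collect();
    // Sort by distinct count descending, breaking ties by key for determinism.
    counts.sort_by(|x, y| y.1.cmp(&x.1).then(x.0.cmp(&y.0)));
    counts.truncate(k);
    counts
}

fn main() {
    let pairs = vec![
        ("/index".to_owned(), "1.1.1.1".to_owned()),
        ("/index".to_owned(), "2.2.2.2".to_owned()),
        ("/index".to_owned(), "1.1.1.1".to_owned()), // duplicate IP, not re-counted
        ("/about".to_owned(), "1.1.1.1".to_owned()),
    ];
    for (uri, distinct_ips) in top_k_by_distinct(&pairs, 2) {
        println!("{} {}", uri, distinct_ips);
    }
    // prints:
    // /index 2
    // /about 1
}
```

The exact version needs memory proportional to the number of distinct pairs, which is why a sketch-based streaming version is the right tool at data scale.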
The same query can also be run untyped, using the dynamic `Value` type instead of a struct; this example prints the 100 most frequently occurring URLs.
```rust
use amadeus::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = ThreadPool::new(None, None)?;
    let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
        AwsRegion::UsEast1,
        "us-east-1.data-analytics",
        "cflogworkshop/optimized/cf-accesslogs/",
        AwsCredentials::Anonymous,
    )))
    .await?;
    let top_pages = rows
        .par_stream()
        .map(|row: Result<Value, _>| {
            let row = row.ok()?.into_group().ok()?;
            row.get("uri")?.clone().into_url().ok()
        })
        .filter(|row| row.is_some())
        .map(Option::unwrap)
        .most_frequent(&pool, 100, 0.99, 0.002)
        .await;
    println!("{:#?}", top_pages);
    Ok(())
}
```
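`most_frequent` is likewise a streaming approximation of a top-k frequency count. An exact, in-memory equivalent in plain `std` Rust, for comparison (the function name is hypothetical):

```rust
use std::collections::HashMap;

// Exact in-memory equivalent of the approximate `most_frequent`:
// count occurrences of each item and keep the k most frequent.
fn top_k_most_frequent(items: &[&str], k: usize) -> Vec<(String, usize)> {
    let mut counts: HashMap<&str, usize> = HashMap::new();
    for &item in items {
        *counts.entry(item).or_insert(0) += 1;
    }
    let mut v: Vec<(String, usize)> = counts
        .into_iter()
        .map(|(s, c)| (s.to_owned(), c))
        .collect();
    // Sort by count descending, breaking ties by item for determinism.
    v.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
    v.truncate(k);
    v
}

fn main() {
    let urls = ["/index", "/about", "/index", "/index", "/about", "/contact"];
    for (url, count) in top_k_most_frequent(&urls, 2) {
        println!("{} {}", url, count);
    }
    // prints:
    // /index 3
    // /about 2
}
```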
What about loading this data into Postgres? This will create and populate a table called "accesslogs".
```rust
use amadeus::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = ThreadPool::new(None, None)?;
    let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
        AwsRegion::UsEast1,
        "us-east-1.data-analytics",
        "cflogworkshop/optimized/cf-accesslogs/",
        AwsCredentials::Anonymous,
    )))
    .await?;
    // Note: this isn't yet implemented!
    rows.par_stream()
        .pipe(Postgres::new("127.0.0.1", PostgresTable::new("accesslogs")));
    Ok(())
}
```
Operations can run on a parallel threadpool or on a distributed process pool. For distribution, Amadeus uses the Constellation framework described above.
```rust
use amadeus::dist::prelude::*;
use amadeus::data::IpAddr;
use constellation::*;
use std::error::Error;

#[derive(Data, Clone, PartialEq, Debug)]
struct LogLine {
    uri: Option<String>,
    requestip: Option<IpAddr>,
}

fn main() -> Result<(), Box<dyn Error>> {
    init(Resources::default());
    // #[tokio::main] isn't supported yet, so unfortunately the Runtime
    // must be set up explicitly
    tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            let pool = ProcessPool::new(None, None, None, Resources::default())?;
            let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
                AwsRegion::UsEast1,
                "us-east-1.data-analytics",
                "cflogworkshop/optimized/cf-accesslogs/",
                AwsCredentials::Anonymous,
            )))
            .await?;
            let top_pages = rows
                .dist_stream()
                .map(FnMut!(|row: Result<LogLine, _>| {
                    let row = row.unwrap();
                    (row.uri, row.requestip)
                }))
                .most_distinct(&pool, 100, 0.99, 0.002, 0.0808)
                .await;
            println!("{:#?}", top_pages);
            Ok(())
        })
}
```
Take a look at the various examples.
Amadeus is an open source project! If you'd like to contribute, check out the list of "good first issues". These are all (or should be) issues that are suitable for getting started, and they generally include a detailed set of instructions for what to do. Please ask questions and ping us on our Zulip chat if anything is unclear!
Licensed under Apache License, Version 2.0, (LICENSE.txt or http://www.apache.org/licenses/LICENSE-2.0).
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be licensed as above, without any additional terms or conditions.