| Crates.io | streaming_serde_json |
| lib.rs | streaming_serde_json |
| version | 0.1.2 |
| created_at | 2024-11-17 16:36:02.959478+00 |
| updated_at | 2024-11-18 04:06:03.952287+00 |
| description | This is a streaming alternative to serde_json for processing JSON data sources that are too large to fit in memory. |
| homepage | |
| repository | https://github.com/mr-adult/JFC/tree/main/streaming-parser |
| max_upload_size | |
| id | 1451383 |
| size | 114,538 |
This project is still experimental and not intended for production use. It has not been optimized to use any hardware capabilities like SIMD. The main serde_json crate is a better fit for all non-streaming use cases.
This library aims to solve a simple problem: JSON files can be larger than the memory of the machine processing them. For example, a hardware device might produce a 300 GB file of JSON u64 values, while the machine consuming it has only 64 GB of RAM. Even if that JSON is well-formed and simply consists of one extremely large array or object, the standard serde_json crate will not suffice, because it eagerly parses all 300 GB of values. This crate provides methods and iterator implementations for lazily parsing JSON.
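For contrast, the eager approach looks like this (a sketch using serde_json's standard from_reader API; the file name matches the first example below):

use std::fs::File;
use std::io::BufReader;

// Eager parse: serde_json materializes the entire array before
// returning, so a 300 GB input cannot fit on a 64 GB machine.
let reader = BufReader::new(File::open("./myFile.json").unwrap());
let all_values: Vec<u64> = serde_json::from_reader(reader).unwrap();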
This crate exposes only four functions. Each returns an iterator that performs the desired conversion (either character stream to value stream, or value stream to JSON string stream).
Functions:
- from_value_stream
- values_to_json_stream
- from_key_value_pair_stream
- key_value_pairs_to_json_stream
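For a feel of how the two halves fit together, here is a minimal round-trip sketch over an in-memory source (the exact generic signatures are illustrative, inferred from the examples below):

use std::io::Cursor;
use utf8_chars::BufReadCharsExt;
use streaming_serde_json::{from_value_stream, values_to_json_stream};

let mut source = Cursor::new("[1, 2, 3]");
// Parse lazily: each u32 is produced only when the iterator is advanced.
let values = from_value_stream::<u32, _, _>(source.chars());
// Transform lazily as well; no intermediate collection is built.
let doubled = values.map(|result| result.unwrap() * 2);
// Write the doubled values back out as a JSON array, one fragment at a time.
for fragment in values_to_json_stream(doubled) {
    print!("{}", fragment.unwrap());
}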
The simplest example is reading from a file, aggregating, and writing to a file. Let's say I have a 300 GB stream of object values where each object holds the minimum, maximum, and average over a 5-minute window of data. This is placed in myFile.json, which might look something like this:
[
{
"avg": 100,
"min": 2,
"max": 178
},
{
"avg": 102,
"min": 5,
"max": 198
},
"...etc"
]
If I want to find the global minimum and maximum values in this stream, I could do that with the following code:
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Write};
use utf8_chars::BufReadCharsExt;
use serde_derive::{Serialize, Deserialize};
use streaming_serde_json::{from_value_stream, values_to_json_stream};
#[derive(Deserialize)]
struct InputData {
avg: i32,
min: i32,
max: i32,
}
#[derive(Serialize)]
struct OutputData {
min: i32,
max: i32,
}
let mut reader = BufReader::new(File::open("./myFile.json").unwrap());
let chars = reader.chars();
// The value stream has a PassThroughError type parameter since most
// sources with data large enough to need this will be fallible to
// read from.
let values = from_value_stream::<InputData, _, _>(chars);
let mut global_min = i32::MAX;
let mut global_max = i32::MIN;
for result in values {
// Your error handling goes here. Since this is a
// demonstration, I'm not worried about panics.
let input_data_value: InputData = result.unwrap();
if input_data_value.min < global_min {
global_min = input_data_value.min;
}
if input_data_value.max > global_max {
global_max = input_data_value.max;
}
}
let output_data = OutputData {
min: global_min,
max: global_max,
};
// Since the output data will be only one item, using the
// buffered writers and streaming JSON output functions
// is overkill, but I am using them to demonstrate their
// usage.
let output_file = OpenOptions::new()
.read(false)
.write(true)
.create(true)
.truncate(true)
.open("./myResultJson.json")
.unwrap();
let mut writer = BufWriter::new(output_file);
// I am using an array, but the values_to_json_stream
// function will accept any type that implements IntoIterator
// and has an Item type that implements Serialize.
let iter = [output_data];
for fragment in values_to_json_stream(iter) {
    writer.write_all(fragment.unwrap().as_bytes()).unwrap();
}
writer.flush().unwrap();
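The unwrap() calls above will panic on an I/O or parse error. In real code you would more likely propagate the error; here is a sketch of the same aggregation written as a fallible function, generic over whatever error type the value iterator yields:

fn global_min_max<I, E>(values: I) -> Result<(i32, i32), E>
where
    I: IntoIterator<Item = Result<InputData, E>>,
{
    let mut global_min = i32::MAX;
    let mut global_max = i32::MIN;
    for result in values {
        // `?` propagates the error instead of panicking.
        let value = result?;
        global_min = global_min.min(value.min);
        global_max = global_max.max(value.max);
    }
    Ok((global_min, global_max))
}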
The second example again reads from a file, aggregates, and writes to a file. Let's say I have a 300 GB stream of key-value pairs where each value holds the population of a city somewhere in the world. This is placed in worldPopulations.json, which might look something like this:
{
"Seattle": 755078,
"New York": 19469200,
"...etc"
}
If I want to find the average population across all of these cities, I could do that with the following code:
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Write};
use utf8_chars::BufReadCharsExt;
use serde_derive::{Serialize, Deserialize};
use streaming_serde_json::{from_key_value_pair_stream, key_value_pairs_to_json_stream};
let mut reader = BufReader::new(File::open("./worldPopulations.json").unwrap());
let chars = reader.chars();
// The value stream has a PassThroughError type parameter since most
// sources with data large enough to need this API will be fallible to
// read from.
let values = from_key_value_pair_stream::<u32, _, _>(chars);
let mut global_total = 0_u128;
let mut city_count = 0_u32;
for result in values {
// Your error handling goes here. Since this is a
// demonstration, I'm not worried about panics.
let input_data_value: (String, u32) = result.unwrap();
global_total += input_data_value.1 as u128;
city_count += 1;
}
let global_average = global_total / city_count as u128;
// Since the output data will be only one item, using the
// buffered writers and streaming JSON output functions
// is overkill, but I am using them to demonstrate their
// usage.
let output_file = OpenOptions::new()
.read(false)
.write(true)
.create(true)
.truncate(true)
.open("./globalAvgPop.json")
.unwrap();
let mut writer = BufWriter::new(output_file);
// I am using an array, but the key_value_pairs_to_json_stream
// function will accept any type that implements IntoIterator
// and has an Item type of key-value pairs whose value
// implements Serialize.
let iter = [("global average".to_string(), global_average)];
for fragment in key_value_pairs_to_json_stream(iter) {
    writer.write_all(fragment.unwrap().as_bytes()).unwrap();
}
writer.flush().unwrap();
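Because both halves are lazy iterators, they can also be chained end to end so that no intermediate collection is ever built. As a sketch (reusing the signatures from the example above; the output file name is made up), this copies only the cities with more than one million inhabitants:

use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use utf8_chars::BufReadCharsExt;
use streaming_serde_json::{from_key_value_pair_stream, key_value_pairs_to_json_stream};

let mut reader = BufReader::new(File::open("./worldPopulations.json").unwrap());
// Each pair flows straight from the input to the output;
// memory usage stays constant regardless of file size.
let big_cities = from_key_value_pair_stream::<u32, _, _>(reader.chars())
    .map(|result| result.unwrap())
    .filter(|(_, population)| *population > 1_000_000);
let mut writer = BufWriter::new(File::create("./bigCities.json").unwrap());
for fragment in key_value_pairs_to_json_stream(big_cities) {
    writer.write_all(fragment.unwrap().as_bytes()).unwrap();
}
writer.flush().unwrap();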