Crates.io | cu-aligner |
lib.rs | cu-aligner |
version | |
source | src |
created_at | 2024-12-02 21:32:44.187109 |
updated_at | 2024-12-05 22:42:23.009294 |
description | A Copper component to align messages in time. |
homepage | https://github.com/copper-project |
repository | https://github.com/copper-project/copper-rs |
max_upload_size | |
id | 1469271 |
Cargo.toml error: | TOML parse error at line 18, column 1 | 18 | autolib = false | ^^^^^^^ unknown field `autolib`, expected one of `name`, `version`, `edition`, `authors`, `description`, `readme`, `license`, `repository`, `homepage`, `documentation`, `build`, `resolver`, `links`, `default-run`, `default_dash_run`, `rust-version`, `rust_dash_version`, `rust_version`, `license-file`, `license_dash_file`, `license_file`, `licenseFile`, `license_capital_file`, `forced-target`, `forced_dash_target`, `autobins`, `autotests`, `autoexamples`, `autobenches`, `publish`, `metadata`, `keywords`, `categories`, `exclude`, `include` |
size | 0 |
The goal of this Copper component is to align data from different sources.
STREAM 1 STREAM 2
│ │
Msg1_S1 ─╯ │ (discarded)
│ │
═══════════╪═══════════════════════╪═══════════ TIME HORIZON ( present - stale_data_horizon_ms )
│ │
│ │
│ Msg1_S2 ─┤
Msg2_S1 ─┤ │
│ │
Msg3_S1 ─┤ Msg2_S2 ─┤
│ │
│ │
│ │
═══════════╪═══════════════════════╪═══════════ BEGINNING OF ALIGNMENT WINDOW ( t - alignment_window_ms )
│ │
│ Msg3_S2 ─┤
Msg4_S1 ─┤ │ <- This part will be in the output at each process call
│ │
Msg5_S1 ─┤ Msg4_S2 ─┤
│ │
═══════════╪═══════════════════════╪═══════════ MOST RECENT ALIGNED MESSAGE ( t )
│ │
│ Msg5_S2 ─┤
│ │
│ Msg6_S2 ─┤ (not yet aligned)
The timings are taken from the CuMsg::metadata.tov field (time of validity).
The task is generated entirely out of the cu_aligner::define_task
macro:
use cu_aligner::define_task;
use cu29::input_msg;
use cu29::output_msg;
use cu29::cutask::Freezable;
use cu29::config::ComponentConfig;
use cu29::CuResult;
use cu29::cutask::CuTask;
use cu29::cutask::CuMsg;
// Defines a task that aligns two streams of messages, one with payloads f32, the other MyPayload (you can use any rust struct that to implement the traits for CuMsgPayload).
define_task!(MyAlignerTask,
0 => { 15, 7, f32 }, // 5 is the Maximum capacity in nb of messages the internal
// buffer structure can hold before they will me discarded,
// 12 is the Maximum size in nb of messages the output (aligned messages of this type)
1 => { 20, 5, u64 } // or any CuMsgPayload
// you can continue with 2 => etc...
);
You defined task will need to be connected with the matching tasks upstream in the Copper config file.
The type of the input will be a tuple of CuMsg (which is what the aligner expects). From the example:
(CuMsg<f32>, CuMsg<MyPayload>)
.
The type of the output will be a CuMsg of a tuple of CuArrays holding the aligned messages for each stream. From the
example: CuMsg<(CuArray<f32, 7>, CuArray<MyPayload, 5>)>
.
Copper by itself never buffers anything to avoid copies but for this aligner has to copy data until it can align it. It means you will have 1 copy from the input to the internal buffer and 1 copy from the internal buffer to the output.
If your usecase is just to get the latest message from 2 sources, just connect your task to the upstream tasks but do not use this aligner. It is only useful if the time of arrival from the 2 upstream tasks are too far appart in terms of tov.