Crates.io | timer-deque-rs |
lib.rs | timer-deque-rs |
version | 0.4.0 |
created_at | 2025-08-21 23:42:23.548714+00 |
updated_at | 2025-09-05 21:35:44.665644+00 |
description | A OS based timer and timer queue which implements timeout queues of different types. |
homepage | |
repository | https://codeberg.org/4neko/timer-deque-rs |
max_upload_size | |
id | 1805625 |
size | 366,566 |
An OS-based timer and timer event deque that can be used to build simple timeout scheduling. It can be used as a plain OS timer or together with the timeout deque.
This crate is still incomplete and not properly tested. A proof of concept has been developed.
Pull requests are now supported because the repository was moved to Codeberg. Alternatively, patches can be sent by email to patch[at]4neko.org.
If you would like to contribute code, please use a pull request. Your pull request should include:
A description of the changes and why they are needed.
Please also test the pull request.
If you prefer email and patch files, please consider the following:
For each feature or fix, please send patches separately.
Please write what your patch implements or fixes.
I can read the code and I am able to understand it, so don't write a poem or essay in the description of the patches.
Please test your patch.
Can I use the MPL-2.0 licensed code (crate) in a larger project licensed under a more permissive license like BSD or MIT?
I want to distribute (outside my organization) executable programs or libraries that I have compiled from someone else's unchanged MPL-licensed source code, either standalone or part of a larger work. What do I have to do?
You must inform the recipients where they can get the source for the MPLed code in the executable program or library you are distributing (i.e., you must comply with Section 3.2). You may distribute any executables you create under a license of your choosing, as long as that license does not interfere with the recipients' rights to the source under the terms of the MPL.
Yes, MPL- and Apache-licensed code can be used with an MIT codebase (so in that sense, they are "compatible"). However, the MPL- / Apache-licensed code remains under its original license. (So although compatible, you cannot relicense someone else's MPL or Apache code into the MIT license.) This means that your final codebase will contain a mix of MPL, Apache, and MIT licensed code. As an example, MPL has weak copyleft, so if you modified an MPL file, that file (including your changes) must remain under the MPL license.
You should use this license if you are located in the EU. It gives you more advantages over the GPL because, in case of any dispute, the license allows you to defend your rights in a European Union country, in this case Spain. It has also been translated into all languages of the EU member states.
Matrix of EUPL compatible open source licences
EUPL-1.2 is incompatible with the GPL according to GNU.org:
This is a free software license. By itself, it has a copyleft comparable to the GPL's, and incompatible with it.
v 0.4.0-development, Rust edition 2024
Changes in the layout, poll system and tasks.
poll now provides an option to interrupt the polling process from another thread. Polling can also be interrupted by adding or removing a timer.
Changes in the layout, time types and new deque modes.
AbsoluteTime and RelativeTime. Both are also types which modify the behaviour of the code.
OrderdTimerDequeOnce and OrderdTimerDequePeriodic. Both are based on AbsoluteTime, but the latter uses RelativeTime to offset the next timeout. The difference is that the former removes the item from the queue after the timeout, while the latter keeps it until it is manually removed.
Changes in the poll system.
An AsTimerFd instance can be added to the polling instance by weak reference. If the timer is dropped, it is automatically removed from the poll.
Sources are available under: MPL-2.0 OR EUPL-1.2
The project has moved to Codeberg.
See ./examples/ there.
┌────────────────────────────┐
│    OrderdTimerDequeOnce    │
└────────────────────────────┘
┌───────────────────────┐      ┌───────────────────────┐
│ Deque                 │      │ TimerFD               │
│                       │      │ timeout: 12332456     │
│ ┌───────────────────┐ │      │ absolute time         │
│ │ ITEM              │ │      └───────────┬───────────┘
│ │ TIMEOUT: 12332456 ├─┼──┐               │
│ └───────────────────┘ │  │               ▼
│ ┌───────────────────┐ │  │     ┌───────────────────┐
│ │ ITEM              │ │  │     │ timeout           │
│ │ TIMEOUT: 12334543 ├─┼─┐│     └─────────┬─────────┘
│ └───────────────────┘ │ ││               │
│ ┌───────────────────┐ │ ││               ▼
│ │ ITEM              │ │ ││     ┌───────────────────┐
│ │ TIMEOUT: 12335654 │ │ │└────►│ pop_front         │
│ └───────────────────┘ │ │      │ timeout <= cur_tm │
│ ...                   │ │      └─────────┬─────────┘
└───────────────────────┘ │                │
                          │                ▼
                          │      ┌───────────────────┐
                          └─────►│ set_timer         │
                                 └─────────┬─────────┘
                                           │
                                           ▼
                               ┌───────────────────────┐
                               │ TimerFD               │
                               │ timeout: 12334543     │
                               │ absolute time         │
                               └───────────┬───────────┘
                                           │
                                           ▼
                                 ┌───────────────────┐
                                 │ timeout           │
                                 └─────────┬─────────┘
┌────────────────────────────┐
│  OrderdTimerDequePeriodic  │
└────────────────────────────┘
┌───────────────────────┐      ┌───────────────────────┐
│ Deque                 │      │ TimerFD               │
│                       │      │ timeout: 12332456     │
│ ┌───────────────────┐ │      │ absolute time         │
│ │ ITEM              │ │      └───────────┬───────────┘
│ │ TIMEOUT: 12332456 ├─┼──┐               │
│ │ EXTEND: 100       │ │  │               ▼
│ └───────────────────┘ │  │     ┌───────────────────┐
│ ┌───────────────────┐ │  │     │ timeout           │
│ │ ITEM              │ │  │     └─────────┬─────────┘
│ │ TIMEOUT: 12334543 ├─┼─┐│               │
│ │ EXTEND: 120       │ │ ││               ▼
│ └───────────────────┘ │ ││     ┌───────────────────┐
│ ┌───────────────────┐ │ │└────►│ pop_front         │
│ │ ITEM              │ │ │      │ timeout <= cur_tm │
│ │ TIMEOUT: 12335654 │ │ │      └─────────┬─────────┘
│ │ EXTEND: 150       │ │ │                │
│ └───────────────────┘ │ │                ▼
│ ...                 ◄─┼─┼──┐ ┌───────────────────────┐
└───────────────────────┘ │  │ │ ITEM                  │
                          │  │ │ TIMEOUT: 12332456+100 │
                          │  └─┤ NEW_TIMEOUT: 12332556 │
                          │    └───────────┬───────────┘
                          │  return back   │
                          │  with new tm   ▼
                          │      ┌───────────────────┐
                          └─────►│ set_timer         │
                                 └─────────┬─────────┘
                                           │
                                           ▼
                               ┌───────────────────────┐
                               │ TimerFD               │
                               │ timeout: 12334543     │
                               │ absolute time         │
                               └───────────┬───────────┘
                                           │
                                           ▼
                                 ┌───────────────────┐
                                 │ timeout           │
                                 └─────────┬─────────┘
                                          ...
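The two flows drawn above can be sketched in plain Rust without any OS timer. This is only an illustration under stated assumptions: Item, Mode, insert_sorted, on_timeout, demo and the plain integer timestamps are hypothetical names invented for this sketch and are not part of the timer-deque-rs API, and the crate's real implementation may differ.

```rust
use std::collections::VecDeque;

/// Hypothetical queue entry: an id, an absolute timeout and, for periodic
/// items, the relative offset (EXTEND in the diagram).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Item {
    pub id: u64,
    pub timeout: u64,
    pub extend: u64,
}

/// Hypothetical switch between the two deque modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Once,
    Periodic,
}

/// Insert while keeping the deque sorted by timeout (earliest first).
pub fn insert_sorted(queue: &mut VecDeque<Item>, item: Item) {
    let pos = queue.partition_point(|it| it.timeout <= item.timeout);
    queue.insert(pos, item);
}

/// Handle one timer expiry at `cur_tm`. Returns the ids that fired and the
/// next absolute timeout the timer would be armed with (set_timer).
pub fn on_timeout(
    queue: &mut VecDeque<Item>,
    mode: Mode,
    cur_tm: u64,
) -> (Vec<u64>, Option<u64>) {
    let mut fired = Vec::new();
    let mut requeue = Vec::new();

    // pop_front while timeout <= cur_tm
    while queue.front().map_or(false, |it| it.timeout <= cur_tm) {
        let mut item = queue.pop_front().expect("front was just checked");
        fired.push(item.id);
        if mode == Mode::Periodic {
            // return back with the new timeout
            item.timeout += item.extend;
            requeue.push(item);
        }
    }
    for item in requeue {
        insert_sorted(queue, item);
    }

    (fired, queue.front().map(|it| it.timeout))
}

/// Small scenario: three items, the timer fires at the first timeout.
/// Returns (fired ids, next timeout, remaining ids in queue order).
pub fn demo(mode: Mode) -> (Vec<u64>, Option<u64>, Vec<u64>) {
    let mut queue = VecDeque::new();
    insert_sorted(&mut queue, Item { id: 1, timeout: 100, extend: 500 });
    insert_sorted(&mut queue, Item { id: 2, timeout: 200, extend: 500 });
    insert_sorted(&mut queue, Item { id: 3, timeout: 300, extend: 500 });

    let (fired, next) = on_timeout(&mut queue, mode, 100);
    (fired, next, queue.iter().map(|it| it.id).collect())
}
```

In Once mode the fired item leaves the queue; in Periodic mode it is reinserted with its timeout moved forward by the offset, so it stays queued until it is removed manually.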
The items are stored in a VecDeque sorted by timeout. An item in the queue is identified through PartialEq and Eq, by the instance or ID, and not by its timeout value. The timer is set to the nearest (by time) value.
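A minimal sketch of that storage rule, assuming a plain integer timestamp: SortedTimeouts and its methods are hypothetical names for this illustration only, not the crate's types. Entries are ordered by timeout, lookups compare the item itself through Eq, and the front entry is the value the timer would be armed with.

```rust
use std::collections::VecDeque;

/// Hypothetical sorted timeout queue; `T` is identified by `PartialEq`/`Eq`.
pub struct SortedTimeouts<T: Eq> {
    entries: VecDeque<(u64, T)>,
}

impl<T: Eq> SortedTimeouts<T> {
    pub fn new() -> Self {
        Self { entries: VecDeque::new() }
    }

    /// Insert while keeping the entries ordered by timeout.
    pub fn add(&mut self, item: T, timeout: u64) {
        let pos = self.entries.partition_point(|(tm, _)| *tm <= timeout);
        self.entries.insert(pos, (timeout, item));
    }

    /// Remove by item equality; the timeout value plays no role in the lookup.
    /// Returns the timeout the removed item was scheduled for.
    pub fn remove(&mut self, item: &T) -> Option<u64> {
        let pos = self.entries.iter().position(|(_, it)| it == item)?;
        self.entries.remove(pos).map(|(tm, _)| tm)
    }

    /// Nearest timeout, i.e. the value the timer would be set to.
    pub fn nearest(&self) -> Option<u64> {
        self.entries.front().map(|(tm, _)| *tm)
    }
}
```

For example, after adding "a" at 300 and both "b" and "c" at 100, removing "b" leaves the nearest timeout at 100, because "c" shares the timeout but is a different item.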
Every sync example uses event notification. On Linux, this is epoll.
This type of deque consumes the instance.
use std::{cmp::Ordering, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::Arc};

use timer_deque_rs::
{
    AbsoluteTime,
    OrderdTimerDequeOnce,
    OrderedTimerDeque,
    TimerDequeueConsumer,
    TimerPoll,
    TimerReadRes
};

#[derive(Debug, PartialEq, Eq, Clone)]
struct TestItem(u64);

impl fmt::Display for TestItem
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        write!(f, "0 = {}", self.0)
    }
}

fn main()
{
    let ev_watch = TimerPoll::new().unwrap();

    let mut time_list =
        OrderedTimerDeque
            ::<TimerDequeueConsumer<Arc<TestItem>, OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    // add timer to event
    ev_watch.add(&time_list).unwrap();

    let abs_time = AbsoluteTime::now();

    let tss_set1 = abs_time.clone().add_sec(3);
    let ent1 = Arc::new(TestItem(1));

    let tss_set2 = abs_time.clone().add_sec(7);
    let ent2 = Arc::new(TestItem(2));

    let tss_set3 = abs_time.clone().add_sec(10);
    let ent3 = Arc::new(TestItem(3));

    time_list.add_to_timer(ent1.clone(), tss_set1).unwrap();
    time_list.add_to_timer(ent2.clone(), tss_set2).unwrap();
    time_list.add_to_timer(ent3.clone(), tss_set3).unwrap();

    // poll timer for events (it can be done in separate thread)
    let res = ev_watch.poll(Option::None).unwrap();
    let poll_timeout = AbsoluteTime::now();

    assert_eq!(res.is_some(), true);
    assert_eq!(res.as_ref().unwrap().len(), 1);

    // get timer's FD's to identify which timers are ready
    let timer_fd: RawFd = res.as_ref().unwrap()[0];
    assert_eq!(timer_fd, time_list.as_fd().as_raw_fd());

    // consume event from timer to find out what happened
    let res = time_list.wait_for_event().unwrap();
    println!("timer timeout with result: {}", res);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    let mut timeout_items: Vec<Arc<TestItem>> = Vec::new();

    // read timeout items
    time_list.timeout_event_handler(res, &mut timeout_items).unwrap();

    assert_eq!(timeout_items.len(), 1); // only one item
    assert_eq!(timeout_items[0], ent1); // item 1

    println!("timer item: {}, timeout: {}, curtime: {}", &timeout_items[0], tss_set1, poll_timeout);
    assert_eq!(tss_set1.seconds_cmp(&poll_timeout) == Ordering::Equal, true);

    // removing second item from queue, so ent3 should be next to fire
    time_list.remove_from_sched_queue(&ent2).unwrap();
    assert_eq!(time_list.timer_queue_len(), 1); // only ent3 left

    println!("item {} removed from sched. queue", ent2);
    println!("queue len: {}", time_list.timer_queue_len());

    // -----
    // wait for events (ent3)
    let res = ev_watch.poll(Option::None).unwrap();
    let poll_timeout = AbsoluteTime::now();

    assert_eq!(res.is_some(), true);
    assert_eq!(res.as_ref().unwrap().len(), 1);

    // get timer's FD's to identify which timers are ready
    let timer_fd: RawFd = res.as_ref().unwrap()[0];
    assert_eq!(timer_fd, time_list.as_fd().as_raw_fd());

    // consume event from timer to find out what happened
    let res = time_list.wait_for_event().unwrap();
    println!("timer timeout with result: {}", res);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    let mut timeout_items: Vec<Arc<TestItem>> = Vec::new();
    time_list.timeout_event_handler(res, &mut timeout_items).unwrap();

    assert_eq!(timeout_items.len(), 1); // only one item
    assert_eq!(timeout_items[0], ent3); // item 3

    println!("timer item: {}, timeout: {}, curtime: {}", &timeout_items[0], tss_set3, poll_timeout);
    assert_eq!(tss_set3.seconds_cmp(&poll_timeout) == Ordering::Equal, true);

    println!("timer queue len: {}", time_list.timer_queue_len());
    assert_eq!(time_list.timer_queue_len(), 0); // queue should be empty

    return;
}
This type of deque issues a ticket for each instance. The ticket can be used to cancel it.
use std::{cmp::Ordering, fmt, os::fd::{AsFd, AsRawFd, RawFd}};

use timer_deque_rs::
{
    AbsoluteTime,
    OrderdTimerDequeOnce,
    OrderedTimerDeque,
    TimerDequeueId,
    TimerDequeueTicket,
    TimerDequeueTicketIssuer,
    TimerPoll,
    TimerReadRes
};

#[derive(Debug, PartialEq, Eq)]
struct TestItem(TimerDequeueTicket);

impl fmt::Display for TestItem
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        write!(f, "0 = {}", self.0)
    }
}

fn main()
{
    let ev_watch = TimerPoll::new().unwrap();

    let mut time_list =
        OrderedTimerDeque
            ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    // add timer to event
    ev_watch.add(&time_list).unwrap();

    let abs_time = AbsoluteTime::now();

    // add to timer
    let tss_set1 = abs_time.clone().add_sec(3);
    let ticket1 = time_list.add_to_timer(tss_set1).unwrap();
    let ent1 = TestItem(ticket1);

    let tss_set2 = abs_time.clone().add_sec(7);
    let ticket2 = time_list.add_to_timer(tss_set2).unwrap();
    let ent2 = TestItem(ticket2);

    let tss_set3 = abs_time.clone().add_sec(10);
    let ticket3 = time_list.add_to_timer(tss_set3).unwrap();
    let ent3 = TestItem(ticket3);

    // poll timer
    let res = ev_watch.poll(Option::None).unwrap();
    let poll_timeout = AbsoluteTime::now();

    assert_eq!(res.is_some(), true);
    assert_eq!(res.as_ref().unwrap().len(), 1);

    // get timer's FD's to identify which timers are ready
    let timer_fd: RawFd = res.as_ref().unwrap()[0];
    assert_eq!(timer_fd, time_list.as_fd().as_raw_fd());

    // consume event from timer to find out what happened
    let res = time_list.wait_for_event().unwrap();
    println!("timer timeout with result: {}", res);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    let mut timeout_items: Vec<TimerDequeueId> = Vec::new();

    // read timeout items
    time_list.timeout_event_handler(res, &mut timeout_items).unwrap();

    assert_eq!(timeout_items.len(), 1); // only one item
    assert_eq!(timeout_items[0], ent1.0); // item 1
    assert_eq!(ent1.0, timeout_items[0]);
    assert_eq!(ent1.0.is_queued(), false);

    println!("timer item: {}, timeout: {}, curtime: {}", &timeout_items[0], tss_set1, poll_timeout);
    assert_eq!(tss_set1.seconds_cmp(&poll_timeout) == Ordering::Equal, true);

    // removing second item from queue, so ent3 should be next to fire
    println!("item {} removing from sched. queue", ent2);
    time_list.remove_from_sched_queue(ent2.0).unwrap();
    assert_eq!(time_list.timer_queue_len(), 1); // only ent3 left

    println!("queue len: {}", time_list.timer_queue_len());

    // -----
    // wait for events (ent3)
    let res = ev_watch.poll(Option::None).unwrap();
    let poll_timeout = AbsoluteTime::now();

    assert_eq!(res.is_some(), true);
    assert_eq!(res.as_ref().unwrap().len(), 1);

    // get timer's FD's to identify which timers are ready
    let timer_fd: RawFd = res.as_ref().unwrap()[0];
    assert_eq!(timer_fd, time_list.as_fd().as_raw_fd());

    // consume event from timer to find out what happened
    let res = time_list.wait_for_event().unwrap();
    println!("timer timeout with result: {}", res);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    let mut timeout_items: Vec<TimerDequeueId> = Vec::new();
    time_list.timeout_event_handler(res, &mut timeout_items).unwrap();

    assert_eq!(timeout_items.len(), 1); // only one item
    assert_eq!(timeout_items[0], ent3.0); // item 3

    println!("timer item: {}, timeout: {}, curtime: {}", &timeout_items[0], tss_set3, poll_timeout);
    assert_eq!(tss_set3.seconds_cmp(&poll_timeout) == Ordering::Equal, true);

    println!("timer queue len: {}", time_list.timer_queue_len());
    assert_eq!(time_list.timer_queue_len(), 0); // queue should be empty

    return;
}
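The ticket idea used in the example above can be illustrated with a small standalone sketch. Everything here is an assumption made for illustration: Ticket, TicketIssuer, issue, cancel and expire are hypothetical names, timestamps are plain integers, and the shared atomic flag is only one possible way to let a ticket report whether it is still queued. It is not how timer-deque-rs is known to implement its tickets.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical ticket handed to the caller instead of storing a caller item.
#[derive(Debug, Clone)]
pub struct Ticket {
    id: u64,
    queued: Arc<AtomicBool>,
}

impl Ticket {
    pub fn id(&self) -> u64 {
        self.id
    }

    /// True while the issuer still holds the entry for this ticket.
    pub fn is_queued(&self) -> bool {
        self.queued.load(Ordering::Acquire)
    }
}

/// Hypothetical issuer: keeps (timeout, id, flag) entries sorted by timeout.
#[derive(Default)]
pub struct TicketIssuer {
    next_id: u64,
    entries: VecDeque<(u64, u64, Arc<AtomicBool>)>,
}

impl TicketIssuer {
    /// Schedule a timeout and hand out a ticket for it.
    pub fn issue(&mut self, timeout: u64) -> Ticket {
        let id = self.next_id;
        self.next_id += 1;
        let queued = Arc::new(AtomicBool::new(true));
        let pos = self.entries.partition_point(|(tm, _, _)| *tm <= timeout);
        self.entries.insert(pos, (timeout, id, Arc::clone(&queued)));
        Ticket { id, queued }
    }

    /// Cancel by ticket id; returns false if the entry is already gone.
    pub fn cancel(&mut self, ticket: &Ticket) -> bool {
        match self.entries.iter().position(|(_, id, _)| *id == ticket.id) {
            Some(pos) => {
                if let Some((_, _, flag)) = self.entries.remove(pos) {
                    flag.store(false, Ordering::Release);
                }
                true
            }
            None => false,
        }
    }

    /// Pop every entry with timeout <= cur_tm and return the fired ids.
    pub fn expire(&mut self, cur_tm: u64) -> Vec<u64> {
        let mut fired = Vec::new();
        while self.entries.front().map_or(false, |(tm, _, _)| *tm <= cur_tm) {
            if let Some((_, id, flag)) = self.entries.pop_front() {
                flag.store(false, Ordering::Release);
                fired.push(id);
            }
        }
        fired
    }
}
```

The caller keeps only the ticket; the queue never owns caller data, so cancelling needs nothing more than the ticket itself.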
This type of deque sends a signal, i.e. a notification, using the provided functionality.
use std::
{
    fmt,
    os::fd::{AsFd, AsRawFd, RawFd},
    sync::{atomic::{AtomicBool, Ordering}, mpsc::{self, SendError}, Arc},
    time::Duration
};

use timer_deque_rs::
{
    common,
    AbsoluteTime,
    OrderdTimerDequeOnce,
    OrderedTimerDeque,
    TimerDequeueSignal,
    TimerDequeueSignalTicket,
    TimerPoll,
    TimerReadRes
};

#[derive(Debug)]
pub struct TestStruct
{
    uniq_id: u64,
    sig_chan_snd: mpsc::Sender<u64>,
}

impl Eq for TestStruct {}

impl PartialEq for TestStruct
{
    fn eq(&self, other: &Self) -> bool
    {
        return self.uniq_id == other.uniq_id;
    }
}

impl fmt::Display for TestStruct
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        write!(f, "uniq_id: {}", self.uniq_id)
    }
}

impl TimerDequeueSignal for TestStruct
{
    type TimerQueueID = u64;
    type TimeoutErr = SendError<Self::TimerQueueID>;

    fn get_id(&self) -> Self::TimerQueueID
    {
        return self.uniq_id;
    }

    fn sig_timeout(&mut self) -> Result<(), Self::TimeoutErr>
    {
        // non blocking
        return self.sig_chan_snd.send(self.uniq_id);
    }
}

impl TestStruct
{
    fn new(uniq_id: u64, snd: mpsc::Sender<u64>) -> Self
    {
        return Self{ uniq_id: uniq_id, sig_chan_snd: snd };
    }
}

fn main()
{
    let ev_watch = TimerPoll::new().unwrap();

    let mut time_list =
        OrderedTimerDeque
            ::<TimerDequeueSignalTicket<TestStruct, OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    // add timer to event
    ev_watch.add(&time_list).unwrap();

    let (snd, rcv) = mpsc::channel::<u64>();

    let exit_flag = Arc::new(AtomicBool::new(false));
    let c_exit_flag = exit_flag.clone();

    // receiver thread: checks that the signals arrive in the expected order
    let thread_hndlr =
        std::thread::spawn(move ||
            {
                let mut exp_ids = vec![300, 100];

                loop
                {
                    let Ok(uniq_id) = rcv.recv_timeout(Duration::from_millis(100))
                    else
                    {
                        if c_exit_flag.load(Ordering::Relaxed) == true
                        {
                            return;
                        }
                        else if exp_ids.len() == 0
                        {
                            return;
                        }

                        continue;
                    };

                    println!("received timeout signal from item: {}", uniq_id);
                    assert_eq!(exp_ids.pop().unwrap(), uniq_id);
                }
            }
        );

    let abs_time = AbsoluteTime::now();

    let tss_set1 = abs_time.clone().add_sec(3);
    let ent1 = TestStruct::new(100, snd.clone());

    let tss_set2 = abs_time.clone().add_sec(7);
    let ent2 = TestStruct::new(200, snd.clone());

    let tss_set3 = abs_time.clone().add_sec(10);
    let ent3 = TestStruct::new(300, snd.clone());

    time_list.add_to_timer(ent1, tss_set1).unwrap();
    time_list.add_to_timer(ent2, tss_set2).unwrap();
    time_list.add_to_timer(ent3, tss_set3).unwrap();

    let res = ev_watch.poll(Option::None).unwrap();
    let poll_timeout = common::get_current_timestamp().timestamp();

    assert_eq!(res.is_some(), true);
    assert_eq!(res.as_ref().unwrap().len(), 1);

    // get timer's FD's to identify which timers are ready
    let timer_fd: RawFd = res.as_ref().unwrap()[0];
    assert_eq!(timer_fd, time_list.as_fd().as_raw_fd());

    // consume event from timer to find out what happened
    let res = time_list.wait_for_event().unwrap();
    println!("timer timeout with result: {}, timeout: {}, curtime: {}", res, tss_set1, poll_timeout);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    // read timeout items
    time_list.timeout_event_handler(res).unwrap();

    // removing second item from queue, so ent3 should be next to fire
    time_list.remove_from_sched_queue(&200).unwrap();

    // -----
    // wait for events (ent3)
    let res = ev_watch.poll(Option::None).unwrap();
    let poll_timeout = common::get_current_timestamp().timestamp();

    assert_eq!(res.is_some(), true);
    assert_eq!(res.as_ref().unwrap().len(), 1);

    // get timer's FD's to identify which timers are ready
    let timer_fd: RawFd = res.as_ref().unwrap()[0];
    assert_eq!(timer_fd, time_list.as_fd().as_raw_fd());

    // consume event from timer to find out what happened
    let res = time_list.wait_for_event().unwrap();
    println!("timer timeout with result: {}, timeout: {}, curtime: {}", res, tss_set3, poll_timeout);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    time_list.timeout_event_handler(res).unwrap();

    thread_hndlr.join().unwrap();

    return;
}
In this example, the consumer type of deque is used in an async context by polling the timer. This is not efficient. In the next example, AsyncFd is used to increase efficiency.
use std::{cmp::Ordering, fmt, sync::Arc};

use timer_deque_rs::
{
    AbsoluteTime,
    OrderdTimerDequeOnce,
    OrderedTimerDeque,
    TimerDequeueConsumer,
    TimerReadRes
};

#[derive(Debug, PartialEq, Eq, Clone)]
struct TestItem(u64);

impl fmt::Display for TestItem
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        write!(f, "0 = {}", self.0)
    }
}

#[tokio::main]
async fn main()
{
    let mut time_list =
        OrderedTimerDeque
            ::<TimerDequeueConsumer<Arc<TestItem>, OrderdTimerDequeOnce>>
            ::new("test_label_async".into(), 4, false)
                .unwrap();

    let abs_time = AbsoluteTime::now();

    let tss_set1 = abs_time.clone().add_sec(3);
    let ent1 = Arc::new(TestItem(1));

    let tss_set2 = abs_time.clone().add_sec(7);
    let ent2 = Arc::new(TestItem(2));

    let tss_set3 = abs_time.clone().add_sec(10);
    let ent3 = Arc::new(TestItem(3));

    time_list.add_to_timer(ent1.clone(), tss_set1).unwrap();
    time_list.add_to_timer(ent2.clone(), tss_set2).unwrap();
    time_list.add_to_timer(ent3.clone(), tss_set3).unwrap();

    let res = time_list.poll().await.unwrap();
    let poll_timeout = AbsoluteTime::now();

    println!("timer timeout with result: {}", res);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    let mut timeout_items: Vec<Arc<TestItem>> = Vec::new();

    // read timeout items
    time_list.timeout_event_handler(res, &mut timeout_items).unwrap();

    assert_eq!(timeout_items.len(), 1); // only one item
    assert_eq!(timeout_items[0], ent1); // item 1

    println!("timer item: {}, timeout: {}, curtime: {}", &timeout_items[0], tss_set1, poll_timeout);
    assert_eq!(tss_set1.seconds_cmp(&poll_timeout) == Ordering::Equal, true);

    // removing second item from queue, so ent3 should be next to fire
    time_list.remove_from_sched_queue(&ent2).unwrap();
    assert_eq!(time_list.timer_queue_len(), 1); // only ent3 left

    println!("item {} removed from sched. queue", ent2);
    println!("queue len: {}", time_list.timer_queue_len());

    // -----
    // wait for events (ent3)
    // poll timer again for timeout
    let res = time_list.poll().await.unwrap();
    let poll_timeout = AbsoluteTime::now();

    println!("timer timeout with result: {}", res);

    // it should be TimerReadRes::Ok with overflow 1
    assert_eq!(TimerReadRes::Ok(1), res);

    let mut timeout_items: Vec<Arc<TestItem>> = Vec::new();
    time_list.timeout_event_handler(res, &mut timeout_items).unwrap();

    assert_eq!(timeout_items.len(), 1); // only one item
    assert_eq!(timeout_items[0], ent3); // item 3

    println!("timer item: {}, timeout: {}, curtime: {}", &timeout_items[0], tss_set3, poll_timeout);
    assert_eq!(tss_set3.seconds_cmp(&poll_timeout) == Ordering::Equal, true);

    println!("timer queue len: {}", time_list.timer_queue_len());
    assert_eq!(time_list.timer_queue_len(), 0); // queue should be empty

    return;
}
In this example, the notifier (signal) type of deque is used in an async context by wrapping the timer deque in AsyncFd. This is more efficient than just polling the FD for events as in the previous example.
use std::fmt;

use timer_deque_rs::
{
    common,
    AbsoluteTime,
    OrderdTimerDequeOnce,
    OrderedTimerDeque,
    TimerDequeueSignal,
    TimerDequeueSignalTicket,
    TimerReadRes
};

use tokio::
{
    io::{unix::AsyncFd, Interest},
    sync::mpsc::{self, error::SendError}
};

#[derive(Debug)]
pub struct TestStruct
{
    uniq_id: u64,
    sig_chan_snd: mpsc::UnboundedSender<u64>,
}

impl Eq for TestStruct {}

impl PartialEq for TestStruct
{
    fn eq(&self, other: &Self) -> bool
    {
        return self.uniq_id == other.uniq_id;
    }
}

impl fmt::Display for TestStruct
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        write!(f, "uniq_id: {}", self.uniq_id)
    }
}

impl TimerDequeueSignal for TestStruct
{
    type TimerQueueID = u64;
    type TimeoutErr = SendError<Self::TimerQueueID>;

    fn get_id(&self) -> Self::TimerQueueID
    {
        return self.uniq_id;
    }

    async
    fn a_sig_timeout(&mut self) -> Result<(), Self::TimeoutErr>
    {
        // non blocking
        return self.sig_chan_snd.send(self.uniq_id);
    }
}

impl TestStruct
{
    fn new(uniq_id: u64, snd: mpsc::UnboundedSender<u64>) -> Self
    {
        return Self{ uniq_id: uniq_id, sig_chan_snd: snd };
    }
}

#[tokio::main]
async fn main()
{
    let time_list =
        OrderedTimerDeque
            ::<TimerDequeueSignalTicket<TestStruct, OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    let mut async_time_list =
        AsyncFd::new(time_list).unwrap();

    let (snd, mut rcv) = mpsc::unbounded_channel::<u64>();

    // receiver task: checks that the signals arrive in the expected order
    let hndl =
        tokio::spawn(async move
            {
                let mut exp_ids = vec![300, 100];

                loop
                {
                    let Some(uniq_id) = rcv.recv().await
                    else
                    {
                        return;
                    };

                    println!("received timeout signal from item: {}", uniq_id);
                    assert_eq!(exp_ids.pop().unwrap(), uniq_id);

                    if exp_ids.len() == 0
                    {
                        return;
                    }
                }
            }
        );

    let abs_time = AbsoluteTime::now();

    let tss_set1 = abs_time.clone().add_sec(3);
    let ent1 = TestStruct::new(100, snd.clone());

    let tss_set2 = abs_time.clone().add_sec(7);
    let ent2 = TestStruct::new(200, snd.clone());

    let tss_set3 = abs_time.clone().add_sec(10);
    let ent3 = TestStruct::new(300, snd.clone());

    async_time_list.get_mut().add_to_timer(ent1, tss_set1).unwrap();
    async_time_list.get_mut().add_to_timer(ent2, tss_set2).unwrap();
    async_time_list.get_mut().add_to_timer(ent3, tss_set3).unwrap();

    // poll the timer until it becomes ready
    let mut read_guard =
        async_time_list.ready_mut(Interest::READABLE).await.unwrap();

    // clear ready otherwise it will return WOULDBLOCK
    read_guard.clear_ready();

    let poll_timeout = common::get_current_timestamp().timestamp();
    let res = read_guard.get_inner().wait_for_event().unwrap();

    println!("timer timeout with result: {}, timeout: {}, curtime: {}", res, tss_set1, poll_timeout);
    assert_eq!(TimerReadRes::Ok(1), res);

    read_guard.get_inner_mut().async_timeout_event_handler(res).await.unwrap();

    // removing second item from queue, so ent3 should be next to fire
    async_time_list.get_mut().remove_from_sched_queue(&200).unwrap();

    // poll the timer until it becomes ready
    let mut read_guard =
        async_time_list.ready_mut(Interest::READABLE).await.unwrap();

    let poll_timeout = common::get_current_timestamp().timestamp();
    let res = read_guard.get_inner().wait_for_event().unwrap();

    // this event belongs to ent3, so its timeout (tss_set3) is printed
    println!("timer timeout with result: {}, timeout: {}, curtime: {}", res, tss_set3, poll_timeout);
    assert_eq!(TimerReadRes::Ok(1), res);

    read_guard.get_inner_mut().async_timeout_event_handler(res).await.unwrap();

    let _ = hndl.await;

    return;
}
A simple timer based on OS functionality.
use std::{borrow::Cow, cmp::Ordering, time::Instant};

use timer_deque_rs::
{
    timer_portable::
    {
        timer::TimerFd,
        FdTimerCom,
        TimerExpMode,
        TimerFlags,
        TimerType
    },
    AbsoluteTime
};

fn main()
{
    // timer init as blocking
    let timer =
        TimerFd::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
            TimerFlags::empty()).unwrap();

    let s = Instant::now();

    let abs_time = AbsoluteTime::now().add_sec(3);
    let exp_time =
        TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time);

    // setting timer
    let res = timer.set_time(exp_time);
    println!("timer was set: '{}'", exp_time);
    assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());

    // blocks until the timer expires and returns the overflow counter
    let ovf = timer.read().unwrap().unwrap();
    let now_abs = AbsoluteTime::now();
    let e = s.elapsed();

    assert_eq!(ovf, 1);
    assert_eq!(abs_time.seconds_cmp(&now_abs) == Ordering::Equal, true);

    println!("elapsed: {:?}, now: {}, set: {}", e, now_abs, abs_time);
    assert_eq!((e.as_millis() <= 3100), true);

    println!("Success");

    return;
}
A simple parallel task spawner.
use std::sync::mpsc::{self, Sender};

// This example is written from inside the crate, hence `crate::`. From another
// crate the same items would presumably be imported through `timer_deque_rs::`
// (assuming these module paths are exported unchanged).
use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};

#[derive(Debug)]
struct TaskStruct1
{
    a1: u64,
    s: Sender<u64>,
}

impl TaskStruct1
{
    fn new(a1: u64, s: Sender<u64>) -> Self
    {
        return Self{ a1: a1, s };
    }
}

impl PeriodicTask for TaskStruct1
{
    fn exec(&mut self) -> PeriodicTaskResult
    {
        println!("taskstruct1 val: {}", self.a1);

        let _ = self.s.send(self.a1);

        return PeriodicTaskResult::Ok;
    }
}

fn main()
{
    let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();

    let (send, recv) = mpsc::channel::<u64>();

    let task1 = TaskStruct1::new(2, send);

    // run the task once, 3 seconds from now
    let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));

    let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
    println!("added");

    // blocks until the task has executed and sent its value
    let val = recv.recv();
    println!("{:?}", val);

    drop(task1_guard);
}