safe_drive: Formally Specified Rust Bindings for ROS2
safe_drive
is a Rust bindings for ROS2.
This library provides formal specifications and tested the specifications by using a model checker.
Therefore, you can clearly understand how the scheduler work and the safeness of it.
Specifications
Some algorithms we adopted are formally specified and tested the safeness by using TLA+. Original ROS2's executor (rclcpp) is suffering from starvation. In contrast, the starvation freedom of our executor has been validated by not only dynamic analysis but also formal verification.
See specifications.
We specified and tested as follows.
- Single Threaded Callback Execution
- Deadlock freedom
- Starvation freedom
- Scheduling Core Algorithm
- Validate the insertion algorithm
- Termination
- Initialize Once
- Deadlock freedom
- Termination
- Initialization is performed just once
Tutorial
This chapter explain how to use safe_drive
and
interoperability of existing ROS2's project written in C++ by using colcon
.
Setting-up
Install Dependencies
$ sudo apt install curl gnupg2 lsb-release python3-pip git
Install Rust
$ sudo curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
$ source $HOME/.cargo/env
Choose Default
when installing Rust.
Install ROS2
$ curl -s https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo apt-key add -
$ sudo sh -c 'echo "deb [arch=$(dpkg --print-architecture)] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" > /etc/apt/sources.list.d/ros2-latest.list'
$ sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
$ sudo apt update
$ sudo apt install ros-iron-desktop python3-colcon-common-extensions
$ . /opt/ros/iron/setup.bash
Install Colcon-cargo
$ pip install git+https://github.com/tier4/colcon-cargo.git
$ pip install git+https://github.com/colcon/colcon-ros-cargo.git
Install Cargo-ament
$ git clone https://github.com/tier4/cargo-ament-build.git
$ cd cargo-ament-build
$ cargo install --path .
Publish and Subscribe
This chapter introduces a simple example of publish and subscribe communication.
This communication is so-called topic by ROS2.
There are N
senders and M
receivers in a topic.
This tutorial implements 1 sender (in Rust) and 2 receivers (in Rust and C++).
The following figure is a example topology.
graph TD; Publisher1-->TopicA; Publisher2-->TopicA; TopicA-->Subscriber1; TopicA-->Subscriber2; TopicA-->Subscriber3; Publisher3-->TopicB; Publisher4-->TopicB; Publisher5-->TopicB; TopicB-->Subscriber4; TopicB-->Subscriber5;
There are 2 topics, TopicA
and TopicB
in this figure.
If Publisher2
of TopicA
send a message,
Subscriber1-3
subscribing TopicA
receive the message.
Subscriber4
and Subscriber5
can receive messages sent from
Publisher3-5
.
Directories
First of all, create working directories as follows.
$ mkdir pubsub
$ mkdir pubsub/src
Then create projects of Rust by using cargo
as follows.
$ cd pubsub/src
$ cargo new my_talker
$ cargo new my_listener
Following directories are important directories we will use throughout this tutorial.
Directories | What? |
---|---|
pubsub/src/my_talker | publisher in Rust |
pubsub/src/my_listener | subscriber in Rust |
pubsub/install | created by colcon |
Workspace of Rust
To handle multiple projects of cargo
, we recommend to prepare Cargo.toml
for the workspace as follows.
pubsub/src/Cargo.toml
[workspace]
members = ["my_talker", "my_listener"]
The workspace enables rust-analyzer
and reduces compilation time.
Talker in Rust
Let's start implementing a publisher.
We created and will edit files of the talker as follows.
These files will automatically created when you do cargo new
as described before.
Files | What? |
---|---|
my_talker/Cargo.toml | for Cargo |
my_talker/package.xml | for ROS2 |
my_talker/src/main.rs | source code |
Edit my_talker/Cargo.toml
Cargo.toml
is used to describe a project of Rust.
Add safe_drive to the dependencies of Cargo.toml
to use it.
Currently, safe_drive's repository is private,
and we need to specify the path as follows.
pubsub/src/my_talker/Cargo.toml
[dependencies]
safe_drive = "0.4"
std_msgs = { path = "/tmp/safe_drive_tutorial/pubsub/std_msgs" }
[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/pubsub"
safe_drive_version = "0.3"
Edit my_talker/src/main.rs
my_talker is very simple. This does as follows.
- Create a context.
- Create a node from the context.
- Create a publisher from the node.
- Send a message periodically.
A context is a resource manager of ROS2, and it must be created first of all. A node contains publishers, subscribers, servers, and clients. A node is a functional unit of ROS2. Publishers and subscribers can be created by a node.
The following figure is an example hierarchy.
graph TD; Context-->NodeA NodeA-->SubscriberA NodeA-->SubscriberB NodeA-->PublisherA NodeA-->PublisherB NodeA-->PublisherC
In this figure, NodeA
is created by a context,
and subscribers and publishers are created by the node.
Now, we have understood the basics of ROS2. The following is a code of my_talker. You should also understand what this code is doing.
pubsub/src/my_talker/src/main.rs
use safe_drive::{ context::Context, error::DynError, logger::Logger, pr_info }; use std::time::Duration; fn main() -> Result<(), DynError> { // Create a context. let ctx = Context::new()?; // Create a node. let node = ctx.create_node("my_talker", None, Default::default())?; // Create a publisher. let publisher = node.create_publisher::<std_msgs::msg::String>("my_topic", None)?; // Create a logger. let logger = Logger::new("my_talker"); let mut cnt = 0; // Counter. let mut msg = std_msgs::msg::String::new().unwrap(); loop { // Create a message to be sent. let data = format!("Hello, World!: cnt = {cnt}"); msg.data.assign(&data); pr_info!(logger, "send: {}", msg.data); // Print log. // Send a message. publisher.send(&msg)?; // Sleep 1s. cnt += 1; std::thread::sleep(Duration::from_secs(1)); } }
my_talker in Detail
create_node
creates a node.
#![allow(unused)] fn main() { // Create a node. let node = ctx.create_node("my_talker", None, Default::default())?; }
The arguments indicate as follows.
"my_talker"
: the name of the node.- Node : namespace.
- We can pass a namespace like
Some("namespace")
. - If
None
is passed, the node has no namespace.
- We can pass a namespace like
Default::default()
: option of node.
create_publisher
creates a publisher.
#![allow(unused)] fn main() { // Create a publisher. let publisher = node.create_publisher::<std_msgs::msg::String>("my_topic", None)?; }
<std_msgs::msg::String>
: the publisher can send values ofstd_msgs::msg::String
."my_topic"
: the topic name to which the publisher send messages.None
: QoS of the publisher. QoS will be described in a later chapter.
A message can be sent by send
method as follows.
#![allow(unused)] fn main() { // Create a message to be sent. let data = format!("Hello, World!: cnt = {cnt}"); msg.data.assign(&data); // Send a message. publisher.send(&msg)?; }
Logger
is used to print some messages.
It can be created as follows.
#![allow(unused)] fn main() { // Create a logger. let logger = Logger::new("my_talker"); }
The argument of "my_talker"
is the name of the logger.
To print a message, use pr_*
macros as follows.
#![allow(unused)] fn main() { pr_info!(logger, "send: {}", msg.data); // Print log. }
There are macros for logging as follows.
pr_debug!
: debugpr_info!
: informationpr_warn!
: warningpr_err!
: errorpr_fatal!
: fatal
Create my_talker/package.xml
package.xml
is used by colcon, which is a build tool used by ROS2.
It contains the package name, maintainer, description, etc, as follows.
pubsub/src/my_talker/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_talker</name>
<version>0.0.0</version>
<description>My Talker in Rust</description>
<maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
<license>Apache License 2.0</license>
<depend>std_msgs</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cargo</build_type>
</export>
</package>
Copy and paste this, then edit it if you want.
Execute the Talker
Before compiling, ensure that you load setting of ROS2 as follows. If you have already done so, you do not need this.
$ . /opt/ros/iron/setup.bash
Then compile by using colcon as follows.
Before compiling, change the current directory to pubsub
,
which is the top directory of our project.
$ cd pubsub
$ colcon build --cargo-args --release
--cargo-args --release
is a option to pass the --release
argument to cargo
.
cargo
optimizes the code when --release
is specified.
To launch my_talker
,
load the setting and use ros2
command as follows.
$ . ./install/setup.bash
$ ros2 run my_talker my_talker
[INFO] [1656048392.368568500] [my_talker]: send: Hello, World!: cnt = 0
[INFO] [1656048393.368787500] [my_talker]: send: Hello, World!: cnt = 1
[INFO] [1656048394.368994200] [my_talker]: send: Hello, World!: cnt = 2
Listener in Rust
Let's then implement a listener. We created and will edit files of the listener as follows.
Files | What? |
---|---|
my_listener/Cargo.toml | for Cargo |
my_listener/package.xml | for ROS2 |
my_listener/src/main.rs | source code |
Edit my_listener/Cargo.toml
my_listener
also requires safe_drive.
Add safe_drive to the dependencies as follows.
# pubsub/src/my_listener/Cargo.toml
[dependencies]
safe_drive = "0.4"
std_msgs = { path = "/tmp/safe_drive_tutorial/pubsub/std_msgs" }
[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/pubsub"
safe_drive_version = "0.3"
Edit my_listener/src/main.rs
To implement subscriber,
we have to prepare a callback function of the subscriber.
This is the main difference from my_talker
.
pubsub/src/my_listener/src/main.rs
use safe_drive::{ context::Context, error::DynError, logger::Logger, pr_info, }; fn main() -> Result<(), DynError> { // Create a context. let ctx = Context::new()?; // Create a node. let node = ctx.create_node("my_listener", None, Default::default())?; // Create a subscriber. let subscriber = node.create_subscriber::<std_msgs::msg::String>("my_topic", None)?; // Create a logger. let logger = Logger::new("my_listener"); // Create a selector. let mut selector = ctx.create_selector()?; // Add a callback function. selector.add_subscriber( subscriber, Box::new(move |msg| { pr_info!(logger, "receive: {}", msg.data); }), ); // Spin. loop { selector.wait()?; } }
my_listener
in Detail
Similar to the publisher,
create_subscriber
creates a subscriber.
#![allow(unused)] fn main() { // Create a subscriber. let subscriber = node.create_subscriber::<std_msgs::msg::String>("my_topic", None)?; }
The arguments are as follows.
"my_topic"
: the name of the topic to which the subscriber is subscribing.None
: QoS of the subscriber. Discussed in a later chapter.
To add and invoke a callback function.
We need to create a selector,
which can wait some events and invoke callback functions of the events.
A selector is similar to select
or epoll
function,
and equivalent to a executer of ROS2.
#![allow(unused)] fn main() { // Create a selector. let mut selector = ctx.create_selector()?; // Add a callback function. selector.add_subscriber( subscriber, Box::new(move |msg| { pr_info!(logger, "receive: {}", msg.data); }), ); // Spin. loop { selector.wait()?; } }
The arguments of add_subscriber
method are as follows.
subscriber
: the subscriber.Box::new(move |msg| { pr_info!(logger, "receive: {}", msg.data); })
: the callback function.false
: the callback function is called only once or not. Iffalse
is passed, the callback function is invoked every time and forever.
selector.wait()
wait events.
To receive events forever, use infinite loop.
Create my_listener/package.xml
package.xml
is almost same as above.
The only difference is the name of the package,
which is my_listener
.
pubsub/src/my_listener/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_listener</name>
<version>0.0.0</version>
<description>My Listener in Rust</description>
<maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
<license>Apache License 2.0</license>
<depend>std_msgs</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cargo</build_type>
</export>
</package>
Execute the Listener
Before compiling and executing, execute my_talker
in other terminal,
and don't forget to load the setting of ROS2 as mentioned above.
The compilation can be done as follows.
$ cd pubsub
$ colcon build --cargo-args --release
To execute my_listener
, load the setting of our projects and
execute it by using ros2
command as follows.
$ . ./install/setup.bash
$ ros2 run my_listener my_listener
[INFO] [1656050459.231579900] [my_listener]: receive: Hello, World!: cnt = 4
[INFO] [1656050460.231831200] [my_listener]: receive: Hello, World!: cnt = 5
[INFO] [1656050461.232120000] [my_listener]: receive: Hello, World!: cnt = 6
Nicely done!
We are receiving messages sent from my_talker
.
Timer
This tutorial does not use colcon
to build.
We use only cargo
, which is a Rust's standard build system.
Don't forget loading ROS2's environment as follows. If you already done so, you do not need this.
$ . /opt/ros/iron/setup.bash
Wall-timer
A wall-timer is a timer which periodically invoked. This section describes how to use a wall-timer.
Let's create a package by using a cargo
as follows.
$ cargo new wall_timer
$ cd wall_timer
Then, add safe_drive
to the dependencies of Cargo.toml
.
[dependencies]
safe_drive = "0.4"
The following code is an example using a wall-timer.
The important method is Selector::add_wall_timer()
which takes
a name, a duration, and a callback function.
use safe_drive::{ context::Context, error::DynError, logger::Logger, msg::common_interfaces::std_msgs, pr_info, }; use std::{rc::Rc, time::Duration}; fn main() -> Result<(), DynError> { // Create a context, a node, a subscriber, a publisher, and a selector. let ctx = Context::new()?; let node = ctx.create_node("my_node", None, Default::default())?; let subscriber = node.create_subscriber::<std_msgs::msg::UInt64>("my_topic", None)?; let publisher = node.create_publisher::<std_msgs::msg::UInt64>("my_topic", None)?; let mut selector = ctx.create_selector()?; // Create a logger. // To share this by multiple callback functions, use Rc. let logger = Rc::new(Logger::new("wall timer example")); // Add a wall timer to publish periodically. let mut cnt = Box::new(0); let mut msg = std_msgs::msg::UInt64::new().unwrap(); let logger1 = logger.clone(); selector.add_wall_timer( "publisher", // the name of timer Duration::from_secs(1), Box::new(move || { msg.data = *cnt; *cnt += 1; publisher.send(&msg).unwrap(); pr_info!(logger1, "send: msg.data = {}", msg.data); }), ); // Add a subscriber. selector.add_subscriber( subscriber, Box::new(move |msg| { pr_info!(logger, "received: msg.data = {}", msg.data); }), ); // Spin. loop { selector.wait()?; } }
Timers can be set by a method of selector as follows,
and the timers will be invoked when calling the Selector::wait()
methods.
#![allow(unused)] fn main() { selector.add_wall_timer( "publisher", // the name of timer Duration::from_secs(1), Box::new(move || { msg.data = *cnt; *cnt += 1; publisher.send(&msg).unwrap(); pr_info!(logger1, "send: msg.data = {}", msg.data); }), ); }
"publisher"
is the name of this timer. The name is used for statistics. You can use any name.Duration::from_secs(1)
is the duration for periodic invoking. This argument means the callback function is invoked every 1 second.Box::new(move || ...)
is the callback function.
There is a publisher invoked by a timer, and a subscriber in this code. When executing this, transmission and reception will be confirmed as follows.
$ cargo run
[INFO] [1656557242.842509800] [wall timer example]: send: msg.data = 0
[INFO] [1656557242.842953300] [wall timer example]: received: msg.data = 0
[INFO] [1656557243.843103800] [wall timer example]: send: msg.data = 1
[INFO] [1656557243.843272900] [wall timer example]: received: msg.data = 1
[INFO] [1656557244.843574600] [wall timer example]: send: msg.data = 2
[INFO] [1656557244.844021200] [wall timer example]: received: msg.data = 2
[INFO] [1656557245.844349800] [wall timer example]: send: msg.data = 3
[INFO] [1656557245.844702900] [wall timer example]: received: msg.data = 3
One-shot Timer
A wall-timer is invoked periodically,
but one-shot timer is invoked only once.
A one-shot can be set by the Selector::add_timer()
method as follows.
use safe_drive::{context::Context, error::DynError, logger::Logger, pr_info}; use std::{cell::RefCell, collections::VecDeque, rc::Rc, time::Duration}; pub fn main() -> Result<(), DynError> { // Create a context, a publisher, and a logger. let ctx = Context::new()?; let mut selector = ctx.create_selector()?; let logger = Rc::new(Logger::new("one-shot timer example")); let queue = Rc::new(RefCell::new(VecDeque::new())); // Add a one-shot timer. let queue1 = queue.clone(); selector.add_timer( Duration::from_secs(2), Box::new(move || { pr_info!(logger, "fired!"); // Insert a timer to the queue. let mut q = queue1.borrow_mut(); let logger1 = logger.clone(); q.push_back(( Duration::from_secs(2), (Box::new(move || pr_info!(logger1, "fired! again!"))), )); }), ); // Spin. loop { { // Set timers. let mut q = queue.borrow_mut(); while let Some((dur, f)) = q.pop_front() { selector.add_timer(dur, f); } } selector.wait()?; } }
Selector::add_timer()
does not take the name,
but other arguments are same as Selector::add_wall_timer()
.
#![allow(unused)] fn main() { selector.add_timer( Duration::from_secs(2), Box::new(move || ...), ); }
Duration::from_secs(2)
is a duration indicating when the timer will be invoked.Box::new(move || ...)
is the callback function.
This code reenables a timer in the callback function.
To reenable, the callback takes a queue
and
timers in the queue
is reenabled in the spin as follows.
#![allow(unused)] fn main() { // Spin. loop { { // Set timers. let mut q = queue.borrow_mut(); while let Some((dur, f)) = q.pop_front() { selector.add_timer(dur, f); } } selector.wait()?; } }
The important thing is that the borrowed resources must be released.
To release definitely, the code fraction borrowing the queue
is surrounded by braces.
The following is a execution result of this code.
$ cargo run
[INFO] [1657070943.324438900] [one-shot timer example]: fired!
[INFO] [1657070945.324675600] [one-shot timer example]: fired! again!
Multi-threaded Publish and Subscribe
Previous chapters use a selector to wait messages. A selector can be used by only a single thread. This means previous implementation is single-threaded.
To execute tasks concurrently, we can use the async/await feature of Rust.
By using this, we can implement multi-threaded applications.
In this chapter, we will describe how to use safe_drive
with async/await.
We use async_std
as a asynchronous library,
but you can use Tokio
instead.
Creating Projects
Before implementing, we need to prepare projects as follows.
$ mkdir -p mt_pubsub/src
$ cd mt_pubsub/src
$ cargo new publishers
$ cargo new subscribers
The mt_pubsub
is the root directory of this project,
and we created publishers
and subscribers
of Rust's projects.
At the moment, the following directories is present.
Directories | What? |
---|---|
mt_pubsub/src/publishers | publishers in Rust |
mt_pubsub/src/subscribers | subscribers in Rust |
Common Configuration
Then, we have to create or edit the following files for basic configurations.
Files | What? |
---|---|
mt_pubsub/src/publishers/Cargo.toml | for Cargo |
mt_pubsub/src/publishers/package.xml | for ROS2 |
mt_pubsub/src/publishers/build.rs | for rustc |
mt_pubsub/src/subscribers/Cargo.toml | for Cargo |
mt_pubsub/src/subscribers/package.xml | for ROS2 |
mt_pubsub/src/subscribers/build.rs | for rustc |
mt_pubsub/src/Cargo.toml | for Cargo's workspace |
To enable rust-analyzer in the mt_pubsub
directory and reduce the compilation time,
prepare workspace's Cargo.toml
as follows.
mt_pubsub/src/Cargo.toml
[workspace]
members = ["publishers", "subscribers"]
package.xml
s are almost same as Publish and Subscribe.
If you cannot understand what these files do,
please go back to the previous chapter.
publishers/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>publishers</name>
<version>0.0.0</version>
<description>MT-Publishers</description>
<maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
<license>Apache License 2.0</license>
<depend>std_msgs</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cargo</build_type>
</export>
</package>
subscribers/package.xml
<name>subscribers</name>
<description>MT-Subscribers</description>
To use async_std
, we have to update Cargo.toml
as follows.
Cargo.toml
[dependencies]
async-std = { version = "1", features = ["attributes"] }
safe_drive = "0.4"
std_msgs = { path = "/tmp/safe_drive_tutorial/mt_pubsub/std_msgs" }
[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/mt_pubsub"
safe_drive_version = "0.3"
Publishers
The publishers
publishes messages to 2 topics periodically.
This create 2 tasks.
One is send a message every 500ms,
and another is send a message every 250ms.
The code of the publishers
is as follows.
mt_pubsub/src/publishers/src/main.rs
use safe_drive::{context::Context, error::DynError}; use std::time::Duration; #[async_std::main] async fn main() -> Result<(), DynError> { // Create a context and a node. let ctx = Context::new()?; let node = ctx.create_node("publishers", None, Default::default())?; // Create publishers. let publisher1 = node.create_publisher::<std_msgs::msg::String>("topic1", None)?; let publisher2 = node.create_publisher::<std_msgs::msg::String>("topic2", None)?; // Create a task which sends "Hello, World!". let task1 = async_std::task::spawn(async move { let mut msg = std_msgs::msg::String::new().unwrap(); msg.data.assign("Hello, World!"); for _ in 0..50 { publisher1.send(&msg).unwrap(); // Send a message. async_std::task::sleep(Duration::from_millis(500)).await; // Sleep 500ms. } }); // Create a task which sends "Hello, Universe!". let task2 = async_std::task::spawn(async move { let mut msg = std_msgs::msg::String::new().unwrap(); msg.data.assign("Hello, Universe!"); for _ in 0..100 { publisher2.send(&msg).unwrap(); // Send a message. async_std::task::sleep(Duration::from_millis(250)).await; // Sleep 250ms. } }); task1.await; task2.await; Ok(()) }
The async_std::task::spawn
creates a asynchronous task,
which is similar to a thread.
The following is a excerpt creating a task which sends "Hello, World!"
to "topic1" every 500ms.
#![allow(unused)] fn main() { // Create a task which sends "Hello, World!". let task1 = async_std::task::spawn(async move { let mut msg = std_msgs::msg::String::new().unwrap(); msg.data.assign("Hello, World!"); for _ in 0..50 { publisher1.send(&msg).unwrap(); // Send a message. async_std::task::sleep(Duration::from_millis(500)).await; // Sleep 500ms. } }); }
This excerpt sends a message by Publisher::send
and sleep by async_std::task::sleep
.
Note that this uses async_std::task::sleep
instead of std::thread::sleep
,
because the former is non-blocking but the the latter is blocking.
Calling blocking functions should be avoided in asynchronous tasks.
Subscribers
Then, let's implement the subscribers
.
The main function is almost same as previous one.
mt_pubsub/src/subscribers/src/main
use safe_drive::{ context::Context, error::DynError, logger::Logger, pr_info, topic::subscriber::Subscriber, }; #[async_std::main] async fn main() -> Result<(), DynError> { // Create a context and a node. let ctx = Context::new()?; let node = ctx.create_node("subscribers", None, Default::default())?; // Create subscribers. let subscriber1 = node.create_subscriber::<std_msgs::msg::String>("topic1", None)?; let subscriber2 = node.create_subscriber::<std_msgs::msg::String>("topic2", None)?; // Receive messages. let task1 = async_std::task::spawn(receiver(subscriber1)); let task2 = async_std::task::spawn(receiver(subscriber2)); task1.await?; task2.await?; Ok(()) } async fn receiver(mut subscriber: Subscriber<std_msgs::msg::String>) -> Result<(), DynError> { let logger = Logger::new(subscriber.get_topic_name()); loop { // Receive a message let msg = subscriber.recv().await?; pr_info!(logger, "{}", msg.data); } }
Subscriber::recv
is an asynchronous function to receive a message.
To receive asynchronously, use the .await
keyword.
If there is a message to be received, recv().await
returns the message immediately.
Otherwise, it yields the execution to another task and sleeps until arriving a message.
Modification for Timeout
By using a timeout mechanism provided by asynchronous libraries, we can implement receiving with timeout. Timeout can be implemented as follows.
#![allow(unused)] fn main() { use async_std::future::timeout; use safe_drive::pr_warn; use std::time::Duration; async fn receiver(mut subscriber: Subscriber<std_msgs::msg::String>) -> Result<(), DynError> { let logger = Logger::new(subscriber.get_topic_name()); loop { // Receive a message with 3s timeout. match timeout(Duration::from_secs(3), subscriber.recv()).await { Ok(result) => pr_info!(logger, "received: {}", result?.data), Err(_) => { // Timed out. Finish receiving. pr_warn!(logger, "timeout"); break; } } } Ok(()) } }
async_std::timeout
is a function to represent timeout.
timeout(Duration::from_secs(3), subscriber.recv()).await
calls subscriber.recv()
asynchronously,
but it will be timed out after 3s later.
Execution
First, execute publishers
in a terminal application window as follows.
$ cd mt_pubsub
$ colcon build --cargo-args --release
$ . ./install/setup.bash
$ ros2 run publishers publishers
Then, execute subscribers
in another terminal application window as follows.
This uses timeout for receiving.
$ cd mt_pubsub.
$ . ./install/setup.bash
$ ros2 run subscribers subscribers
[INFO] [1657076046.969809600] [topic2]: received: Hello, Universe!
[INFO] [1657076047.220104100] [topic2]: received: Hello, Universe!
[INFO] [1657076047.447426100] [topic1]: received: Hello, World!
[INFO] [1657076047.470348600] [topic2]: received: Hello, Universe!
[INFO] [1657076047.721980900] [topic2]: received: Hello, Universe!
[WARN] [1657076050.448393200] [topic1]: timeout
[WARN] [1657076050.722433800] [topic2]: timeout
Nicely done! We are sending and receiving messages asynchronously. In addition to that, the timeouts work correctly.
User Defined Data Structure
Until previous tutorial, we used pre-defined message types. In this tutorial, we will describe how to define user defined types.
Create Project Directory
Then create a project directory as follows.
$ mkdir -p msgtest/src
Throughout this tutorial, we will create 4 packages as follows.
packages | description |
---|---|
msgtest/src/my_interfaces | defining types of ROS2 |
msgtest/src/talker | a publisher |
msgtest/src/listener | a subscriber |
Workspace's Cargo.toml
The workspace's Cargo.toml
should be created as follows.
msgtest/src/Cargo.toml
[workspace]
members = ["talker", "listener"]
Then, create projects as follows.
$ cd msgtest/src
$ cargo new talker
$ cargo new listener
Define User Defined Type
To define message types, we have to create a ROS2's package, and create a '.msg' files. The package can be created in the ordinary way of ROS2 as follows.
$ cd msgtest/src
$ ros2 pkg create --build-type ament_cmake my_interfaces
$ cd my_interfaces
$ mkdir msg
$ cd msg
Primitive Type: my_interfaces/msg/MyMsg.msg
Then create a file, msgtest/src/my_interfaces/msg/MyMsg.msg
, as follows.
int32 integer_value
int32[] unbounded_integer_array
int32[5] five_integers_array
int32[<=5] up_to_five_integers_array
There are 4 values in this type.
integer_value
: a value of theint32
typeunbounded_integer_array
: an unbounded array of theint32
typefive_integers_array
: an array which size is 5 of theint32
typeup_to_five_integers_array
: an array whose size is up to 5 of theint32
type
Using User Defiend Type: my_interfaces/msg/MyMsgs.msg
We can use the MyMsg
previously defined in another message type, msgtest/src/my_interfaces/msg/MyMsgs.msg
, as follows.
MyMsg msg1
MyMsg msg2
String Type: my_interfaces/msg/MyMsgStr.msg
A size of an array can be specified as described above,
a length of a string can be also specified.
For example, string<=10
is a type of string,
but its length is up to 10.
We prepare msgtest/src/my_interfaces/msg/MyMsgStr.msg
as follows.
string message
string[2] static_array_str
string[] dynamic_array_str
string[<=3] bounded_array_str
string<=10 bounded_str
string<=10[2] static_array_bounded_str
string<=10[] dynamic_array_bounded_str
string<=10[<=3] bounded_array_bounded_str
Edit my_interfaces/CMakeLists.txt
To generate C or C++ files and libraries for used defined types,
we have to edit CMakeLists.txt
as follows.
msgtest/src/my_interfaces/CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/MyMsg.msg"
"msg/MyMsgs.msg"
"msg/MyMsgStr.msg"
)
We have to specify messages files in CMakeLists.txt
.
Edit my_interfaces/package.xml
We also have to edit package.xml
as follows.
msgtest/src/my_interfaces/package.xml
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
User Defined Type in Rust
Primitive types are translated into Rust's types as follows.
ROS2 | Rust |
---|---|
bool | bool |
byte | u8 |
char | i8 |
int8 | i8 |
uint8 | u8 |
int16 | i16 |
uint16 | u16 |
int32 | i32 |
uint32 | u32 |
int64 | i64 |
uint64 | u64 |
float32 | f32 |
float64 | f64 |
string | safe_drive::msg::RosString |
Generated Types
Array types are generated as follows.
ROS2 | Rust |
---|---|
int32[5] | [i32; 5] |
int32[] | safe_drive::msg::I32Seq<0> |
int32[<=5] | safe_drive::msg::I32Seq<5> |
0
of I32Seq<0>
indicates unbounded, and 5
of I32Seq<5>
indicates less than or equal to 5.
So, MyMsg
and MyMsgs
are generated as follows.
Talker
Let's implement a talker which publishes MyMsgs
periodically.
Edit talker/Cargo.toml
To use the generated types in Rust, we have to edit Cargo.toml
as follows.
The most important thing is to add my_interfaces
, which defines message types we use.
# msgtest/src/talker/Cargo.toml
[dependencies]
safe_drive = "0.4"
my_interfaces = { path = "/tmp/safe_drive_tutorial/msgtest/my_interfaces" }
[package.metadata.ros]
msg = ["my_interfaces"]
msg_dir = "/tmp/safe_drive_tutorial/msgtest"
safe_drive_version = "0.3"
Create talker/package.xml
Then create package.xml
as follows.
msgtest/src/talker/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>talker</name>
<version>0.0.0</version>
<description>Talker in Rust</description>
<maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
<license>Apache License 2.0</license>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<depend>my_interfaces</depend>
<export>
<build_type>ament_cargo</build_type>
</export>
</package>
Don't forget <depend>my_interfaces</depend>
.
Generated Files
When you run colcon
, it generate my_interfaces
in Rust.
$ cd msgtest
$ colcon build --cargo-args --release
Then, you can find Rust's files as follows.
/tmp/safe_drive_tutorial/msgtest/my_interfaces/src/msg/my_msg.rs
#![allow(unused)] fn main() { #[repr(C)] #[derive(Debug)] pub struct MyMsg { pub integer_value: i32, pub unbounded_integer_array: safe_drive::msg::I32Seq<0>, pub five_integers_array: [i32; 5], pub up_to_five_integers_array: safe_drive::msg::I32Seq<5>, } }
/tmp/safe_drive_tutorial/msgtest/my_interfaces/src/msg/my_msgs.rs
#![allow(unused)] fn main() { #[repr(C)] #[derive(Debug)] pub struct MyMsgs { pub msg1: crate::msg::my_msg::MyMsg, pub msg2: crate::msg::my_msg::MyMsg, } }
Edit talker/src/main.rs
If you want to know how to implement a subscriber or a publisher, please see a tutorial of Pub/Sub. This section describes how to handle a messages generated by ros2msg_to_rs
.
msgtest/src/talker/src/main.rs
use safe_drive::{ context::Context, error::DynError, logger::Logger, msg::{I32Seq, RosStringSeq}, pr_info, }; use std::time::Duration; fn main() -> Result<(), DynError> { // Create a context. let ctx = Context::new()?; // Create a node. let node = ctx.create_node("talker", None, Default::default())?; // Create a publisher. let publisher = node.create_publisher::<my_interfaces::msg::MyMsgs>("my_topic", None)?; // Create a logger. let logger = Logger::new("talker"); // Create a message let my_msg1 = create_message()?; let my_msg2 = create_message()?; let mut my_msgs = my_interfaces::msg::MyMsgs::new().ok_or("failed to create MyMsgs")?; my_msgs.msg1 = my_msg1; my_msgs.msg2 = my_msg2; loop { pr_info!(logger, "send: {:?}", my_msgs); // Print log. // Send a message. publisher.send(&my_msgs)?; std::thread::sleep(Duration::from_secs(1)); } } fn create_message() -> Result<my_interfaces::msg::MyMsg, DynError> { let mut my_msg = my_interfaces::msg::MyMsg::new().unwrap(); my_msg.integer_value = 10; // int32[5] five_integers_array my_msg.five_integers_array[0] = 11; my_msg.five_integers_array[1] = 13; my_msg.five_integers_array[2] = 49; my_msg.five_integers_array[3] = 55; my_msg.five_integers_array[4] = 19; // int32[] unbounded_integer_array let mut msgs = I32Seq::new(3).unwrap(); let ref_msgs = msgs.as_slice_mut(); ref_msgs[0] = 6; ref_msgs[1] = 7; ref_msgs[2] = 8; my_msg.unbounded_integer_array = msgs; // int32[<=5] up_to_five_integers_array let mut msgs = I32Seq::new(2).unwrap(); let ref_msgs = msgs.as_slice_mut(); ref_msgs[0] = 2; ref_msgs[1] = 3; my_msg.up_to_five_integers_array = msgs; Ok(my_msg) }
Primitive types and arrays can be handles in the ordinary way of Rust as follows.
#![allow(unused)] fn main() { my_msg.integer_value = 10; // int32[5] five_integers_array my_msg.five_integers_array[0] = 11; my_msg.five_integers_array[1] = 13; my_msg.five_integers_array[2] = 49; my_msg.five_integers_array[3] = 55; my_msg.five_integers_array[4] = 19; }
To access elements of unbounded or bounded arrays,
we can use as_slice_mut()
or as_slice()
methods as follows.
#![allow(unused)] fn main() { // unbounded or unbounded array let mut msgs = I32Seq::new(3).unwrap(); let ref_msgs = msgs.as_slice_mut(); }
as_slice_mut()
returns a mutable slice,
you can thus update the elements of the array via the slice.
Listener
Let's then implement a listener which receive messages published by the talker.
Edit listener/Cargo.toml
The listener also requires my_interfaces
, and edit Cargo.toml
as follows.
# msgtest/src/listener/Cargo.toml
[dependencies]
safe_drive = "0.4"
my_interfaces = { path = "/tmp/safe_drive_tutorial/msgtest/my_interfaces" }
[package.metadata.ros]
msg = ["my_interfaces"]
msg_dir = "/tmp/safe_drive_tutorial/msgtest"
safe_drive_version = "0.3"
Create listener/package.xml
Then create package.xml
as follows.
msgtest/src/listener/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>listener</name>
<version>0.0.0</version>
<description>Listener in Rust</description>
<maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
<license>Apache License 2.0</license>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<depend>my_interfaces</depend>
<export>
<build_type>ament_cargo</build_type>
</export>
</package>
Don't forget <depend>my_interfaces</depend>
.
Edit listener/src/main.rs
The listener can also be implemented straightforwardly as follows.
msgtest/src/listener/src/main.rs
use my_interfaces; use safe_drive::{context::Context, error::DynError, logger::Logger, pr_info}; fn main() -> Result<(), DynError> { // Create a context. let ctx = Context::new()?; // Create a node. let node = ctx.create_node("listener", None, Default::default())?; // Create a subscriber. let subscriber = node.create_subscriber::<my_interfaces::msg::MyMsgs>("my_topic", None)?; // Create a logger. let logger = Logger::new("listener"); // Create a selector. let mut selector = ctx.create_selector()?; pr_info!(logger, "listening"); // Add a callback function. selector.add_subscriber( subscriber, Box::new(move |msg| { let msg = &msg.msg1; pr_info!(logger, "message: {}", msg.integer_value); for msg in msg.five_integers_array.iter() { pr_info!(logger, "five_integers_array: {}", msg); } for msg in msg.unbounded_integer_array.iter() { pr_info!(logger, "unbounded_integer_array: {}", msg); } for msg in msg.up_to_five_integers_array.iter() { pr_info!(logger, "up_to_five_integers_array: {}", msg); } }), ); // Spin. loop { selector.wait()?; } }
Compilation and Execution
Now, we can compile and execute the talker and listener. Let's do it!
Compile
The compilation can be performed by using colcon
as follows.
$ cd msgtest
$ colcon build --cargo-args --release
$ . ./install/setup.bash
Execute Listener
The listener can be executed by using ros2
as follows.
After executing the talker, it receives messages as follows.
$ ros2 run listener listener
[INFO] [1658305910.013449534] [listener]: listening
[INFO] [1658305914.359791460] [listener]: message: 10
[INFO] [1658305914.359839382] [listener]: five_integers_array: 11
[INFO] [1658305914.359867532] [listener]: five_integers_array: 13
[INFO] [1658305914.359880763] [listener]: five_integers_array: 49
[INFO] [1658305914.359889731] [listener]: five_integers_array: 55
[INFO] [1658305914.359900913] [listener]: five_integers_array: 19
[INFO] [1658305914.359912534] [listener]: unbounded_integer_array: 6
[INFO] [1658305914.359924084] [listener]: unbounded_integer_array: 7
[INFO] [1658305914.359936971] [listener]: unbounded_integer_array: 8
[INFO] [1658305914.359946479] [listener]: up_to_five_integers_array: 2
[INFO] [1658305914.359959422] [listener]: up_to_five_integers_array: 3
Execute Talker
To execute the talker, open a new terminal window and execute it as follows.
$ cd msgtest
$ . ./install/setup.bash
$ ros2 run talker talker
[INFO] [1658305913.359250753] [talker]: send: MyMsgs { msg1: MyMsg { integer_value: 10, unbounded_integer_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653f7aa0, size: 3, capacity: 3 }), five_integers_array: [11, 13, 49, 55, 19], up_to_five_integers_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653efaa0, size: 2, capacity: 2 }) }, msg2: MyMsg { integer_value: 10, unbounded_integer_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653f7e30, size: 3, capacity: 3 }), five_integers_array: [11, 13, 49, 55, 19], up_to_five_integers_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653f7e50, size: 2, capacity: 2 }) }
Nicely done! Now, we can define new types and handle the types in Rust.
String Type
Arrays of string are bit different from arrays of primitive types.
string
is unbounded string, and string<=5
is bounded string whose length is up to 5.
So, there are arrays for unbounded and bounded strings as follows; msg
is safe_drive::msg
.
ROS | Rust |
---|---|
string | msg::RosString<0> |
string[] | msg::StringSeq<0, 0> |
string[<=5] | msg::StringSeq<0, 5> |
string[10] | [msg::RosString<0>; 10] |
string<=5 | msg::RosString<5> |
string<=5[<=10] | msg::StringSeq<5, 10> |
string<=5[10] | [msg::RosString<5>; 10] |
RosString<0>
is a type of unbounded string, and RosString<5>
is a type of bounded string whose length is less than or equal to 5.
5
of StringSeq<5, 10>
indicates the length of a string is less than or equal to 5,
and 10
of it indicates the length of the array is less than or equal to 10.
For example, the following ROS2 message type can be translated into the MyMsgStr
as follows.
string message
string[2] static_array_str
string[] dynamic_array_str
string[<=3] bounded_array_str
string<=10 bounded_str
string<=10[2] static_array_bounded_str
string<=10[] dynamic_array_bounded_str
string<=10[<=3] bounded_array_bounded_str
/tmp/safe_drive_tutorial/msgtest/my_interfaces/src/msg/my_msg_str.rs
#![allow(unused)] fn main() { #[repr(C)] #[derive(Debug)] pub struct MyMsgStr { pub message: safe_drive::msg::RosString<0>, pub static_array_str: [safe_drive::msg::RosString<0>; 2], pub dynamic_array_str: safe_drive::msg::RosStringSeq<0, 0>, pub bounded_array_str: safe_drive::msg::RosStringSeq<0, 3>, pub bounded_str: safe_drive::msg::RosString<10>, pub static_array_bounded_str: [safe_drive::msg::RosString<10>; 2], pub dynamic_array_bounded_str: safe_drive::msg::RosStringSeq<10, 0>, pub bounded_array_bounded_str: safe_drive::msg::RosStringSeq<10, 3>, } }
To access to elements of string arrays,
we can use as_slice_mut()
or as_slice_mut()
as follows.
#![allow(unused)] fn main() { fn _create_message_str() -> Result<my_interfaces::msg::MyMsgStr, DynError> { let mut my_msg = my_interfaces::msg::MyMsgStr::new().unwrap(); // string message my_msg.message.assign("Hello, World!"); // string[2] static_array_str my_msg.static_array_str[0].assign("static array 0"); my_msg.static_array_str[1].assign("static array 1"); // string[] dynamic_array_str let mut msgs = RosStringSeq::new(3).ok_or("failed to create string")?; let ref_msgs = msgs.as_slice_mut(); ref_msgs[0].assign("dynamic array 0"); ref_msgs[1].assign("dynamic array 1"); ref_msgs[2].assign("dynamic array 2"); my_msg.dynamic_array_str = msgs; // string[<=3] bounded_array_str let mut msgs = RosStringSeq::new(2).ok_or("failed to create string")?; let ref_msgs = msgs.as_slice_mut(); ref_msgs[0].assign("bounded array 0"); ref_msgs[1].assign("bounded array 1"); my_msg.bounded_array_str = msgs; // string<=10 bounded_str my_msg.bounded_str.assign("Hello!"); // string<=10[2] static_array_bounded_str my_msg.static_array_bounded_str[0].assign("msg1"); my_msg.static_array_bounded_str[1].assign("msg2"); // string<=10[] dynamic_array_bounded_str let mut msgs = RosStringSeq::new(3).ok_or("failed to create string")?; let ref_msgs = msgs.as_slice_mut(); ref_msgs[0].assign("msg3"); ref_msgs[1].assign("msg4"); ref_msgs[2].assign("msg5"); my_msg.dynamic_array_bounded_str = msgs; // string<=10[<=3] bounded_array_bounded_str let mut msgs = RosStringSeq::new(2).ok_or("failed to create string")?; let ref_msgs = msgs.as_slice_mut(); ref_msgs[0].assign("msg3"); ref_msgs[1].assign("msg5"); my_msg.bounded_array_bounded_str = msgs; Ok(my_msg) } }
Service
A service is a communication consisting requests and responses. There are a server and clients for a service as follows.
graph TD; Server--response-->Client1; Server--response-->Client2; Server--response-->Client3; Client1--request-->Server; Client2--request-->Server; Client3--request-->Server;
A client sends a request and the server replies the request.
In this tutorial, we will describe how to implement a client and a server in Rust by using safe_drive
.
Create Base Files
In this tutorial, we prepare 3 Rust's projects and 1 C's project. The following table shows the directories we use.
Directories | Description |
---|---|
srvtest/src/server | server in Rust |
srvtest/src/client | client in Rust |
srvtest/src/srvmsg | message type |
srvmsg
is a ROS2's project to define a user defined type for a service we implement.
$ mkdir -p srvtest/src
$ cd srvtest/src
$ cargo new server
$ cargo new client
$ ros2 pkg create --build-type ament_cmake srvmsg
In addition to that, create the workspace's Cargo.toml
as follows.
srvtest/src/Cargo.toml
[workspace]
members = [ "client", "server"]
Define Protocol
First of all, let's define a protocol for a service.
ROS2 provides a special format to define a protocol,
and it should be described in a .srv
file.
Usually, .srv
files are placed in a srv
directory.
So, we create srv
directory as follows.
$ cd srvtest/src/srvmsg
$ mkdir srv
Create srvmsg/srv/AddTwoInts.srv
Then, create AddTwoInts.srv
file in which a protocol is specified as follows.
uint32 x
uint32 y
---
uint32 result
uint32 x
and uint32 y
above --
are types which must be
included in a request, and uint32 result
is a type which must be
included in a response.
We will implement a server which takes 2 integer values
and returns the summation of the values.
Edit srvmsg/CMakeLists.txt
To generate shared libraries from AddTwoInts.srv
,
CMakeLists.txt
must be updated as follows.
# srvtest/src/srvmsg/CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/AddTwoInts.srv"
)
Edit srvmsg/package.xml
In addition to that, the following lines must be added to package.xml
.
srvtest/src/srvmsg/package.xml
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
Server
A server can be implemented in a straightforward way.
Edit server/Cargo.toml
To generate Rust types from .srv
files,
we have to edit Cargo.toml
as follows.
[dependencies]
safe_drive = "0.4"
srvmsg = { path = "/tmp/safe_drive_tutorial/srvtest/srvmsg" }
tokio = { version = "1", features = ["full"] }
[package.metadata.ros]
msg = ["srvmsg"]
msg_dir = "/tmp/safe_drive_tutorial/srvtest"
safe_drive_version = "0.3"
Create server/package.xml
package.xml
is also required as follows.
srvtest/src/server/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>server</name>
<version>0.0.0</version>
<description>Server in Rust</description>
<maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
<license>Apache License 2.0</license>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<depend>srvmsg</depend>
<export>
<build_type>ament_cargo</build_type>
</export>
</package>
Don't forget <depend>srvmsg</depend>
.
Generate Types
AddTwoInts.srv
will be translated to struct AddTwoInts
as follows.
#![allow(unused)] fn main() { #[derive(Debug)] pub struct AddTwoInts; impl ServiceMsg for AddTwoInts { type Request = AddTwoInts_Request; type Response = AddTwoInts_Response; fn type_support() -> *const rcl::rosidl_service_type_support_t { unsafe { rosidl_typesupport_c__get_service_type_support_handle__srvmsg__srv__AddTwoInts() } } } }
struct AddTwoInts
implements ServiceMsg trait
.
ServiceMsg::Request
and ServiceMsg::Response
are types of request and response, respectively.
AddTwoInts_Request
and AddTowInts_Response
are as follows.
#![allow(unused)] fn main() { #[repr(C)] #[derive(Debug)] pub struct AddTwoInts_Request { pub x: u32, pub y: u32, } #[repr(C)] #[derive(Debug)] pub struct AddTwoInts_Response { pub result: u32, } }
Edit server/src/main.rs
You have to create a server by create_server()
method and register a callback function to a selector as follows.
use safe_drive::{context::Context, error::DynError, logger::Logger, pr_error, qos::Profile}; use srvmsg::srv::{AddTwoInts, AddTwoInts_Response}; fn main() -> Result<(), DynError> { // Create a context. let ctx = Context::new()?; // Create a node. let node = ctx.create_node("server_node", None, Default::default())?; // Create a server. let server = node.create_server::<AddTwoInts>("my_service", Some(Profile::default()))?; // Create a selector. let mut selector = ctx.create_selector()?; // Create a logger. let logger = Logger::new("server"); selector.add_server( server, Box::new(move |msg, _header| { let mut response = AddTwoInts_Response::new().unwrap(); pr_error!(logger, "recv: {:?}", msg); response.result = msg.x + msg.y; response }), ); loop { selector.wait()?; // Spin. } }
Let's dig into detail.
#![allow(unused)] fn main() { // Create a server. let server = node.create_server::<AddTwoInts>("my_service", Some(Profile::default()))?; }
AddTwoInts
of create_server::<AddTwoInts>
indicates the protocol of the server.
The arguments of "my_service"
and Some(Profile::default())
are the service name and QoS.
If you pass None
instead of Some(Profile::default())
, Profile::default()
is used.
The callback was passed as follows.
#![allow(unused)] fn main() { selector.add_server( server, Box::new(move |msg, _header| { let mut response = AddTwoInts_Response::new().unwrap(); pr_error!(logger, "recv: {:?}", msg); response.result = msg.x + msg.y; response }), ); }
The callback function must take a message sent by a client and a header including sequence number, time, etc.
msg
's types is AddTwoInts_Request
and a value of AddTwoInts_Response
, which is a response, must be returned.
Client
Edit client/Cargo.toml
safe_drive
, and tokio
must be added to Cargo.toml
as follows.
srvtest/src/client/Cargo.toml
[dependencies]
safe_drive = "0.4"
srvmsg = { path = "/tmp/safe_drive_tutorial/srvtest/srvmsg" }
tokio = { version = "1", features = ["full"] }
[package.metadata.ros]
msg = ["srvmsg"]
msg_dir = "/tmp/safe_drive_tutorial/srvtest"
safe_drive_version = "0.3"
Create client/package.xml
package.xml
is also required as follows.
srvtest/src/client/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>client</name>
<version>0.0.0</version>
<description>Client in Rust</description>
<maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
<license>Apache License 2.0</license>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<depend>srvmsg</depend>
<export>
<build_type>ament_cargo</build_type>
</export>
</package>
Don't forget <depend>srvmsg</depend>
.
Edit client/src/main.rs
We recommend to use async/await to implement a client, because a client wait a response, but when the response is replied is unpredictable. In this tutorial, we use Tokio, which is the most popular asynchronous library of Rust.
use safe_drive::{ context::Context, error::DynError, logger::Logger, pr_error, pr_info, pr_warn, qos::Profile, }; use srvmsg::srv::{AddTwoInts, AddTwoInts_Request}; use std::time::Duration; use tokio::time::timeout; #[tokio::main] async fn main() -> Result<(), DynError> { // Create a context and a node. let ctx = Context::new()?; let node = ctx.create_node("client", None, Default::default())?; let logger = Logger::new("client"); // Create a client. let mut client = node.create_client::<AddTwoInts>("my_service", Some(Profile::default()))?; let mut n = 0; loop { let mut request = AddTwoInts_Request::new().unwrap(); request.x = n; request.y = n + 1; n += 1; // Send a request. let client_rcv = client.send(&request)?; // Receive a request. let mut receiver = client_rcv.recv(); match timeout(Duration::from_secs(1), &mut receiver).await { Ok(Ok((c, response, _header))) => { pr_info!(logger, "received {:?}", response); client = c; } Ok(Err(e)) => { pr_error!(logger, "error: {e}"); return Err(e); } Err(elapsed) => { pr_warn!(logger, "timeout: {elapsed}"); client = receiver.give_up(); } } tokio::time::sleep(Duration::from_secs(1)).await; } }
Because a response will be never returned,
we use tokio::time::timeout
to receive a response.
timeout(Duration::from_secs(1), &mut receiver).await
takes a duration and a awaitable future.
The awaitable future can be taken by client_rcv.recv()
.
Note that it uses tokio::time::sleep
instead of
std::thread::sleep
because to avoid calling blocking functions.
client.send()
consumes client
and it returns client_recv
to receive a response.
client_rcv.recv()
consumes client_recv
and client_rcv.recv().await
returns new client, a response, and a header.
The new client must be used to send next request.
This means that you cannot ignore receiving a response. If you forget to receive a response, you cannot send any request again, because you need to get a new client by receiving a response.. Otherwise, you encountered compilation errors.
Execution
First of all, you have to compile it and execute a server as follows.
$ cd srvtest
$ colcon build --cargo-args --release
$ . ./install/setup.bash
$ ros2 run server server
Then, in another terminal, execute a client as follows.
$ cd srvtest
$ . ./install/setup.bash
$ ros2 run client client
[WARN] [1659604527.720730018] [client]: timeout: deadline has elapsed
[INFO] [1659604528.722220697] [client]: received AddTwoInts_Response { result: 3 }
[INFO] [1659604529.723525686] [client]: received AddTwoInts_Response { result: 5 }
[INFO] [1659604530.724820326] [client]: received AddTwoInts_Response { result: 7 }
Nicely done! We used async/await for the client, and it can be used for the server as follows.
#![allow(unused)] fn main() { async { server.recv().await }; }
Parameter
This chapter introduces how to use parameters. A node can have parameters as follows.
graph TD; NodeA --> ParamA1:i64; NodeA --> ParamA2:bool; NodeA --> ParamA3:String; NodeB --> ParamB1:f64; NodeB --> ParamB2:f64;
In this figure, NodeA has 3 parameters whose types are
i64
, bool
, and String
, respectively,
and NodeB has 2 parameters whose types are f64
.
Parameters can be read/write from external nodes.
To receive a notification of updating a parameter, we can use a callback function or async/await. In this chapter, we will explain how to handle it.
Packages
We will prepare 2 packages as follows.
$ mkdir params
$ cd params
$ cargo new param_server
$ cargo new async_param_server
param_server
explains a callback based parameter handling, and async_param_server
explains an async/await based.
To manage 2 packages, let's prepare Cargo.toml
file for a workspace as follows.
params/Cargo.toml
[workspace]
members = ["param_server", "async_param_server"]
The following table shows files we use in this chapter.
File | What? |
---|---|
params/param_server/ | callback based parameter server |
params/async_param_server/ | async/await based parameter server |
params/Cargo.toml | workspace configuration |
Type of Parameter
Before explaining handlers, let's see types of parameters.
The type of parameter value is defined by safe_drive::parameter::Value
as follows.
#![allow(unused)] fn main() { pub enum Value { NotSet, Bool(bool), I64(i64), F64(f64), String(String), VecBool(Vec<bool>), VecI64(Vec<i64>), VecU8(Vec<u8>), VecF64(Vec<f64>), VecString(Vec<String>), } }
This means that i64
is valid,
but i8
, i32
, and other user defined types are invalid.
A parameter is associated with a descriptor of safe_drive::parameter::Descriptor
as follows.
#![allow(unused)] fn main() { pub struct Descriptor { pub description: String, pub additional_constraints: String, pub read_only: bool, pub dynamic_typing: bool, pub floating_point_range: Option<FloatingPointRange>, pub integer_range: Option<IntegerRange>, } }
So, a parameter and parameters can be represented by safe_drive::parameter::{Parameter, Parameters}
as follows.
#![allow(unused)] fn main() { pub struct Parameter { pub descriptor: Descriptor, pub value: Value, } pub struct Parameters { /* omitted private fields */ } }
and a parameter server can be represented by
safe_drive::parameter::ParameterServer
as follows.
#![allow(unused)] fn main() { pub struct ParameterServer { pub params: Arc<RwLock<Parameters>> // omitted private fields } }
In summary, a parameter server can be represented as follows.
graph TD; ParameterServer --> Parameters; Parameters --> Parameter; Parameter --> Descriptor; Parameter --> Value;
Parameter Setting and Waiting Update by Callback
We will explain how to set parameters, and how to wait updating by using a callback function.
Edit param_server/Cargo.toml
Add safe_drive
to the dependencies section of Cargo.toml
as follows.
[dependencies]
safe_drive = "0.4"
Edit param_server/src/main.rs
param_server
can be implemented as follows.
This sets 2 parameters up and waits updating.
use safe_drive::{ context::Context, error::DynError, logger::Logger, parameter::{Descriptor, Parameter, Value}, pr_info, }; fn main() -> Result<(), DynError> { // Create a context and a node. let ctx = Context::new()?; let node = ctx.create_node("param_server", None, Default::default())?; // Create a parameter server. let param_server = node.create_parameter_server()?; { // Set parameters. let mut params = param_server.params.write(); // Write lock // Statically typed parameter. params.set_parameter( "my_flag".to_string(), // parameter name Value::Bool(false), // value false, // read only? Some("my flag's description".to_string()), // description )?; // Dynamically typed parameter. params.set_dynamically_typed_parameter( "my_dynamic_type_flag".to_string(), // parameter name Value::Bool(false), // value false, // read only? Some("my dynamic type flag's description".to_string()), // description )?; // Add Directly from Parameter struct let parameter_to_set = Parameter { descriptor: Descriptor { description: "my parameter description".to_string(), // parameter description additional_constraints: "my parameter addutional_constraints".to_string(), // parameter additional constraints read_only: false, // read only ? dynamic_typing: false, // static or Dynamic floating_point_range: None, // floating point range integer_range: None, // integer point range }, value: Value::Bool(false), // value }; params.add_parameter( ("my parameter").to_string(), // name parameter_to_set, // parameter )?; } // Create a logger and a selector. let logger = Logger::new("param_server"); let mut selector = ctx.create_selector()?; // Add a callback function to the parameter server. selector.add_parameter_server( param_server, Box::new(move |params, updated| { // Print updated parameters. let mut keys = String::new(); for key in updated.iter() { let value = ¶ms.get_parameter(key).unwrap().value; keys = format!("{keys}{key} = {}, ", value); } pr_info!(logger, "updated parameters: {keys}"); }), ); // Spin. loop { selector.wait()?; } }
param_server
in Detail
First of all, we have to create a parameter server by create_parameter_server()
method as follows.
#![allow(unused)] fn main() { // Create a parameter server. let param_server = node.create_parameter_server()?; }
Then set parameters as follows.
To update parameters, we have to acquire a write lock
for mutual exclusion by write()
method.
#![allow(unused)] fn main() { { // Set parameters. let mut params = param_server.params.write(); // Write lock // Statically typed parameter. params.set_parameter( "my_flag".to_string(), // parameter name Value::Bool(false), // value false, // read only? Some("my flag's description".to_string()), // description )?; // Dynamically typed parameter. params.set_dynamically_typed_parameter( "my_dynamic_type_flag".to_string(), // parameter name Value::Bool(false), // value false, // read only? Some("my dynamic type flag's description".to_string()), // description )?; // Add Directly from Parameter struct let parameter_to_set = Parameter { descriptor: Descriptor { description: "my parameter description".to_string(), // parameter description additional_constraints: "my parameter addutional_constraints".to_string(), // parameter additional constraints read_only: false, // read only ? dynamic_typing: false, // static or Dynamic floating_point_range: None, // floating point range integer_range: None, // integer point range }, value: Value::Bool(false), // value }; params.add_parameter( ("my parameter").to_string(), // name parameter_to_set, // parameter )?; } }
To set a statically typed parameters, use set_parameter()
.
A statically typed parameter cannot be updated
by a value whose type is different from original type.
To set a dynamically typed parameters, use set_dynamically_typed_parameter()
.
A dynamically typed parameter can be updated by an arbitrary type.
To set directly from the Parameter struct, use add_parameter()
.
A Parameter struct can contain additional_constraints.
Finally, register a callback function to wait updating parameters as follows.
#![allow(unused)] fn main() { // Add a callback function to the parameter server. selector.add_parameter_server( param_server, Box::new(move |params, updated| { // Print updated parameters. let mut keys = String::new(); for key in updated.iter() { let value = ¶ms.get_parameter(key).unwrap().value; keys = format!("{keys}{key} = {}, ", value); } pr_info!(logger, "updated parameters: {keys}"); }), ); // Spin. loop { selector.wait()?; } }
1st argument of the closure, params
, is a value of parameters,
and 2nd argument, updated
is a value containing updated parameters.
Execute param_server
Then, let's execute param_server
and get/set a parameter.
First, execute param_server
in a terminal application window as follows.
$ cargo run --bin param_server
Finished dev [unoptimized + debuginfo] target(s) in 0.01s
Running `target/debug/param_server`
[INFO] [1669873997.622330908] [param_server]: updated parameters: my_flag = true,
Then, get and set the parameter by using ros2
command in another terminal application window as follows.
$ ros2 param get param_server my_flag
Boolean value is: False
$ ros2 param set param_server my_flag True
Set parameter successful
$ ros2 param get param_server my_flag
Boolean value is: True
$ ros2 param set param_server my_flag 10
Setting parameter failed: failed type checking: dst = Bool, src = I64
We can get and set boolean values,
but integer value cannot be set because my_flag
is boolean type
and statically typed.
Asynchronous Wait
Then, we explain async/await based parameter server.
Edit async_param_server/Cargo.toml
Add safe_drive
and tokio
to the dependencies section of Cargo.toml
as follows.
[dependencies]
safe_drive = "0.4"
tokio = { version = "1", features = ["full"] }
Edit async_param_server/src/main.rs
async_param_server
can be implemented as follows.
This sets 2 parameters up and waits updating.
use safe_drive::{ context::Context, error::DynError, logger::Logger, parameter::{Descriptor, Parameter, Value}, pr_info, }; #[tokio::main] async fn main() -> Result<(), DynError> { // Create a context and a node. let ctx = Context::new()?; let node = ctx.create_node("async_param_server", None, Default::default())?; // Create a parameter server. let mut param_server = node.create_parameter_server()?; { // Set parameters. let mut params = param_server.params.write(); // Write lock // Statically typed parameter. params.set_parameter( "my_flag".to_string(), // parameter name Value::Bool(false), // value false, // read only? Some("my flag's description".to_string()), // description )?; // Dynamically typed parameter. params.set_dynamically_typed_parameter( "my_dynamic_type_flag".to_string(), // parameter name Value::Bool(false), // value false, // read only? Some("my dynamic type flag's description".to_string()), // description )?; // Add Directly from Parameter struct let parameter_to_set = Parameter { descriptor: Descriptor { description: "my parameter description".to_string(), // parameter description additional_constraints: "my parameter addutional_constraints".to_string(), // parameter additional constraints read_only: false, // read only ? dynamic_typing: false, // static or Dynamic floating_point_range: None, // floating point range integer_range: None, // integer point range }, value: Value::Bool(false), // value }; params.add_parameter( ("my parameter").to_string(), // name parameter_to_set, // parameter )?; } // Create a logger. let logger = Logger::new("async_param_server"); loop { // Wait update asynchronously. let updated = param_server.wait().await?; let params = param_server.params.read(); // Read lock // Print updated parameters. let mut keys = String::new(); for key in updated.iter() { let value = ¶ms.get_parameter(key).unwrap().value; keys = format!("{keys}{key} = {}, ", value); } pr_info!(logger, "updated parameters: {keys}"); } }
Important things are only following 2 lines.
#![allow(unused)] fn main() { // Wait update asynchronously. let updated = param_server.wait().await?; let params = param_server.params.read(); // Read lock }
To wait updating, use wait().await
and acquire a read lock by read()
method
to read parameters.
wait().await
returns updated parameters.
Zero Copy Publish and Subscribe
In this chapter, we will introduce how to use zero copy communications by CycloneDDS. To use CycloneDDS, please install it as Eclipse Cyclone DDS.
Create Project
First of all, create a project as follows.
$ cargo new zerocpy
The files we use are as follows. cyclonedds.xml
will be created and used later.
Files | What? |
---|---|
zerocopy/Cargo.toml | Cargo.toml |
zerocopy/src/main.rs | main.rs |
zerocopy/cyclonedds.xml | configuration of CycloneDDS |
Add safe_drive
to dependencies section of Cargo.toml
as follows.
Cargo.toml
[dependencies]
safe_drive = "0.4"
main.rs
main.rs
can be implemented as follows,
but almost every lines are same as shown before.
main.rs
use safe_drive::{context::Context, msg::common_interfaces::std_msgs}; use std::{error::Error, time::Duration}; const TOPIC_NAME: &str = "pubsub_loaned"; fn main() -> Result<(), Box<dyn Error + Sync + Send + 'static>> { // create a context let ctx = Context::new()?; // create a subscribe node let node_sub = ctx.create_node("loaned_sub_node", None, Default::default())?; // create a publish node let node_pub = ctx.create_node("loaned_pub_node", None, Default::default())?; std::thread::sleep(Duration::from_millis(500)); // create a publisher and a subscriber let subscriber = node_sub.create_subscriber::<std_msgs::msg::Bool>(TOPIC_NAME, None)?; let publisher = node_pub.create_publisher::<std_msgs::msg::Bool>(TOPIC_NAME, None)?; let mut loaned = publisher.borrow_loaned_message()?; *loaned = std_msgs::msg::Bool::new().ok_or("failed to new Bool")?; loaned.data = false; publisher.send_loaned(loaned)?; // send message std::thread::sleep(Duration::from_millis(500)); // wait messages let mut selector = ctx.create_selector()?; selector.add_subscriber( subscriber, Box::new(move |msg| { assert!(!msg.data); }), ); selector.wait()?; Ok(()) }
The important thing is using borrow_loaned_message()
and send_loaned()
methods as follows.
#![allow(unused)] fn main() { let mut loaned = publisher.borrow_loaned_message()?; *loaned = std_msgs::msg::Bool::new().ok_or("failed to new Bool")?; loaned.data = false; publisher.send_loaned(loaned)?; // send message }
borrow_loaned_message()
borrows a memory region from CycloneDDS and
send_loaned()
sends a message in the borrowed region.
safe_drive
automatically check whether zero copy is available or not,
and it uses conventional copied APIs if zero copy is not available.
Setting-up Zero Copy
To enable zero copy, please prepare cyclonedds.xml, which is a configuration file of CycloneDDS, as follows. You can use arbitrary name for it.
cyclonedds.xml
<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/iceoryx/etc/cyclonedds.xsd">
<Domain id="any">
<SharedMemory>
<Enable>true</Enable>
<LogLevel>info</LogLevel>
</SharedMemory>
</Domain>
</CycloneDDS>
<SharedMemory>
tag indicates the configuration of zero copy.
To use zero copy enabled CycloneDDS, please set environment arguments as follows.
$ export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
$ export CYCLONEDDS_URI=file://path/to/cyclonedds.xml
In addition to that, CycloneDDS requires iox-roudi
daemon to use zero copy.
It can be launched as follows.
$ iox-roudi
Execute
After that, zero copy communications can be performed!
$ cargo run
Custom Memory Allocator
Under construction.
$ mkdir custom_alloc
$ cargo new forwarder
$ cargo new requester
$ cat Cargo.toml
[workspace]
members = ["requester", "forwarder"]
Request to Server in Callback
In this chapter, we will introduce how to access a server in a callback function. The code introduced here can be implemented by using async/await more beautifully.
Subscriber and Client
The concept here is very simple.
We use 2 selectors for a callback function and accessing a server,
and recv_timeout()
to receive a response from the server.
We only show Rust code of a publisher as follows. Other files can be found in Source code.
use safe_drive::{ context::Context, error::DynError, logger::Logger, msg::common_interfaces::{std_msgs, std_srvs}, pr_fatal, pr_info, selector::Selector, service::client::Client, topic::subscriber::Subscriber, RecvResult, }; use std::time::Duration; fn main() -> Result<(), DynError> { // Create a context and a node. let ctx = Context::new()?; let node = ctx.create_node("listen_client", None, Default::default())?; // Create 2 selectors. let selector = ctx.create_selector()?; let selector_client = ctx.create_selector()?; // Create a subscriber, a client, and a logger. let subscriber = node.create_subscriber::<std_msgs::msg::Empty>("pubsubsrv_topic", None)?; let client = node.create_client::<std_srvs::srv::Empty>("pubsubsrv_service", None)?; worker(selector, selector_client, subscriber, client)?; Ok(()) } fn worker( mut selector: Selector, mut selector_client: Selector, subscriber: Subscriber<std_msgs::msg::Empty>, client: Client<std_srvs::srv::Empty>, ) -> Result<(), DynError> { let mut client = Some(client); let logger = Logger::new("listen_client"); selector.add_subscriber( subscriber, Box::new(move |_msg| { pr_info!(logger, "receive a message"); // Take the client. let c = client.take().unwrap(); let request = std_srvs::srv::EmptyRequest::new().unwrap(); // Send a request. let receiver = c.send(&request).unwrap(); // Receive a response. match receiver.recv_timeout(Duration::from_millis(20), &mut selector_client) { RecvResult::Ok((c, _response, _header)) => { pr_info!(logger, "receive a response"); client = Some(c) } RecvResult::RetryLater(r) => client = Some(r.give_up()), RecvResult::Err(e) => { pr_fatal!(logger, "{e}"); panic!() } } }), ); loop { selector.wait()?; } }
in Detail
The important pieces are as follows.
This code create 2 selectors first.
selector_client
is used for a service.
#![allow(unused)] fn main() { // Create 2 selectors. let selector = ctx.create_selector()?; let selector_client = ctx.create_selector()?; }
In the callback function, recv_timeout()
method, which takes
selector_client
, is used as follows.
#![allow(unused)] fn main() { // Send a request. let receiver = c.send(&request).unwrap(); // Receive a response. match receiver.recv_timeout(Duration::from_millis(20), &mut selector_client) { RecvResult::Ok((c, _response, _header)) => { pr_info!(logger, "receive a response"); client = Some(c) } RecvResult::RetryLater(r) => client = Some(r.give_up()), RecvResult::Err(e) => { pr_fatal!(logger, "{e}"); panic!() } } }
In this callback function,
it sends a request to the server and receives a response
by recv_timeout()
method.
State Change
A service's state can be represented as follows.
graph LR; Start-->Send Send-->Receive Receive-->Send
To represent the state, safe_drive
uses the type system of Rust.
First, a client is created as follows.
At that time, it must be Send
state.
#![allow(unused)] fn main() { let mut client = Some(client); // Send state. }
send()
method consumes the client and returns
receiver
.
It means the state is changed from Send to Receive.
#![allow(unused)] fn main() { let c = client.take().unwrap(); let receiver = c.send(&request).unwrap(); // Change state from Send to Receive. }
recv_timeout()
also consumes receiver
and returns a new client
to send a new request if it successfully receives a response.
#![allow(unused)] fn main() { // Receive a response. match receiver.recv_timeout(Duration::from_millis(20), &mut selector_client) { RecvResult::Ok((c, _response, _header)) => { pr_info!(logger, "receive a response"); client = Some(c) } RecvResult::RetryLater(r) => client = Some(r.give_up()), RecvResult::Err(e) => { pr_fatal!(logger, "{e}"); panic!() } } }
When it has timed out, you can choose retrying or giving up.
This code chooses giving up by give_up()
method.
give_up()
methods consumes a receiver and return a new
client to send a new request.
This code will work, but we think using async/await is better than this because it is designed for asynchronous programming. I recommend to rewrite this code by using async/await.
Contribution Guide
This chapter explains safe_drive
's internal briefly to contribute to it.
Setting-up for Contribution
Basic Setting-up
See Setting-up for basic setting-up.
Setting-up Development Environment
To build safe_drive
,
you need install bindgen-cli
and ros2msg_to_rs
as follows.
$ cargo install bindgen-cli
$ cargo install --git https://github.com/tier4/ros2msg_to_rs.git
bindgen_cli
is a transpiler from C to Rust,
and ros2msg_to_rs
is also a transpiler from .msg and .srv to Rust.
If you want to contribute to documents you are reading now,
please install mdbook
and mdbook-mermaind
as follows.
$ cargo install mdbook
$ cargo install mdbook-mermaid
Finally, download safe_drive
as follows.
$ git clone https://github.com/tier4/safe_drive
$ cd safe_drive
Following chapters introduce how to hack safe_drive
in this directory.
Use Docker
We provide Docker files in docker.
You can use this to hack safe_drive
as follows, alternatively.
$ git clone https://github.com/tier4/safe_drive.git
$ cd safe_drive/docker
$ docker compose build
$ docker compose up -d
$ docker exec -it docker-safe_drive-1 /bin/zsh
Build and Test
Build and test can be done by using make
as follows.
Build:
$ make
Test:
$ make test
make test
executes test codes in tests,
and codes in documentation comment.
cargo test
may not work because it requires some environment variables
and some code generation.
Please see Makefile.
Cleaning-up can be done as follows.
$ make clean
Editor Setting-up
safe_drive
has features like iron
, humble
or galactic
, which indicates ROS2 version.
We recommend pass one of a feature to enable editor's support.
If you use ROS2 humble and Visual Studio Code,
please add iron
in rust-analyzer -> cargo -> features
as follows.
{
"rust-analyzer.cargo.features": [
"iron"
]
}
Client applications not necessary to specify a feature of ROS2 version,
because build.rs detects
ROS2 version from ROS_DISTRO
environment variable.
Generating Messages Included by safe_drive
For client applications, safe_drive
provides a way to transpile from ROS2's .idl files to Rust crates by using safe_drive_msg.
safe_drive
itself includes ROS2's some basic messages to handle parameter
servers.
The basic messages can be generated by ros2msg_to_rs, and located in src/msg.
safe_drive
contains common_interfaces,
unique_identifier_msgs,
and rcl_interfaces.
These messages can be transpiled as follows.
{safe_drive}
of the following means a directory path to safe_drive
.
$ git clone https://github.com/ros2/common_interfaces.git -b humble
$ ros2msg_to_rs --disable-common-interfaces -s crate -i common_interfaces -o {safe_drive}/src/msg/humble/common_interfaces
$ mkdir ros2msg && cd ros2msg
$ git clone https://github.com/ros2/unique_identifier_msgs.git -b humble
$ ros2msg_to_rs --disable-common-interfaces -s crate -i . -o {safe_drive}/src/msg/humble/ros2msg
$ git clone https://github.com/ros2/rcl_interfaces.git -b humble
$ rm -rf rcl_interfaces/test_msgs rcl_interfaces/builtin_interfaces
$ ros2msg_to_rs --disable-common-interfaces -s crate -i rcl_interfaces -o {safe_drive}/src/msg/humble/interfaces
-s crate
means crate
is the name of top module.
-i
and -o
means input and output directories.
Because safe_drive
defines builtin_interfaces
internally, it must be removed before transpilation.
To support newer version of ROS2, these files should be updated.
C APIs
C APIs of ROS2 are defined in src/msg/humble/runtime_c.rs and src/rcl. These files are generated by Makefile in supplements/bindgen.
MT Unsafe APIs
MT unsafe APIs requires mutex to prevent data races.
So, these MT unsafe APIs must be redefined in MTUnsafeFn
in src/rcl.rs.
The redefined functions can be called via rcl::MT_UNSAFE_FN
, which is a global variable, after locking.
For example, rcl::rcl_guard_condition_init()
are defined as follows.
#![allow(unused)] fn main() { pub(crate) static MT_UNSAFE_FN: Lazy<Mutex<MTUnsafeFn>> = Lazy::new(|| Mutex::new(MTUnsafeFn::new())); pub(crate) struct MTUnsafeFn; impl MTUnsafeFn { pub fn rcl_guard_condition_init( &self, guard_condition: *mut rcl_guard_condition_t, context: *mut rcl_context_t, options: rcl_guard_condition_options_t, ) -> RCLResult<()> { ret_val_to_err(unsafe { self::rcl_guard_condition_init(guard_condition, context, options) }) } } }
It can be called as follows.
#![allow(unused)] fn main() { let guard = rcl::MT_UNSAFE_FN.lock(); guard.rcl_guard_condition_init( &mut guard_condition, unsafe { context.as_ptr_mut() }, rcl::rcl_guard_condition_options_t { allocator }, )?; }
MT Safe APIs
MT safe APIs are redefined in MTSafeFn
in src/rcl.rs.
For example, rcl::rcl_shutdown()
is defined as follows.
#![allow(unused)] fn main() { pub(crate) struct MTSafeFn; impl MTSafeFn { pub fn rcl_shutdown(context: *mut rcl_context_t) -> RCLResult<()> { ret_val_to_err(unsafe { self::rcl_shutdown(context) }) } } }
It can be called without lock as follows.
#![allow(unused)] fn main() { rcl::MTSafeFn::rcl_shutdown(&mut self.context).unwrap(); }
Documentation
You can contribute to documents you are reading now. Documents are written in Markdown and located in mdbook. These Markdown files are compiled to HTML files and located in docs, and published at https://tier4.github.io/safe_drive/.
You can use (Mermaid)[https://mermaid.js.org/#/] notation in Markdown as follows.
graph TD; A-->B; A-->C; B-->D; C-->D;
The compilation can be done by make
as follows.
$ make doc
We are not native English speaker. So, we need your help to improve documents!