# hydrogen [][travis-badge]
[Documentation][docs]
hydrogen is a non-blocking socket server framework built atop [epoll][epoll-man-page].
It takes care of the tedious connection and I/O marshaling across threads, and
leaves the specifics of I/O reading and writing up the consumer, through trait
implementations.
---
## Streams
hydrogen works with `Stream` trait Objects so any custom type can be used.
[simple-stream][simple-stream-repo] was built in conjunction and offers several
stream abstractions and types including Plain and Secured streams with basic
and WebSocket framing.
## Multithreaded
hydrogen is multithreaded. It uses one thread for accepting incoming
connections, one for updating epoll reported event, and one used for
marshalling I/O into a threadpool of a user specified size.
## Slab allocation
The connection pool is managed as a slab, which means traversal times are
similar to traversing a Vector, with an insertion and removal time of O(1).
## Examples
### [simple-stream][simple-stream-repo]
``` rust
extern crate hydrogen;
extern crate simple_stream as ss;
use hydrogen;
use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
use ss::frame::Frame;
use ss::frame::simple::{SimpleFrame, SimpleFrameBuilder};
use ss::{Socket, Plain, NonBlocking, SocketOptions};
// Hydrogen requires a type that implements `hydrogen::Stream`.
// We'll implement it atop the `simple-stream` crate.
#[derive(Clone)]
pub struct Stream {
inner: Plain
}
impl HydrogenStream for Stream {
// This method is called when epoll reports data is available for reading.
fn recv(&mut self) -> Result>, Error> {
match self.inner.nb_recv() {
Ok(frame_vec) => {
let mut ret_buf = Vec::>::with_capacity(frame_vec.len());
for frame in frame_vec.iter() {
ret_buf.push(frame.payload());
}
Ok(ret_buf)
}
Err(e) => Err(e)
}
}
// This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
// and epoll has reported that the socket is now writable.
fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
let frame = SimpleFrame::new(buf);
self.inner.nb_send(&frame)
}
// This method is called when connection has been reported as reset by epoll, or when any
// `std::io::Error` has been returned.
fn shutdown(&mut self) -> Result<(), Error> {
self.inner.shutdown()
}
}
impl AsRawFd for Stream {
fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}
// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
fn on_server_created(&mut self, fd: RawFd) {
// Do any secific flag/option setting on the underlying listening fd.
// This will be the fd that accepts all incoming connections.
}
fn on_new_connection(&mut self, fd: RawFd) -> Arc> {
// With the passed fd, create your type that implements `hydrogen::Stream`
// and return it.
}
fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec) {
// Called when a complete, consumer defined, chunk of data has been read.
}
fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
// Called when a connection has been removed from the watch list, with the
// `std::io::Error` as the reason removed.
}
}
fn main() {
hydrogen::begin(Server, hydrogen::Config {
addr: "0.0.0.0".to_string(),
port: 1337,
max_threads: 8,
pre_allocated: 100000
});
}
```
### `std::net::TcpStream`
``` rust
extern crate hydrogen;
use std::cell::UnsafeCell;
use std::io::{Read, Write, Error, ErrorKind};
use std::net::{TcpStream, Shutdown};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
pub struct Stream {
inner: TcpStream
}
impl Stream {
pub fn from_tcp_stream(tcp_stream: TcpStream) -> Stream {
tcp_stream.set_nonblocking(true);
Stream {
inner: tcp_stream
}
}
}
impl HydrogenStream for Stream {
// This method is called when epoll reports data is available for reading.
fn recv(&mut self) -> Result>, Error> {
let mut msgs = Vec::>::new();
// Our socket is set to non-blocking, we need to read until
// there is an error or the system returns WouldBlock.
// TcpStream offers no guarantee it will return in non-blocking mode.
// Double check OS specifics on this when using.
// https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
let mut total_read = Vec::::new();
loop {
let mut buf = [0u8; 4098];
let read_result = self.inner.read(&mut buf);
if read_result.is_err() {
let err = read_result.unwrap_err();
if err.kind() == ErrorKind::WouldBlock {
break;
}
return Err(err);
}
let num_read = read_result.unwrap();
total_read.extend_from_slice(&buf[0..num_read]);
}
// Multiple frames, or "msgs", could have been gathered here. Break up
// your frames here and save remainer somewhere to come back to on the
// next reads....
//
// Frame break out code goes here
//
msgs.push(total_read);
return Ok(msgs);
}
// This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
// and epoll has reported that the socket is now writable.
fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
self.inner.write_all(buf)
}
// This method is called when connection has been reported as reset by epoll, or when any
// `std::io::Error` has been returned.
fn shutdown(&mut self) -> Result<(), Error> {
self.inner.shutdown(Shutdown::Both)
}
}
impl AsRawFd for Stream {
fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}
// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
fn on_server_created(&mut self, fd: RawFd) {
// Do any secific flag/option setting on the underlying listening fd.
// This will be the fd that accepts all incoming connections.
}
fn on_new_connection(&mut self, fd: RawFd) -> Arc> {
// With the passed fd, create your type that implements `hydrogen::Stream`
// and return it.
}
fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec) {
// Called when a complete, consumer defined, chunk of data has been read.
}
fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
// Called when a connection has been removed from the watch list, with the
// `std::io::Error` as the reason removed.
}
}
fn main() {
hydrogen::begin(Server, hydrogen::Config {
addr: "0.0.0.0".to_string(),
port: 1337,
max_threads: 8,
pre_allocated: 100000
});
}
```
## Author
Nathan Sizemore, nathanrsizemore@gmail.com
## License
hydrogen is available under the MPL-2.0 license. See the LICENSE file for more info.
[travis-badge]: https://travis-ci.org/nathansizemore/hydrogen
[docs]: https://nathansizemore.github.io/hydrogen/hydrogen/index.html
[epoll-man-page]: http://man7.org/linux/man-pages/man7/epoll.7.html
[simple-stream-repo]: https://github.com/nathansizemore/simple-stream