fastwebsockets-stream

Crates.iofastwebsockets-stream
lib.rsfastwebsockets-stream
version0.1.0
created_at2025-11-24 19:32:07.537386+00
updated_at2025-11-24 19:32:07.537386+00
descriptionAn adapter that exposes fastwebsockets::WebSocket as a Tokio AsyncRead/AsyncWrite stream.
homepagehttps://github.com/evgeniygem/fastwebsockets-stream
repositoryhttps://github.com/evgeniygem/fastwebsockets-stream
max_upload_size
id1948551
size125,180
(evgeniygem)

documentation

https://docs.rs/fastwebsockets-stream

README

fastwebsockets-stream

Crates.io Documentation License: MIT

fastwebsockets-stream provides alightweight adapter that exposes a fastwebsockets::WebSocket<S> as a tokio-compatible byte stream by implementing AsyncRead and AsyncWrite. It makes it easy to layer existing codecs (for example tokio_util::codec::Framed) or to reuse I/O-based logic over WebSocket application payloads without reimplementing WebSocket framing.

Features

  • Converts a fastwebsockets::WebSocket into a byte stream.
  • Supports Binary and Text payloads via PayloadType.
  • Implements both AsyncRead and AsyncWrite traits.
  • Handles control frames (Ping/Pong/Close) automatically through the underlying fastwebsockets settings.
  • Integrates seamlessly with the [fastwebsockets] crate and tokio ecosystem.

Example: Server

use fastwebsockets::{upgrade, WebSocketError};
use fastwebsockets_stream::{PayloadType, WebSocketStream};
use hyper::{Request, Response};
use hyper::body::{Bytes, Incoming};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use http_body_util::Empty;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::net::Ipv4Addr;

#[tokio::test]
async fn server_example() {
    let listener = tokio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0u16))
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    tokio::spawn(async move {
        loop {
            let (stream, _) = listener.accept().await.unwrap();
            let io = TokioIo::new(stream);
            tokio::spawn(async move {
                if let Err(err) = http1::Builder::new()
                    .serve_connection(io, service_fn(handle))
                    .with_upgrades()
                    .await
                {
                    eprintln!("Error: {:?}", err);
                }
            });
        }
    });

    println!("Server listening on {}", addr);
}

async fn handle(mut req: Request<Incoming>) -> Result<Response<Empty<Bytes>>, WebSocketError> {
    assert!(upgrade::is_upgrade_request(&req));
    let (resp, ws_fut) = upgrade::upgrade(&mut req)?;
    tokio::spawn(async move {
        let mut buf = [0u8; 6];
        let websocket = ws_fut.await.unwrap();
        let mut ws_stream = WebSocketStream::new(websocket, PayloadType::Binary);
        ws_stream.write(b"Hello!").await.unwrap();
        ws_stream.read(&mut buf).await.unwrap();
        assert_eq!(&buf, b"Hello!");
    });
    Ok(resp)
}

Example: Client

use fastwebsockets::{Frame, OpCode, handshake};
use fastwebsockets_stream::{PayloadType, WebSocketStream};
use http_body_util::Empty;
use hyper::header::{UPGRADE, CONNECTION};
use hyper::body::Bytes;
use tokio::net::TcpStream;

#[tokio::test]
async fn client_example() {
    let addr = "127.0.0.1:9000";
    let stream = TcpStream::connect(addr).await.unwrap();
    let request = http::Request::builder()
        .method("GET")
        .uri("ws://127.0.0.1:9000")
        .header("Host", "127.0.0.1")
        .header(UPGRADE, "websocket")
        .header(CONNECTION, "upgrade")
        .header("Sec-WebSocket-Key", handshake::generate_key())
        .header("Sec-WebSocket-Version", "13")
        .body(Empty::<Bytes>::new())
        .unwrap();

    let (mut ws, _) = handshake::client(&tokio::task::spawn, request, stream).await.unwrap();
    let msg = ws.read_frame().await.unwrap();
    assert_eq!(msg.opcode, OpCode::Binary);
    ws.write_frame(Frame::binary(msg.payload)).await.unwrap();
}

API overview

WebSocketStream<S>

An adapter type that implements tokio::io::AsyncRead and tokio::io::AsyncWrite for a wrapped fastwebsockets::WebSocket<S>.

Important behaviors:

  • Reads present application data frames (Text/Binary) as a continuous byte stream. If a single websocket frame's payload is larger than the caller's read buffer, the remainder is buffered internally and delivered by subsequent reads.
  • A Close frame is mapped to EOF; subsequent reads return Ok(()) with zero bytes.
  • Writes produce exactly one WebSocket data frame per call to write.
  • The adapter temporarily takes ownership of the inner WebSocket while an asynchronous read or write operation is in flight. into_inner() will return the inner WebSocket only if it is not currently owned by an outstanding future.

Key methods:

  • WebSocketStream::new(websocket, payload_type) — create a new adapter.
  • into_inner(self) -> Option<WebSocket<S>> — attempt to recover the inner websocket if no operation is in-progress.
  • is_closed(&self) -> bool — returns true if a Close frame was observed.

PayloadType

Enum with variants Binary and Text specifying which opcode the stream expects and will emit.

Usage tips

  • If you want to use a codec that frames logical application-level messages (for example LinesCodec, length-delimited frames, or protobuf), combine WebSocketStream with tokio_util::codec::Framed.

  • Remember that each call to write becomes a single websocket frame. If your application expects to stream a single large logical message in several frames, implement that message-level framing in your codec.

License

Licensed under the MIT License. See LICENSE for details.

Commit count: 0

cargo fmt