| Crates.io | tightbeam-rs |
| lib.rs | tightbeam-rs |
| version | 0.5.1 |
| created_at | 2025-10-12 20:40:42.910152+00 |
| updated_at | 2025-12-15 08:46:28.798601+00 |
| description | A secure, high-performance messaging protocol library |
| homepage | https://github.com/wahidgroup/tightbeam |
| repository | https://github.com/wahidgroup/tightbeam |
| max_upload_size | |
| id | 1879685 |
| size | 243,578 |
Warning: This project is under active development. Public APIs and file formats MAY change WITHOUT notice. It is NOT yet production-ready.
Warning: Only the full feature flag is currently supported.
Info: Features documented in this README are functional but unreleased.
Security Disclaimer: A SECURITY AUDIT HAS NOT BEEN CONDUCTED. USE AT YOUR OWN RISK.
Copyright (C) Tanveer Wahid, WahidGroup, LLC (2025). All Rights Reserved.
tightbeam is a Layer-5 messaging framework using ASN.1 DER encoding with versioned metadata structures for high-fidelity information transmission.
Zero-Copy, Zero-Panic, no_std-Ready
tightbeam defines a structured, versioned messaging protocol with an information fidelity constraint: I(t) ∈ (0,1) for all t ∈ T. Its philosophy is predicated on a return to first principles. Sections follow a [concept → specification → implementation → testing] pattern.
tightbeam's design is based on the principle that information transmission maintains bounded fidelity: I(t) ∈ (0,1) for all time t.
This means:
The I(t) constraint informs all protocol design decisions.
The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be interpreted as described in RFC 2119.
The following project terms MUST be used consistently:
tightbeam implements high-fidelity information transmission through the following bounds:
VERSION 0
VERSION 1
VERSION 2
VERSION 3
tightbeam uses a trait-based security profile system that separates compile-time algorithm constraints from runtime protocol behavior.
The SecurityProfile trait defines a pure metadata layer that declares
algorithm identifiers (OIDs) for cryptographic operations:
pub trait SecurityProfile {
type DigestOid: AssociatedOid;
type AeadOid: AssociatedOid + AeadKeySize;
type SignatureAlg: SignatureAlgorithmIdentifier;
type CurveOid: AssociatedOid;
type KemOid: AssociatedOid;
const KEY_WRAP_OID: Option<ObjectIdentifier> = None;
}
tightbeam separates cryptographic concerns through specialized provider traits:
- DigestProvider: Hash/digest operations (SHA-256, SHA3-256, etc.)
- AeadProvider: Authenticated encryption (AES-GCM variants)
- SigningProvider: Signature generation and verification (ECDSA, Ed25519)
- KdfProvider: Key derivation functions (HKDF)
- CurveProvider: Elliptic curve operations (secp256k1, P-384)

These traits compose into CryptoProvider, allowing components to specify only the cryptographic capabilities they require rather than depending on the full provider.
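The role-based composition pattern can be sketched as follows. This is an illustrative stand-in, not the crate's actual trait definitions: the trait names mirror the list above, but the methods (`digest_name`, etc.) and `DefaultProvider` are hypothetical.

```rust
// Illustrative sketch of role-based provider composition. Each narrow trait
// models one cryptographic capability; the hypothetical method names are
// placeholders for the real OID/primitive associations.
trait DigestProvider { fn digest_name(&self) -> &'static str; }
trait AeadProvider { fn aead_name(&self) -> &'static str; }
trait SigningProvider { fn sig_name(&self) -> &'static str; }

// A component bounds only on the capability it needs, not the full provider.
fn describe_hashing<P: DigestProvider>(p: &P) -> String {
    format!("hash = {}", p.digest_name())
}

// The composed provider is the intersection of the role traits.
trait CryptoProvider: DigestProvider + AeadProvider + SigningProvider {}
impl<T: DigestProvider + AeadProvider + SigningProvider> CryptoProvider for T {}

// Zero-sized type, matching the ZST provider style described later.
#[derive(Default, Clone, Copy)]
struct DefaultProvider;
impl DigestProvider for DefaultProvider { fn digest_name(&self) -> &'static str { "SHA3-256" } }
impl AeadProvider for DefaultProvider { fn aead_name(&self) -> &'static str { "AES-256-GCM" } }
impl SigningProvider for DefaultProvider { fn sig_name(&self) -> &'static str { "ECDSA-secp256k1" } }

fn main() {
    let p = DefaultProvider;
    // Only DigestProvider is required here; the other roles stay uninvolved.
    println!("{}", describe_hashing(&p));
}
```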
Applications implement the SecurityProfile trait to define their own
cryptographic algorithm constraints:
// Example: Custom application profile
pub struct MyAppProfile;
impl SecurityProfile for MyAppProfile {
type DigestOid = Sha3_256;
type AeadOid = Aes256GcmOid;
type SignatureAlg = Secp256k1Signature;
type CurveOid = Secp256k1Oid;
type KemOid = Kyber1024Oid;
const KEY_WRAP_OID: Option<ObjectIdentifier> = Some(AES_256_WRAP);
}
tightbeam provides TightbeamProfile as a reference implementation and default:
pub struct TightbeamProfile;
impl SecurityProfile for TightbeamProfile {
type DigestOid = Sha3_256;
type AeadOid = Aes256GcmOid;
type SignatureAlg = Secp256k1Signature;
type CurveOid = Secp256k1Oid;
type KemOid = Kyber1024Oid;
const KEY_WRAP_OID: Option<ObjectIdentifier> = Some(AES_256_WRAP);
}
Applications can define multiple profiles for different security contexts (e.g.,
HighSecurityProfile, LegacyProfile, QuantumResistantProfile) and use them
with different message types.
Numeric security levels are a convenience shorthand:
- profile = 1: confidential + nonrepudiable + min_version = V1

tightbeam supports run-time security profile enforcement at the message-type level through the Message trait and compile-time security enforcement at the message composition level:
pub trait Message: /* trait bounds */ {
const MIN_VERSION: Version = Version::V0;
const MUST_BE_NON_REPUDIABLE: bool = false;
const MUST_BE_CONFIDENTIAL: bool = false;
const MUST_BE_COMPRESSED: bool = false;
const MUST_BE_PRIORITIZED: bool = false;
const MUST_HAVE_MESSAGE_INTEGRITY: bool = false;
const MUST_HAVE_FRAME_INTEGRITY: bool = false;
const HAS_PROFILE: bool = false;
type Profile: SecurityProfile;
}
HAS_PROFILE: Controls whether the message type enforces algorithm constraints:
- false (default): Message uses TightbeamProfile but does not enforce algorithm OID matching
- true: FrameBuilder validates that all cryptographic operations use algorithms from the message's Profile type

Profile Type: Specifies which SecurityProfile implementation constrains algorithm selection:
- Defaults to TightbeamProfile if not specified
- Any SecurityProfile implementation may be supplied; it is only enforced when HAS_PROFILE = true

Algorithm Validation: When HAS_PROFILE = true, the following OIDs are validated at compile time:
- <Profile::DigestOid as AssociatedOid>::OID
- <Profile::AeadOid as AssociatedOid>::OID
- <Profile::SignatureAlg as SignatureAlgorithmIdentifier>::ALGORITHM_OID

This ensures that message types with specific security profiles can only be composed with compatible cryptographic algorithms.

- When MUST_BE_NON_REPUDIABLE = true, the Frame MUST include a nonrepudiation field
- When MUST_BE_CONFIDENTIAL = true, the Frame's metadata MUST include a confidentiality field
- When MUST_BE_COMPRESSED = true, the Frame's metadata compactness field MUST NOT be none
- When MUST_BE_PRIORITIZED = true, the Frame's metadata MUST include a priority field (V2+ only)
- The version field MUST be >= the message type's MIN_VERSION requirement

When composing frames with FrameBuilder, profile constraints are enforced at compile time if the message type has HAS_PROFILE = true:
Using the compose! Macro:
// Example: Message with custom profile
#[derive(Beamable, Sequence, Clone, Debug, PartialEq)]
#[beam(profile(MyAppProfile))]
struct SecureMessage { data: Vec<u8> }
// compose! macro validates algorithm OIDs match MyAppProfile
let frame = compose! {
V1: id: b"msg-001",
order: 1696521900,
message_integrity: type Sha3_256,
confidentiality<Aes256GcmOid, _>: &cipher,
nonrepudiation<Secp256k1Signature, _>: &signing_key,
message: message
}?;
Using FrameBuilder Directly:
// FrameBuilder validates algorithm OIDs match MyAppProfile
let frame = compose::<SecureMessage>(Version::V1)
.with_message(msg)
.with_id(b"msg-001")
.with_order(timestamp)
.with_message_hasher::<Sha3_256>() // ✓ Matches MyAppProfile::DigestOid
.with_cipher::<Aes256GcmOid, _>(&cipher) // ✓ Matches MyAppProfile::AeadOid
.with_signer::<Secp256k1Signature, _>(&signer) // ✓ Matches MyAppProfile::SignatureAlg
.build()?;
Note: All tightbeam macros are entirely optional; the underlying functionality and traits remain available for direct/manual implementation.
Validation Rules:
- with_message_hasher::<D>() validates D::OID == Profile::DigestOid::OID
- with_witness_hasher::<D>() validates D::OID == Profile::DigestOid::OID
- with_cipher::<C, _>() validates C::OID == Profile::AeadOid::OID
- with_signer::<S, _>() validates S::ALGORITHM_OID == Profile::SignatureAlg::ALGORITHM_OID

Error Handling: Algorithm mismatches return TightBeamError::UnexpectedAlgorithmForProfile with expected and received OIDs for debugging.
These requirements are enforced at:
The #[derive(Beamable)] macro implements the Message trait with these attributes:
Security attributes:
Security attributes:
- #[beam(message_integrity)], #[beam(frame_integrity)]
- #[beam(nonrepudiable)], #[beam(confidential)]
- #[beam(compressed)], #[beam(prioritized)]
- #[beam(min_version = "V1")]

Profile attributes:
- #[beam(profile = 1)] or #[beam(profile = 2)] - Numeric levels (sets confidential + nonrepudiable, no OID validation)
- #[beam(profile(TypeName))] - Type-based profile (enables compile-time OID validation)

// Numeric security level (convenience)
#[derive(Beamable, Sequence, Clone, Debug, PartialEq)]
#[beam(profile = 1)]
struct PaymentInstruction { /* fields */ }
// Type-based profile with algorithm enforcement
#[derive(Beamable, Sequence, Clone, Debug, PartialEq)]
#[beam(profile(MyAppProfile), confidential, nonrepudiable, min_version = "V1")]
struct HighSecurityTransfer { /* fields */ }
The CryptoProvider trait composes role-based provider traits to bind concrete
cryptographic implementations to SecurityProfile metadata:
pub trait CryptoProvider:
Default +
Copy + // zero-sized type (ZST),
DigestProvider +
AeadProvider +
SigningProvider +
KdfProvider +
CurveProvider
{
type Profile: SecurityProfile + Default;
fn profile(&self) -> &Self::Profile;
}
DefaultCryptoProvider: Reference implementation combining:
All versions MUST include:
All versions MAY include:
#[derive(Sequence, Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "zeroize", derive(zeroize::ZeroizeOnDrop))]
pub struct Metadata {
// Core fields (V0+)
pub id: Vec<u8>,
pub order: u64,
#[asn1(optional = "true")]
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub compactness: Option<CompressedData>,
// V1+ fields
#[asn1(context_specific = "0", optional = "true")]
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub integrity: Option<DigestInfo>,
#[asn1(context_specific = "1", optional = "true")]
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub confidentiality: Option<EncryptedContentInfo>,
// V2+ fields
#[asn1(context_specific = "2", optional = "true")]
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub priority: Option<MessagePriority>,
#[asn1(context_specific = "3", optional = "true")]
pub lifetime: Option<u64>,
#[asn1(context_specific = "4", optional = "true")]
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub previous_frame: Option<DigestInfo>,
// V3+ fields
#[asn1(context_specific = "5", optional = "true")]
pub matrix: Option<Asn1Matrix>,
}
#[derive(Sequence, Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "zeroize", derive(zeroize::ZeroizeOnDrop))]
pub struct Frame {
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub version: Version,
pub metadata: Metadata,
pub message: Vec<u8>,
#[asn1(context_specific = "0", optional = "true")]
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub integrity: Option<DigestInfo>,
#[asn1(context_specific = "1", optional = "true")]
#[cfg_attr(feature = "zeroize", zeroize(skip))]
pub nonrepudiation: Option<SignerInfo>,
}
This section provides the complete ASN.1 definitions for all tightbeam protocol structures, encoded using Distinguished Encoding Rules (DER).
Version ::= ENUMERATED {
v0(0),
v1(1),
v2(2),
v3(3)
}
MessagePriority ::= ENUMERATED {
critical(0), -- System/security alerts, emergency notifications
top(1), -- High-priority interactive traffic, real-time responses
high(2), -- Important business messages, time-sensitive data
normal(3), -- Standard message traffic (default)
low(4), -- Non-urgent notifications, background updates
bulk(5), -- Batch processing, large data transfers, logs
heartbeat(6) -- Keep-alive signals, periodic status updates
}
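The ENUMERATED values above map naturally onto a Rust enum with explicit discriminants. This is a sketch, not the crate's actual type; the `TryFrom` impl shows how out-of-range wire values would be rejected on decode:

```rust
// Sketch: mirror of the MessagePriority ENUMERATED definition above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum MessagePriority {
    Critical = 0,  // system/security alerts
    Top = 1,       // high-priority interactive traffic
    High = 2,      // important business messages
    Normal = 3,    // standard traffic (default)
    Low = 4,       // non-urgent notifications
    Bulk = 5,      // batch processing, logs
    Heartbeat = 6, // keep-alive signals
}

impl TryFrom<u8> for MessagePriority {
    type Error = u8;
    fn try_from(v: u8) -> Result<Self, u8> {
        use MessagePriority::*;
        Ok(match v {
            0 => Critical, 1 => Top, 2 => High, 3 => Normal,
            4 => Low, 5 => Bulk, 6 => Heartbeat,
            // Values outside the ENUMERATED range are a decode error.
            other => return Err(other),
        })
    }
}

fn main() {
    assert_eq!(MessagePriority::try_from(3u8), Ok(MessagePriority::Normal));
    assert!(MessagePriority::try_from(7u8).is_err());
}
```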
tightbeam uses standard CMS (Cryptographic Message Syntax) structures from RFC 5652 and PKCS standards for cryptographic operations.
From RFC 3447 Section 9.2:
DigestInfo ::= SEQUENCE {
digestAlgorithm AlgorithmIdentifier,
digest OCTET STRING
}
Used in Metadata.integrity, Metadata.previous_frame, and Frame.integrity fields.
From RFC 5652 Section 6.1:
EncryptedContentInfo ::= SEQUENCE {
contentType ContentType,
contentEncryptionAlgorithm ContentEncryptionAlgorithmIdentifier,
encryptedContent [0] IMPLICIT OCTET STRING OPTIONAL
}
Used in Metadata.confidentiality field for message-level encryption.
From RFC 5652 Section 5.3:
SignerInfo ::= SEQUENCE {
version CMSVersion,
sid SignerIdentifier,
digestAlgorithm DigestAlgorithmIdentifier,
signedAttrs [0] IMPLICIT SignedAttributes OPTIONAL,
signatureAlgorithm SignatureAlgorithmIdentifier,
signature SignatureValue,
unsignedAttrs [1] IMPLICIT UnsignedAttributes OPTIONAL
}
Used in Frame.nonrepudiation field for digital signatures.
From RFC 3274 Section 2:
CompressedData ::= SEQUENCE {
version CMSVersion,
compressionAlgorithm CompressionAlgorithmIdentifier,
encapContentInfo EncapsulatedContentInfo
}
Used in Metadata.compactness field for message compression.
Matrix ::= SEQUENCE {
n INTEGER (1..255),
data OCTET STRING (SIZE(1..(255*255))) -- MUST be exactly n*n octets; row-major
}
Metadata ::= SEQUENCE {
-- Core fields (V0+)
id OCTET STRING,
order INTEGER,
compactness CompressedData OPTIONAL,
-- V1+ fields (context-specific tags)
integrity [0] DigestInfo OPTIONAL,
confidentiality [1] EncryptedContentInfo OPTIONAL,
-- V2+ fields (context-specific tags)
priority [2] MessagePriority OPTIONAL,
lifetime [3] INTEGER OPTIONAL,
previous_frame [4] DigestInfo OPTIONAL,
-- V3+ fields (context-specific tags)
matrix [5] Matrix OPTIONAL
}
Frame ::= SEQUENCE {
version Version,
metadata Metadata,
message OCTET STRING,
integrity [0] DigestInfo OPTIONAL,
nonrepudiation [1] SignerInfo OPTIONAL
}
The protocol relies on standard ASN.1 structures from established RFCs.
From RFC 5652 Section 10.1.2:
AlgorithmIdentifier ::= SEQUENCE {
algorithm OBJECT IDENTIFIER,
parameters ANY DEFINED BY algorithm OPTIONAL
}
Implemented via the spki crate.
From RFC 3274 Section 2:
CompressionAlgorithmIdentifier ::= AlgorithmIdentifier
-- Standard compression algorithm OID
id-alg-zlibCompress OBJECT IDENTIFIER ::= { iso(1) member-body(2)
us(840) rsadsi(113549) pkcs(1) pkcs-9(9) smime(16) alg(3) 8 }
-- TightBeam also supports zstd compression
id-alg-zstdCompress OBJECT IDENTIFIER ::= { 1 3 6 1 4 1 50274 1 1 }
Implemented via the cms crate.
From RFC 5246 Section 7.4.1.4.1 (informative):
enum {
none(0), md5(1), sha1(2), sha224(3), sha256(4), sha384(5),
sha512(6), (255)
} HashAlgorithm;
enum { anonymous(0), rsa(1), dsa(2), ecdsa(3), (255) }
SignatureAlgorithm;
Note: TightBeam implementations SHOULD use SHA-256 or stronger hash algorithms and SHOULD NOT use MD5 or SHA-1 for new deployments.
Field availability by version:
- V0: id, order, message, compactness
- V1: integrity, confidentiality, nonrepudiation
- V2: priority, lifetime, previous_frame
- V3: matrix

Ordering rules:
- The order field MUST be monotonically increasing within a message sequence
- order values SHOULD be based on reliable timestamp sources
- Duplicate order values within the same id namespace MUST be rejected

Compression rules:
- When compactness is present (not None), the message field MUST contain compressed data encoded as CompressedData per RFC 3274
- The encapContentInfo within CompressedData MUST use the id-data content type OID if the compressed data does not conform to any recognized content type
- Recognized algorithms: id-alg-zlibCompress for zlib, custom OIDs for zstd (tightbeam uses 1.2.840.113549.1.9.16.3 pending formal assignment)
- Algorithm parameters, when present in compressionAlgorithm.parameters, MUST be within algorithm-specific valid ranges

This section clarifies the relationship between message integrity and frame integrity. The goals are: (1) unambiguous validation semantics, and (2) clear data retention choices.
When message integrity (MI) is present (Metadata.integrity), MI MUST bind the message body.

Important properties:
When confidentiality is enabled, tightbeam implementations MUST use Authenticated Encryption with Associated Data (AEAD) ciphers. This requirement is enforced at the type system level through trait bounds:
pub fn with_cipher<C, Cipher>(mut self, cipher: Cipher) -> Self
where
C: AssociatedOid,
Cipher: Aead + 'static, // AEAD trait bound required
T: CheckAeadOid<C>;
This design ensures MI over plaintext is cryptographically sound:
This approach is cryptographically equivalent to Encrypt-then-MAC when AEAD is enforced, as AEAD ciphers provide both confidentiality and authenticity of the ciphertext. An attacker cannot modify the ciphertext (AEAD authentication fails), cannot modify MI without breaking FI (when FI is signed/consensus-finalized), and cannot decrypt without the key.
- The previous_frame field creates a cryptographic hash chain linking frames
- Multiple frames referencing the same previous_frame indicate branching

This section specifies what the nonrepudiation signature covers when present.
Signature scope (MUST): The signature MUST be computed over the canonical
DER encoding of the Frame fields EXCLUDING the nonrepudiation field itself;
concretely, it MUST cover:
- version
- metadata (including MI when present)
- message
- integrity (FI) when present

Security consequence: Any modification to version, metadata (including MI), message, or FI invalidates the signature. This yields the transitive binding: Signature → FI (envelope) → MI (in metadata) → Message body
When all security features are enabled (MI, FI, AEAD encryption, and signatures), the complete security property chain operates as follows:
Sender operations (in order):
Receiver verification (in order):
This layered approach provides defense in depth:
Any tampering at any layer causes verification to fail, ensuring end-to-end integrity and authenticity guarantees.
The Matrix is a compact, flexible structure for transmitting state information. It uses a grid of cells, encoded with ASN.1 DER, to represent application-defined states with perfect structure, aligning with tightbeam's core constraint: I(t) ∈ (0,1).
The matrix enables applications to:
- Combine with previous_frame for causal state tracking.

The matrix is a 2D grid where each cell holds a number from 0 to 255, with meanings defined by the application (e.g., flags, counters, states, functions). Mathematically, it is a 2D array M of size n × n (n ≤ 255), with elements M[r,c] ∈ {0, ..., 255}. Maximum entropy for a full matrix is H = n² log₂ 256 = 8n² bits, assuming uniform distribution. Sparse matrices, using fewer cells, have lower entropy (e.g., 8k bits for k used cells).
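The n × n row-major layout and the 8n² entropy bound can be sketched as follows. This is an illustrative model, not the crate's `Matrix<N>` API: the cell at (r, c) lives at offset r·n + c in the OCTET STRING.

```rust
// Sketch of the n x n row-major cell layout: M[r][c] -> data[r*n + c].
// Illustrative only; tightbeam's const-generic Matrix<N> may differ.
struct Matrix {
    n: usize,
    data: Vec<u8>, // MUST be exactly n*n octets, row-major
}

impl Matrix {
    fn new(n: usize) -> Self {
        assert!((1..=255).contains(&n), "n must be in 1..=255");
        Matrix { n, data: vec![0u8; n * n] }
    }
    fn set(&mut self, r: usize, c: usize, v: u8) { self.data[r * self.n + c] = v; }
    fn get(&self, r: usize, c: usize) -> u8 { self.data[r * self.n + c] }
    // Maximum entropy of a full matrix: each cell carries up to 8 bits,
    // so H = 8 * n^2 bits under a uniform distribution.
    fn max_entropy_bits(&self) -> usize { 8 * self.n * self.n }
}

fn main() {
    let mut m = Matrix::new(3);
    m.set(1, 1, 7);
    assert_eq!(m.get(1, 1), 7);
    assert_eq!(m.max_entropy_bits(), 72); // 8 * 3^2
}
```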
Key Dimensions:
Example: A 2x2 matrix for a game state:
The matrix uses ASN.1 DER for deterministic serialization.
ASN.1 Schema (full matrix):
Matrix ::= SEQUENCE {
n INTEGER (1..255), -- Grid size (n x n)
data OCTET STRING (SIZE(1..65025)) -- Row-major cell values
}
Encoding & Performance:
To constrain I(t) ∈ (0,1):
Set diagonal flags in a 3x3 matrix:
use tightbeam::Matrix;
// Full 3x3 matrix
let mut matrix = Matrix::<3>::default();
matrix.set(0, 0, 1); // Feature A: enabled
matrix.set(1, 1, 1); // Feature B: enabled
matrix.set(2, 2, 0); // Feature C: disabled
// Embed in a frame
let frame = compose! {
V1: id: b"config-001",
order: 1000,
message: my_message,
matrix: Some(matrix)
}?;
Visualization (full):
[1, 0, 0]
[0, 1, 0]
[0, 0, 0]
This supports up to 255 flags, extensible by adding new diagonal entries. For structured data, use non-diagonal cells (e.g., M[0,1] = 10 for a count, or map public keys to coordinate regions).
The matrix, combined with the previous_frame field, enables sophisticated
state tracking, modeled as a directed acyclic graph (DAG) of state transitions.
Mathematically, frames form a Markov chain where each matrix M_t at time
t depends on M_{t-1}, linked via cryptographic hashes in previous_frame.
State Evolution:
previous_frame hashes ensure a DAG, where M_t → M_{t-1} via hash verification.previous_frame but differing in M_t represent alternative states.Mathematical Model: Applications define a transition probability P(M_t | M_{t-1}), where changes reflect logic or noise. For example, I(t) = I(M_t; M_{t-1}) / H(M_t) ∈ (0,1) may measure fidelity based on shared state, but I(t) is application-defined, constrained by hash consistency and partial state recovery.
The matrix supports flexible state representation, from simple flags to structured data encodings that allow for dynamic computation.
tightbeam-Protocol-V2 DEFINITIONS EXPLICIT TAGS ::= BEGIN
-- Import standard structures from CMS and PKCS
IMPORTS
AlgorithmIdentifier FROM PKCS-1
{ iso(1) member-body(2) us(840) rsadsi(113549) pkcs(1) pkcs-1(1) },
DigestInfo FROM PKCS-1
{ iso(1) member-body(2) us(840) rsadsi(113549) pkcs(1) pkcs-1(1) },
CompressedData FROM CMS-2004
{ iso(1) member-body(2) us(840) rsadsi(113549) pkcs(1) pkcs-9(9) smime(16) modules(0) cms-2004(24) },
EncryptedContentInfo, SignerInfo FROM CMS-2004
{ iso(1) member-body(2) us(840) rsadsi(113549) pkcs(1) pkcs-9(9) smime(16) modules(0) cms-2004(24) };
-- Core protocol version
Version ::= ENUMERATED {
v0(0),
v1(1),
v2(2),
v3(3)
}
-- Message priority enumeration
MessagePriority ::= ENUMERATED {
critical(0),
top(1),
high(2),
normal(3),
low(4),
bulk(5),
heartbeat(6)
}
-- TightBeam-specific matrix structure
Matrix ::= SEQUENCE {
n INTEGER (1..255),
data OCTET STRING (SIZE(1..(255*255))) -- MUST be exactly n*n octets; row-major
}
-- Core message structures
Metadata ::= SEQUENCE {
id OCTET STRING,
order INTEGER,
compactness CompressedData OPTIONAL,
integrity [0] DigestInfo OPTIONAL,
confidentiality [1] EncryptedContentInfo OPTIONAL,
priority [2] MessagePriority OPTIONAL,
lifetime [3] INTEGER OPTIONAL,
previous_frame [4] DigestInfo OPTIONAL,
matrix [5] Matrix OPTIONAL
}
Frame ::= SEQUENCE {
version Version,
metadata Metadata,
message OCTET STRING,
integrity [0] DigestInfo OPTIONAL,
nonrepudiation [1] SignerInfo OPTIONAL
}
END
Implementations MUST provide:
Implementations MAY provide:
Implementations MUST enforce message-level security requirements through:
tightbeam MUST operate over ANY transport protocol:
tightbeam accepts standard key formats (X.509 certificates, raw key material, CMS structures) and delegates key lifecycle management to applications:
The CertificateTrust trait provides certificate chain verification and trust
anchor management. Trust stores are used for:
Building a Trust Store:
let cert = Certificate::try_from(CERT_PEM)?;
let trust_store = CertificateTrustBuilder::<Sha3_256>::from(Secp256k1Policy)
.with_certificate(cert)?
.build();
Adding Certificate Chains:
let trust_store = CertificateTrustBuilder::<Sha3_256>::from(Secp256k1Policy)
.with_chain(vec![root_cert, intermediate_cert, leaf_cert])?
.build();
The tightbeam transport layer provides a pluggable framework for moving bytes between endpoints while enforcing security policies. The transport layer is responsible for the following:
The transport layer uses trait-based architecture:
Trait hierarchy:
Protocol: Bind/connect operationsMessageIO: Read/write envelopesMessageCollector: Server-side with policiesMessageEmitter: Client-side with policies and retryEncryptedProtocol: Adds certificate-based bindingEncryptedMessageIO: Adds encryption/decryptionMessages use ASN.1 DER encoding with two-tier envelopes:
DER tag-length-value encoding provides inherent framing. Default size limits:
- Envelope size limit (configurable via TransportEncryptionConfig)
- Message size limit (configurable via TransportEncryptionConfig)

TCP transport bridges byte streams with the message-oriented Frame API using DER length-prefixed envelopes. Supports both std::net (sync) and tokio (async).
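The inherent framing comes from DER definite-length encoding (X.690): lengths under 128 use one short-form byte, larger lengths use a long-form length-of-length prefix. A minimal sketch of just the length octets (not tightbeam's envelope code):

```rust
// Encode a DER definite length: short form (< 128) or long form.
fn der_length(len: usize) -> Vec<u8> {
    if len < 128 {
        vec![len as u8] // short form: single byte
    } else {
        // long form: 0x80 | number-of-length-bytes, then big-endian length
        let bytes: Vec<u8> = len.to_be_bytes().iter().copied()
            .skip_while(|&b| b == 0)
            .collect();
        let mut out = vec![0x80 | bytes.len() as u8];
        out.extend(bytes);
        out
    }
}

// Decode a definite length, returning (length, bytes consumed).
// This is what lets a transport know where an envelope ends.
fn read_der_length(buf: &[u8]) -> Option<(usize, usize)> {
    let first = *buf.first()?;
    if first < 128 {
        return Some((first as usize, 1));
    }
    let n = (first & 0x7f) as usize;
    if n == 0 || n > 8 || buf.len() < 1 + n {
        return None; // indefinite form or truncated input: not valid DER framing
    }
    let mut len = 0usize;
    for &b in &buf[1..1 + n] {
        len = (len << 8) | b as usize;
    }
    Some((len, 1 + n))
}

fn main() {
    assert_eq!(der_length(5), vec![0x05]);
    assert_eq!(der_length(300), vec![0x82, 0x01, 0x2c]); // 300 = 0x012c
    assert_eq!(read_der_length(&[0x82, 0x01, 0x2c]), Some((300, 3)));
}
```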
Example:
use std::net::TcpListener;
use tightbeam::{server, compose, Frame};
let listener = TcpListener::bind("127.0.0.1:8080")?;
server! {
protocol std::net::TcpListener: listener,
handle: |message: Frame| async move {
// Echo the frame back
Ok(Some(message))
}
}
Policies control message flow without modifying transport logic:
GatePolicy Trait:
pub trait GatePolicy: Send + Sync {
fn evaluate(&self, frame: &Frame) -> TransitStatus;
}
ReceptorPolicy Trait:
pub trait ReceptorPolicy<T: Message>: Send + Sync {
fn evaluate(&self, message: &T) -> TransitStatus;
}
RestartPolicy Trait:
pub trait RestartPolicy: Send + Sync {
/// Evaluate whether to restart after a transport operation.
///
/// # Arguments
/// * `frame` - Boxed frame from the failed operation
/// * `failure` - The failure reason
/// * `attempt` - The current attempt number (0-indexed)
///
/// # Returns
/// * `RetryAction` - What action to take (retry with frame, or no retry)
fn evaluate(
&self, frame: Box<Frame>,
failure: &TransportFailure,
attempt: usize
) -> RetryAction;
}
TransitStatus:
pub enum TransitStatus {
#[default]
Request = 0,
Accepted = 1,
Busy = 2,
Unauthorized = 3,
Forbidden = 4,
Timeout = 5,
}
RetryAction:
#[derive(Debug, Clone, PartialEq)]
pub enum RetryAction {
/// Retry with the provided frame (same or modified from input)
Retry(Box<Frame>),
/// Do not retry, propagate the error
NoRetry,
}
GatePolicy - Frame-Level Filtering:
use tightbeam::policy::{GatePolicy, TransitStatus};
// Accept only messages with specific ID patterns
#[derive(Default)]
struct IdPatternGate;
impl GatePolicy for IdPatternGate {
fn evaluate(&self, frame: &Frame) -> TransitStatus {
if frame.metadata.id.starts_with(b"api-") {
TransitStatus::Accepted
} else {
TransitStatus::Forbidden
}
}
}
ReceptorPolicy - Message-Level Filtering:
use tightbeam::policy::ReceptorPolicy;
#[derive(Beamable, Sequence)]
struct RequestMessage {
content: String,
priority: u8,
}
// Only accept high-priority messages
#[derive(Default)]
struct PriorityGate;
impl ReceptorPolicy<RequestMessage> for PriorityGate {
fn evaluate(&self, message: &RequestMessage) -> TransitStatus {
if message.priority >= 5 {
TransitStatus::Accepted
} else {
TransitStatus::Forbidden
}
}
}
RestartPolicy - Retry Strategies:
use tightbeam::transport::policy::{RestartLinearBackoff, RestartExponentialBackoff};
// Linear backoff: 1s, 2s, 3s delays
let restart = RestartLinearBackoff::new(3, 1000, 1, None);
// Exponential backoff: 1s, 2s, 4s, 8s delays
let restart = RestartExponentialBackoff::new(4, 1000, None);
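The two delay schedules shown above can be sketched as plain functions; the parameter shapes are simplified stand-ins for the crate's actual constructor signatures:

```rust
// Linear backoff: delay grows by a fixed step each attempt
// (with step = 1 and base 1000ms: 1s, 2s, 3s, ...).
fn linear_delays(attempts: usize, base_ms: u64, step: u64) -> Vec<u64> {
    (0..attempts).map(|i| base_ms + step * base_ms * i as u64).collect()
}

// Exponential backoff: delay doubles each attempt
// (with base 1000ms: 1s, 2s, 4s, 8s, ...).
fn exponential_delays(attempts: usize, base_ms: u64) -> Vec<u64> {
    (0..attempts).map(|i| base_ms << i).collect()
}

fn main() {
    assert_eq!(linear_delays(3, 1000, 1), vec![1000, 2000, 3000]);
    assert_eq!(exponential_delays(4, 1000), vec![1000, 2000, 4000, 8000]);
}
```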
Policy Macro:
tightbeam::policy! {
GatePolicy: OnlyApiMessages |frame| {
if frame.metadata.id.starts_with(b"api-") {
TransitStatus::Accepted
} else {
TransitStatus::Forbidden
}
}
ReceptorPolicy<RequestMessage>: OnlyPingMessages |message| {
if message.content == "PING" {
TransitStatus::Accepted
} else {
TransitStatus::Forbidden
}
}
RestartPolicy: RetryThreeTimes |frame, _failure, attempt| {
if attempt < 3 {
RetryAction::Retry(frame)
} else {
RetryAction::NoRetry
}
}
}
Composing Policies:
// Client-side with policies
let builder = ClientBuilder::<TokioListener>::builder()
.with_emitter_gate(IdPatternGate)
.with_collector_gate(PriorityGate)
.with_restart(RestartLinearBackoff::new(3, 1000, 1, None))
.build();
let mut client = builder.connect(addr).await?;
TightBeam implements two handshake protocols for mutual authentication and session key establishment:
Security Goals:
Three-Phase Exchange:
Phase 1: Client → Server
┌─────────────────────────────────────────────────────────┐
│ ClientHello (ECIES) or KeyExchange (CMS) │
│ - Client nonce (32 bytes) │
│ - Optional SecurityOffer (supported profiles) │
│ - Ephemeral public key (CMS: in KARI structure) │
└─────────────────────────────────────────────────────────┘
Phase 2: Server → Client
┌─────────────────────────────────────────────────────────┐
│ ServerHandshake (ECIES) or ServerFinished (CMS) │
│ - Server certificate │
│ - Server nonce (32 bytes) │
│ - Selected SecurityProfile (if negotiation occurred) │
│ - Signature over transcript hash │
└─────────────────────────────────────────────────────────┘
Phase 3: Client → Server
┌─────────────────────────────────────────────────────────┐
│ ClientKeyExchange (ECIES) or ClientFinished (CMS) │
│ - Encrypted session key │
│ - Optional client certificate (mutual auth) │
│ - Optional client signature (mutual auth) │
└─────────────────────────────────────────────────────────┘
State Machines:
Client States:
Init → HelloSent → KeyExchangeSent → ServerFinishedReceived → ClientFinishedSent → Completed
Server States:
Init → KeyExchangeReceived → ServerFinishedSent → ClientFinishedReceived → Completed
Transcript Hash:
transcript = ClientHello || ServerHandshake || ClientKeyExchange
transcript_hash = SHA3-256(transcript)
The transcript hash binds all handshake messages together, preventing:
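A minimal sketch of the transcript construction, with std's DefaultHasher standing in for the SHA3-256 the protocol actually uses. The point is that both sides must feed identical message bytes in identical order, so any reordering or substitution changes the hash:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

// transcript = ClientHello || ServerHandshake || ClientKeyExchange
// transcript_hash = H(transcript); DefaultHasher is a toy stand-in for SHA3-256.
fn transcript_hash(messages: &[&[u8]]) -> u64 {
    let mut h = DefaultHasher::new();
    for m in messages {
        h.write(m); // order matters: concatenation is sequential
    }
    h.finish()
}

fn main() {
    let client_hello: &[u8] = b"ClientHello";
    let server_handshake: &[u8] = b"ServerHandshake";
    let client_kx: &[u8] = b"ClientKeyExchange";

    let h1 = transcript_hash(&[client_hello, server_handshake, client_kx]);
    // Reordering the handshake messages yields a different transcript hash,
    // so a spliced or reordered exchange fails verification.
    let h2 = transcript_hash(&[server_handshake, client_hello, client_kx]);
    assert_ne!(h1, h2);
}
```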
Overview:
Uses RFC 5652 Cryptographic Message Syntax with:
Mutual Authentication Flow:
For mutual authentication, client includes certificate and signs transcript in ClientKeyExchange:
Client Side - Building ClientKeyExchange:
┌─────────────────────────────────────────────────────────────────────┐
│ 1. Extend Transcript │
│ transcript = ClientHello || ServerHandshake || ClientKeyExchange │
├─────────────────────────────────────────────────────────────────────┤
│ 2. Sign Extended Transcript │
│ final_hash = SHA3-256(transcript) │
│ client_signed_data = Sign(final_hash, client_priv_key) │
├─────────────────────────────────────────────────────────────────────┤
│ 3. Build ClientKeyExchange │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ClientKeyExchange { │ │
│ │ enveloped_data: EnvelopedData, // Encrypted CEK │ │
│ │ client_certificate: Some(cert), // Client cert │ │
│ │ client_signature: Some(sig), // Transcript sig │ │
│ │ } │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Server verification flow:
Server Side - Verifying Client Authentication:
┌─────────────────────────────────────────────────────────────┐
│ 1. Validate Client Certificate │
│ ✓ Check certificate chain against trust anchors │
│ ✓ Verify certificate is within validity period │
├─────────────────────────────────────────────────────────────┤
│ 2. Extract Client Public Key │
│ client_pub_key = cert.subject_public_key_info │
├─────────────────────────────────────────────────────────────┤
│ 3. Verify Transcript Signature │
│ verified_hash = Verify(client_signature, client_pub_key) │
├─────────────────────────────────────────────────────────────┤
│ 4. Verify Transcript Match │
│ expected_hash = SHA3-256(transcript) │
│ if verified_hash ≠ expected_hash: │
│ return TranscriptMismatch error │
│ ✓ Client witnessed same transcript │
└─────────────────────────────────────────────────────────────┘
Overview:
Lightweight alternative using ECIES (Elliptic Curve Integrated Encryption Scheme) for key encapsulation. Its compact structures avoid the ASN.1 EnvelopedData/SignedData overhead and require minimal wire format complexity.
Key Differences from CMS:
Mutual Authentication Flow:
For mutual authentication, client includes certificate and signs transcript in ClientKeyExchange:
Client Side - Building ClientKeyExchange:
┌─────────────────────────────────────────────────────────────────────┐
│ 1. Perform ECDH with Server's Public Key │
│ shared_secret = ECDH(client_ephemeral_priv, server_pub_key) │
├─────────────────────────────────────────────────────────────────────┤
│ 2. Derive Session Key via HKDF │
│ session_key = HKDF-SHA3-256( │
│ ikm: shared_secret, │
│ salt: client_nonce || server_nonce, │
│ info: "tightbeam-ecies-session-v1" │
│ ) │
├─────────────────────────────────────────────────────────────────────┤
│ 3. Encrypt Session Key with ECIES │
│ encrypted_key = ECIES-Encrypt( │
│ plaintext: session_key, │
│ recipient_pub_key: server_pub_key │
│ ) │
├─────────────────────────────────────────────────────────────────────┤
│ 4. Sign Extended Transcript │
│ transcript = ClientHello || ServerHandshake || ClientKeyExchange │
│ final_hash = SHA3-256(transcript) │
│ client_signature = Sign(final_hash, client_priv_key) │
├─────────────────────────────────────────────────────────────────────┤
│ 5. Build ClientKeyExchange │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ClientKeyExchange { │ │
│ │ encrypted_session_key: encrypted_key, │ │
│ │ client_certificate: Some(cert), // Client cert │ │
│ │ client_signature: Some(sig), // Transcript sig │ │
│ │ } │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Server verification flow:
Server Side - Verifying Client Authentication:
┌─────────────────────────────────────────────────────────────┐
│ 1. Validate Client Certificate │
│ ✓ Check certificate chain against trust anchors │
│ ✓ Verify certificate is within validity period │
├─────────────────────────────────────────────────────────────┤
│ 2. Extract Client Public Key │
│ client_pub_key = cert.subject_public_key_info │
├─────────────────────────────────────────────────────────────┤
│ 3. Verify Transcript Signature │
│ verified_hash = Verify(client_signature, client_pub_key) │
├─────────────────────────────────────────────────────────────┤
│ 4. Verify Transcript Match │
│ expected_hash = SHA3-256(transcript) │
│ if verified_hash ≠ expected_hash: │
│ return TranscriptMismatch error │
│ ✓ Client witnessed same transcript │
├─────────────────────────────────────────────────────────────┤
│ 5. Decrypt Session Key │
│ session_key = ECIES-Decrypt( │
│ ciphertext: encrypted_session_key, │
│ server_priv_key: server_private_key │
│ ) │
│ ✓ Session established with forward secrecy │
└─────────────────────────────────────────────────────────────┘
ECIES Encryption Details:
ECIES (Elliptic Curve Integrated Encryption Scheme) combines ephemeral ECDH key agreement, HKDF key derivation, and AEAD symmetric encryption:
ECIES-Encrypt(plaintext, recipient_pub_key):
┌─────────────────────────────────────────────────────────┐
│ 1. Generate ephemeral key pair │
│ (ephemeral_priv, ephemeral_pub) = GenerateKeyPair() │
├─────────────────────────────────────────────────────────┤
│ 2. Perform ECDH │
│ shared_secret = ECDH(ephemeral_priv, recipient_pub) │
├─────────────────────────────────────────────────────────┤
│ 3. Derive encryption keys │
│ (enc_key, mac_key) = HKDF-SHA3-256( │
│ ikm: shared_secret, │
│ info: ephemeral_pub || "ecies-kdf" │
│ ) │
├─────────────────────────────────────────────────────────┤
│ 4. Encrypt with AEAD │
│ ciphertext = AES-256-GCM.encrypt( │
│ key: enc_key, │
│ plaintext: plaintext, │
│ aad: ephemeral_pub │
│ ) │
├─────────────────────────────────────────────────────────┤
│ 5. Return ECIES message │
│ return (ephemeral_pub || ciphertext) │
└─────────────────────────────────────────────────────────┘
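The five ECIES steps above can be sketched end to end. This is a toy model: the "curve" arithmetic, KDF, and cipher below are plain-integer stand-ins for secp256k1/x25519, HKDF-SHA3-256, and AES-256-GCM, chosen only so the example stays std-only and runnable. It shows how the ephemeral key, shared secret, derived key, and ciphertext compose, not real cryptography.

```rust
const G: u64 = 5; // stand-in "generator" (NOT a real curve point)

fn derive_pub(private_key: u64) -> u64 {
    private_key.wrapping_mul(G) // stand-in for scalar multiplication
}

fn ecdh(private_key: u64, peer_pub: u64) -> u64 {
    private_key.wrapping_mul(peer_pub) // commutative, like real ECDH
}

fn kdf(shared_secret: u64, ephemeral_pub: u64) -> u64 {
    // stand-in for HKDF-SHA3-256 with info = ephemeral_pub || "ecies-kdf"
    shared_secret ^ ephemeral_pub.rotate_left(17)
}

fn xor_stream(key: u64, data: &[u8]) -> Vec<u8> {
    // stand-in for AES-256-GCM (no authentication here!)
    data.iter()
        .zip(key.to_le_bytes().iter().cycle())
        .map(|(b, k)| b ^ k)
        .collect()
}

/// Returns (ephemeral_pub || ciphertext), mirroring step 5 of the diagram.
fn ecies_encrypt(plaintext: &[u8], recipient_pub: u64, ephemeral_priv: u64) -> (u64, Vec<u8>) {
    let ephemeral_pub = derive_pub(ephemeral_priv);   // step 1: ephemeral key pair
    let shared = ecdh(ephemeral_priv, recipient_pub); // step 2: ECDH
    let enc_key = kdf(shared, ephemeral_pub);         // step 3: derive keys
    (ephemeral_pub, xor_stream(enc_key, plaintext))   // steps 4-5: encrypt, return
}

fn ecies_decrypt(ephemeral_pub: u64, ciphertext: &[u8], recipient_priv: u64) -> Vec<u8> {
    let shared = ecdh(recipient_priv, ephemeral_pub); // same secret as sender derived
    let enc_key = kdf(shared, ephemeral_pub);
    xor_stream(enc_key, ciphertext)
}

fn main() {
    let recipient_priv = 42u64;
    let recipient_pub = derive_pub(recipient_priv);
    let (eph_pub, ct) = ecies_encrypt(b"session-key", recipient_pub, 7);
    let pt = ecies_decrypt(eph_pub, &ct, recipient_priv);
    assert_eq!(pt, b"session-key"); // recipient recovers the plaintext
    println!("roundtrip ok");
}
```

The roundtrip works because ECDH is symmetric: the sender computes `ecdh(ephemeral_priv, recipient_pub)` and the recipient computes `ecdh(recipient_priv, ephemeral_pub)`, which agree on the same shared secret.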
Wire Format Comparison:
| Feature | CMS-Based | ECIES-Based |
|---|---|---|
| Envelope Structure | RFC 5652 nested structures | Simplified ASN.1 structures |
| Key Agreement | KARI (KeyAgreeRecipientInfo) | Raw ECIES with DER encoding |
| Session Key Encryption | EnvelopedData + AES-KW | ECIES + AES-GCM |
| Signatures | SignedData structure | Raw signatures in ASN.1 |
| Size Overhead | ~400-600 bytes | ~200-300 bytes |
| Parsing Complexity | Multi-level ASN.1 nesting | Flat ASN.1 structures |
| Standards Compliance | RFC 5652, RFC 5753 | SECG SEC 1 + custom ASN.1 |
Performance Characteristics:
Security Equivalence:
Both protocols provide identical security properties:
Both CMS and ECIES handshake protocols support cryptographic algorithm
negotiation through SecurityProfile descriptors:
Negotiation Process:
Client Server
│ │
│─── SecurityOffer ───────────────► │
│ supported_profiles: [ │
│ Profile1: SHA3-256 │ ◄─ Select first
│ AES-128-GCM, │ mutual profile
│             secp256k1,           │
│             secp256k1            │
│ Profile2: SHA3-512 │
│ AES-256-GCM │
│             ed25519,             │
│             x25519               │
│ ] │
│ │
│ ◄── SecurityAccept ───────────── │
│ selected_profile: │
│ Profile1 (SHA3-256+ │
│ AES-128-GCM+secp256k1) │
│ │
├═══════════════════════════════════┤
│ All subsequent operations use │
│ selected profile algorithms │
└═══════════════════════════════════┘
Profile Validation:
Profile Negotiation:
// Client offers supported profiles
let security_offer = SecurityOffer {
profiles: vec![
SecurityProfileDesc { /* SHA3-256 + AES-256-GCM + secp256k1 */ },
SecurityProfileDesc { /* SHA-256 + AES-128-GCM + P-256 */ },
],
};
// Server selects first mutually supported profile
let security_accept = SecurityAccept {
profile: SecurityProfileDesc { /* chosen profile */ },
};
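The "first mutually supported profile" rule above can be sketched in a few lines. The string profiles here are a hypothetical stand-in for `SecurityProfileDesc` (the real type carries hash, AEAD, and curve identifiers); the point is the selection logic, where the client's ordering encodes preference.

```rust
// Pick the first profile in the client's offer that the server also supports.
fn select_profile<'a>(offered: &'a [&'a str], supported: &[&str]) -> Option<&'a str> {
    offered.iter().copied().find(|p| supported.contains(p))
}

fn main() {
    // Client preference order: Profile1 first, Profile2 second.
    let offered = ["SHA3-256+AES-128-GCM+secp256k1", "SHA3-512+AES-256-GCM+ed25519"];
    // Server supports both; the client's first choice wins.
    let supported = ["SHA3-512+AES-256-GCM+ed25519", "SHA3-256+AES-128-GCM+secp256k1"];
    assert_eq!(
        select_profile(&offered, &supported),
        Some("SHA3-256+AES-128-GCM+secp256k1")
    );
    // No overlap at all maps to the UnsupportedProfile failure mode.
    assert_eq!(select_profile(&offered, &["SHA-256+AES-128-GCM+P-256"]), None);
    println!("profile selection ok");
}
```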
Failure Modes:
| Error | Cause | Recovery |
|---|---|---|
| CertificateValidationFailed | Invalid certificate chain | Reject connection |
| TranscriptMismatch | MITM or protocol error | Abort handshake |
| NonceReplay | Duplicate nonce detected | Reject message |
| UnsupportedProfile | No mutual profile | Negotiate or reject |
| InvalidState | Out-of-order message | Reset state machine |
| DecryptionFailed | Wrong key or corrupted data | Abort handshake |
| Threat | Control | Implementation |
|---|---|---|
| Replay Attack | 32-byte nonce + replay set | Server maintains set of seen nonces; rejects duplicates |
| Downgrade Attack | Profile list in signed transcript | Transcript hash covers SecurityOffer/SecurityAccept |
| MITM | Transcript signatures | Both parties sign transcript_hash; verified against certificates |
| Confidentiality | ECDH + HKDF derived AEAD key | Session key never transmitted; derived from ECDH shared secret |
| Forward Secrecy | Ephemeral client keys | New ephemeral key per handshake; compromise doesn't affect past sessions |
| DoS | 16 KiB handshake size cap | Reject oversized handshake messages before processing |
| Certificate Forgery | X.509 chain validation | Verify root of trust Note: Application responsibility |
| Nonce Reuse | Monotonic counter + XOR | Per-message nonce derived from seed XOR counter |
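Two of the controls above — the seed-XOR-counter nonce derivation and the server-side replay set — are simple enough to sketch directly. This is an illustrative model (the protocol uses 32-byte handshake nonces; a `u128` stands in for brevity), not tightbeam's actual implementation.

```rust
use std::collections::HashSet;

// Per-message nonce: seed XOR monotonic counter. Each counter value yields a
// unique nonce for a given seed, preventing nonce reuse within a session.
fn derive_nonce(seed: u128, counter: u128) -> u128 {
    seed ^ counter
}

// Replay guard: the server remembers every nonce it has seen and rejects
// duplicates, defeating replayed handshake messages.
struct ReplayGuard {
    seen: HashSet<u128>,
}

impl ReplayGuard {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns true if the nonce is fresh; false if it is a replay.
    fn accept(&mut self, nonce: u128) -> bool {
        self.seen.insert(nonce)
    }
}

fn main() {
    let seed = 0xDEAD_BEEF_u128;
    let mut guard = ReplayGuard::new();
    let n0 = derive_nonce(seed, 0);
    let n1 = derive_nonce(seed, 1);
    assert_ne!(n0, n1);         // distinct counters give distinct nonces
    assert!(guard.accept(n0));  // first sighting accepted
    assert!(!guard.accept(n0)); // replay rejected
    assert!(guard.accept(n1));
    println!("replay protection ok");
}
```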
Connection pooling enables efficient connection reuse across multiple requests.
ConnectionPool uses a builder pattern where the pool is configured once
via .builder(), then .connect() retrieves connections from the pool.
Example:
// Create shared pool with configuration (once per application)
let pool = Arc::new(ConnectionPool::<TokioListener>::builder()
.with_config(PoolConfig { max_connections: 3, ..Default::default() })
.with_trust_store(trust_store)
.with_client_identity(CLIENT_CERT, CLIENT_KEY.to_provider::<Secp256k1>()?)?
.with_timeout(Duration::from_millis(5000))
.build());
// Get connection from pool
let mut client = pool.connect(server_addr).await?;
client.emit(frame, None).await?;
// Connection automatically returned to pool on drop
Configuration:
- PoolConfig::max_connections: Max connections per destination (default: 64)
- PoolConfig::idle_timeout: Optional connection expiration (default: None)

The tightbeam transport layer and handshake protocols have not yet been independently audited. We welcome help in this area.
The Efficient Exchange-Compute Interconnect (EECI) is a software development paradigm inspired by the entomological world. Just as threads and tunnels underpin the basics of processing and communication, EECI starts at these base levels and builds upward. The goal of EECI is to operate on these base layers across any transmission protocol.
There are four main components to EECI: workers, servlets, hives, and clusters.
Think of workers as ants, servlets as ant hills, and clusters as ant colonies. Insects have specific functions for which they process organic matter using local information. These functions are often simple, but when combined in large numbers, they can perform complex tasks. The efficiency of each unit stems from its fungible nature--how well it can accomplish its singular task.
Workers are the smallest unit of computation in the EECI--the "ants" that do the actual work. They exist because of a fundamental insight: most business logic doesn't need network context. A function that doubles a number doesn't care whether the input came from TCP, UDP, or an in-memory channel. By isolating this logic into workers, we gain:
Workers are intentionally constrained:
These constraints enable the parallelism and fault isolation that make EECI effective. Workers don't coordinate with each other--they just transform input to output.
Note: Working around the Frame limitation by passing the Frame in a message parameter is strongly discouraged.
The worker! Macro

Workers follow an insect-inspired structure: a "head" (configuration), optional "receptors" (gates), a "thorax" (isolation container), and an "abdomen" (handler).
tightbeam::worker! {
name: PingPongWorker<RequestMessage, PongMessage>,
config: {
response: &str,
},
policies: {
with_receptor_gate: [PingGate]
},
handle: |_message, _trace, config| async move {
PongMessage {
result: config.response.to_string(),
}
}
}
The handler receives the message, a trace collector for instrumentation, and the worker's configuration. It returns the output message type.
Workers can be tested using the tb_scenario! macro with environment Worker:
use tightbeam::{tb_scenario, tb_assert_spec, exactly, worker};
tb_assert_spec! {
pub PingPongSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("worker_called", exactly!(1)),
("response_received", exactly!(1), equals!("pong"))
]
}
}
tb_scenario! {
name: test_ping_pong_worker,
config: ScenarioConf::<()>::builder()
.with_spec(PingPongSpec::latest())
.build(),
environment Worker {
setup: |_trace| {
PingPongWorker::new(PingPongWorkerConf {
response: "pong",
})
},
stimulus: |trace, worker| async move {
trace.event_with("worker_called", &[], ())?;
let request = RequestMessage { content: "ping".to_string() };
let response = worker.relay(Arc::new(request)).await?;
trace.event_with("response_received", &[], response.result)?;
Ok(())
}
}
}
The environment Worker syntax provides:
- setup: Creates the worker instance with its configuration
- stimulus: Sends a message to the worker via relay() and validates the response

Servlets are the network endpoints of the EECI--the "anthills" where messages arrive and are processed. While workers handle pure business logic, servlets handle the protocol layer: accepting connections, decoding frames, dispatching to workers, and sending responses.
Servlets sit between the network and workers:
This separation means servlets handle concerns like connection management, frame validation, and response composition--things workers shouldn't know about.
Each servlet is responsible for exactly one message type. This keeps servlets focused and predictable. When you need to handle multiple related message types, use an ASN.1 Choice type to group them:
// A Choice type groups related messages
#[derive(Beamable, Choice)]
pub enum CalcRequest {
Add(AddParams),
Multiply(MultiplyParams),
Divide(DivideParams),
}
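Because the servlet still receives a single message type (the Choice enum), its handler can fan out with one `match` arm per variant. A minimal sketch, with hypothetical parameter structs standing in for the real `AddParams`/`MultiplyParams` payloads:

```rust
struct AddParams { a: i64, b: i64 }
struct MultiplyParams { a: i64, b: i64 }

// The Choice type: one servlet message type covering several operations.
enum CalcRequest {
    Add(AddParams),
    Multiply(MultiplyParams),
}

// Dispatch stays exhaustive: adding a variant forces this match to be updated.
fn dispatch(req: CalcRequest) -> i64 {
    match req {
        CalcRequest::Add(p) => p.a + p.b,
        CalcRequest::Multiply(p) => p.a * p.b,
    }
}

fn main() {
    assert_eq!(dispatch(CalcRequest::Add(AddParams { a: 2, b: 3 })), 5);
    assert_eq!(dispatch(CalcRequest::Multiply(MultiplyParams { a: 2, b: 3 })), 6);
    println!("choice dispatch ok");
}
```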
Servlets can apply gate policies to filter or validate incoming messages before processing. Common use cases include rate limiting, authentication, and input validation:
servlet! {
pub SecureServlet<Request, EnvConfig = ()>,
protocol: TokioListener,
policies: {
with_collector_gate: [RateLimitGate::new(100), AuthGate::new(key)]
},
handle: |frame, trace, config, workers| async move {
// Only reached if all gates pass
// ...
}
}
Step 1: Define configuration struct outside the macro:
#[derive(Clone)]
pub struct PingPongServletConf {
pub lotto_number: u32,
}
Step 2: Define the servlet using EnvConfig:
tightbeam::servlet! {
pub PingPongServletWithWorker<RequestMessage, EnvConfig = PingPongServletConf>,
protocol: TokioListener,
handle: |frame, trace, config, workers| async move {
// Handler receives Frame, not decoded message
let decoded = decode::<RequestMessage, _>(&frame.message)?;
let decoded_arc = Arc::new(decoded);
// Workers are accessed via the workers parameter
let (ping_result, lucky_result) = tokio::join!(
workers.relay::<PingPongWorker>(Arc::clone(&decoded_arc)),
workers.relay::<LuckyNumberDeterminer>(Arc::clone(&decoded_arc))
);
let reply = match ping_result {
Ok(Some(reply)) => reply,
_ => return Ok(None),
};
let is_winner = match lucky_result {
Ok(Some(is_winner)) => is_winner,
_ => return Ok(None),
};
Ok(Some(compose! {
V0: id: b"response-id",
message: ResponseMessage {
result: reply.result,
is_winner,
}
}?))
}
}
Step 3: Configure workers via ServletConf when starting the servlet:
// Create workers (use ::new, not .start - servlet auto-starts them)
let ping_pong_worker = PingPongWorker::new(());
let lucky_number_worker = LuckyNumberDeterminer::new(LuckyNumberDeterminerConf {
lotto_number: 42,
});
// Build servlet configuration
let servlet_conf = ServletConf::<TokioListener, RequestMessage>::builder()
.with_config(Arc::new(PingPongServletConf { lotto_number: 42 }))
.with_worker(ping_pong_worker)
.with_worker(lucky_number_worker)
.build();
// Start the servlet (workers are auto-started with servlet's trace)
PingPongServletWithWorker::start(trace, Some(servlet_conf)).await?
Worker Lifecycle
Workers follow a two-phase lifecycle:
1. Creation (::new(config) or ::default()) - Creates the worker in an unstarted state
2. Starting (.start(trace)) - Spawns the worker's async task loop with a trace collector

When workers are added to a servlet via ServletConf::builder().with_worker(worker), the servlet calls .start(trace) on each worker during servlet startup.

For standalone worker testing (outside servlets), use the Worker trait's
start() method explicitly:
let worker = MyWorker::new(config);
let trace = Arc::new(TraceCollector::new());
let started_worker = worker.start(trace).await?;
Efficient Parallel Worker Processing
Workers accept Arc<Input> instead of owned Input to enable efficient
parallel processing. When calling multiple workers in parallel:
Example using tokio::join!:
let decoded_arc = Arc::new(decoded);
let (result1, result2) = tokio::join!(
workers.worker1.relay(Arc::clone(&decoded_arc)),
workers.worker2.relay(Arc::clone(&decoded_arc))
);
Servlets with workers can be tested using environment Servlet:
use tightbeam::{tb_scenario, tb_assert_spec, exactly, servlet, worker};
tb_assert_spec! {
pub CalcServletSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("servlet_receive", exactly!(1)),
("worker_process", exactly!(1)),
("servlet_respond", exactly!(1)),
("result_verified", exactly!(1), equals!(10u32))
]
}
}
tb_scenario! {
name: test_calc_servlet,
config: ScenarioConf::<CalcServletConf>::builder()
.with_spec(CalcServletSpec::latest())
.with_env_config(CalcServletConf { multiplier: 2 })
.build(),
environment Servlet {
servlet: CalcServlet,
start: |trace, config| async move {
let worker = DoublerWorker::new(());
let servlet_conf = ServletConf::<TokioListener, CalcRequest>::builder()
.with_config(config)
.with_worker(worker)
.build();
CalcServlet::start(trace, Some(servlet_conf)).await
},
setup: |servlet_addr, _config| async move {
let builder = ClientBuilder::<TokioListener>::builder().build();
let client = builder.connect(servlet_addr).await?;
Ok(client)
},
client: |trace, mut client, _config| async move {
let request = compose! {
V0: id: b"calc-req",
message: CalcRequest { value: 5 }
}?;
let response_frame = client.emit(request, None).await?
.ok_or(TightBeamError::MissingResponse)?;
let response: CalcResponse = decode(&response_frame.message)?;
trace.event_with("result_verified", &[], response.result)?;
Ok(())
}
}
}
The environment Servlet syntax provides:
- start: Configures and starts the servlet with workers
- setup: Creates the client connection to the servlet
- client: Sends requests and validates responses via trace events

Clusters are the "ant colonies" of the EECI--centralized gateways that coordinate distributed hives. While hives manage individual servlets, clusters provide the higher-level orchestration: routing work requests to the right hive, monitoring hive health, and balancing load across the swarm.
A cluster operates as a gateway server with three primary responsibilities:
Hive Registry: Maintains a dynamic registry of connected hives and their available servlet types. Hives register themselves on startup, announcing which servlet types they can handle.
Work Routing: Receives ClusterWorkRequest messages from external clients,
looks up which hives support the requested servlet type, selects one via load
balancing, and forwards the request.
Health Monitoring: Periodically sends heartbeats to all registered hives. Unresponsive hives are evicted after consecutive failures, ensuring clients are never routed to dead endpoints.
The cluster! Macro

Define a cluster type using the cluster! macro:
cluster! {
pub MyCluster,
protocol: TokioListener,
config: ClusterConf::default()
}
The macro accepts an optional digest parameter for custom hash algorithms used
in frame integrity verification:
cluster! {
pub MyCluster,
protocol: TokioListener,
digest: Blake3,
config: ClusterConf::default()
}
Clusters require TLS configuration for secure communication with hives. The cluster acts as a TLS client when connecting to hives, presenting its certificate for mutual authentication. This ensures both parties can verify identity before exchanging work.
let tls = ClusterTlsConfig {
certificate: CertificateSpec::Built(Box::new(cert)),
key: Arc::new(Secp256k1KeyProvider::from(key)),
validators: vec![], // Optional: validators for hive certificates
};
For hives to trust cluster commands (like heartbeats), they must have the cluster's certificate in their trust store. See Trust Stores for details.
Clusters continuously monitor hive health through heartbeats. Each heartbeat is a signed frame sent to the hive, which responds with its current utilization. This serves two purposes:
Configure heartbeat behavior via HeartbeatConf:
let heartbeat_conf = HeartbeatConf::builder()
.with_interval(Duration::from_secs(5)) // Check every 5 seconds
.with_timeout(Duration::from_secs(15)) // Response deadline
.with_max_failures(3) // Evict after 3 failures
.with_max_concurrent(10) // Parallel heartbeat limit
.build();
The on_heartbeat callback enables monitoring and metrics collection:
.with_callback(Arc::new(|event| {
metrics::counter!("heartbeat", "success" => event.success.to_string()).increment(1);
}))
When multiple hives support the same servlet type, the cluster uses a LoadBalancer
to select one. The default is LeastLoaded, which routes to the hive with the
lowest reported utilization. Other strategies include RoundRobin and
PowerOfTwoChoices. Custom strategies can be created by implementing the
LoadBalancer trait.
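The default LeastLoaded idea reduces to "pick the index with the lowest reported utilization." A minimal sketch, using a slice of basis points (0..=10_000) as a stand-in for the hive registry's utilization reports:

```rust
// Select the least-loaded hive; ties resolve to the lowest index
// (Iterator::min_by_key returns the first minimum).
fn least_loaded(utilization_bps: &[u16]) -> Option<usize> {
    utilization_bps
        .iter()
        .enumerate()
        .min_by_key(|&(_, load)| *load)
        .map(|(idx, _)| idx)
}

fn main() {
    // Three hives reporting 45%, 10%, and 80% utilization.
    assert_eq!(least_loaded(&[4_500, 1_000, 8_000]), Some(1));
    // No hives registered for the requested servlet type.
    assert_eq!(least_loaded(&[]), None);
    println!("least-loaded selection ok");
}
```

A RoundRobin or PowerOfTwoChoices strategy would replace only this selection function; the routing flow around it stays the same.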
Clients interact with clusters through work request messages:

- The client sends a ClusterWorkRequest with servlet_type and encoded payload
- The cluster routes the request to a hive, wraps the result in a ClusterWorkResponse, and returns it to the client

// Client sends:
let request = ClusterWorkRequest {
servlet_type: b"calculator".to_vec(),
payload: encode(&CalcRequest { value: 42 })?,
};
// Client receives:
let response: ClusterWorkResponse = decode(&frame.message)?;
match response.status {
TransitStatus::Accepted => { /* process response.payload */ }
TransitStatus::Forbidden => { /* no hive available */ }
_ => { /* handle other statuses */ }
}
pub struct ClusterConf<L: LoadBalancer = LeastLoaded, D: Digest = Sha3_256> {
/// Load balancing strategy for distributing work across hives
pub load_balancer: L,
/// Heartbeat configuration
pub heartbeat: HeartbeatConf,
/// Gate policies for the gateway (rate limiting, auth, etc.)
pub policies: Vec<Arc<dyn GatePolicy + Send + Sync>>,
/// Connection pool configuration for hive connections
pub pool_config: PoolConfig,
/// Default retry policy for all cluster → hive communication
pub retry_policy: Arc<dyn RestartPolicy + Send + Sync>,
/// TLS configuration for cluster → hive connections
pub tls: ClusterTlsConfig,
}
Clusters can be tested using environment Cluster:
use tightbeam::{tb_scenario, tb_assert_spec, exactly, cluster, hive};
tb_assert_spec! {
pub ClusterRoutingSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("work_sent", exactly!(1)),
("routing_status", exactly!(1), equals!(TransitStatus::Accepted))
]
}
}
tb_scenario! {
name: cluster_work_routing,
config: ScenarioConf::<ClusterTestConf>::builder()
.with_spec(ClusterRoutingSpec::latest())
.with_env_config(ClusterTestConf {
heartbeat_stats: Arc::new(HeartbeatStats::default()),
heartbeat_interval: Duration::from_millis(50),
})
.build(),
environment Cluster {
cluster: ClusterGateway,
start: |trace, config| async move {
let (cert, key) = create_test_cert_with_key("CN=Cluster Gateway", 365)?;
// Build hive trust from cluster cert
let hive_trust = CertificateTrustBuilder::<Sha3_256>::from(Secp256k1Policy)
.with_chain(vec![cert.clone()])?
.build();
let tls = ClusterTlsConfig {
certificate: CertificateSpec::Built(Box::new(cert)),
key: Arc::new(Secp256k1KeyProvider::from(key)),
validators: vec![],
};
let heartbeat_conf = HeartbeatConf::builder()
.with_interval(config.heartbeat_interval)
.with_callback(Arc::new({
let stats = Arc::clone(&config.heartbeat_stats);
move |event| {
stats.attempts.fetch_add(1, Ordering::SeqCst);
if event.success {
stats.successes.fetch_add(1, Ordering::SeqCst);
}
}
}))
.build();
let cluster_conf = ClusterConf::builder(tls)
.with_heartbeat_config(heartbeat_conf)
.build();
let cluster = ClusterGateway::start(trace, cluster_conf).await?;
Ok((cluster, hive_trust))
},
hives: |trace, hive_trust| {
vec![
TestHive::start(Arc::clone(&trace), Some(HiveConf {
trust_store: Some(Arc::new(hive_trust)),
..Default::default()
}))
]
},
client: |trace, mut client, config| async move {
let request = ClusterWorkRequest {
servlet_type: b"ping".to_vec(),
payload: encode(&PingRequest { value: 21 })?,
};
trace.event("work_sent")?;
let response_frame = client.emit(compose! {
V0: id: b"work-001", message: request
}?, None).await?;
if let Some(frame) = response_frame {
let work_response: ClusterWorkResponse = decode(&frame.message)?;
trace.event_with("routing_status", &[], work_response.status)?;
}
Ok(())
}
}
}
The environment Cluster syntax provides:
- cluster: The cluster type to test
- start: Configures and starts the cluster, returns a (Cluster, HiveTrust) tuple
- hives: Creates hive instances that register with the cluster (receives trust from start)
- client: Sends work requests and validates routing via trace events

Hives are the intermediary layer between clusters and servlets. While a servlet handles a single message type on a single port, a hive can manage multiple servlets and coordinate with a cluster for work distribution. Think of hives as "ant nests" that house multiple specialized workers (servlets).
Hives support two distinct operating modes:
Single-Servlet Mode allows a hive to "morph" between different servlet types
dynamically. The cluster sends an ActivateServletRequest, and the hive stops its
current servlet and starts the requested one. This is useful for dynamic workload
reallocation--a hive can switch from handling "ping" requests to "calculator"
requests based on cluster demand.
Multi-Servlet Mode enables a hive to run all its registered servlets
simultaneously, each on a different port. This requires a "mycelial" protocol
(like TCP) that supports multiple endpoints. Call establish_hive() to spawn all
servlets, then register with a cluster to advertise all available capabilities.
The mode is determined by the protocol's capabilities. On mycelial protocols, you typically want multi-servlet mode for maximum throughput.
The term "mycelial" refers to protocols that can spawn multiple endpoints from a
single base address--like how fungal mycelium branches from a central point. TCP
is mycelial because a single host address (e.g., 192.168.1.10) can bind multiple
ports. This allows a hive to spawn servlets on ports 8001, 8002, 8003, etc., each
handling different message types.
Non-mycelial protocols (like in-memory channels) are limited to single-servlet mode.
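The mycelial property — one base address, many endpoints — can be demonstrated with plain std networking. Binding to port 0 asks the OS for a fresh ephemeral port each time, standing in for a hive spawning one servlet per port (this sketch is illustrative; it is not tightbeam's hive code).

```rust
use std::net::TcpListener;

// Bind `count` listeners on the same host address; the OS assigns a distinct
// ephemeral port to each, like a hive spawning servlets on 8001, 8002, ...
fn spawn_endpoints(host: &str, count: usize) -> std::io::Result<Vec<u16>> {
    let mut listeners = Vec::new();
    let mut ports = Vec::new();
    for _ in 0..count {
        let l = TcpListener::bind((host, 0))?; // same base address, new port
        ports.push(l.local_addr()?.port());
        // Keep listeners alive while binding so ports stay reserved; a real
        // hive would retain them for the servlets' lifetimes.
        listeners.push(l);
    }
    Ok(ports)
}

fn main() -> std::io::Result<()> {
    let ports = spawn_endpoints("127.0.0.1", 3)?;
    assert_eq!(ports.len(), 3);
    // All ports are distinct: one logical hive, three servlet endpoints.
    let mut sorted = ports.clone();
    sorted.sort_unstable();
    sorted.dedup();
    assert_eq!(sorted.len(), 3);
    println!("bound ports: {:?}", ports);
    Ok(())
}
```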
The hive! Macro

Define a hive type with its available servlets:
hive! {
pub MyHive,
protocol: TokioListener,
servlets: {
ping: PingServlet<PingRequest>,
calc: CalculatorServlet<CalcRequest>
}
}
Add security policies to gate incoming messages:
hive! {
pub SecureHive,
protocol: TokioListener,
policies: {
with_collector_gate: [SignatureGate::new(verifying_key)]
},
servlets: {
ping: PingServlet<PingRequest>
}
}
A typical hive lifecycle with cluster integration:
// 1. Start the hive
let mut hive = MyHive::start(trace, Some(HiveConf::default())).await?;
// 2. Establish multi-servlet mode (spawns all servlets on separate ports)
hive.establish_hive().await?;
// 3. Register with cluster (announces available servlet types)
let response = hive.register_with_cluster(cluster_addr).await?;
// 4. Hive now receives work routed by the cluster
// 5. Clean shutdown
hive.stop();
For hives to accept commands from a cluster (heartbeats, management requests),
they must trust the cluster's certificate. Configure this via HiveConf.trust_store:
let hive_conf = HiveConf {
trust_store: Some(Arc::new(cluster_trust_store)),
..Default::default()
};
Without a trust store, all cluster commands are rejected. See Trust Stores for building trust stores from cluster certificates.
Hives include built-in resilience mechanisms:
Backpressure: When utilization exceeds the threshold (default: 90%), the hive signals to the cluster that it's overloaded. The cluster can then route new work to less-loaded hives.
Circuit Breaker: After consecutive failures (default: 3), the circuit opens and the hive temporarily stops accepting work, allowing time for recovery before resuming.
These are configured via HiveConf:
let hive_conf = HiveConf {
backpressure_threshold: BasisPoints::new(8000), // 80%
circuit_breaker_threshold: 5, // Open after 5 failures
circuit_breaker_cooldown_ms: 60_000, // 1 minute cooldown
..Default::default()
};
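The circuit-breaker behavior described above — open after N consecutive failures, refuse work until the cooldown elapses — can be sketched with std::time alone. Field names here are illustrative, not HiveConf's internals.

```rust
use std::time::{Duration, Instant};

struct CircuitBreaker {
    threshold: u8,
    cooldown: Duration,
    consecutive_failures: u8,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(threshold: u8, cooldown: Duration) -> Self {
        Self { threshold, cooldown, consecutive_failures: 0, opened_at: None }
    }

    /// Should the hive accept new work right now?
    fn accepting(&mut self) -> bool {
        match self.opened_at {
            // Open: refuse work until the cooldown has elapsed.
            Some(t) if t.elapsed() < self.cooldown => false,
            // Cooldown over: close the circuit and accept work again.
            Some(_) => {
                self.opened_at = None;
                self.consecutive_failures = 0;
                true
            }
            None => true,
        }
    }

    /// Record a work outcome; consecutive failures trip the breaker.
    fn record(&mut self, success: bool) {
        if success {
            self.consecutive_failures = 0;
        } else {
            self.consecutive_failures = self.consecutive_failures.saturating_add(1);
            if self.consecutive_failures >= self.threshold {
                self.opened_at = Some(Instant::now());
            }
        }
    }
}

fn main() {
    let mut cb = CircuitBreaker::new(3, Duration::from_millis(10));
    for _ in 0..3 {
        assert!(cb.accepting());
        cb.record(false);
    }
    assert!(!cb.accepting()); // opened after 3 consecutive failures
    std::thread::sleep(Duration::from_millis(15));
    assert!(cb.accepting()); // cooldown elapsed, accepting again
    println!("circuit breaker ok");
}
```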
When a hive manages multiple servlet instances of the same type (for scaling), it
uses a LoadBalancer to select which instance handles each request. The default
LeastLoaded strategy routes to the instance with lowest utilization.
The MessageRouter determines which servlet type handles a given message. The
default TypeBasedRouter uses the message's type information for routing.
For secure communication, configure TLS on the hive:
let tls_config = Arc::new(HiveTlsConfig {
certificate: CertificateSpec::Built(Box::new(cert)),
key: Arc::new(Secp256k1KeyProvider::from(signing_key)),
validators: vec![], // Optional: validate client certificates
});
let hive_conf = HiveConf {
hive_tls: Some(tls_config),
..Default::default()
};
pub struct HiveConf<L: LoadBalancer = LeastLoaded, R: MessageRouter = TypeBasedRouter> {
pub load_balancer: L,
pub router: R,
pub default_scale: ServletScaleConf,
pub servlet_overrides: HashMap<Vec<u8>, ServletScaleConf>,
pub cooldown: Duration, // Default: 5s
pub queue_capacity: u32, // Default: 100
pub backpressure_threshold: BasisPoints, // Default: 9000 (90%)
pub circuit_breaker_threshold: u8, // Default: 3
pub circuit_breaker_cooldown_ms: u64, // Default: 30_000
pub servlet_pool_size: usize, // Default: 8
pub servlet_pool_idle_timeout: Option<Duration>,// Default: 30s
pub drain_timeout: Duration, // Default: 30s
pub trust_store: Option<Arc<dyn CertificateTrust>>,
pub hive_tls: Option<Arc<HiveTlsConfig>>,
}
Hives are typically tested in the context of a cluster environment. See the
Cluster Testing section above for examples using environment Cluster,
which demonstrates how to configure hives with trust stores and verify cluster-hive
communication.
How you wish to model your colonies is beyond the scope of this document. However, it is important to understand the basic building blocks and how they can be combined to create complex systems. The swarm is yours to command.
The tightbeam testing framework provides three progressive verification layers for rigorous behavioral testing of protocol implementations.
The tightbeam testing framework is built on two foundational concepts from formal methods and statistical testing theory:
CSP is a formal language for describing patterns of interaction in concurrent systems, developed by Tony Hoare.12 In tightbeam, CSP provides the mathematical foundation for modeling protocol behavior as labeled transition systems (LTS). Each process specification defines:
CSP enables us to express protocol correctness as refinement relations:
Implementation ⊑ Specification, where ⊑ denotes trace refinement (⊑T) or
failures refinement (⊑F).
FDR is a model checking methodology that verifies CSP refinement relations through exhaustive exploration.3 The framework checks three key properties:
In tightbeam, FDR-style verification uses multi-seed exploration to account for scheduler nondeterminism in cooperatively scheduled systems. This approach, based on research by Pedersen & Chalmers,4 recognizes that refinement verification in systems with cooperative scheduling depends on resource availability and execution interleaving.
The three-layer architecture progressively applies these concepts:
This progressive approach allows developers to start with simple assertions and incrementally add formal verification as protocol complexity grows.
Tightbeam implements formal verification through three complementary layers, each building upon the previous:
| Layer | Feature Flag | Purpose | Specification | Usage |
|---|---|---|---|---|
| L1 AssertSpec | testing | Runtime assertion verification | tb_assert_spec! | Required: .with_spec() or .with_specs() |
| L2 ProcessSpec | testing-csp | CSP state machine modeling | tb_process_spec! | Optional: .with_csp() |
| L3 Refinement | testing-fdr | Trace/failures refinement | Inline config | Optional: .with_fdr() |
Layer 1 (Assertions): Verifies that expected events occur with correct cardinality. This provides basic behavioral correctness through declarative assertion specifications.
Layer 2 (CSP Process Models): Adds formal state machine modeling using Communicating Sequential Processes (CSP) theory. Validates that execution traces follow valid state transitions and distinguishes between observable (external) and hidden (internal) events.
Layer 3 (FDR Refinement): Enables multi-seed exploration for exhaustive verification of trace refinement, failures refinement, and divergence freedom. Based on FDR (Failures-Divergences Refinement) model checking methodology.
All three layers are accessed through the tb_scenario! macro, which provides:
The testing framework uses progressive feature flags:
- testing: Enables L1 assertion verification (foundation)
- testing-csp: Enables L1+L2 CSP process modeling
- testing-fdr: Enables L1+L2+L3 refinement checking (requires testing-csp)
- testing-timing: Enables timing verification (WCET, deadline, jitter, slack) - requires testing
- testing-schedulability: Enables schedulability analysis (RMA/EDF) - requires testing-timing

Each layer builds on the previous, ensuring consistent semantics across verification levels.
AssertSpec defines expected behavioral invariants through declarative assertion specifications. Each specification version declares:
Specifications are versioned using semantic versioning (major.minor.patch) and produce deterministic SHA3-256 hashes over their canonical representation.
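The "deterministic hash over a canonical representation" property can be illustrated compactly. This sketch uses FNV-1a (64-bit) as a std-only stand-in for SHA3-256, and an invented canonical string layout; the property shown — identical specs hash identically, any field change alters the hash — is what tightbeam's spec hashing provides.

```rust
// FNV-1a: a tiny deterministic hash standing in for SHA3-256.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for &b in bytes {
        h ^= b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

// Hash a spec version over a canonical form: fixed field order, explicit
// separators, and a domain prefix ("TBSP" mirrors the real protocol tag).
fn spec_hash(major: u16, minor: u16, patch: u16, assertions: &[(&str, u32)]) -> u64 {
    let mut canon = format!("TBSP|{major}.{minor}.{patch}");
    for (label, count) in assertions {
        canon.push_str(&format!("|{label}={count}"));
    }
    fnv1a(canon.as_bytes())
}

fn main() {
    let a = spec_hash(1, 0, 0, &[("Received", 1), ("Responded", 1)]);
    let b = spec_hash(1, 0, 0, &[("Received", 1), ("Responded", 1)]);
    let c = spec_hash(1, 1, 0, &[("Received", 1), ("Responded", 2)]);
    assert_eq!(a, b); // deterministic: same canonical form, same hash
    assert_ne!(a, c); // any field change produces a different hash
    println!("spec hashing ok");
}
```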
tb_assert_spec! {
pub MySpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
tag_filter: ["v1"],
assertions: [
("Received", exactly!(1)),
("Responded", exactly!(1), equals!("ok"))
]
},
V(1,1,0): {
mode: Accept,
gate: Accepted,
tag_filter: ["v1.1"],
assertions: [
("Received", exactly!(1)),
("Responded", exactly!(2))
]
},
}
Version Block Syntax:
V(major, minor, patch): {
mode: <ExecutionMode>, // Accept or Reject
gate: <TransitStatus>, // Accepted, Rejected, etc.
tag_filter: ["tag1", "tag2"], // Optional: filter assertions by tags
assertions: [ // Array of (label, cardinality) or (label, cardinality, equals!(value))
("label", cardinality),
("label", cardinality, equals!(value)),
...
],
events: [Kind, ...] // Optional: when instrumentation enabled
schedulability: { // Optional: when testing-schedulability enabled
task_set: <TaskSet>,
scheduler: RateMonotonic | EarliestDeadlineFirst,
must_be_schedulable: <bool>
}
}
Deterministic Hashing: Each version produces a 32-byte SHA3-256 hash over:
"TBSP" (TightBeam Spec Protocol)testing-schedulability enabled)Basic Specification:
tb_assert_spec! {
pub DemoSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
tag_filter: ["v1"],
assertions: [
("A", exactly!(1)),
("R", exactly!(1))
]
},
V(1,1,0): {
mode: Accept,
gate: Accepted,
tag_filter: ["v1.1"],
assertions: [
("A", exactly!(1)),
("R", exactly!(2))
]
},
}
Each tb_assert_spec! generates a type with the following methods:
impl MySpec {
// Retrieve all defined versions
pub fn all() -> &'static [AssertionSpec];
// Lookup specific version
pub fn get(major: u16, minor: u16, patch: u16) -> Option<&'static AssertionSpec>;
// Get highest semantic version
pub fn latest() -> &'static AssertionSpec;
}
The framework provides cardinality constraint macros:
- exactly!(n): Exactly n occurrences
- at_least!(n): Minimum n occurrences
- at_most!(n): Maximum n occurrences
- between!(min, max): Range [min, max] occurrences
- present!(): At least one occurrence
- absent!(): Zero occurrences

The framework provides value assertion helpers for verifying assertion payload values:

- equals!(value): Verify assertion value equality

Supported Types:

- Primitives: String, &str, bool, u8, u32, u64, i32, i64, f64 (e.g., equals!(3_600), equals!(42u32))
- Enums: MessagePriority, Version (e.g., equals!(MessagePriority::High), equals!(Version::V2))
- Options: equals!(Some(value)), equals!(None)
- Presence matchers: equals!(IsSome) (matches any Some(_)), equals!(IsNone) (matches None)

Examples:
assertions: [
("priority", exactly!(1), equals!(MessagePriority::High)),
("lifetime", exactly!(1), equals!(3_600)),
("version", exactly!(1), equals!(Version::V2)),
("confidentiality", exactly!(1), equals!(IsSome)),
("optional_field", exactly!(1), equals!(IsNone))
]
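Conceptually, each cardinality assertion reduces to counting matching labels in the consumed trace and checking the count against a range: `exactly!(n)` is the range (n, n), `at_least!(n)` is (n, unbounded), and so on. A schematic sketch of that check in plain Rust (the `check_cardinality` helper is hypothetical, not the crate's internals):

```rust
/// Illustrative cardinality range check: `exactly!(n)` maps to (n, n),
/// `at_least!(n)` to (n, usize::MAX), `between!(a, b)` to (a, b).
fn check_cardinality(events: &[&str], label: &str, min: usize, max: usize) -> bool {
    let count = events.iter().filter(|e| **e == label).count();
    (min..=max).contains(&count)
}

fn main() {
    let trace = ["A", "R", "R"];
    assert!(check_cardinality(&trace, "A", 1, 1));           // exactly!(1)
    assert!(check_cardinality(&trace, "R", 2, usize::MAX));  // at_least!(2)
    assert!(!check_cardinality(&trace, "X", 1, 1));          // absent label fails exactly!(1)
}
```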
Assertions can be tagged with arbitrary string labels for flexible categorization and filtering. Tags enable version-scoped testing where a single scenario can validate multiple protocol versions.
TraceCollector exposes two entry points:
trace.event("label") records a label-only event (no tags/value) and advances the CSP oracle.trace.event_with("label", &["tag"], value) records the label with tags plus an optional value (anything implementing Into<AssertionValue>, e.g. bool, u64, Version, etc.).:trace.event("relay_start")?;
trace.event_with("response_ok", &["tag_a"], true)?;
How Tags Work:
trace.event_with("label", &["tag1", "tag2"], ())tag_filter: ["tag1"] - only assertions with matching tags are validatedExample: Version-Scoped Testing:
tb_assert_spec! {
pub VersionSpec,
V(0,0,0): {
mode: Accept,
gate: Accepted,
tag_filter: ["v0"],
assertions: [
("feature", exactly!(1), equals!(IsNone)),
]
},
V(1,0,0): {
mode: Accept,
gate: Accepted,
tag_filter: ["v1"],
assertions: [
("feature", exactly!(1), equals!(IsNone)),
("v1_specific", exactly!(1))
]
}
}
tb_scenario! {
name: test_all_versions,
config: ScenarioConf::builder()
.with_specs(vec![VersionSpec::get(0, 0, 0), VersionSpec::get(1, 0, 0)])
.build(),
environment Bare {
exec: |trace| {
// Single assertion satisfies both version specs via tags
trace.event_with("feature", &["v0", "v1"], Presence::of_option(&some_option))?;
trace.event_with("v2_specific", &["v1"], ())?;
Ok(())
}
}
}
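Tag filtering can be pictured as a projection of the event stream onto the spec's tag set before cardinality checks run: an event counts toward a spec only if it carries at least one of that spec's filter tags. A minimal sketch, with a hypothetical `filter_by_tags` helper standing in for the framework's filtering:

```rust
/// Keep only events that carry at least one of the spec's filter tags
/// (hypothetical model of `tag_filter` semantics).
fn filter_by_tags<'a>(
    events: &'a [(&'a str, Vec<&'a str>)],
    tag_filter: &[&'a str],
) -> Vec<&'a str> {
    events
        .iter()
        .filter(|(_, tags)| tags.iter().any(|t| tag_filter.contains(t)))
        .map(|(label, _)| *label)
        .collect()
}

fn main() {
    let events = vec![
        ("feature", vec!["v0", "v1"]), // one emission satisfies both version specs
        ("v1_specific", vec!["v1"]),
    ];
    assert_eq!(filter_by_tags(&events, &["v0"]), vec!["feature"]);
    assert_eq!(filter_by_tags(&events, &["v1"]), vec!["feature", "v1_specific"]);
}
```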
All such events are emitted via the instrumentation subsystem described in §11. Layer 1–3 verification operates over this event stream as the authoritative trace for a single execution.
Assertion specs support schedulability verification via the schedulability: { }
block when testing-schedulability is enabled:
schedulability: {
task_set: my_task_set,
scheduler: RateMonotonic | EarliestDeadlineFirst,
must_be_schedulable: true
}
Supported schedulers:
Additional features include percentile-based WCET analysis (P50-P99.99), confidence intervals, and fixed-point arithmetic for deterministic calculations. See §10.3.5 for timing constraints in process specifications.
ProcessSpec defines labeled transition systems (LTS) for formal process modeling using Communicating Sequential Processes (CSP) theory. A process specification declares:
Enabled with testing-csp feature flag.
tb_process_spec! {
pub ProcessName,
events {
observable { "event1", "event2", ... } // External alphabet (Σ)
hidden { "internal1", "internal2", ... } // Internal alphabet (τ)
}
states {
S0 => { "event1" => S1 } // State transitions
S1 => { "event2" => S2, "event3" => S3 } // Nondeterministic branching
S2 => { "event4" [ guard!(clock1 < 10ms) ] => S3 } // Timed transition with guard
S3 => { "event5" [ guard!(clock2 >= 5ms), reset: ["clock1"] ] => S4 } // Guard with clock reset
S4 => {} // Terminal state
}
terminal { S4 } // Valid end states
choice { S1 } // Nondeterministic states
clocks: { "clock1", "clock2" } // Optional: when testing-timing enabled
timing { // Optional: when testing-timing enabled
wcet: { "event1" => wcet!(10ms) },
jitter: { "event2" => jitter!(5ms) },
deadline: { "start" => "end", deadline!(duration: 100ms) },
slack: { "start" => "end", slack!(min: 5ms) }
}
schedulability { // Optional: when testing-schedulability enabled
scheduler: RateMonotonic, // or EarliestDeadlineFirst
periods: {
"event1" => 50ms,
"event2" => 100ms
}
}
annotations { description: "..." } // Optional metadata
}
When CSP is configured via .with_csp() in tb_scenario!:
use tightbeam::testing::*;
tb_process_spec! {
pub SimpleProcess,
events {
observable { "Received", "Responded" }
hidden { "internal_processing" }
}
states {
Idle => { "Received" => Processing }
Processing => { "internal_processing" => Processing, "Responded" => Idle }
}
terminal { Idle }
choice { Processing }
annotations { description: "Simple request-response with internal processing" }
}
When testing-timing is enabled, process specifications support timing
constraints and timed automata semantics.
Timing Constraints:
Process specs support four types of timing constraints via the timing: { } block:
- wcet: { "event" => wcet!(10ms) } - Maximum allowed execution time per event
- deadline: { "start" => "end", deadline!(duration: 100ms) } - Maximum latency between start and end events
- jitter: { "event" => jitter!(5ms) } - Maximum timing variation for an event
- slack: deadline!(duration: 100ms, slack: 5ms) - Minimum safety margin

Timed CSP Semantics:
Process specs support timed automata semantics with clock variables and timing guards:
- clocks: { "clock1", "clock2" } - Named clocks that advance during execution
- "event" [ guard!(clock1 < 10ms) ] => State - Transitions enabled only when guard conditions are satisfied
- "event" [ guard!(clock2 >= 5ms), reset: ["clock1"] ] => State - Listed clocks are reset when the transition is taken

Guard expressions support: <, <=, >, >=, ==, and ranges (5ms <= x <= 10ms).
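The guard/reset semantics above can be modeled as a predicate over the current clock valuation plus a reset list: a transition fires only if the guard holds, and firing zeroes the listed clocks. A small illustrative model (clock values as elapsed milliseconds; the `try_transition` helper is hypothetical, not the crate's timed-automata engine):

```rust
use std::collections::HashMap;

/// Minimal model of a timed transition: the guard must hold on the
/// current clock valuation, and listed clocks are reset to zero.
fn try_transition(
    clocks: &mut HashMap<&'static str, u64>,
    guard: impl Fn(&HashMap<&'static str, u64>) -> bool,
    resets: &[&'static str],
) -> bool {
    if !guard(clocks) {
        return false; // transition disabled: guard fails on current valuation
    }
    for &c in resets {
        clocks.insert(c, 0);
    }
    true
}

fn main() {
    let mut clocks = HashMap::from([("clock1", 7), ("clock2", 6)]);
    // guard!(clock1 < 10ms): enabled at clock1 = 7.
    assert!(try_transition(&mut clocks, |c| c["clock1"] < 10, &[]));
    // guard!(clock2 >= 5ms), reset: ["clock1"]
    assert!(try_transition(&mut clocks, |c| c["clock2"] >= 5, &["clock1"]));
    assert_eq!(clocks["clock1"], 0); // clock1 was reset by the transition
}
```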
Early Pruning and FDR Integration:
Timing violations automatically prune traces during FDR exploration:
When testing-schedulability is also enabled, timing constraints and task
periods are combined into task sets for Rate Monotonic or EDF analysis.
See §10.2.9 for schedulability configuration in assertion specs.
In addition to individual ProcessSpec models, tightbeam supports composed
processes via the CompositionSpec trait and the tb_compose_spec! macro.
Compositions allow you to build larger CSP models from smaller ones using
standard parallel composition operators:
- Synchronous parallel (P || Q)
- Interleaving (P ||| Q)
- Interface parallel (P [| A |] Q)
- Alphabetized parallel (P [| αP | αQ |] Q)

The tb_compose_spec! macro generates a type that implements CompositionSpec
and, via a blanket impl, ProcessSpec, so it can be used anywhere a process
spec is expected (including with .with_csp() in tb_scenario!).
Example: Interleaved request/response and retry flows
use tightbeam::testing::*;
// Two simple processes
tb_process_spec! {
pub RequestFlow,
events { observable { "request", "response" } }
states {
Idle => { "request" => Waiting },
Waiting => { "response" => Idle }
}
terminal { Idle }
}
tb_process_spec! {
pub RetryFlow,
events { observable { "retry" } }
states {
RetryIdle => { "retry" => RetryIdle }
}
terminal { RetryIdle }
}
// Compose them with interleaved parallelism
tb_compose_spec! {
pub RequestWithRetry,
processes: {
RequestFlow,
RetryFlow
},
composition: Interleaved,
properties: {
deadlock_free: true,
livelock_free: true,
deterministic: false
}
}
// Use the composed process in a scenario
tb_scenario! {
name: test_request_with_retry,
config: ScenarioConf::builder()
.with_spec(ClientServerSpec::latest())
.with_csp(RequestWithRetry)
.build(),
environment Bare {
exec: |trace| {
trace.event("request")?;
trace.event("retry")?;
trace.event("response")?;
Ok(())
}
}
}
Composition properties (deadlock_free, livelock_free, deterministic) are
checked by the composition verification layer (§10.4, §10.5) and provide an
early sanity check before enabling full FDR refinement.
Refinement checking provides multi-seed exploration for trace and failures
refinement verification. Formal definitions of traces, failures, and divergences
are given in §10.1.1 and §10.5; this section focuses on configuration and
verdict structure. Based on the Failures-Divergences Refinement (FDR)
methodology from CSP theory. Enabled with testing-fdr feature flag.
Verification Properties:
Requirements: Layer 3 requires testing-fdr feature flag. Refinement
checking requires the specs field in FdrConfig to be populated with
specification processes.
fdr: FdrConfig {
seeds: 64, // Number of exploration seeds
max_depth: 128, // Maximum trace depth
max_internal_run: 32, // Divergence detection threshold
timeout_ms: 5000, // Per-seed timeout
specs: vec![], // Processes for refinement checking (empty = exploration mode)
fail_fast: true, // Stop on first violation (default: true)
expect_failure: false, // Expect refinement to fail (default: false)
// Optional scheduler/resource modeling (feature `testing-fault`)
scheduler_count: None, // Number of schedulers (m)
process_count: None, // Number of concurrent processes (n)
scheduler_model: None, // Cooperative / Preemptive model, when enabled
// Optional fault/FMEA configuration (features `testing-fault`, `testing-fmea`)
fault_model: None,
fmea_config: None,
}
Configuration Parameters:
- seeds: Number of different scheduler strategies to explore
- max_depth: Maximum length of observable trace
- max_internal_run: Consecutive hidden events before divergence detection
- timeout_ms: Timeout for each seed exploration
- specs: Specification processes for refinement checking (empty vector = exploration mode)
- fail_fast: Stop on first refinement violation (default: true)
- expect_failure: Expect refinement to fail for negative tests (default: false)
- scheduler_count / process_count (feature testing-fault): Optional resource-modeling parameters where scheduler_count ≤ process_count; when set, refinement explores traces under constrained scheduler availability (§10.5.4).
- scheduler_model (feature testing-fault): Chooses between cooperative and preemptive scheduler models for refinement.
- fault_model (feature testing-fault): Enables CSP state-driven fault injection during FDR exploration (e.g., link drops, node failures).
- fmea_config (feature testing-fmea): Configures Failure Modes and Effects Analysis integrated with refinement runs.

Operational Modes:
Simple Example:
// Define a simple two-state process
tb_process_spec! {
pub SimpleProcess,
events {
observable { "start", "finish" }
hidden { }
}
states {
Idle => { "start" => Working },
Working => { "finish" => Idle }
}
terminal { Idle }
}
// Define assertion spec
tb_assert_spec! {
pub SimpleSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("start", exactly!(1)),
("finish", exactly!(1))
]
},
}
// Test with refinement checking
tb_scenario! {
name: test_simple_refinement,
config: ScenarioConf::builder()
.with_spec(SimpleSpec::latest())
.with_fdr(FdrConfig {
seeds: 4,
max_depth: 10,
max_internal_run: 8,
timeout_ms: 500,
specs: vec![SimpleProcess::process()],
fail_fast: true,
expect_failure: false,
})
.build(),
environment Bare {
exec: |trace| {
trace.event("start")?;
trace.event("finish")?;
Ok(())
}
}
}
The seeds parameter controls how many different execution paths are explored
during verification. Each seed produces a different scheduling of concurrent
operations, uncovering race conditions and nondeterministic behavior:
fdr: FdrConfig {
seeds: 64, // Try 64 different execution orderings
// ...
}
Each seed explores a different interleaving at nondeterministic choice points, verifying trace refinement, failures refinement, and divergence freedom across all executions.
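The role of a seed can be sketched as a deterministic PRNG that resolves every nondeterministic choice point, so a given seed always reproduces the same interleaving while different seeds explore different ones. Illustrative only (the framework's actual scheduler selection is internal; the xorshift PRNG here is a stand-in):

```rust
/// Tiny xorshift PRNG: a stand-in for the framework's seeded scheduler.
struct Seeded(u64);

impl Seeded {
    fn next(&mut self) -> u64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        self.0
    }

    /// Pick one branch index at a nondeterministic choice point.
    fn choose(&mut self, branches: usize) -> usize {
        (self.next() % branches as u64) as usize
    }
}

fn main() {
    let picks = |seed: u64| {
        let mut rng = Seeded(seed);
        (0..4).map(|_| rng.choose(2)).collect::<Vec<_>>()
    };
    // The same seed always resolves choice points identically (reproducible),
    // while different seeds explore different interleavings.
    assert_eq!(picks(42), picks(42));
    println!("seed 1: {:?}, seed 2: {:?}", picks(1), picks(2));
}
```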
After multi-seed exploration, tightbeam produces a verdict:
pub struct FdrVerdict {
// Overall status
pub passed: bool,
// Single-process properties
pub divergence_free: bool,
pub deadlock_free: bool,
pub is_deterministic: bool,
// Refinement properties (only when specs provided)
pub trace_refines: bool,
pub failures_refines: bool,
pub divergence_refines: bool,
// Witnesses to violations
pub trace_refinement_witness: Option<Trace>,
pub failures_refinement_witness: Option<Failure>,
pub divergence_refinement_witness: Option<Trace>,
pub determinism_witness: Option<(u64, Trace, Event)>,
pub divergence_witness: Option<(u64, Vec<Event>)>,
pub deadlock_witness: Option<(u64, Trace, State)>,
// Statistics
pub traces_explored: usize,
pub states_visited: usize,
pub seeds_completed: u32,
pub failing_seed: Option<u64>,
}
Verdict Fields:
Note: Refinement properties (trace_refines, failures_refines, divergence_refines) are only meaningful when specs are provided in FdrConfig.
| CSP Model | Tightbeam Layer | Verification Property | Refinement Check |
|---|---|---|---|
| Traces (T) | L1 AssertSpec | Observable event sequences | traces(Impl) ⊆ traces(Spec) |
| Stable Failures (F) | L2 ProcessSpec | Valid refusals at choice points | failures(Impl) ⊆ failures(Spec) |
| Failures-Divergences (FD) | L3 FDR | Livelock freedom (no τ-loops) | divergences(Impl) = ∅ |
Traces Model: Verifies that all observable event sequences produced by the implementation are allowed by the specification. This ensures basic behavioral correctness—the system never produces an unexpected sequence of external events.
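Over finite trace sets, the traces-model check is literally a subset test: the implementation refines the specification iff every implementation trace is also a specification trace. A minimal sketch (assuming pre-enumerated, prefix-closed trace sets, which the real explorer computes on the fly rather than materializing):

```rust
use std::collections::HashSet;

/// Trace refinement over finite trace sets: Impl ⊑T Spec iff
/// traces(Impl) ⊆ traces(Spec).
fn trace_refines(impl_traces: &HashSet<Vec<&str>>, spec_traces: &HashSet<Vec<&str>>) -> bool {
    impl_traces.is_subset(spec_traces)
}

fn main() {
    // Prefix-closed specification traces for a start/finish process.
    let spec: HashSet<Vec<&str>> = [vec![], vec!["start"], vec!["start", "finish"]]
        .into_iter()
        .collect();
    let ok: HashSet<Vec<&str>> = [vec![], vec!["start"]].into_iter().collect();
    let bad: HashSet<Vec<&str>> = [vec!["finish"]].into_iter().collect();
    assert!(trace_refines(&ok, &spec));
    assert!(!trace_refines(&bad, &spec)); // "finish" before "start" is a witness
}
```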
Stable Failures Model: Extends trace verification by checking what events a process can refuse after each trace. A stable state is one where no internal progress (τ-transitions) can occur. At choice points, the implementation must not refuse events the specification accepts, preventing incorrect nondeterminism.
Failures-Divergences Model: Adds divergence detection to identify processes
that can make infinite internal progress without external interaction. A divergence
is a τ-loop where the process never becomes stable. The max_internal_run
parameter bounds consecutive hidden events to detect such livelocks.
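The max_internal_run bound can be sketched as a counter over consecutive hidden events that resets on each observable event; exceeding the bound flags a potential τ-loop. Hypothetical helper, not the crate's detector:

```rust
/// Flag a potential divergence (τ-loop) when more than `max_internal_run`
/// consecutive hidden events occur without an observable event.
fn detect_divergence(events: &[(&str, bool)], max_internal_run: usize) -> bool {
    let mut run = 0;
    for &(_, hidden) in events {
        if hidden {
            run += 1;
            if run > max_internal_run {
                return true; // bound exceeded: possible livelock
            }
        } else {
            run = 0; // observable event: process made external progress
        }
    }
    false
}

fn main() {
    let healthy = [("request", false), ("serialize", true), ("response", false)];
    assert!(!detect_divergence(&healthy, 2));
    let livelock = [("request", false), ("tau", true), ("tau", true), ("tau", true)];
    assert!(detect_divergence(&livelock, 2));
}
```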
CSP distinguishes between observable events (external alphabet Σ) and hidden events (internal actions τ). This distinction is fundamental to process refinement:
tb_process_spec! {
pub ClientServerProcess,
events {
// Observable alphabet (Σ): externally visible protocol events
observable { "connect", "request", "response", "disconnect" }
// Hidden alphabet (τ): internal implementation details
hidden { "serialize", "encrypt", "decrypt", "deserialize" }
}
// ...
}
Observable events represent the process's contract with its environment. These form the basis of trace refinement—implementations and specifications must agree on observable behavior.
Hidden events model internal implementation details. They enable refinement
checking where implementations contain details absent from abstract specifications.
Hidden events are projected away when comparing traces: trace \ {τ}.
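The projection trace \ {τ} is just a filter over the hidden alphabet; a minimal sketch:

```rust
use std::collections::HashSet;

/// Project a trace onto the observable alphabet: trace \ {τ}.
fn hide<'a>(trace: &[&'a str], hidden: &HashSet<&'a str>) -> Vec<&'a str> {
    trace.iter().copied().filter(|e| !hidden.contains(e)).collect()
}

fn main() {
    let hidden: HashSet<&str> = ["serialize", "encrypt"].into_iter().collect();
    let impl_trace = ["connect", "serialize", "encrypt", "request"];
    // After hiding, only the external contract remains for comparison.
    assert_eq!(hide(&impl_trace, &hidden), vec!["connect", "request"]);
}
```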
The instrumentation taxonomy (§11.2) maps tightbeam events to categories:
- Observable (Σ): gate_accept, gate_reject, request_recv, response_send, assert_label
- Hidden (τ): handler_enter, handler_exit, crypto_step, compress_step, route_step, policy_eval, process_hidden

CSP provides two choice operators:
At choice points, a process has an acceptance set (events it can engage in) and a refusal set (events it cannot engage in from a stable state). Failures refinement ensures implementations don't introduce invalid refusals:
states {
// External choice: environment determines next event
Connected => { "request" => Processing, "disconnect" => Idle }
// Internal choice: process may non-deterministically choose path
Processing => { "response" => Responded, "error" => ErrorState }
}
choice { Processing } // Annotate nondeterministic states
The choice annotation declares states where internal nondeterminism may occur.
FDR exploration uses different seeds to explore all possible nondeterministic
branches, ensuring the specification covers all implementation behaviors.
Based on research by Pedersen & Chalmers [4], refinement in cooperatively
scheduled systems depends on resource availability. With n concurrent processes
and m schedulers where m < n, some traces become impossible due to scheduling
constraints.
Tightbeam addresses this through multi-seed exploration: each seed represents a different scheduling strategy, exploring alternative interleavings of concurrent events. This is analogous to testing with different numbers of schedulers to verify behavior across resource constraints:
fdr: FdrConfig {
seeds: 64, // Explore 64 different scheduling strategies
max_depth: 128, // Bound trace length
max_internal_run: 32, // Divergence detection threshold
timeout_ms: 5000, // Per-seed timeout
}
At nondeterministic choice points, the seed determines which branch to explore. Across all seeds, the framework verifies that:
Tightbeam can export process specifications in CSPM (machine-readable CSP) format for verification with external tools like FDR4 [3]:
use tightbeam::testing::fdr::CspmExporter;
let process = ClientServerProcess::process();
let exporter = CspmExporter::new(&process);
let mut file = std::fs::File::create("target/tb_csp/client_server.csp")?;
exporter.export(&mut file)?;
Generated CSPM includes:
- Process = InitialState \ {| hidden |} (hidden events abstracted via the hiding operator)

This enables:
The FdrTraceExt trait extends ConsumedTrace with CSP-specific analysis:
use tightbeam::testing::fdr::FdrTraceExt;
hooks {
on_pass: |trace, result| {
// Refinement properties
if let Some(ref fdr_verdict) = result.fdr_verdict {
assert!(fdr_verdict.trace_refines);
assert!(fdr_verdict.failures_refines);
assert!(fdr_verdict.divergence_free);
assert!(fdr_verdict.is_deterministic);
}
Ok(())
}
}
Trace Analysis in Hooks: Query process behavior and event sequences:
use tightbeam::testing::fdr::FdrTraceExt;
hooks {
on_pass: |context| {
// Acceptance queries: Check what events are accepted at specific states
if let Some(acceptance) = context.trace.acceptance_at("Connected") {
// At Connected state, process accepts "serialize"
assert!(acceptance.iter().any(|e| e.0 == "serialize"));
}
// Refusal queries: Verify process can refuse events not in acceptance set
// At Connected, process must do "serialize" before "request"
assert!(context.trace.can_refuse_after("Connected", "request"));
assert!(context.trace.can_refuse_after("Connected", "disconnect"));
Ok(())
}
}
These queries enable CSP-style reasoning about process behavior at specific states, validating that the implementation matches the formal specification.
Fault injection enables systematic error testing through CSP state-driven fault
injection during refinement checking. Requires testing-fault feature flag.
use tightbeam::testing::{FaultModel, InjectionStrategy};
use tightbeam::utils::BasisPoints;
let fault_model = FaultModel::from(InjectionStrategy::Deterministic)
.with_fault(
States::Sending, // Type-safe state enum
Event("response"), // Event label
|| NetworkTimeoutError {...}, // Error factory
BasisPoints::new(3000), // 30% probability
)
.with_seed(0xDEADBEEF); // Reproducibility
Deterministic (Counter-Based):
InjectionStrategy::Deterministic
Random (Seeded RNG):
InjectionStrategy::Random
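The two strategies can be sketched as follows, with basis points (3000 = 30.00%) driving either a fixed stride or a seeded sample. Both helpers are hypothetical models of the strategies' behavior, not the crate's implementation:

```rust
/// Deterministic (counter-based): fire on a fixed stride so that the
/// long-run rate matches the basis-points probability.
fn deterministic_inject(call_index: u64, bps: u64) -> bool {
    // e.g. at 3000 bps (30%), fire every floor(10_000 / 3000) = 3rd call.
    bps > 0 && call_index % (10_000 / bps) == 0
}

/// Random (seeded): a seeded hash in lieu of a real RNG, so decisions
/// are reproducible per seed.
fn random_inject(seed: u64, bps: u64) -> bool {
    let mut x = seed.wrapping_mul(0x9E37_79B9_7F4A_7C15);
    x ^= x >> 33;
    x % 10_000 < bps
}

fn main() {
    // 30% deterministic: injects on calls 0, 3, 6, ...
    let fired: Vec<u64> = (0..9).filter(|&i| deterministic_inject(i, 3000)).collect();
    assert_eq!(fired, vec![0, 3, 6]);
    // Same seed, same decision: reproducible fault campaigns.
    assert_eq!(random_inject(0xDEADBEEF, 3000), random_inject(0xDEADBEEF, 3000));
}
```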
Generate type-safe enums via tb_gen_process_types!:
tb_gen_process_types!(FaultTolerantProcess, Idle, Sending, Retrying, Success, Fallback);
// Generates:
// - fault_tolerant_process::States enum (implements ProcessState)
// - fault_tolerant_process::Event struct (implements ProcessEvent)
Manual implementation:
pub trait ProcessState: Copy + Debug {
fn process_name(&self) -> &'static str;
fn state_name(&self) -> &'static str;
fn full_key(&self) -> Cow<'static, str>;
}
pub trait ProcessEvent: Copy + Debug {
fn event_label(&self) -> &'static str;
}
fdr: FdrConfig {
seeds: 64,
fault_model: Some(fault_model),
specs: vec![MyProcess::process()],
..Default::default()
}
Faults are injected during CSP exploration before state transitions. Injected
faults are recorded in FdrVerdict::faults_injected with full traceability
(state, event, error message, probability).
Example: See tightbeam/tests/fault/basic.rs for a full fault injection
demonstration.
The tb_scenario! macro is the unified entry point for all testing layers,
executing AssertSpec verifications under selectable environments with optional
CSP and FDR verification.
Design Principles:
tb_scenario! {
name: test_function_name, // OPTIONAL: creates a standalone #[test] function. NOTE: do NOT use with `fuzz: afl`
config: ScenarioConf::builder() // REQUIRED: Unified configuration
.with_spec(AssertSpecType::latest()) // Layer 1 assertion spec
.with_csp(ProcessSpecType) // OPTIONAL: Layer 2 CSP model (requires testing-csp)
.with_fdr(FdrConfig { ... }) // OPTIONAL: Layer 3 refinement (requires testing-fdr + csp)
.with_trace(TraceConfig::builder() // OPTIONAL: unified trace config (§11)
.with_instrumentation(TbInstrumentationConfig { ... })
.with_logger(LoggerConfig::new(...))
.build())
.with_hooks(TestHooks { ... }) // OPTIONAL: on_pass/on_fail callbacks
.build(),
fuzz: afl, // OPTIONAL: AFL fuzzing mode (requires testing-csp)
environment <Variant> { ... }, // REQUIRED: execution environment (Bare, Worker, ServiceClient, Servlet)
}
See sections 10.3.4 and 10.4 for detailed environment examples.
Bare Environment Example: Pure logic/function invocation
use tightbeam::testing::*;
tb_assert_spec! {
pub BareSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("Received", exactly!(1)),
("Responded", exactly!(1))
]
},
}
tb_process_spec! {
pub BareProcess,
events {
observable { "Received", "Responded" }
}
states {
Idle => { "Received" => Processing }
Processing => { "Responded" => Idle }
}
terminal { Idle }
}
tb_scenario! {
name: test_bare_environment,
config: ScenarioConf::builder()
.with_spec(BareSpec::latest())
.with_csp(BareProcess)
.build(),
environment Bare {
exec: |trace| {
trace.event("Received")?;
trace.event("Responded")?;
Ok(())
}
}
}
Full Example: All Three Layers with ServiceClient Environment
This example demonstrates progressive verification from L1 through L3:
#![cfg(all(feature = "testing-fdr", feature = "tcp", feature = "tokio"))]
use tightbeam::testing::*;
use tightbeam::trace::TraceCollector;
use tightbeam::transport::tcp::r#async::TokioListener;
use tightbeam::transport::Protocol;
// Layer 1: Assert spec - defines expected assertions and cardinalities
tb_assert_spec! {
pub ClientServerSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("connect", exactly!(1)),
("request", exactly!(1)),
("response", exactly!(2)),
("disconnect", exactly!(1)),
("message_content", exactly!(1), equals!("test"))
]
},
}
// Layer 2: CSP process spec - models state machine with internal events
tb_process_spec! {
pub ClientServerProcess,
events {
observable { "connect", "request", "response", "disconnect" }
hidden { "serialize", "encrypt", "decrypt", "deserialize" }
}
states {
Idle => { "connect" => Connected }
Connected => { "request" => Processing, "serialize" => Serializing }
Serializing => { "encrypt" => Encrypting }
Encrypting => { "request" => Processing }
Processing => { "decrypt" => Decrypting, "response" => Responded }
Decrypting => { "deserialize" => Processing }
Responded => { "disconnect" => Idle }
}
terminal { Idle }
choice { Connected, Processing }
annotations { description: "Client-server with crypto and nondeterminism" }
}
tb_scenario! {
name: test_client_server_all_layers,
config: ScenarioConf::builder()
.with_spec(ClientServerSpec::latest())
.with_csp(ClientServerProcess)
.with_fdr(FdrConfig {
seeds: 64,
max_depth: 128,
max_internal_run: 32,
timeout_ms: 5000,
specs: vec![ClientServerProcess::process()],
fail_fast: true,
expect_failure: false,
})
.with_hooks(TestHooks {
on_pass: Some(Arc::new(|_trace, _result| {
// Optional: custom logic on test pass
Ok(())
})),
on_fail: Some(Arc::new(|_trace, _result, _violation| {
// Optional: custom logic on test fail
Err("Test failed".into())
})),
})
.build(),
environment ServiceClient {
worker_threads: 2,
server: |trace| async move {
let bind_addr = "127.0.0.1:0".parse().unwrap();
let (listener, addr) = <TokioListener as Protocol>::bind(bind_addr).await?;
let handle = server! {
protocol TokioListener: listener,
assertions: trace.share(),
handle: |frame, trace| async move {
trace.event("connect")?;
trace.event("request")?;
trace.event("response")?;
Some(frame)
}
};
Ok((handle, addr))
},
client: |trace, mut client| async move {
trace.event("response")?;
let frame = compose! {
V0: id: "test",
order: 1u64,
message: TestMessage { content: "test".to_string() }
}?;
let response = client.emit(frame, None).await?;
// Decode response and emit value assertion
if let Some(resp_frame) = response {
let decoded: TestMessage = crate::decode(&resp_frame.message)?;
trace.event_with("message_content", &[], decoded.content)?;
}
trace.event("disconnect")?;
Ok(())
}
}
}
This test verifies:
Hooks provide optional callbacks that can observe and override test outcomes:
- Hooks are registered via .with_hooks(TestHooks { on_pass: Some(...), on_fail: Some(...) }) in the ScenarioConf builder.
- Each hook is an Arc, of type Arc<dyn Fn(&HookContext) -> Result<(), TightBeamError> + Send + Sync> for on_pass and Arc<dyn Fn(&HookContext, &SpecViolation) -> Result<(), TightBeamError> + Send + Sync> for on_fail.
- Returning Ok(()) means the hook accepts the outcome and the test passes.
- Returning Err(e) means the hook rejects the outcome and the test fails.
- Hooks receive a HookContext containing the consumed trace, FDR verdict (if enabled), process spec, timing constraints, and assertion spec, allowing inspection of all verification results.

TightBeam integrates AFL.rs, a Rust port of American Fuzzy Lop, for coverage-guided fuzzing of protocol implementations. Unlike deterministic random testing, AFL uses evolutionary algorithms with compile-time instrumentation to discover inputs that trigger new code paths.
How AFL Works:
Integration with tb_scenario!: The fuzz: afl parameter generates
AFL-compatible fuzz targets that leverage the oracle for guided exploration:
tb_scenario! {
fuzz: afl, // ← AFL fuzzing mode
config: ScenarioConf::builder()
.with_spec(MySpec::latest())
.with_csp(MyProcess) // ← oracle for valid state navigation
.build(),
environment Bare {
exec: |trace| {
// AFL provides random bytes, oracle navigates state machine
match trace.oracle().fuzz_from_bytes() {
Ok(()) => {
for event in trace.oracle().trace() {
trace.event(event.0)?;
}
Ok(())
}
Err(_) => Err(TestingError::FuzzInputExhausted.into())
}
}
}
}
Feature Requirements:
- testing-csp feature flag (required for CSP oracle)
- cargo-afl installed: cargo install cargo-afl
- std feature flag (required for most fuzz targets)

Example Fuzz Target:
//! Simple 3-state workflow fuzz target for AFL
#![cfg(all(feature = "std", feature = "testing-csp"))]
use tightbeam::testing::error::TestingError;
use tightbeam::{at_least, exactly, tb_assert_spec, tb_process_spec, tb_scenario};
// Layer 1: Assertion spec
tb_assert_spec! {
pub SimpleFuzzSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("start", exactly!(1)),
("action_a", at_least!(0)),
("action_b", at_least!(0)),
("done", exactly!(1))
]
},
}
// Layer 2: CSP process with nondeterministic choices
tb_process_spec! {
pub SimpleFuzzProc,
events {
observable { "start", "action_a", "action_b", "done" }
hidden { }
}
states {
S0 => { "start" => S1 },
S1 => { "action_a" => S1, "action_b" => S1, "done" => S2 }
}
terminal { S2 }
}
// AFL fuzz target - compiled with `cargo afl build`
// Note: AFL fuzz targets generate `fn main()` - do NOT include `name:` parameter
tb_scenario! {
fuzz: afl,
config: ScenarioConf::builder()
.with_spec(SimpleFuzzSpec::latest())
.with_csp(SimpleFuzzProc)
.build(),
environment Bare {
exec: |trace| {
// AFL provides bytes, oracle interprets as state machine choices
match trace.oracle().fuzz_from_bytes() {
Ok(()) => {
for event in trace.oracle().trace() {
trace.event(event.0)?;
}
Ok(())
}
Err(_) => Err(TestingError::FuzzInputExhausted.into())
}
}
}
}
Prerequisites:
cargo install cargo-afl
Run AFL Fuzzer:
# Build fuzz targets first
# Note: Some fuzz targets may require additional features
RUSTFLAGS="--cfg fuzzing" cargo afl build --test fuzzing --features "std,testing-csp,testing-fuzz"
# Create seed input directory
mkdir -p fuzz_in
echo "seed" > fuzz_in/seed.txt
# Run AFL fuzzer (find the actual binary name)
FUZZ_TARGET=$(ls target/debug/deps/fuzzing-* 2>/dev/null | grep -v '\.d$' | head -1)
cargo afl fuzz -i fuzz_in -o fuzz_out "$FUZZ_TARGET"
The CspOracle interprets AFL's random bytes as state machine navigation choices,
ensuring fuzz inputs trigger valid protocol behavior:
How It Works:
AFL Random Bytes CspOracle State Machine
───────────────── ───► ─────────────── ───► ─────────────────
[0x7A, 0x3F, ...] byte % events.len() S0 → S1 → S2 → ...
selects valid event (valid trace)
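The byte-to-event mapping above can be sketched as a walk over the transition table, where each input byte indexes into the events enabled at the current state. The `oracle_walk` helper is an illustrative stand-in for the oracle's byte interpretation, not the crate's code:

```rust
/// Map raw fuzz bytes to a valid event sequence: at each state, the next
/// byte selects among the currently enabled events (byte % events.len()).
fn oracle_walk<'a>(
    bytes: &[u8],
    transitions: &[(&'a str, Vec<(&'a str, &'a str)>)], // state -> [(event, next_state)]
    start: &'a str,
) -> Vec<&'a str> {
    let mut state = start;
    let mut trace = Vec::new();
    for &b in bytes {
        let Some((_, outs)) = transitions.iter().find(|(s, _)| *s == state) else {
            break; // unknown state (should not happen with a well-formed table)
        };
        if outs.is_empty() {
            break; // terminal state: remaining input is ignored
        }
        let (event, next) = outs[b as usize % outs.len()];
        trace.push(event);
        state = next;
    }
    trace
}

fn main() {
    let fsm = vec![
        ("S0", vec![("start", "S1")]),
        ("S1", vec![("action_a", "S1"), ("action_b", "S1"), ("done", "S2")]),
        ("S2", vec![]),
    ];
    // Every input, however random, yields a structurally valid trace.
    assert_eq!(
        oracle_walk(&[0x00, 0x00, 0x01, 0x02], &fsm, "S0"),
        vec!["start", "action_a", "action_b", "done"]
    );
}
```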
Benefits:
Example Trace (from crash analysis):
Input: [0x00, 0x01, 0x00, 0x02]
Trace: "start" → "action_a" → "action_b" → "done"
State: S0 → S1 → S1 → S2
Result: Crash at state S1 after "action_b"
TightBeam optionally integrates with AFL's IJON extension [5] for state-aware fuzzing. IJON enables "input-to-state correspondence" - bridging the semantic gap between fuzzer input mutations and program state exploration.
IJON Core Concepts:
- ijon_max(label, value) - fuzzer tries to maximize value
- ijon_set(label, value) - fuzzer discovers unique values
- ijon_hashint(label, value) - track integer distributions

TightBeam's CSP-Based Approach:
TightBeam automatically derives IJON annotations from CSP process specifications, eliminating manual annotation while providing formal state coverage guarantees:
| Aspect | Standard IJON | TightBeam CSP Oracle |
|---|---|---|
| State Definition | Manual annotations of raw variables | Formal CSP process states (automatic) |
| Annotation Burden | Developer must identify & annotate | Derived from tb_process_spec! |
| Coverage Metric | Arbitrary program values | State + transition coverage (provable) |
| State Abstraction | Low-level (memory, counters, etc.) | High-level (protocol semantics) |
| Validation | None (annotations may be incorrect) | Trace validation (runtime checking) |
| Integration | Explicit IJON_MAX/IJON_SET calls | Automatic when testing-fuzz-ijon enabled |
Automatic IJON Integration:
When built with --features testing-fuzz-ijon, tightbeam's tb_scenario! macro
automatically inserts IJON calls after each successful fuzz execution.
Comparison with Pure AFL:
Without IJON, AFL relies solely on code coverage (edge hit counts). With tightbeam's oracle + IJON:
- Without the oracle, AFL sees branch_A, branch_B, branch_C (syntax)
- With the oracle, AFL sees State_Init → State_Processing → State_Done (semantics)

Example: Magic Value Discovery:
Traditional IJON use case - finding magic values in parsers:
// Standard IJON annotation
if (input[0] == 0xDEADBEEF) {
IJON_MAX("magic_value", input[0]); // Manual annotation
enter_special_state();
}
TightBeam equivalent - no manual annotation needed:
tb_process_spec! {
pub ParserProcess,
events { observable { "magic_detected", "parse_continue" } }
states {
Init => { "magic_detected" => SpecialState, "parse_continue" => Parsing }
SpecialState => { /* ... */ }
}
// IJON automatically reports when SpecialState is reached
}
The following table summarizes capabilities available across the testing layers:
| Capability | testing | testing-csp | testing-fdr | testing-fuzz |
|---|---|---|---|---|
| Basic Verification | ||||
| Single trace verification | ✓ | ✓ | ✓ | ✓ |
| Assertion cardinality checks | ✓ | ✓ | ✓ | ✓ |
| Crash/panic detection | ✓ | ✓ | ✓ | ✓ |
| CSP Modeling | ||||
| CSP process modeling | – | ✓ | ✓ | – |
| Compile-time label validation | – | ✓ | ✓ | – |
| Runtime trace validation | – | ✓ | ✓ | – |
| Terminal state verification | – | ✓ | ✓ | – |
| FDR Refinement | ||||
| Multi-seed exploration | – | – | ✓ | – |
| Trace refinement (⊑T) | – | – | ✓ | – |
| Failures refinement (⊑F) | – | – | ✓ | – |
| Divergence detection (τ-loops) | – | – | ✓ | – |
| Determinism checking | – | – | ✓ | – |
| Refusal set analysis | – | – | ✓ | – |
| Acceptance set queries | – | – | ✓ | – |
| CSPM export (FDR4) | – | – | ✓ | – |
| AFL Fuzzing | ||||
| Coverage-guided fuzzing | – | – | – | ✓ |
| Edge coverage tracking | – | – | – | ✓ |
| Input corpus evolution | – | – | – | ✓ |
| Timing Verification | ||||
| Timing constraints (WCET/Deadline/Jitter/Slack) | testing-timing | testing-timing | testing-timing | – |
| Timed CSP (clocks, guards) | – | testing-timing | testing-timing | – |
| Schedulability analysis (RMA/EDF) | – | testing-schedulability | testing-schedulability | – |
| Early pruning (timing violations) | – | – | testing-fdr + testing-timing | – |
| Combined Capabilities | ||||
| CSP oracle for fuzzing | – | – | – | csp + fuzz |
| IJON state annotations | – | – | – | csp + fuzz-ijon |
This section maps tightbeam's verification capabilities to common high-assurance standards and regulations. The framework provides native support for many certification requirements, though final certification evidence and process compliance remain the responsibility of the integrator.
Requirements: 100% MC/DC coverage, systematic fault injection, and complete traceability from requirements to test evidence.
tightbeam Support:
- FaultModel (§10.4.2), configured with with_fault() for specific state-event pairs
- BasisPoints (0-10000) for precise injection rates
- InjectedFaultRecord tracking in FdrVerdict::faults_injected provides complete fault campaign traceability

Requirements: Systematic fault injection with proof that all error paths are exercised and tested.
tightbeam Support:
- FaultModel with InjectionStrategy::Deterministic ensures reproducible fault campaigns (§10.4.2)
- FdrVerdict tracks error recovery success/failure counts via error_recovery_successful and error_recovery_failed fields

Requirements: Fault tree analysis with coverage of all single-event upsets (SEUs) and failure propagation paths.
tightbeam Support:
- CompositionSpec (§10.3.6) enables hierarchical fault tree modeling via CSP parallel composition

Requirements: Formal verification methods with machine-checkable evidence and complete attack/failure tree coverage.
tightbeam Support:
- FdrVerdict provides machine-readable witnesses to violations (trace/failure/divergence witnesses)

Requirements: Enumerate all failure modes, inject each mode, observe effects, and calculate Risk Priority Numbers (RPN) based on Severity × Occurrence × Detection ratings.
tightbeam Support:
- FmeaConfig with configurable severity scales (MilStd1629, Iso26262) and RPN thresholds (default: 100)
- FmeaReport generated from FDR verdicts via the fmea_config field, containing:
  - failure_modes: enumerated failure modes with severity/occurrence/detection
  - total_rpn: aggregate risk priority
  - critical_failures: indices of failures exceeding the RPN threshold
- FaultModel::with_fault() allows precise failure mode specification with error factories and injection probabilities
- FdrVerdict::faults_injected records all injected faults with CSP context for traceability

Automatic FMEA Calculation:
tightbeam automatically calculates Severity, Occurrence, and Detection ratings from FDR exploration results using CSP-based criticality analysis:
Severity (calculated via CSP reachability analysis):
Occurrence (converted from BasisPoints injection probability):
- probability_bps / 1000 (0-10000 → 1-10)
- probability_bps / 2500 (0-10000 → 1-4)

Detection (calculated from error recovery statistics):

- FdrVerdict::error_recovery_successful vs error_recovery_failed counts

FMEA Report Structure:
pub struct FmeaReport {
pub failure_modes: Vec<FailureMode>,
pub severity_scale: SeverityScale,
pub total_rpn: u32,
pub critical_failures: Vec<usize>,
}
pub struct FailureMode {
pub component: String,
pub failure: String,
pub effects: Vec<String>,
pub severity: u8, // Auto-calculated from CSP reachability
pub occurrence: u16, // Auto-converted from BasisPoints
pub detection: u8, // Auto-calculated from recovery stats
pub rpn: u32, // severity × occurrence × detection
}
Example Configuration:
fdr: FdrConfig {
fault_model: Some(FaultModel::default()
.with_fault(
State::Active,
Event::Send,
|| TightBeamError::Unavailable,
BasisPoints::new(2500) // 25% occurrence
)
),
fmea_config: Some(FmeaConfig {
severity_scale: SeverityScale::MilStd1629,
rpn_critical_threshold: 100,
auto_generate: true,
}),
// ... other FDR config
}
The following table summarizes tightbeam's native support for high-assurance standards requirements:
| Standard | Level | Key Requirements | tightbeam Features | Feature Flags |
|---|---|---|---|---|
| DO-178C | DAL A | 100% MC/DC, fault injection, traceability | FaultModel, CSP specs, URN evidence | testing-fdr, testing-fault |
| ISO 26262 | ASIL-D | Systematic fault injection, FMEA/FMECA | Auto-FMEA (ISO scale), fault campaigns | testing-fdr, testing-fmea |
| IEC 61508 | SIL 4 | Error path coverage, reproducibility | Deterministic injection, multi-seed FDR | testing-fdr, testing-fault |
| ECSS-E-HB-40A | – | SEU coverage, fault tree analysis | Per-transition injection, CSP composition | testing-fdr, testing-fault |
| Common Criteria | EAL7 | Formal methods, machine-checkable evidence | CSP refinement, URN artifacts, CSPM export | testing-fdr |
| MIL-STD-1629 | – | FMEA with RPN calculation | Auto-severity (1-10), auto-RPN | testing-fmea |
Legend:
- Base capabilities require only the testing feature
- testing-fdr enables FDR refinement checking and multi-seed exploration
- testing-fault enables FaultModel and deterministic fault injection
- testing-fmea enables automatic FMEA report generation
- instrument enables URN-based evidence artifacts (independent of testing)

This section normatively specifies the TightBeam instrumentation subsystem. Instrumentation produces a semantic event sequence consumed by verification logic. It is an observation facility, NOT an application logging API. Tests MUST NOT depend on instrumentation events imperatively; verification MUST treat the event stream as authoritative ground truth for one execution.
Feature Gating:
The instrumentation subsystem is compiled only under the feature instrument.

Each event MUST have one kind from a closed, feature‑gated set:
- gate_accept, gate_reject, request_recv, response_send
- assert_label, assert_payload
- handler_enter, handler_exit, crypto_step, compress_step, route_step, policy_eval
- (testing-csp): process_transition, process_hidden
- (testing-fdr): seed_start, seed_end, state_expand, state_prune, divergence_detect, refusal_snapshot, enabled_set_sample
- start, end, warn, error

Hidden/internal events MUST use the internal category.
Instrumentation events are also identified by URNs defined in
tightbeam::utils::urn::specs::TightbeamUrnSpec. The TightbeamUrnSpec format
urn:tightbeam:instrumentation:<resource_type>/<resource_id> provides stable
names for traces, events, seeds, and verdicts, and is used by the
instrumentation subsystem to label evidence artifacts.
Shorthand Event Matching: In tb_assert_spec! and tb_process_spec!, you
can use shorthand labels instead of full URNs. The shorthand "foo" matches
any event containing instrumentation:event/foo, such as
urn:tightbeam:instrumentation:event/foo or urn:custom:instrumentation:event/foo.
Conceptual fixed layout (names illustrative):
trace_id | seq | kind | label? | payload? | phase? | dur_ns? | flags | extras
Requirements:
- trace_id MUST uniquely identify the execution instance.
- seq MUST start at 0 and increment by 1 for each emitted event.
- kind MUST be a valid taxonomy member.
- label MUST be present for assertion and labeled process events; otherwise absent.
- payload MAY be present only if the label is declared payload‑capable.
- phase SHOULD map to one of: Gate, Handler, Assertion, Response, Crypto, Compression, Routing, Policy, Process, Exploration.
- dur_ns MAY appear on exit or boundary events and MUST represent a monotonic duration in nanoseconds.
- flags MUST represent a bitset (e.g. ASSERT_FAIL, HIDDEN, DIVERGENCE, OVERFLOW).
- extras MAY supply fixed numeric slots and a bounded byte sketch for extended metrics (e.g. enabled set cardinality).

Runtime values captured under assert_payload MUST be transformed before emission:
Warning: Secret or potentially sensitive raw data MUST NOT be emitted verbatim.
Instrumentation behavior MUST be controlled by a configuration object
(conceptual fields). Configuration existence itself is gated by instrument:
TbInstrumentationConfig {
enable_payloads: bool,
enable_internal_detail: bool,
sample_enabled_sets: bool,
sample_refusals: bool,
divergence_heuristics: bool,
max_events: u32,
record_durations: bool,
}
Defaults (instrument only):
- enable_payloads = false
- enable_internal_detail = false
- sample_enabled_sets = false
- sample_refusals = false
- divergence_heuristics = false
- record_durations = false
- max_events = 1024

Layer Interaction (informative): Enabling testing layers does NOT alter these defaults; tests MAY explicitly override fields per scenario.
If max_events is exceeded, the implementation MUST set an OVERFLOW flag,
emit a single warn event, and drop subsequent events.
For every finalized trace an artifact MUST be producible in a canonical binary form (ASN.1 DER).
Canonical ASN.1 DER Schema (conceptual):
EvidenceArtifact ::= SEQUENCE {
specHash OCTET STRING, -- SHA3-256(spec definition)
traceId INTEGER, -- Unique per execution
seed INTEGER OPTIONAL, -- Exploration seed (testing-fdr only)
outcome ENUMERATED { acceptResponse(0), acceptNoResponse(1), reject(2), error(3) },
metrics SEQUENCE {
countEvents INTEGER,
durationNs INTEGER OPTIONAL,
overflow BOOLEAN OPTIONAL
},
events SEQUENCE OF Event
}
Event ::= SEQUENCE {
i INTEGER, -- sequence number
k ENUMERATED { start(0), end(1), warn(2), error(3), gate_accept(4), gate_reject(5), request_recv(6), response_send(7), assert_label(8), assert_payload(9), handler_enter(10), handler_exit(11), crypto_step(12), compress_step(13), route_step(14), policy_eval(15), process_transition(16), process_hidden(17), seed_start(18), seed_end(19), state_expand(20), state_prune(21), divergence_detect(22), refusal_snapshot(23), enabled_set_sample(24) },
l UTF8String OPTIONAL, -- label
payloadHash OCTET STRING OPTIONAL, -- SHA3-256(payload canonical bytes) if captured
durationNs INTEGER OPTIONAL, -- monotonic duration for boundary/exit events
flags BIT STRING OPTIONAL, -- ASSERT_FAIL | HIDDEN | DIVERGENCE | OVERFLOW ...
extras OCTET STRING OPTIONAL -- bounded auxiliary metrics sketch
}
Binary Serialization Requirements:
- payloadHash MUST be 32 bytes when present (SHA3-256).

Artifact Integrity:

- trace_hash MUST be SHA3-256 over the DER encoding of the Events sequence ONLY (excluding surrounding fields).
- evidence_hash SHOULD be SHA3-256(specHash || trace_hash) where || denotes raw byte concatenation.

Privacy:
Implements RFC 5424-compliant logging with trait-based backends.
pub enum LogLevel {
Emergency = 0,
Alert = 1,
Critical = 2,
Error = 3,
Warning = 4,
Notice = 5,
Info = 6,
Debug = 7,
}
pub trait LogBackend: Send + Sync {
fn emit(&self, record: &LogRecord) -> Result<(), LogError>;
fn accepts(&self, level: LogLevel) -> bool;
fn flush(&self) -> Result<(), LogError> { Ok(()) }
}
Built-in backends: StdoutBackend (std only), MultiplexBackend (fan-out).
let filter = LogFilter::new(LogLevel::Warning)
.with_component("security", LogLevel::Debug);
use tightbeam::trace::{TraceConfig, logging::*};
let backend = Box::new(StdoutBackend);
let filter = LogFilter::new(LogLevel::Warning);
let config = LoggerConfig::new(backend, filter)
.with_default_level(LogLevel::Info);
let trace: TraceCollector = TraceConfig::builder()
.with_logger(config)
.build();
trace.event("msg")?.with_log_level(LogLevel::Error).emit();
Note: The emit call may be elided, as events are emitted on drop.
tightbeam provides a small utils module family for cross-cutting concerns.
Module: tightbeam::utils::urn
The URN subsystem provides:
- Urn<'a>: an RFC 8141-compliant urn:<nid>:<nss> representation.
- UrnBuilder: a fluent builder for constructing and validating URNs from either a raw NID/NSS or structured components.
- UrnSpec / UrnValidationError: traits and error types for namespace‑specific validation logic.
- tightbeam::utils::urn::specs::TightbeamUrnSpec: a built‑in spec for instrumentation URNs of the form urn:tightbeam:instrumentation:<resource_type>/<resource_id>.

TightbeamUrnSpec constrains:

- resource_type: one of trace, event, seed, verdict (case‑insensitive, normalized to lowercase), and
- resource_id: an application‑defined identifier that must match an alphanumeric‑with‑hyphen pattern.

These URNs can be used by applications to name any kind of resource in a stable, parseable way. Internally, they are also used by the instrumentation subsystem (§11) to tag traces, events, seeds, and verdicts with globally unique identifiers for evidence artifacts and external analysis.
Example: Building a custom application URN
use tightbeam::utils::urn::{UrnBuilder, UrnValidationError};
fn build_customer_urn() -> Result<(), UrnValidationError> {
let urn = UrnBuilder::default()
.with_nid("example")
.with_nss("customer:1234")
.build()?;
assert_eq!(urn.to_string(), "urn:example:customer:1234");
Ok(())
}
Module: tightbeam::utils::task
The job! macro implements the Command Pattern for encapsulating executable
units of work as zero-sized types (ZST). Jobs provide a lightweight abstraction
for reusable, composable, and testable logic without runtime overhead.
Imports:
// Macros are exported at crate root
use tightbeam::job;
// Traits and types are in utils::task
use tightbeam::utils::task::{Job, AsyncJob, Pipeline, join};
Key Properties:
Design Rationale: Jobs serve as a fundamental behavioral unit in tightbeam, analogous to functions-as-objects in object-oriented design. They provide a consistent interface for executable commands across different contexts.
Syntax:
// Async job with tuple input (implements AsyncJob trait)
job! {
name: JobName,
async fn run((param1, param2): (Type1, Type2)) -> ReturnType {
// Implementation
}
}
// Sync job with tuple input (implements Job trait)
job! {
name: JobName,
fn run((param1, param2): (Type1, Type2)) -> ReturnType {
// Implementation
}
}
// No-parameter job (implements Job/AsyncJob with Input = ())
job! {
name: NoParamJob,
fn run() -> ReturnType {
// Implementation
}
}
Traits:
The job! macro automatically implements marker traits for job handling:
/// Synchronous job trait
pub trait Job {
type Input;
type Output;
fn run(input: Self::Input) -> Self::Output;
}
/// Asynchronous job trait
pub trait AsyncJob {
type Input;
type Output;
fn run(input: Self::Input) -> impl Future<Output = Self::Output> + Send;
}
Module: tightbeam::utils::task
The Pipeline trait extends Rust's Result<T, E> with job composition
capabilities, making Result itself a pipeline. This design enables seamless
integration between jobs and standard Rust code with zero learning curve.
Imports:
use tightbeam::job; // Macro for creating jobs
use tightbeam::utils::task::{Pipeline, join};
Core Concept: If you know how to use Result, you already know how to use pipelines.
Design Principles:
- Result<T, E> implements Pipeline directly
- Standard Result combinators (and_then, map, or_else) work unchanged
- PipelineBuilder adds automatic trace event emission

Basic Usage - Jobs as Pipelines:
use tightbeam::utils::task::Pipeline;
// Jobs return Results, which are pipelines
let frame = CreateHandshakeRequest::run(client_id, nonce)
.map(|req| req.with_timestamp(now())) // Core Result::map
.and_then(|req| ValidateRequest::run(req)) // Job
.map_err(|e| TightBeamError::ValidationFailed(e)) // Core Result::map_err
.and_then(|req| SendRequest::run(req)) // Another job
.run()?; // Execute the pipeline
Mixed Composition:
// Start from existing Result
let config: Result<Config, Error> = parse_config_file(path);
// Chain jobs onto it organically
config
.and_then(|cfg| ValidateConfig::run(cfg))
.and_then(|cfg| SaveConfig::run(cfg))
.map(|_| "Configuration saved successfully")
.and_then(|msg| NotifyUser::run(msg))
.run()?;
Parallel Execution with join():
use tightbeam::utils::task::join;
// Both return Results, both implement Pipeline
let (encrypted, signed) = join(
EncryptPayload::run(payload),
SignPayload::run(payload)
).run()?;
// Use results
SendRequest::run((encrypted, signed))?;
Fallback Handling:
// Fallback to alternative on error
let frame = SendRequest::run(request)
.or(|| UseCachedResponse::run()) // Try fallback on error
.or_else(|e| HandleError::run(e)) // Error recovery
.run()?;
Automatic Trace with PipelineBuilder:
When you need trace event emission, use PipelineBuilder to create a traced
pipeline. Job names are automatically converted to snake_case URN events:
use tightbeam::utils::task::PipelineBuilder;
// Create pipeline with trace context
PipelineBuilder::new(trace)
.start((client_id, nonce))
// Auto-emits: urn:tightbeam:instrumentation:event/create_handshake_request_start
// urn:tightbeam:instrumentation:event/create_handshake_request_success
.and_then(|(id, n)| CreateHandshakeRequest::run(id, n))
.map(|req| req.validate()) // No trace event (standard map)
// Auto-emits: urn:tightbeam:instrumentation:event/validate_request_*
.and_then(|req| ValidateRequest::run(req))
// Auto-emits: urn:tightbeam:instrumentation:event/send_request_*
.and_then(|req| SendRequest::run(req))
.run()?;
Testing Integration:
Pipelines work seamlessly with tb_scenario!:
// L1: Assertion specification
tb_assert_spec! {
pub PipelineSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
// Shorthand labels match full URNs
("create_handshake_request_start", exactly!(1)),
("create_handshake_request_success", exactly!(1)),
("validate_request_start", exactly!(1)),
("validate_request_success", exactly!(1))
]
}
}
// L2: CSP process specification
tb_process_spec! {
pub PipelineProcess,
events {
observable {
"create_handshake_request_start",
"create_handshake_request_success",
"validate_request_start",
"validate_request_success"
}
hidden {}
}
states {
Idle => { "create_handshake_request_start" => Creating },
Creating => { "create_handshake_request_success" => Validating },
Validating => { "validate_request_start" => ValidatingRun },
ValidatingRun => { "validate_request_success" => Done },
Done => {}
}
terminal { Done }
}
tb_scenario! {
name: test_pipeline_workflow,
config: ScenarioConf::builder()
.with_spec(PipelineSpec::latest())
.with_csp(PipelineProcess)
.build(),
environment Pipeline {
exec: |pipeline| {
let input = ("test-001".to_string(), "nonce".to_string());
pipeline
.start(input)
.and_then(|(id, n)| CreateHandshakeRequest::run(id, n))
.map(|req| req.with_metadata())
.and_then(|req| ValidateRequest::run(req))
.run()
}
}
}
Complete Example:
use tightbeam::utils::task::{Pipeline, PipelineBuilder, join};
// Mix everything: jobs, Results, parallel execution, fallbacks, trace
let session_id = PipelineBuilder::new(trace)
.start(client_id)
// Job with auto-trace
.and_then(|id| CreateHandshakeRequest::run(id, nonce))
// Standard Result operation
.map(|req| req.add_timestamp(now()))
// Parallel execution
.and_then(|req| {
let encrypted = EncryptPayload::run(payload);
let signed = SignPayload::run(payload);
join(encrypted, signed).map(|(e, s)| (req, e, s))
})
// Send request
.and_then(|(req, enc, sig)| SendRequest::run((req, enc, sig)))
// Fallback on error
.or(|| UseCachedResponse::run())
// Error recovery
.or_else(|e| HandleError::run(e))
// Extract result
.and_then(|resp| ExtractSessionId::run(resp))
.run()?;
This section contains complete, runnable examples demonstrating usage patterns.
This example demonstrates an end-to-end worker and servlet setup tested with
tb_scenario!, covering assertion specs, CSP process specs, and environment
integration.
use tightbeam::testing::*;
// Define assertion spec for worker behavior
tb_assert_spec! {
pub PingPongWorkerSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("relay_start", exactly!(2)),
("relay_success", exactly!(1)),
("response_result", exactly!(1), equals!("PONG")),
("relay_rejected", exactly!(1))
]
},
}
// Define CSP process spec for worker state machine
tb_process_spec! {
pub PingPongWorkerProcess,
events {
observable { "relay_start", "relay_success", "relay_rejected" }
hidden { "validate_message", "process_message" }
}
states {
Idle => { "relay_start" => Processing }
Processing => { "validate_message" => Validating }
Validating => { "process_message" => Responding, "relay_rejected" => Idle }
Responding => { "relay_success" => Idle }
}
terminal { Idle }
choice { Validating }
}
tb_scenario! {
name: test_ping_pong_worker,
config: ScenarioConf::<()>::builder()
.with_spec(PingPongWorkerSpec::latest())
.with_csp(PingPongWorkerProcess)
.build(),
environment Worker {
setup: |_trace| {
PingPongWorker::default()
},
stimulus: |trace, worker| async move {
// Test accepted message
trace.event("relay_start")?;
let ping_msg = RequestMessage {
content: "PING".to_string(),
lucky_number: 42,
};
let response = worker.relay(Arc::new(ping_msg)).await?;
if let Some(pong) = response {
trace.event("relay_success")?;
trace.event_with("response_result", &[], pong.result)?;
}
// Test rejected message
trace.event("relay_start")?;
let pong_msg = RequestMessage {
content: "PONG".to_string(),
lucky_number: 42,
};
let result = worker.relay(Arc::new(pong_msg)).await;
if result.is_err() {
trace.event("relay_rejected")?;
}
Ok(())
}
}
}
use tightbeam::testing::*;
// Define assertion spec for servlet behavior
tb_assert_spec! {
pub PingPongSpec,
V(1,0,0): {
mode: Accept,
gate: Accepted,
assertions: [
("request_received", exactly!(1)),
("pong_sent", exactly!(1)),
("response_result", exactly!(1), equals!("PONG")),
("is_winner", exactly!(1), equals!(true))
]
},
}
// Define process spec for servlet state machine
tb_process_spec! {
pub PingPongProcess,
events {
observable { "request_received", "pong_sent" }
hidden { "validate_lucky_number", "format_response" }
}
states {
Idle => { "request_received" => Processing }
Processing => { "validate_lucky_number" => Validating }
Validating => { "format_response" => Responding }
Responding => { "pong_sent" => Idle }
}
terminal { Idle }
choice { Processing }
}
tb_scenario! {
name: test_servlet_with_workers,
config: ScenarioConf::builder()
.with_spec(PingPongSpec::latest())
.with_csp(PingPongProcess)
.build(),
environment Servlet {
servlet: PingPongServletWithWorker,
setup: |addr| async move {
Ok(client! { connect TokioListener: addr })
},
client: |trace, mut client| async move {
fn generate_message(
lucky_number: u32,
content: Option<String>
) -> Result<Frame, TightBeamError> {
let message = RequestMessage {
content: content.unwrap_or_else(|| "PING".to_string()),
lucky_number,
};
compose! {
V0: id: b"test-ping",
message: message
}
}
// Client-side assertion before sending
trace.event("request_received")?;
// Test winning case
let ping_message = generate_message(42, None)?;
let response = client.emit(ping_message, None).await?;
let response_message: ResponseMessage = decode(&response.unwrap().message)?;
// Emit value assertions for spec verification
trace.event_with("response_result", &[], response_message.result)?;
trace.event_with("is_winner", &[], response_message.is_winner)?;
// Client-side assertion after receiving
trace.event("pong_sent")?;
Ok(())
}
}
}
This project is licensed under either of
at your option. You may choose whichever license best fits your needs.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
This means contributors grant rights under BOTH licenses, providing:
The workspace consists of the following components:
C.A.R. Hoare, "Communicating sequential processes," Communications of the ACM, vol. 21, no. 8, pp. 666-677, August 1978. DOI: 10.1145/359576.359585
A.W. Roscoe, Understanding Concurrent Systems. Springer-Verlag, 2010. ISBN: 978-1-84882-257-3. DOI: 10.1007/978-1-84882-258-0
University of Oxford, FDR4 User Manual, Version 4.2.7, 2020. Available: https://www.cs.ox.ac.uk/projects/fdr/
M. Pedersen and K. Chalmers, "Refinement Checking of Cooperatively Scheduled Concurrent Systems," in Formal Methods: Foundations and Applications (SBMF 2024), pp. 3-21, 2024. DOI: 10.1007/978-3-031-78561-1_1
C. Aschermann, S. Schumilo, A. Abbasi, and T. Holz, "IJON: Exploring Deep State Spaces via Fuzzing," in 2020 IEEE Symposium on Security and Privacy (SP), San Francisco, CA, USA, 2020, pp. 1597-1612. DOI: 10.1109/SP40000.2020.00117