| Crates.io | sift_stream |
| lib.rs | sift_stream |
| version | 0.7.2 |
| created_at | 2025-04-01 19:12:51.607955+00 |
| updated_at | 2026-01-16 22:34:11.131912+00 |
| description | A robust Sift telemetry streaming library |
| homepage | https://github.com/sift-stack/sift |
| repository | https://github.com/sift-stack/sift |
| max_upload_size | |
| id | 1615311 |
| size | 623,789 |
SiftStream is a Rust-based telemetry streaming system that provides reliable, high-throughput data ingestion to the Sift platform. The architecture is built around a task-based system with multiple async tasks communicating through channels and control messages to ensure data reliability and fault tolerance.
See the examples directory for demonstrations on how to stream data to Sift using
sift_stream.
SiftStream is built around two generic components, E: Encoder (encodes data) and T: Transport (transmits data), using a separation of concerns between encoding (how data is structured) and transport (how data is transmitted):
Encoder trait: Defines how data is encoded/structured. The encoder is responsible for:
Converting user data (e.g., Flow messages) into the appropriate message format
Current implementation: IngestionConfigEncoder for ingestion-config-based encoding
Transport trait: Defines how encoded messages are transmitted. The transport is responsible for:
Delivering encoded messages to their destination
Current implementations: LiveStreaming (gRPC streaming) and FileBackup (file writing)
Encodeable trait: Types that can be encoded by an encoder (e.g., Flow, FlowBuilder)
This design allows for future extensibility: new encoding schemes or transport mechanisms can be added independently without affecting the other component.
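As a rough illustration of this split, the three traits can be pictured as below. This is a simplified sketch, not the crate's actual definitions; the method names, associated types, and signatures here are assumptions.

```rust
// Simplified sketch of the design described above; not sift_stream's real
// trait signatures.

/// Data the caller can hand to the stream (e.g., Flow, FlowBuilder).
trait Encodeable {}

/// Structures Encodeable data into an encoded message
/// (e.g., IngestionConfigEncoder for ingestion-config-based encoding).
trait Encoder {
    type Message;
    fn encode<D: Encodeable>(&self, data: &D) -> Self::Message;
}

/// Transmits already-encoded messages (e.g., LiveStreaming over gRPC, or
/// FileBackup writing to disk); it never needs to know how encoding works.
trait Transport {
    type Message;
    fn send(&mut self, message: Self::Message);
}
```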
SiftStream supports two different transport modes:
LiveStreaming: The default transport that streams data directly to Sift via gRPC. This transport supports real-time streaming, optional disk backups, checkpointing, and retry policies. Use SiftStreamBuilder::build() to create a stream with this transport.
FileBackup: A specialized transport that only writes telemetry data to backup files on disk without streaming to Sift. This transport is useful for offline data collection, batch processing, or scenarios with unreliable network connectivity. Use SiftStreamBuilder::build_file_backup() to create a stream with this transport. Note that this transport requires a RecoveryStrategy::RetryWithBackups configuration.
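A minimal sketch of picking a transport at build time follows. Only the names SiftStreamBuilder, build(), build_file_backup(), and RecoveryStrategy::RetryWithBackups come from the description above; the import path, the async and fallible signatures, and the way the builder is configured are assumptions here, so treat this as a sketch and refer to the examples directory for the real API.

```rust
// Sketch only: assumes the builder is fully configured elsewhere and that both
// build methods are async and fallible; signatures are not verified here.
use sift_stream::SiftStreamBuilder;

async fn start_streams(
    live_builder: SiftStreamBuilder,
    backup_only_builder: SiftStreamBuilder,
) -> Result<(), Box<dyn std::error::Error>> {
    // LiveStreaming (default): real-time gRPC streaming to Sift with optional
    // disk backups, checkpointing, and retry policies.
    let _live = live_builder.build().await?;

    // FileBackup: write telemetry only to backup files on disk; the builder
    // must be configured with RecoveryStrategy::RetryWithBackups.
    let _file_only = backup_only_builder.build_file_backup().await?;

    Ok(())
}
```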
The SiftStream architecture when using LiveStreaming transport consists of three main async tasks that work together to provide reliable data streaming:
Note: FileBackup transport does not use the task system architecture, as it only writes to disk files without streaming to Sift.
Control messages are low-frequency messages sent between tasks via broadcast channels to coordinate checkpointing, error handling, and shutdown processes.
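The control messages described below can be modeled as a small enum fanned out over a broadcast channel, one receiver per task. The sketch below is an illustrative model only; the message and field names are taken from this document, but the crate's actual types and channel wiring may differ.

```rust
use tokio::sync::broadcast;

// Illustrative model of the control messages described below; not necessarily
// the crate's actual types or channel setup.
#[derive(Clone, Debug)]
enum ControlMessage {
    SignalNextCheckpoint,
    CheckpointComplete { first_message_id: u64, last_message_id: u64 },
    CheckpointNeedsReingestion { first_message_id: u64, last_message_id: u64 },
    BackupFull,
    ReingestBackups { backup_files: Vec<std::path::PathBuf> },
    Shutdown,
}

#[tokio::main]
async fn main() {
    // Low-frequency coordination: each task holds its own broadcast receiver.
    let (control_tx, mut backup_task_rx) = broadcast::channel::<ControlMessage>(16);
    let mut streaming_task_rx = control_tx.subscribe();

    control_tx
        .send(ControlMessage::CheckpointComplete { first_message_id: 0, last_message_id: 41 })
        .unwrap();

    // Every subscribed task observes the same control message.
    println!("{:?}", backup_task_rx.recv().await.unwrap());
    println!("{:?}", streaming_task_rx.recv().await.unwrap());
}
```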
Responsibilities (backup manager task):
Receives data routed to the backup_tx channel and writes it to backup files on disk
Control Messages Sent:
BackupFull - When backup files reach maximum count limit
ReingestBackups - When checkpoint fails and backup files need re-ingestion
Control Messages Received:
CheckpointComplete { first_message_id, last_message_id } - Signals checkpoint completion with the range of message IDs in the checkpoint
CheckpointNeedsReingestion { first_message_id, last_message_id } - Signals the current checkpoint will need re-ingestion with the range of message IDs
Shutdown - Initiates graceful shutdown
Conditions for Sending Messages:
BackupFull: Triggered when the number of backup files being tracked reaches the configured maximum
ReingestBackups: Triggered when a message (control or data) has indicated the current checkpoint should be re-ingested
Responsibilities (gRPC streaming task):
Streams data received from the ingestion_tx channel to Sift via gRPC
Control Messages Sent:
SignalNextCheckpoint - When a new stream is desired to verify messages sent have been successfully received
CheckpointComplete { first_message_id, last_message_id } - When the current stream has concluded at the end of a checkpoint, includes the range of message IDs in the checkpoint
CheckpointNeedsReingestion { first_message_id, last_message_id } - When gRPC stream fails, includes the range of message IDs that need re-ingestion
Control Messages Received:
BackupFull - Triggers immediate checkpoint completion, resetting the normal checkpoint interval
Shutdown - Initiates graceful shutdown with final stream completion
Conditions for Sending Messages:
SignalNextCheckpoint: Sent when a new stream is desired so that the messages sent so far can be verified as successfully received, e.g., at the end of the normal checkpoint interval
CheckpointComplete { first_message_id, last_message_id }: Sent when the current stream concludes at the end of a checkpoint and Sift acknowledges receipt of all of its messages; includes the range of message IDs in the checkpoint
CheckpointNeedsReingestion { first_message_id, last_message_id }: When the gRPC stream fails with an error, includes the range of message IDs that need to be re-ingested
Responsibilities (backup re-ingestion task):
Re-ingests backup files when signaled by the ReingestBackups control message
Control Messages Received:
ReingestBackups - Contains the list of backup files to re-ingest
Shutdown - Initiates graceful shutdown
IMPORTANT: Data reliability is among the most important goals of sift-stream. However, if the backup system falls behind (e.g., due to slow disk writes), a warning is emitted and data is still sent for ingestion; streaming in this situation is preferred over blocking entirely. It is therefore highly recommended that disk write speeds be sufficiently high for the data stream being backed up, so that data is reliably backed up.
The message ID-based checkpoint system helps mitigate the impact of backups falling behind: messages that have already been confirmed as successfully streamed (via successful checkpoints) are automatically skipped during backup, allowing the backup system to catch up more efficiently.
In contrast, if the primary ingestion channel becomes full, the oldest data will be removed in favor of streaming newer data. The data removed during this process will be forwarded to the backup system and will be re-ingested at the next checkpoint.
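A conceptual sketch of that drop-oldest policy follows (not the crate's implementation, which uses async channels): when the bounded ingestion queue is full, the oldest message is evicted and handed to the backup path rather than blocking newer data.

```rust
use std::collections::VecDeque;

// Conceptual illustration of the overflow policy described above.
fn enqueue_with_drop_oldest(
    ingestion_queue: &mut VecDeque<u64>, // message IDs waiting to stream
    backup_queue: &mut Vec<u64>,         // messages routed to the backup path
    capacity: usize,
    message_id: u64,
) {
    if ingestion_queue.len() == capacity {
        // Channel full: evict the oldest message in favor of newer data. The
        // evicted message goes to the backup system and is re-ingested at the
        // next checkpoint.
        if let Some(oldest) = ingestion_queue.pop_front() {
            backup_queue.push(oldest);
        }
    }
    ingestion_queue.push_back(message_id);
}

fn main() {
    let mut ingestion = VecDeque::new();
    let mut backup = Vec::new();
    for id in 0..5 {
        enqueue_with_drop_oldest(&mut ingestion, &mut backup, 3, id);
    }
    // The oldest messages (0 and 1) were evicted to the backup path.
    assert_eq!(backup, vec![0, 1]);
}
```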
The channels have default capacities; these can be configured based on individual streaming needs.
The checkpoint system ensures data reliability by periodically creating checkpoints that can be used for recovery. The system uses message IDs to precisely track which messages have been successfully streamed to Sift, enabling efficient backup management even when ingestion and backup processes are out of sync.
Each message in the stream is assigned a unique, monotonically increasing message ID. Checkpoints track the range of message IDs (first_message_id to last_message_id) that were included in the checkpoint, allowing the backup manager to:
Quickly drain committed messages: Messages that have already been confirmed as successfully streamed (message ID ≤ committed message ID) are skipped during backup, improving efficiency when backups are behind ingestion (see the sketch after this list)
Precisely match files to checkpoints: Backup files track the range of message IDs they contain, enabling exact matching with checkpoint ranges
Handle out-of-order scenarios: The system correctly handles cases where ingestion completes before backups catch up, or where backups are ahead of ingestion
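For instance, the drain step can be modeled as a simple filter on message IDs; this is illustrative only, not the crate's code.

```rust
// Illustrative only: skip messages that a successful checkpoint has already
// confirmed as streamed, so the backup writer can catch up faster.
fn drain_committed(pending_backup: &mut Vec<u64>, committed_message_id: u64) {
    pending_backup.retain(|&message_id| message_id > committed_message_id);
}

fn main() {
    let mut pending_backup = vec![10, 11, 12, 13, 14];
    // A checkpoint confirmed everything up to message ID 12.
    drain_committed(&mut pending_backup, 12);
    assert_eq!(pending_backup, vec![13, 14]);
}
```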
At the end of each checkpoint interval (checkpoint_interval), the current stream is concluded and a CheckpointComplete { first_message_id, last_message_id } control message is sent with the range of message IDs in the checkpoint. When a stream completes, an "ok" gRPC status from Sift indicates all messages for that stream have been received. The checkpoint includes the range of message IDs that were successfully streamed.
Backup files can also be retained regardless of successful checkpoints and re-ingestion processes.
The backup system manages files through a rotation policy that balances data reliability with storage efficiency. Understanding how backup files are handled is crucial for optimizing checkpoint behavior and minimizing unnecessary re-ingestion.
Each backup file tracks the range of message IDs it contains (first_message_id to last_message_id). The backup manager processes checkpoints in order as backup files catch up to them. When a checkpoint completes (see the sketch after this list):
Files containing message IDs that are fully within a successful checkpoint range are deleted
Files containing message IDs that overlap with a failed checkpoint range are marked for re-ingestion
Files containing message IDs beyond the current checkpoint are retained until their checkpoints complete
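These three outcomes can be sketched as a pure function of a file's message ID range and the checkpoint's range and result; this is an illustration of the policy described above, not the crate's actual logic.

```rust
// Illustrative only: what the backup manager does with one file when a
// checkpoint concludes.
#[derive(Debug, PartialEq)]
enum FileAction {
    Delete,   // fully covered by a successful checkpoint
    Reingest, // overlaps a failed checkpoint's range
    Retain,   // contains messages beyond the current checkpoint
}

fn classify_backup_file(
    file_range: (u64, u64),       // (first_message_id, last_message_id) in the file
    checkpoint_range: (u64, u64), // range of message IDs in the checkpoint
    checkpoint_succeeded: bool,
) -> FileAction {
    let (file_first, file_last) = file_range;
    let (cp_first, cp_last) = checkpoint_range;
    let fully_within = file_first >= cp_first && file_last <= cp_last;
    let overlaps = file_first <= cp_last && file_last >= cp_first;

    match (checkpoint_succeeded, fully_within, overlaps) {
        (true, true, _) => FileAction::Delete,
        (false, _, true) => FileAction::Reingest,
        _ => FileAction::Retain,
    }
}

fn main() {
    // A file holding messages 0..=99, fully covered by a successful checkpoint
    // over 0..=150, can be deleted.
    assert_eq!(classify_backup_file((0, 99), (0, 150), true), FileAction::Delete);
    // The same file overlapping a failed checkpoint must be re-ingested.
    assert_eq!(classify_backup_file((0, 99), (0, 150), false), FileAction::Reingest);
}
```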
The backup system is configured through the DiskBackupPolicy with two key parameters:
max_file_size: Maximum size per backup file before rotation
max_file_count: Maximum number of backup files to maintain before forcing a checkpoint
Key Principle: Smaller, more frequent checkpoints reduce the amount of data that needs to be re-ingested when failures occur, but increase the overhead of checkpoint management and creating new gRPC streams to Sift.
Generally, the default configuration is recommended and should work well for most use cases.
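One back-of-envelope way to reason about these two knobs: the amount of un-checkpointed data on disk, and therefore the worst case that may need re-ingestion after a failure, is bounded by roughly max_file_size times max_file_count, because BackupFull forces a checkpoint once the file count limit is reached. The values below are illustrative, not the crate's defaults.

```rust
// Illustrative arithmetic only; these values are not the crate's defaults.
fn worst_case_reingest_bytes(max_file_size: u64, max_file_count: u64) -> u64 {
    // BackupFull forces a checkpoint once max_file_count files exist, so
    // un-checkpointed data on disk is bounded by this product.
    max_file_size * max_file_count
}

fn main() {
    let bytes = worst_case_reingest_bytes(16 * 1024 * 1024, 8); // 16 MiB x 8 files
    println!("worst case ~{} MiB to re-ingest", bytes / (1024 * 1024));
}
```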
gRPC Stream Failure: The streaming task sends CheckpointNeedsReingestion with the range of message IDs in the failed checkpoint, and the corresponding backup files are re-ingested
gRPC Re-Ingest Failure: Backup files that are not successfully re-ingested remain on disk
Channel Overflow: The oldest data is dropped from the primary ingestion channel in favor of newer data; dropped data is forwarded to the backup system and re-ingested at the next checkpoint
The shutdown process ensures graceful termination with data preservation:
Shutdown control message sent to all tasks
Data channels (ingestion_tx, backup_tx) are dropped
Final CheckpointComplete message sent
In order to shut down quickly and not block the calling application, re-ingestion will be halted, though backup files not yet re-ingested will remain on disk for manual ingestion.
The data channel will be drained, and the final backup file will be sync'd/flushed to disk to preserve all data.
IMPORTANT: Calling applications must ensure graceful shutdown by calling finish() prior to dropping the SiftStream; otherwise data loss may occur.
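A minimal usage sketch of that shutdown requirement follows. send() and finish() are the entry points named in this document; whether they are async and fallible is assumed here, and build_stream() and next_flow() are hypothetical helpers standing in for builder setup and the application's data source.

```rust
// Sketch only: signatures are assumptions and the two helpers are hypothetical.
async fn run() -> Result<(), Box<dyn std::error::Error>> {
    let mut stream = build_stream().await?; // e.g., via SiftStreamBuilder::build()

    while let Some(flow) = next_flow() {
        // Encodeable data (e.g., a Flow) is encoded and handed to the transport.
        stream.send(flow).await?;
    }

    // IMPORTANT: call finish() before dropping the stream so the data channel
    // is drained and the final backup file is flushed; otherwise data may be lost.
    stream.finish().await?;
    Ok(())
}
```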
The caller invokes SiftStream::send() with an Encodeable type (e.g., Flow)
The Encoder converts the data to the appropriate message format
The Transport sends the encoded message
LiveStreaming: Message ID assignment → Dual routing to ingestion_tx and backup_tx channels
FileBackup: Direct writing to backup files
On gRPC stream failure: CheckpointNeedsReingestion { first_message_id, last_message_id } signal with the message ID range
The system provides comprehensive metrics for monitoring.
The SiftStream architecture provides a robust, fault-tolerant system for streaming telemetry data to Sift. The task-based design with control message coordination ensures reliable data delivery while maintaining high performance and providing comprehensive error recovery mechanisms.