Crates.io | turbomcp-core |
lib.rs | turbomcp-core |
version | 1.1.2 |
created_at | 2025-08-26 17:15:43.320228+00 |
updated_at | 2025-09-25 03:24:59.87504+00 |
description | Core abstractions and optimized message processing for the TurboMCP SDK |
homepage | https://turbomcp.org |
repository | https://github.com/Epistates/turbomcp |
id | 1811550 |
size | 585,627 |
Core abstractions and utilities for the TurboMCP framework, providing foundational types, session management, and optimized message processing for Model Context Protocol implementations.
turbomcp-core provides the essential building blocks for MCP implementations in Rust. It includes session management, request contexts, error handling, message types, and performance-optimized data structures.
Related crates: simd-json and sonic-rs (SIMD-accelerated JSON parsing), thiserror (error type derivation).
┌─────────────────────────────────────────────┐
│                TurboMCP Core                │
├─────────────────────────────────────────────┤
│ Message Processing                          │
│ ├── JSON parsing with optional SIMD         │
│ ├── Bytes-based message handling            │
│ └── Structured data types                   │
├─────────────────────────────────────────────┤
│ Session & Context Management                │
│ ├── RequestContext lifecycle                │
│ ├── Thread-safe session state               │
│ └── Correlation ID management               │
├─────────────────────────────────────────────┤
│ Error Handling & Observability              │
│ ├── Structured McpError types               │
│ ├── Context preservation                    │
│ └── Metrics & tracing hooks                 │
└─────────────────────────────────────────────┘
use turbomcp_core::{RequestContext, Message, Context, McpResult};
// Create a request context for correlation and observability
let mut context = RequestContext::new();
// Message parsing with optional SIMD acceleration
let json_data = br#"{"jsonrpc": "2.0", "method": "tools/list"}"#;
let message = Message::parse(json_data)?;
// Context provides rich observability and user information
context.info("Processing request").await?;
// Context features for authentication and user information
if context.is_authenticated() {
    let user = context.user().unwrap_or("unknown");
    let roles = context.roles();
    context.info(&format!("Authenticated user: {}, roles: {:?}", user, roles)).await?;
}
use turbomcp_core::{SessionManager, SessionConfig};
// Configure session management with LRU eviction
let config = SessionConfig::new()
    .with_max_sessions(1000)
    .with_ttl_seconds(3600);
let session_manager = SessionManager::with_config(config);
// Sessions are automatically managed with efficient cleanup
let session = session_manager.create_session().await?;
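The configuration above combines a session cap with a per-session TTL. As a rough illustration of how such a policy can behave, here is a minimal std-only sketch. The type `SketchSessionStore` and its `touch` method are hypothetical names invented for this example; they are not part of turbomcp-core, and the real eviction logic may differ.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a bounded session store (not the turbomcp-core type).
pub struct SketchSessionStore {
    max_sessions: usize,
    ttl: Duration,
    /// Session id -> time of last access.
    last_seen: HashMap<u64, Instant>,
    /// Session ids ordered from least to most recently used.
    order: VecDeque<u64>,
    next_id: u64,
}

impl SketchSessionStore {
    pub fn new(max_sessions: usize, ttl: Duration) -> Self {
        Self {
            max_sessions,
            ttl,
            last_seen: HashMap::new(),
            order: VecDeque::new(),
            next_id: 0,
        }
    }

    /// Creates a session, evicting least recently used sessions while the store is full.
    pub fn create_session(&mut self) -> u64 {
        while self.order.len() >= self.max_sessions {
            match self.order.pop_front() {
                Some(oldest) => {
                    self.last_seen.remove(&oldest);
                }
                None => break,
            }
        }
        let id = self.next_id;
        self.next_id += 1;
        self.last_seen.insert(id, Instant::now());
        self.order.push_back(id);
        id
    }

    /// Returns true and refreshes recency if the session exists and is within its TTL.
    /// An expired session is dropped and reported as absent.
    pub fn touch(&mut self, id: u64) -> bool {
        let alive = match self.last_seen.get(&id) {
            Some(seen) => seen.elapsed() < self.ttl,
            None => return false,
        };
        // Remove the id from its old position in the recency queue.
        self.order.retain(|other| *other != id);
        if alive {
            self.last_seen.insert(id, Instant::now());
            self.order.push_back(id);
        } else {
            self.last_seen.remove(&id);
        }
        alive
    }

    pub fn len(&self) -> usize {
        self.order.len()
    }
}
```

With a cap of two, creating a third session drops whichever of the first two was touched least recently. The linear `retain` keeps the sketch short; a production store would typically use a linked structure for constant-time recency updates.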
use turbomcp_core::{McpError, McpResult};
fn process_request(invalid_input: bool) -> McpResult<String> {
    // Rich error types with automatic context
    if invalid_input {
        return Err(McpError::InvalidInput(
            "Request missing required field".to_string()
        ));
    }
    // Errors automatically include correlation context
    Ok("processed".to_string())
}
Enable SIMD acceleration for JSON processing:
[dependencies]
turbomcp-core = { version = "1.1.0", features = ["simd"] }
Note: SIMD features require compatible CPU architectures (x86_64 with SSE2+ or ARM with NEON).
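Whether the CPU meets the requirement in the note can be checked at runtime with the standard library alone. The sketch below is an assumption about how an application might guard its choice of JSON backend; the function names `simd_baseline_available` and `json_backend` are hypothetical and not part of turbomcp-core. Only x86_64 and 64-bit ARM (aarch64) are covered, because stable Rust has no detection macro for 32-bit ARM.

```rust
/// Reports whether the running CPU exposes SSE2 (x86_64).
#[cfg(target_arch = "x86_64")]
pub fn simd_baseline_available() -> bool {
    std::arch::is_x86_feature_detected!("sse2")
}

/// Reports whether the running CPU exposes NEON (aarch64).
#[cfg(target_arch = "aarch64")]
pub fn simd_baseline_available() -> bool {
    std::arch::is_aarch64_feature_detected!("neon")
}

/// Other architectures: conservatively report no SIMD baseline.
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
pub fn simd_baseline_available() -> bool {
    false
}

/// Hypothetical helper: names the JSON path an application could select.
pub fn json_backend() -> &'static str {
    if simd_baseline_available() {
        "simd"
    } else {
        "scalar"
    }
}
```

A check like this is only useful when the binary is built with the simd feature in the first place; it does not replace the Cargo feature flag.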
Feature | Description | Default |
---|---|---|
simd | Enable SIMD-accelerated JSON processing | |
metrics | Enable built-in performance metrics | |
tracing | Enable distributed tracing support | |
compression | Enable message compression utilities | |
turbomcp-core is automatically included when using the main TurboMCP framework:
use turbomcp::prelude::*;
// Core functionality is available through the prelude
#[server]
impl MyServer {
    #[tool("Example with context")]
    async fn my_tool(&self, ctx: Context) -> McpResult<String> {
        // Context is powered by turbomcp-core
        ctx.info("Processing request").await?;
        Ok("result".to_string())
    }
}
For custom implementations or integrations:
use turbomcp_core::{
    RequestContext, SessionManager, Message, McpError, McpResult
};
struct CustomHandler {
    sessions: SessionManager,
}
impl CustomHandler {
    async fn handle_request(&self, data: &[u8]) -> McpResult<String> {
        let context = RequestContext::new();
        let message = Message::parse(data)?;
        // Use core functionality directly
        context.info("Custom processing").await?;
        Ok("processed".to_string())
    }
}
Core error types for comprehensive error handling:
use turbomcp_core::McpError;
match result {
    Err(McpError::InvalidInput(msg)) => {
        // Handle validation errors
    },
    Err(McpError::SessionExpired(id)) => {
        // Handle session lifecycle
    },
    Err(McpError::Performance(details)) => {
        // Handle performance issues
    },
    Err(other) => {
        // Handle any remaining error variants so the match stays exhaustive
    },
    Ok(value) => {
        // Process success case
    }
}
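To make the matching pattern concrete without depending on the crate, the following std-only sketch defines a small error enum that reuses the three variant names shown above. `SketchError`, its `String` payloads, the message texts, and the `describe` helper are all assumptions made for illustration; the actual McpError definition may carry different data. The hand-written `Display` impl is the kind of boilerplate a derive crate such as thiserror generates.

```rust
use std::fmt;

/// Hypothetical error type mirroring three variant names from the match above.
#[derive(Debug)]
pub enum SketchError {
    InvalidInput(String),
    SessionExpired(String),
    Performance(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::InvalidInput(msg) => write!(f, "invalid input: {msg}"),
            SketchError::SessionExpired(id) => write!(f, "session expired: {id}"),
            SketchError::Performance(details) => write!(f, "performance issue: {details}"),
        }
    }
}

impl std::error::Error for SketchError {}

/// Handles two variants specifically and routes everything else to a fallback arm.
pub fn describe(result: Result<u32, SketchError>) -> String {
    match result {
        Ok(value) => format!("ok: {value}"),
        Err(SketchError::InvalidInput(msg)) => format!("rejected: {msg}"),
        Err(SketchError::SessionExpired(id)) => format!("re-authenticate session {id}"),
        Err(other) => format!("unhandled: {other}"),
    }
}
```

The fallback arm keeps the match exhaustive if further variants are added later, at the cost of handling them generically.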
TurboMCP Core provides abstractions for thread-safe sharing that form the foundation for SharedClient, SharedTransport, and SharedServer:
The Shareable<T> trait provides a consistent interface for creating thread-safe wrappers:
use turbomcp_core::shared::{Shareable, Shared};
// Any type can implement Shareable.
// The trait definition is shown for reference only; it is already imported above:
//
// pub trait Shareable<T>: Clone + Send + Sync + 'static {
//     fn new(inner: T) -> Self;
// }
// Use with any type
struct MyService {
    counter: u64,
}
let service = MyService { counter: 0 };
let shared = Shared::new(service); // Implements Shareable<MyService>
The Shared<T> wrapper provides closure-based access patterns for any type:
use turbomcp_core::shared::Shared;
struct Database {
    connections: Vec<Connection>,
}
impl Database {
    fn query(&self, sql: &str) -> Result<Vec<Row>, DbError> {
        // Query implementation
        todo!()
    }
    fn execute(&mut self, sql: &str) -> Result<u64, DbError> {
        // Execute implementation
        todo!()
    }
}
// Create shared wrapper
let db = Database::new();
let shared_db = Shared::new(db);
// Read access with closures
let results = shared_db.with(|db| {
    db.query("SELECT * FROM users")
}).await?;
// Mutable access with closures
let affected_rows = shared_db.with_mut(|db| {
    db.execute("UPDATE users SET active = true")
}).await?;
// Async closures also supported
let async_result = shared_db.with_async(|db| async {
    let rows = db.query("SELECT COUNT(*) FROM users")?;
    process_async(rows).await
}).await?;
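The closure-based access pattern can be sketched with the standard library alone. The version below is synchronous and uses `std::sync::Mutex`, whereas the snippets above await their results, so treat it as an illustration of the idea rather than of the crate's internals. `SketchShared` is a hypothetical name.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical synchronous stand-in for a closure-based shared wrapper.
pub struct SketchShared<T> {
    inner: Arc<Mutex<T>>,
}

// Implemented by hand so that cloning does not require `T: Clone`;
// every clone points at the same underlying value.
impl<T> Clone for SketchShared<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl<T> SketchShared<T> {
    pub fn new(value: T) -> Self {
        Self {
            inner: Arc::new(Mutex::new(value)),
        }
    }

    /// Runs `f` with shared access; the lock is released when `f` returns.
    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
        let guard = self.inner.lock().expect("lock poisoned");
        f(&*guard)
    }

    /// Runs `f` with exclusive access; the lock is released when `f` returns.
    pub fn with_mut<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        let mut guard = self.inner.lock().expect("lock poisoned");
        f(&mut *guard)
    }
}
```

Because the guard never leaves the method, callers cannot hold the lock longer than the closure runs, which is the main appeal of this style over handing out guards directly.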
For types that need to be consumed (like servers), ConsumableShared<T> provides safe extraction:
use std::time::Duration;
use turbomcp_core::shared::{ConsumableShared, SharedError};
struct Server {
    config: ServerConfig,
}
impl Server {
    fn run(self) -> Result<(), ServerError> {
        // Consume self to run server
        println!("Running server with config: {:?}", self.config);
        Ok(())
    }
    fn status(&self) -> ServerStatus {
        // Non-consuming method
        ServerStatus::Ready
    }
}
// Create consumable shared wrapper
let server = Server::new(config);
let shared = ConsumableShared::new(server);
// Access before consumption
let status = shared.with(|s| s.status()).await?;
println!("Server status: {:?}", status);
// Clone for monitoring while consuming
let monitor = shared.clone();
tokio::spawn(async move {
    loop {
        match monitor.with(|s| s.status()).await {
            Ok(status) => println!("Status: {:?}", status),
            Err(SharedError::Consumed) => {
                println!("Server has been consumed");
                break;
            }
            // Stop monitoring on any other error
            Err(_) => break,
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
});
// Consume the server (only possible once)
let server = shared.consume().await?;
server.run()?; // Server is now running
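One common way to get consume-once semantics is to store the value in an `Option` behind a lock and `take()` it on consumption. The std-only sketch below shows that technique; it is an assumption about how such a wrapper could work, not a description of ConsumableShared's actual implementation. `SketchConsumable` and `SketchSharedError` are hypothetical names, and the methods are synchronous for brevity.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical error type for the sketch.
#[derive(Debug, PartialEq)]
pub enum SketchSharedError {
    Consumed,
}

/// Hypothetical consume-once wrapper: the value lives in an Option behind a lock.
pub struct SketchConsumable<T> {
    inner: Arc<Mutex<Option<T>>>,
}

impl<T> Clone for SketchConsumable<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl<T> SketchConsumable<T> {
    pub fn new(value: T) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Some(value))),
        }
    }

    /// Borrows the value if it is still present.
    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> Result<R, SketchSharedError> {
        let guard = self.inner.lock().expect("lock poisoned");
        (*guard).as_ref().map(f).ok_or(SketchSharedError::Consumed)
    }

    /// Moves the value out; every later call on any clone reports `Consumed`.
    pub fn consume(&self) -> Result<T, SketchSharedError> {
        let mut guard = self.inner.lock().expect("lock poisoned");
        (*guard).take().ok_or(SketchSharedError::Consumed)
    }
}
```

Since all clones share the same `Option`, a monitoring clone observes the `Consumed` state as soon as another handle takes the value.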
Create domain-specific shared wrappers:
use turbomcp_core::shared::Shareable;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct SharedHttpClient {
    inner: Arc<Mutex<HttpClient>>,
}
impl Shareable<HttpClient> for SharedHttpClient {
    fn new(client: HttpClient) -> Self {
        Self {
            inner: Arc::new(Mutex::new(client)),
        }
    }
}
impl Clone for SharedHttpClient {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
impl SharedHttpClient {
    pub async fn get(&self, url: &str) -> Result<Response, HttpError> {
        self.inner.lock().await.get(url).await
    }
    pub async fn post(&self, url: &str, body: Vec<u8>) -> Result<Response, HttpError> {
        self.inner.lock().await.post(url, body).await
    }
}
use turbomcp_core::shared::{Shared, SharedError};
let shared_service = Shared::new(my_service);
// Handle potential errors in closures
let result = shared_service.with_mut(|service| {
    service.risky_operation()
        .map_err(|e| format!("Service error: {}", e))
}).await;
match result {
    Ok(success) => println!("Operation successful: {}", success),
    Err(e) => eprintln!("Operation failed: {}", e),
}
// Try operations without blocking
if let Some(result) = shared_service.try_with(|service| {
    service.quick_operation()
}) {
    println!("Quick operation result: {}", result);
} else {
    println!("Service is busy, will try later");
}
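The non-blocking variant maps naturally onto a lock's try-acquire operation. The sketch below shows the idea with `std::sync::Mutex::try_lock`; the free function `try_with` here is a hypothetical helper written for this example and makes no claim about how the crate's own try_with is implemented.

```rust
use std::sync::{Arc, Mutex};

/// Runs `f` only if the lock is free right now; returns None instead of waiting.
pub fn try_with<T, R>(shared: &Arc<Mutex<T>>, f: impl FnOnce(&T) -> R) -> Option<R> {
    match shared.try_lock() {
        Ok(guard) => Some(f(&*guard)),
        // The lock is held elsewhere (or poisoned): report "busy" to the caller.
        Err(_) => None,
    }
}
```

Returning `Option` pushes the retry decision to the caller, which suits latency-sensitive paths that would rather skip work than wait on a contended lock.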
The Shareable patterns follow key design principles:
# Build with all features
cargo build --all-features
# Build optimized for production
cargo build --release --features simd
# Run comprehensive tests
cargo test
# Run performance benchmarks
cargo bench
# Test SIMD features (requires compatible CPU)
cargo test --features simd
Licensed under the MIT License.
Part of the TurboMCP high-performance Rust SDK for the Model Context Protocol.