| Crates.io | moduforge-collaboration |
| lib.rs | moduforge-collaboration |
| version | 0.5.0 |
| created_at | 2025-06-25 10:16:22.637574+00 |
| updated_at | 2025-08-19 09:31:59.316189+00 |
| description | moduforge 协作系统 |
| homepage | https://github.com/Cassielxd/moduforge-rs |
| repository | https://github.com/Cassielxd/moduforge-rs |
| max_upload_size | |
| id | 1725593 |
| size | 93,874 |
moduforge-collaboration 是一个为 ModuForge 生态系统提供实时协作功能的 Rust crate。它基于 CRDT (无冲突复制数据类型) 技术,允许多个用户同时在同一个文档上工作,并实时同步所有更改。
协作系统采用分层架构设计,每个组件都有明确的职责:
┌─────────────────────────────────────────────────────────────┐
│ CollaborationServer │
│ (WebSocket 服务器 + 路由管理) │
├─────────────────────────────────────────────────────────────┤
│ SyncService │
│ (业务逻辑 + 状态管理) │
├─────────────────────────────────────────────────────────────┤
│ YrsManager │
│ (CRDT 文档管理) │
├─────────────────────────────────────────────────────────────┤
│ Mapper │
│ (数据转换 + 步骤映射) │
└─────────────────────────────────────────────────────────────┘
文件: src/ws_server.rs
职责: WebSocket 服务器和 HTTP 路由管理
关键特性:
// 严格的房间存在性检查
if !server.sync_service().yrs_manager().room_exists(&room_id) {
return Err(warp::reject::custom(RoomNotFoundError::new(room_id)));
}
// 自定义错误处理
async fn handle_rejection(err: Rejection) -> Result<impl Reply> {
if let Some(room_error) = err.find::<RoomNotFoundError>() {
return Ok(json!({
"error": "ROOM_NOT_FOUND",
"message": format!("房间 '{}' 不存在", room_error.room_id()),
"code": 404
}));
}
// ... 其他错误处理
}
文件: src/sync_service.rs
职责: 业务逻辑和状态管理
房间状态枚举:
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RoomStatus {
NotExists, // 房间不存在
Created, // 房间已创建但未初始化数据
Initialized, // 房间已初始化并有数据
Shutting, // 房间正在下线中
Offline, // 房间已下线
}
文件: src/yrs_manager.rs
职责: CRDT 文档管理
DashMap 和 RwLock 确保并发安全核心方法:
impl YrsManager {
// 获取或创建房间的 Awareness 引用
pub fn get_or_create_awareness(&self, room_id: &str) -> AwarenessRef;
// 检查房间是否存在
pub fn room_exists(&self, room_id: &str) -> bool;
// 移除房间并清理资源
pub async fn remove_room(&self, room_id: &str) -> Option<AwarenessRef>;
// 强制清理房间资源
pub async fn force_cleanup_room(&self, room_id: &str) -> bool;
}
文件: src/mapping.rs
职责: 数据转换和步骤映射
转换器系统:
pub trait StepConverter: Send + Sync {
fn apply_to_yrs_txn(
&self,
step: &dyn Step,
txn: &mut TransactionMut,
) -> Result<StepResult, Box<dyn std::error::Error>>;
fn name(&self) -> &'static str;
fn supports(&self, step: &dyn Step) -> bool;
}
// 内置转换器
pub struct NodeStepConverter; // 节点操作
pub struct AttrStepConverter; // 属性操作
pub struct MarkStepConverter; // 标记操作
文件: src/middleware.rs
职责: 中间件集成
#[async_trait]
impl Middleware for YrsMiddleware {
async fn after_dispatch(
&self,
_state: Option<Arc<State>>,
transactions: &[Transaction],
) -> ForgeResult<Option<Transaction>> {
// 自动同步事务到 Yrs 文档
self.sync_service
.handle_transaction_applied(transactions, &self.room_id)
.await?;
Ok(None)
}
}
[dependencies]
# 异步运行时
tokio = { workspace = true }
async-trait = { workspace = true }
# WebSocket 和 HTTP
warp = "0.3.7"
yrs-warp = "0.8.0"
# CRDT 引擎
yrs = "0.18.2"
# 并发和同步
parking_lot = { workspace = true }
dashmap = { workspace = true }
# 序列化
serde = { workspace = true }
serde_json = { workspace = true }
# 日志和监控
tracing = "0.1"
tracing-subscriber = "0.3"
# ModuForge 生态系统
moduforge-model = { version = "0.4.12", path = "../model" }
moduforge-state = { version = "0.4.12", path = "../state" }
moduforge-transform = { version = "0.4.12", path = "../transform" }
moduforge-core = { version = "0.4.12", path = "../core" }
use mf_collab::{CollaborationServer, YrsManager, SyncService};
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 1. 初始化核心组件
let yrs_manager = Arc::new(YrsManager::new());
let sync_service = Arc::new(SyncService::new(yrs_manager.clone()));
// 2. 创建协作服务器
let server = CollaborationServer::with_sync_service(
yrs_manager,
sync_service.clone(),
8080
);
// 3. 预初始化房间(关键步骤)
let rooms_to_initialize = ["room1", "room2", "project-main"];
for room_id in &rooms_to_initialize {
if let Some(existing_tree) = load_room_data(room_id).await? {
server.init_room_with_data(room_id, &existing_tree).await?;
println!("✅ 房间 '{}' 已初始化", room_id);
}
}
// 4. 启动服务器
println!("🚀 协作服务器启动于 127.0.0.1:8080");
server.start().await;
Ok(())
}
use mf_core::{ForgeRuntime, RuntimeOptions};
use mf_collab::YrsMiddleware;
async fn setup_collaborative_runtime(
sync_service: Arc<SyncService>,
room_id: String,
) -> ForgeRuntime {
let mut options = RuntimeOptions::default();
// 添加 Yrs 中间件
let yrs_middleware = YrsMiddleware {
sync_service: sync_service.clone(),
room_id: room_id.clone(),
};
options.add_middleware(yrs_middleware);
// 创建运行时
ForgeRuntime::new(options).await
}
// 房间不存在时的错误响应
{
"error": "ROOM_NOT_FOUND",
"message": "房间 'room-123' 不存在",
"room_id": "room-123",
"code": 404
}
// 获取房间状态
let status = sync_service.get_room_status("room-id").await;
// 获取房间详细信息
let room_info = sync_service.get_room_info("room-id").await;
// RoomInfo {
// room_id: "room-id",
// status: RoomStatus::Initialized,
// node_count: 42,
// client_count: 3,
// last_activity: SystemTime { ... }
// }
// 下线空房间
let empty_rooms = server.offline_empty_rooms(true).await?;
// 下线不活跃房间
let inactive_rooms = server.offline_inactive_rooms(
Duration::from_secs(3600), // 1小时无活动
true
).await?;
// 条件下线
let rooms_to_offline = server.offline_rooms_by_condition(
|room_info| room_info.client_count == 0,
true
).await?;
WebSocket: ws://localhost:8080/collaboration/{room_id}
HTTP 状态检查: GET /collaboration/{room_id}
健康检查: GET /health
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WsMessage {
JoinRoom { room_id: String },
LeaveRoom { room_id: String },
YrsUpdate { room_id: String, update: Vec<u8> },
YrsSyncRequest { room_id: String, state_vector: Vec<u8> },
}
项目包含完整的测试套件,覆盖核心功能:
# 运行所有测试
cargo test
# 运行特定测试
cargo test test_collaboration
cargo test test_room_offline
cargo test test_conditional_offline
// 自定义端口
let server = CollaborationServer::with_sync_service(
yrs_manager,
sync_service,
9000 // 自定义端口
);
// 自定义错误处理
server.set_error_handler(custom_error_handler);
// 自定义中间件栈
let mut middleware_stack = MiddlewareStack::new();
middleware_stack.add(YrsMiddleware::new(sync_service, room_id));
middleware_stack.add(LoggingMiddleware::new());
RwLock 和 DashMap 优化并发访问#[derive(Error, Debug)]
pub enum TransmissionError {
#[error("Yrs 操作错误: {0}")]
YrsError(String),
#[error("WebSocket 错误: {0}")]
WebSocketError(String),
#[error("房间不存在: {0}")]
RoomNotFound(String),
#[error("同步错误: {0}")]
SyncError(String),
#[error("其他错误: {0}")]
Other(#[from] anyhow::Error),
}
欢迎贡献代码!请确保:
本项目采用 MIT 许可证 - 详见 LICENSE 文件。