[//]: # ([![Crates.io][crates-badge]][crates-url]) [![MIT licensed][mit-badge]][mit-url] [![Build Status][actions-badge]][actions-url] [//]: # ([crates-badge]: https://img.shields.io/crates/v/tokio.svg) [//]: # ([crates-url]: https://crates.io/crates/tokio) [mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg [mit-url]: https://github.com/qiaoruntao/mscheduler/blob/master/LICENSE [actions-badge]: https://github.com/qiaoruntao/mscheduler/actions/workflows/ci.yml/badge.svg [actions-url]: https://github.com/qiaoruntao/mscheduler/actions?query=branch%3Amaster ## 功能点 1. 任务的发布和执行 2. 多个worker同时运行任务 3. 任务失败后重试 4. worker ping超时后其他worker可以抢占任务 5. ## 核心模型 Task 1. TaskState 1. 整体任务状态参数 2. Vec\ 2. TaskOption 3. \参数 TaskConsumer 1. Arc\ 因为需要spawn到其他线程内执行 2. 任务中worker state的状态转换 1. INIT 刚发送 2. RUNNING 被worker占领并执行中 3. SUCCESS worker执行成功 4. FAIL worker执行失败 ## 功能设计 ### 任务发送 不变 ### 任务消费 **如何发现任务** 1. 启动时主动查询一批任务 2. 通过change stream 订阅一批发生变化的任务 3. 任务列表空时主动查询一批任务 **查询什么任务** 1. INIT=》需要抢占 2. RUNNING=》等待超时后抢占 3. 其他worker执行失败,但是符合抢占条件=》走复杂判断逻辑 **按什么优先级查询任务** 目前没有特殊要求, 按任意key顺序查询 **如何判断任务是否可抢占** 1. 任务参数限制不可执行 1. specific_worker_ids限制不可执行 2. 其他worker状态限制 1. ping_expire_time没到, 不可抢占 2. 其他worker info占据了并发执行数量 1. max_unexpected_retries超出限制, 不执行. 约束: max_unexpected_retries\setOnInsert 找到了=>set nothing, 按配置决定是否清除失败/成功的worker 2. 占用任务(多worker) 查找条件: 判断是否被指定worker_id, 任务状态不是成功, 任务没有被当前worker处理过(不能在worker列表中), 可以接受更多的worker 排序条件: priority最高, 优先被指定worker_id的 没有找到=>不做操作 找到了=>增加自己的worker对象, 并且过滤worker列表(清除超时的运行中worker) 3. 维持任务 查找条件: key相同, worker id相同, 找到了=> 更新对应的超时时间 没找到=> 结束当前任务 4. 任务执行成功 尝试更新任务状态为成功 5. 任务执行失败 主动返回任务失败, 尝试更新任务状态为失败 6. 任务执行异常 按option中设置重试, 并更新重试次数, 如果重试次数超过限制那么更新为任务失败. 重试过程只在本地发生, 所有重试次数失败后更新为失败状态 ## 配置更新后的影响 1. specific_worker_ids变动 如果更新后不允许当前worker执行, 那么结束任务 2. ping_interval_ms变动 下次ping时生效 3. 其他参数变动 不影响正在执行中的任务, 变动需要在重新占用任务时体现 ## 具体实现 ### 发布任务 直接发送就行 ### 消费任务 核心问题: 下一次什么时候去占用任务 1. 启动时计算下一次时间next_try_time 2. 使用change_stream实时更新next_try_time ## TODO -[ ] clean_success 暂不实现, maintenance -[ ] clean_failed 暂不实现, maintenance -[ ] detect compatibility of collection data -[x] auto worker id -[ ] 错误处理