Crates.io | pg_task |
lib.rs | pg_task |
version | 0.2.0 |
source | src |
created_at | 2023-07-11 10:36:37.587192 |
updated_at | 2024-07-24 15:42:01.233074 |
description | Resumable state machine based Postgres tasks |
repository | https://github.com/imbolc/pg_task |
id | 913683 |
size | 103,178 |
FSM-based Resumable Postgres tasks
The full runnable code is in examples/tutorial.rs.
We create a greeter task consisting of two steps:
#[derive(Debug, Deserialize, Serialize)]
pub struct ReadName {
    filename: String,
}
#[async_trait]
impl Step<Greeter> for ReadName {
    const RETRY_LIMIT: i32 = 5;
    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        let name = std::fs::read_to_string(&self.filename)?;
        NextStep::now(SayHello { name })
    }
}
The first step tries to read a name from a file:
- filename - the only state we need in this step
- impl Step<Greeter> for ReadName - our step is a part of a Greeter task
- RETRY_LIMIT - the step is fallible, let's retry it a few times
- NextStep::now(SayHello { name }) - move our task to the SayHello step right now
#[derive(Debug, Deserialize, Serialize)]
pub struct SayHello {
    name: String,
}
#[async_trait]
impl Step<Greeter> for SayHello {
    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        println!("Hello, {}", self.name);
        NextStep::none()
    }
}
The second step prints the greeting and finishes the task by returning NextStep::none().
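To make the state-machine idea concrete, here is a minimal std-only sketch of the same two-step flow. It does not use pg_task: the Greeter enum, the Next type, and the step and run functions are hypothetical stand-ins for what the library and its worker provide, and the file read is replaced by an in-memory value so the sketch runs anywhere.

```rust
// A std-only sketch of the two-step greeter as a state machine.
// All names here are hypothetical; this is not pg_task's API.

/// Every state of the task carries only the data its step needs.
#[derive(Debug, Clone, PartialEq)]
enum Greeter {
    ReadName { source: Option<String> },
    SayHello { name: String },
}

/// What a step asks the driver to do next.
#[derive(Debug, PartialEq)]
enum Next {
    Step(Greeter),
    Done(String),
}

/// Run a single step and return the transition, or an error message.
fn step(state: Greeter) -> Result<Next, String> {
    match state {
        Greeter::ReadName { source } => {
            // Stand-in for reading the file: `None` models a missing file.
            let name = source.ok_or_else(|| "No such file".to_string())?;
            Ok(Next::Step(Greeter::SayHello {
                name: name.trim().to_string(),
            }))
        }
        Greeter::SayHello { name } => Ok(Next::Done(format!("Hello, {}", name))),
    }
}

/// Drive the machine until it finishes or a step fails.
fn run(mut state: Greeter) -> Result<String, String> {
    loop {
        match step(state)? {
            Next::Step(next) => state = next,
            Next::Done(greeting) => return Ok(greeting),
        }
    }
}
```

Calling run(Greeter::ReadName { source: None }) stops at the first step with an error, which mirrors how the tutorial's task stays at ReadName until the outside world is fixed.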
That's essentially all, except for some boilerplate you can find in the full code. Let's run it:
cargo run --example hello
You'll see log messages about the 6 (first try + RETRY_LIMIT) attempts and the final error message. Let's look into the DB to find out what happened:
~$ psql pg_task -c 'table pg_task'
-[ RECORD 1 ]------------------------------------------------
id | cddf7de1-1194-4bee-90c6-af73d9206ce2
step | {"Greeter":{"ReadName":{"filename":"name.txt"}}}
wakeup_at | 2024-06-30 09:32:27.703599+06
tried | 6
is_running | f
error | No such file or directory (os error 2)
created_at | 2024-06-30 09:32:22.628563+06
updated_at | 2024-06-30 09:32:27.703599+06
- the error field indicates that the task has errored and contains the error message
- the step field tells you which step failed and what its state was when the error occurred
In this case, the error is due to the external world state. Let's fix it by creating the file:
echo 'Fixed World' > name.txt
To rerun the task, we just need to clear its error:
psql pg_task -c 'update pg_task set error = null'
You'll see the log messages about rerunning the task and the greeting message of the final step. That's all 🎉.
Essentially, scheduling a task is done by inserting a corresponding row into the pg_task table. You can do it by hand from psql or from code in any language.
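As a rough illustration of scheduling by hand, the sketch below builds the SQL for the tutorial's first step. The JSON shape is copied from the step column shown in the psql output above. Everything else is an assumption: the helper names are hypothetical, and the statement supplies only step on the guess that the remaining columns have defaults, which the source does not confirm.

```rust
// Sketch: build the SQL for scheduling the tutorial's first step by hand.
// Assumption: only `step` has to be supplied and every other column of
// `pg_task` has a default; the source does not confirm this.

/// Escape a string for use inside a JSON string literal (minimal subset).
fn json_escape(s: &str) -> String {
    let mut out = String::new();
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c => out.push(c),
        }
    }
    out
}

/// JSON in the shape shown in the `step` column of the tutorial's row.
fn read_name_step(filename: &str) -> String {
    format!(
        r#"{{"Greeter":{{"ReadName":{{"filename":"{}"}}}}}}"#,
        json_escape(filename)
    )
}

/// Wrap the JSON into an INSERT, doubling single quotes for SQL.
fn insert_sql(step_json: &str) -> String {
    format!(
        "INSERT INTO pg_task (step) VALUES ('{}')",
        step_json.replace('\'', "''")
    )
}
```

In real code a parameterized query is preferable to string building; the string form is used here only so that the resulting statement can be pasted into psql.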
There are also a few helpers to take care of the first step serialization and time scheduling:
- enqueue - to run the task immediately
- delay - to run it with a delay
- schedule - to schedule it to a particular time
After defining the steps of each task, we need to wrap them into enums representing whole tasks via task!:
pg_task::task!(Task1 { StepA, StepB });
pg_task::task!(Task2 { StepC });
One more enum is needed to combine all the possible tasks:
pg_task::scheduler!(Tasks { Task1, Task2 });
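The nesting these calls describe can be pictured with hand-written enums. The sketch below is an illustration only, assuming one variant per step in each task enum and one variant per task in the combining enum; it is not the macros' actual expansion, which the source does not show. The step structs, the From impls, and the path helper are hypothetical. The shape is consistent with the step column in the tutorial, where the outer JSON key is the task and the inner key is the step.

```rust
// Hand-written illustration of the enum nesting; not the macros' real output.
#[derive(Debug, PartialEq)]
struct StepA {
    n: u32,
}
#[derive(Debug, PartialEq)]
struct StepB;
#[derive(Debug, PartialEq)]
struct StepC;

// One enum per task, one variant per step.
#[derive(Debug, PartialEq)]
enum Task1 {
    StepA(StepA),
    StepB(StepB),
}
#[derive(Debug, PartialEq)]
enum Task2 {
    StepC(StepC),
}

// One enum combining all tasks.
#[derive(Debug, PartialEq)]
enum Tasks {
    Task1(Task1),
    Task2(Task2),
}

// Conversions let a bare step be lifted into the top-level enum.
impl From<StepA> for Task1 {
    fn from(s: StepA) -> Self {
        Task1::StepA(s)
    }
}
impl From<Task1> for Tasks {
    fn from(t: Task1) -> Self {
        Tasks::Task1(t)
    }
}

/// Task and step names of a value, outermost first.
fn path(t: &Tasks) -> &'static str {
    match t {
        Tasks::Task1(Task1::StepA(_)) => "Task1/StepA",
        Tasks::Task1(Task1::StepB(_)) => "Task1/StepB",
        Tasks::Task2(Task2::StepC(_)) => "Task2/StepC",
    }
}
```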
Now we can run the worker:
pg_task::Worker::<Tasks>::new(db).run().await?;
All the communication is synchronized by the DB, so it doesn't matter how you run the workers or how many of them there are. A worker can run in a separate process or in-process via tokio::spawn.
You can gracefully stop task runners by sending a notification using the DB:
SELECT pg_notify('pg_task_changed', 'stop_worker');
The workers will wait until the current step of every running task is finished and then exit. You can wait for this by checking for the existence of running tasks:
SELECT EXISTS(SELECT 1 FROM pg_task WHERE is_running = true);
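The shutdown behaviour described above, finishing the current step and only then exiting, can be sketched with plain threads. This is a std-only analogy, not the library's mechanism: the shared AtomicBool stands in for the Postgres notification, the sleep stands in for a step, and run_workers is a hypothetical name.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Spawn `workers` threads that run steps until the stop flag is set.
/// Returns (steps started, steps finished) after all workers have exited.
fn run_workers(workers: usize, stop_after: Duration) -> (usize, usize) {
    let stop = Arc::new(AtomicBool::new(false));
    let started = Arc::new(AtomicUsize::new(0));
    let finished = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let (stop, started, finished) =
                (stop.clone(), started.clone(), finished.clone());
            thread::spawn(move || {
                // The flag is checked only between steps, never in the middle of one.
                while !stop.load(Ordering::SeqCst) {
                    started.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(5)); // the "step" itself
                    finished.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    thread::sleep(stop_after);
    stop.store(true, Ordering::SeqCst); // stand-in for the stop notification
    for h in handles {
        h.join().unwrap();
    }
    (started.load(Ordering::SeqCst), finished.load(Ordering::SeqCst))
}
```

Because the flag is only read between steps, the two counters are equal once every worker has been joined: no step is abandoned halfway.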
Sometimes you need to delay the next step. Using tokio::time::sleep before returning the next step creates a couple of issues, because the current step keeps running for the whole duration of the sleep. Use NextStep::delay instead - it schedules the next step with the delay and finishes the current one right away.
You can find a runnable example in examples/delay.rs.
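The difference between sleeping and scheduling can be sketched without a database. The Row struct below is a hypothetical in-memory stand-in for a task row; only the wakeup_at name comes from the table shown in the tutorial. The rule that a worker picks a row once its wakeup time has passed is an assumption about the worker, not something the source states.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical in-memory stand-in for a task row.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    step: String,
    wakeup_at: SystemTime,
}

/// Finish the current step immediately and record when the next one is due,
/// instead of keeping the step alive while sleeping.
fn schedule_next(next_step: &str, now: SystemTime, delay: Duration) -> Row {
    Row {
        step: next_step.to_string(),
        wakeup_at: now + delay,
    }
}

/// Assumed worker rule: a row is picked up once its wakeup time has passed.
fn is_due(row: &Row, now: SystemTime) -> bool {
    row.wakeup_at <= now
}
```

With this shape the delay lives in the stored row rather than in a running process, so nothing is held open while the task waits.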
Use Step::RETRY_LIMIT and Step::RETRY_DELAY when you need to retry a task on errors:
#[async_trait]
impl Step<MyTask> for ApiRequest {
    const RETRY_LIMIT: i32 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);
    async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
        // Any error returned here counts as a failed attempt
        let result = api_request().await?;
        NextStep::now(ProcessResult { result })
    }
}
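The retry accounting can be sketched in isolation. The following std-only code is not the worker's implementation: Outcome and run_with_retries are hypothetical names, and the delays are summed rather than slept so the sketch finishes instantly. It follows the tutorial's arithmetic, where a step that always fails is attempted 1 + RETRY_LIMIT times.

```rust
use std::time::Duration;

const RETRY_LIMIT: i32 = 5;
const RETRY_DELAY: Duration = Duration::from_secs(5);

/// Outcome of running a fallible step under the retry policy.
#[derive(Debug, PartialEq)]
struct Outcome {
    tried: i32,
    waited: Duration,
    error: Option<String>,
}

/// Call `step` until it succeeds or the first try plus RETRY_LIMIT retries
/// are used up. The closure receives the 1-based attempt number.
fn run_with_retries<F>(mut step: F) -> Outcome
where
    F: FnMut(i32) -> Result<(), String>,
{
    let mut tried = 0;
    let mut waited = Duration::ZERO;
    loop {
        tried += 1;
        match step(tried) {
            Ok(()) => return Outcome { tried, waited, error: None },
            // All retries are spent: keep the last error and stop.
            Err(e) if tried > RETRY_LIMIT => {
                return Outcome { tried, waited, error: Some(e) }
            }
            // Otherwise wait RETRY_DELAY (only accounted for here) and retry.
            Err(_) => waited += RETRY_DELAY,
        }
    }
}
```

A step that always fails ends with tried equal to 6 and five delays accumulated, which matches the tried value in the tutorial's failed row.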
This project is licensed under the MIT license.