extern crate dbq; extern crate postgres; extern crate serde_json; use std::error::Error; use std::result::Result; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; use std::thread; use std::time::Duration; // An "HTTP client" #[derive(Clone)] struct HttpClient {} // Handler that holds an HTTP client that can be used within the jobs that it // runs and tracks the number of jobs that it runs #[derive(Clone)] struct StatefulHandler { run_count: Arc, http_client: HttpClient, } fn main() -> Result<(), Box> { let db_conn_params = "postgres://postgres:password@localhost/dbq"; let conn = postgres::Connection::connect(db_conn_params, postgres::TlsMode::None)?; // Schema config allows for changing the database schema and table names // Defaults are no schema (default is used) and tables are prefixed with "dbq_" let schema_config = dbq::SchemaConfig::default(); // Run the migrations on start. Migrations are idempotent and should be run // on startup dbq::run_migrations(&schema_config, &conn, Some(646_271)).unwrap(); let queue = dbq::Queue::new(schema_config, "example_stateful".to_string()); // Enqueue a job queue.enqueue("job_a", serde_json::Value::Null, 3, &conn)?; // Create a dependency and the job handler let http_client = HttpClient::new(); let handler = StatefulHandler::new(http_client); // Start a worker pool let workers_config = dbq::WorkerPoolConfig::new(queue, db_conn_params, handler.clone())?; let workers = dbq::WorkerPool::start(workers_config); // Give a worker time to find and start the job thread::sleep(Duration::new(1, 0)); // Shutdown the worker pool waiting for all currently executing jobs to finish workers.join(); // Print the job count println!("handler ran {} jobs", handler.run_count()); Ok(()) } impl StatefulHandler { fn new(http_client: HttpClient) -> StatefulHandler { StatefulHandler { run_count: Arc::new(AtomicUsize::new(0)), http_client, } } fn increment_run_count(&self) { self.run_count.fetch_add(1, Ordering::SeqCst); } fn run_count(&self) -> usize { self.run_count.load(Ordering::SeqCst) } } impl dbq::Handler for StatefulHandler { type Error = std::io::Error; fn handle(&self, _ctx: dbq::JobContext) -> Result<(), Self::Error> { // Increment the run count self.increment_run_count(); // Make an HTTP request self.http_client.get("https://www.example.com")?; Ok(()) } } impl HttpClient { fn new() -> HttpClient { HttpClient {} } fn get(&self, _url: &str) -> std::io::Result<&str> { Ok("") } }