import json import time import typing class WorkerParameters: source_path: str destination_path: typing.Optional[str] = None @staticmethod def __display__(): return json.dumps({k: v for k, v in WorkerParameters.__dict__.items() if not k.startswith("__")}) class Worker: def __init__(self): pass @staticmethod def get_parameters_type() -> typing.Type: return WorkerParameters # TODO make this function non-static @staticmethod def init(): """ Optional worker initialization function. """ print(f"\nInitializing Python worker...") # TODO make this function non-static @staticmethod def process(handle_callback, parameters: WorkerParameters, job_id) -> dict: """ Standard worker process function. """ print(f"\nPython worker starts processing job {job_id} with parameters: {parameters.__display__()}") assert parameters.source_path != "" assert parameters.destination_path is not None # do some stuff here for i in range(0, 25): if handle_callback.is_stopped(): # Set job status to stop and return handle_callback.set_job_status("stopped") return {"destination_paths": ["/path/to/generated/file.ext"]} time.sleep(1) # notify the progression (between 0 and 100) handle_callback.publish_job_progression(i * 4) # By default, the job status is set to "completed" if the process returns properly. If an error is raised, # the job status is set to "error" and the error message is returned as a response. # However, the job status can be overwritten by calling: # # handle_callback.set_job_status("stopped") # # This function return whether the status has been correctly set. # Possible job status values are: "completed", "stopped" and "error". return { "destination_paths": ["/path/to/generated/file.ext"] }