Crates.io | gozo |
lib.rs | gozo |
version | 0.1.1 |
source | src |
created_at | 2024-05-09 17:48:18.831369 |
updated_at | 2024-06-16 14:49:45.835681 |
description | Message scheduler for NATS. |
homepage | |
repository | https://codeberg.org/gblach/gozo |
max_upload_size | |
id | 1235320 |
size | 72,827 |
Message scheduler for NATS.
Gozo connects to the NATS server (with JetStream enabled) and listens for a message
with a subject gozo
, a reply subject, payload and relevant headers.
When one is received, it schedules to send back a message with the reply subject
as the message subject, the same payload and the headers described in the usage section.
Due to all communication is via NATS, Gozo can be used in any programming language
that has client libraries.
Options can be set in two ways: from the command line or from an environment variable.
Command line options are listed below. All environment variables are written in upper case
with a NATS_
prefix, e.g. --address
option becomes NATS_ADDRESS
.
On startup gozo reads environment variables from /etc/gozo.env
and ./gozo.env
files if it exits.
If both the command line option and the environment variable are set,
the command line option is used.
For readability, all examples below are written in Python.
Gozo-When
header contains a string with the Unix epoch timestamp
when the message will be sent back.
Gozo-Id
header is optional, but if present it must be unique.
Schedules with Gozo-Id
header can be overwritten or canceled.
In addition, schedules with Gozo-Id
are written to the NATS key/value store,
so they won't be discarded when gozo is restarted.
import asyncio
import nats
import time
import uuid
async def message_cb(msg):
print(msg.subject) # 'reply.subject'
print(msg.data) # b'Hello Gozo!'
print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}
async def main():
nc = await nats.connect('nats://localhost:4222')
sub = await nc.subscribe('reply.subject', cb=message_cb)
headers = {
'Gozo-When': str(int(time.time()) + 10),
'Gozo-Id': str(uuid.uuid4()),
}
await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
await asyncio.sleep(11)
if __name__ == '__main__':
asyncio.run(main())
This time Gozo-When
header contains a string with a "+" sign followed by the number of seconds
after which the message will be sent back.
import asyncio
import nats
import uuid
async def message_cb(msg):
print(msg.subject) # 'reply.subject'
print(msg.data) # b'Hello Gozo!'
print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}
async def main():
nc = await nats.connect('nats://localhost:4222')
sub = await nc.subscribe('reply.subject', cb=message_cb)
headers = {
'Gozo-When': '+10',
'Gozo-Id': str(uuid.uuid4()),
}
await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
await asyncio.sleep(11)
if __name__ == '__main__':
asyncio.run(main())
To cancel the schedule, send a message with the header Gozo-Del-Id
and set the corresponding ID as its value.
import asyncio
import nats
async def main():
nc = await nats.connect('nats://localhost:4222')
headers = {
'Gozo-Del-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3',
}
await nc.publish('gozo', b'', None, headers)
if __name__ == '__main__':
asyncio.run(main())