| Crates.io | pgx_pgmq |
| lib.rs | pgx_pgmq |
| version | 0.0.4 |
| created_at | 2023-02-06 20:32:47.89004+00 |
| updated_at | 2023-02-08 16:51:18.917417+00 |
| description | Postgres extension for PGMQ |
| homepage | https://www.coredb.io |
| repository | https://github.com/CoreDB-io/coredb |
| max_upload_size | |
| id | 778232 |
| size | 20,312 |
A lightweight message queue extension for Postgres. It provides an experience similar to AWS SQS and Redis Simple Message Queue, but on Postgres.
TODO
docker run ...
import json
import pprint
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://postgres:postrgres@localhost:28814/pgx_pgmq")
with engine.begin() as con:  # begin() commits on exit, so the new queue persists
    # create a queue
    created = con.execute(text("select * from pgmq_create('myqueue');"))
    # list queues
    list_queues = con.execute(text("select * from pgmq_list_queues()"))
    column_names = list_queues.keys()
    rows = list_queues.fetchall()
    print("### Queues ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Queues ###'
{'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
'queue_name': 'myqueue'}
with engine.begin() as con:  # begin() commits on exit
    # send a message to the queue created above
    msg = json.dumps({"yolo": 42})
    msg_id = con.execute(text(f"select * from pgmq_send('myqueue', '{msg}') as msg_id;"))
    column_names = msg_id.keys()
    rows = msg_id.fetchall()
    print("### Message ID ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Message ID ###'
{'msg_id': 1}
with engine.begin() as con:
    # read a message, make it unavailable to be read again for 5 seconds
    read = con.execute(text("select * from pgmq_read('myqueue', 5);"))
    column_names = read.keys()
    rows = read.fetchall()
    print("### Read Message ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Read Message ###'
{'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'message': {'yolo': 42},
 'msg_id': 1,
 'read_ct': 1,
 'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))}
with engine.begin() as con:
    # delete a message
    deleted = con.execute(text("select pgmq_delete('myqueue', 1);"))
    column_names = deleted.keys()
    rows = deleted.fetchall()
    print("### Message Deleted ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Message Deleted ###'
{'pgmq_delete': True}
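The read and delete calls above are usually combined into a consumer step: read a message, handle it, and delete it only when handling succeeds. The following is a minimal sketch of that pattern, not part of the extension. The names `process_one`, `read`, `delete`, and `handler` are hypothetical; `read` and `delete` stand for thin wrappers you would write around the `pgmq_read` and `pgmq_delete` queries, and in-memory fakes are used here so the example runs without a database.

```python
from typing import Any, Callable, Optional


def process_one(
    read: Callable[[], Optional[dict]],
    delete: Callable[[int], bool],
    handler: Callable[[Any], None],
) -> Optional[int]:
    """Read one message, handle it, and delete it only if handling succeeded.

    `read` and `delete` are hypothetical wrappers around the pgmq_read and
    pgmq_delete queries; they are assumptions, not functions of the extension.
    Returns the msg_id that was processed, or None if nothing was processed.
    """
    row = read()
    if row is None:
        return None  # queue empty, or all messages currently invisible
    try:
        handler(row["message"])
    except Exception:
        # Leave the message in the queue; it should become readable again
        # once its visibility timeout expires.
        return None
    delete(row["msg_id"])
    return row["msg_id"]


# Usage with in-memory fakes instead of a database connection.
pending = [{"msg_id": 1, "message": {"yolo": 42}}]
deleted_ids = []


def fake_read():
    return pending[0] if pending else None


def fake_delete(msg_id):
    deleted_ids.append(msg_id)
    pending[:] = [m for m in pending if m["msg_id"] != msg_id]
    return True


handled = []
result = process_one(fake_read, fake_delete, handled.append)
```

Deleting only after the handler returns means a crash mid-processing leaves the message in the queue, at the cost that a handler may see the same message more than once.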
CREATE EXTENSION pgmq;
SELECT pgmq_create('my_queue');
pgmq_create
-------------
pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
pgmq_send
--------------
1
Read a single message from the queue and make it invisible for 30 seconds.
pgmq=# SELECT * from pgmq_read('my_queue', 30);
msg_id | read_ct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+---------------
1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
If the queue is empty, or if all messages are currently invisible, no rows will be returned.
pgx_pgmq=# SELECT * from pgmq_read('my_queue', 30);
msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------
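The visibility behaviour described above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not how the extension stores or selects messages: `ToyQueue`, `Message`, and the explicit `now` argument are hypothetical, and returning the oldest visible message first is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Message:
    msg_id: int
    message: Any
    enqueued_at: float
    vt: float          # time at which the message becomes visible again
    read_ct: int = 0   # how many times the message has been read


@dataclass
class ToyQueue:
    """In-memory stand-in for a queue; not how the extension stores data."""
    messages: list = field(default_factory=list)
    next_id: int = 1

    def send(self, message: Any, now: float) -> int:
        msg = Message(self.next_id, message, enqueued_at=now, vt=now)
        self.messages.append(msg)
        self.next_id += 1
        return msg.msg_id

    def read(self, vt_seconds: float, now: float) -> Optional[Message]:
        # Return the oldest visible message and hide it for vt_seconds.
        for msg in self.messages:
            if msg.vt <= now:
                msg.vt = now + vt_seconds
                msg.read_ct += 1
                return msg
        return None  # empty queue, or every message is still invisible


queue = ToyQueue()
queue.send({"foo": "bar"}, now=0.0)
first = queue.read(vt_seconds=30, now=1.0)     # visible, so it is returned
second = queue.read(vt_seconds=30, now=10.0)   # hidden until t=31, so None
third = queue.read(vt_seconds=30, now=31.0)    # visible again, read_ct is 2
```

Passing `now` explicitly keeps the model deterministic; a real queue would rely on the database clock instead.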
Read a message and immediately delete it from the queue. Returns None if the queue is empty.
pgmq=# SELECT * from pgmq_pop('my_queue');
msg_id | read_ct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+---------------
1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
Archiving a message removes it from the queue and inserts it into the archive table. TODO:
Delete the message with id 1 from the queue named my_queue.
pgmq=# select pgmq_delete('my_queue', 1);
pgmq_delete
-------------
t
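Pop, archive, and delete all remove a message from the queue but differ in what is kept afterwards. The sketch below models that difference with two plain dictionaries standing in for the queue and archive tables. It is illustrative only: the function names `pop`, `archive_message`, and `delete` are hypothetical Python helpers, not the extension's SQL API, and popping the lowest `msg_id` first is an assumption.

```python
from typing import Optional

queue: dict = {}     # msg_id -> message, stand-in for the queue table
archive: dict = {}   # msg_id -> message, stand-in for the archive table


def pop() -> Optional[tuple]:
    """Read the oldest message and remove it from the queue in one step."""
    if not queue:
        return None
    msg_id = min(queue)
    return msg_id, queue.pop(msg_id)


def archive_message(msg_id: int) -> bool:
    """Move a message from the queue to the archive, keeping its contents."""
    if msg_id not in queue:
        return False
    archive[msg_id] = queue.pop(msg_id)
    return True


def delete(msg_id: int) -> bool:
    """Remove a message permanently; nothing is kept."""
    if msg_id not in queue:
        return False
    del queue[msg_id]
    return True


queue.update({1: {"foo": "bar"}, 2: {"foo": "baz"}, 3: {"foo": "qux"}})
popped = pop()                  # (1, {"foo": "bar"})
archived = archive_message(2)   # True; message 2 now lives in `archive`
deleted = delete(3)             # True; message 3 is gone for good
```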
Set up pgx.
cargo install --locked cargo-pgx
cargo pgx init
Then, clone this repo and change into this directory.
git clone git@github.com:CoreDB-io/coredb.git
cd coredb/extensions/pgmq/
Run the dev environment.
cargo pgx run pg14
Run this script to package into a .deb file, which can be installed on Ubuntu.
/bin/bash build-extension.sh