package mem_ring import ( "sync" "sync/atomic" "unsafe" "github.com/edwingeng/deque/v2" ) type QueueMeta struct { BufferPtr uintptr BufferLen uintptr HeadPtr uintptr TailPtr uintptr WorkingPtr uintptr StuckPtr uintptr WorkingFd int32 UnstuckFd int32 } type Queue[T any] struct { bufferPtr unsafe.Pointer bufferLen uintptr headPtr *uintptr tailPtr *uintptr workingPtr *uint32 stuckPtr *uint32 workingFd int32 unstuckFd int32 } type ReadQueue[T any] struct { q Queue[T] unstuckNotifier Notifier } type WriteQueue[T any] struct { q Queue[T] Lock *sync.Mutex pendingTasks *deque.Deque[T] workingNotifier Notifier } func NewQueue[T any](meta QueueMeta) Queue[T] { return Queue[T]{ bufferPtr: unsafe.Pointer(meta.BufferPtr), bufferLen: meta.BufferLen, headPtr: (*uintptr)(unsafe.Pointer(meta.HeadPtr)), tailPtr: (*uintptr)(unsafe.Pointer(meta.TailPtr)), workingPtr: (*uint32)(unsafe.Pointer(meta.WorkingPtr)), stuckPtr: (*uint32)(unsafe.Pointer(meta.StuckPtr)), workingFd: meta.WorkingFd, unstuckFd: meta.UnstuckFd, } } func (q *Queue[T]) push(item T) bool { t_size := unsafe.Sizeof(item) tail := atomic.LoadUintptr(q.tailPtr) head := atomic.LoadUintptr(q.headPtr) if tail-head == q.bufferLen { return false } ptr := unsafe.Add(q.bufferPtr, (tail%q.bufferLen)*t_size) *(*T)(ptr) = item atomic.AddUintptr(q.tailPtr, 1) return true } func (q *Queue[T]) pop() *T { var _t T t_size := unsafe.Sizeof(_t) tail := atomic.LoadUintptr(q.tailPtr) head := atomic.LoadUintptr(q.headPtr) if tail == head { return nil } ptr := unsafe.Add(q.bufferPtr, (head%q.bufferLen)*t_size) item := *(*T)(ptr) atomic.AddUintptr(q.headPtr, 1) return &item } func (q *Queue[T]) isEmpty() bool { return atomic.LoadUintptr(q.tailPtr) == atomic.LoadUintptr(q.headPtr) } func (q *Queue[T]) isFull() bool { return atomic.LoadUintptr(q.tailPtr)-atomic.LoadUintptr(q.headPtr) == q.bufferLen } func (q *Queue[T]) markWorking() { atomic.StoreUint32(q.workingPtr, 1) } func (q *Queue[T]) markUnworking() bool { atomic.StoreUint32(q.workingPtr, 0) if q.isEmpty() { return true } q.markWorking() return false } func (q *Queue[T]) working() bool { return atomic.LoadUint32(q.workingPtr) == 1 } func (q *Queue[T]) markStuck() { atomic.StoreUint32(q.stuckPtr, 1) } func (q *Queue[T]) markUnstuck() { atomic.StoreUint32(q.stuckPtr, 0) } func (q *Queue[T]) stuck() bool { return atomic.LoadUint32(q.stuckPtr) == 1 } func (q Queue[T]) Read() ReadQueue[T] { unstuckNotifier := NewNotifier(q.unstuckFd) return ReadQueue[T]{ q, unstuckNotifier, } } func (q Queue[T]) Write() WriteQueue[T] { awaiter := NewAwaiter(q.unstuckFd) wq := WriteQueue[T]{ q: q, Lock: &sync.Mutex{}, pendingTasks: deque.NewDeque[T](), workingNotifier: NewNotifier(q.workingFd), } go func() { for { wq.Lock.Lock() for item, ok := wq.pendingTasks.TryPopFront(); ok; item, ok = wq.pendingTasks.TryPopFront() { if !wq.q.push(item) { wq.pendingTasks.PushFront(item) break } } if !wq.q.working() { wq.q.markWorking() wq.workingNotifier.Notify() } if !wq.pendingTasks.IsEmpty() { wq.q.markStuck() if !wq.q.isFull() { continue } } wq.Lock.Unlock() awaiter.Wait() } }() return wq } func (rq *ReadQueue[T]) RunHandler(handler func(T), w ...TinyWaiter) { // TODO: return channel-based guard var waiter TinyWaiter if len(w) == 0 { waiter = &GoSchedWaiter{} } else { waiter = w[0] } go func() { awaiter := NewAwaiter(rq.q.workingFd) rq.q.markWorking() c: for { for item := rq.q.pop(); item != nil; item = rq.q.pop() { handler(*item) } waiter.Reset() for { stop_wait := waiter.Wait() if !rq.q.isEmpty() || !rq.q.markUnworking() { continue c } if stop_wait { break } } awaiter.Wait() rq.q.markWorking() } }() } func (wq *WriteQueue[T]) Push(item T) { wq.Lock.Lock() if wq.q.push(item) { if !wq.q.working() { wq.q.markWorking() wq.Lock.Unlock() wq.workingNotifier.Notify() return } } else { wq.q.markStuck() wq.pendingTasks.PushBack(item) } wq.Lock.Unlock() }