1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2025-08-31 02:45:18 +00:00
arikawa/utils/handler/handler.go
2023-08-04 14:24:50 -07:00

294 lines
7.2 KiB
Go

// Package handler handles incoming Gateway events. It reflects the function's
// first argument and caches that for use in each event.
//
// Performance
//
// Benchmark results replicated with `go test -bench=.`:
//
// goos: linux
// goarch: amd64
// pkg: libdb.so/arikawa/v4/utils/handler
// cpu: 12th Gen Intel(R) Core(TM) i5-1240P
// BenchmarkHandleRemove
// BenchmarkHandleRemove-16 2910373 395.2 ns/op
// BenchmarkHandleLatency
// BenchmarkHandleLatency-16 1622205 761.9 ns/op
// BenchmarkHandleSynchronousLatency
// BenchmarkHandleSynchronousLatency-16 10508937 108.2 ns/op
// PASS
// ok libdb.so/arikawa/v4/utils/handler 4.884s
//
// Usage
//
// handler's usage is mostly similar to Discordgo, in that handlers are either
// functions taking a single event argument (see Add and AddSynchronous) or
// event channels (see Handler.HandleChannel). For more information, refer to
// the Handler interface.
package handler
import (
"context"
"sync"
"sync/atomic"
)
// Dispatcher is an interface for dispatching events.
type Dispatcher[T any] interface {
// Dispatch passes the given event to every registered handler.
//
// NOTE(review): in the Handlers implementation in this file, Dispatch
// returns once the synchronous handlers have finished; asynchronous
// callbacks and channel sends are only started in their own goroutines and
// may still be running when Dispatch returns.
Dispatch(ev T)
}
// Handler is an interface for adding callbacks and channels.
type Handler[T any] interface {
// HandleCallback adds a callback function that is called on every dispatched
// event. It returns a function that removes this handler when called.
// Each callback invocation is dispatched in its own goroutine.
HandleCallback(fn func(T)) (rm func())
// HandleSynchronousCallback is like HandleCallback, but the callback is
// called synchronously by the dispatcher. Use this only for non-blocking
// operations such as dispatching to other handlers.
HandleSynchronousCallback(fn func(T)) (rm func())
// HandleChannel adds the given channel to receive dispatched events.
//
// NOTE(review): in the Handlers implementation in this file, every send is
// performed in its own goroutine, so a full channel does not stall the
// Dispatch caller. If the channel never becomes available, those send
// goroutines will dangle until rm is called.
//
// Keep in mind that the user must NOT close the channel. In fact, the
// channel should not be closed at all. Sending on a closed channel WILL
// PANIC!
//
// When the rm callback that is returned is called, it will also guarantee
// that all blocking sends will be cancelled. This helps prevent dangling
// goroutines.
//
// Example usage:
//
//	ch := make(chan *gateway.MessageCreateEvent)
//	rm := h.HandleChannel(ch)
//	defer rm()
//
//	for ev := range ch {
//		// do something with ev
//	}
//
HandleChannel(ch chan<- T) (rm func())
// HandleBlockingChannel is like HandleChannel, but the Dispatch caller will
// block until the channel is available to receive the event. If the
// channel is never available, the dispatch caller will block until rm is
// called.
HandleBlockingChannel(ch chan<- T) (rm func())
}
// Add registers fn on h so that it runs for every dispatched event whose
// dynamic type can be asserted to EventT; events of any other type are
// silently skipped. Matching events are handed to fn in a new goroutine, so
// the dispatcher never waits for fn. The returned rm unregisters the callback.
//
// assertImpl is invoked up front with both type parameters; presumably it
// rejects impossible HandlerT/EventT combinations (defined elsewhere).
func Add[HandlerT any, EventT any](h Handler[HandlerT], fn func(EventT)) (rm func()) {
	assertImpl[HandlerT, EventT]()

	// The type check itself is cheap, so it is done synchronously; only
	// events that actually match pay for a goroutine.
	filter := func(ev HandlerT) {
		matched, ok := any(ev).(EventT)
		if !ok {
			return
		}
		go fn(matched)
	}

	return h.HandleSynchronousCallback(filter)
}
// AddSynchronous is like Add, but the callback is dispatched synchronously:
// fn runs inside the dispatcher's call for every event whose dynamic type can
// be asserted to EventT, and events of any other type are ignored. The
// returned rm unregisters the callback.
func AddSynchronous[HandlerT any, EventT any](h Handler[HandlerT], fn func(EventT)) (rm func()) {
	assertImpl[HandlerT, EventT]()
	return h.HandleSynchronousCallback(func(ev HandlerT) {
		// Only forward events that are of (or implement) the requested type.
		if e, ok := any(ev).(EventT); ok {
			fn(e)
		}
	})
}
// Expect subscribes to h immediately and returns a wait function. When
// called, the wait function blocks until an event that can be asserted to
// EventT makes fn return true, and then returns that event with a nil error.
// If the context is done first, it returns the zero EventT together with
// ctx.Err(). The subscription is removed when the wait function returns.
func Expect[HandlerT, EventT any](h Handler[HandlerT], fn func(EventT) bool) func(context.Context) (EventT, error) {
	assertImpl[HandlerT, EventT]()

	events := make(chan HandlerT)
	unregister := h.HandleChannel(events)

	wait := func(ctx context.Context) (EventT, error) {
		defer unregister()

		for {
			select {
			case raw := <-events:
				// Skip events of another type or ones the predicate rejects.
				if matched, ok := any(raw).(EventT); ok && fn(matched) {
					return matched, nil
				}
			case <-ctx.Done():
				var zero EventT
				return zero, ctx.Err()
			}
		}
	}

	return wait
}
// ExpectCh is the channel flavour of Expect: every dispatched event that can
// be asserted to EventT and for which fn returns true is forwarded to the
// returned channel, so more than one event may be received. Forwarding stops
// and the subscription is removed once ctx is done. The returned channel is
// never closed.
func ExpectCh[HandlerT, EventT any](ctx context.Context, h Handler[HandlerT], fn func(EventT) bool) <-chan EventT {
	assertImpl[HandlerT, EventT]()

	matches := make(chan EventT, 1)
	incoming := make(chan HandlerT, 1)
	unregister := h.HandleChannel(incoming)

	// forward delivers v to the consumer and reports false if the context
	// ended before the value could be handed over.
	forward := func(v EventT) bool {
		select {
		case matches <- v:
			return true
		case <-ctx.Done():
			return false
		}
	}

	go func() {
		defer unregister()

		for {
			select {
			case <-ctx.Done():
				return
			case raw := <-incoming:
				v, ok := any(raw).(EventT)
				if !ok || !fn(v) {
					continue
				}
				if !forward(v) {
					return
				}
			}
		}
	}()

	return matches
}
// Handlers is a container for command handlers. A zero-value instance is a valid
// instance.
//
// NOTE(review): zero-value validity relies on the zero slab being usable;
// slab is defined elsewhere — confirm there.
type Handlers[T any] struct {
// mutex guards callers: Dispatch holds the read lock while iterating, and
// add (as well as the rm function it returns) takes the write lock.
mutex sync.RWMutex
// callers holds every registered handler, both callbacks and channels.
callers slab[caller[T]] // nil type for interfaces
}
// Compile-time checks that *Handlers satisfies both Dispatcher and Handler.
var (
_ Dispatcher[struct{}] = (*Handlers[struct{}])(nil)
_ Handler[struct{}] = (*Handlers[struct{}])(nil)
)
// New constructs an empty *Handlers and returns it as a combined
// Dispatcher/Handler. The caller storage is created with newSlab(12);
// presumably 12 is the initial capacity (newSlab is defined elsewhere).
func New[T any]() interface {
	Dispatcher[T]
	Handler[T]
} {
	handlers := &Handlers[T]{
		callers: newSlab[caller[T]](12),
	}
	return handlers
}
// Dispatch implements Dispatcher. It holds the read lock for the whole
// iteration and calls Call on every registered caller with ev.
func (h *Handlers[T]) Dispatch(ev T) {
	h.mutex.RLock()
	defer h.mutex.RUnlock()

	invoke := func(registered caller[T]) bool {
		registered.Call(ev)
		// Always true: presumably this tells All to keep iterating.
		return true
	}
	h.callers.All(invoke)
}
// HandleCallback implements Handler. fn is registered as an asynchronous
// callback, so each dispatched event invokes it in a new goroutine. The
// returned rm unregisters fn.
func (h *Handlers[T]) HandleCallback(fn func(T)) (rm func()) {
	return h.add(callback[T]{fn: fn, async: true})
}
// HandleSynchronousCallback implements Handler. fn is registered as a
// synchronous callback, so it runs inline in Dispatch for each event. The
// returned rm unregisters fn.
func (h *Handlers[T]) HandleSynchronousCallback(fn func(T)) (rm func()) {
	return h.add(callback[T]{fn: fn})
}
// HandleChannel implements Handler. The channel is registered in asynchronous
// mode: every dispatched event is sent from its own goroutine. The returned
// rm unregisters the channel and aborts any sends that are still pending.
func (h *Handlers[T]) HandleChannel(ch chan<- T) (rm func()) {
	target := channel[T]{
		ch:    ch,
		close: make(chan struct{}),
		async: true,
	}
	return h.add(target)
}
// HandleBlockingChannel implements Handler. The channel is registered in
// synchronous mode: Dispatch itself performs the send and waits until the
// value is received or the handler is removed via the returned rm.
func (h *Handlers[T]) HandleBlockingChannel(ch chan<- T) (rm func()) {
	target := channel[T]{
		ch:    ch,
		close: make(chan struct{}),
	}
	return h.add(target)
}
// add stores c in the caller slab under the write lock and returns the
// function that removes it again. The removal function is idempotent: only
// its first invocation pops the caller and calls Close on it; later calls
// return immediately.
//
// NOTE(review): the removal function takes the write lock, while Dispatch
// holds the read lock during synchronous calls. Invoking it from inside a
// synchronous handler therefore looks like it would deadlock — confirm.
func (h *Handlers[T]) add(c caller[T]) (rm func()) {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	index := h.callers.Put(c)

	var removed atomic.Bool
	rm = func() {
		// Swap reports the previous value; true means someone already ran.
		if removed.Swap(true) {
			return
		}

		h.mutex.Lock()
		popped := h.callers.Pop(index)
		h.mutex.Unlock()

		// Close outside the lock so a dispatcher is not held up by it.
		popped.Close()
	}
	return rm
}
// caller is the internal abstraction over a registered handler; it is what
// Handlers stores and what Dispatch iterates over.
type caller[T any] interface {
// Call delivers one dispatched value to the handler.
Call(T)
// Close is called by the removal function returned from Handlers.add after
// the caller has been taken out of the slab.
Close()
}
// Compile-time checks that callback and channel satisfy caller.
var (
_ caller[struct{}] = callback[struct{}]{}
_ caller[struct{}] = channel[struct{}]{}
)
// callback is a caller that invokes a plain function for each event.
type callback[T any] struct {
	fn    func(T) // function invoked with every dispatched value
	async bool    // when set, fn runs in a new goroutine per call
}

// Call passes v to the wrapped function, either inline or, for asynchronous
// callbacks, in a freshly started goroutine.
func (c callback[T]) Call(v T) {
	if !c.async {
		c.fn(v)
		return
	}
	go c.fn(v)
}

// Close does nothing; a callback holds no resources to release.
func (c callback[T]) Close() {}
// channel is a caller that forwards each event into a user-provided channel.
type channel[T any] struct {
	ch    chan<- T      // destination for dispatched values
	close chan struct{} // closed by Close to abort pending and future sends
	async bool          // when set, every send happens in its own goroutine
}

// Call sends v to the destination channel unless the handler has already been
// closed. The send is abandoned if Close is called while it is still waiting.
// Asynchronous handlers perform the send in a new goroutine; otherwise Call
// blocks until the value is received or the handler is closed.
func (c channel[T]) Call(v T) {
	// Skip the send entirely once Close has been called.
	select {
	case <-c.close:
		return
	default:
	}

	deliver := func() {
		select {
		case c.ch <- v:
		case <-c.close:
		}
	}

	if c.async {
		go deliver()
		return
	}
	deliver()
}

// Close marks the handler as closed by closing the internal signal channel,
// which releases every send that is still waiting. Calling it again after it
// has completed is a no-op.
func (c channel[T]) Close() {
	select {
	case <-c.close:
		// Already closed; closing twice would panic.
		return
	default:
	}
	close(c.close)
}