From f33b4ff7d803fc69c4031972bc002ceccfc28cdc Mon Sep 17 00:00:00 2001 From: diamondburned Date: Sat, 11 Jul 2020 12:49:28 -0700 Subject: [PATCH] wsutil: API changed to support contexts --- utils/heart/heart.go | 19 +++++++++++++----- utils/wsutil/heart.go | 21 ++++++++++---------- utils/wsutil/op.go | 45 ++++++++++++++++++++++++++----------------- utils/wsutil/ws.go | 5 ++--- 4 files changed, 54 insertions(+), 36 deletions(-) diff --git a/utils/heart/heart.go b/utils/heart/heart.go index 2c19a2e..b9d3ba7 100644 --- a/utils/heart/heart.go +++ b/utils/heart/heart.go @@ -2,6 +2,7 @@ package heart import ( + "context" "sync" "sync/atomic" "time" @@ -63,13 +64,13 @@ type Pacemaker struct { EchoBeat AtomicTime // Any callback that returns an error will stop the pacer. - Pace func() error + Pace func(context.Context) error stop atomicStop death chan error } -func NewPacemaker(heartrate time.Duration, pacer func() error) *Pacemaker { +func NewPacemaker(heartrate time.Duration, pacer func(context.Context) error) *Pacemaker { return &Pacemaker{ Heartrate: heartrate, Pace: pacer, @@ -104,6 +105,7 @@ func (p *Pacemaker) Dead() bool { return sent-echo > int64(p.Heartrate)*2 } +// Stop stops the pacemaker, or it does nothing if the pacemaker is not started. func (p *Pacemaker) Stop() { if p.stop.Stop() { Debug("(*Pacemaker).stop was sent a stop signal.") @@ -112,6 +114,14 @@ func (p *Pacemaker) Stop() { } } +// pace sends a heartbeat with the appropriate timeout for the context. +func (p *Pacemaker) pace() error { + ctx, cancel := context.WithTimeout(context.Background(), p.Heartrate) + defer cancel() + + return p.Pace(ctx) +} + func (p *Pacemaker) start() error { // Reset states to its old position. p.EchoBeat.Set(time.Time{}) @@ -125,9 +135,8 @@ func (p *Pacemaker) start() error { p.Echo() for { - - if err := p.Pace(); err != nil { - return err + if err := p.pace(); err != nil { + return errors.Wrap(err, "failed to pace") } // Paced, save: diff --git a/utils/wsutil/heart.go b/utils/wsutil/heart.go index 4784e06..0a782b9 100644 --- a/utils/wsutil/heart.go +++ b/utils/wsutil/heart.go @@ -1,6 +1,7 @@ package wsutil import ( + "context" "time" "github.com/diamondburned/arikawa/utils/heart" @@ -9,10 +10,9 @@ import ( ) // TODO API -type EventLoop interface { - Heartbeat() error - HandleOP(*OP) error - // HandleEvent(ev Event) error +type EventLoopHandler interface { + EventHandler + HeartbeatCtx(context.Context) error } // PacemakerLoop provides an event loop with a pacemaker. @@ -30,11 +30,9 @@ type PacemakerLoop struct { ErrorLog func(error) } -func NewLoop(heartrate time.Duration, evs <-chan Event, evl EventLoop) *PacemakerLoop { - pacemaker := heart.NewPacemaker(heartrate, evl.Heartbeat) - +func NewLoop(heartrate time.Duration, evs <-chan Event, evl EventLoopHandler) *PacemakerLoop { return &PacemakerLoop{ - pacemaker: pacemaker, + pacemaker: heart.NewPacemaker(heartrate, evl.HeartbeatCtx), events: evs, handler: evl.HandleOP, } @@ -49,14 +47,17 @@ func (p *PacemakerLoop) errorLog(err error) { p.ErrorLog(err) } -func (p *PacemakerLoop) Pace() error { - return p.pacemaker.Pace() +// Pace calls the pacemaker's Pace function. +func (p *PacemakerLoop) Pace(ctx context.Context) error { + return p.pacemaker.Pace(ctx) } +// Echo calls the pacemaker's Echo function. func (p *PacemakerLoop) Echo() { p.pacemaker.Echo() } +// Stop calls the pacemaker's Stop function. func (p *PacemakerLoop) Stop() { p.pacemaker.Stop() } diff --git a/utils/wsutil/op.go b/utils/wsutil/op.go index 437cf61..6ccb31a 100644 --- a/utils/wsutil/op.go +++ b/utils/wsutil/op.go @@ -1,6 +1,7 @@ package wsutil import ( + "context" "fmt" "sync" @@ -79,28 +80,36 @@ func HandleEvent(h EventHandler, ev Event) error { // WaitForEvent blocks until fn() returns true. All incoming events are handled // regardless. -func WaitForEvent(h EventHandler, ch <-chan Event, fn func(*OP) bool) error { - for ev := range ch { - o, err := DecodeOP(ev) - if err != nil { - return err - } +func WaitForEvent(ctx context.Context, h EventHandler, ch <-chan Event, fn func(*OP) bool) error { + for { + select { + case e, ok := <-ch: + if !ok { + return errors.New("event not found and event channel is closed") + } - // Handle the *OP first, in case it's an Invalid Session. This should - // also prevent a race condition with things that need Ready after - // Open(). - if err := h.HandleOP(o); err != nil { - return err - } + o, err := DecodeOP(e) + if err != nil { + return err + } - // Are these events what we're looking for? If we've found the event, - // return. - if fn(o) { - return nil + // Handle the *OP first, in case it's an Invalid Session. This should + // also prevent a race condition with things that need Ready after + // Open(). + if err := h.HandleOP(o); err != nil { + return err + } + + // Are these events what we're looking for? If we've found the event, + // return. + if fn(o) { + return nil + } + + case <-ctx.Done(): + return ctx.Err() } } - - return errors.New("event not found and event channel is closed") } type ExtraHandlers struct { diff --git a/utils/wsutil/ws.go b/utils/wsutil/ws.go index c4e5c64..5db7f78 100644 --- a/utils/wsutil/ws.go +++ b/utils/wsutil/ws.go @@ -93,11 +93,10 @@ func (ws *Websocket) Listen() <-chan Event { } func (ws *Websocket) Send(b []byte) error { - return ws.SendContext(context.Background(), b) + return ws.SendCtx(context.Background(), b) } -// SendContext is a beta API. -func (ws *Websocket) SendContext(ctx context.Context, b []byte) error { +func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error { if err := ws.SendLimiter.Wait(ctx); err != nil { return errors.Wrap(err, "SendLimiter failed") }