From 81b1a0a902f85b7c316586e64afddd175e0d2888 Mon Sep 17 00:00:00 2001 From: diamondburned Date: Tue, 17 Nov 2020 11:09:51 -0800 Subject: [PATCH] Session: Added a handleloop abstraction This abstraction will mainly be in charge of handling events from a channel and dispatching them to handlers in a thread safe manner. It boxes synchronizing mechanisms inside a struct. --- internal/handleloop/handleloop.go | 60 +++++++++++++++++++++++++++++++ session/session.go | 41 ++++++--------------- 2 files changed, 70 insertions(+), 31 deletions(-) create mode 100644 internal/handleloop/handleloop.go diff --git a/internal/handleloop/handleloop.go b/internal/handleloop/handleloop.go new file mode 100644 index 0000000..11cecff --- /dev/null +++ b/internal/handleloop/handleloop.go @@ -0,0 +1,60 @@ +// Package handleloop provides clean abstractions to handle listening to +// channels and passing them onto event handlers. +package handleloop + +import "github.com/diamondburned/arikawa/v2/utils/handler" + +// Loop provides a reusable event looper abstraction. It is thread-safe to use +// concurrently. +type Loop struct { + dst *handler.Handler + run chan struct{} + stop chan struct{} +} + +func NewLoop(dst *handler.Handler) *Loop { + return &Loop{ + dst: dst, + run: make(chan struct{}, 1), // intentional 1 buffer + stop: make(chan struct{}), // intentional unbuffer + } +} + +// Start starts a new event loop. It will try to stop existing loops before. +func (l *Loop) Start(src <-chan interface{}) { + // Ensure we're stopped. + l.Stop() + + // Mark that we're running. + l.run <- struct{}{} + + go func() { + for { + select { + case event := <-src: + l.dst.Call(event) + + case <-l.stop: + l.stop <- struct{}{} + return + } + } + }() +} + +// Stop tries to stop the Loop. If the Loop is not running, then it does +// nothing; thus, it can be called multiple times. +func (l *Loop) Stop() { + // Ensure that we are running before stopping. + select { + case <-l.run: + // running + default: + return + } + + // send a close request + l.stop <- struct{}{} + // wait for a reply + <-l.stop +} diff --git a/session/session.go b/session/session.go index 9bc890a..c0d94ad 100644 --- a/session/session.go +++ b/session/session.go @@ -5,12 +5,12 @@ package session import ( "context" - "sync" "github.com/pkg/errors" "github.com/diamondburned/arikawa/v2/api" "github.com/diamondburned/arikawa/v2/gateway" + "github.com/diamondburned/arikawa/v2/internal/handleloop" "github.com/diamondburned/arikawa/v2/utils/handler" ) @@ -38,19 +38,7 @@ type Session struct { *handler.Handler // internal state to not be copied around. - *sessionState -} - -// sessionState contains fields crucial for controlling the state of session. It -// should not be copied around. -type sessionState struct { - hstop chan struct{} - wstop sync.Once -} - -func (state *sessionState) Reset() { - state.hstop = make(chan struct{}) - state.wstop = sync.Once{} + looper *handleloop.Loop } func NewWithIntents(token string, intents ...gateway.Intents) (*Session, error) { @@ -105,19 +93,21 @@ func Login(email, password, mfa string) (*Session, error) { } func NewWithGateway(gw *gateway.Gateway) *Session { + handler := handler.New() + looper := handleloop.NewLoop(handler) + return &Session{ Gateway: gw, // Nab off gateway's token - Client: api.NewClient(gw.Identifier.Token), - Handler: handler.New(), - sessionState: &sessionState{}, + Client: api.NewClient(gw.Identifier.Token), + Handler: handler, + looper: looper, } } func (s *Session) Open() error { // Start the handler beforehand so no events are missed. - s.sessionState.Reset() - go s.startHandler() + s.looper.Start(s.Gateway.Events) // Set the AfterClose's handler. s.Gateway.AfterClose = func(err error) { @@ -145,20 +135,9 @@ func (s *Session) WithContext(ctx context.Context) *Session { return &cpy } -func (s *Session) startHandler() { - for { - select { - case <-s.hstop: - return - case ev := <-s.Gateway.Events: - s.Call(ev) - } - } -} - func (s *Session) Close() error { // Stop the event handler - s.wstop.Do(func() { close(s.hstop) }) + s.looper.Stop() // Close the websocket return s.Gateway.Close() }