1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2024-11-30 10:43:30 +00:00

Session: Added a handleloop abstraction

This abstraction will mainly be in charge of handling events from a
channel and dispatching them to handlers in a thread safe manner. It
boxes synchronizing mechanisms inside a struct.
This commit is contained in:
diamondburned 2020-11-17 11:09:51 -08:00 committed by diamondburned
parent 38fe1fafd0
commit 81b1a0a902
2 changed files with 70 additions and 31 deletions

View file

@ -0,0 +1,60 @@
// Package handleloop provides clean abstractions to handle listening to
// channels and passing them onto event handlers.
package handleloop
import "github.com/diamondburned/arikawa/v2/utils/handler"
// Loop provides a reusable event looper abstraction. It is thread-safe to use
// concurrently.
type Loop struct {
dst *handler.Handler
run chan struct{}
stop chan struct{}
}
func NewLoop(dst *handler.Handler) *Loop {
return &Loop{
dst: dst,
run: make(chan struct{}, 1), // intentional 1 buffer
stop: make(chan struct{}), // intentional unbuffer
}
}
// Start starts a new event loop. It will try to stop existing loops before.
func (l *Loop) Start(src <-chan interface{}) {
// Ensure we're stopped.
l.Stop()
// Mark that we're running.
l.run <- struct{}{}
go func() {
for {
select {
case event := <-src:
l.dst.Call(event)
case <-l.stop:
l.stop <- struct{}{}
return
}
}
}()
}
// Stop tries to stop the Loop. If the Loop is not running, then it does
// nothing; thus, it can be called multiple times.
func (l *Loop) Stop() {
// Ensure that we are running before stopping.
select {
case <-l.run:
// running
default:
return
}
// send a close request
l.stop <- struct{}{}
// wait for a reply
<-l.stop
}

View file

@ -5,12 +5,12 @@ package session
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/diamondburned/arikawa/v2/api"
"github.com/diamondburned/arikawa/v2/gateway"
"github.com/diamondburned/arikawa/v2/internal/handleloop"
"github.com/diamondburned/arikawa/v2/utils/handler"
)
@ -38,19 +38,7 @@ type Session struct {
*handler.Handler
// internal state to not be copied around.
*sessionState
}
// sessionState contains fields crucial for controlling the state of session. It
// should not be copied around.
type sessionState struct {
hstop chan struct{}
wstop sync.Once
}
func (state *sessionState) Reset() {
state.hstop = make(chan struct{})
state.wstop = sync.Once{}
looper *handleloop.Loop
}
func NewWithIntents(token string, intents ...gateway.Intents) (*Session, error) {
@ -105,19 +93,21 @@ func Login(email, password, mfa string) (*Session, error) {
}
func NewWithGateway(gw *gateway.Gateway) *Session {
handler := handler.New()
looper := handleloop.NewLoop(handler)
return &Session{
Gateway: gw,
// Nab off gateway's token
Client: api.NewClient(gw.Identifier.Token),
Handler: handler.New(),
sessionState: &sessionState{},
Client: api.NewClient(gw.Identifier.Token),
Handler: handler,
looper: looper,
}
}
func (s *Session) Open() error {
// Start the handler beforehand so no events are missed.
s.sessionState.Reset()
go s.startHandler()
s.looper.Start(s.Gateway.Events)
// Set the AfterClose's handler.
s.Gateway.AfterClose = func(err error) {
@ -145,20 +135,9 @@ func (s *Session) WithContext(ctx context.Context) *Session {
return &cpy
}
func (s *Session) startHandler() {
for {
select {
case <-s.hstop:
return
case ev := <-s.Gateway.Events:
s.Call(ev)
}
}
}
func (s *Session) Close() error {
// Stop the event handler
s.wstop.Do(func() { close(s.hstop) })
s.looper.Stop()
// Close the websocket
return s.Gateway.Close()
}