mirror of
https://github.com/diamondburned/arikawa.git
synced 2024-12-22 05:07:08 +00:00
Heart: Better synchronization on close methods
This commit is contained in:
parent
362929fad5
commit
77b1b08bce
|
@ -10,6 +10,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/diamondburned/arikawa/internal/heart"
|
||||||
"github.com/diamondburned/arikawa/utils/wsutil"
|
"github.com/diamondburned/arikawa/utils/wsutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,6 +18,9 @@ func init() {
|
||||||
wsutil.WSDebug = func(v ...interface{}) {
|
wsutil.WSDebug = func(v ...interface{}) {
|
||||||
log.Println(append([]interface{}{"Debug:"}, v...)...)
|
log.Println(append([]interface{}{"Debug:"}, v...)...)
|
||||||
}
|
}
|
||||||
|
heart.Debug = func(v ...interface{}) {
|
||||||
|
log.Println(append([]interface{}{"Heart:"}, v...)...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInvalidToken(t *testing.T) {
|
func TestInvalidToken(t *testing.T) {
|
||||||
|
|
|
@ -32,29 +32,6 @@ func (t *AtomicTime) Time() time.Time {
|
||||||
return time.Unix(0, t.Get())
|
return time.Unix(0, t.Get())
|
||||||
}
|
}
|
||||||
|
|
||||||
type atomicStop atomic.Value
|
|
||||||
|
|
||||||
func (s *atomicStop) Stop() bool {
|
|
||||||
if v := (*atomic.Value)(s).Load(); v != nil {
|
|
||||||
ch := v.(chan struct{})
|
|
||||||
close(ch)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
func (s *atomicStop) Recv() <-chan struct{} {
|
|
||||||
if v := (*atomic.Value)(s).Load(); v != nil {
|
|
||||||
return v.(chan struct{})
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (s *atomicStop) SetNil() {
|
|
||||||
(*atomic.Value)(s).Store((chan struct{})(nil))
|
|
||||||
}
|
|
||||||
func (s *atomicStop) Reset() {
|
|
||||||
(*atomic.Value)(s).Store(make(chan struct{}))
|
|
||||||
}
|
|
||||||
|
|
||||||
type Pacemaker struct {
|
type Pacemaker struct {
|
||||||
// Heartrate is the received duration between heartbeats.
|
// Heartrate is the received duration between heartbeats.
|
||||||
Heartrate time.Duration
|
Heartrate time.Duration
|
||||||
|
@ -66,7 +43,8 @@ type Pacemaker struct {
|
||||||
// Any callback that returns an error will stop the pacer.
|
// Any callback that returns an error will stop the pacer.
|
||||||
Pace func(context.Context) error
|
Pace func(context.Context) error
|
||||||
|
|
||||||
stop atomicStop
|
stop chan struct{}
|
||||||
|
once sync.Once
|
||||||
death chan error
|
death chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +57,6 @@ func NewPacemaker(heartrate time.Duration, pacer func(context.Context) error) *P
|
||||||
|
|
||||||
func (p *Pacemaker) Echo() {
|
func (p *Pacemaker) Echo() {
|
||||||
// Swap our received heartbeats
|
// Swap our received heartbeats
|
||||||
// p.LastBeat[0], p.LastBeat[1] = time.Now(), p.LastBeat[0]
|
|
||||||
p.EchoBeat.Set(time.Now())
|
p.EchoBeat.Set(time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,11 +84,12 @@ func (p *Pacemaker) Dead() bool {
|
||||||
|
|
||||||
// Stop stops the pacemaker, or it does nothing if the pacemaker is not started.
|
// Stop stops the pacemaker, or it does nothing if the pacemaker is not started.
|
||||||
func (p *Pacemaker) Stop() {
|
func (p *Pacemaker) Stop() {
|
||||||
if p.stop.Stop() {
|
Debug("(*Pacemaker).stop is trying sync.Once.")
|
||||||
Debug("(*Pacemaker).stop was sent a stop signal.")
|
|
||||||
} else {
|
p.once.Do(func() {
|
||||||
Debug("(*Pacemaker).stop is nil, skipping.")
|
Debug("(*Pacemaker).stop closed the channel.")
|
||||||
}
|
close(p.stop)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// pace sends a heartbeat with the appropriate timeout for the context.
|
// pace sends a heartbeat with the appropriate timeout for the context.
|
||||||
|
@ -147,7 +125,7 @@ func (p *Pacemaker) start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-p.stop.Recv():
|
case <-p.stop:
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
case <-tick.C:
|
case <-tick.C:
|
||||||
|
@ -158,7 +136,8 @@ func (p *Pacemaker) start() error {
|
||||||
// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
|
// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
|
||||||
func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
|
func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
|
||||||
p.death = make(chan error)
|
p.death = make(chan error)
|
||||||
p.stop.Reset()
|
p.stop = make(chan struct{})
|
||||||
|
p.once = sync.Once{}
|
||||||
|
|
||||||
if wg != nil {
|
if wg != nil {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -168,8 +147,6 @@ func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
|
||||||
p.death <- p.start()
|
p.death <- p.start()
|
||||||
// Debug.
|
// Debug.
|
||||||
Debug("Pacemaker returned.")
|
Debug("Pacemaker returned.")
|
||||||
// Mark the stop channel as nil, so later Close() calls won't block forever.
|
|
||||||
p.stop.SetNil()
|
|
||||||
|
|
||||||
// Mark the pacemaker loop as done.
|
// Mark the pacemaker loop as done.
|
||||||
if wg != nil {
|
if wg != nil {
|
||||||
|
|
Loading…
Reference in a new issue