ws: Add LastAcknowledgedBeat
This commit adds ws.Handler.LastAcknowledgedBeat to allow ws.Gateway to monitor whether or not the server is still reachable. It fixes #324.
This commit is contained in:
parent
e09abfdbcb
commit
649dd36086
|
@ -392,6 +392,8 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
|
|||
now := time.Now()
|
||||
|
||||
g.beatMutex.Lock()
|
||||
// Keep sentBeat separately with the echoed beat to calculate the
|
||||
// gateway latency properly.
|
||||
g.sentBeat = g.lastSentBeat
|
||||
g.echoBeat = now
|
||||
g.beatMutex.Unlock()
|
||||
|
@ -408,15 +410,26 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
|
|||
|
||||
// SendHeartbeat sends a heartbeat with the gateway's current sequence.
|
||||
func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
|
||||
g.lastSentBeat = time.Now()
|
||||
|
||||
sequence := HeartbeatCommand(g.state.Sequence)
|
||||
|
||||
if err := g.gateway.Send(ctx, &sequence); err != nil {
|
||||
g.gateway.SendErrorWrap(err, "heartbeat error")
|
||||
g.gateway.QueueReconnect()
|
||||
} else {
|
||||
g.beatMutex.Lock()
|
||||
g.lastSentBeat = time.Now()
|
||||
g.beatMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// LastAcknowledgedBeat returns the last acknowledged beat.
|
||||
func (g *gatewayImpl) LastAcknowledgedBeat() time.Time {
|
||||
g.beatMutex.Lock()
|
||||
defer g.beatMutex.Unlock()
|
||||
|
||||
return g.echoBeat
|
||||
}
|
||||
|
||||
// Close closes the state.
|
||||
func (g *gatewayImpl) Close() error {
|
||||
g.retryTimer.Stop()
|
||||
|
|
|
@ -11,6 +11,10 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ErrHeartbeatTimeout is returned if the server fails to acknowledge our heart
|
||||
// beat in time.
|
||||
var ErrHeartbeatTimeout = errors.New("server timed out replying to heartbeat")
|
||||
|
||||
// ConnectionError is given to the user if the gateway fails to connect to the
|
||||
// gateway for any reason, including during an initial connection or a
|
||||
// reconnection. To check for this error, use the errors.As function.
|
||||
|
@ -95,8 +99,11 @@ var DefaultGatewayOpts = GatewayOpts{
|
|||
type Gateway struct {
|
||||
ws *Websocket
|
||||
|
||||
reconnect chan struct{}
|
||||
heart lazytime.Ticker
|
||||
heartRate time.Duration
|
||||
lastBeat time.Time
|
||||
|
||||
reconnect chan struct{}
|
||||
srcOp <-chan Op // from WS
|
||||
outer outerState
|
||||
lastError error
|
||||
|
@ -104,6 +111,19 @@ type Gateway struct {
|
|||
opts GatewayOpts
|
||||
}
|
||||
|
||||
// HeartbeatInfo is the heart rate information. It is used to ensure that the
|
||||
// gateway is alive.
|
||||
type HeartbeatInfo struct {
|
||||
LastAcknowledged time.Time
|
||||
LastSent time.Time
|
||||
}
|
||||
|
||||
// ShouldReconnect returns true if the heartbeat info and the heart rate
|
||||
// suggests that we should reconnect.
|
||||
func (i HeartbeatInfo) ShouldReconnect(hr time.Duration) bool {
|
||||
return i.LastAcknowledged.Add(2 * hr).Before(i.LastSent)
|
||||
}
|
||||
|
||||
// outerState holds gateway state that the caller may change concurrently. As
|
||||
// such, it holds a mutex to allow that. The main purpose of this
|
||||
// synchronization is to allow the caller to use the gateway while the event
|
||||
|
@ -125,6 +145,9 @@ type Handler interface {
|
|||
// SendHeartbeat is called by the gateway event loop everytime a heartbeat
|
||||
// needs to be sent over.
|
||||
SendHeartbeat(context.Context)
|
||||
// LastAcknowledgedBeat returns the last time that the server acknowledged
|
||||
// our heart beat.
|
||||
LastAcknowledgedBeat() time.Time
|
||||
// Close closes the handler.
|
||||
Close() error
|
||||
}
|
||||
|
@ -263,6 +286,7 @@ func (g *Gateway) QueueReconnect() {
|
|||
// ResetHeartbeat resets the heartbeat to be the given duration.
|
||||
func (g *Gateway) ResetHeartbeat(d time.Duration) {
|
||||
g.heart.Reset(d)
|
||||
g.heartRate = d
|
||||
}
|
||||
|
||||
// SendError sends the given error wrapped in a BackgroundErrorEvent into the
|
||||
|
@ -327,6 +351,17 @@ func (g *Gateway) spin(ctx context.Context, h Handler) {
|
|||
g.lastError = nil
|
||||
|
||||
case <-g.heart.C:
|
||||
const missThreshold = 2 // allow 2 heart beat misses
|
||||
|
||||
if !g.lastBeat.IsZero() {
|
||||
if h.LastAcknowledgedBeat().Add(missThreshold * g.heartRate).Before(g.lastBeat) {
|
||||
g.SendError(ErrHeartbeatTimeout)
|
||||
g.QueueReconnect()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
g.lastBeat = time.Now()
|
||||
h.SendHeartbeat(ctx)
|
||||
|
||||
case <-g.reconnect:
|
||||
|
@ -339,6 +374,11 @@ func (g *Gateway) spin(ctx context.Context, h Handler) {
|
|||
// Invalidate our srcOp.
|
||||
g.srcOp = nil
|
||||
|
||||
// Invalidate our last sent beat timestamp so we don't mistakenly
|
||||
// think that the gateway is dead after we reconnect and before we
|
||||
// sent a heartbeat.
|
||||
g.lastBeat = time.Time{}
|
||||
|
||||
// Keep track of the last error for notifying.
|
||||
var err error
|
||||
|
||||
|
|
|
@ -45,8 +45,9 @@ type Gateway struct {
|
|||
gateway *ws.Gateway
|
||||
state State // constant
|
||||
|
||||
mutex sync.RWMutex
|
||||
ready *ReadyEvent
|
||||
mutex sync.RWMutex
|
||||
beatAck time.Time
|
||||
ready *ReadyEvent
|
||||
}
|
||||
|
||||
// DefaultGatewayOpts contains the default options to be used for connecting to
|
||||
|
@ -187,6 +188,10 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
|
|||
g.gateway.QueueReconnect()
|
||||
}
|
||||
}
|
||||
case *HeartbeatAckEvent:
|
||||
g.mutex.Lock()
|
||||
g.beatAck = time.Now()
|
||||
g.mutex.Unlock()
|
||||
case *ReadyEvent:
|
||||
g.mutex.Lock()
|
||||
g.ready = data
|
||||
|
@ -204,6 +209,13 @@ func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (g *gatewayImpl) LastAcknowledgedBeat() time.Time {
|
||||
g.mutex.RLock()
|
||||
defer g.mutex.RUnlock()
|
||||
|
||||
return g.beatAck
|
||||
}
|
||||
|
||||
func (g *gatewayImpl) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue