From aee547fa1ffc42bc57e0e110cc83b28384578559 Mon Sep 17 00:00:00 2001 From: diamondburned Date: Fri, 1 Apr 2022 04:41:22 -0700 Subject: [PATCH] ws: Add RawEvent for debugging --- utils/handler/handler.go | 16 ++++++++++++---- utils/ws/codec.go | 33 +++++++++++++++++++++++++++------ utils/ws/conn.go | 37 +++++++++++++++++++++++-------------- utils/ws/op.go | 18 ++++++++++++++++++ 4 files changed, 80 insertions(+), 24 deletions(-) diff --git a/utils/handler/handler.go b/utils/handler/handler.go index 47ab8e8..1de95bb 100644 --- a/utils/handler/handler.go +++ b/utils/handler/handler.go @@ -39,20 +39,28 @@ func New() *Handler { // Call calls all handlers with the given event. This is an internal method; use // with care. func (h *Handler) Call(ev interface{}) { - v := reflect.ValueOf(ev) - t := v.Type() + t := reflect.TypeOf(ev) h.mutex.RLock() defer h.mutex.RUnlock() - for _, entry := range h.events[t].Entries { + typedHandlers := h.events[t].Entries + anyHandlers := h.events[nil].Entries + + if len(typedHandlers) == 0 && len(anyHandlers) == 0 { + return + } + + v := reflect.ValueOf(ev) + + for _, entry := range typedHandlers { if entry.isInvalid() { continue } entry.Call(v) } - for _, entry := range h.events[nil].Entries { + for _, entry := range anyHandlers { if entry.isInvalid() || entry.not(t) { continue } diff --git a/utils/ws/codec.go b/utils/ws/codec.go index 0cf1cd3..497495c 100644 --- a/utils/ws/codec.go +++ b/utils/ws/codec.go @@ -1,6 +1,7 @@ package ws import ( + "context" "io" "net/http" @@ -50,15 +51,26 @@ func NewDecodeBuffer(cap int) DecodeBuffer { } } -// DecodeFrom reads the given reader and decodes it into an Op. +// DecodeInto reads the given reader and decodes it into the Op out channel. // // buf is optional. -func (c Codec) DecodeFrom(r io.Reader, buf *DecodeBuffer) Op { +func (c Codec) DecodeInto(ctx context.Context, r io.Reader, buf *DecodeBuffer, out chan<- Op) error { var op codecOp op.Data = json.Raw(buf.buf) if err := json.DecodeStream(r, &op); err != nil { - return newErrOp(err, "cannot read JSON stream") + return c.send(ctx, out, newErrOp(err, "cannot read JSON stream")) + } + + if EnableRawEvents { + dt := op.Data + op := op.Op + op.Data = &RawEvent{ + Raw: dt, + OriginalCode: op.Code, + OriginalType: op.Type, + } + c.send(ctx, out, op) } // buf isn't grown from here out. Set it back right now. If Data hasn't been @@ -73,15 +85,24 @@ func (c Codec) DecodeFrom(r io.Reader, buf *DecodeBuffer) Op { Op: op.Code, Type: op.Type, } - return newErrOp(err, "") + return c.send(ctx, out, newErrOp(err, "")) } op.Op.Data = fn() if err := op.Data.UnmarshalTo(op.Op.Data); err != nil { - return newErrOp(err, "cannot unmarshal JSON data from gateway") + return c.send(ctx, out, newErrOp(err, "cannot unmarshal JSON data from gateway")) } - return op.Op + return c.send(ctx, out, op.Op) +} + +func (c *Codec) send(ctx context.Context, ch chan<- Op, op Op) error { + select { + case ch <- op: + return nil + case <-ctx.Done(): + return ctx.Err() + } } func newErrOp(err error, wrap string) Op { diff --git a/utils/ws/conn.go b/utils/ws/conn.go index 822c627..3423b30 100644 --- a/utils/ws/conn.go +++ b/utils/ws/conn.go @@ -55,8 +55,9 @@ type Conn struct { } type connMutex struct { - wrmut chan struct{} *websocket.Conn + wrmut chan struct{} + cancel context.CancelFunc } var _ Connection = (*Conn)(nil) @@ -101,12 +102,15 @@ func (c *Conn) Dial(ctx context.Context, addr string) (<-chan Op, error) { return nil, errors.Wrap(err, "failed to dial WS") } + ctx, cancel := context.WithCancel(context.Background()) + events := make(chan Op, 1) - go readLoop(conn, c.codec, events) + go readLoop(ctx, conn, c.codec, events) c.conn = &connMutex{ - wrmut: make(chan struct{}, 1), - Conn: conn, + wrmut: make(chan struct{}, 1), + Conn: conn, + cancel: cancel, } return events, err @@ -168,6 +172,10 @@ func (c *connMutex) close(timeout time.Duration, gracefully bool) error { } c.Conn = nil + + c.cancel() + c.cancel = nil + return err } @@ -212,7 +220,7 @@ type loopState struct { buf DecodeBuffer } -func readLoop(conn *websocket.Conn, codec Codec, opCh chan<- Op) { +func readLoop(ctx context.Context, conn *websocket.Conn, codec Codec, opCh chan<- Op) { // Clean up the events channel in the end. defer close(opCh) @@ -224,8 +232,7 @@ func readLoop(conn *websocket.Conn, codec Codec, opCh chan<- Op) { } for { - b, err := state.handle() - if err != nil { + if err := state.handle(ctx, opCh); err != nil { WSDebug("Conn: fatal Conn error:", err) closeEv := &CloseEvent{ @@ -247,16 +254,14 @@ func readLoop(conn *websocket.Conn, codec Codec, opCh chan<- Op) { return } - - opCh <- b } } -func (state *loopState) handle() (Op, error) { +func (state *loopState) handle(ctx context.Context, opCh chan<- Op) error { // skip message type t, r, err := state.conn.NextReader() if err != nil { - return Op{}, err + return err } if t == websocket.BinaryMessage { @@ -265,12 +270,12 @@ func (state *loopState) handle() (Op, error) { if state.zlib == nil { z, err := zlib.NewReader(r) if err != nil { - return Op{}, errors.Wrap(err, "failed to create a zlib reader") + return errors.Wrap(err, "failed to create a zlib reader") } state.zlib = z } else { if err := state.zlib.(zlib.Resetter).Reset(r, nil); err != nil { - return Op{}, errors.Wrap(err, "failed to reset zlib reader") + return errors.Wrap(err, "failed to reset zlib reader") } } @@ -278,5 +283,9 @@ func (state *loopState) handle() (Op, error) { r = state.zlib } - return state.codec.DecodeFrom(r, &state.buf), nil + if err := state.codec.DecodeInto(ctx, r, &state.buf, opCh); err != nil { + return errors.Wrap(err, "error distributing event") + } + + return nil } diff --git a/utils/ws/op.go b/utils/ws/op.go index 08c99a8..278e1c2 100644 --- a/utils/ws/op.go +++ b/utils/ws/op.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/diamondburned/arikawa/v3/utils/json" "github.com/pkg/errors" ) @@ -35,6 +36,23 @@ func (e *CloseEvent) Op() OpCode { return -1 } // EventType implements Event. It returns an emty string. func (e *CloseEvent) EventType() EventType { return "__ws.CloseEvent" } +// EnableRawEvents, if true, will cause ws to generate a RawEvent for each +// regular Event. It should only be used for debugging. +var EnableRawEvents = false + +// RawEvent is used if EnableRawEvents is true. +type RawEvent struct { + json.Raw + OriginalCode OpCode `json:"-"` + OriginalType EventType `json:"-"` +} + +// Op implements Event. It returns -1. +func (e *RawEvent) Op() OpCode { return -1 } + +// EventType implements Event. It returns an emty string. +func (e *RawEvent) EventType() EventType { return "__ws.RawEvent" } + // EventType is a type for event types, which is the "t" field in the payload. type EventType string