From 784d4a6006c9edf533d8cb14895d98232d42ed56 Mon Sep 17 00:00:00 2001 From: yyasha Date: Wed, 5 Jun 2024 01:08:40 +0300 Subject: [PATCH] fix ws closing --- bybit_websocket.go | 52 ++++++++++++++++++++++++++++------------------ go.mod | 1 + go.sum | 2 ++ 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/bybit_websocket.go b/bybit_websocket.go index 99e3dd9..8f8355a 100644 --- a/bybit_websocket.go +++ b/bybit_websocket.go @@ -5,28 +5,30 @@ import ( "crypto/hmac" "crypto/sha256" "encoding/hex" + "errors" "fmt" "time" "github.com/google/uuid" "github.com/gorilla/websocket" + "golang.org/x/sync/errgroup" ) type MessageHandler func(message string) error -func (b *WebSocket) handleIncomingMessages() { +func (b *WebSocket) handleIncomingMessages() error { for { _, message, err := b.conn.ReadMessage() if err != nil { fmt.Println("Error reading:", err) - return + return err } if b.onMessage != nil { err := b.onMessage(string(message)) if err != nil { fmt.Println("Error handling message:", err) - return + return err } } } @@ -112,35 +114,45 @@ func (b *WebSocket) Connect(args []string) error { } } - go b.handleIncomingMessages() + eg := errgroup.Group{} b.ctx, b.cancel = context.WithCancel(context.Background()) - Ping(b) - return b.sendSubscription(args) + eg.Go(b.Ping) + eg.Go(b.handleIncomingMessages) + + if err := eg.Wait(); err != nil { + b.Disconnect() + return fmt.Errorf("failed to handle message: %w", err) + } + + return nil } -func Ping(b *WebSocket) { +var ErrPingFailed = errors.New("failed to send ping") + +func (b *WebSocket) Ping() error { ticker := time.NewTicker(time.Duration(b.pingInterval) * time.Second) - go func() { - defer ticker.Stop() // Ensure the ticker is stopped when this goroutine ends - for { - select { - case <-ticker.C: // Wait until the ticker sends a signal - if err := b.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - fmt.Println("Failed to send ping:", err) - } - case <-b.ctx.Done(): - fmt.Println("Exit ping") - return + defer ticker.Stop() // Ensure the ticker is stopped when this goroutine ends + for { + select { + case <-ticker.C: // Wait until the ticker sends a signal + if err := b.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + fmt.Println("Failed to send ping:", err) + return ErrPingFailed } + case <-b.ctx.Done(): + fmt.Println("Exit ping") + return nil } - }() + } } func (b *WebSocket) Disconnect() error { b.cancel() - return b.conn.Close() + err := b.conn.Close() + fmt.Println("WebSocket disconnected, err =", err) + return err } func (b *WebSocket) Send(message string) error { diff --git a/go.mod b/go.mod index 4d856dd..6c7a895 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/json-iterator/go v1.1.12 github.com/stretchr/testify v1.8.4 + golang.org/x/sync v0.7.0 ) require ( diff --git a/go.sum b/go.sum index 53db67c..c237dff 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=