fix ws closing
This commit is contained in:
parent
978ab02672
commit
784d4a6006
|
@ -5,28 +5,30 @@ import (
|
||||||
"crypto/hmac"
|
"crypto/hmac"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageHandler func(message string) error
|
type MessageHandler func(message string) error
|
||||||
|
|
||||||
func (b *WebSocket) handleIncomingMessages() {
|
func (b *WebSocket) handleIncomingMessages() error {
|
||||||
for {
|
for {
|
||||||
_, message, err := b.conn.ReadMessage()
|
_, message, err := b.conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error reading:", err)
|
fmt.Println("Error reading:", err)
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.onMessage != nil {
|
if b.onMessage != nil {
|
||||||
err := b.onMessage(string(message))
|
err := b.onMessage(string(message))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error handling message:", err)
|
fmt.Println("Error handling message:", err)
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -112,35 +114,45 @@ func (b *WebSocket) Connect(args []string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go b.handleIncomingMessages()
|
eg := errgroup.Group{}
|
||||||
|
|
||||||
b.ctx, b.cancel = context.WithCancel(context.Background())
|
b.ctx, b.cancel = context.WithCancel(context.Background())
|
||||||
Ping(b)
|
|
||||||
|
|
||||||
return b.sendSubscription(args)
|
eg.Go(b.Ping)
|
||||||
|
eg.Go(b.handleIncomingMessages)
|
||||||
|
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
b.Disconnect()
|
||||||
|
return fmt.Errorf("failed to handle message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Ping(b *WebSocket) {
|
var ErrPingFailed = errors.New("failed to send ping")
|
||||||
|
|
||||||
|
func (b *WebSocket) Ping() error {
|
||||||
ticker := time.NewTicker(time.Duration(b.pingInterval) * time.Second)
|
ticker := time.NewTicker(time.Duration(b.pingInterval) * time.Second)
|
||||||
go func() {
|
|
||||||
defer ticker.Stop() // Ensure the ticker is stopped when this goroutine ends
|
defer ticker.Stop() // Ensure the ticker is stopped when this goroutine ends
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C: // Wait until the ticker sends a signal
|
case <-ticker.C: // Wait until the ticker sends a signal
|
||||||
if err := b.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
if err := b.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||||
fmt.Println("Failed to send ping:", err)
|
fmt.Println("Failed to send ping:", err)
|
||||||
|
return ErrPingFailed
|
||||||
}
|
}
|
||||||
case <-b.ctx.Done():
|
case <-b.ctx.Done():
|
||||||
fmt.Println("Exit ping")
|
fmt.Println("Exit ping")
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *WebSocket) Disconnect() error {
|
func (b *WebSocket) Disconnect() error {
|
||||||
b.cancel()
|
b.cancel()
|
||||||
return b.conn.Close()
|
err := b.conn.Close()
|
||||||
|
fmt.Println("WebSocket disconnected, err =", err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *WebSocket) Send(message string) error {
|
func (b *WebSocket) Send(message string) error {
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -8,6 +8,7 @@ require (
|
||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/json-iterator/go v1.1.12
|
github.com/json-iterator/go v1.1.12
|
||||||
github.com/stretchr/testify v1.8.4
|
github.com/stretchr/testify v1.8.4
|
||||||
|
golang.org/x/sync v0.7.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -30,6 +30,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
|
||||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||||
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
|
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
|
||||||
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
|
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
|
||||||
|
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
||||||
|
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|
Loading…
Reference in New Issue