Skip to content

Commit

Permalink
Merge a38a1d2 into b5e4387
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoshinonyaruko authored Jan 15, 2024
2 parents b5e4387 + a38a1d2 commit e25c5ab
Showing 1 changed file with 100 additions and 39 deletions.
139 changes: 100 additions & 39 deletions wsclient/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,25 @@ type WebSocketClient struct {
cancel context.CancelFunc
mutex sync.Mutex // 用于同步写入和重连操作的互斥锁
isReconnecting bool
sendFailures chan map[string]interface{}
sendFailures []map[string]interface{}
}

// 发送json信息给onebot应用端
func (c *WebSocketClient) SendMessage(message map[string]interface{}) error {
c.mutex.Lock() // 在写操作之前锁定
defer c.mutex.Unlock() // 确保在函数返回时解锁
func (client *WebSocketClient) SendMessage(message map[string]interface{}) error {
client.mutex.Lock() // 在写操作之前锁定
defer client.mutex.Unlock() // 确保在函数返回时解锁

msgBytes, err := json.Marshal(message)
if err != nil {
mylog.Println("Error marshalling message:", err)
return err
}

err = c.conn.WriteMessage(websocket.TextMessage, msgBytes)
err = client.conn.WriteMessage(websocket.TextMessage, msgBytes)
if err != nil {
mylog.Println("Error sending message:", err)
// 发送失败,将消息放入channel
go func() {
c.sendFailures <- message
}()
// 发送失败,将消息添加到切片
client.sendFailures = append(client.sendFailures, message)
return err
}

Expand All @@ -70,50 +68,111 @@ func (c *WebSocketClient) handleIncomingMessages(ctx context.Context, cancel con

// 断线重连
func (client *WebSocketClient) Reconnect() {
client.mutex.Lock()
if client.isReconnecting {
client.mutex.Unlock()
return // 如果已经有其他携程在重连了,就直接返回
client.isReconnecting = true

addresses := config.GetWsAddress()
tokens := config.GetWsToken()

var token string
for index, address := range addresses {
if address == client.urlStr && index < len(tokens) {
token = tokens[index]
break
}
}

// 暂存旧的 sendFailures channel,不要关闭它
oldSendFailures := client.sendFailures
// 检查URL中是否有access_token参数
mp := getParamsFromURI(client.urlStr)
if val, ok := mp["access_token"]; ok {
token = val
}

client.isReconnecting = true
client.mutex.Unlock()
headers := http.Header{
"User-Agent": []string{"CQHttp/4.15.0"},
"X-Client-Role": []string{"Universal"},
"X-Self-ID": []string{fmt.Sprintf("%d", client.botID)},
}

if token != "" {
headers["Authorization"] = []string{"Token " + token}
}
mylog.Printf("准备使用token[%s]重新连接到[%s]\n", token, client.urlStr)
dialer := websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
}

var conn *websocket.Conn
var err error

maxRetryAttempts := config.GetReconnecTimes()
retryCount := 0
for {
mylog.Println("Dialing URL:", client.urlStr)
conn, _, err = dialer.Dial(client.urlStr, headers)
if err != nil {
retryCount++
if retryCount > maxRetryAttempts {
mylog.Printf("Exceeded maximum retry attempts for WebSocket[%v]: %v\n", client.urlStr, err)
return
}
mylog.Printf("Failed to connect to WebSocket[%v]: %v, retrying in 5 seconds...\n", client.urlStr, err)
time.Sleep(5 * time.Second) // sleep for 5 seconds before retrying
} else {
mylog.Printf("Successfully connected to %s.\n", client.urlStr) // 输出连接成功提示
break // successfully connected, break the loop
}
}
// 复用现有的client完成重连
client.conn = conn

// 再次发送元事件
message := map[string]interface{}{
"meta_event_type": "lifecycle",
"post_type": "meta_event",
"self_id": client.botID,
"sub_type": "connect",
"time": int(time.Now().Unix()),
}

mylog.Printf("Message: %+v\n", message)

err = client.SendMessage(message)
if err != nil {
// handle error
mylog.Printf("Error sending message: %v\n", err)
}

//退出老的sendHeartbeat和handleIncomingMessages
client.cancel()

// Starting goroutine for heartbeats and another for listening to messages
ctx, cancel := context.WithCancel(context.Background())

client.cancel = cancel
heartbeatinterval := config.GetHeartBeatInterval()
go client.sendHeartbeat(ctx, client.botID, heartbeatinterval)
go client.handleIncomingMessages(ctx, cancel)

defer func() {
client.mutex.Lock()
client.isReconnecting = false
client.mutex.Unlock()
}()
reconnecttimes := config.GetReconnecTimes()
newClient, err := NewWebSocketClient(client.urlStr, client.botID, client.Token, client.BaseUrl, reconnecttimes)
if err == nil && newClient != nil {
client.mutex.Lock() // 在替换连接之前锁定
client.cancel()
client.conn = newClient.conn
client.Token = newClient.Token
client.BaseUrl = newClient.BaseUrl
client.cancel = newClient.cancel // 更新取消函数
client.mutex.Unlock()
// 重发失败的消息
newClient.processFailedMessages(oldSendFailures)
mylog.Println("Successfully reconnected to WebSocket.")
return
}

mylog.Printf("Successfully reconnected to WebSocket.")

}

// 处理发送失败的消息
func (client *WebSocketClient) processFailedMessages(failuresChan chan map[string]interface{}) {
for failedMessage := range failuresChan {
func (client *WebSocketClient) processFailedMessages() {
for _, failedMessage := range client.sendFailures {
// 尝试重新发送消息
err := client.SendMessage(failedMessage)
if err != nil {
// 处理重发失败的情况,比如重新放入 channel 或记录日志
mylog.Println("Error re-sending message:", err)
mylog.Printf("Error resending message: %v\n", err)
}
}
// 清空失败消息列表
client.sendFailures = []map[string]interface{}{}
}

// 处理信息,调用腾讯api
Expand Down Expand Up @@ -179,6 +238,8 @@ func (c *WebSocketClient) sendHeartbeat(ctx context.Context, botID uint64, heart
"interval": 10000, // 以毫秒为单位
}
c.SendMessage(message)
// 重发失败的消息
c.processFailedMessages()
}
}
}
Expand Down Expand Up @@ -243,7 +304,7 @@ func NewWebSocketClient(urlStr string, botID uint64, Token string, BaseUrl strin
BaseUrl: BaseUrl,
botID: botID,
urlStr: urlStr,
sendFailures: make(chan map[string]interface{}, 100),
sendFailures: []map[string]interface{}{},
}

// Sending initial message similar to your setupB function
Expand Down

0 comments on commit e25c5ab

Please sign in to comment.