From aae0ba03ae0c1f13d7c301045aa835136ebdf10f Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Sun, 25 Jun 2023 01:34:52 +0700 Subject: [PATCH 1/7] testing echo server --- server/ethws_server.go | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/server/ethws_server.go b/server/ethws_server.go index d2fb3ac..406e7cd 100644 --- a/server/ethws_server.go +++ b/server/ethws_server.go @@ -3,32 +3,46 @@ package server import ( "context" "fmt" - "github.com/notional-labs/subnode/config" - "github.com/notional-labs/subnode/state" - "github.com/notional-labs/subnode/utils" + "github.com/gorilla/websocket" "log" "net/http" ) var ethWsServer *http.Server -func ethWsHandle(w http.ResponseWriter, r *http.Request) { - prunedNode := state.SelectPrunedNode(config.ProtocolTypeEthWs) - selectedHost := prunedNode.Backend.Eth // default to pruned node - - r.Host = r.URL.Host - state.ProxyMapEthWs[selectedHost].ServeHTTP(w, r) -} +//func ethWsHandle(w http.ResponseWriter, r *http.Request) { +// prunedNode := state.SelectPrunedNode(config.ProtocolTypeEthWs) +// selectedHost := prunedNode.Backend.Eth // default to pruned node +// +// r.Host = r.URL.Host +// state.ProxyMapEthWs[selectedHost].ServeHTTP(w, r) +//} func StartEthWsServer() { fmt.Println("StartEthWsServer...") + + var upgrader = websocket.Upgrader{} // use default options + handler := func(w http.ResponseWriter, r *http.Request) { - if r.Method == "GET" { // URI over HTTP - ethWsHandle(w, r) - } else { - _ = utils.SendError(w) + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Print("upgrade:", err) return } + defer c.Close() + for { + mt, message, err := c.ReadMessage() + if err != nil { + log.Println("read:", err) + break + } + log.Printf("recv: %s", message) + err = c.WriteMessage(mt, message) + if err != nil { + log.Println("write:", err) + break + } + } } // handle all requests to your server using the proxy From 3a26c9c5bcefa8ffff861a4e12f7d5fa09364803 Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Sun, 25 Jun 2023 03:16:19 +0700 Subject: [PATCH 2/7] works --- server/ethws_server.go | 127 ++++++++++++++++++++++++++++++------ state/state.go | 25 +++---- test/test.config.evmos.yaml | 2 +- 3 files changed, 115 insertions(+), 39 deletions(-) diff --git a/server/ethws_server.go b/server/ethws_server.go index 406e7cd..e879995 100644 --- a/server/ethws_server.go +++ b/server/ethws_server.go @@ -4,45 +4,130 @@ import ( "context" "fmt" "github.com/gorilla/websocket" + "github.com/notional-labs/subnode/config" + "github.com/notional-labs/subnode/state" "log" "net/http" + "net/url" + "sync" ) var ethWsServer *http.Server +var upgrader = websocket.Upgrader{} // use default options -//func ethWsHandle(w http.ResponseWriter, r *http.Request) { -// prunedNode := state.SelectPrunedNode(config.ProtocolTypeEthWs) -// selectedHost := prunedNode.Backend.Eth // default to pruned node -// -// r.Host = r.URL.Host -// state.ProxyMapEthWs[selectedHost].ServeHTTP(w, r) -//} +func createWSClient() (*websocket.Conn, error) { + prunedNode := state.SelectPrunedNode(config.ProtocolTypeEthWs) + selectedHost := prunedNode.Backend.EthWs // default to pruned node + targetEthWs, err := url.Parse(selectedHost) + if err != nil { + return nil, err + } + log.Printf("connecting to %s", targetEthWs.String()) -func StartEthWsServer() { - fmt.Println("StartEthWsServer...") + c, _, err := websocket.DefaultDialer.Dial(targetEthWs.String(), nil) + if err != nil { + log.Fatal("dial:", err) + } - var upgrader = websocket.Upgrader{} // use default options + return c, nil +} - handler := func(w http.ResponseWriter, r *http.Request) { - c, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Print("upgrade:", err) - return +func ethWsHandle(w http.ResponseWriter, r *http.Request) { + wsConServer, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Print("upgrade:", err) + return + } + defer wsConServer.Close() + + var wg sync.WaitGroup + + //--------------------------------- + // ws-client + wsConClient, err := createWSClient() + if err != nil { + log.Print("error:", err) + return + } + defer wsConClient.Close() + + clientChannel := make(chan []byte) // struct{} + serverChannel := make(chan []byte) + + wg.Add(1) + go func() { + defer wg.Done() + //defer close(clientChannel) + + for { + msg := <-clientChannel // receive msg from clientChannel + + // relay to server + log.Println("relay to server") + err = wsConServer.WriteMessage(websocket.TextMessage, msg) + if err != nil { + log.Println("relay to server:", err) + break + } } - defer c.Close() + }() + + wg.Add(1) + go func() { + defer wg.Done() + //defer close(serverChannel) + for { - mt, message, err := c.ReadMessage() + msg := <-serverChannel // receive msg from serverChannel + + // relay to client + log.Println("relay to client") + err = wsConClient.WriteMessage(websocket.TextMessage, msg) if err != nil { - log.Println("read:", err) + log.Println("relay to client:", err) break } - log.Printf("recv: %s", message) - err = c.WriteMessage(mt, message) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for { + _, msg, err := wsConClient.ReadMessage() if err != nil { - log.Println("write:", err) + log.Println("ws-client read:", err) break } + log.Printf("ws-client recv: %s", msg) + clientChannel <- msg // send msg to clientChannel } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for { + _, msg, err := wsConServer.ReadMessage() + if err != nil { + log.Println("ws-server read:", err) + break + } + log.Printf("ws-server recv: %s", msg) + serverChannel <- msg // send msg to serverChannel + } + }() + + wg.Wait() +} + +func StartEthWsServer() { + fmt.Println("StartEthWsServer...") + + handler := func(w http.ResponseWriter, r *http.Request) { + ethWsHandle(w, r) } // handle all requests to your server using the proxy diff --git a/state/state.go b/state/state.go index 7b1b64f..eb465a2 100644 --- a/state/state.go +++ b/state/state.go @@ -22,15 +22,14 @@ type BackendState struct { } var ( - PoolRpc []*BackendState - PoolApi []*BackendState - PoolGrpc []*BackendState - PoolEth []*BackendState - PoolEthWs []*BackendState - ProxyMapRpc = make(map[string]*httputil.ReverseProxy) - ProxyMapApi = make(map[string]*httputil.ReverseProxy) - ProxyMapEth = make(map[string]*httputil.ReverseProxy) - ProxyMapEthWs = make(map[string]*httputil.ReverseProxy) + PoolRpc []*BackendState + PoolApi []*BackendState + PoolGrpc []*BackendState + PoolEth []*BackendState + PoolEthWs []*BackendState + ProxyMapRpc = make(map[string]*httputil.ReverseProxy) + ProxyMapApi = make(map[string]*httputil.ReverseProxy) + ProxyMapEth = make(map[string]*httputil.ReverseProxy) ) func Init() { @@ -40,7 +39,6 @@ func Init() { rpcItem := PoolRpc[i] apiItem := PoolApi[i] ethItem := PoolEth[i] - ethWsItem := PoolEthWs[i] // rpc targetRpc, err := url.Parse(rpcItem.Name) @@ -62,13 +60,6 @@ func Init() { panic(err) } ProxyMapEth[ethItem.Name] = httputil.NewSingleHostReverseProxy(targetEth) - - // eth-ws - targetEthWs, err := url.Parse(ethWsItem.Name) - if err != nil { - panic(err) - } - ProxyMapEthWs[ethWsItem.Name] = httputil.NewSingleHostReverseProxy(targetEthWs) } } diff --git a/test/test.config.evmos.yaml b/test/test.config.evmos.yaml index 3e22e5f..1004aab 100644 --- a/test/test.config.evmos.yaml +++ b/test/test.config.evmos.yaml @@ -3,7 +3,7 @@ upstream: api: "http://api-evmos-ia.cosmosia.notional.ventures:80" grpc: "grpc-evmos-ia.cosmosia.notional.ventures:9090" eth: "http://jsonrpc-evmos-ia.cosmosia.notional.ventures:80" - ethws: "http://jsonrpc-evmos-ia.cosmosia.notional.ventures:80" + ethws: "ws://jsonrpc-evmos-ia.cosmosia.notional.ventures:80/websocket/" blocks: [362880] - rpc: "http://rpc-evmos-archive-sub-ia.cosmosia.notional.ventures:80" api: "http://api-evmos-archive-sub-ia.cosmosia.notional.ventures:80" From b2cd933c6623e6d1f940ecc10385b60f59f67a6b Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Sun, 25 Jun 2023 10:33:26 +0700 Subject: [PATCH 3/7] closeAll --- server/ethws_server.go | 42 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/server/ethws_server.go b/server/ethws_server.go index e879995..23bd89b 100644 --- a/server/ethws_server.go +++ b/server/ethws_server.go @@ -32,6 +32,16 @@ func createWSClient() (*websocket.Conn, error) { return c, nil } +func IsClosed(ch <-chan []byte) bool { + select { + case <-ch: + return true + default: + } + + return false +} + func ethWsHandle(w http.ResponseWriter, r *http.Request) { wsConServer, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -54,6 +64,17 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { clientChannel := make(chan []byte) // struct{} serverChannel := make(chan []byte) + closeAll := func() { + if !IsClosed(clientChannel) { + close(clientChannel) + } + if !IsClosed(serverChannel) { + close(serverChannel) + } + wsConServer.Close() + wsConClient.Close() + } + wg.Add(1) go func() { defer wg.Done() @@ -66,10 +87,13 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { log.Println("relay to server") err = wsConServer.WriteMessage(websocket.TextMessage, msg) if err != nil { - log.Println("relay to server:", err) + log.Println("relay to server err:", err) + closeAll() break } } + + log.Println("exit processing clientChannel") }() wg.Add(1) @@ -84,10 +108,13 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { log.Println("relay to client") err = wsConClient.WriteMessage(websocket.TextMessage, msg) if err != nil { - log.Println("relay to client:", err) + log.Println("relay to client err:", err) + closeAll() break } } + + log.Println("exit processing serverChannel") }() wg.Add(1) @@ -97,12 +124,15 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { for { _, msg, err := wsConClient.ReadMessage() if err != nil { - log.Println("ws-client read:", err) + log.Println("ws-client read err:", err) + closeAll() break } log.Printf("ws-client recv: %s", msg) clientChannel <- msg // send msg to clientChannel } + + log.Println("exit ws-client") }() wg.Add(1) @@ -112,15 +142,19 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { for { _, msg, err := wsConServer.ReadMessage() if err != nil { - log.Println("ws-server read:", err) + log.Println("ws-server read err:", err) + closeAll() break } log.Printf("ws-server recv: %s", msg) serverChannel <- msg // send msg to serverChannel } + + log.Println("exit ws-server") }() wg.Wait() + log.Printf("WaitGroup counter is zero") } func StartEthWsServer() { From b5ff05c23fc72499ccc6f75146e17ab6148f63a1 Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Sun, 25 Jun 2023 10:52:57 +0700 Subject: [PATCH 4/7] fix bad upstream con --- server/ethws_server.go | 46 +++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/server/ethws_server.go b/server/ethws_server.go index 23bd89b..8497275 100644 --- a/server/ethws_server.go +++ b/server/ethws_server.go @@ -26,7 +26,8 @@ func createWSClient() (*websocket.Conn, error) { c, _, err := websocket.DefaultDialer.Dial(targetEthWs.String(), nil) if err != nil { - log.Fatal("dial:", err) + log.Printf("dial:", err) + return nil, err } return c, nil @@ -43,9 +44,30 @@ func IsClosed(ch <-chan []byte) bool { } func ethWsHandle(w http.ResponseWriter, r *http.Request) { - wsConServer, err := upgrader.Upgrade(w, r, nil) + var wsConServer *websocket.Conn + var wsConClient *websocket.Conn + clientChannel := make(chan []byte) // struct{} + serverChannel := make(chan []byte) + + closeAll := func() { + if !IsClosed(clientChannel) { + close(clientChannel) + } + if !IsClosed(serverChannel) { + close(serverChannel) + } + if wsConServer != nil { + wsConServer.Close() + } + if wsConClient != nil { + wsConClient.Close() + } + } + + var err error + wsConServer, err = upgrader.Upgrade(w, r, nil) if err != nil { - log.Print("upgrade:", err) + log.Print("upgrade err:", err) return } defer wsConServer.Close() @@ -54,27 +76,13 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { //--------------------------------- // ws-client - wsConClient, err := createWSClient() + wsConClient, err = createWSClient() if err != nil { - log.Print("error:", err) + log.Print("error with createWSClient:", err) return } defer wsConClient.Close() - clientChannel := make(chan []byte) // struct{} - serverChannel := make(chan []byte) - - closeAll := func() { - if !IsClosed(clientChannel) { - close(clientChannel) - } - if !IsClosed(serverChannel) { - close(serverChannel) - } - wsConServer.Close() - wsConClient.Close() - } - wg.Add(1) go func() { defer wg.Done() From 5231e443e12ebdc009bdf6de79678d9a1bb81e0f Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Sun, 25 Jun 2023 10:54:52 +0700 Subject: [PATCH 5/7] ... --- server/ethws_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/ethws_server.go b/server/ethws_server.go index 8497275..0602fdf 100644 --- a/server/ethws_server.go +++ b/server/ethws_server.go @@ -26,7 +26,7 @@ func createWSClient() (*websocket.Conn, error) { c, _, err := websocket.DefaultDialer.Dial(targetEthWs.String(), nil) if err != nil { - log.Printf("dial:", err) + log.Println("dial err:", err) return nil, err } From bd6bce96e5cd708210203da59b0b1c36987c0571 Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Sun, 25 Jun 2023 11:33:22 +0700 Subject: [PATCH 6/7] refactor ethws --- server/ethws_server.go | 184 +++++++++++++++++++++-------------------- 1 file changed, 94 insertions(+), 90 deletions(-) diff --git a/server/ethws_server.go b/server/ethws_server.go index 0602fdf..5d16068 100644 --- a/server/ethws_server.go +++ b/server/ethws_server.go @@ -33,7 +33,7 @@ func createWSClient() (*websocket.Conn, error) { return c, nil } -func IsClosed(ch <-chan []byte) bool { +func isClosed(ch <-chan []byte) bool { select { case <-ch: return true @@ -43,27 +43,102 @@ func IsClosed(ch <-chan []byte) bool { return false } -func ethWsHandle(w http.ResponseWriter, r *http.Request) { - var wsConServer *websocket.Conn - var wsConClient *websocket.Conn - clientChannel := make(chan []byte) // struct{} - serverChannel := make(chan []byte) +func closeAll(wsConServer *websocket.Conn, wsConClient *websocket.Conn, clientChannel chan []byte, serverChannel chan []byte) { + if !isClosed(clientChannel) { + close(clientChannel) + } + if !isClosed(serverChannel) { + close(serverChannel) + } + if wsConServer != nil { + wsConServer.Close() + } + if wsConClient != nil { + wsConClient.Close() + } +} + +func wsClientConRelay(wsConServer *websocket.Conn, wsConClient *websocket.Conn, clientChannel chan []byte, serverChannel chan []byte, wg *sync.WaitGroup) { + defer wg.Done() + //defer close(clientChannel) + + for { + msg := <-clientChannel // receive msg from clientChannel - closeAll := func() { - if !IsClosed(clientChannel) { - close(clientChannel) + // relay to server + log.Println("relay to server") + err := wsConServer.WriteMessage(websocket.TextMessage, msg) + if err != nil { + log.Println("relay to server err:", err) + closeAll(wsConServer, wsConClient, clientChannel, serverChannel) + break } - if !IsClosed(serverChannel) { - close(serverChannel) + } + + log.Println("exit processing clientChannel") +} + +func wsServerConRelay(wsConServer *websocket.Conn, wsConClient *websocket.Conn, clientChannel chan []byte, serverChannel chan []byte, wg *sync.WaitGroup) { + defer wg.Done() + //defer close(serverChannel) + + for { + msg := <-serverChannel // receive msg from serverChannel + + // relay to client + log.Println("relay to client") + err := wsConClient.WriteMessage(websocket.TextMessage, msg) + if err != nil { + log.Println("relay to client err:", err) + closeAll(wsConServer, wsConClient, clientChannel, serverChannel) + break } - if wsConServer != nil { - wsConServer.Close() + } + + log.Println("exit processing serverChannel") +} + +func wsClientHandle(wsConServer *websocket.Conn, wsConClient *websocket.Conn, clientChannel chan []byte, serverChannel chan []byte, wg *sync.WaitGroup) { + defer wg.Done() + + for { + _, msg, err := wsConClient.ReadMessage() + if err != nil { + log.Println("ws-client read err:", err) + closeAll(wsConServer, wsConClient, clientChannel, serverChannel) + break } - if wsConClient != nil { - wsConClient.Close() + log.Printf("ws-client recv: %s", msg) + clientChannel <- msg // send msg to clientChannel + } + + log.Println("exit ws-client") +} + +func wsServerHandle(wsConServer *websocket.Conn, wsConClient *websocket.Conn, clientChannel chan []byte, serverChannel chan []byte, wg *sync.WaitGroup) { + defer wg.Done() + + for { + _, msg, err := wsConServer.ReadMessage() + if err != nil { + log.Println("ws-server read err:", err) + closeAll(wsConServer, wsConClient, clientChannel, serverChannel) + break } + log.Printf("ws-server recv: %s", msg) + serverChannel <- msg // send msg to serverChannel } + log.Println("exit ws-server") +} + +func ethWsHandle(w http.ResponseWriter, r *http.Request) { + var wg sync.WaitGroup + var wsConServer *websocket.Conn + var wsConClient *websocket.Conn + clientChannel := make(chan []byte) // struct{} + serverChannel := make(chan []byte) + var err error wsConServer, err = upgrader.Upgrade(w, r, nil) if err != nil { @@ -72,8 +147,6 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { } defer wsConServer.Close() - var wg sync.WaitGroup - //--------------------------------- // ws-client wsConClient, err = createWSClient() @@ -84,82 +157,13 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { defer wsConClient.Close() wg.Add(1) - go func() { - defer wg.Done() - //defer close(clientChannel) - - for { - msg := <-clientChannel // receive msg from clientChannel - - // relay to server - log.Println("relay to server") - err = wsConServer.WriteMessage(websocket.TextMessage, msg) - if err != nil { - log.Println("relay to server err:", err) - closeAll() - break - } - } - - log.Println("exit processing clientChannel") - }() - + go wsClientConRelay(wsConServer, wsConClient, clientChannel, serverChannel, &wg) wg.Add(1) - go func() { - defer wg.Done() - //defer close(serverChannel) - - for { - msg := <-serverChannel // receive msg from serverChannel - - // relay to client - log.Println("relay to client") - err = wsConClient.WriteMessage(websocket.TextMessage, msg) - if err != nil { - log.Println("relay to client err:", err) - closeAll() - break - } - } - - log.Println("exit processing serverChannel") - }() - + go wsServerConRelay(wsConServer, wsConClient, clientChannel, serverChannel, &wg) wg.Add(1) - go func() { - defer wg.Done() - - for { - _, msg, err := wsConClient.ReadMessage() - if err != nil { - log.Println("ws-client read err:", err) - closeAll() - break - } - log.Printf("ws-client recv: %s", msg) - clientChannel <- msg // send msg to clientChannel - } - - log.Println("exit ws-client") - }() - + go wsClientHandle(wsConServer, wsConClient, clientChannel, serverChannel, &wg) wg.Add(1) - go func() { - defer wg.Done() - - for { - _, msg, err := wsConServer.ReadMessage() - if err != nil { - log.Println("ws-server read err:", err) - closeAll() - break - } - log.Printf("ws-server recv: %s", msg) - serverChannel <- msg // send msg to serverChannel - } - - log.Println("exit ws-server") - }() + go wsServerHandle(wsConServer, wsConClient, clientChannel, serverChannel, &wg) wg.Wait() log.Printf("WaitGroup counter is zero") From ff4fa722afd381b7829662eec8eae6ddab20c39e Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Sun, 25 Jun 2023 11:38:07 +0700 Subject: [PATCH 7/7] refactor ethws... --- server/ethws_server.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server/ethws_server.go b/server/ethws_server.go index 5d16068..0471881 100644 --- a/server/ethws_server.go +++ b/server/ethws_server.go @@ -136,7 +136,7 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { var wg sync.WaitGroup var wsConServer *websocket.Conn var wsConClient *websocket.Conn - clientChannel := make(chan []byte) // struct{} + clientChannel := make(chan []byte) serverChannel := make(chan []byte) var err error @@ -145,7 +145,8 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { log.Print("upgrade err:", err) return } - defer wsConServer.Close() + + defer closeAll(wsConServer, wsConClient, clientChannel, serverChannel) //--------------------------------- // ws-client @@ -154,7 +155,6 @@ func ethWsHandle(w http.ResponseWriter, r *http.Request) { log.Print("error with createWSClient:", err) return } - defer wsConClient.Close() wg.Add(1) go wsClientConRelay(wsConServer, wsConClient, clientChannel, serverChannel, &wg) @@ -176,8 +176,6 @@ func StartEthWsServer() { ethWsHandle(w, r) } - // handle all requests to your server using the proxy - //http.HandleFunc("/", handler) serverMux := http.NewServeMux() serverMux.HandleFunc("/", handler) go func() {