From 5de1bebe2f189698abbb487d7ab35c1ced267c72 Mon Sep 17 00:00:00 2001 From: shay23b Date: Sun, 30 Jul 2023 16:28:42 +0300 Subject: [PATCH 01/16] AcceptClientConnections --- main.go | 94 ++++++++++++++++++++++++------------------------ server/server.go | 55 ++++++++++++++++++---------- 2 files changed, 84 insertions(+), 65 deletions(-) diff --git a/main.go b/main.go index 11bafb2d0..715d7dfd7 100644 --- a/main.go +++ b/main.go @@ -142,62 +142,62 @@ func runMemphis(s *server.Server) { s.CompleteRelevantStuckAsyncTasks() - go func() { - s.CreateInternalJetStreamResources() - go http_server.InitializeHttpServer(s) - err = s.StartBackgroundTasks() - if err != nil { - s.Errorf("Background task failed: " + err.Error()) - os.Exit(1) - } + // go func() { + s.CreateInternalJetStreamResources() + go http_server.InitializeHttpServer(s) + err = s.StartBackgroundTasks() + if err != nil { + s.Errorf("Background task failed: " + err.Error()) + os.Exit(1) + } - // run only on the leader - go s.KillZombieResources() + // run only on the leader + go s.KillZombieResources() - isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true" + isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true" - if isUserPassBased { - // For backward compatibility data from old account to memphis default account - storeDir := s.Opts().StoreDir - if storeDir == "" { - storeDir = os.TempDir() - storeDir = strings.TrimSuffix(storeDir, "/") - } + if isUserPassBased { + // For backward compatibility data from old account to memphis default account + storeDir := s.Opts().StoreDir + if storeDir == "" { + storeDir = os.TempDir() + storeDir = strings.TrimSuffix(storeDir, "/") + } - folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT) - f, _ := os.Stat(folderName) - if f != nil { - err = s.MoveResourcesFromOldToNewDefaultAcc() - if err != nil { - s.Errorf("Data from global account to memphis account failed: %s", err.Error()) - } + folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT) + f, _ := os.Stat(folderName) + if f != nil { + err = s.MoveResourcesFromOldToNewDefaultAcc() + if err != nil { + s.Errorf("Data from global account to memphis account failed: %s", err.Error()) } } + } - var env string - var message string - if os.Getenv("DOCKER_ENV") != "" { - env = "Docker" - if isUserPassBased { - message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } else { - message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } - s.Noticef(message) - } else if os.Getenv("LOCAL_CLUSTER_ENV") != "" { - env = "Local cluster" - if isUserPassBased { - message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } else { - message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } - s.Noticef(message) + var env string + var message string + if os.Getenv("DOCKER_ENV") != "" { + env = "Docker" + if isUserPassBased { + message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" } else { - env = "K8S" + message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" } - - s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env) - }() + s.Noticef(message) + } else if os.Getenv("LOCAL_CLUSTER_ENV") != "" { + env = "Local cluster" + if isUserPassBased { + message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" + } else { + message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" + } + s.Noticef(message) + } else { + env = "K8S" + } + s.AcceptClientConnections() + s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env) + // }() } func main() { diff --git a/server/server.go b/server/server.go index e1a028f98..bc9749a57 100644 --- a/server/server.go +++ b/server/server.go @@ -2116,11 +2116,11 @@ func (s *Server) WaitForShutdown() { func (s *Server) AcceptLoop(clr chan struct{}) { // If we were to exit before the listener is setup properly, // make sure we close the channel. - defer func() { - if clr != nil { - close(clr) - } - }() + // defer func() { + // if clr != nil { + // close(clr) + // } + // }() // Snapshot server options. opts := s.getOpts() @@ -2140,8 +2140,8 @@ func (s *Server) AcceptLoop(clr chan struct{}) { s.Fatalf("Error listening on port: %s, %q", hp, e) return } - s.Noticef("Listening for client connections on %s", - net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port))) + // s.Noticef("Listening for client connections on %s", + // net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port))) // Alert of TLS enabled. if opts.TLSConfig != nil { @@ -2168,17 +2168,17 @@ func (s *Server) AcceptLoop(clr chan struct{}) { s.clientConnectURLs = s.getClientConnectURLs() s.listener = l - go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, - func(_ error) bool { - if s.isLameDuckMode() { - // Signal that we are not accepting new clients - s.ldmCh <- true - // Now wait for the Shutdown... - <-s.quitCh - return true - } - return false - }) + // go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, + // func(_ error) bool { + // if s.isLameDuckMode() { + // // Signal that we are not accepting new clients + // s.ldmCh <- true + // // Now wait for the Shutdown... + // <-s.quitCh + // return true + // } + // return false + // }) s.mu.Unlock() // Let the caller know that we are ready @@ -3777,3 +3777,22 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) { default: } } + +func (s *Server) AcceptClientConnections() { + s.mu.Lock() + go s.acceptConnections(s.listener, "Client", func(conn net.Conn) { s.createClient(conn) }, + func(_ error) bool { + if s.isLameDuckMode() { + // Signal that we are not accepting new clients + s.ldmCh <- true + // Now wait for the Shutdown... + <-s.quitCh + return true + } + return false + }) + s.mu.Unlock() + opts := s.getOpts() + s.Noticef("Listening for client connections on %s", + net.JoinHostPort(opts.Host, strconv.Itoa(s.listener.Addr().(*net.TCPAddr).Port))) +} From 3dfb7b11f3d27c33b93f8e56b4a53ee5c986e45c Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 31 Jul 2023 08:47:18 +0300 Subject: [PATCH 02/16] remove comment --- server/memphis_handlers_ws.go | 2 ++ server/server.go | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/server/memphis_handlers_ws.go b/server/memphis_handlers_ws.go index abef0d85b..7bca64827 100644 --- a/server/memphis_handlers_ws.go +++ b/server/memphis_handlers_ws.go @@ -114,11 +114,13 @@ func memphisWSLoop(s *Server, subs *concurrentMap[memphisWSReqTenantsToFiller], if !IsNatsErr(err, JSStreamNotFoundErr) && !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "alphanumeric") { s.Errorf("[tenant: %v]memphisWSLoop at filler: %v", tenant, err.Error()) } + deleteTenantFromSub(tenant, subs, k) continue } updateRaw, err := json.Marshal(update) if err != nil { s.Errorf("[tenant: %v]memphisWSLoop at json.Marshal: %v", tenant, err.Error()) + deleteTenantFromSub(tenant, subs, k) continue } diff --git a/server/server.go b/server/server.go index bc9749a57..869a33e62 100644 --- a/server/server.go +++ b/server/server.go @@ -2116,11 +2116,11 @@ func (s *Server) WaitForShutdown() { func (s *Server) AcceptLoop(clr chan struct{}) { // If we were to exit before the listener is setup properly, // make sure we close the channel. - // defer func() { - // if clr != nil { - // close(clr) - // } - // }() + defer func() { + if clr != nil { + close(clr) + } + }() // Snapshot server options. opts := s.getOpts() From be096a451741acab2bc94c9c41d9b827a8f88bf6 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 31 Jul 2023 09:53:27 +0300 Subject: [PATCH 03/16] move ws listener --- main.go | 1 + server/server.go | 6 +++--- server/websocket.go | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 715d7dfd7..350689f71 100644 --- a/main.go +++ b/main.go @@ -196,6 +196,7 @@ func runMemphis(s *server.Server) { env = "K8S" } s.AcceptClientConnections() + s.StartWebsocketServer() s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env) // }() } diff --git a/server/server.go b/server/server.go index 869a33e62..8108478f0 100644 --- a/server/server.go +++ b/server/server.go @@ -1877,9 +1877,9 @@ func (s *Server) Start() { // Start websocket server if needed. Do this before starting the routes, and // leaf node because we want to resolve the gateway host:port so that this // information can be sent to other routes. - if opts.Websocket.Port != 0 { - s.startWebsocketServer() - } + // if opts.Websocket.Port != 0 { + // s.startWebsocketServer() + // } // Start up listen if we want to accept leaf node connections. if opts.LeafNode.Port != 0 { diff --git a/server/websocket.go b/server/websocket.go index a884b49b0..393082180 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1042,7 +1042,7 @@ func (s *Server) wsConfigAuth(opts *WebsocketOpts) { ws.authOverride = opts.Username != _EMPTY_ || opts.Token != _EMPTY_ || opts.NoAuthUser != _EMPTY_ } -func (s *Server) startWebsocketServer() { +func (s *Server) StartWebsocketServer() { sopts := s.getOpts() o := &sopts.Websocket From 93487a88ee6da43edb929075892d8e056b032f2c Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 31 Jul 2023 10:36:31 +0300 Subject: [PATCH 04/16] comment out CreateInternalJetStreamResources --- main.go | 2 +- server/websocket.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 350689f71..f7f12ea08 100644 --- a/main.go +++ b/main.go @@ -143,7 +143,7 @@ func runMemphis(s *server.Server) { s.CompleteRelevantStuckAsyncTasks() // go func() { - s.CreateInternalJetStreamResources() + // s.CreateInternalJetStreamResources() go http_server.InitializeHttpServer(s) err = s.StartBackgroundTasks() if err != nil { diff --git a/server/websocket.go b/server/websocket.go index 393082180..0da25f660 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1090,10 +1090,12 @@ func (s *Server) StartWebsocketServer() { if port == 0 { o.Port = hl.Addr().(*net.TCPAddr).Port } + // s.mu.Unlock() s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) if proto == wsSchemePrefix { s.Warnf("Websocket not configured with TLS. DO NOT USE IN PRODUCTION!") } + // s.mu.Lock() s.websocket.tls = proto == "wss" s.websocket.connectURLs, err = s.getConnectURLs(o.Advertise, o.Host, o.Port) From a28348dd70885ef1e4fa4cd675a012da23c39087 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 31 Jul 2023 15:59:26 +0300 Subject: [PATCH 05/16] move run memphis to server.Start --- main.go | 53 ----------------------------------------- server/server.go | 61 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/main.go b/main.go index f7f12ea08..b62587a83 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,6 @@ import ( "memphis/analytics" "memphis/db" "memphis/http_server" - "memphis/memphis_cache" "memphis/server" "strings" @@ -103,56 +102,7 @@ func runMemphis(s *server.Server) { s.Errorf("Failed initializing analytics: " + err.Error()) } - err = server.TenantSeqInitialize() - if err != nil { - s.Errorf("Failed to initialize tenants sequence %v", err.Error()) - } - - err = memphis_cache.InitializeUserCache(s.Errorf) - if err != nil { - s.Errorf("Failed to initialize user cache %v", err.Error()) - } - - err = s.InitializeEventCounter() - if err != nil { - s.Errorf("Failed initializing event counter: " + err.Error()) - } - - err = s.InitializeFirestore() - if err != nil { - s.Errorf("Failed initializing firestore: " + err.Error()) - } - - s.InitializeMemphisHandlers() - - err = server.InitializeIntegrations() - if err != nil { - s.Errorf("Failed initializing integrations: " + err.Error()) - } - - err = s.SetDlsRetentionForExistTenants() - if err != nil { - s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error()) - } - - err = s.Force3ReplicationsForExistingStations() - if err != nil { - s.Errorf("Failed force 3 replications for existing stations: " + err.Error()) - } - - s.CompleteRelevantStuckAsyncTasks() - - // go func() { - // s.CreateInternalJetStreamResources() go http_server.InitializeHttpServer(s) - err = s.StartBackgroundTasks() - if err != nil { - s.Errorf("Background task failed: " + err.Error()) - os.Exit(1) - } - - // run only on the leader - go s.KillZombieResources() isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true" @@ -195,10 +145,7 @@ func runMemphis(s *server.Server) { } else { env = "K8S" } - s.AcceptClientConnections() - s.StartWebsocketServer() s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env) - // }() } func main() { diff --git a/server/server.go b/server/server.go index 8108478f0..938dfd3c1 100644 --- a/server/server.go +++ b/server/server.go @@ -26,6 +26,7 @@ import ( "math/rand" "memphis/db" "memphis/logger" + "memphis/memphis_cache" "net" "net/http" "regexp" @@ -1873,13 +1874,13 @@ func (s *Server) Start() { if opts.Gateway.Port != 0 { s.startGateways() } - + s.runMemphis() // Start websocket server if needed. Do this before starting the routes, and // leaf node because we want to resolve the gateway host:port so that this // information can be sent to other routes. - // if opts.Websocket.Port != 0 { - // s.startWebsocketServer() - // } + if opts.Websocket.Port != 0 { + s.StartWebsocketServer() + } // Start up listen if we want to accept leaf node connections. if opts.LeafNode.Port != 0 { @@ -3796,3 +3797,55 @@ func (s *Server) AcceptClientConnections() { s.Noticef("Listening for client connections on %s", net.JoinHostPort(opts.Host, strconv.Itoa(s.listener.Addr().(*net.TCPAddr).Port))) } + +func (s *Server) runMemphis() { + err := TenantSeqInitialize() + if err != nil { + s.Errorf("Failed to initialize tenants sequence %v", err.Error()) + } + + err = memphis_cache.InitializeUserCache(s.Errorf) + if err != nil { + s.Errorf("Failed to initialize user cache %v", err.Error()) + } + + err = s.InitializeEventCounter() + if err != nil { + s.Errorf("Failed initializing event counter: " + err.Error()) + } + + err = s.InitializeFirestore() + if err != nil { + s.Errorf("Failed initializing firestore: " + err.Error()) + } + + s.InitializeMemphisHandlers() + + err = InitializeIntegrations() + if err != nil { + s.Errorf("Failed initializing integrations: " + err.Error()) + } + + err = s.SetDlsRetentionForExistTenants() + if err != nil { + s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error()) + } + + err = s.Force3ReplicationsForExistingStations() + if err != nil { + s.Errorf("Failed force 3 replications for existing stations: " + err.Error()) + } + + s.CompleteRelevantStuckAsyncTasks() + + go func() { + s.CreateInternalJetStreamResources() + err = s.StartBackgroundTasks() + if err != nil { + s.Errorf("Background task failed: " + err.Error()) + os.Exit(1) + } + // run only on the leader + go s.KillZombieResources() + }() +} From 1092cbb3ebc5e88c3d23407f26fcd3b141b5f4ef Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 31 Jul 2023 16:51:23 +0300 Subject: [PATCH 06/16] fix --- server/server.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/server/server.go b/server/server.go index 938dfd3c1..bdc5237b9 100644 --- a/server/server.go +++ b/server/server.go @@ -2169,17 +2169,17 @@ func (s *Server) AcceptLoop(clr chan struct{}) { s.clientConnectURLs = s.getClientConnectURLs() s.listener = l - // go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, - // func(_ error) bool { - // if s.isLameDuckMode() { - // // Signal that we are not accepting new clients - // s.ldmCh <- true - // // Now wait for the Shutdown... - // <-s.quitCh - // return true - // } - // return false - // }) + go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, + func(_ error) bool { + if s.isLameDuckMode() { + // Signal that we are not accepting new clients + s.ldmCh <- true + // Now wait for the Shutdown... + <-s.quitCh + return true + } + return false + }) s.mu.Unlock() // Let the caller know that we are ready From c120582435c77bb0fa2fba1fa7c67a02924a6259 Mon Sep 17 00:00:00 2001 From: shay23b Date: Tue, 1 Aug 2023 15:21:51 +0300 Subject: [PATCH 07/16] AcceptClientAndWSConnections --- server/memphis_helper.go | 79 +++++++++++++++++----------------------- server/server.go | 75 +++++++++++++++++++++----------------- server/websocket.go | 29 +++++++-------- 3 files changed, 89 insertions(+), 94 deletions(-) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index b670ee256..d1088e36f 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -268,37 +268,31 @@ func (s *Server) CreateInternalJetStreamResources() { ready := !s.JetStreamIsClustered() retentionDur := time.Duration(s.opts.LogsRetentionDays) * time.Hour * 24 - successCh := make(chan error) - if ready { // stand alone - go tryCreateInternalJetStreamResources(s, retentionDur, successCh, false) - err := <-successCh + err := tryCreateInternalJetStreamResources(s, retentionDur, false) if err != nil { s.Errorf("CreateInternalJetStreamResources: system streams creation failed: %v", err.Error()) } } else { s.WaitForLeaderElection() if s.JetStreamIsLeader() { + timeout := time.After(1 * time.Minute) for !ready { // wait for cluster to be ready if we are in cluster mode - timeout := time.NewTimer(1 * time.Minute) - go tryCreateInternalJetStreamResources(s, retentionDur, successCh, true) - select { - case <-timeout.C: - s.Warnf("CreateInternalJetStreamResources: system streams creation takes more than a minute") - err := <-successCh - if err != nil { - s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) - continue - } + err := tryCreateInternalJetStreamResources(s, retentionDur, true) + if err != nil { + s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) + } else { ready = true - case err := <-successCh: - if err != nil { - s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) - <-timeout.C - continue + } + time.Sleep(1 * time.Second) + select { + case <-timeout: + if !ready { + s.Warnf("CreateInternalJetStreamResources: system streams creation takes more than a minute") } - timeout.Stop() ready = true + default: + // continue the loop until ready or timeout } } } @@ -311,7 +305,7 @@ func (s *Server) CreateInternalJetStreamResources() { s.popFallbackLogs() } -func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, successCh chan error, isCluster bool) { +func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, isCluster bool) error { replicas := 1 if isCluster { replicas = 3 @@ -319,8 +313,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, v, err := s.Varz(nil) if err != nil { - successCh <- err - return + return err } // system logs stream @@ -337,13 +330,10 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Replicas: replicas, }) if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { - time.Sleep(1 * time.Second) - tryCreateInternalJetStreamResources(s, retentionDur, successCh, isCluster) - return + return err } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - successCh <- err - return + return err } SYSLOGS_STREAM_CREATED = true } @@ -363,13 +353,10 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Duplicates: idempotencyWindow, }) if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { - time.Sleep(1 * time.Second) - tryCreateInternalJetStreamResources(s, retentionDur, successCh, isCluster) - return + return err } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - successCh <- err - return + return err } TIERED_STORAGE_STREAM_CREATED = true } @@ -387,8 +374,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, } err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), tieredStorageStream, &cc) if err != nil { - successCh <- err - return + return err } TIERED_STORAGE_CONSUMER_CREATED = true } @@ -406,8 +392,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Replicas: replicas, }) if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - successCh <- err - return + return err } DLS_UNACKED_STREAM_CREATED = true } @@ -424,8 +409,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, } err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsUnackedStream, &cc) if err != nil { - successCh <- err - return + return err } DLS_UNACKED_CONSUMER_CREATED = true } @@ -442,9 +426,11 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Storage: FileStorage, Replicas: replicas, }) + if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { + return err + } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - successCh <- err - return + return err } DLS_SCHEMAVERSE_STREAM_CREATED = true } @@ -461,8 +447,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, } err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsSchemaverseStream, &cc) if err != nil { - successCh <- err - return + return err } DLS_SCHEMAVERSE_CONSUMER_CREATED = true } @@ -490,13 +475,15 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Replicas: replicas, NoAck: false, }) + if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { + return err + } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - successCh <- err - return + return err } TIERED_STORAGE_STREAM_CREATED = true } - successCh <- nil + return nil } func (s *Server) popFallbackLogs() { diff --git a/server/server.go b/server/server.go index bdc5237b9..00aa713e8 100644 --- a/server/server.go +++ b/server/server.go @@ -1874,7 +1874,6 @@ func (s *Server) Start() { if opts.Gateway.Port != 0 { s.startGateways() } - s.runMemphis() // Start websocket server if needed. Do this before starting the routes, and // leaf node because we want to resolve the gateway host:port so that this // information can be sent to other routes. @@ -1934,6 +1933,7 @@ func (s *Server) Start() { if !opts.DontListen { s.AcceptLoop(clientListenReady) } + s.runMemphis() } // Shutdown will shutdown the server instance by kicking out the AcceptLoop @@ -2168,18 +2168,18 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // Keep track of client connect URLs. We may need them later. s.clientConnectURLs = s.getClientConnectURLs() s.listener = l - - go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, - func(_ error) bool { - if s.isLameDuckMode() { - // Signal that we are not accepting new clients - s.ldmCh <- true - // Now wait for the Shutdown... - <-s.quitCh - return true - } - return false - }) + //** moved to AcceptClientAndWSConnections by memphis **// + // go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, + // func(_ error) bool { + // if s.isLameDuckMode() { + // // Signal that we are not accepting new clients + // s.ldmCh <- true + // // Now wait for the Shutdown... + // <-s.quitCh + // return true + // } + // return false + // }) s.mu.Unlock() // Let the caller know that we are ready @@ -3779,7 +3779,7 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) { } } -func (s *Server) AcceptClientConnections() { +func (s *Server) AcceptClientAndWSConnections() { s.mu.Lock() go s.acceptConnections(s.listener, "Client", func(conn net.Conn) { s.createClient(conn) }, func(_ error) bool { @@ -3792,6 +3792,19 @@ func (s *Server) AcceptClientConnections() { } return false }) + go func() { + if err := s.websocket.server.Serve(s.websocket.listener); err != http.ErrServerClosed { + s.Fatalf("websocket listener error: %v", err) + } + if s.isLameDuckMode() { + // Signal that we are not accepting new clients + s.ldmCh <- true + // Now wait for the Shutdown... + <-s.quitCh + return + } + s.done <- true + }() s.mu.Unlock() opts := s.getOpts() s.Noticef("Listening for client connections on %s", @@ -3803,49 +3816,45 @@ func (s *Server) runMemphis() { if err != nil { s.Errorf("Failed to initialize tenants sequence %v", err.Error()) } - err = memphis_cache.InitializeUserCache(s.Errorf) if err != nil { s.Errorf("Failed to initialize user cache %v", err.Error()) } - err = s.InitializeEventCounter() if err != nil { s.Errorf("Failed initializing event counter: " + err.Error()) } - err = s.InitializeFirestore() if err != nil { s.Errorf("Failed initializing firestore: " + err.Error()) } - s.InitializeMemphisHandlers() - err = InitializeIntegrations() if err != nil { s.Errorf("Failed initializing integrations: " + err.Error()) } - err = s.SetDlsRetentionForExistTenants() if err != nil { s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error()) } - err = s.Force3ReplicationsForExistingStations() if err != nil { s.Errorf("Failed force 3 replications for existing stations: " + err.Error()) } - s.CompleteRelevantStuckAsyncTasks() - - go func() { - s.CreateInternalJetStreamResources() - err = s.StartBackgroundTasks() - if err != nil { - s.Errorf("Background task failed: " + err.Error()) - os.Exit(1) - } - // run only on the leader - go s.KillZombieResources() - }() + // go func() { + s.CreateInternalJetStreamResources() + opts := s.getOpts() + s.InitializeMemphisHandlers() + if !opts.DontListen { + s.AcceptClientAndWSConnections() + } + err = s.StartBackgroundTasks() + if err != nil { + s.Errorf("Background task failed: " + err.Error()) + os.Exit(1) + } + // run only on the leader + go s.KillZombieResources() + // }() } diff --git a/server/websocket.go b/server/websocket.go index 0da25f660..ceec686bd 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1090,12 +1090,10 @@ func (s *Server) StartWebsocketServer() { if port == 0 { o.Port = hl.Addr().(*net.TCPAddr).Port } - // s.mu.Unlock() s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) if proto == wsSchemePrefix { s.Warnf("Websocket not configured with TLS. DO NOT USE IN PRODUCTION!") } - // s.mu.Lock() s.websocket.tls = proto == "wss" s.websocket.connectURLs, err = s.getConnectURLs(o.Advertise, o.Host, o.Port) @@ -1138,19 +1136,20 @@ func (s *Server) StartWebsocketServer() { } s.websocket.server = hs s.websocket.listener = hl - go func() { - if err := hs.Serve(hl); err != http.ErrServerClosed { - s.Fatalf("websocket listener error: %v", err) - } - if s.isLameDuckMode() { - // Signal that we are not accepting new clients - s.ldmCh <- true - // Now wait for the Shutdown... - <-s.quitCh - return - } - s.done <- true - }() + //** moved to AcceptClientAndWSConnections by memphis **// + // go func() { + // if err := hs.Serve(hl); err != http.ErrServerClosed { + // s.Fatalf("websocket listener error: %v", err) + // } + // if s.isLameDuckMode() { + // // Signal that we are not accepting new clients + // s.ldmCh <- true + // // Now wait for the Shutdown... + // <-s.quitCh + // return + // } + // s.done <- true + // }() s.mu.Unlock() } From 4815ff9073ae39e7950d583b641fe4238a36a099 Mon Sep 17 00:00:00 2001 From: shay23b Date: Tue, 1 Aug 2023 15:56:16 +0300 Subject: [PATCH 08/16] add prints --- server/memphis_helper.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index d1088e36f..0c015f673 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -318,6 +318,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, // system logs stream if shouldPersistSysLogs() && !SYSLOGS_STREAM_CREATED { + fmt.Println("creating stream: memphis_syslogs") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: syslogsStreamName, Subjects: []string{syslogsStreamName + ".>"}, @@ -341,6 +342,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, idempotencyWindow := time.Duration(1 * time.Minute) // tiered storage stream if !TIERED_STORAGE_STREAM_CREATED { + fmt.Println("creating stream: memphis_tiered_storage") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: tieredStorageStream, Subjects: []string{tieredStorageStream + ".>"}, @@ -381,6 +383,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, // dls unacked messages stream if !DLS_UNACKED_STREAM_CREATED { + fmt.Println("creating stream: memphis_dls_unacked") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: dlsUnackedStream, Subjects: []string{JSAdvisoryConsumerMaxDeliveryExceedPre + ".>"}, @@ -416,6 +419,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, // create schemaverse dls stream if !DLS_SCHEMAVERSE_STREAM_CREATED { + fmt.Println("creating stream: memphis_dls_schemaverse") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: dlsSchemaverseStream, Subjects: []string{SCHEMAVERSE_DLS_INNER_SUBJ}, @@ -454,6 +458,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, // delete the old version throughput stream if THROUGHPUT_LEGACY_STREAM_EXIST { + fmt.Println("deleting legacy stream: memphis-throughput-v1") err = s.memphisDeleteStream(s.MemphisGlobalAccountString(), throughputStreamName) if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { s.Errorf("Failed deleting old internal throughput stream - %s", err.Error()) @@ -462,6 +467,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, // throughput kv if !THROUGHPUT_STREAM_CREATED { + fmt.Println("creating stream: memphis-throughput-v1") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: (throughputStreamNameV1), Subjects: []string{throughputStreamNameV1 + ".>"}, From f346119d37a7a959406bc4df3e50bdaadfeb2005 Mon Sep 17 00:00:00 2001 From: shay23b Date: Tue, 1 Aug 2023 16:25:34 +0300 Subject: [PATCH 09/16] CreateInternalJetStreamResources --- server/memphis_helper.go | 314 ++++++++++++++++++++++++++++++++++----- 1 file changed, 274 insertions(+), 40 deletions(-) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 0c015f673..008f95337 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -263,36 +263,41 @@ func (s *Server) WaitForLeaderElection() { } } } - func (s *Server) CreateInternalJetStreamResources() { ready := !s.JetStreamIsClustered() retentionDur := time.Duration(s.opts.LogsRetentionDays) * time.Hour * 24 + successCh := make(chan error) + if ready { // stand alone - err := tryCreateInternalJetStreamResources(s, retentionDur, false) + go tryCreateInternalJetStreamResources(s, retentionDur, successCh, false) + err := <-successCh if err != nil { s.Errorf("CreateInternalJetStreamResources: system streams creation failed: %v", err.Error()) } } else { s.WaitForLeaderElection() if s.JetStreamIsLeader() { - timeout := time.After(1 * time.Minute) for !ready { // wait for cluster to be ready if we are in cluster mode - err := tryCreateInternalJetStreamResources(s, retentionDur, true) - if err != nil { - s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) - } else { - ready = true - } - time.Sleep(1 * time.Second) + timeout := time.NewTimer(1 * time.Minute) + go tryCreateInternalJetStreamResources(s, retentionDur, successCh, true) select { - case <-timeout: - if !ready { - s.Warnf("CreateInternalJetStreamResources: system streams creation takes more than a minute") + case <-timeout.C: + s.Warnf("CreateInternalJetStreamResources: system streams creation takes more than a minute") + err := <-successCh + if err != nil { + s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) + continue + } + ready = true + case err := <-successCh: + if err != nil { + s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) + <-timeout.C + continue } + timeout.Stop() ready = true - default: - // continue the loop until ready or timeout } } } @@ -305,7 +310,7 @@ func (s *Server) CreateInternalJetStreamResources() { s.popFallbackLogs() } -func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, isCluster bool) error { +func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, successCh chan error, isCluster bool) { replicas := 1 if isCluster { replicas = 3 @@ -313,12 +318,12 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, v, err := s.Varz(nil) if err != nil { - return err + successCh <- err + return } // system logs stream if shouldPersistSysLogs() && !SYSLOGS_STREAM_CREATED { - fmt.Println("creating stream: memphis_syslogs") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: syslogsStreamName, Subjects: []string{syslogsStreamName + ".>"}, @@ -331,10 +336,13 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Replicas: replicas, }) if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { - return err + time.Sleep(1 * time.Second) + tryCreateInternalJetStreamResources(s, retentionDur, successCh, isCluster) + return } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - return err + successCh <- err + return } SYSLOGS_STREAM_CREATED = true } @@ -342,7 +350,6 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, idempotencyWindow := time.Duration(1 * time.Minute) // tiered storage stream if !TIERED_STORAGE_STREAM_CREATED { - fmt.Println("creating stream: memphis_tiered_storage") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: tieredStorageStream, Subjects: []string{tieredStorageStream + ".>"}, @@ -355,10 +362,13 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Duplicates: idempotencyWindow, }) if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { - return err + time.Sleep(1 * time.Second) + tryCreateInternalJetStreamResources(s, retentionDur, successCh, isCluster) + return } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - return err + successCh <- err + return } TIERED_STORAGE_STREAM_CREATED = true } @@ -376,14 +386,14 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, } err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), tieredStorageStream, &cc) if err != nil { - return err + successCh <- err + return } TIERED_STORAGE_CONSUMER_CREATED = true } // dls unacked messages stream if !DLS_UNACKED_STREAM_CREATED { - fmt.Println("creating stream: memphis_dls_unacked") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: dlsUnackedStream, Subjects: []string{JSAdvisoryConsumerMaxDeliveryExceedPre + ".>"}, @@ -395,7 +405,8 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Replicas: replicas, }) if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - return err + successCh <- err + return } DLS_UNACKED_STREAM_CREATED = true } @@ -412,14 +423,14 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, } err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsUnackedStream, &cc) if err != nil { - return err + successCh <- err + return } DLS_UNACKED_CONSUMER_CREATED = true } // create schemaverse dls stream if !DLS_SCHEMAVERSE_STREAM_CREATED { - fmt.Println("creating stream: memphis_dls_schemaverse") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: dlsSchemaverseStream, Subjects: []string{SCHEMAVERSE_DLS_INNER_SUBJ}, @@ -430,11 +441,9 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Storage: FileStorage, Replicas: replicas, }) - if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { - return err - } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - return err + successCh <- err + return } DLS_SCHEMAVERSE_STREAM_CREATED = true } @@ -451,14 +460,14 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, } err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsSchemaverseStream, &cc) if err != nil { - return err + successCh <- err + return } DLS_SCHEMAVERSE_CONSUMER_CREATED = true } // delete the old version throughput stream if THROUGHPUT_LEGACY_STREAM_EXIST { - fmt.Println("deleting legacy stream: memphis-throughput-v1") err = s.memphisDeleteStream(s.MemphisGlobalAccountString(), throughputStreamName) if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { s.Errorf("Failed deleting old internal throughput stream - %s", err.Error()) @@ -467,7 +476,6 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, // throughput kv if !THROUGHPUT_STREAM_CREATED { - fmt.Println("creating stream: memphis-throughput-v1") err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ Name: (throughputStreamNameV1), Subjects: []string{throughputStreamNameV1 + ".>"}, @@ -481,17 +489,243 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, Replicas: replicas, NoAck: false, }) - if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { - return err - } if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { - return err + successCh <- err + return } TIERED_STORAGE_STREAM_CREATED = true } - return nil + successCh <- nil } +// func (s *Server) CreateInternalJetStreamResources() { +// ready := !s.JetStreamIsClustered() +// retentionDur := time.Duration(s.opts.LogsRetentionDays) * time.Hour * 24 + +// if ready { // stand alone +// err := tryCreateInternalJetStreamResources(s, retentionDur, false) +// if err != nil { +// s.Errorf("CreateInternalJetStreamResources: system streams creation failed: %v", err.Error()) +// } +// } else { +// s.WaitForLeaderElection() +// if s.JetStreamIsLeader() { +// timeout := time.After(1 * time.Minute) +// for !ready { // wait for cluster to be ready if we are in cluster mode +// err := tryCreateInternalJetStreamResources(s, retentionDur, true) +// if err != nil { +// s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) +// } else { +// ready = true +// } +// time.Sleep(1 * time.Second) +// select { +// case <-timeout: +// if !ready { +// s.Warnf("CreateInternalJetStreamResources: system streams creation takes more than a minute") +// } +// ready = true +// default: +// // continue the loop until ready or timeout +// } +// } +// } +// } + +// if s.memphis.activateSysLogsPubFunc == nil { +// s.Fatalf("internal error: sys logs publish activation func is not initialized") +// } +// s.memphis.activateSysLogsPubFunc() +// s.popFallbackLogs() +// } + +// func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, isCluster bool) error { +// replicas := 1 +// if isCluster { +// replicas = 3 +// } + +// v, err := s.Varz(nil) +// if err != nil { +// return err +// } + +// // system logs stream +// if shouldPersistSysLogs() && !SYSLOGS_STREAM_CREATED { +// fmt.Println("creating stream: memphis_syslogs") +// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ +// Name: syslogsStreamName, +// Subjects: []string{syslogsStreamName + ".>"}, +// Retention: LimitsPolicy, +// MaxAge: retentionDur, +// MaxBytes: v.JetStream.Config.MaxStore / 3, // tops third of the available storage +// MaxConsumers: -1, +// Discard: DiscardOld, +// Storage: FileStorage, +// Replicas: replicas, +// }) +// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { +// return err +// } +// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { +// return err +// } +// SYSLOGS_STREAM_CREATED = true +// } + +// idempotencyWindow := time.Duration(1 * time.Minute) +// // tiered storage stream +// if !TIERED_STORAGE_STREAM_CREATED { +// fmt.Println("creating stream: memphis_tiered_storage") +// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ +// Name: tieredStorageStream, +// Subjects: []string{tieredStorageStream + ".>"}, +// Retention: WorkQueuePolicy, +// MaxAge: time.Hour * 24, +// MaxConsumers: -1, +// Discard: DiscardOld, +// Storage: FileStorage, +// Replicas: replicas, +// Duplicates: idempotencyWindow, +// }) +// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { +// return err +// } +// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { +// return err +// } +// TIERED_STORAGE_STREAM_CREATED = true +// } + +// // create tiered storage consumer +// if !TIERED_STORAGE_CONSUMER_CREATED { +// cc := ConsumerConfig{ +// DeliverPolicy: DeliverAll, +// AckPolicy: AckExplicit, +// Durable: TIERED_STORAGE_CONSUMER, +// FilterSubject: tieredStorageStream + ".>", +// AckWait: time.Duration(2) * time.Duration(s.opts.TieredStorageUploadIntervalSec) * time.Second, +// MaxAckPending: -1, +// MaxDeliver: 10, +// } +// err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), tieredStorageStream, &cc) +// if err != nil { +// return err +// } +// TIERED_STORAGE_CONSUMER_CREATED = true +// } + +// // dls unacked messages stream +// if !DLS_UNACKED_STREAM_CREATED { +// fmt.Println("creating stream: memphis_dls_unacked") +// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ +// Name: dlsUnackedStream, +// Subjects: []string{JSAdvisoryConsumerMaxDeliveryExceedPre + ".>"}, +// Retention: WorkQueuePolicy, +// MaxAge: time.Hour * 24, +// MaxConsumers: -1, +// Discard: DiscardOld, +// Storage: FileStorage, +// Replicas: replicas, +// }) +// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { +// return err +// } +// DLS_UNACKED_STREAM_CREATED = true +// } + +// // create dls unacked consumer +// if !DLS_UNACKED_CONSUMER_CREATED { +// cc := ConsumerConfig{ +// DeliverPolicy: DeliverAll, +// AckPolicy: AckExplicit, +// Durable: DLS_UNACKED_CONSUMER, +// AckWait: time.Duration(80) * time.Second, +// MaxAckPending: -1, +// MaxDeliver: 10, +// } +// err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsUnackedStream, &cc) +// if err != nil { +// return err +// } +// DLS_UNACKED_CONSUMER_CREATED = true +// } + +// // create schemaverse dls stream +// if !DLS_SCHEMAVERSE_STREAM_CREATED { +// fmt.Println("creating stream: memphis_dls_schemaverse") +// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ +// Name: dlsSchemaverseStream, +// Subjects: []string{SCHEMAVERSE_DLS_INNER_SUBJ}, +// Retention: WorkQueuePolicy, +// MaxAge: time.Hour * 24, +// MaxConsumers: -1, +// Discard: DiscardOld, +// Storage: FileStorage, +// Replicas: replicas, +// }) +// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { +// return err +// } +// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { +// return err +// } +// DLS_SCHEMAVERSE_STREAM_CREATED = true +// } + +// // create schemaverse dls consumer +// if !DLS_SCHEMAVERSE_CONSUMER_CREATED { +// cc := ConsumerConfig{ +// DeliverPolicy: DeliverAll, +// AckPolicy: AckExplicit, +// Durable: SCHEMAVERSE_DLS_CONSUMER, +// AckWait: time.Duration(80) * time.Second, +// MaxAckPending: -1, +// MaxDeliver: 10, +// } +// err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsSchemaverseStream, &cc) +// if err != nil { +// return err +// } +// DLS_SCHEMAVERSE_CONSUMER_CREATED = true +// } + +// // delete the old version throughput stream +// if THROUGHPUT_LEGACY_STREAM_EXIST { +// fmt.Println("deleting legacy stream: memphis-throughput-v1") +// err = s.memphisDeleteStream(s.MemphisGlobalAccountString(), throughputStreamName) +// if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { +// s.Errorf("Failed deleting old internal throughput stream - %s", err.Error()) +// } +// } + +// // throughput kv +// if !THROUGHPUT_STREAM_CREATED { +// fmt.Println("creating stream: memphis-throughput-v1") +// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ +// Name: (throughputStreamNameV1), +// Subjects: []string{throughputStreamNameV1 + ".>"}, +// Retention: LimitsPolicy, +// MaxConsumers: -1, +// MaxMsgs: int64(-1), +// MaxBytes: int64(-1), +// Discard: DiscardOld, +// MaxMsgsPer: ws_updates_interval_sec, +// Storage: FileStorage, +// Replicas: replicas, +// NoAck: false, +// }) +// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { +// return err +// } +// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { +// return err +// } +// TIERED_STORAGE_STREAM_CREATED = true +// } +// return nil +// } + func (s *Server) popFallbackLogs() { select { case <-s.memphis.fallbackLogQ.ch: From 2820f8a7ae99397be9ea161a5122151b75801f63 Mon Sep 17 00:00:00 2001 From: shay23b Date: Tue, 1 Aug 2023 16:35:27 +0300 Subject: [PATCH 10/16] add prints --- server/memphis_helper.go | 5 +++++ server/server.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 008f95337..8a3a6bf84 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -276,10 +276,14 @@ func (s *Server) CreateInternalJetStreamResources() { s.Errorf("CreateInternalJetStreamResources: system streams creation failed: %v", err.Error()) } } else { + fmt.Println("before WaitForLeaderElection") s.WaitForLeaderElection() + fmt.Println("after WaitForLeaderElection") if s.JetStreamIsLeader() { + fmt.Println("Leader") for !ready { // wait for cluster to be ready if we are in cluster mode timeout := time.NewTimer(1 * time.Minute) + fmt.Println("before tryCreateInternalJetStreamResources") go tryCreateInternalJetStreamResources(s, retentionDur, successCh, true) select { case <-timeout.C: @@ -297,6 +301,7 @@ func (s *Server) CreateInternalJetStreamResources() { continue } timeout.Stop() + fmt.Println("after tryCreateInternalJetStreamResources") ready = true } } diff --git a/server/server.go b/server/server.go index 00aa713e8..e3e5e4e0e 100644 --- a/server/server.go +++ b/server/server.go @@ -3843,7 +3843,9 @@ func (s *Server) runMemphis() { } s.CompleteRelevantStuckAsyncTasks() // go func() { + fmt.Println("before CreateInternalJetStreamResources") s.CreateInternalJetStreamResources() + fmt.Println("after CreateInternalJetStreamResources") opts := s.getOpts() s.InitializeMemphisHandlers() if !opts.DontListen { From 2f77f67d38a90e8d24b4e6e102f0473b06baa96b Mon Sep 17 00:00:00 2001 From: shay23b Date: Tue, 1 Aug 2023 16:58:01 +0300 Subject: [PATCH 11/16] print leader --- server/memphis_helper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 8a3a6bf84..a9a35640b 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -257,6 +257,7 @@ func (s *Server) WaitForLeaderElection() { } if ci.Leader != "" { + fmt.Println("leader: " + ci.Leader) break } else { time.Sleep(100 * time.Millisecond) From 33fdc410469103791f889a66bc62ea4f658154a3 Mon Sep 17 00:00:00 2001 From: shay23b Date: Wed, 2 Aug 2023 10:32:12 +0300 Subject: [PATCH 12/16] CreateInternalJetStreamResources after AcceptClientConnections --- server/server.go | 29 ++++++++++++++++++++++------- server/websocket.go | 2 +- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/server/server.go b/server/server.go index e3e5e4e0e..68d73b049 100644 --- a/server/server.go +++ b/server/server.go @@ -3779,7 +3779,7 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) { } } -func (s *Server) AcceptClientAndWSConnections() { +func (s *Server) AcceptClientConnections() { s.mu.Lock() go s.acceptConnections(s.listener, "Client", func(conn net.Conn) { s.createClient(conn) }, func(_ error) bool { @@ -3792,6 +3792,14 @@ func (s *Server) AcceptClientAndWSConnections() { } return false }) + s.mu.Unlock() + opts := s.getOpts() + s.Noticef("Listening for client connections on %s", + net.JoinHostPort(opts.Host, strconv.Itoa(s.listener.Addr().(*net.TCPAddr).Port))) +} + +func (s *Server) AcceptWSConnections() { + s.mu.Lock() go func() { if err := s.websocket.server.Serve(s.websocket.listener); err != http.ErrServerClosed { s.Fatalf("websocket listener error: %v", err) @@ -3807,8 +3815,14 @@ func (s *Server) AcceptClientAndWSConnections() { }() s.mu.Unlock() opts := s.getOpts() - s.Noticef("Listening for client connections on %s", - net.JoinHostPort(opts.Host, strconv.Itoa(s.listener.Addr().(*net.TCPAddr).Port))) + o := &opts.Websocket + var proto string + if o.TLSConfig != nil { + proto = wsSchemePrefixTLS + } else { + proto = wsSchemePrefix + } + s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) } func (s *Server) runMemphis() { @@ -3843,19 +3857,20 @@ func (s *Server) runMemphis() { } s.CompleteRelevantStuckAsyncTasks() // go func() { - fmt.Println("before CreateInternalJetStreamResources") - s.CreateInternalJetStreamResources() - fmt.Println("after CreateInternalJetStreamResources") opts := s.getOpts() s.InitializeMemphisHandlers() if !opts.DontListen { - s.AcceptClientAndWSConnections() + s.AcceptClientConnections() } + fmt.Println("before CreateInternalJetStreamResources") + s.CreateInternalJetStreamResources() + fmt.Println("after CreateInternalJetStreamResources") err = s.StartBackgroundTasks() if err != nil { s.Errorf("Background task failed: " + err.Error()) os.Exit(1) } + s.AcceptWSConnections() // run only on the leader go s.KillZombieResources() // }() diff --git a/server/websocket.go b/server/websocket.go index ceec686bd..24f079abc 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1090,7 +1090,7 @@ func (s *Server) StartWebsocketServer() { if port == 0 { o.Port = hl.Addr().(*net.TCPAddr).Port } - s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) + // s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) if proto == wsSchemePrefix { s.Warnf("Websocket not configured with TLS. DO NOT USE IN PRODUCTION!") } From 622cd5e2c4f18b90538bf34b46cf541ae2909edf Mon Sep 17 00:00:00 2001 From: shay23b Date: Wed, 2 Aug 2023 17:28:07 +0300 Subject: [PATCH 13/16] remove unnecessary code and add comments --- server/memphis_helper.go | 234 --------------------------------------- server/server.go | 15 ++- server/websocket.go | 5 +- 3 files changed, 13 insertions(+), 241 deletions(-) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index a9a35640b..537023311 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -257,7 +257,6 @@ func (s *Server) WaitForLeaderElection() { } if ci.Leader != "" { - fmt.Println("leader: " + ci.Leader) break } else { time.Sleep(100 * time.Millisecond) @@ -277,14 +276,10 @@ func (s *Server) CreateInternalJetStreamResources() { s.Errorf("CreateInternalJetStreamResources: system streams creation failed: %v", err.Error()) } } else { - fmt.Println("before WaitForLeaderElection") s.WaitForLeaderElection() - fmt.Println("after WaitForLeaderElection") if s.JetStreamIsLeader() { - fmt.Println("Leader") for !ready { // wait for cluster to be ready if we are in cluster mode timeout := time.NewTimer(1 * time.Minute) - fmt.Println("before tryCreateInternalJetStreamResources") go tryCreateInternalJetStreamResources(s, retentionDur, successCh, true) select { case <-timeout.C: @@ -302,7 +297,6 @@ func (s *Server) CreateInternalJetStreamResources() { continue } timeout.Stop() - fmt.Println("after tryCreateInternalJetStreamResources") ready = true } } @@ -504,234 +498,6 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, successCh <- nil } -// func (s *Server) CreateInternalJetStreamResources() { -// ready := !s.JetStreamIsClustered() -// retentionDur := time.Duration(s.opts.LogsRetentionDays) * time.Hour * 24 - -// if ready { // stand alone -// err := tryCreateInternalJetStreamResources(s, retentionDur, false) -// if err != nil { -// s.Errorf("CreateInternalJetStreamResources: system streams creation failed: %v", err.Error()) -// } -// } else { -// s.WaitForLeaderElection() -// if s.JetStreamIsLeader() { -// timeout := time.After(1 * time.Minute) -// for !ready { // wait for cluster to be ready if we are in cluster mode -// err := tryCreateInternalJetStreamResources(s, retentionDur, true) -// if err != nil { -// s.Warnf("CreateInternalJetStreamResources: %v", err.Error()) -// } else { -// ready = true -// } -// time.Sleep(1 * time.Second) -// select { -// case <-timeout: -// if !ready { -// s.Warnf("CreateInternalJetStreamResources: system streams creation takes more than a minute") -// } -// ready = true -// default: -// // continue the loop until ready or timeout -// } -// } -// } -// } - -// if s.memphis.activateSysLogsPubFunc == nil { -// s.Fatalf("internal error: sys logs publish activation func is not initialized") -// } -// s.memphis.activateSysLogsPubFunc() -// s.popFallbackLogs() -// } - -// func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration, isCluster bool) error { -// replicas := 1 -// if isCluster { -// replicas = 3 -// } - -// v, err := s.Varz(nil) -// if err != nil { -// return err -// } - -// // system logs stream -// if shouldPersistSysLogs() && !SYSLOGS_STREAM_CREATED { -// fmt.Println("creating stream: memphis_syslogs") -// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ -// Name: syslogsStreamName, -// Subjects: []string{syslogsStreamName + ".>"}, -// Retention: LimitsPolicy, -// MaxAge: retentionDur, -// MaxBytes: v.JetStream.Config.MaxStore / 3, // tops third of the available storage -// MaxConsumers: -1, -// Discard: DiscardOld, -// Storage: FileStorage, -// Replicas: replicas, -// }) -// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { -// return err -// } -// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { -// return err -// } -// SYSLOGS_STREAM_CREATED = true -// } - -// idempotencyWindow := time.Duration(1 * time.Minute) -// // tiered storage stream -// if !TIERED_STORAGE_STREAM_CREATED { -// fmt.Println("creating stream: memphis_tiered_storage") -// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ -// Name: tieredStorageStream, -// Subjects: []string{tieredStorageStream + ".>"}, -// Retention: WorkQueuePolicy, -// MaxAge: time.Hour * 24, -// MaxConsumers: -1, -// Discard: DiscardOld, -// Storage: FileStorage, -// Replicas: replicas, -// Duplicates: idempotencyWindow, -// }) -// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { -// return err -// } -// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { -// return err -// } -// TIERED_STORAGE_STREAM_CREATED = true -// } - -// // create tiered storage consumer -// if !TIERED_STORAGE_CONSUMER_CREATED { -// cc := ConsumerConfig{ -// DeliverPolicy: DeliverAll, -// AckPolicy: AckExplicit, -// Durable: TIERED_STORAGE_CONSUMER, -// FilterSubject: tieredStorageStream + ".>", -// AckWait: time.Duration(2) * time.Duration(s.opts.TieredStorageUploadIntervalSec) * time.Second, -// MaxAckPending: -1, -// MaxDeliver: 10, -// } -// err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), tieredStorageStream, &cc) -// if err != nil { -// return err -// } -// TIERED_STORAGE_CONSUMER_CREATED = true -// } - -// // dls unacked messages stream -// if !DLS_UNACKED_STREAM_CREATED { -// fmt.Println("creating stream: memphis_dls_unacked") -// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ -// Name: dlsUnackedStream, -// Subjects: []string{JSAdvisoryConsumerMaxDeliveryExceedPre + ".>"}, -// Retention: WorkQueuePolicy, -// MaxAge: time.Hour * 24, -// MaxConsumers: -1, -// Discard: DiscardOld, -// Storage: FileStorage, -// Replicas: replicas, -// }) -// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { -// return err -// } -// DLS_UNACKED_STREAM_CREATED = true -// } - -// // create dls unacked consumer -// if !DLS_UNACKED_CONSUMER_CREATED { -// cc := ConsumerConfig{ -// DeliverPolicy: DeliverAll, -// AckPolicy: AckExplicit, -// Durable: DLS_UNACKED_CONSUMER, -// AckWait: time.Duration(80) * time.Second, -// MaxAckPending: -1, -// MaxDeliver: 10, -// } -// err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsUnackedStream, &cc) -// if err != nil { -// return err -// } -// DLS_UNACKED_CONSUMER_CREATED = true -// } - -// // create schemaverse dls stream -// if !DLS_SCHEMAVERSE_STREAM_CREATED { -// fmt.Println("creating stream: memphis_dls_schemaverse") -// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ -// Name: dlsSchemaverseStream, -// Subjects: []string{SCHEMAVERSE_DLS_INNER_SUBJ}, -// Retention: WorkQueuePolicy, -// MaxAge: time.Hour * 24, -// MaxConsumers: -1, -// Discard: DiscardOld, -// Storage: FileStorage, -// Replicas: replicas, -// }) -// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { -// return err -// } -// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { -// return err -// } -// DLS_SCHEMAVERSE_STREAM_CREATED = true -// } - -// // create schemaverse dls consumer -// if !DLS_SCHEMAVERSE_CONSUMER_CREATED { -// cc := ConsumerConfig{ -// DeliverPolicy: DeliverAll, -// AckPolicy: AckExplicit, -// Durable: SCHEMAVERSE_DLS_CONSUMER, -// AckWait: time.Duration(80) * time.Second, -// MaxAckPending: -1, -// MaxDeliver: 10, -// } -// err = serv.memphisAddConsumer(s.MemphisGlobalAccountString(), dlsSchemaverseStream, &cc) -// if err != nil { -// return err -// } -// DLS_SCHEMAVERSE_CONSUMER_CREATED = true -// } - -// // delete the old version throughput stream -// if THROUGHPUT_LEGACY_STREAM_EXIST { -// fmt.Println("deleting legacy stream: memphis-throughput-v1") -// err = s.memphisDeleteStream(s.MemphisGlobalAccountString(), throughputStreamName) -// if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { -// s.Errorf("Failed deleting old internal throughput stream - %s", err.Error()) -// } -// } - -// // throughput kv -// if !THROUGHPUT_STREAM_CREATED { -// fmt.Println("creating stream: memphis-throughput-v1") -// err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{ -// Name: (throughputStreamNameV1), -// Subjects: []string{throughputStreamNameV1 + ".>"}, -// Retention: LimitsPolicy, -// MaxConsumers: -1, -// MaxMsgs: int64(-1), -// MaxBytes: int64(-1), -// Discard: DiscardOld, -// MaxMsgsPer: ws_updates_interval_sec, -// Storage: FileStorage, -// Replicas: replicas, -// NoAck: false, -// }) -// if err != nil && IsNatsErr(err, JSClusterNoPeersErrF) { -// return err -// } -// if err != nil && !IsNatsErr(err, JSStreamNameExistErr) { -// return err -// } -// TIERED_STORAGE_STREAM_CREATED = true -// } -// return nil -// } - func (s *Server) popFallbackLogs() { select { case <-s.memphis.fallbackLogQ.ch: diff --git a/server/server.go b/server/server.go index 68d73b049..5cc9bddd6 100644 --- a/server/server.go +++ b/server/server.go @@ -1933,7 +1933,9 @@ func (s *Server) Start() { if !opts.DontListen { s.AcceptLoop(clientListenReady) } + //** added by memphis s.runMemphis() + // added by memphis ** } // Shutdown will shutdown the server instance by kicking out the AcceptLoop @@ -2141,9 +2143,10 @@ func (s *Server) AcceptLoop(clr chan struct{}) { s.Fatalf("Error listening on port: %s, %q", hp, e) return } + //** moved to AcceptClientConnections by memphis // s.Noticef("Listening for client connections on %s", // net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port))) - + // moved to AcceptClientConnections by memphis ** // Alert of TLS enabled. if opts.TLSConfig != nil { s.Noticef("TLS required for client connections") @@ -2168,7 +2171,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // Keep track of client connect URLs. We may need them later. s.clientConnectURLs = s.getClientConnectURLs() s.listener = l - //** moved to AcceptClientAndWSConnections by memphis **// + //** moved to AcceptClientConnections by memphis // go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, // func(_ error) bool { // if s.isLameDuckMode() { @@ -2180,6 +2183,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // } // return false // }) + // moved to AcceptClientConnections by memphis ** s.mu.Unlock() // Let the caller know that we are ready @@ -3779,6 +3783,7 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) { } } +//** added by memphis func (s *Server) AcceptClientConnections() { s.mu.Lock() go s.acceptConnections(s.listener, "Client", func(conn net.Conn) { s.createClient(conn) }, @@ -3856,15 +3861,12 @@ func (s *Server) runMemphis() { s.Errorf("Failed force 3 replications for existing stations: " + err.Error()) } s.CompleteRelevantStuckAsyncTasks() - // go func() { opts := s.getOpts() s.InitializeMemphisHandlers() if !opts.DontListen { s.AcceptClientConnections() } - fmt.Println("before CreateInternalJetStreamResources") s.CreateInternalJetStreamResources() - fmt.Println("after CreateInternalJetStreamResources") err = s.StartBackgroundTasks() if err != nil { s.Errorf("Background task failed: " + err.Error()) @@ -3873,5 +3875,6 @@ func (s *Server) runMemphis() { s.AcceptWSConnections() // run only on the leader go s.KillZombieResources() - // }() } + +// added by memphis ** diff --git a/server/websocket.go b/server/websocket.go index 24f079abc..647221470 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1090,7 +1090,9 @@ func (s *Server) StartWebsocketServer() { if port == 0 { o.Port = hl.Addr().(*net.TCPAddr).Port } + //** moved to AcceptWSConnections by memphis // s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) + //moved to AcceptWSConnections by memphis ** if proto == wsSchemePrefix { s.Warnf("Websocket not configured with TLS. DO NOT USE IN PRODUCTION!") } @@ -1136,7 +1138,7 @@ func (s *Server) StartWebsocketServer() { } s.websocket.server = hs s.websocket.listener = hl - //** moved to AcceptClientAndWSConnections by memphis **// + //** moved to AcceptWSConnections by memphis // go func() { // if err := hs.Serve(hl); err != http.ErrServerClosed { // s.Fatalf("websocket listener error: %v", err) @@ -1150,6 +1152,7 @@ func (s *Server) StartWebsocketServer() { // } // s.done <- true // }() + // moved to AcceptWSConnections by memphis ** s.mu.Unlock() } From 97c27207f795f2e52a6a9b6a5f9b166f9c0cc7b8 Mon Sep 17 00:00:00 2001 From: shay23b Date: Wed, 2 Aug 2023 17:32:00 +0300 Subject: [PATCH 14/16] space --- server/memphis_helper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 0b5e66028..2d9596141 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -263,6 +263,7 @@ func (s *Server) WaitForLeaderElection() { } } } + func (s *Server) CreateInternalJetStreamResources() { ready := !s.JetStreamIsClustered() retentionDur := time.Duration(s.opts.LogsRetentionDays) * time.Hour * 24 From bf4c7e70bd64f6c97f728476e17825346c09a5bc Mon Sep 17 00:00:00 2001 From: shay23b Date: Wed, 2 Aug 2023 17:40:53 +0300 Subject: [PATCH 15/16] change name --- server/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/server.go b/server/server.go index 5cc9bddd6..c641827df 100644 --- a/server/server.go +++ b/server/server.go @@ -1934,7 +1934,7 @@ func (s *Server) Start() { s.AcceptLoop(clientListenReady) } //** added by memphis - s.runMemphis() + s.initializeMemphis() // added by memphis ** } @@ -3830,7 +3830,7 @@ func (s *Server) AcceptWSConnections() { s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) } -func (s *Server) runMemphis() { +func (s *Server) initializeMemphis() { err := TenantSeqInitialize() if err != nil { s.Errorf("Failed to initialize tenants sequence %v", err.Error()) From 75351d4f135c68f069d1180866b8aa86df586350 Mon Sep 17 00:00:00 2001 From: shay23b Date: Thu, 3 Aug 2023 09:22:02 +0300 Subject: [PATCH 16/16] cr changes --- main.go | 4 ++-- server/server.go | 2 +- server/websocket.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index b62587a83..584246bf9 100644 --- a/main.go +++ b/main.go @@ -102,8 +102,6 @@ func runMemphis(s *server.Server) { s.Errorf("Failed initializing analytics: " + err.Error()) } - go http_server.InitializeHttpServer(s) - isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true" if isUserPassBased { @@ -124,6 +122,8 @@ func runMemphis(s *server.Server) { } } + go http_server.InitializeHttpServer(s) + var env string var message string if os.Getenv("DOCKER_ENV") != "" { diff --git a/server/server.go b/server/server.go index c641827df..f08f6210d 100644 --- a/server/server.go +++ b/server/server.go @@ -1878,7 +1878,7 @@ func (s *Server) Start() { // leaf node because we want to resolve the gateway host:port so that this // information can be sent to other routes. if opts.Websocket.Port != 0 { - s.StartWebsocketServer() + s.startWebsocketServer() } // Start up listen if we want to accept leaf node connections. diff --git a/server/websocket.go b/server/websocket.go index 647221470..797ed5c86 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1042,7 +1042,7 @@ func (s *Server) wsConfigAuth(opts *WebsocketOpts) { ws.authOverride = opts.Username != _EMPTY_ || opts.Token != _EMPTY_ || opts.NoAuthUser != _EMPTY_ } -func (s *Server) StartWebsocketServer() { +func (s *Server) startWebsocketServer() { sopts := s.getOpts() o := &sopts.Websocket @@ -1090,7 +1090,7 @@ func (s *Server) StartWebsocketServer() { if port == 0 { o.Port = hl.Addr().(*net.TCPAddr).Port } - //** moved to AcceptWSConnections by memphis + //** moved to AcceptWSConnections by memphis // s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) //moved to AcceptWSConnections by memphis ** if proto == wsSchemePrefix { @@ -1138,7 +1138,7 @@ func (s *Server) StartWebsocketServer() { } s.websocket.server = hs s.websocket.listener = hl - //** moved to AcceptWSConnections by memphis + //** moved to AcceptWSConnections by memphis // go func() { // if err := hs.Serve(hl); err != http.ErrServerClosed { // s.Fatalf("websocket listener error: %v", err)