diff --git a/main.go b/main.go index 11bafb2d0..584246bf9 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,6 @@ import ( "memphis/analytics" "memphis/db" "memphis/http_server" - "memphis/memphis_cache" "memphis/server" "strings" @@ -103,101 +102,50 @@ func runMemphis(s *server.Server) { s.Errorf("Failed initializing analytics: " + err.Error()) } - err = server.TenantSeqInitialize() - if err != nil { - s.Errorf("Failed to initialize tenants sequence %v", err.Error()) - } - - err = memphis_cache.InitializeUserCache(s.Errorf) - if err != nil { - s.Errorf("Failed to initialize user cache %v", err.Error()) - } - - err = s.InitializeEventCounter() - if err != nil { - s.Errorf("Failed initializing event counter: " + err.Error()) - } - - err = s.InitializeFirestore() - if err != nil { - s.Errorf("Failed initializing firestore: " + err.Error()) - } - - s.InitializeMemphisHandlers() - - err = server.InitializeIntegrations() - if err != nil { - s.Errorf("Failed initializing integrations: " + err.Error()) - } - - err = s.SetDlsRetentionForExistTenants() - if err != nil { - s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error()) - } + isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true" - err = s.Force3ReplicationsForExistingStations() - if err != nil { - s.Errorf("Failed force 3 replications for existing stations: " + err.Error()) - } - - s.CompleteRelevantStuckAsyncTasks() - - go func() { - s.CreateInternalJetStreamResources() - go http_server.InitializeHttpServer(s) - err = s.StartBackgroundTasks() - if err != nil { - s.Errorf("Background task failed: " + err.Error()) - os.Exit(1) + if isUserPassBased { + // For backward compatibility data from old account to memphis default account + storeDir := s.Opts().StoreDir + if storeDir == "" { + storeDir = os.TempDir() + storeDir = strings.TrimSuffix(storeDir, "/") } - // run only on the leader - go s.KillZombieResources() + folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT) + f, _ := os.Stat(folderName) + if f != nil { + err = s.MoveResourcesFromOldToNewDefaultAcc() + if err != nil { + s.Errorf("Data from global account to memphis account failed: %s", err.Error()) + } + } + } - isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true" + go http_server.InitializeHttpServer(s) + var env string + var message string + if os.Getenv("DOCKER_ENV") != "" { + env = "Docker" if isUserPassBased { - // For backward compatibility data from old account to memphis default account - storeDir := s.Opts().StoreDir - if storeDir == "" { - storeDir = os.TempDir() - storeDir = strings.TrimSuffix(storeDir, "/") - } - - folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT) - f, _ := os.Stat(folderName) - if f != nil { - err = s.MoveResourcesFromOldToNewDefaultAcc() - if err != nil { - s.Errorf("Data from global account to memphis account failed: %s", err.Error()) - } - } + message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" + } else { + message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" } - - var env string - var message string - if os.Getenv("DOCKER_ENV") != "" { - env = "Docker" - if isUserPassBased { - message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } else { - message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } - s.Noticef(message) - } else if os.Getenv("LOCAL_CLUSTER_ENV") != "" { - env = "Local cluster" - if isUserPassBased { - message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } else { - message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" - } - s.Noticef(message) + s.Noticef(message) + } else if os.Getenv("LOCAL_CLUSTER_ENV") != "" { + env = "Local cluster" + if isUserPassBased { + message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" } else { - env = "K8S" + message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********" } - - s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env) - }() + s.Noticef(message) + } else { + env = "K8S" + } + s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env) } func main() { diff --git a/server/memphis_handlers_ws.go b/server/memphis_handlers_ws.go index 262cfe959..7837e7366 100644 --- a/server/memphis_handlers_ws.go +++ b/server/memphis_handlers_ws.go @@ -115,11 +115,13 @@ func memphisWSLoop(s *Server, subs *concurrentMap[memphisWSReqTenantsToFiller], if !IsNatsErr(err, JSStreamNotFoundErr) && !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "alphanumeric") { s.Errorf("[tenant: %v]memphisWSLoop at filler: %v", tenant, err.Error()) } + deleteTenantFromSub(tenant, subs, k) continue } updateRaw, err := json.Marshal(update) if err != nil { s.Errorf("[tenant: %v]memphisWSLoop at json.Marshal: %v", tenant, err.Error()) + deleteTenantFromSub(tenant, subs, k) continue } diff --git a/server/server.go b/server/server.go index e1a028f98..f08f6210d 100644 --- a/server/server.go +++ b/server/server.go @@ -26,6 +26,7 @@ import ( "math/rand" "memphis/db" "memphis/logger" + "memphis/memphis_cache" "net" "net/http" "regexp" @@ -1873,7 +1874,6 @@ func (s *Server) Start() { if opts.Gateway.Port != 0 { s.startGateways() } - // Start websocket server if needed. Do this before starting the routes, and // leaf node because we want to resolve the gateway host:port so that this // information can be sent to other routes. @@ -1933,6 +1933,9 @@ func (s *Server) Start() { if !opts.DontListen { s.AcceptLoop(clientListenReady) } + //** added by memphis + s.initializeMemphis() + // added by memphis ** } // Shutdown will shutdown the server instance by kicking out the AcceptLoop @@ -2140,9 +2143,10 @@ func (s *Server) AcceptLoop(clr chan struct{}) { s.Fatalf("Error listening on port: %s, %q", hp, e) return } - s.Noticef("Listening for client connections on %s", - net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port))) - + //** moved to AcceptClientConnections by memphis + // s.Noticef("Listening for client connections on %s", + // net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port))) + // moved to AcceptClientConnections by memphis ** // Alert of TLS enabled. if opts.TLSConfig != nil { s.Noticef("TLS required for client connections") @@ -2167,18 +2171,19 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // Keep track of client connect URLs. We may need them later. s.clientConnectURLs = s.getClientConnectURLs() s.listener = l - - go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, - func(_ error) bool { - if s.isLameDuckMode() { - // Signal that we are not accepting new clients - s.ldmCh <- true - // Now wait for the Shutdown... - <-s.quitCh - return true - } - return false - }) + //** moved to AcceptClientConnections by memphis + // go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) }, + // func(_ error) bool { + // if s.isLameDuckMode() { + // // Signal that we are not accepting new clients + // s.ldmCh <- true + // // Now wait for the Shutdown... + // <-s.quitCh + // return true + // } + // return false + // }) + // moved to AcceptClientConnections by memphis ** s.mu.Unlock() // Let the caller know that we are ready @@ -3777,3 +3782,99 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) { default: } } + +//** added by memphis +func (s *Server) AcceptClientConnections() { + s.mu.Lock() + go s.acceptConnections(s.listener, "Client", func(conn net.Conn) { s.createClient(conn) }, + func(_ error) bool { + if s.isLameDuckMode() { + // Signal that we are not accepting new clients + s.ldmCh <- true + // Now wait for the Shutdown... + <-s.quitCh + return true + } + return false + }) + s.mu.Unlock() + opts := s.getOpts() + s.Noticef("Listening for client connections on %s", + net.JoinHostPort(opts.Host, strconv.Itoa(s.listener.Addr().(*net.TCPAddr).Port))) +} + +func (s *Server) AcceptWSConnections() { + s.mu.Lock() + go func() { + if err := s.websocket.server.Serve(s.websocket.listener); err != http.ErrServerClosed { + s.Fatalf("websocket listener error: %v", err) + } + if s.isLameDuckMode() { + // Signal that we are not accepting new clients + s.ldmCh <- true + // Now wait for the Shutdown... + <-s.quitCh + return + } + s.done <- true + }() + s.mu.Unlock() + opts := s.getOpts() + o := &opts.Websocket + var proto string + if o.TLSConfig != nil { + proto = wsSchemePrefixTLS + } else { + proto = wsSchemePrefix + } + s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) +} + +func (s *Server) initializeMemphis() { + err := TenantSeqInitialize() + if err != nil { + s.Errorf("Failed to initialize tenants sequence %v", err.Error()) + } + err = memphis_cache.InitializeUserCache(s.Errorf) + if err != nil { + s.Errorf("Failed to initialize user cache %v", err.Error()) + } + err = s.InitializeEventCounter() + if err != nil { + s.Errorf("Failed initializing event counter: " + err.Error()) + } + err = s.InitializeFirestore() + if err != nil { + s.Errorf("Failed initializing firestore: " + err.Error()) + } + + err = InitializeIntegrations() + if err != nil { + s.Errorf("Failed initializing integrations: " + err.Error()) + } + err = s.SetDlsRetentionForExistTenants() + if err != nil { + s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error()) + } + err = s.Force3ReplicationsForExistingStations() + if err != nil { + s.Errorf("Failed force 3 replications for existing stations: " + err.Error()) + } + s.CompleteRelevantStuckAsyncTasks() + opts := s.getOpts() + s.InitializeMemphisHandlers() + if !opts.DontListen { + s.AcceptClientConnections() + } + s.CreateInternalJetStreamResources() + err = s.StartBackgroundTasks() + if err != nil { + s.Errorf("Background task failed: " + err.Error()) + os.Exit(1) + } + s.AcceptWSConnections() + // run only on the leader + go s.KillZombieResources() +} + +// added by memphis ** diff --git a/server/websocket.go b/server/websocket.go index a884b49b0..797ed5c86 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1090,7 +1090,9 @@ func (s *Server) startWebsocketServer() { if port == 0 { o.Port = hl.Addr().(*net.TCPAddr).Port } - s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) + //** moved to AcceptWSConnections by memphis + // s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port) + //moved to AcceptWSConnections by memphis ** if proto == wsSchemePrefix { s.Warnf("Websocket not configured with TLS. DO NOT USE IN PRODUCTION!") } @@ -1136,19 +1138,21 @@ func (s *Server) startWebsocketServer() { } s.websocket.server = hs s.websocket.listener = hl - go func() { - if err := hs.Serve(hl); err != http.ErrServerClosed { - s.Fatalf("websocket listener error: %v", err) - } - if s.isLameDuckMode() { - // Signal that we are not accepting new clients - s.ldmCh <- true - // Now wait for the Shutdown... - <-s.quitCh - return - } - s.done <- true - }() + //** moved to AcceptWSConnections by memphis + // go func() { + // if err := hs.Serve(hl); err != http.ErrServerClosed { + // s.Fatalf("websocket listener error: %v", err) + // } + // if s.isLameDuckMode() { + // // Signal that we are not accepting new clients + // s.ldmCh <- true + // // Now wait for the Shutdown... + // <-s.quitCh + // return + // } + // s.done <- true + // }() + // moved to AcceptWSConnections by memphis ** s.mu.Unlock() }