Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept client connections #1195

Merged
merged 17 commits into from
Aug 3, 2023
122 changes: 35 additions & 87 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"memphis/analytics"
"memphis/db"
"memphis/http_server"
"memphis/memphis_cache"
"memphis/server"
"strings"

Expand Down Expand Up @@ -103,101 +102,50 @@ func runMemphis(s *server.Server) {
s.Errorf("Failed initializing analytics: " + err.Error())
}

err = server.TenantSeqInitialize()
if err != nil {
s.Errorf("Failed to initialize tenants sequence %v", err.Error())
}

err = memphis_cache.InitializeUserCache(s.Errorf)
if err != nil {
s.Errorf("Failed to initialize user cache %v", err.Error())
}

err = s.InitializeEventCounter()
if err != nil {
s.Errorf("Failed initializing event counter: " + err.Error())
}

err = s.InitializeFirestore()
if err != nil {
s.Errorf("Failed initializing firestore: " + err.Error())
}

s.InitializeMemphisHandlers()

err = server.InitializeIntegrations()
if err != nil {
s.Errorf("Failed initializing integrations: " + err.Error())
}

err = s.SetDlsRetentionForExistTenants()
if err != nil {
s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error())
}
isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true"

err = s.Force3ReplicationsForExistingStations()
if err != nil {
s.Errorf("Failed force 3 replications for existing stations: " + err.Error())
}

s.CompleteRelevantStuckAsyncTasks()

go func() {
s.CreateInternalJetStreamResources()
go http_server.InitializeHttpServer(s)
err = s.StartBackgroundTasks()
if err != nil {
s.Errorf("Background task failed: " + err.Error())
os.Exit(1)
if isUserPassBased {
// For backward compatibility data from old account to memphis default account
storeDir := s.Opts().StoreDir
if storeDir == "" {
storeDir = os.TempDir()
storeDir = strings.TrimSuffix(storeDir, "/")
}

// run only on the leader
go s.KillZombieResources()
folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT)
f, _ := os.Stat(folderName)
if f != nil {
err = s.MoveResourcesFromOldToNewDefaultAcc()
if err != nil {
s.Errorf("Data from global account to memphis account failed: %s", err.Error())
}
}
}

isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true"
go http_server.InitializeHttpServer(s)

var env string
var message string
if os.Getenv("DOCKER_ENV") != "" {
env = "Docker"
if isUserPassBased {
// For backward compatibility data from old account to memphis default account
storeDir := s.Opts().StoreDir
if storeDir == "" {
storeDir = os.TempDir()
storeDir = strings.TrimSuffix(storeDir, "/")
}

folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT)
f, _ := os.Stat(folderName)
if f != nil {
err = s.MoveResourcesFromOldToNewDefaultAcc()
if err != nil {
s.Errorf("Data from global account to memphis account failed: %s", err.Error())
}
}
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}

var env string
var message string
if os.Getenv("DOCKER_ENV") != "" {
env = "Docker"
if isUserPassBased {
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}
s.Noticef(message)
} else if os.Getenv("LOCAL_CLUSTER_ENV") != "" {
env = "Local cluster"
if isUserPassBased {
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}
s.Noticef(message)
s.Noticef(message)
} else if os.Getenv("LOCAL_CLUSTER_ENV") != "" {
env = "Local cluster"
if isUserPassBased {
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
env = "K8S"
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}

s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env)
}()
s.Noticef(message)
} else {
env = "K8S"
}
s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env)
}

func main() {
Expand Down
2 changes: 2 additions & 0 deletions server/memphis_handlers_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ func memphisWSLoop(s *Server, subs *concurrentMap[memphisWSReqTenantsToFiller],
if !IsNatsErr(err, JSStreamNotFoundErr) && !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "alphanumeric") {
s.Errorf("[tenant: %v]memphisWSLoop at filler: %v", tenant, err.Error())
}
deleteTenantFromSub(tenant, subs, k)
continue
}
updateRaw, err := json.Marshal(update)
if err != nil {
s.Errorf("[tenant: %v]memphisWSLoop at json.Marshal: %v", tenant, err.Error())
deleteTenantFromSub(tenant, subs, k)
continue
}

Expand Down
133 changes: 117 additions & 16 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/rand"
"memphis/db"
"memphis/logger"
"memphis/memphis_cache"
"net"
"net/http"
"regexp"
Expand Down Expand Up @@ -1873,7 +1874,6 @@ func (s *Server) Start() {
if opts.Gateway.Port != 0 {
s.startGateways()
}

// Start websocket server if needed. Do this before starting the routes, and
// leaf node because we want to resolve the gateway host:port so that this
// information can be sent to other routes.
Expand Down Expand Up @@ -1933,6 +1933,9 @@ func (s *Server) Start() {
if !opts.DontListen {
s.AcceptLoop(clientListenReady)
}
//** added by memphis
s.initializeMemphis()
// added by memphis **
}

// Shutdown will shutdown the server instance by kicking out the AcceptLoop
Expand Down Expand Up @@ -2140,9 +2143,10 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
s.Fatalf("Error listening on port: %s, %q", hp, e)
return
}
s.Noticef("Listening for client connections on %s",
net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

//** moved to AcceptClientConnections by memphis
// s.Noticef("Listening for client connections on %s",
// net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
// moved to AcceptClientConnections by memphis **
// Alert of TLS enabled.
if opts.TLSConfig != nil {
s.Noticef("TLS required for client connections")
Expand All @@ -2167,18 +2171,19 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
// Keep track of client connect URLs. We may need them later.
s.clientConnectURLs = s.getClientConnectURLs()
s.listener = l

go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
func(_ error) bool {
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return true
}
return false
})
//** moved to AcceptClientConnections by memphis
// go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
// func(_ error) bool {
// if s.isLameDuckMode() {
// // Signal that we are not accepting new clients
// s.ldmCh <- true
// // Now wait for the Shutdown...
// <-s.quitCh
// return true
// }
// return false
// })
// moved to AcceptClientConnections by memphis **
s.mu.Unlock()

// Let the caller know that we are ready
Expand Down Expand Up @@ -3777,3 +3782,99 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) {
default:
}
}

//** added by memphis
func (s *Server) AcceptClientConnections() {
s.mu.Lock()
go s.acceptConnections(s.listener, "Client", func(conn net.Conn) { s.createClient(conn) },
func(_ error) bool {
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return true
}
return false
})
s.mu.Unlock()
opts := s.getOpts()
s.Noticef("Listening for client connections on %s",
net.JoinHostPort(opts.Host, strconv.Itoa(s.listener.Addr().(*net.TCPAddr).Port)))
}

func (s *Server) AcceptWSConnections() {
s.mu.Lock()
go func() {
if err := s.websocket.server.Serve(s.websocket.listener); err != http.ErrServerClosed {
s.Fatalf("websocket listener error: %v", err)
}
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return
}
s.done <- true
}()
s.mu.Unlock()
opts := s.getOpts()
o := &opts.Websocket
var proto string
if o.TLSConfig != nil {
proto = wsSchemePrefixTLS
} else {
proto = wsSchemePrefix
}
s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port)
}

func (s *Server) initializeMemphis() {
err := TenantSeqInitialize()
if err != nil {
s.Errorf("Failed to initialize tenants sequence %v", err.Error())
}
err = memphis_cache.InitializeUserCache(s.Errorf)
if err != nil {
s.Errorf("Failed to initialize user cache %v", err.Error())
}
err = s.InitializeEventCounter()
if err != nil {
s.Errorf("Failed initializing event counter: " + err.Error())
}
err = s.InitializeFirestore()
if err != nil {
s.Errorf("Failed initializing firestore: " + err.Error())
}

err = InitializeIntegrations()
if err != nil {
s.Errorf("Failed initializing integrations: " + err.Error())
}
err = s.SetDlsRetentionForExistTenants()
if err != nil {
s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error())
}
err = s.Force3ReplicationsForExistingStations()
if err != nil {
s.Errorf("Failed force 3 replications for existing stations: " + err.Error())
}
s.CompleteRelevantStuckAsyncTasks()
opts := s.getOpts()
s.InitializeMemphisHandlers()
if !opts.DontListen {
s.AcceptClientConnections()
}
s.CreateInternalJetStreamResources()
err = s.StartBackgroundTasks()
if err != nil {
s.Errorf("Background task failed: " + err.Error())
os.Exit(1)
}
s.AcceptWSConnections()
// run only on the leader
go s.KillZombieResources()
}

// added by memphis **
32 changes: 18 additions & 14 deletions server/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,9 @@ func (s *Server) startWebsocketServer() {
if port == 0 {
o.Port = hl.Addr().(*net.TCPAddr).Port
}
s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port)
//** moved to AcceptWSConnections by memphis
// s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port)
//moved to AcceptWSConnections by memphis **
if proto == wsSchemePrefix {
s.Warnf("Websocket not configured with TLS. DO NOT USE IN PRODUCTION!")
}
Expand Down Expand Up @@ -1136,19 +1138,21 @@ func (s *Server) startWebsocketServer() {
}
s.websocket.server = hs
s.websocket.listener = hl
go func() {
if err := hs.Serve(hl); err != http.ErrServerClosed {
s.Fatalf("websocket listener error: %v", err)
}
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return
}
s.done <- true
}()
//** moved to AcceptWSConnections by memphis
// go func() {
// if err := hs.Serve(hl); err != http.ErrServerClosed {
// s.Fatalf("websocket listener error: %v", err)
// }
// if s.isLameDuckMode() {
// // Signal that we are not accepting new clients
// s.ldmCh <- true
// // Now wait for the Shutdown...
// <-s.quitCh
// return
// }
// s.done <- true
// }()
// moved to AcceptWSConnections by memphis **
s.mu.Unlock()
}

Expand Down