Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept client connections #1195

Merged
merged 17 commits into from
Aug 3, 2023
122 changes: 35 additions & 87 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"memphis/analytics"
"memphis/db"
"memphis/http_server"
"memphis/memphis_cache"
"memphis/server"
"strings"

Expand Down Expand Up @@ -103,101 +102,50 @@ func runMemphis(s *server.Server) {
s.Errorf("Failed initializing analytics: " + err.Error())
}

err = server.TenantSeqInitialize()
if err != nil {
s.Errorf("Failed to initialize tenants sequence %v", err.Error())
}

err = memphis_cache.InitializeUserCache(s.Errorf)
if err != nil {
s.Errorf("Failed to initialize user cache %v", err.Error())
}

err = s.InitializeEventCounter()
if err != nil {
s.Errorf("Failed initializing event counter: " + err.Error())
}

err = s.InitializeFirestore()
if err != nil {
s.Errorf("Failed initializing firestore: " + err.Error())
}

s.InitializeMemphisHandlers()

err = server.InitializeIntegrations()
if err != nil {
s.Errorf("Failed initializing integrations: " + err.Error())
}

err = s.SetDlsRetentionForExistTenants()
if err != nil {
s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error())
}
go http_server.InitializeHttpServer(s)
shay23b marked this conversation as resolved.
Show resolved Hide resolved

err = s.Force3ReplicationsForExistingStations()
if err != nil {
s.Errorf("Failed force 3 replications for existing stations: " + err.Error())
}

s.CompleteRelevantStuckAsyncTasks()
isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true"

go func() {
s.CreateInternalJetStreamResources()
go http_server.InitializeHttpServer(s)
err = s.StartBackgroundTasks()
if err != nil {
s.Errorf("Background task failed: " + err.Error())
os.Exit(1)
if isUserPassBased {
// For backward compatibility data from old account to memphis default account
storeDir := s.Opts().StoreDir
if storeDir == "" {
storeDir = os.TempDir()
storeDir = strings.TrimSuffix(storeDir, "/")
}

// run only on the leader
go s.KillZombieResources()

isUserPassBased := os.Getenv("USER_PASS_BASED_AUTH") == "true"

if isUserPassBased {
// For backward compatibility data from old account to memphis default account
storeDir := s.Opts().StoreDir
if storeDir == "" {
storeDir = os.TempDir()
storeDir = strings.TrimSuffix(storeDir, "/")
}

folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT)
f, _ := os.Stat(folderName)
if f != nil {
err = s.MoveResourcesFromOldToNewDefaultAcc()
if err != nil {
s.Errorf("Data from global account to memphis account failed: %s", err.Error())
}
folderName := fmt.Sprintf("%s%s%s", storeDir, "/jetstream/", server.DEFAULT_GLOBAL_ACCOUNT)
f, _ := os.Stat(folderName)
if f != nil {
err = s.MoveResourcesFromOldToNewDefaultAcc()
if err != nil {
s.Errorf("Data from global account to memphis account failed: %s", err.Error())
}
}
}

var env string
var message string
if os.Getenv("DOCKER_ENV") != "" {
env = "Docker"
if isUserPassBased {
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}
s.Noticef(message)
} else if os.Getenv("LOCAL_CLUSTER_ENV") != "" {
env = "Local cluster"
if isUserPassBased {
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}
s.Noticef(message)
var env string
var message string
if os.Getenv("DOCKER_ENV") != "" {
env = "Docker"
if isUserPassBased {
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
env = "K8S"
message = "\n**********\n\nDashboard/CLI: http://localhost:" + fmt.Sprint(s.Opts().UiPort) + "\nBroker: localhost:" + fmt.Sprint(s.Opts().Port) + " (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}

s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env)
}()
s.Noticef(message)
} else if os.Getenv("LOCAL_CLUSTER_ENV") != "" {
env = "Local cluster"
if isUserPassBased {
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI/SDK root password - memphis\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
} else {
message = "\n**********\n\nDashboard/CLI: http://localhost:9000/9001/9002\nBroker: localhost:6666/6667/6668 (client connections)\nREST gateway: localhost:" + fmt.Sprint(s.Opts().RestGwPort) + " (Data and management via HTTP)\nUI/CLI/SDK root username - root\nUI/CLI root password - memphis\nSDK connection token - " + s.Opts().Authorization + "\n\nDocs: https://docs.memphis.dev/memphis/getting-started/2-hello-world \n\n**********"
}
s.Noticef(message)
} else {
env = "K8S"
}
s.Noticef("*** Memphis broker is ready, ENV: %s :-) ***", env)
}

func main() {
Expand Down
2 changes: 2 additions & 0 deletions server/memphis_handlers_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,13 @@ func memphisWSLoop(s *Server, subs *concurrentMap[memphisWSReqTenantsToFiller],
if !IsNatsErr(err, JSStreamNotFoundErr) && !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "alphanumeric") {
s.Errorf("[tenant: %v]memphisWSLoop at filler: %v", tenant, err.Error())
}
deleteTenantFromSub(tenant, subs, k)
continue
}
updateRaw, err := json.Marshal(update)
if err != nil {
s.Errorf("[tenant: %v]memphisWSLoop at json.Marshal: %v", tenant, err.Error())
deleteTenantFromSub(tenant, subs, k)
continue
}

Expand Down
1 change: 0 additions & 1 deletion server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ func (s *Server) WaitForLeaderElection() {
}
}
}

shay23b marked this conversation as resolved.
Show resolved Hide resolved
func (s *Server) CreateInternalJetStreamResources() {
ready := !s.JetStreamIsClustered()
retentionDur := time.Duration(s.opts.LogsRetentionDays) * time.Hour * 24
Expand Down
135 changes: 118 additions & 17 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/rand"
"memphis/db"
"memphis/logger"
"memphis/memphis_cache"
"net"
"net/http"
"regexp"
Expand Down Expand Up @@ -1873,12 +1874,11 @@ func (s *Server) Start() {
if opts.Gateway.Port != 0 {
s.startGateways()
}

// Start websocket server if needed. Do this before starting the routes, and
// leaf node because we want to resolve the gateway host:port so that this
// information can be sent to other routes.
if opts.Websocket.Port != 0 {
s.startWebsocketServer()
s.StartWebsocketServer()
shay23b marked this conversation as resolved.
Show resolved Hide resolved
}

// Start up listen if we want to accept leaf node connections.
Expand Down Expand Up @@ -1933,6 +1933,9 @@ func (s *Server) Start() {
if !opts.DontListen {
s.AcceptLoop(clientListenReady)
}
//** added by memphis
s.runMemphis()
// added by memphis **
}

// Shutdown will shutdown the server instance by kicking out the AcceptLoop
Expand Down Expand Up @@ -2140,9 +2143,10 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
s.Fatalf("Error listening on port: %s, %q", hp, e)
return
}
s.Noticef("Listening for client connections on %s",
net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

//** moved to AcceptClientConnections by memphis
// s.Noticef("Listening for client connections on %s",
// net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
// moved to AcceptClientConnections by memphis **
// Alert of TLS enabled.
if opts.TLSConfig != nil {
s.Noticef("TLS required for client connections")
Expand All @@ -2167,18 +2171,19 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
// Keep track of client connect URLs. We may need them later.
s.clientConnectURLs = s.getClientConnectURLs()
s.listener = l

go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
func(_ error) bool {
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return true
}
return false
})
//** moved to AcceptClientConnections by memphis
// go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
// func(_ error) bool {
// if s.isLameDuckMode() {
// // Signal that we are not accepting new clients
// s.ldmCh <- true
// // Now wait for the Shutdown...
// <-s.quitCh
// return true
// }
// return false
// })
// moved to AcceptClientConnections by memphis **
s.mu.Unlock()

// Let the caller know that we are ready
Expand Down Expand Up @@ -3777,3 +3782,99 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) {
default:
}
}

//** added by memphis
func (s *Server) AcceptClientConnections() {
s.mu.Lock()
go s.acceptConnections(s.listener, "Client", func(conn net.Conn) { s.createClient(conn) },
func(_ error) bool {
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return true
}
return false
})
s.mu.Unlock()
opts := s.getOpts()
s.Noticef("Listening for client connections on %s",
net.JoinHostPort(opts.Host, strconv.Itoa(s.listener.Addr().(*net.TCPAddr).Port)))
}

func (s *Server) AcceptWSConnections() {
s.mu.Lock()
go func() {
if err := s.websocket.server.Serve(s.websocket.listener); err != http.ErrServerClosed {
s.Fatalf("websocket listener error: %v", err)
}
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return
}
s.done <- true
}()
s.mu.Unlock()
opts := s.getOpts()
o := &opts.Websocket
var proto string
if o.TLSConfig != nil {
proto = wsSchemePrefixTLS
} else {
proto = wsSchemePrefix
}
s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port)
}

func (s *Server) runMemphis() {
err := TenantSeqInitialize()
if err != nil {
s.Errorf("Failed to initialize tenants sequence %v", err.Error())
}
err = memphis_cache.InitializeUserCache(s.Errorf)
if err != nil {
s.Errorf("Failed to initialize user cache %v", err.Error())
}
err = s.InitializeEventCounter()
if err != nil {
s.Errorf("Failed initializing event counter: " + err.Error())
}
err = s.InitializeFirestore()
if err != nil {
s.Errorf("Failed initializing firestore: " + err.Error())
}

err = InitializeIntegrations()
if err != nil {
s.Errorf("Failed initializing integrations: " + err.Error())
}
err = s.SetDlsRetentionForExistTenants()
if err != nil {
s.Errorf("failed setting existing tenants with dls retention opts: %v", err.Error())
}
err = s.Force3ReplicationsForExistingStations()
if err != nil {
s.Errorf("Failed force 3 replications for existing stations: " + err.Error())
}
s.CompleteRelevantStuckAsyncTasks()
opts := s.getOpts()
s.InitializeMemphisHandlers()
if !opts.DontListen {
s.AcceptClientConnections()
}
s.CreateInternalJetStreamResources()
err = s.StartBackgroundTasks()
if err != nil {
s.Errorf("Background task failed: " + err.Error())
os.Exit(1)
}
s.AcceptWSConnections()
// run only on the leader
go s.KillZombieResources()
}

// added by memphis **
34 changes: 19 additions & 15 deletions server/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@ func (s *Server) wsConfigAuth(opts *WebsocketOpts) {
ws.authOverride = opts.Username != _EMPTY_ || opts.Token != _EMPTY_ || opts.NoAuthUser != _EMPTY_
}

func (s *Server) startWebsocketServer() {
func (s *Server) StartWebsocketServer() {
shay23b marked this conversation as resolved.
Show resolved Hide resolved
sopts := s.getOpts()
o := &sopts.Websocket

Expand Down Expand Up @@ -1090,7 +1090,9 @@ func (s *Server) startWebsocketServer() {
if port == 0 {
o.Port = hl.Addr().(*net.TCPAddr).Port
}
s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port)
//** moved to AcceptWSConnections by memphis
// s.Noticef("Listening for websocket clients on %s://%s:%d", proto, o.Host, o.Port)
//moved to AcceptWSConnections by memphis **
if proto == wsSchemePrefix {
s.Warnf("Websocket not configured with TLS. DO NOT USE IN PRODUCTION!")
}
Expand Down Expand Up @@ -1136,19 +1138,21 @@ func (s *Server) startWebsocketServer() {
}
s.websocket.server = hs
s.websocket.listener = hl
go func() {
if err := hs.Serve(hl); err != http.ErrServerClosed {
s.Fatalf("websocket listener error: %v", err)
}
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return
}
s.done <- true
}()
//** moved to AcceptWSConnections by memphis
// go func() {
// if err := hs.Serve(hl); err != http.ErrServerClosed {
// s.Fatalf("websocket listener error: %v", err)
// }
// if s.isLameDuckMode() {
// // Signal that we are not accepting new clients
// s.ldmCh <- true
// // Now wait for the Shutdown...
// <-s.quitCh
// return
// }
// s.done <- true
// }()
// moved to AcceptWSConnections by memphis **
s.mu.Unlock()
}

Expand Down