From defac94f7e319e6af7d1b38bb4640a8bf8468caf Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 10 Jul 2023 11:08:27 +0300 Subject: [PATCH 1/4] trace system up is up per tenant for cloud --- server/memphis_cloud.go | 24 +++++ server/memphis_handlers_stations.go | 144 ++++++++++++++++++++++++++++ server/memphis_zombie_resources.go | 26 ----- 3 files changed, 168 insertions(+), 26 deletions(-) diff --git a/server/memphis_cloud.go b/server/memphis_cloud.go index 51ca6c040..d6f7c5fbc 100644 --- a/server/memphis_cloud.go +++ b/server/memphis_cloud.go @@ -1886,3 +1886,27 @@ func getStationReplicas(replicas int) int { func getDefaultReplicas() int { return 1 } + +func updateSystemLiveness() { + stationsHandler := StationsHandler{S: serv} + stations, totalMessages, totalDlsMsgs, err := stationsHandler.GetAllStationsDetails(false, "") + if err != nil { + serv.Warnf("updateSystemLiveness: %v", err.Error()) + return + } + + producersCount, err := db.CountAllActiveProudcers() + if err != nil { + serv.Warnf("updateSystemLiveness: %v", err.Error()) + return + } + + consumersCount, err := db.CountAllActiveConsumers() + if err != nil { + serv.Warnf("updateSystemLiveness: %v", err.Error()) + return + } + + analyticsParams := map[string]interface{}{"total-messages": strconv.Itoa(int(totalMessages)), "total-dls-messages": strconv.Itoa(int(totalDlsMsgs)), "total-stations": strconv.Itoa(len(stations)), "active-producers": strconv.Itoa(int(producersCount)), "active-consumers": strconv.Itoa(int(consumersCount))} + analytics.SendEvent("", "", analyticsParams, "system-is-up") +} diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 178931488..dd031141f 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "memphis/analytics" + "memphis/conf" "memphis/db" "memphis/models" "memphis/utils" @@ -42,6 +43,21 @@ type StationName struct { external string } +type StationsDetailsPerAccount struct { + TotalMessages uint64 `json:"total_messages"` + TotalDlsMessages uint64 `json:"total_dls_messages"` + TotalStations int `json:"total_stations"` +} + +type StationsDetailsPerAccountRes struct { + Account string `json:"account"` + TotalMessages string `json:"total_messages"` + TotalDlsMessages string `json:"total_dls_messages"` + TotalStations string `json:"total_stations"` + ActiveProducers int `json:"active_producers"` + ActiveConsumers int `json:"active_consumers"` +} + func (sn StationName) Ext() string { return sn.external } @@ -659,6 +675,134 @@ func (sh StationsHandler) GetAllStationsDetails(shouldGetTags bool, tenantName s } } +func (sh StationsHandler) GetAllStationsDetailsPerTenant(shouldGetTags bool, tenantName []string) (map[string]StationsDetailsPerAccount, error) { + var stations []models.ExtendedStation + totalMessages := uint64(0) + StationsDetailsPerTenant := make(map[string]StationsDetailsPerAccount) + for _, tenantName := range tenantName { + if tenantName == "" { + tenantName = conf.MemphisGlobalAccountName + } + totalDlsMessages, err := db.GetTotalDlsMessages(tenantName) + if err != nil { + return map[string]StationsDetailsPerAccount{}, err + } + + stations, err = db.GetAllStationsDetailsPerTenant(tenantName) + if err != nil { + return map[string]StationsDetailsPerAccount{}, err + } + if len(stations) == 0 { + res := StationsDetailsPerAccount{ + TotalMessages: 0, + TotalDlsMessages: 0, + TotalStations: 0, + } + StationsDetailsPerTenant[tenantName] = res + } else { + stationTotalMsgs := make(map[string]int) + tagsHandler := TagsHandler{S: sh.S} + acc, err := sh.S.lookupAccount(tenantName) + if err != nil { + return map[string]StationsDetailsPerAccount{}, err + } + accName := acc.Name + allStreamInfo, err := serv.memphisAllStreamsInfo(accName) + if err != nil { + return map[string]StationsDetailsPerAccount{}, err + } + for _, info := range allStreamInfo { + streamName := info.Config.Name + if !strings.Contains(streamName, "$memphis") { + totalMessages += info.State.Msgs + stationTotalMsgs[streamName] = int(info.State.Msgs) + } + } + + stationIdsDlsMsgs, err := db.GetStationIdsFromDlsMsgs(tenantName) + if err != nil { + return map[string]StationsDetailsPerAccount{}, err + } + + var extStations []models.ExtendedStation + for i := 0; i < len(stations); i++ { + fullStationName, err := StationNameFromStr(stations[i].Name) + if err != nil { + return map[string]StationsDetailsPerAccount{}, err + } + hasDlsMsgs := false + for _, stationId := range stationIdsDlsMsgs { + if stationId == stations[i].ID { + hasDlsMsgs = true + } + } + + if shouldGetTags { + tags, err := tagsHandler.GetTagsByEntityWithID("station", stations[i].ID) + if err != nil { + return map[string]StationsDetailsPerAccount{}, err + } + stations[i].Tags = tags + } + + stations[i].TotalMessages = stationTotalMsgs[fullStationName.Intern()] + stations[i].HasDlsMsgs = hasDlsMsgs + + found := false + for _, p := range stations[i].Producers { + if p.IsActive { + stations[i].Activity = true + found = true + break + } + } + + if !found { + for _, c := range stations[i].Consumers { + if c.IsActive { + stations[i].Activity = true + break + } + } + } + if tenantInetgrations, ok := IntegrationsConcurrentCache.Load(tenantName); !ok { + stations[i].TieredStorageEnabled = false + } else { + _, ok = tenantInetgrations["s3"].(models.Integration) + if !ok { + stations[i].TieredStorageEnabled = false + } else if stations[i].TieredStorageEnabled { + stations[i].TieredStorageEnabled = true + } else { + stations[i].TieredStorageEnabled = false + } + } + + stationRes := models.ExtendedStation{ + ID: stations[i].ID, + Name: stations[i].Name, + CreatedAt: stations[i].CreatedAt, + TotalMessages: stations[i].TotalMessages, + HasDlsMsgs: stations[i].HasDlsMsgs, + Activity: stations[i].Activity, + IsNative: stations[i].IsNative, + } + + extStations = append(extStations, stationRes) + } + + stationDetailsPerAccountRes := StationsDetailsPerAccount{ + TotalMessages: totalMessages, + TotalDlsMessages: totalDlsMessages, + TotalStations: len(extStations), + } + + StationsDetailsPerTenant[tenantName] = stationDetailsPerAccountRes + } + } + return StationsDetailsPerTenant, nil +} + func (sh StationsHandler) GetStations(c *gin.Context) { user, err := getUserDetailsFromMiddleware(c) if err != nil { diff --git a/server/memphis_zombie_resources.go b/server/memphis_zombie_resources.go index 8ff9b9697..b48e886f0 100644 --- a/server/memphis_zombie_resources.go +++ b/server/memphis_zombie_resources.go @@ -13,10 +13,8 @@ package server import ( "encoding/json" - "memphis/analytics" "memphis/db" "memphis/models" - "strconv" "sync" "time" ) @@ -45,30 +43,6 @@ func (srv *Server) removeStaleStations() { } } -func updateSystemLiveness() { - stationsHandler := StationsHandler{S: serv} - stations, totalMessages, totalDlsMsgs, err := stationsHandler.GetAllStationsDetails(false, "") - if err != nil { - serv.Warnf("updateSystemLiveness: %v", err.Error()) - return - } - - producersCount, err := db.CountAllActiveProudcers() - if err != nil { - serv.Warnf("updateSystemLiveness: %v", err.Error()) - return - } - - consumersCount, err := db.CountAllActiveConsumers() - if err != nil { - serv.Warnf("updateSystemLiveness: %v", err.Error()) - return - } - - analyticsParams := map[string]interface{}{"total-messages": strconv.Itoa(int(totalMessages)), "total-dls-messages": strconv.Itoa(int(totalDlsMsgs)), "total-stations": strconv.Itoa(len(stations)), "active-producers": strconv.Itoa(int(producersCount)), "active-consumers": strconv.Itoa(int(consumersCount))} - analytics.SendEvent("", "", analyticsParams, "system-is-up") -} - func aggregateClientConnections(s *Server) (map[string]string, error) { connectionIds := make(map[string]string) var lock sync.Mutex From 96f2b8c5e851b1e29bc7a5d1c4efc263532b96ed Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 10 Jul 2023 11:30:02 +0300 Subject: [PATCH 2/4] fix --- server/memphis_handlers_stations.go | 80 +---------------------------- 1 file changed, 2 insertions(+), 78 deletions(-) diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index dd031141f..2cb051724 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -675,7 +675,7 @@ func (sh StationsHandler) GetAllStationsDetails(shouldGetTags bool, tenantName s } } -func (sh StationsHandler) GetAllStationsDetailsPerTenant(shouldGetTags bool, tenantName []string) (map[string]StationsDetailsPerAccount, error) { +func (sh StationsHandler) GetAllStationsDetailsPerTenant(tenantName []string) (map[string]StationsDetailsPerAccount, error) { var stations []models.ExtendedStation totalMessages := uint64(0) StationsDetailsPerTenant := make(map[string]StationsDetailsPerAccount) @@ -700,8 +700,6 @@ func (sh StationsHandler) GetAllStationsDetailsPerTenant(shouldGetTags bool, ten } StationsDetailsPerTenant[tenantName] = res } else { - stationTotalMsgs := make(map[string]int) - tagsHandler := TagsHandler{S: sh.S} acc, err := sh.S.lookupAccount(tenantName) if err != nil { return map[string]StationsDetailsPerAccount{}, err @@ -715,86 +713,12 @@ func (sh StationsHandler) GetAllStationsDetailsPerTenant(shouldGetTags bool, ten streamName := info.Config.Name if !strings.Contains(streamName, "$memphis") { totalMessages += info.State.Msgs - stationTotalMsgs[streamName] = int(info.State.Msgs) } } - - stationIdsDlsMsgs, err := db.GetStationIdsFromDlsMsgs(tenantName) - if err != nil { - return map[string]StationsDetailsPerAccount{}, err - } - - var extStations []models.ExtendedStation - for i := 0; i < len(stations); i++ { - fullStationName, err := StationNameFromStr(stations[i].Name) - if err != nil { - return map[string]StationsDetailsPerAccount{}, err - } - hasDlsMsgs := false - for _, stationId := range stationIdsDlsMsgs { - if stationId == stations[i].ID { - hasDlsMsgs = true - } - } - - if shouldGetTags { - tags, err := tagsHandler.GetTagsByEntityWithID("station", stations[i].ID) - if err != nil { - return map[string]StationsDetailsPerAccount{}, err - } - stations[i].Tags = tags - } - - stations[i].TotalMessages = stationTotalMsgs[fullStationName.Intern()] - stations[i].HasDlsMsgs = hasDlsMsgs - - found := false - for _, p := range stations[i].Producers { - if p.IsActive { - stations[i].Activity = true - found = true - break - } - } - - if !found { - for _, c := range stations[i].Consumers { - if c.IsActive { - stations[i].Activity = true - break - } - } - } - if tenantInetgrations, ok := IntegrationsConcurrentCache.Load(tenantName); !ok { - stations[i].TieredStorageEnabled = false - } else { - _, ok = tenantInetgrations["s3"].(models.Integration) - if !ok { - stations[i].TieredStorageEnabled = false - } else if stations[i].TieredStorageEnabled { - stations[i].TieredStorageEnabled = true - } else { - stations[i].TieredStorageEnabled = false - } - } - - stationRes := models.ExtendedStation{ - ID: stations[i].ID, - Name: stations[i].Name, - CreatedAt: stations[i].CreatedAt, - TotalMessages: stations[i].TotalMessages, - HasDlsMsgs: stations[i].HasDlsMsgs, - Activity: stations[i].Activity, - IsNative: stations[i].IsNative, - } - - extStations = append(extStations, stationRes) - } - stationDetailsPerAccountRes := StationsDetailsPerAccount{ TotalMessages: totalMessages, TotalDlsMessages: totalDlsMessages, - TotalStations: len(extStations), + TotalStations: len(stations), } StationsDetailsPerTenant[tenantName] = stationDetailsPerAccountRes From 3309f80f146cd0d9446cc9039bc1c0f6a506afec Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 10 Jul 2023 11:45:19 +0300 Subject: [PATCH 3/4] fix --- server/memphis_handlers_stations.go | 53 ----------------------------- 1 file changed, 53 deletions(-) diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 2cb051724..81b2911ed 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -17,7 +17,6 @@ import ( "errors" "fmt" "memphis/analytics" - "memphis/conf" "memphis/db" "memphis/models" "memphis/utils" @@ -675,58 +674,6 @@ func (sh StationsHandler) GetAllStationsDetails(shouldGetTags bool, tenantName s } } -func (sh StationsHandler) GetAllStationsDetailsPerTenant(tenantName []string) (map[string]StationsDetailsPerAccount, error) { - var stations []models.ExtendedStation - totalMessages := uint64(0) - StationsDetailsPerTenant := make(map[string]StationsDetailsPerAccount) - for _, tenantName := range tenantName { - if tenantName == "" { - tenantName = conf.MemphisGlobalAccountName - } - totalDlsMessages, err := db.GetTotalDlsMessages(tenantName) - if err != nil { - return map[string]StationsDetailsPerAccount{}, err - } - - stations, err = db.GetAllStationsDetailsPerTenant(tenantName) - if err != nil { - return map[string]StationsDetailsPerAccount{}, err - } - if len(stations) == 0 { - res := StationsDetailsPerAccount{ - TotalMessages: 0, - TotalDlsMessages: 0, - TotalStations: 0, - } - StationsDetailsPerTenant[tenantName] = res - } else { - acc, err := sh.S.lookupAccount(tenantName) - if err != nil { - return map[string]StationsDetailsPerAccount{}, err - } - accName := acc.Name - allStreamInfo, err := serv.memphisAllStreamsInfo(accName) - if err != nil { - return map[string]StationsDetailsPerAccount{}, err - } - for _, info := range allStreamInfo { - streamName := info.Config.Name - if !strings.Contains(streamName, "$memphis") { - totalMessages += info.State.Msgs - } - } - stationDetailsPerAccountRes := StationsDetailsPerAccount{ - TotalMessages: totalMessages, - TotalDlsMessages: totalDlsMessages, - TotalStations: len(stations), - } - - StationsDetailsPerTenant[tenantName] = stationDetailsPerAccountRes - } - } - return StationsDetailsPerTenant, nil -} - func (sh StationsHandler) GetStations(c *gin.Context) { user, err := getUserDetailsFromMiddleware(c) if err != nil { From df24e586356ba414b39ddce257fe41038c1e135d Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 10 Jul 2023 12:03:59 +0300 Subject: [PATCH 4/4] fix --- server/memphis_handlers_stations.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 81b2911ed..178931488 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -42,21 +42,6 @@ type StationName struct { external string } -type StationsDetailsPerAccount struct { - TotalMessages uint64 `json:"total_messages"` - TotalDlsMessages uint64 `json:"total_dls_messages"` - TotalStations int `json:"total_stations"` -} - -type StationsDetailsPerAccountRes struct { - Account string `json:"account"` - TotalMessages string `json:"total_messages"` - TotalDlsMessages string `json:"total_dls_messages"` - TotalStations string `json:"total_stations"` - ActiveProducers int `json:"active_producers"` - ActiveConsumers int `json:"active_consumers"` -} - func (sn StationName) Ext() string { return sn.external }