From 1625403f09ff32719e17b77408ad3dd087ca2fef Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 11 Dec 2023 14:56:18 +0200 Subject: [PATCH] from cloud --- db/db.go | 48 +++++++++---------- server/background_tasks.go | 1 + server/memphis_cloud.go | 12 +++++ server/memphis_handlers_consumers.go | 23 +++++---- server/memphis_handlers_monitoring.go | 23 +++++---- server/memphis_handlers_producers.go | 67 +++++++++++++++------------ server/memphis_handlers_ws.go | 15 ++++-- 7 files changed, 114 insertions(+), 75 deletions(-) diff --git a/db/db.go b/db/db.go index 23d228337..d0a5b6a56 100644 --- a/db/db.go +++ b/db/db.go @@ -913,7 +913,7 @@ func GetActiveConnections() ([]string, error) { return []string{}, err } defer conn.Release() - query := `SELECT connection_id FROM producers WHERE is_active = true UNION SELECT connection_id FROM consumers WHERE is_active = true;` + query := `SELECT connection_id FROM producers WHERE is_active = true AND type = 'application' UNION SELECT connection_id FROM consumers WHERE is_active = true AND type = 'application';` stmt, err := conn.Conn().Prepare(ctx, "get_active_connection", query) if err != nil { return []string{}, err @@ -2459,7 +2459,7 @@ func GetProducerByNameAndConnectionID(name string, connectionId string) (bool, m return false, models.Producer{}, err } defer conn.Release() - query := `SELECT * FROM producers WHERE name = $1 AND connection_id = $2` + query := `SELECT * FROM producers WHERE name = $1 AND connection_id = $2 AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "get_producer_by_name_and_connection_id", query) if err != nil { return false, models.Producer{}, err @@ -2487,7 +2487,7 @@ func GetProducerByStationIDAndConnectionId(name string, stationId int, connectio return false, models.Producer{}, err } defer conn.Release() - query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 ORDER BY is_active DESC LIMIT 1` + query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 AND type = 'application' ORDER BY is_active DESC LIMIT 1` stmt, err := conn.Conn().Prepare(ctx, "get_producer_by_station_id_and_connection_id", query) if err != nil { return false, models.Producer{}, err @@ -2515,7 +2515,7 @@ func GetProducerByNameAndStationID(name string, stationId int) (bool, models.Pro return false, models.Producer{}, err } defer conn.Release() - query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 ORDER BY is_active DESC LIMIT 1` + query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND type = 'application' ORDER BY is_active DESC LIMIT 1` stmt, err := conn.Conn().Prepare(ctx, "get_producer_by_name_and_station_id", query) if err != nil { return false, models.Producer{}, err @@ -2544,7 +2544,7 @@ func GetActiveProducerByStationID(producerName string, stationId int) (bool, mod } defer conn.Release() - query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND is_active = true LIMIT 1` + query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND is_active = true AND type = 'application' LIMIT 1` stmt, err := conn.Conn().Prepare(ctx, "get_active_producer_by_station_id", query) if err != nil { return false, models.Producer{}, err @@ -2574,7 +2574,7 @@ func GetProducersForGraph(tenantName string) ([]models.ProducerForGraph, error) defer conn.Release() query := `SELECT p.name, p.station_id, p.app_id FROM producers AS p - WHERE p.tenant_name = $1 AND p.is_active = true + WHERE p.tenant_name = $1 AND p.is_active = true AND p.type = 'application' ORDER BY p.name, p.station_id DESC LIMIT 10000;` stmt, err := conn.Conn().Prepare(ctx, "get_producers_for_graph", query) @@ -2673,7 +2673,7 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti return newProducer, nil } -func GetNotDeletedProducersByStationID(stationId int) ([]models.Producer, error) { +func GetNotDeletedProducersByStationID(stationId int) ([]models.Producer, error) { // TODO: check if not needed - I think its not used ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() conn, err := MetadataDbClient.Client.Acquire(ctx) @@ -2682,7 +2682,7 @@ func GetNotDeletedProducersByStationID(stationId int) ([]models.Producer, error) } defer conn.Release() - query := `SELECT * FROM producers AS p WHERE p.station_id = $1 AND p.is_deleted = false` + query := `SELECT * FROM producers AS p WHERE p.station_id = $1 AND p.is_deleted = false AND p.type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "get_not_deleted_producers_by_station_id", query) if err != nil { return []models.Producer{}, err @@ -2721,7 +2721,7 @@ func GetAllProducersByStationID(stationId int) ([]models.ExtendedProducer, error COUNT(CASE WHEN NOT p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS disconnected_producers_count FROM producers AS p LEFT JOIN stations AS s ON s.id = p.station_id -WHERE p.station_id = $1 +WHERE p.station_id = $1 AND p.type = 'application' ORDER BY p.name, p.is_active DESC, p.updated_at DESC LIMIT 5000; ` @@ -2753,7 +2753,7 @@ func DeleteProducerByNameAndStationID(name string, stationId int) (bool, error) return false, err } defer conn.Release() - query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 LIMIT 1` + query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND type = 'application' LIMIT 1` stmt, err := conn.Conn().Prepare(ctx, "delete_producer_by_name_and_station_id", query) if err != nil { return false, err @@ -2819,7 +2819,7 @@ func CountActiveProudcersByStationID(stationId int) (int64, error) { return 0, err } defer conn.Release() - query := `SELECT COUNT(*) FROM producers WHERE station_id = $1 AND is_active = true` + query := `SELECT COUNT(*) FROM producers WHERE station_id = $1 AND is_active = true AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "count_active_producers_by_station_id", query) if err != nil { return 0, err @@ -2841,7 +2841,7 @@ func CountAllActiveProudcers() (int64, error) { return 0, err } defer conn.Release() - query := `SELECT COUNT(*) FROM producers WHERE is_active = true` + query := `SELECT COUNT(*) FROM producers WHERE is_active = true AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "count_all_active_producers", query) if err != nil { return 0, err @@ -2924,7 +2924,7 @@ func GetActiveConsumerByCG(consumersGroup string, stationId int) (bool, models.C } defer conn.Release() - query := `SELECT * FROM consumers WHERE consumers_group = $1 AND station_id = $2 LIMIT 1` + query := `SELECT * FROM consumers WHERE consumers_group = $1 AND station_id = $2 AND type = 'application' LIMIT 1` stmt, err := conn.Conn().Prepare(ctx, "get_active_consumer_by_cg", query) if err != nil { return false, models.Consumer{}, err @@ -3051,7 +3051,7 @@ func GetConsumers() ([]models.Consumer, error) { return []models.Consumer{}, err } defer conn.Release() - query := `SELECT * From consumers` + query := `SELECT * From consumers AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "get_consumers", query) if err != nil { return []models.Consumer{}, err @@ -3083,7 +3083,7 @@ func GetAllConsumersByStation(stationId int) ([]models.ExtendedConsumer, error) COUNT (CASE WHEN c.is_active THEN 1 END) OVER (PARTITION BY c.name) AS count FROM consumers AS c LEFT JOIN stations AS s ON s.id = c.station_id - WHERE c.station_id = $1 ORDER BY c.name, c.consumers_group, c.updated_at DESC + WHERE c.station_id = $1 AND c.type = 'application' ORDER BY c.name, c.consumers_group, c.updated_at DESC LIMIT 5000;` stmt, err := conn.Conn().Prepare(ctx, "get_all_consumers_by_station", query) if err != nil { @@ -3114,7 +3114,7 @@ func GetConsumersForGraph(tenantName string) ([]models.ConsumerForGraph, error) defer conn.Release() query := `SELECT c.name, c.consumers_group, c.station_id, c.app_id FROM consumers AS c - WHERE c.tenant_name = $1 AND c.is_active = true + WHERE c.tenant_name = $1 AND c.is_active = true AND c.type = 'application' ORDER BY c.name, c.station_id DESC LIMIT 10000;` stmt, err := conn.Conn().Prepare(ctx, "get_consumers_for_graph", query) @@ -3172,7 +3172,7 @@ func DeleteConsumerByNameAndStationId(name string, stationId int) (bool, models. return false, models.Consumer{}, err } defer conn.Release() - query := ` DELETE FROM consumers WHERE ctid = ( SELECT ctid FROM consumers WHERE name = $1 AND station_id = $ LIMIT 1) RETURNING *` + query := ` DELETE FROM consumers WHERE ctid = ( SELECT ctid FROM consumers WHERE name = $1 AND station_id = $2 LIMIT 1) RETURNING *` deleteStmt, err := conn.Conn().Prepare(ctx, "delete_consumers", query) if err != nil { return false, models.Consumer{}, err @@ -3241,7 +3241,7 @@ func CountActiveConsumersInCG(consumersGroup string, stationId int) (int64, erro return 0, err } defer conn.Release() - query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true` + query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "count_active_consumers_in_cg", query) if err != nil { return 0, err @@ -3263,7 +3263,7 @@ func CountActiveConsumersByStationID(stationId int) (int64, error) { return 0, err } defer conn.Release() - query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND is_active = true` + query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND is_active = true AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "count_active_consumers_by_station_id", query) if err != nil { return 0, err @@ -3285,7 +3285,7 @@ func CountAllActiveConsumers() (int64, error) { return 0, err } defer conn.Release() - query := `SELECT COUNT(*) FROM consumers WHERE is_active = true` + query := `SELECT COUNT(*) FROM consumers WHERE is_active = true AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "count_all_active_consumers", query) if err != nil { return 0, err @@ -3390,7 +3390,7 @@ func GetActiveConsumerByStationID(consumerName string, stationId int) (bool, mod return false, models.Consumer{}, err } defer conn.Release() - query := `SELECT * FROM consumers WHERE name = $1 AND station_id = $2 AND is_active = true LIMIT 1` + query := `SELECT * FROM consumers WHERE name = $1 AND station_id = $2 AND is_active = true AND type = 'application' LIMIT 1` stmt, err := conn.Conn().Prepare(ctx, "get_active_consumer_by_station_id", query) if err != nil { return false, models.Consumer{}, err @@ -3502,7 +3502,7 @@ func GetActiveCgsByName(names []string, tenantName string) ([]models.LightConsum SELECT c.consumers_group, s.name, COUNT(*) FROM consumers AS c LEFT JOIN stations AS s ON s.id = c.station_id - WHERE c.tenant_name = $1 AND c.consumers_group = ANY($2) AND c.is_active = true + WHERE c.tenant_name = $1 AND c.consumers_group = ANY($2) AND c.is_active = true AND c.type = 'application' GROUP BY c.consumers_group, s.name, c.station_id;` stmt, err := conn.Conn().Prepare(ctx, "get_active_consumers_by_name", query) if err != nil { @@ -5702,7 +5702,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin } defer tx.Rollback(ctx) - query := `SELECT EXISTS(SELECT 1 FROM consumers WHERE tenant_name = $1 AND consumers_group = $2)` + query := `SELECT EXISTS(SELECT 1 FROM consumers WHERE tenant_name = $1 AND consumers_group = $2 AND type = 'application')` stmt, err := tx.Prepare(ctx, "check_if_consumer_exists", query) if err != nil { return 0, updated, err @@ -7228,7 +7228,7 @@ func CountProudcersForStation(stationId int) (int64, error) { return 0, err } defer conn.Release() - query := `SELECT COUNT(*) FROM producers WHERE station_id=$1` + query := `SELECT COUNT(*) FROM producers WHERE station_id=$1 AND type = 'application'` stmt, err := conn.Conn().Prepare(ctx, "get_count_producers_for_station", query) if err != nil { return 0, err diff --git a/server/background_tasks.go b/server/background_tasks.go index aa023628e..f339dbeeb 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -350,6 +350,7 @@ func (s *Server) StartBackgroundTasks() error { go s.ReleaseStuckLocks() go s.ConsumeFunctionTasks() go s.ScaleFunctionWorkers() + go s.ConnectorsDeadPodsRescheduler() return nil } diff --git a/server/memphis_cloud.go b/server/memphis_cloud.go index 1c23cd01e..b21ca9390 100644 --- a/server/memphis_cloud.go +++ b/server/memphis_cloud.go @@ -2346,6 +2346,10 @@ func (s *Server) ScaleFunctionWorkers() { return } +func (s *Server) ConnectorsDeadPodsRescheduler() { + return +} + func (s *Server) ConsumeFunctionsDlsMessages() { } @@ -2596,3 +2600,11 @@ func deleteConnectorsStationResources(tenantName string, stationID int) error { func deleteConnectorsTenantResources(tenantName string) error { return nil } + +func (s *Server) GetSourceConnectorsByStationAndPartition(stationID, partitionNumber, numOfPartitions int) (map[string]interface{}, error) { + return map[string]interface{}{}, nil +} + +func (ch ConsumersHandler) GetSinkConnectorsByStation(stationName StationName, station models.Station, partition int, partitions []int) ([]string, error) { + return []string{}, nil +} diff --git a/server/memphis_handlers_consumers.go b/server/memphis_handlers_consumers.go index 63f7a3165..940cdaa82 100644 --- a/server/memphis_handlers_consumers.go +++ b/server/memphis_handlers_consumers.go @@ -182,10 +182,19 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam splitted := strings.Split(c.opts.Lang, ".") sdkName := splitted[len(splitted)-1] - newConsumer, err := db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId) - if err != nil { - serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) - return []int{}, err + var newConsumer models.Consumer + if strings.HasPrefix(user.Username, "$") { + newConsumer, err = db.InsertNewConsumer(name, station.ID, "connector", connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId) + if err != nil { + serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) + return []int{}, err + } + } else { + newConsumer, err = db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId) + if err != nil { + serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) + return []int{}, err + } } message := "Consumer " + name + " connected" @@ -335,11 +344,7 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) { return } if err != nil { - if strings.Contains(err.Error(), "not exist") { - s.Warnf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error()) - } else { - s.Errorf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error()) - } + s.Errorf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error()) respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp) return } diff --git a/server/memphis_handlers_monitoring.go b/server/memphis_handlers_monitoring.go index 894d42aef..f28e8c92d 100644 --- a/server/memphis_handlers_monitoring.go +++ b/server/memphis_handlers_monitoring.go @@ -695,12 +695,19 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { } } - connectors, err := mh.S.GetConnectorsByStationAndPartition(station.ID, body.PartitionNumber, len(station.PartitionsList)) + sourceConnectors, err := mh.S.GetSourceConnectorsByStationAndPartition(station.ID, body.PartitionNumber, len(station.PartitionsList)) if err != nil { - serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetConnectorsByStationAndPartition: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) + serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetSourceConnectorsByStationAndPartition: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) return } + sinkConnectors, err := consumersHandler.GetSinkConnectorsByStation(stationName, station, body.PartitionNumber, station.PartitionsList) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetSinkConnectorsByStation: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + var response gin.H // Check when the schema object in station is not empty, not optional for non native stations @@ -722,9 +729,6 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { return } updatesAvailable := !schemaVersion.Active - if schema.Name == "" && schema.Type == "" && station.SchemaVersionNumber == 0 { - updatesAvailable = false - } schemaDetails = models.StationOverviewSchemaDetails{ SchemaName: schema.Name, VersionNumber: station.SchemaVersionNumber, @@ -759,7 +763,8 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { "resend_disabled": station.ResendDisabled, "functions_enabled": functionsEnabled, "max_amount_of_allowed_producers": usageLimit, - "connectors": connectors, + "source_connectors": sourceConnectors, + "sink_connectors": sinkConnectors, "act_as_dls_station_in_stations": usedAsDlsStations, } } else { @@ -793,7 +798,8 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { "resend_disabled": station.ResendDisabled, "functions_enabled": functionsEnabled, "max_amount_of_allowed_producers": usageLimit, - "connectors": connectors, + "source_connectors": sourceConnectors, + "sink_connectors": sinkConnectors, "act_as_dls_station_in_stations": usedAsDlsStations, } } else { @@ -824,7 +830,8 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { "resend_disabled": station.ResendDisabled, "functions_enabled": functionsEnabled, "max_amount_of_allowed_producers": usageLimit, - "connectors": connectors, + "source_connectors": sourceConnectors, + "sink_connectors": sinkConnectors, "act_as_dls_station_in_stations": usedAsDlsStations, } } diff --git a/server/memphis_handlers_producers.go b/server/memphis_handlers_producers.go index d1f817b0c..db6c78d81 100644 --- a/server/memphis_handlers_producers.go +++ b/server/memphis_handlers_producers.go @@ -129,36 +129,43 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection splitted := strings.Split(c.opts.Lang, ".") sdkName := splitted[len(splitted)-1] - newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName, station.PartitionsList, version, sdkName, appId) - if err != nil { - serv.Warnf("[tenant: %v][user: %v]createProducerDirectCommon at InsertNewProducer: %v", user.TenantName, user.Username, err.Error()) - return false, false, err, models.Station{} - } - message := "Producer " + name + " connected" - var auditLogs []interface{} - newAuditLog := models.AuditLog{ - StationName: pStationName.Ext(), - Message: message, - CreatedBy: user.ID, - CreatedByUsername: user.Username, - CreatedAt: time.Now(), - TenantName: user.TenantName, - } - auditLogs = append(auditLogs, newAuditLog) - err = CreateAuditLogs(auditLogs) - if err != nil { - serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at CreateAuditLogs: Producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error()) - return false, false, err, models.Station{} - } - - shouldSendAnalytics, _ := shouldSendAnalytics() - if shouldSendAnalytics { - ip := serv.getIp() - analyticsParams := map[string]interface{}{"producer-name": newProducer.Name, "ip": ip} - analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-producer-sdk") - if strings.HasPrefix(newProducer.Name, "rest_gateway") { - analyticsParams = map[string]interface{}{} - analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-send-messages-via-rest-gw") + if strings.HasPrefix(user.Username, "$") && name != "gui" { + _, err := db.InsertNewProducer(name, station.ID, "connector", pConnectionId, station.TenantName, station.PartitionsList, version, sdkName, appId) + if err != nil { + serv.Warnf("[tenant: %v][user: %v]createProducerDirectCommon at InsertNewProducer: %v", user.TenantName, user.Username, err.Error()) + return false, false, err, models.Station{} + } + } else { + newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName, station.PartitionsList, version, sdkName, appId) + if err != nil { + serv.Warnf("[tenant: %v][user: %v]createProducerDirectCommon at InsertNewProducer: %v", user.TenantName, user.Username, err.Error()) + return false, false, err, models.Station{} + } + message := "Producer " + name + " connected" + var auditLogs []interface{} + newAuditLog := models.AuditLog{ + StationName: pStationName.Ext(), + Message: message, + CreatedBy: user.ID, + CreatedByUsername: user.Username, + CreatedAt: time.Now(), + TenantName: user.TenantName, + } + auditLogs = append(auditLogs, newAuditLog) + err = CreateAuditLogs(auditLogs) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at CreateAuditLogs: Producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error()) + return false, false, err, models.Station{} + } + shouldSendAnalytics, _ := shouldSendAnalytics() + if shouldSendAnalytics { + ip := serv.getIp() + analyticsParams := map[string]interface{}{"producer-name": newProducer.Name, "ip": ip} + analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-producer-sdk") + if strings.HasPrefix(newProducer.Name, "rest_gateway") { + analyticsParams = map[string]interface{}{} + analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-send-messages-via-rest-gw") + } } } diff --git a/server/memphis_handlers_ws.go b/server/memphis_handlers_ws.go index 1224ef2f1..d8493efd8 100644 --- a/server/memphis_handlers_ws.go +++ b/server/memphis_handlers_ws.go @@ -398,7 +398,11 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, } } - connectors, err := s.GetConnectorsByStationAndPartition(station.ID, partitionNumber, len(station.PartitionsList)) + sourceConnectors, err := s.GetSourceConnectorsByStationAndPartition(station.ID, partitionNumber, len(station.PartitionsList)) + if err != nil { + return map[string]any{}, err + } + sinkConnectors, err := h.Consumers.GetSinkConnectorsByStation(sn, station, partitionNumber, station.PartitionsList) if err != nil { return map[string]any{}, err } @@ -433,7 +437,8 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, "resend_disabled": station.ResendDisabled, "functions_enabled": functionsEnabled, "max_amount_of_allowed_producers": usageLimit, - "connectors": connectors, + "source_connectors": sourceConnectors, + "sink_connectors": sinkConnectors, "act_as_dls_station_in_stations": usedAsDlsStations, } } else { @@ -464,7 +469,8 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, "resend_disabled": station.ResendDisabled, "functions_enabled": functionsEnabled, "max_amount_of_allowed_producers": usageLimit, - "connectors": connectors, + "source_connectors": sourceConnectors, + "sink_connectors": sinkConnectors, "act_as_dls_station_in_stations": usedAsDlsStations, } } @@ -514,7 +520,8 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, "resend_disabled": station.ResendDisabled, "functions_enabled": functionsEnabled, "max_amount_of_allowed_producers": usageLimit, - "connectors": connectors, + "source_connectors": sourceConnectors, + "sink_connectors": sinkConnectors, "act_as_dls_station_in_stations": usedAsDlsStations, }