Skip to content

Commit

Permalink
from cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
shay23b committed Dec 11, 2023
1 parent 6216f0f commit 1625403
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 75 deletions.
48 changes: 24 additions & 24 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func GetActiveConnections() ([]string, error) {
return []string{}, err
}
defer conn.Release()
query := `SELECT connection_id FROM producers WHERE is_active = true UNION SELECT connection_id FROM consumers WHERE is_active = true;`
query := `SELECT connection_id FROM producers WHERE is_active = true AND type = 'application' UNION SELECT connection_id FROM consumers WHERE is_active = true AND type = 'application';`
stmt, err := conn.Conn().Prepare(ctx, "get_active_connection", query)
if err != nil {
return []string{}, err
Expand Down Expand Up @@ -2459,7 +2459,7 @@ func GetProducerByNameAndConnectionID(name string, connectionId string) (bool, m
return false, models.Producer{}, err
}
defer conn.Release()
query := `SELECT * FROM producers WHERE name = $1 AND connection_id = $2`
query := `SELECT * FROM producers WHERE name = $1 AND connection_id = $2 AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "get_producer_by_name_and_connection_id", query)
if err != nil {
return false, models.Producer{}, err
Expand Down Expand Up @@ -2487,7 +2487,7 @@ func GetProducerByStationIDAndConnectionId(name string, stationId int, connectio
return false, models.Producer{}, err
}
defer conn.Release()
query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 ORDER BY is_active DESC LIMIT 1`
query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 AND type = 'application' ORDER BY is_active DESC LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "get_producer_by_station_id_and_connection_id", query)
if err != nil {
return false, models.Producer{}, err
Expand Down Expand Up @@ -2515,7 +2515,7 @@ func GetProducerByNameAndStationID(name string, stationId int) (bool, models.Pro
return false, models.Producer{}, err
}
defer conn.Release()
query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 ORDER BY is_active DESC LIMIT 1`
query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND type = 'application' ORDER BY is_active DESC LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "get_producer_by_name_and_station_id", query)
if err != nil {
return false, models.Producer{}, err
Expand Down Expand Up @@ -2544,7 +2544,7 @@ func GetActiveProducerByStationID(producerName string, stationId int) (bool, mod
}
defer conn.Release()

query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND is_active = true LIMIT 1`
query := `SELECT * FROM producers WHERE name = $1 AND station_id = $2 AND is_active = true AND type = 'application' LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "get_active_producer_by_station_id", query)
if err != nil {
return false, models.Producer{}, err
Expand Down Expand Up @@ -2574,7 +2574,7 @@ func GetProducersForGraph(tenantName string) ([]models.ProducerForGraph, error)
defer conn.Release()
query := `SELECT p.name, p.station_id, p.app_id
FROM producers AS p
WHERE p.tenant_name = $1 AND p.is_active = true
WHERE p.tenant_name = $1 AND p.is_active = true AND p.type = 'application'
ORDER BY p.name, p.station_id DESC
LIMIT 10000;`
stmt, err := conn.Conn().Prepare(ctx, "get_producers_for_graph", query)
Expand Down Expand Up @@ -2673,7 +2673,7 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti
return newProducer, nil
}

func GetNotDeletedProducersByStationID(stationId int) ([]models.Producer, error) {
func GetNotDeletedProducersByStationID(stationId int) ([]models.Producer, error) { // TODO: check if not needed - I think its not used
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
Expand All @@ -2682,7 +2682,7 @@ func GetNotDeletedProducersByStationID(stationId int) ([]models.Producer, error)
}
defer conn.Release()

query := `SELECT * FROM producers AS p WHERE p.station_id = $1 AND p.is_deleted = false`
query := `SELECT * FROM producers AS p WHERE p.station_id = $1 AND p.is_deleted = false AND p.type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "get_not_deleted_producers_by_station_id", query)
if err != nil {
return []models.Producer{}, err
Expand Down Expand Up @@ -2721,7 +2721,7 @@ func GetAllProducersByStationID(stationId int) ([]models.ExtendedProducer, error
COUNT(CASE WHEN NOT p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS disconnected_producers_count
FROM producers AS p
LEFT JOIN stations AS s ON s.id = p.station_id
WHERE p.station_id = $1
WHERE p.station_id = $1 AND p.type = 'application'
ORDER BY p.name, p.is_active DESC, p.updated_at DESC
LIMIT 5000;
`
Expand Down Expand Up @@ -2753,7 +2753,7 @@ func DeleteProducerByNameAndStationID(name string, stationId int) (bool, error)
return false, err
}
defer conn.Release()
query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 LIMIT 1`
query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND type = 'application' LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "delete_producer_by_name_and_station_id", query)
if err != nil {
return false, err
Expand Down Expand Up @@ -2819,7 +2819,7 @@ func CountActiveProudcersByStationID(stationId int) (int64, error) {
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM producers WHERE station_id = $1 AND is_active = true`
query := `SELECT COUNT(*) FROM producers WHERE station_id = $1 AND is_active = true AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "count_active_producers_by_station_id", query)
if err != nil {
return 0, err
Expand All @@ -2841,7 +2841,7 @@ func CountAllActiveProudcers() (int64, error) {
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM producers WHERE is_active = true`
query := `SELECT COUNT(*) FROM producers WHERE is_active = true AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "count_all_active_producers", query)
if err != nil {
return 0, err
Expand Down Expand Up @@ -2924,7 +2924,7 @@ func GetActiveConsumerByCG(consumersGroup string, stationId int) (bool, models.C
}
defer conn.Release()

query := `SELECT * FROM consumers WHERE consumers_group = $1 AND station_id = $2 LIMIT 1`
query := `SELECT * FROM consumers WHERE consumers_group = $1 AND station_id = $2 AND type = 'application' LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "get_active_consumer_by_cg", query)
if err != nil {
return false, models.Consumer{}, err
Expand Down Expand Up @@ -3051,7 +3051,7 @@ func GetConsumers() ([]models.Consumer, error) {
return []models.Consumer{}, err
}
defer conn.Release()
query := `SELECT * From consumers`
query := `SELECT * From consumers AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "get_consumers", query)
if err != nil {
return []models.Consumer{}, err
Expand Down Expand Up @@ -3083,7 +3083,7 @@ func GetAllConsumersByStation(stationId int) ([]models.ExtendedConsumer, error)
COUNT (CASE WHEN c.is_active THEN 1 END) OVER (PARTITION BY c.name) AS count
FROM consumers AS c
LEFT JOIN stations AS s ON s.id = c.station_id
WHERE c.station_id = $1 ORDER BY c.name, c.consumers_group, c.updated_at DESC
WHERE c.station_id = $1 AND c.type = 'application' ORDER BY c.name, c.consumers_group, c.updated_at DESC
LIMIT 5000;`
stmt, err := conn.Conn().Prepare(ctx, "get_all_consumers_by_station", query)
if err != nil {
Expand Down Expand Up @@ -3114,7 +3114,7 @@ func GetConsumersForGraph(tenantName string) ([]models.ConsumerForGraph, error)
defer conn.Release()
query := `SELECT c.name, c.consumers_group, c.station_id, c.app_id
FROM consumers AS c
WHERE c.tenant_name = $1 AND c.is_active = true
WHERE c.tenant_name = $1 AND c.is_active = true AND c.type = 'application'
ORDER BY c.name, c.station_id DESC
LIMIT 10000;`
stmt, err := conn.Conn().Prepare(ctx, "get_consumers_for_graph", query)
Expand Down Expand Up @@ -3172,7 +3172,7 @@ func DeleteConsumerByNameAndStationId(name string, stationId int) (bool, models.
return false, models.Consumer{}, err
}
defer conn.Release()
query := ` DELETE FROM consumers WHERE ctid = ( SELECT ctid FROM consumers WHERE name = $1 AND station_id = $ LIMIT 1) RETURNING *`
query := ` DELETE FROM consumers WHERE ctid = ( SELECT ctid FROM consumers WHERE name = $1 AND station_id = $2 LIMIT 1) RETURNING *`
deleteStmt, err := conn.Conn().Prepare(ctx, "delete_consumers", query)
if err != nil {
return false, models.Consumer{}, err
Expand Down Expand Up @@ -3241,7 +3241,7 @@ func CountActiveConsumersInCG(consumersGroup string, stationId int) (int64, erro
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true`
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "count_active_consumers_in_cg", query)
if err != nil {
return 0, err
Expand All @@ -3263,7 +3263,7 @@ func CountActiveConsumersByStationID(stationId int) (int64, error) {
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND is_active = true`
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND is_active = true AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "count_active_consumers_by_station_id", query)
if err != nil {
return 0, err
Expand All @@ -3285,7 +3285,7 @@ func CountAllActiveConsumers() (int64, error) {
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM consumers WHERE is_active = true`
query := `SELECT COUNT(*) FROM consumers WHERE is_active = true AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "count_all_active_consumers", query)
if err != nil {
return 0, err
Expand Down Expand Up @@ -3390,7 +3390,7 @@ func GetActiveConsumerByStationID(consumerName string, stationId int) (bool, mod
return false, models.Consumer{}, err
}
defer conn.Release()
query := `SELECT * FROM consumers WHERE name = $1 AND station_id = $2 AND is_active = true LIMIT 1`
query := `SELECT * FROM consumers WHERE name = $1 AND station_id = $2 AND is_active = true AND type = 'application' LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "get_active_consumer_by_station_id", query)
if err != nil {
return false, models.Consumer{}, err
Expand Down Expand Up @@ -3502,7 +3502,7 @@ func GetActiveCgsByName(names []string, tenantName string) ([]models.LightConsum
SELECT c.consumers_group, s.name, COUNT(*)
FROM consumers AS c
LEFT JOIN stations AS s ON s.id = c.station_id
WHERE c.tenant_name = $1 AND c.consumers_group = ANY($2) AND c.is_active = true
WHERE c.tenant_name = $1 AND c.consumers_group = ANY($2) AND c.is_active = true AND c.type = 'application'
GROUP BY c.consumers_group, s.name, c.station_id;`
stmt, err := conn.Conn().Prepare(ctx, "get_active_consumers_by_name", query)
if err != nil {
Expand Down Expand Up @@ -5702,7 +5702,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
}
defer tx.Rollback(ctx)

query := `SELECT EXISTS(SELECT 1 FROM consumers WHERE tenant_name = $1 AND consumers_group = $2)`
query := `SELECT EXISTS(SELECT 1 FROM consumers WHERE tenant_name = $1 AND consumers_group = $2 AND type = 'application')`
stmt, err := tx.Prepare(ctx, "check_if_consumer_exists", query)
if err != nil {
return 0, updated, err
Expand Down Expand Up @@ -7228,7 +7228,7 @@ func CountProudcersForStation(stationId int) (int64, error) {
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM producers WHERE station_id=$1`
query := `SELECT COUNT(*) FROM producers WHERE station_id=$1 AND type = 'application'`
stmt, err := conn.Conn().Prepare(ctx, "get_count_producers_for_station", query)
if err != nil {
return 0, err
Expand Down
1 change: 1 addition & 0 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func (s *Server) StartBackgroundTasks() error {
go s.ReleaseStuckLocks()
go s.ConsumeFunctionTasks()
go s.ScaleFunctionWorkers()
go s.ConnectorsDeadPodsRescheduler()
return nil
}

Expand Down
12 changes: 12 additions & 0 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2346,6 +2346,10 @@ func (s *Server) ScaleFunctionWorkers() {
return
}

func (s *Server) ConnectorsDeadPodsRescheduler() {
return
}

func (s *Server) ConsumeFunctionsDlsMessages() {

}
Expand Down Expand Up @@ -2596,3 +2600,11 @@ func deleteConnectorsStationResources(tenantName string, stationID int) error {
func deleteConnectorsTenantResources(tenantName string) error {
return nil
}

func (s *Server) GetSourceConnectorsByStationAndPartition(stationID, partitionNumber, numOfPartitions int) (map[string]interface{}, error) {
return map[string]interface{}{}, nil
}

func (ch ConsumersHandler) GetSinkConnectorsByStation(stationName StationName, station models.Station, partition int, partitions []int) ([]string, error) {
return []string{}, nil
}
23 changes: 14 additions & 9 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,19 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam

splitted := strings.Split(c.opts.Lang, ".")
sdkName := splitted[len(splitted)-1]
newConsumer, err := db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
if err != nil {
serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
var newConsumer models.Consumer
if strings.HasPrefix(user.Username, "$") {
newConsumer, err = db.InsertNewConsumer(name, station.ID, "connector", connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
if err != nil {
serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
} else {
newConsumer, err = db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
if err != nil {
serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
}

message := "Consumer " + name + " connected"
Expand Down Expand Up @@ -335,11 +344,7 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
return
}
if err != nil {
if strings.Contains(err.Error(), "not exist") {
s.Warnf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
} else {
s.Errorf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
}
s.Errorf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
return
}
Expand Down
23 changes: 15 additions & 8 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,12 +695,19 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
}
}

connectors, err := mh.S.GetConnectorsByStationAndPartition(station.ID, body.PartitionNumber, len(station.PartitionsList))
sourceConnectors, err := mh.S.GetSourceConnectorsByStationAndPartition(station.ID, body.PartitionNumber, len(station.PartitionsList))
if err != nil {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetConnectorsByStationAndPartition: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetSourceConnectorsByStationAndPartition: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
sinkConnectors, err := consumersHandler.GetSinkConnectorsByStation(stationName, station, body.PartitionNumber, station.PartitionsList)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetSinkConnectorsByStation: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}

var response gin.H

// Check when the schema object in station is not empty, not optional for non native stations
Expand All @@ -722,9 +729,6 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
return
}
updatesAvailable := !schemaVersion.Active
if schema.Name == "" && schema.Type == "" && station.SchemaVersionNumber == 0 {
updatesAvailable = false
}
schemaDetails = models.StationOverviewSchemaDetails{
SchemaName: schema.Name,
VersionNumber: station.SchemaVersionNumber,
Expand Down Expand Up @@ -759,7 +763,8 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"connectors": connectors,
"source_connectors": sourceConnectors,
"sink_connectors": sinkConnectors,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
} else {
Expand Down Expand Up @@ -793,7 +798,8 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"connectors": connectors,
"source_connectors": sourceConnectors,
"sink_connectors": sinkConnectors,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
} else {
Expand Down Expand Up @@ -824,7 +830,8 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"connectors": connectors,
"source_connectors": sourceConnectors,
"sink_connectors": sinkConnectors,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
}
Expand Down
Loading

0 comments on commit 1625403

Please sign in to comment.