Skip to content

Commit

Permalink
bugfix-RND-256-new-endpoint-to-clean-disconnected-producers-consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamroditimemphis committed Dec 3, 2023
1 parent ea7c245 commit e27f53e
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 105 deletions.
42 changes: 22 additions & 20 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2670,44 +2670,46 @@ func GetNotDeletedProducersByStationID(stationId int) ([]models.Producer, error)
}
return producers, nil
}
func GetAllProducersByStationID(stationId int) ([]models.ExtendedProducer, error) {
func GetAllProducersByStationID(stationId int) ([]models.ExtendedProducerRes, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.ExtendedProducer{}, err
return []models.ExtendedProducerRes{}, err
}
defer conn.Release()
query := `SELECT
p.id,
p.name,
p.type,
p.connection_id,
p.updated_at,
s.name,
p.is_active,
COUNT(CASE WHEN p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS count_producers
FROM producers AS p
LEFT JOIN stations AS s ON s.id = p.station_id
WHERE p.station_id = $1
ORDER BY p.name, p.is_active DESC, p.updated_at DESC
LIMIT 5000;`
p.id,
p.name,
p.type,
p.connection_id,
p.updated_at,
s.name,
p.is_active,
COUNT(CASE WHEN p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS connected_producers_count,
COUNT(CASE WHEN NOT p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS disconnected_producers_count
FROM producers AS p
LEFT JOIN stations AS s ON s.id = p.station_id
WHERE p.station_id = $1
ORDER BY p.name, p.is_active, p.updated_at DESC
LIMIT 5000;
`
stmt, err := conn.Conn().Prepare(ctx, "get_producers_by_station_id", query)
if err != nil {
return []models.ExtendedProducer{}, err
return []models.ExtendedProducerRes{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name, stationId)
if err != nil {
return []models.ExtendedProducer{}, err
return []models.ExtendedProducerRes{}, err
}
defer rows.Close()

producers, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.ExtendedProducer])
producers, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.ExtendedProducerRes])
if err != nil {
return []models.ExtendedProducer{}, err
return []models.ExtendedProducerRes{}, err
}
if len(producers) == 0 {
return []models.ExtendedProducer{}, nil
return []models.ExtendedProducerRes{}, nil
}
return producers, nil
}
Expand Down
14 changes: 13 additions & 1 deletion models/producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ type ExtendedProducer struct {
Name string `json:"name"`
Type string `json:"type,omitempty"`
ConnectionId string `json:"connection_id,omitempty"`
UpdatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
StationName string `json:"station_name"`
IsActive bool `json:"is_active"`
Count int `json:"count"`
}

type ExtendedProducerRes struct {
ID int `json:"id"`
Name string `json:"name"`
Type string `json:"type,omitempty"`
ConnectionId string `json:"connection_id,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
StationName string `json:"station_name"`
IsActive bool `json:"is_active"`
ConnectedProducersCount int `json:"connected_producers_count"`
DisconnedtedProducersCount int `json:"disconnected_producers_count"`
}

type LightProducer struct {
Name string `json:"name"`
StationName string `json:"station_name"`
Expand Down
4 changes: 4 additions & 0 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2370,3 +2370,7 @@ func (pmh PoisonMessagesHandler) GetDlsMessageDetails(messageId int, dlsType str

return dlsMsgResponse, nil
}

func getUsageLimitProduersLimitPerStation(tenantName, username, stationName string) (float64, error) {
return -1, nil
}
160 changes: 84 additions & 76 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
}
}

usageLimit, err := getUsageLimitProduersLimitPerStation(user.TenantName, user.Username, body.StationName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData: At getUsageLimitProduersLimitPerStation %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
messagesToFetch := 1000
messages := make([]models.MessageDetails, 0)
if body.PartitionNumber == -1 {
Expand Down Expand Up @@ -679,7 +685,6 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {

// Check when the schema object in station is not empty, not optional for non native stations
if station.SchemaName != "" && station.SchemaVersionNumber != 0 {

var schemaDetails models.StationOverviewSchemaDetails
exist, schema, err := db.GetSchemaByName(station.SchemaName, station.TenantName)
if err != nil {
Expand All @@ -705,90 +710,93 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
}
}
response = gin.H{
"connected_producers": connectedProducers,
"disconnected_producers": disconnectedProducers,
"deleted_producers": deletedProducers,
"connected_cgs": connectedCgs,
"disconnected_cgs": disconnectedCgs,
"deleted_cgs": deletedCgs,
"total_messages": totalMessages,
"average_message_size": avgMsgSize,
"audit_logs": auditLogs,
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
"schema": schemaDetails,
"idempotency_window_in_ms": station.IdempotencyWindow,
"dls_configuration_poison": station.DlsConfigurationPoison,
"dls_configuration_schemaverse": station.DlsConfigurationSchemaverse,
"total_dls_messages": totalDlsAmount,
"tiered_storage_enabled": station.TieredStorageEnabled,
"created_by_username": station.CreatedByUsername,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"connected_producers": connectedProducers,
"disconnected_producers": disconnectedProducers,
"deleted_producers": deletedProducers,
"connected_cgs": connectedCgs,
"disconnected_cgs": disconnectedCgs,
"deleted_cgs": deletedCgs,
"total_messages": totalMessages,
"average_message_size": avgMsgSize,
"audit_logs": auditLogs,
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
"schema": schemaDetails,
"idempotency_window_in_ms": station.IdempotencyWindow,
"dls_configuration_poison": station.DlsConfigurationPoison,
"dls_configuration_schemaverse": station.DlsConfigurationSchemaverse,
"total_dls_messages": totalDlsAmount,
"tiered_storage_enabled": station.TieredStorageEnabled,
"created_by_username": station.CreatedByUsername,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
}
} else {
var emptyResponse struct{}
if !station.IsNative {
cp, dp, cc, dc := getFakeProdsAndConsForPreview()
response = gin.H{
"connected_producers": cp,
"disconnected_producers": dp,
"deleted_producers": deletedProducers,
"connected_cgs": cc,
"disconnected_cgs": dc,
"deleted_cgs": deletedCgs,
"total_messages": totalMessages,
"average_message_size": avgMsgSize,
"audit_logs": auditLogs,
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
"schema": emptyResponse,
"idempotency_window_in_ms": station.IdempotencyWindow,
"dls_configuration_poison": station.DlsConfigurationPoison,
"dls_configuration_schemaverse": station.DlsConfigurationSchemaverse,
"total_dls_messages": totalDlsAmount,
"tiered_storage_enabled": station.TieredStorageEnabled,
"created_by_username": station.CreatedByUsername,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"connected_producers": cp,
"disconnected_producers": dp,
"deleted_producers": deletedProducers,
"connected_cgs": cc,
"disconnected_cgs": dc,
"deleted_cgs": deletedCgs,
"total_messages": totalMessages,
"average_message_size": avgMsgSize,
"audit_logs": auditLogs,
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
"schema": emptyResponse,
"idempotency_window_in_ms": station.IdempotencyWindow,
"dls_configuration_poison": station.DlsConfigurationPoison,
"dls_configuration_schemaverse": station.DlsConfigurationSchemaverse,
"total_dls_messages": totalDlsAmount,
"tiered_storage_enabled": station.TieredStorageEnabled,
"created_by_username": station.CreatedByUsername,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
}
} else {
response = gin.H{
"connected_producers": connectedProducers,
"disconnected_producers": disconnectedProducers,
"deleted_producers": deletedProducers,
"connected_cgs": connectedCgs,
"disconnected_cgs": disconnectedCgs,
"deleted_cgs": deletedCgs,
"total_messages": totalMessages,
"average_message_size": avgMsgSize,
"audit_logs": auditLogs,
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
"schema": emptyResponse,
"idempotency_window_in_ms": station.IdempotencyWindow,
"dls_configuration_poison": station.DlsConfigurationPoison,
"dls_configuration_schemaverse": station.DlsConfigurationSchemaverse,
"total_dls_messages": totalDlsAmount,
"tiered_storage_enabled": station.TieredStorageEnabled,
"created_by_username": station.CreatedByUsername,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"connected_producers": connectedProducers,
"disconnected_producers": disconnectedProducers,
"deleted_producers": deletedProducers,
"connected_cgs": connectedCgs,
"disconnected_cgs": disconnectedCgs,
"deleted_cgs": deletedCgs,
"total_messages": totalMessages,
"average_message_size": avgMsgSize,
"audit_logs": auditLogs,
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
"schema": emptyResponse,
"idempotency_window_in_ms": station.IdempotencyWindow,
"dls_configuration_poison": station.DlsConfigurationPoison,
"dls_configuration_schemaverse": station.DlsConfigurationSchemaverse,
"total_dls_messages": totalDlsAmount,
"tiered_storage_enabled": station.TieredStorageEnabled,
"created_by_username": station.CreatedByUsername,
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
}
}
}
Expand Down
33 changes: 25 additions & 8 deletions server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (s *Server) createProducerDirect(c *client, reply string, msg []byte) {
func (ph ProducersHandler) GetProducersByStation(station models.Station) ([]models.ExtendedProducer, []models.ExtendedProducer, []models.ExtendedProducer, error) { // for socket io endpoint
producers, err := db.GetAllProducersByStationID(station.ID)
if err != nil {
return producers, producers, producers, err
return []models.ExtendedProducer{}, []models.ExtendedProducer{}, []models.ExtendedProducer{}, err
}

var connectedProducers []models.ExtendedProducer
Expand All @@ -265,19 +265,36 @@ func (ph ProducersHandler) GetProducersByStation(station models.Station) ([]mode
continue
}

producerRes := models.ExtendedProducer{
ID: producer.ID,
Name: producer.Name,
StationName: producer.StationName,
UpdatedAt: producer.UpdatedAt,
IsActive: producer.IsActive,
Count: producer.Count,
producerExtendedRes := models.ExtendedProducerRes{
ID: producer.ID,
Name: producer.Name,
StationName: producer.StationName,
UpdatedAt: producer.UpdatedAt,
IsActive: producer.IsActive,
DisconnedtedProducersCount: producer.DisconnedtedProducersCount,
ConnectedProducersCount: producer.ConnectedProducersCount,
}

producersNames = append(producersNames, producer.Name)
if producer.IsActive {
producerRes := models.ExtendedProducer{
ID: producerExtendedRes.ID,
Name: producerExtendedRes.Name,
StationName: producerExtendedRes.StationName,
UpdatedAt: producerExtendedRes.UpdatedAt,
IsActive: producerExtendedRes.IsActive,
Count: producerExtendedRes.ConnectedProducersCount,
}
connectedProducers = append(connectedProducers, producerRes)
} else {
producerRes := models.ExtendedProducer{
ID: producerExtendedRes.ID,
Name: producerExtendedRes.Name,
StationName: producerExtendedRes.StationName,
UpdatedAt: producerExtendedRes.UpdatedAt,
IsActive: producerExtendedRes.IsActive,
Count: producerExtendedRes.DisconnedtedProducersCount,
}
disconnectedProducers = append(disconnectedProducers, producerRes)
}
}
Expand Down

0 comments on commit e27f53e

Please sign in to comment.