Skip to content

Commit

Permalink
Merge pull request #1610 from memphisdev/bugfix-RND-413-messages-that…
Browse files Browse the repository at this point in the history
…-send-and-the-redis-connector-on-pause-while-it-is-the-single-consumer-dosent-wait-him-to-be-reconnect

from cloud
  • Loading branch information
shay23b authored Jan 3, 2024
2 parents f750de2 + a8d4143 commit a73649f
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
48 changes: 48 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2824,6 +2824,26 @@ func DeleteProducerByNameAndStationID(name string, stationId int) (bool, error)
return true, nil
}

func DeleteConnectorProducerByNameAndStationID(name string, stationId int) (bool, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return false, err
}
defer conn.Release()
query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND type = 'connector' LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "delete_producer_by_name_and_station_id", query)
if err != nil {
return false, err
}
_, err = conn.Conn().Query(ctx, stmt.Name, name, stationId)
if err != nil {
return false, err
}
return true, nil
}

func DeleteProducerByNameStationIDAndConnID(name string, stationId int, connId string) (bool, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
Expand Down Expand Up @@ -3223,6 +3243,34 @@ func DeleteConsumerByNameStationIDAndConnID(connectionId, name string, stationId
return true, consumers[0], nil
}

func DeleteConsumerByNameStationIDAndType(consumerType, name string, stationId int) (bool, models.Consumer, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return false, models.Consumer{}, err
}
defer conn.Release()
query := ` DELETE FROM consumers WHERE ctid = ( SELECT ctid FROM consumers WHERE type = $1 AND name = $2 AND station_id = $3 LIMIT 1) RETURNING *`
deleteStmt, err := conn.Conn().Prepare(ctx, "delete_consumers", query)
if err != nil {
return false, models.Consumer{}, err
}
rows, err := conn.Conn().Query(ctx, deleteStmt.Name, consumerType, name, stationId)
if err != nil {
return false, models.Consumer{}, err
}
defer rows.Close()
consumers, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.Consumer])
if err != nil {
return false, models.Consumer{}, err
}
if len(consumers) == 0 {
return false, models.Consumer{}, err
}
return true, consumers[0], nil
}

func DeleteConsumerByNameAndStationId(name string, stationId int) (bool, models.Consumer, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
Expand Down
57 changes: 57 additions & 0 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,63 @@ func (s *Server) destroyCGFromNats(c *client, reply, userName, tenantName string

}

func (s *Server) destroyCGFromNatsInternal(username, tenantName string, stationName StationName, consumer models.Consumer, station models.Station) error {
// ensure not part of an active consumer group
count, err := db.CountActiveConsumersInCG(consumer.ConsumersGroup, station.ID)
if err != nil {
return err
}

deleted := false
if count == 0 { // no other members in this group
err = s.RemoveConsumer(station.TenantName, stationName, consumer.ConsumersGroup, consumer.PartitionsList)
if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) {
return err
}
if err == nil {
deleted = true
}

err = db.RemovePoisonedCg(station.ID, consumer.ConsumersGroup)
if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) {
return err
}
}

name := strings.ToLower(consumer.Name)
if deleted {
_, user, err := memphis_cache.GetUser(username, consumer.TenantName, false)
if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) {
return err
}
message := fmt.Sprintf("Consumer %v has been destroyed", name)
serv.Noticef("[tenant: %v][user: %v]: %v", user.TenantName, user.Username, message)
var auditLogs []interface{}
newAuditLog := models.AuditLog{
StationName: stationName.Ext(),
Message: message,
CreatedBy: user.ID,
CreatedByUsername: user.Username,
CreatedAt: time.Now(),
TenantName: user.TenantName,
}
auditLogs = append(auditLogs, newAuditLog)
err = CreateAuditLogs(auditLogs)
if err != nil {
serv.Errorf("[tenant: %v]destroyCGFromNats at CreateAuditLogs: Consumer %v at station %v: %v", user.TenantName, consumer.Name, station.Name, err.Error())
}

shouldSendAnalytics, _ := shouldSendAnalytics()
if shouldSendAnalytics {
analyticsParams := make(map[string]interface{})
analytics.SendEvent(user.TenantName, username, analyticsParams, "user-remove-consumer-sdk")
}
}

return nil

}

func comparePartitionsList(pList1, pList2 []int) bool {
if len(pList1) != len(pList2) {
return false
Expand Down

0 comments on commit a73649f

Please sign in to comment.