Skip to content

Commit

Permalink
add remove old stations (#1038)
Browse files Browse the repository at this point in the history
* add remove old stations

* fix

* fix

* fix

* fix

* fix

* fix

* fix
  • Loading branch information
shohamroditimemphis authored Jul 2, 2023
1 parent 5096462 commit 274997f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
49 changes: 49 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,6 +1705,27 @@ func DeleteStationsByNames(stationNames []string, tenantName string) error {
return nil
}

func RemoveDeletedStations() error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
query := `DELETE FROM stations WHERE is_deleted = true`
stmt, err := conn.Conn().Prepare(ctx, "remove_deleted_stations", query)
if err != nil {
return err
}

_, err = conn.Conn().Query(ctx, stmt.Name)
if err != nil {
return err
}
return nil
}

func DeleteStation(name string, tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
Expand Down Expand Up @@ -1920,6 +1941,34 @@ func RemoveSchemaFromAllUsingStations(schemaName string, tenantName string) erro
return nil
}

func GetDeletedStations() ([]models.Station, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.Station{}, err
}
defer conn.Release()
query := `SELECT * FROM stations WHERE is_deleted = true`
stmt, err := conn.Conn().Prepare(ctx, "get_not_active_stations", query)
if err != nil {
return []models.Station{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name)
if err != nil {
return []models.Station{}, err
}
defer rows.Close()
stations, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.Station])
if err != nil {
return []models.Station{}, err
}
if len(stations) == 0 {
return []models.Station{}, err
}
return stations, nil
}

// Producer Functions
func GetProducersByConnectionIDWithStationDetails(connectionId string) ([]models.LightProducer, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
Expand Down
20 changes: 20 additions & 0 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,3 +2154,23 @@ func getUserAndTenantIdFromString(username string) (string, int, error) {
return username, -1, nil

}

func (s *Server) RemoveOldStations() {
stations, err := db.GetDeletedStations()
if err != nil {
s.Errorf("RemoveOldStations: at GetDeletedStations: %v", err.Error())
return
}
for _, station := range stations {
err = removeStationResources(s, station, true)
if err != nil {
s.Errorf("[tenant: %v]RemoveOldStations: at removeStationResources: %v", station.TenantName, err.Error())
return
}
}
err = db.RemoveDeletedStations()
if err != nil {
s.Warnf("RemoveOldStations: at RemoveDeletedStations: %v", err.Error())
return
}
}
6 changes: 4 additions & 2 deletions server/memphis_zombie_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ func killFunc(s *Server) {
}
}
}

s.removeStaleStations()
}

func (s *Server) KillZombieResources() {
Expand All @@ -180,6 +178,10 @@ func (s *Server) KillZombieResources() {
firstIteration := true
for range time.Tick(time.Minute * 1) {
s.Debugf("Killing Zombie resources iteration")
if firstIteration {
s.removeStaleStations()
s.RemoveOldStations()
}
killFunc(s)

if firstIteration || count == 1*60 { // once in 1 hour
Expand Down

0 comments on commit 274997f

Please sign in to comment.