Skip to content

Commit

Permalink
Merge pull request #1545 from memphisdev/RND-314-audit-logs-cleaning-…
Browse files Browse the repository at this point in the history
…background-task

RND-314-audit-logs-cleaning-background-task
  • Loading branch information
shohamroditimemphis authored Dec 21, 2023
2 parents a3142b8 + 034fe98 commit 65360a8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
20 changes: 20 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,26 @@ func RemoveAuditLogsByTenant(tenantName string) error {
return nil
}

func RemoveAuditLogsByTenantAndCreatedAt(tenantName string, createdAt time.Time) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
query := `DELETE FROM audit_logs WHERE tenant_name = $1 AND created_at < $2`
stmt, err := conn.Conn().Prepare(ctx, "remove_audit_logs_by_tenant_and_created_at", query)
if err != nil {
return err
}
_, err = conn.Conn().Query(ctx, stmt.Name, tenantName, createdAt)
if err != nil {
return err
}
return nil
}

// Station Functions
func GetActiveStationsPerTenant(tenantName string) ([]models.Station, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
Expand Down
11 changes: 8 additions & 3 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (s *Server) StartBackgroundTasks() error {
go s.InitializeThroughputSampling()
go s.UploadTenantUsageToDB()
go s.RefreshFirebaseFunctionsKey()
go s.RemoveOldProducersAndConsumers()
go s.RemoveOldProducersAndConsumersAndAuditLogs()
go ScheduledCloudCacheRefresh()
go s.SendBillingAlertWhenNeeded()
go s.CheckBrokenConnectedIntegrations()
Expand Down Expand Up @@ -616,14 +616,19 @@ func (s *Server) RemoveOldDlsMsgs() {
}
}

func (s *Server) RemoveOldProducersAndConsumers() {
func (s *Server) RemoveOldProducersAndConsumersAndAuditLogs() {
ticker := time.NewTicker(15 * time.Minute)
for range ticker.C {
for tenantName, rt := range s.opts.GCProducersConsumersRetentionHours {
configurationTime := time.Now().Add(time.Hour * time.Duration(-rt))
err := db.DeleteOldProducersAndConsumers(configurationTime, tenantName)
if err != nil {
serv.Errorf("[tenant: %v]RemoveOldProducersAndConsumers at DeleteOldProducersAndConsumers : %v", tenantName, err.Error())
serv.Errorf("[tenant: %v]RemoveOldProducersAndConsumersAndAuditLogs at DeleteOldProducersAndConsumers : %v", tenantName, err.Error())
}
time := time.Now().Add(-time.Hour * 3 * 24)
err = db.RemoveAuditLogsByTenantAndCreatedAt(tenantName, time)
if err != nil {
serv.Errorf("[tenant: %v]RemoveOldProducersAndConsumersAndAuditLogs at RemoveAuditLogsByTenantAndCreatedAt : %v", tenantName, err.Error())
}
}
}
Expand Down

0 comments on commit 65360a8

Please sign in to comment.