From 2016fc577cae9d00a543ca3c0e93344cf222b5bb Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Sun, 19 Nov 2023 20:50:04 +0200 Subject: [PATCH 1/5] RND-187-add-support-for-functions-type-in-the-dls --- db/db.go | 125 ++++++++++++++++++++---- models/dead_letter_station.go | 24 ++++- server/memphis_handlers_dls_messages.go | 19 ++-- server/memphis_handlers_monitoring.go | 5 +- server/memphis_handlers_stations.go | 7 +- server/memphis_handlers_user_mgmt.go | 4 + server/memphis_handlers_ws.go | 5 +- 7 files changed, 155 insertions(+), 34 deletions(-) diff --git a/db/db.go b/db/db.go index 0ba924f77..d7c393bf2 100644 --- a/db/db.go +++ b/db/db.go @@ -514,6 +514,7 @@ func createTables(MetadataDbClient MetadataStorage) error { ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis'; ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS producer_name VARCHAR NOT NULL DEFAULT ''; ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS partition_number INTEGER NOT NULL DEFAULT -1; + ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS function_id INT NOT NULL DEFAULT -1; DROP INDEX IF EXISTS dls_producer_id; IF EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'dls_messages' AND column_name = 'producer_id' @@ -546,6 +547,7 @@ func createTables(MetadataDbClient MetadataStorage) error { tenant_name VARCHAR NOT NULL DEFAULT '$memphis', producer_name VARCHAR NOT NULL, partition_number INTEGER NOT NULL DEFAULT -1, + function_id INT NOT NULL DEFAULT -1, PRIMARY KEY (id), CONSTRAINT fk_station_id FOREIGN KEY(station_id) @@ -5745,7 +5747,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin var pgErr *pgconn.PgError if errors.As(err, &pgErr) { if pgErr.Detail != "" { - if !strings.Contains(pgErr.Detail, "already exists") { + if strings.Contains(pgErr.Detail, "already exists") { return 0, updated, errors.New("dls_messages row already exists") } else { return 0, updated, errors.New(pgErr.Detail) @@ -5930,59 +5932,114 @@ func RemoveCgFromDlsMsg(msgId int, cgName string, tenantName string) error { return nil } -func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessage, error) { +func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessageRes, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() conn, err := MetadataDbClient.Client.Acquire(ctx) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } defer conn.Release() - query := `SELECT * from dls_messages where station_id=$1 ORDER BY updated_at DESC limit 1000` + query := `SELECT + dlsm.id, + dlsm.station_id, + dlsm.message_seq, + dlsm.poisoned_cgs, + dlsm.message_details, + dlsm.updated_at, + dlsm.message_type, + dlsm.validation_error, + dlsm.tenant_name, + dlsm.producer_name, + dlsm.partition_number, + dlsm.function_id, + CASE + WHEN dlsm.function_id != -1 THEN s.function_name + ELSE '' + END AS function_name + FROM + dls_messages AS dlsm + LEFT JOIN + attached_functions AS s + ON + dlsm.station_id = s.station_id + AND dlsm.function_id = s.id + WHERE + dlsm.station_id = $1 + ORDER BY updated_at DESC limit 1000 + ` stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station", query) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } rows, err := conn.Conn().Query(ctx, stmt.Name, stationId) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } defer rows.Close() - dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage]) + dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessageRes]) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } if len(dlsMsgs) == 0 { - return []models.DlsMessage{}, nil + return []models.DlsMessageRes{}, nil } return dlsMsgs, nil } -func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessage, error) { +func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessageRes, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() conn, err := MetadataDbClient.Client.Acquire(ctx) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } defer conn.Release() - query := `SELECT * from dls_messages where station_id=$1 AND partition_number = $2 ORDER BY updated_at DESC limit 2000` + query := `SELECT + dlsm.id, + dlsm.station_id, + dlsm.message_seq, + dlsm.poisoned_cgs, + dlsm.message_details, + dlsm.updated_at, + dlsm.message_type, + dlsm.validation_error, + dlsm.tenant_name, + dlsm.producer_name, + dlsm.partition_number, + dlsm.function_id, + CASE + WHEN dlsm.function_id != -1 THEN s.function_name + ELSE '' + END AS function_name + FROM + dls_messages AS dlsm + LEFT JOIN + attached_functions AS s + ON + dlsm.station_id = s.station_id + AND dlsm.function_id = s.id + WHERE + dlsm.station_id = $1 + AND dlsm.partition_number = $2 + ORDER BY updated_at DESC limit 1000; + ` stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station_and_partition", query) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, partitionNumber) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } defer rows.Close() - dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage]) + dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessageRes]) if err != nil { - return []models.DlsMessage{}, err + return []models.DlsMessageRes{}, err } if len(dlsMsgs) == 0 { - return []models.DlsMessage{}, nil + return []models.DlsMessageRes{}, nil } return dlsMsgs, nil @@ -6655,10 +6712,14 @@ func DeleteOldProducersAndConsumers(timeInterval time.Time) ([]models.LightCG, e queries = append(queries, "DELETE FROM producers WHERE is_active = false AND updated_at < $1") queries = append(queries, "WITH deleted AS (DELETE FROM consumers WHERE is_active = false AND updated_at < $1 RETURNING *) SELECT deleted.consumers_group, s.name as station_name, deleted.station_id , deleted.tenant_name, deleted.partitions FROM deleted INNER JOIN stations s ON deleted.station_id = s.id GROUP BY deleted.consumers_group, s.name, deleted.station_id, deleted.tenant_name, deleted.partitions") + timeIntervalTemp := time.Now().Add(time.Duration(time.Hour * -time.Duration(2))) // TODO to be deleted once we fix the producer limitation + batch := &pgx.Batch{} - for _, q := range queries { - batch.Queue(q, timeInterval) - } + batch.Queue(queries[0], timeIntervalTemp) // TODO to be deleted once we fix the producer limitation + batch.Queue(queries[1], timeInterval) // TODO to be deleted once we fix the producer limitation + // for _, q := range queries { // TODO to be returned once we fix the producer limitation + // batch.Queue(q, timeInterval) + // } br := conn.SendBatch(ctx, batch) @@ -7414,3 +7475,27 @@ func GetMemphisFunctionsByMemphis() ([]models.Function, error) { } return functions, nil } + +func DeleteAttachedFunctionsByTenant(tenantName string) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + removeUserQuery := `DELETE FROM attached_functions WHERE tenant_name = $1` + + stmt, err := conn.Conn().Prepare(ctx, "remove_attached_functions_by_tenant", removeUserQuery) + if err != nil { + return err + } + _, err = conn.Conn().Exec(ctx, stmt.Name, tenantName) + if err != nil { + return err + } + + return nil +} diff --git a/models/dead_letter_station.go b/models/dead_letter_station.go index af59e46cb..28ece35c7 100644 --- a/models/dead_letter_station.go +++ b/models/dead_letter_station.go @@ -76,6 +76,23 @@ type DlsMessage struct { TenantName string `json:"tenant_name"` ProducerName string `json:"producer_name"` PartitionNumber int `json:"partition_number"` + FunctionId int `json:"function_id"` +} + +type DlsMessageRes struct { + ID int `json:"id"` + StationId int `json:"station_id"` + MessageSeq int `json:"message_seq"` + PoisonedCgs []string `json:"poisoned_cgs"` + MessageDetails MessagePayload `json:"message_details"` + UpdatedAt time.Time `json:"updated_at"` + MessageType string `json:"message_type"` + ValidationError string `json:"validation_error"` + TenantName string `json:"tenant_name"` + ProducerName string `json:"producer_name"` + PartitionNumber int `json:"partition_number"` + FunctionId int `json:"function_id"` + FunctionName string `json:"function_name"` } type DlsMsgResendAll struct { @@ -107,9 +124,10 @@ type LightDlsMessage struct { } type LightDlsMessageResponse struct { - MessageSeq int `json:"message_seq"` - ID int `json:"id"` - Message MessagePayload `json:"message"` + MessageSeq int `json:"message_seq"` + ID int `json:"id"` + Message MessagePayload `json:"message"` + FunctionName string `json:"function_name"` } type RetentionIntervalData struct { diff --git a/server/memphis_handlers_dls_messages.go b/server/memphis_handlers_dls_messages.go index 9cb330b65..fe5c71d63 100644 --- a/server/memphis_handlers_dls_messages.go +++ b/server/memphis_handlers_dls_messages.go @@ -196,21 +196,22 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) { } } -func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) { +func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) { poisonMessages := make([]models.LightDlsMessageResponse, 0) schemaMessages := make([]models.LightDlsMessageResponse, 0) + functionsMessages := make([]models.LightDlsMessageResponse, 0) - var dlsMsgs []models.DlsMessage + var dlsMsgs []models.DlsMessageRes var err error if partitionNumber == -1 { dlsMsgs, err = db.GetDlsMsgsByStationId(station.ID) if err != nil { - return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err } } else { dlsMsgs, err = db.GetDlsMsgsByStationAndPartition(station.ID, partitionNumber) if err != nil { - return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err } } @@ -227,10 +228,12 @@ func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station } switch v.MessageType { case "poison": - poisonMessages = append(poisonMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: messageDetails}) + poisonMessages = append(poisonMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: messageDetails, FunctionName: v.FunctionName}) case "schema": messageDetails.Size = len(v.MessageDetails.Data) + len(v.MessageDetails.Headers) - schemaMessages = append(schemaMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails}) + schemaMessages = append(schemaMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails, FunctionName: v.FunctionName}) + case "functions": + functionsMessages = append(functionsMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails, FunctionName: v.FunctionName}) } } @@ -239,7 +242,7 @@ func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station if len(dlsMsgs) >= 0 { totalDlsAmount, err = db.CountDlsMsgsByStationAndPartition(station.ID, partitionNumber) if err != nil { - return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err } } @@ -258,7 +261,7 @@ func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station if lenSchema > 1000 { schemaMessages = schemaMessages[:1000] } - return poisonMessages, schemaMessages, totalDlsAmount, nil + return poisonMessages, schemaMessages, functionsMessages, totalDlsAmount, nil } func (pmh PoisonMessagesHandler) GetDlsMessageDetailsById(messageId int, dlsType string, tenantName string) (models.DlsMessageResponse, error) { diff --git a/server/memphis_handlers_monitoring.go b/server/memphis_handlers_monitoring.go index 64eeda0f1..a7d8e191c 100644 --- a/server/memphis_handlers_monitoring.go +++ b/server/memphis_handlers_monitoring.go @@ -622,7 +622,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { } } - poisonMessages, schemaFailedMessages, totalDlsAmount, err := poisonMsgsHandler.GetDlsMsgsByStationLight(station, body.PartitionNumber) + poisonMessages, schemaFailedMessages, functionsMessages, totalDlsAmount, err := poisonMsgsHandler.GetDlsMsgsByStationLight(station, body.PartitionNumber) if err != nil { if IsNatsErr(err, JSStreamNotFoundErr) { serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetDlsMsgsByStationLight: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) @@ -717,6 +717,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { "messages": messages, "poison_messages": poisonMessages, "schema_failed_messages": schemaFailedMessages, + "functions_failed_messages": functionsMessages, "tags": tags, "leader": leader, "followers": followers, @@ -747,6 +748,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { "messages": messages, "poison_messages": poisonMessages, "schema_failed_messages": schemaFailedMessages, + "functions_failed_messages": functionsMessages, "tags": tags, "leader": leader, "followers": followers, @@ -774,6 +776,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { "messages": messages, "poison_messages": poisonMessages, "schema_failed_messages": schemaFailedMessages, + "functions_failed_messages": functionsMessages, "tags": tags, "leader": leader, "followers": followers, diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 7354c1338..a5ba9c6a8 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -670,7 +670,12 @@ func (sh StationsHandler) GetStationsDetails(tenantName string) ([]models.Extend Version: station.Version, } - exStations = append(exStations, models.ExtendedStationDetails{Station: stationRes, HasDlsMsgs: hasDlsMsgs, TotalMessages: totalMsgInfo, Tags: tags, Activity: activity}) + totalDlsMsgs, err := db.CountDlsMsgsByStationAndPartition(station.ID, -1) + if err != nil { + return []models.ExtendedStationDetails{}, err + } + + exStations = append(exStations, models.ExtendedStationDetails{Station: stationRes, HasDlsMsgs: hasDlsMsgs, TotalMessages: totalMsgInfo, Tags: tags, Activity: activity, PoisonMessages: totalDlsMsgs}) } if exStations == nil { return []models.ExtendedStationDetails{}, nil diff --git a/server/memphis_handlers_user_mgmt.go b/server/memphis_handlers_user_mgmt.go index 1037a985c..2a8bf004d 100644 --- a/server/memphis_handlers_user_mgmt.go +++ b/server/memphis_handlers_user_mgmt.go @@ -198,6 +198,10 @@ func removeTenantResources(tenantName string, user models.User) error { return err } + err = db.DeleteAttachedFunctionsByTenant(tenantName) + if err != nil { + return err + } if tenantName != MEMPHIS_GLOBAL_ACCOUNT { err = db.RemoveTenant(tenantName) if err != nil { diff --git a/server/memphis_handlers_ws.go b/server/memphis_handlers_ws.go index 2c65f0737..abbfcee2f 100644 --- a/server/memphis_handlers_ws.go +++ b/server/memphis_handlers_ws.go @@ -350,7 +350,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, } } - poisonMessages, schemaFailMessages, totalDlsAmount, err := h.PoisonMsgs.GetDlsMsgsByStationLight(station, partitionNumber) + poisonMessages, schemaFailMessages, functionsMessages, totalDlsAmount, err := h.PoisonMsgs.GetDlsMsgsByStationLight(station, partitionNumber) if err != nil { return map[string]any{}, err } @@ -397,6 +397,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, "messages": messages, "poison_messages": poisonMessages, "schema_failed_messages": schemaFailMessages, + "functions_failed_messages": functionsMessages, "tags": tags, "leader": leader, "followers": followers, @@ -424,6 +425,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, "messages": messages, "poison_messages": poisonMessages, "schema_failed_messages": schemaFailMessages, + "functions_failed_messages": functionsMessages, "tags": tags, "leader": leader, "followers": followers, @@ -467,6 +469,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, "messages": messages, "poison_messages": poisonMessages, "schema_failed_messages": schemaFailMessages, + "functions_failed_messages": functionsMessages, "tags": tags, "leader": leader, "followers": followers, From 523c9f8470b90717875dad3b7e9450bb07157bce Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Sun, 19 Nov 2023 21:47:15 +0200 Subject: [PATCH 2/5] fixes --- db/db.go | 24 ------------------------ server/memphis_handlers_user_mgmt.go | 4 ---- 2 files changed, 28 deletions(-) diff --git a/db/db.go b/db/db.go index d7c393bf2..49c58e827 100644 --- a/db/db.go +++ b/db/db.go @@ -7475,27 +7475,3 @@ func GetMemphisFunctionsByMemphis() ([]models.Function, error) { } return functions, nil } - -func DeleteAttachedFunctionsByTenant(tenantName string) error { - ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) - defer cancelfunc() - - conn, err := MetadataDbClient.Client.Acquire(ctx) - if err != nil { - return err - } - defer conn.Release() - - removeUserQuery := `DELETE FROM attached_functions WHERE tenant_name = $1` - - stmt, err := conn.Conn().Prepare(ctx, "remove_attached_functions_by_tenant", removeUserQuery) - if err != nil { - return err - } - _, err = conn.Conn().Exec(ctx, stmt.Name, tenantName) - if err != nil { - return err - } - - return nil -} diff --git a/server/memphis_handlers_user_mgmt.go b/server/memphis_handlers_user_mgmt.go index 2a8bf004d..1037a985c 100644 --- a/server/memphis_handlers_user_mgmt.go +++ b/server/memphis_handlers_user_mgmt.go @@ -198,10 +198,6 @@ func removeTenantResources(tenantName string, user models.User) error { return err } - err = db.DeleteAttachedFunctionsByTenant(tenantName) - if err != nil { - return err - } if tenantName != MEMPHIS_GLOBAL_ACCOUNT { err = db.RemoveTenant(tenantName) if err != nil { From 15263892c708a7b4cecac00708c1fc89f934ef28 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Sun, 19 Nov 2023 22:29:49 +0200 Subject: [PATCH 3/5] fixes --- db/db.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/db/db.go b/db/db.go index 49c58e827..907c8eb93 100644 --- a/db/db.go +++ b/db/db.go @@ -6712,14 +6712,10 @@ func DeleteOldProducersAndConsumers(timeInterval time.Time) ([]models.LightCG, e queries = append(queries, "DELETE FROM producers WHERE is_active = false AND updated_at < $1") queries = append(queries, "WITH deleted AS (DELETE FROM consumers WHERE is_active = false AND updated_at < $1 RETURNING *) SELECT deleted.consumers_group, s.name as station_name, deleted.station_id , deleted.tenant_name, deleted.partitions FROM deleted INNER JOIN stations s ON deleted.station_id = s.id GROUP BY deleted.consumers_group, s.name, deleted.station_id, deleted.tenant_name, deleted.partitions") - timeIntervalTemp := time.Now().Add(time.Duration(time.Hour * -time.Duration(2))) // TODO to be deleted once we fix the producer limitation - batch := &pgx.Batch{} - batch.Queue(queries[0], timeIntervalTemp) // TODO to be deleted once we fix the producer limitation - batch.Queue(queries[1], timeInterval) // TODO to be deleted once we fix the producer limitation - // for _, q := range queries { // TODO to be returned once we fix the producer limitation - // batch.Queue(q, timeInterval) - // } + for _, q := range queries { + batch.Queue(q, timeInterval) + } br := conn.SendBatch(ctx, batch) From e35d9c6d1f82c082ed84cd5ba55d48c3b7a0a34f Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Sun, 19 Nov 2023 22:32:53 +0200 Subject: [PATCH 4/5] fixes --- db/db.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/db/db.go b/db/db.go index 907c8eb93..ab9a625c9 100644 --- a/db/db.go +++ b/db/db.go @@ -514,7 +514,7 @@ func createTables(MetadataDbClient MetadataStorage) error { ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis'; ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS producer_name VARCHAR NOT NULL DEFAULT ''; ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS partition_number INTEGER NOT NULL DEFAULT -1; - ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS function_id INT NOT NULL DEFAULT -1; + ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS attached_function_id INT NOT NULL DEFAULT -1; DROP INDEX IF EXISTS dls_producer_id; IF EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'dls_messages' AND column_name = 'producer_id' @@ -547,7 +547,7 @@ func createTables(MetadataDbClient MetadataStorage) error { tenant_name VARCHAR NOT NULL DEFAULT '$memphis', producer_name VARCHAR NOT NULL, partition_number INTEGER NOT NULL DEFAULT -1, - function_id INT NOT NULL DEFAULT -1, + attached_function_id INT NOT NULL DEFAULT -1, PRIMARY KEY (id), CONSTRAINT fk_station_id FOREIGN KEY(station_id) @@ -5952,9 +5952,9 @@ func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessageRes, error) { dlsm.tenant_name, dlsm.producer_name, dlsm.partition_number, - dlsm.function_id, + dlsm.attached_function_id, CASE - WHEN dlsm.function_id != -1 THEN s.function_name + WHEN dlsm.attached_function_id != -1 THEN s.function_name ELSE '' END AS function_name FROM @@ -5963,7 +5963,7 @@ func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessageRes, error) { attached_functions AS s ON dlsm.station_id = s.station_id - AND dlsm.function_id = s.id + AND dlsm.attached_function_id = s.id WHERE dlsm.station_id = $1 ORDER BY updated_at DESC limit 1000 @@ -6008,9 +6008,9 @@ func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.D dlsm.tenant_name, dlsm.producer_name, dlsm.partition_number, - dlsm.function_id, + dlsm.attached_function_id, CASE - WHEN dlsm.function_id != -1 THEN s.function_name + WHEN dlsm.attached_function_id != -1 THEN s.function_name ELSE '' END AS function_name FROM @@ -6019,7 +6019,7 @@ func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.D attached_functions AS s ON dlsm.station_id = s.station_id - AND dlsm.function_id = s.id + AND dlsm.attached_function_id = s.id WHERE dlsm.station_id = $1 AND dlsm.partition_number = $2 From 8b91a6473ccebdd151111b29abffeb83b11d963d Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Mon, 20 Nov 2023 10:46:27 +0200 Subject: [PATCH 5/5] fixes --- db/db.go | 113 ------------------------ db/db_cloud.go | 67 ++++++++++++++ models/dead_letter_station.go | 16 ---- server/memphis_cloud.go | 68 ++++++++++++++ server/memphis_handlers_dls_messages.go | 68 -------------- 5 files changed, 135 insertions(+), 197 deletions(-) diff --git a/db/db.go b/db/db.go index ab9a625c9..8f63f82ef 100644 --- a/db/db.go +++ b/db/db.go @@ -5932,119 +5932,6 @@ func RemoveCgFromDlsMsg(msgId int, cgName string, tenantName string) error { return nil } -func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessageRes, error) { - ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) - defer cancelfunc() - conn, err := MetadataDbClient.Client.Acquire(ctx) - if err != nil { - return []models.DlsMessageRes{}, err - } - defer conn.Release() - query := `SELECT - dlsm.id, - dlsm.station_id, - dlsm.message_seq, - dlsm.poisoned_cgs, - dlsm.message_details, - dlsm.updated_at, - dlsm.message_type, - dlsm.validation_error, - dlsm.tenant_name, - dlsm.producer_name, - dlsm.partition_number, - dlsm.attached_function_id, - CASE - WHEN dlsm.attached_function_id != -1 THEN s.function_name - ELSE '' - END AS function_name - FROM - dls_messages AS dlsm - LEFT JOIN - attached_functions AS s - ON - dlsm.station_id = s.station_id - AND dlsm.attached_function_id = s.id - WHERE - dlsm.station_id = $1 - ORDER BY updated_at DESC limit 1000 - ` - stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station", query) - if err != nil { - return []models.DlsMessageRes{}, err - } - rows, err := conn.Conn().Query(ctx, stmt.Name, stationId) - if err != nil { - return []models.DlsMessageRes{}, err - } - defer rows.Close() - dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessageRes]) - if err != nil { - return []models.DlsMessageRes{}, err - } - if len(dlsMsgs) == 0 { - return []models.DlsMessageRes{}, nil - } - - return dlsMsgs, nil -} - -func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessageRes, error) { - ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) - defer cancelfunc() - conn, err := MetadataDbClient.Client.Acquire(ctx) - if err != nil { - return []models.DlsMessageRes{}, err - } - defer conn.Release() - query := `SELECT - dlsm.id, - dlsm.station_id, - dlsm.message_seq, - dlsm.poisoned_cgs, - dlsm.message_details, - dlsm.updated_at, - dlsm.message_type, - dlsm.validation_error, - dlsm.tenant_name, - dlsm.producer_name, - dlsm.partition_number, - dlsm.attached_function_id, - CASE - WHEN dlsm.attached_function_id != -1 THEN s.function_name - ELSE '' - END AS function_name - FROM - dls_messages AS dlsm - LEFT JOIN - attached_functions AS s - ON - dlsm.station_id = s.station_id - AND dlsm.attached_function_id = s.id - WHERE - dlsm.station_id = $1 - AND dlsm.partition_number = $2 - ORDER BY updated_at DESC limit 1000; - ` - stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station_and_partition", query) - if err != nil { - return []models.DlsMessageRes{}, err - } - rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, partitionNumber) - if err != nil { - return []models.DlsMessageRes{}, err - } - defer rows.Close() - dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessageRes]) - if err != nil { - return []models.DlsMessageRes{}, err - } - if len(dlsMsgs) == 0 { - return []models.DlsMessageRes{}, nil - } - - return dlsMsgs, nil -} - func CountDlsMsgsByStationAndPartition(stationId, partitionNumber int) (int, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() diff --git a/db/db_cloud.go b/db/db_cloud.go index 7cf03647d..62bbed50d 100644 --- a/db/db_cloud.go +++ b/db/db_cloud.go @@ -11,6 +11,14 @@ // A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services. package db +import ( + "context" + "time" + + "github.com/jackc/pgx/v5" + "github.com/memphisdev/memphis/models" +) + const testEventsTable = `` const functionsTable = `` const attachedFunctionsTable = `` @@ -50,3 +58,62 @@ func DeleteAndGetAttachedFunctionsByTenant(tenantName string) ([]FunctionSchema, func DeleteAllTestEvents(tenantName string) error { return nil } + +func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessage, error) { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return []models.DlsMessage{}, err + } + defer conn.Release() + query := `SELECT * from dls_messages where station_id=$1 ORDER BY updated_at DESC limit 1000` + + stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station", query) + if err != nil { + return []models.DlsMessage{}, err + } + rows, err := conn.Conn().Query(ctx, stmt.Name, stationId) + if err != nil { + return []models.DlsMessage{}, err + } + defer rows.Close() + dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage]) + if err != nil { + return []models.DlsMessage{}, err + } + if len(dlsMsgs) == 0 { + return []models.DlsMessage{}, nil + } + + return dlsMsgs, nil +} + +func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessage, error) { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return []models.DlsMessage{}, err + } + defer conn.Release() + query := `SELECT * from dls_messages where station_id=$1 AND partition_number = $2 ORDER BY updated_at DESC limit 2000` + stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station_and_partition", query) + if err != nil { + return []models.DlsMessage{}, err + } + rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, partitionNumber) + if err != nil { + return []models.DlsMessage{}, err + } + defer rows.Close() + dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage]) + if err != nil { + return []models.DlsMessage{}, err + } + if len(dlsMsgs) == 0 { + return []models.DlsMessage{}, nil + } + + return dlsMsgs, nil +} diff --git a/models/dead_letter_station.go b/models/dead_letter_station.go index 28ece35c7..dbf401b58 100644 --- a/models/dead_letter_station.go +++ b/models/dead_letter_station.go @@ -79,22 +79,6 @@ type DlsMessage struct { FunctionId int `json:"function_id"` } -type DlsMessageRes struct { - ID int `json:"id"` - StationId int `json:"station_id"` - MessageSeq int `json:"message_seq"` - PoisonedCgs []string `json:"poisoned_cgs"` - MessageDetails MessagePayload `json:"message_details"` - UpdatedAt time.Time `json:"updated_at"` - MessageType string `json:"message_type"` - ValidationError string `json:"validation_error"` - TenantName string `json:"tenant_name"` - ProducerName string `json:"producer_name"` - PartitionNumber int `json:"partition_number"` - FunctionId int `json:"function_id"` - FunctionName string `json:"function_name"` -} - type DlsMsgResendAll struct { MinId int `json:"min_id"` MaxId int `json:"max_id"` diff --git a/server/memphis_cloud.go b/server/memphis_cloud.go index 754bad202..c131feaea 100644 --- a/server/memphis_cloud.go +++ b/server/memphis_cloud.go @@ -2332,3 +2332,71 @@ func (s *Server) CreateStream(tenantName string, sn StationName, retentionType s func (s *Server) ConsumeFunctionTasks() { } + +func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) { + poisonMessages := make([]models.LightDlsMessageResponse, 0) + schemaMessages := make([]models.LightDlsMessageResponse, 0) + functionsMessages := make([]models.LightDlsMessageResponse, 0) + + var dlsMsgs []models.DlsMessage + var err error + if partitionNumber == -1 { + dlsMsgs, err = db.GetDlsMsgsByStationId(station.ID) + if err != nil { + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + } + } else { + dlsMsgs, err = db.GetDlsMsgsByStationAndPartition(station.ID, partitionNumber) + if err != nil { + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + } + } + + for _, v := range dlsMsgs { + data := v.MessageDetails.Data + if len(data) > 80 { // get the first chars for preview needs + data = data[0:80] + } + messageDetails := models.MessagePayload{ + TimeSent: v.MessageDetails.TimeSent, + Size: v.MessageDetails.Size, + Data: data, + Headers: v.MessageDetails.Headers, + } + switch v.MessageType { + case "poison": + poisonMessages = append(poisonMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: messageDetails}) + case "schema": + messageDetails.Size = len(v.MessageDetails.Data) + len(v.MessageDetails.Headers) + schemaMessages = append(schemaMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails}) + case "functions": + functionsMessages = append(functionsMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails}) + } + } + + lenPoison, lenSchema := len(poisonMessages), len(schemaMessages) + totalDlsAmount := 0 + if len(dlsMsgs) >= 0 { + totalDlsAmount, err = db.CountDlsMsgsByStationAndPartition(station.ID, partitionNumber) + if err != nil { + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + } + } + + sort.Slice(poisonMessages, func(i, j int) bool { + return poisonMessages[i].Message.TimeSent.After(poisonMessages[j].Message.TimeSent) + }) + + sort.Slice(schemaMessages, func(i, j int) bool { + return schemaMessages[i].Message.TimeSent.After(schemaMessages[j].Message.TimeSent) + }) + + if lenPoison > 1000 { + poisonMessages = poisonMessages[:1000] + } + + if lenSchema > 1000 { + schemaMessages = schemaMessages[:1000] + } + return poisonMessages, schemaMessages, functionsMessages, totalDlsAmount, nil +} diff --git a/server/memphis_handlers_dls_messages.go b/server/memphis_handlers_dls_messages.go index fe5c71d63..877ccf06f 100644 --- a/server/memphis_handlers_dls_messages.go +++ b/server/memphis_handlers_dls_messages.go @@ -196,74 +196,6 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) { } } -func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) { - poisonMessages := make([]models.LightDlsMessageResponse, 0) - schemaMessages := make([]models.LightDlsMessageResponse, 0) - functionsMessages := make([]models.LightDlsMessageResponse, 0) - - var dlsMsgs []models.DlsMessageRes - var err error - if partitionNumber == -1 { - dlsMsgs, err = db.GetDlsMsgsByStationId(station.ID) - if err != nil { - return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err - } - } else { - dlsMsgs, err = db.GetDlsMsgsByStationAndPartition(station.ID, partitionNumber) - if err != nil { - return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err - } - } - - for _, v := range dlsMsgs { - data := v.MessageDetails.Data - if len(data) > 80 { // get the first chars for preview needs - data = data[0:80] - } - messageDetails := models.MessagePayload{ - TimeSent: v.MessageDetails.TimeSent, - Size: v.MessageDetails.Size, - Data: data, - Headers: v.MessageDetails.Headers, - } - switch v.MessageType { - case "poison": - poisonMessages = append(poisonMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: messageDetails, FunctionName: v.FunctionName}) - case "schema": - messageDetails.Size = len(v.MessageDetails.Data) + len(v.MessageDetails.Headers) - schemaMessages = append(schemaMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails, FunctionName: v.FunctionName}) - case "functions": - functionsMessages = append(functionsMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails, FunctionName: v.FunctionName}) - } - } - - lenPoison, lenSchema := len(poisonMessages), len(schemaMessages) - totalDlsAmount := 0 - if len(dlsMsgs) >= 0 { - totalDlsAmount, err = db.CountDlsMsgsByStationAndPartition(station.ID, partitionNumber) - if err != nil { - return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err - } - } - - sort.Slice(poisonMessages, func(i, j int) bool { - return poisonMessages[i].Message.TimeSent.After(poisonMessages[j].Message.TimeSent) - }) - - sort.Slice(schemaMessages, func(i, j int) bool { - return schemaMessages[i].Message.TimeSent.After(schemaMessages[j].Message.TimeSent) - }) - - if lenPoison > 1000 { - poisonMessages = poisonMessages[:1000] - } - - if lenSchema > 1000 { - schemaMessages = schemaMessages[:1000] - } - return poisonMessages, schemaMessages, functionsMessages, totalDlsAmount, nil -} - func (pmh PoisonMessagesHandler) GetDlsMessageDetailsById(messageId int, dlsType string, tenantName string) (models.DlsMessageResponse, error) { exist, dlsMessage, err := db.GetDlsMessageById(messageId) if err != nil {