Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RND-187-add-support-for-functions-type-in-the-dls #1424

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 3 additions & 59 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis';
ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS producer_name VARCHAR NOT NULL DEFAULT '';
ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS partition_number INTEGER NOT NULL DEFAULT -1;
ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS attached_function_id INT NOT NULL DEFAULT -1;
DROP INDEX IF EXISTS dls_producer_id;
IF EXISTS (
SELECT 1 FROM information_schema.columns WHERE table_name = 'dls_messages' AND column_name = 'producer_id'
Expand Down Expand Up @@ -546,6 +547,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
producer_name VARCHAR NOT NULL,
partition_number INTEGER NOT NULL DEFAULT -1,
attached_function_id INT NOT NULL DEFAULT -1,
PRIMARY KEY (id),
CONSTRAINT fk_station_id
FOREIGN KEY(station_id)
Expand Down Expand Up @@ -5745,7 +5747,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Detail != "" {
if !strings.Contains(pgErr.Detail, "already exists") {
if strings.Contains(pgErr.Detail, "already exists") {
return 0, updated, errors.New("dls_messages row already exists")
} else {
return 0, updated, errors.New(pgErr.Detail)
Expand Down Expand Up @@ -5930,64 +5932,6 @@ func RemoveCgFromDlsMsg(msgId int, cgName string, tenantName string) error {
return nil
}

func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.DlsMessage{}, err
}
defer conn.Release()
query := `SELECT * from dls_messages where station_id=$1 ORDER BY updated_at DESC limit 1000`
stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station", query)
if err != nil {
return []models.DlsMessage{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name, stationId)
if err != nil {
return []models.DlsMessage{}, err
}
defer rows.Close()
dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage])
if err != nil {
return []models.DlsMessage{}, err
}
if len(dlsMsgs) == 0 {
return []models.DlsMessage{}, nil
}

return dlsMsgs, nil
}

func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.DlsMessage{}, err
}
defer conn.Release()
query := `SELECT * from dls_messages where station_id=$1 AND partition_number = $2 ORDER BY updated_at DESC limit 2000`
stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station_and_partition", query)
if err != nil {
return []models.DlsMessage{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, partitionNumber)
if err != nil {
return []models.DlsMessage{}, err
}
defer rows.Close()
dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage])
if err != nil {
return []models.DlsMessage{}, err
}
if len(dlsMsgs) == 0 {
return []models.DlsMessage{}, nil
}

return dlsMsgs, nil
}

func CountDlsMsgsByStationAndPartition(stationId, partitionNumber int) (int, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
Expand Down
67 changes: 67 additions & 0 deletions db/db_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@
// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services.
package db

import (
"context"
"time"

"github.com/jackc/pgx/v5"
"github.com/memphisdev/memphis/models"
)

const testEventsTable = ``
const functionsTable = ``
const attachedFunctionsTable = ``
Expand Down Expand Up @@ -50,3 +58,62 @@ func DeleteAndGetAttachedFunctionsByTenant(tenantName string) ([]FunctionSchema,
func DeleteAllTestEvents(tenantName string) error {
return nil
}

func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.DlsMessage{}, err
}
defer conn.Release()
query := `SELECT * from dls_messages where station_id=$1 ORDER BY updated_at DESC limit 1000`

stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station", query)
if err != nil {
return []models.DlsMessage{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name, stationId)
if err != nil {
return []models.DlsMessage{}, err
}
defer rows.Close()
dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage])
if err != nil {
return []models.DlsMessage{}, err
}
if len(dlsMsgs) == 0 {
return []models.DlsMessage{}, nil
}

return dlsMsgs, nil
}

func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.DlsMessage{}, err
}
defer conn.Release()
query := `SELECT * from dls_messages where station_id=$1 AND partition_number = $2 ORDER BY updated_at DESC limit 2000`
stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station_and_partition", query)
if err != nil {
return []models.DlsMessage{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, partitionNumber)
if err != nil {
return []models.DlsMessage{}, err
}
defer rows.Close()
dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage])
if err != nil {
return []models.DlsMessage{}, err
}
if len(dlsMsgs) == 0 {
return []models.DlsMessage{}, nil
}

return dlsMsgs, nil
}
8 changes: 5 additions & 3 deletions models/dead_letter_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type DlsMessage struct {
TenantName string `json:"tenant_name"`
ProducerName string `json:"producer_name"`
PartitionNumber int `json:"partition_number"`
FunctionId int `json:"function_id"`
}

type DlsMsgResendAll struct {
Expand Down Expand Up @@ -107,9 +108,10 @@ type LightDlsMessage struct {
}

type LightDlsMessageResponse struct {
MessageSeq int `json:"message_seq"`
ID int `json:"id"`
Message MessagePayload `json:"message"`
MessageSeq int `json:"message_seq"`
ID int `json:"id"`
Message MessagePayload `json:"message"`
FunctionName string `json:"function_name"`
}

type RetentionIntervalData struct {
Expand Down
68 changes: 68 additions & 0 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2332,3 +2332,71 @@ func (s *Server) CreateStream(tenantName string, sn StationName, retentionType s

func (s *Server) ConsumeFunctionTasks() {
}

func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) {
poisonMessages := make([]models.LightDlsMessageResponse, 0)
schemaMessages := make([]models.LightDlsMessageResponse, 0)
functionsMessages := make([]models.LightDlsMessageResponse, 0)

var dlsMsgs []models.DlsMessage
var err error
if partitionNumber == -1 {
dlsMsgs, err = db.GetDlsMsgsByStationId(station.ID)
if err != nil {
return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err
}
} else {
dlsMsgs, err = db.GetDlsMsgsByStationAndPartition(station.ID, partitionNumber)
if err != nil {
return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err
}
}

for _, v := range dlsMsgs {
data := v.MessageDetails.Data
if len(data) > 80 { // get the first chars for preview needs
data = data[0:80]
}
messageDetails := models.MessagePayload{
TimeSent: v.MessageDetails.TimeSent,
Size: v.MessageDetails.Size,
Data: data,
Headers: v.MessageDetails.Headers,
}
switch v.MessageType {
case "poison":
poisonMessages = append(poisonMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: messageDetails})
case "schema":
messageDetails.Size = len(v.MessageDetails.Data) + len(v.MessageDetails.Headers)
schemaMessages = append(schemaMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails})
case "functions":
functionsMessages = append(functionsMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails})
}
}

lenPoison, lenSchema := len(poisonMessages), len(schemaMessages)
totalDlsAmount := 0
if len(dlsMsgs) >= 0 {
totalDlsAmount, err = db.CountDlsMsgsByStationAndPartition(station.ID, partitionNumber)
if err != nil {
return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err
}
}

sort.Slice(poisonMessages, func(i, j int) bool {
return poisonMessages[i].Message.TimeSent.After(poisonMessages[j].Message.TimeSent)
})

sort.Slice(schemaMessages, func(i, j int) bool {
return schemaMessages[i].Message.TimeSent.After(schemaMessages[j].Message.TimeSent)
})

if lenPoison > 1000 {
poisonMessages = poisonMessages[:1000]
}

if lenSchema > 1000 {
schemaMessages = schemaMessages[:1000]
}
return poisonMessages, schemaMessages, functionsMessages, totalDlsAmount, nil
}
65 changes: 0 additions & 65 deletions server/memphis_handlers_dls_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,71 +196,6 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) {
}
}

func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) {
poisonMessages := make([]models.LightDlsMessageResponse, 0)
schemaMessages := make([]models.LightDlsMessageResponse, 0)

var dlsMsgs []models.DlsMessage
var err error
if partitionNumber == -1 {
dlsMsgs, err = db.GetDlsMsgsByStationId(station.ID)
if err != nil {
return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err
}
} else {
dlsMsgs, err = db.GetDlsMsgsByStationAndPartition(station.ID, partitionNumber)
if err != nil {
return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err
}
}

for _, v := range dlsMsgs {
data := v.MessageDetails.Data
if len(data) > 80 { // get the first chars for preview needs
data = data[0:80]
}
messageDetails := models.MessagePayload{
TimeSent: v.MessageDetails.TimeSent,
Size: v.MessageDetails.Size,
Data: data,
Headers: v.MessageDetails.Headers,
}
switch v.MessageType {
case "poison":
poisonMessages = append(poisonMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: messageDetails})
case "schema":
messageDetails.Size = len(v.MessageDetails.Data) + len(v.MessageDetails.Headers)
schemaMessages = append(schemaMessages, models.LightDlsMessageResponse{MessageSeq: v.MessageSeq, ID: v.ID, Message: v.MessageDetails})
}
}

lenPoison, lenSchema := len(poisonMessages), len(schemaMessages)
totalDlsAmount := 0
if len(dlsMsgs) >= 0 {
totalDlsAmount, err = db.CountDlsMsgsByStationAndPartition(station.ID, partitionNumber)
if err != nil {
return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err
}
}

sort.Slice(poisonMessages, func(i, j int) bool {
return poisonMessages[i].Message.TimeSent.After(poisonMessages[j].Message.TimeSent)
})

sort.Slice(schemaMessages, func(i, j int) bool {
return schemaMessages[i].Message.TimeSent.After(schemaMessages[j].Message.TimeSent)
})

if lenPoison > 1000 {
poisonMessages = poisonMessages[:1000]
}

if lenSchema > 1000 {
schemaMessages = schemaMessages[:1000]
}
return poisonMessages, schemaMessages, totalDlsAmount, nil
}

func (pmh PoisonMessagesHandler) GetDlsMessageDetailsById(messageId int, dlsType string, tenantName string) (models.DlsMessageResponse, error) {
exist, dlsMessage, err := db.GetDlsMessageById(messageId)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
}
}

poisonMessages, schemaFailedMessages, totalDlsAmount, err := poisonMsgsHandler.GetDlsMsgsByStationLight(station, body.PartitionNumber)
poisonMessages, schemaFailedMessages, functionsMessages, totalDlsAmount, err := poisonMsgsHandler.GetDlsMsgsByStationLight(station, body.PartitionNumber)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetDlsMsgsByStationLight: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
Expand Down Expand Up @@ -717,6 +717,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
Expand Down Expand Up @@ -747,6 +748,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
Expand Down Expand Up @@ -774,6 +776,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"messages": messages,
"poison_messages": poisonMessages,
"schema_failed_messages": schemaFailedMessages,
"functions_failed_messages": functionsMessages,
"tags": tags,
"leader": leader,
"followers": followers,
Expand Down
7 changes: 6 additions & 1 deletion server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,12 @@ func (sh StationsHandler) GetStationsDetails(tenantName string) ([]models.Extend
Version: station.Version,
}

exStations = append(exStations, models.ExtendedStationDetails{Station: stationRes, HasDlsMsgs: hasDlsMsgs, TotalMessages: totalMsgInfo, Tags: tags, Activity: activity})
totalDlsMsgs, err := db.CountDlsMsgsByStationAndPartition(station.ID, -1)
if err != nil {
return []models.ExtendedStationDetails{}, err
}

exStations = append(exStations, models.ExtendedStationDetails{Station: stationRes, HasDlsMsgs: hasDlsMsgs, TotalMessages: totalMsgInfo, Tags: tags, Activity: activity, PoisonMessages: totalDlsMsgs})
}
if exStations == nil {
return []models.ExtendedStationDetails{}, nil
Expand Down
Loading