Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamroditimemphis committed Nov 21, 2023
1 parent 921a462 commit 68dd617
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 29 deletions.
25 changes: 14 additions & 11 deletions models/dead_letter_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ type MessagePayload struct {
Headers map[string]string `json:"headers"`
}

type MessagePayloadFunctionDls struct {
TimeSent time.Time `json:"time_sent"`
Size int `json:"size"`
Data []byte `json:"data"`
Headers map[string]string `json:"headers"`
}

type PoisonedCg struct {
CgName string `json:"cg_name"`
UnprocessedMessages int `json:"unprocessed_messages"`
Expand All @@ -64,18 +71,14 @@ type SchemaVerseDlsMessageSdk struct {
PartitionNumber int `json:"partition_number"`
}

type FunctionsMessagePayload struct {
TimeSent time.Time `json:"time_sent"`
Size int `json:"size"`
Data []byte `json:"data"`
Headers map[string]string `json:"headers"`
}

type FunctionsDlsMessage struct {
StationID int `json:"station_id"`
Message MessagePayload `json:"message"`
Err string `json:"err"`
PartitionNumber int `json:"partition_number"`
StationID int `json:"station_id"`
TenantName string `json:"tenant_name"`
Message MessagePayloadFunctionDls `json:"message"`
Err string `json:"err"`
PartitionNumber int `json:"partition_number"`
FunctionID int `json:"function_id"`
FunctionName string `json:"function_name"`
}

type DlsMessage struct {
Expand Down
8 changes: 1 addition & 7 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ const TIERED_STORAGE_CONSUMER = "$memphis_tiered_storage_consumer"
const DLS_UNACKED_CONSUMER = "$memphis_dls_unacked_consumer"
const SCHEMAVERSE_DLS_SUBJ = "$memphis_schemaverse_dls"
const SCHEMAVERSE_DLS_INNER_SUBJ = "$memphis_schemaverse_inner_dls"
const FUNCTIONS_DLS_INNER_SUBJ = "$memphis_functions_inner_dls"
const SCHEMAVERSE_DLS_CONSUMER = "$memphis_schemaverse_dls_consumer"
const FUNCTIONS_DLS_INNER_SUBJ = "$memphis_functions_inner_dls"
const FUNCTIONS_DLS_CONSUMER = "$memphis_functions_dls_consumer"
const CACHE_UDATES_SUBJ = "$memphis_cache_updates"
const INTEGRATIONS_AUDIT_LOGS_CONSUMER = "$memphis_integrations_audit_logs_consumer"
Expand Down Expand Up @@ -310,12 +310,6 @@ func (s *Server) StartBackgroundTasks() error {
return errors.New("Failed to subscribing for schemaverse dls" + err.Error())
}

err = s.ListenForFunctionsDlsEvents()
if err != nil {
errMsg := fmt.Errorf("Failed to subscribing for functions dls %s ", err.Error())
return errMsg
}

err = s.ListenForPoisonMsgAcks()
if err != nil {
return errors.New("Failed subscribing for poison message acks: " + err.Error())
Expand Down
8 changes: 4 additions & 4 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2405,10 +2405,10 @@ func (s *Server) ScaleFunctionWorkers() {
return
}

func (s *Server) ListenForFunctionsDlsEvents() error {
return nil
}

func (s *Server) ConsumeFunctionsDlsMessages() {

}

func shouldCreateFunctionDlsStream() bool {
return false
}
9 changes: 6 additions & 3 deletions server/memphis_handlers_dls_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error {
return err
}
if !updated {
err = s.sendToDlsStation(station, data, headersJson, "unacked")
err = s.sendToDlsStation(station, data, headersJson, "unacked", "")
if err != nil {
serv.Errorf("[tenant: %v]handleNewUnackedMsg at sendToDlsStation: station: %v, Error while getting notified about a poison message: %v", station.TenantName, station.DlsStation, err.Error())
return err
Expand Down Expand Up @@ -189,7 +189,7 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) {
serv.Errorf("[tenant: %v]handleSchemaverseDlsMsg at DecodeString: %v", tenantName, err.Error())
return
}
err = s.sendToDlsStation(station, data, message.Message.Headers, "failed_schema")
err = s.sendToDlsStation(station, data, message.Message.Headers, "failed_schema", "")
if err != nil {
serv.Errorf("[tenant: %v]handleSchemaverseDlsMsg at sendToDlsStation: station: %v, Error while getting notified about a poison message: %v", tenantName, station.DlsStation, err.Error())
return
Expand Down Expand Up @@ -366,7 +366,7 @@ func GetPoisonedCgsByMessage(station models.Station, messageSeq, partitionNumber
return poisonedCgs, nil
}

func (s *Server) sendToDlsStation(station models.Station, messagePayload []byte, headers map[string]string, dlsType string) error {
func (s *Server) sendToDlsStation(station models.Station, messagePayload []byte, headers map[string]string, dlsType, functionName string) error {
if station.DlsStation != "" {
exist, dlsStation, err := db.GetStationByName(station.DlsStation, station.TenantName)
if err != nil {
Expand Down Expand Up @@ -397,6 +397,9 @@ func (s *Server) sendToDlsStation(station models.Station, messagePayload []byte,
}
headers["station"] = station.Name
headers["type"] = dlsType
if dlsType == "functions" {
headers["function_name"] = functionName
}
s.sendInternalAccountMsgWithHeadersWithEcho(acc, subject, messagePayload, headers)
}
}
Expand Down
6 changes: 2 additions & 4 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ var memphisExportString = `[
{service: "$memphis_pm_acks"},
{service: "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>"},
{stream: "$memphis_ws_pubs.>"},
{service: "$memphis_functions_dls"},
]
`

Expand All @@ -106,7 +105,6 @@ var memphisImportString = `[
{service: {account: "$memphis", subject: "$memphis_pm_acks"}},
{service: {account: "$memphis", subject: "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>"}},
{stream: {account: "$memphis", subject: "$memphis_ws_pubs.>"}},
{service: {account: "$memphis", subject: "$memphis_functions_dls"}},
]
`

Expand Down Expand Up @@ -433,7 +431,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration,
}

// create functions dls stream
if !DLS_FUNCTIONS_STREAM_CREATED {
if shouldCreateFunctionDlsStream() && !DLS_FUNCTIONS_STREAM_CREATED {
err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{
Name: dlsFunctionsStream,
Subjects: []string{FUNCTIONS_DLS_INNER_SUBJ},
Expand All @@ -452,7 +450,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration,
}

// create functions dls consumer
if !DLS_FUNCTIONS_CONSUMER_CREATED {
if shouldCreateFunctionDlsStream() && !DLS_FUNCTIONS_CONSUMER_CREATED {
cc := ConsumerConfig{
DeliverPolicy: DeliverAll,
AckPolicy: AckExplicit,
Expand Down

0 comments on commit 68dd617

Please sign in to comment.