Skip to content

Commit

Permalink
producer can send to the first function if exists
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-davidd committed Oct 30, 2023
1 parent 1bf80b4 commit 59a0fd8
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 7 deletions.
7 changes: 7 additions & 0 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
)

var stationUpdatesSubsLock sync.Mutex
var stationFunctionsSubsLock sync.Mutex
var lockProducersMap sync.Mutex

var applicationId string
Expand Down Expand Up @@ -158,7 +159,9 @@ type Conn struct {
brokerConn *nats.Conn
js nats.JetStreamContext
stationUpdatesMu sync.RWMutex
stationFunctionsMu sync.RWMutex
stationUpdatesSubs map[string]*stationUpdateSub
stationFunctionSubs map[string]*stationFunctionSub
stationPartitions map[string]*PartitionsUpdate
sdkClientsUpdatesMu sync.RWMutex
clientsUpdatesSub sdkClientsUpdateSub
Expand Down Expand Up @@ -306,6 +309,10 @@ func (opts Options) connect() (*Conn, error) {
stationUpdatesSubsLock.Lock()
defer stationUpdatesSubsLock.Unlock()
c.stationUpdatesSubs = make(map[string]*stationUpdateSub)

stationFunctionsSubsLock.Lock()
defer stationFunctionsSubsLock.Unlock()
c.stationFunctionSubs = make(map[string]*stationFunctionSub)
c.stationPartitions = make(map[string]*PartitionsUpdate)

return &c, nil
Expand Down
40 changes: 33 additions & 7 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"log"
"reflect"
"strconv"
"strings"
"time"

Expand All @@ -30,12 +31,13 @@ import (
)

const (
schemaUpdatesSubjectTemplate = "$memphis_schema_updates_%s"
memphisNotificationsSubject = "$memphis_notifications"
schemaVFailAlertType = "schema_validation_fail_alert"
lastProducerCreationReqVersion = 3
schemaVerseDlsSubject = "$memphis_schemaverse_dls"
lastProducerDestroyReqVersion = 1
schemaUpdatesSubjectTemplate = "$memphis_schema_updates_%s"
functionsUpdatesSubjectTemplate = "$memphis_functions_updates_%s"
memphisNotificationsSubject = "$memphis_notifications"
schemaVFailAlertType = "schema_validation_fail_alert"
lastProducerCreationReqVersion = 3
schemaVerseDlsSubject = "$memphis_schemaverse_dls"
lastProducerDestroyReqVersion = 1
)

// Producer - memphis producer object.
Expand All @@ -62,6 +64,8 @@ type createProducerResp struct {
PartitionsUpdate PartitionsUpdate `json:"partitions_update"`
SchemaVerseToDls bool `json:"schemaverse_to_dls"`
ClusterSendNotification bool `json:"send_notification"`
StationVersion int `json:"station_version"`
StationFirstFunctions map[int]int `json:"station_first_functions"`
Err string `json:"error"`
}

Expand Down Expand Up @@ -279,6 +283,13 @@ func (p *Producer) handleCreationResp(resp []byte) error {
p.PartitionGenerator = pg
}

if cr.StationVersion > 0 {
err = p.conn.listenToFunctionsUpdates(p.stationName, cr.StationFirstFunctions)
if err != nil {
return memphisError(err)
}
}

p.conn.sdkClientsUpdatesMu.Lock()
cu := &p.conn.clientsUpdatesSub
cu.ClusterConfigurations["send_notification"] = cr.ClusterSendNotification
Expand Down Expand Up @@ -409,9 +420,24 @@ func (opts *ProduceOpts) produce(p *Producer) error {
streamName = sn
}

var fullSubjectName string
if _, ok := p.conn.stationFunctionSubs[sn]; ok {
partitionNumber, err := strconv.Atoi(strings.Split(streamName, "$")[1])
if err != nil {
return memphisError(err)
}
if funcID, ok := p.conn.stationFunctionSubs[sn].FunctionsDetails.PartitionsFunctions[partitionNumber]; ok {
fullSubjectName = fmt.Sprintf("%v.%v", streamName, funcID)
} else {
fullSubjectName = streamName + ".final"
}
} else {
fullSubjectName = streamName + ".final"
}

natsMessage := nats.Msg{
Header: opts.MsgHeaders.MsgHeaders,
Subject: streamName + ".final",
Subject: fullSubjectName,
Data: data,
}

Expand Down
76 changes: 76 additions & 0 deletions station.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,21 @@ type stationUpdateSub struct {
schemaDetails schemaDetails
}

type stationFunctionSub struct {
FunctionsUpdateCh chan FunctionsUpdate
FunctionsUpdateSub *nats.Subscription
FunctionsDetails functionsDetails
}

type FunctionsUpdate struct {
UpdateType string
Functions map[int]int
}

type functionsDetails struct {
PartitionsFunctions map[int]int `json:"partitions_functions"`
}

type schemaDetails struct {
name string
schemaType string
Expand Down Expand Up @@ -378,6 +393,34 @@ func (c *Conn) listenToSchemaUpdates(stationName string) error {
return nil
}

func (c *Conn) listenToFunctionsUpdates(stationName string, initialFunctionsMap map[int]int) error {
sn := getInternalName(stationName)
stationFunctionsSubsLock.Lock()
defer stationFunctionsSubsLock.Unlock()
_, ok := c.stationFunctionSubs[sn]
if !ok {
c.stationFunctionSubs[sn] = &stationFunctionSub{
FunctionsUpdateCh: make(chan FunctionsUpdate),
FunctionsDetails: functionsDetails{
PartitionsFunctions: initialFunctionsMap,
},
}
sfs := c.stationFunctionSubs[sn]
functionsUpdatesSubject := fmt.Sprintf(functionsUpdatesSubjectTemplate, sn)
go sfs.functionsUpdatesHandler(&c.stationFunctionsMu)
var err error
sfs.FunctionsUpdateSub, err = c.brokerConn.Subscribe(functionsUpdatesSubject, sfs.createMsgHandler())
if err != nil {
close(sfs.FunctionsUpdateCh)
return memphisError(err)
}

return nil
}

return nil
}

func (sus *stationUpdateSub) createMsgHandler() nats.MsgHandler {
return func(msg *nats.Msg) {
var update SchemaUpdate
Expand All @@ -390,6 +433,18 @@ func (sus *stationUpdateSub) createMsgHandler() nats.MsgHandler {
}
}

func (sfs *stationFunctionSub) createMsgHandler() nats.MsgHandler {
return func(msg *nats.Msg) {
var update FunctionsUpdate
err := json.Unmarshal(msg.Data, &update)
if err != nil {
log.Printf("functions update unmarshal error: %v\n", memphisError(err))
return
}
sfs.FunctionsUpdateCh <- update
}
}

func (c *Conn) removeSchemaUpdatesListener(stationName string) error {
sn := getInternalName(stationName)

Expand Down Expand Up @@ -447,6 +502,27 @@ func (sus *stationUpdateSub) schemaUpdatesHandler(lock *sync.RWMutex) {
}
}

func (sfs *stationFunctionSub) functionsUpdatesHandler(lock *sync.RWMutex) {
for {
update, ok := <-sfs.FunctionsUpdateCh
if !ok {
return
}

lock.Lock()
if update.UpdateType == "modify" {
for partition, funcID := range update.Functions {
sfs.FunctionsDetails.PartitionsFunctions[partition] = funcID
}
} else if update.UpdateType == "drop" {
for partition := range update.Functions {
delete(sfs.FunctionsDetails.PartitionsFunctions, partition)
}
}
lock.Unlock()
}
}

func (sd *schemaDetails) handleSchemaUpdateInit(sui SchemaUpdateInit) {
sd.name = sui.SchemaName
sd.schemaType = sui.SchemaType
Expand Down

0 comments on commit 59a0fd8

Please sign in to comment.