Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-davidd committed Oct 31, 2023
1 parent 59a0fd8 commit ccbe9dd
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
1 change: 0 additions & 1 deletion connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ type Conn struct {
brokerConn *nats.Conn
js nats.JetStreamContext
stationUpdatesMu sync.RWMutex
stationFunctionsMu sync.RWMutex
stationUpdatesSubs map[string]*stationUpdateSub
stationFunctionSubs map[string]*stationFunctionSub
stationPartitions map[string]*PartitionsUpdate
Expand Down
27 changes: 16 additions & 11 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ type createProducerReq struct {
}

type createProducerResp struct {
SchemaUpdateInit SchemaUpdateInit `json:"schema_update"`
PartitionsUpdate PartitionsUpdate `json:"partitions_update"`
SchemaVerseToDls bool `json:"schemaverse_to_dls"`
ClusterSendNotification bool `json:"send_notification"`
StationVersion int `json:"station_version"`
StationFirstFunctions map[int]int `json:"station_first_functions"`
Err string `json:"error"`
SchemaUpdateInit SchemaUpdateInit `json:"schema_update"`
PartitionsUpdate PartitionsUpdate `json:"partitions_update"`
SchemaVerseToDls bool `json:"schemaverse_to_dls"`
ClusterSendNotification bool `json:"send_notification"`
StationVersion int `json:"station_version"`
StationPartitionsFirstFunctions map[int]int `json:"station_partitions_first_functions"`
Err string `json:"error"`
}

type SchemaUpdateType int
Expand Down Expand Up @@ -284,7 +284,7 @@ func (p *Producer) handleCreationResp(resp []byte) error {
}

if cr.StationVersion > 0 {
err = p.conn.listenToFunctionsUpdates(p.stationName, cr.StationFirstFunctions)
err = p.conn.listenToFunctionsUpdates(p.stationName, cr.StationPartitionsFirstFunctions)
if err != nil {
return memphisError(err)
}
Expand Down Expand Up @@ -421,16 +421,21 @@ func (opts *ProduceOpts) produce(p *Producer) error {
}

var fullSubjectName string
if _, ok := p.conn.stationFunctionSubs[sn]; ok {
if functionsMap, ok := p.conn.stationFunctionSubs[sn]; ok {
partitionNumber, err := strconv.Atoi(strings.Split(streamName, "$")[1])

functionsMap.StationFunctionsMu.RLock()

if err != nil {
return memphisError(err)
}
if funcID, ok := p.conn.stationFunctionSubs[sn].FunctionsDetails.PartitionsFunctions[partitionNumber]; ok {
fullSubjectName = fmt.Sprintf("%v.%v", streamName, funcID)
if funcID, ok := functionsMap.FunctionsDetails.PartitionsFunctions[partitionNumber]; ok {
fullSubjectName = fmt.Sprintf("%v.functions.%v", streamName, funcID)
} else {
fullSubjectName = streamName + ".final"
}

functionsMap.StationFunctionsMu.RUnlock()
} else {
fullSubjectName = streamName + ".final"
}
Expand Down
11 changes: 7 additions & 4 deletions station.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ type stationUpdateSub struct {
type stationFunctionSub struct {
FunctionsUpdateCh chan FunctionsUpdate
FunctionsUpdateSub *nats.Subscription
StationFunctionsMu sync.RWMutex
FunctionsDetails functionsDetails
}

Expand Down Expand Up @@ -407,7 +408,7 @@ func (c *Conn) listenToFunctionsUpdates(stationName string, initialFunctionsMap
}
sfs := c.stationFunctionSubs[sn]
functionsUpdatesSubject := fmt.Sprintf(functionsUpdatesSubjectTemplate, sn)
go sfs.functionsUpdatesHandler(&c.stationFunctionsMu)
go sfs.functionsUpdatesHandler()
var err error
sfs.FunctionsUpdateSub, err = c.brokerConn.Subscribe(functionsUpdatesSubject, sfs.createMsgHandler())
if err != nil {
Expand Down Expand Up @@ -502,14 +503,15 @@ func (sus *stationUpdateSub) schemaUpdatesHandler(lock *sync.RWMutex) {
}
}

func (sfs *stationFunctionSub) functionsUpdatesHandler(lock *sync.RWMutex) {
func (sfs *stationFunctionSub) functionsUpdatesHandler() {
for {
update, ok := <-sfs.FunctionsUpdateCh
if !ok {
return
}

lock.Lock()
sfs.StationFunctionsMu.Lock()

if update.UpdateType == "modify" {
for partition, funcID := range update.Functions {
sfs.FunctionsDetails.PartitionsFunctions[partition] = funcID
Expand All @@ -519,7 +521,8 @@ func (sfs *stationFunctionSub) functionsUpdatesHandler(lock *sync.RWMutex) {
delete(sfs.FunctionsDetails.PartitionsFunctions, partition)
}
}
lock.Unlock()

sfs.StationFunctionsMu.Unlock()
}
}

Expand Down

0 comments on commit ccbe9dd

Please sign in to comment.