diff --git a/connect.go b/connect.go index e4027a1..918c074 100644 --- a/connect.go +++ b/connect.go @@ -159,7 +159,6 @@ type Conn struct { brokerConn *nats.Conn js nats.JetStreamContext stationUpdatesMu sync.RWMutex - stationFunctionsMu sync.RWMutex stationUpdatesSubs map[string]*stationUpdateSub stationFunctionSubs map[string]*stationFunctionSub stationPartitions map[string]*PartitionsUpdate diff --git a/producer.go b/producer.go index 655bfc4..fcf8382 100644 --- a/producer.go +++ b/producer.go @@ -60,13 +60,13 @@ type createProducerReq struct { } type createProducerResp struct { - SchemaUpdateInit SchemaUpdateInit `json:"schema_update"` - PartitionsUpdate PartitionsUpdate `json:"partitions_update"` - SchemaVerseToDls bool `json:"schemaverse_to_dls"` - ClusterSendNotification bool `json:"send_notification"` - StationVersion int `json:"station_version"` - StationFirstFunctions map[int]int `json:"station_first_functions"` - Err string `json:"error"` + SchemaUpdateInit SchemaUpdateInit `json:"schema_update"` + PartitionsUpdate PartitionsUpdate `json:"partitions_update"` + SchemaVerseToDls bool `json:"schemaverse_to_dls"` + ClusterSendNotification bool `json:"send_notification"` + StationVersion int `json:"station_version"` + StationPartitionsFirstFunctions map[int]int `json:"station_partitions_first_functions"` + Err string `json:"error"` } type SchemaUpdateType int @@ -284,7 +284,7 @@ func (p *Producer) handleCreationResp(resp []byte) error { } if cr.StationVersion > 0 { - err = p.conn.listenToFunctionsUpdates(p.stationName, cr.StationFirstFunctions) + err = p.conn.listenToFunctionsUpdates(p.stationName, cr.StationPartitionsFirstFunctions) if err != nil { return memphisError(err) } @@ -421,16 +421,21 @@ func (opts *ProduceOpts) produce(p *Producer) error { } var fullSubjectName string - if _, ok := p.conn.stationFunctionSubs[sn]; ok { + if functionsMap, ok := p.conn.stationFunctionSubs[sn]; ok { partitionNumber, err := strconv.Atoi(strings.Split(streamName, "$")[1]) + + functionsMap.StationFunctionsMu.RLock() + if err != nil { return memphisError(err) } - if funcID, ok := p.conn.stationFunctionSubs[sn].FunctionsDetails.PartitionsFunctions[partitionNumber]; ok { - fullSubjectName = fmt.Sprintf("%v.%v", streamName, funcID) + if funcID, ok := functionsMap.FunctionsDetails.PartitionsFunctions[partitionNumber]; ok { + fullSubjectName = fmt.Sprintf("%v.functions.%v", streamName, funcID) } else { fullSubjectName = streamName + ".final" } + + functionsMap.StationFunctionsMu.RUnlock() } else { fullSubjectName = streamName + ".final" } diff --git a/station.go b/station.go index 6ac2794..e8e47b6 100644 --- a/station.go +++ b/station.go @@ -344,6 +344,7 @@ type stationUpdateSub struct { type stationFunctionSub struct { FunctionsUpdateCh chan FunctionsUpdate FunctionsUpdateSub *nats.Subscription + StationFunctionsMu sync.RWMutex FunctionsDetails functionsDetails } @@ -407,7 +408,7 @@ func (c *Conn) listenToFunctionsUpdates(stationName string, initialFunctionsMap } sfs := c.stationFunctionSubs[sn] functionsUpdatesSubject := fmt.Sprintf(functionsUpdatesSubjectTemplate, sn) - go sfs.functionsUpdatesHandler(&c.stationFunctionsMu) + go sfs.functionsUpdatesHandler() var err error sfs.FunctionsUpdateSub, err = c.brokerConn.Subscribe(functionsUpdatesSubject, sfs.createMsgHandler()) if err != nil { @@ -502,14 +503,15 @@ func (sus *stationUpdateSub) schemaUpdatesHandler(lock *sync.RWMutex) { } } -func (sfs *stationFunctionSub) functionsUpdatesHandler(lock *sync.RWMutex) { +func (sfs *stationFunctionSub) functionsUpdatesHandler() { for { update, ok := <-sfs.FunctionsUpdateCh if !ok { return } - lock.Lock() + sfs.StationFunctionsMu.Lock() + if update.UpdateType == "modify" { for partition, funcID := range update.Functions { sfs.FunctionsDetails.PartitionsFunctions[partition] = funcID @@ -519,7 +521,8 @@ func (sfs *stationFunctionSub) functionsUpdatesHandler(lock *sync.RWMutex) { delete(sfs.FunctionsDetails.PartitionsFunctions, partition) } } - lock.Unlock() + + sfs.StationFunctionsMu.Unlock() } }