diff --git a/connect.go b/connect.go index a3efd51..e4027a1 100644 --- a/connect.go +++ b/connect.go @@ -43,6 +43,7 @@ const ( ) var stationUpdatesSubsLock sync.Mutex +var stationFunctionsSubsLock sync.Mutex var lockProducersMap sync.Mutex var applicationId string @@ -158,7 +159,9 @@ type Conn struct { brokerConn *nats.Conn js nats.JetStreamContext stationUpdatesMu sync.RWMutex + stationFunctionsMu sync.RWMutex stationUpdatesSubs map[string]*stationUpdateSub + stationFunctionSubs map[string]*stationFunctionSub stationPartitions map[string]*PartitionsUpdate sdkClientsUpdatesMu sync.RWMutex clientsUpdatesSub sdkClientsUpdateSub @@ -306,6 +309,10 @@ func (opts Options) connect() (*Conn, error) { stationUpdatesSubsLock.Lock() defer stationUpdatesSubsLock.Unlock() c.stationUpdatesSubs = make(map[string]*stationUpdateSub) + + stationFunctionsSubsLock.Lock() + defer stationFunctionsSubsLock.Unlock() + c.stationFunctionSubs = make(map[string]*stationFunctionSub) c.stationPartitions = make(map[string]*PartitionsUpdate) return &c, nil diff --git a/producer.go b/producer.go index eaf09a5..655bfc4 100644 --- a/producer.go +++ b/producer.go @@ -21,6 +21,7 @@ import ( "fmt" "log" "reflect" + "strconv" "strings" "time" @@ -30,12 +31,13 @@ import ( ) const ( - schemaUpdatesSubjectTemplate = "$memphis_schema_updates_%s" - memphisNotificationsSubject = "$memphis_notifications" - schemaVFailAlertType = "schema_validation_fail_alert" - lastProducerCreationReqVersion = 3 - schemaVerseDlsSubject = "$memphis_schemaverse_dls" - lastProducerDestroyReqVersion = 1 + schemaUpdatesSubjectTemplate = "$memphis_schema_updates_%s" + functionsUpdatesSubjectTemplate = "$memphis_functions_updates_%s" + memphisNotificationsSubject = "$memphis_notifications" + schemaVFailAlertType = "schema_validation_fail_alert" + lastProducerCreationReqVersion = 3 + schemaVerseDlsSubject = "$memphis_schemaverse_dls" + lastProducerDestroyReqVersion = 1 ) // Producer - memphis producer object. @@ -62,6 +64,8 @@ type createProducerResp struct { PartitionsUpdate PartitionsUpdate `json:"partitions_update"` SchemaVerseToDls bool `json:"schemaverse_to_dls"` ClusterSendNotification bool `json:"send_notification"` + StationVersion int `json:"station_version"` + StationFirstFunctions map[int]int `json:"station_first_functions"` Err string `json:"error"` } @@ -279,6 +283,13 @@ func (p *Producer) handleCreationResp(resp []byte) error { p.PartitionGenerator = pg } + if cr.StationVersion > 0 { + err = p.conn.listenToFunctionsUpdates(p.stationName, cr.StationFirstFunctions) + if err != nil { + return memphisError(err) + } + } + p.conn.sdkClientsUpdatesMu.Lock() cu := &p.conn.clientsUpdatesSub cu.ClusterConfigurations["send_notification"] = cr.ClusterSendNotification @@ -409,9 +420,24 @@ func (opts *ProduceOpts) produce(p *Producer) error { streamName = sn } + var fullSubjectName string + if _, ok := p.conn.stationFunctionSubs[sn]; ok { + partitionNumber, err := strconv.Atoi(strings.Split(streamName, "$")[1]) + if err != nil { + return memphisError(err) + } + if funcID, ok := p.conn.stationFunctionSubs[sn].FunctionsDetails.PartitionsFunctions[partitionNumber]; ok { + fullSubjectName = fmt.Sprintf("%v.%v", streamName, funcID) + } else { + fullSubjectName = streamName + ".final" + } + } else { + fullSubjectName = streamName + ".final" + } + natsMessage := nats.Msg{ Header: opts.MsgHeaders.MsgHeaders, - Subject: streamName + ".final", + Subject: fullSubjectName, Data: data, } diff --git a/station.go b/station.go index 76284d2..6ac2794 100644 --- a/station.go +++ b/station.go @@ -341,6 +341,21 @@ type stationUpdateSub struct { schemaDetails schemaDetails } +type stationFunctionSub struct { + FunctionsUpdateCh chan FunctionsUpdate + FunctionsUpdateSub *nats.Subscription + FunctionsDetails functionsDetails +} + +type FunctionsUpdate struct { + UpdateType string + Functions map[int]int +} + +type functionsDetails struct { + PartitionsFunctions map[int]int `json:"partitions_functions"` +} + type schemaDetails struct { name string schemaType string @@ -378,6 +393,34 @@ func (c *Conn) listenToSchemaUpdates(stationName string) error { return nil } +func (c *Conn) listenToFunctionsUpdates(stationName string, initialFunctionsMap map[int]int) error { + sn := getInternalName(stationName) + stationFunctionsSubsLock.Lock() + defer stationFunctionsSubsLock.Unlock() + _, ok := c.stationFunctionSubs[sn] + if !ok { + c.stationFunctionSubs[sn] = &stationFunctionSub{ + FunctionsUpdateCh: make(chan FunctionsUpdate), + FunctionsDetails: functionsDetails{ + PartitionsFunctions: initialFunctionsMap, + }, + } + sfs := c.stationFunctionSubs[sn] + functionsUpdatesSubject := fmt.Sprintf(functionsUpdatesSubjectTemplate, sn) + go sfs.functionsUpdatesHandler(&c.stationFunctionsMu) + var err error + sfs.FunctionsUpdateSub, err = c.brokerConn.Subscribe(functionsUpdatesSubject, sfs.createMsgHandler()) + if err != nil { + close(sfs.FunctionsUpdateCh) + return memphisError(err) + } + + return nil + } + + return nil +} + func (sus *stationUpdateSub) createMsgHandler() nats.MsgHandler { return func(msg *nats.Msg) { var update SchemaUpdate @@ -390,6 +433,18 @@ func (sus *stationUpdateSub) createMsgHandler() nats.MsgHandler { } } +func (sfs *stationFunctionSub) createMsgHandler() nats.MsgHandler { + return func(msg *nats.Msg) { + var update FunctionsUpdate + err := json.Unmarshal(msg.Data, &update) + if err != nil { + log.Printf("functions update unmarshal error: %v\n", memphisError(err)) + return + } + sfs.FunctionsUpdateCh <- update + } +} + func (c *Conn) removeSchemaUpdatesListener(stationName string) error { sn := getInternalName(stationName) @@ -447,6 +502,27 @@ func (sus *stationUpdateSub) schemaUpdatesHandler(lock *sync.RWMutex) { } } +func (sfs *stationFunctionSub) functionsUpdatesHandler(lock *sync.RWMutex) { + for { + update, ok := <-sfs.FunctionsUpdateCh + if !ok { + return + } + + lock.Lock() + if update.UpdateType == "modify" { + for partition, funcID := range update.Functions { + sfs.FunctionsDetails.PartitionsFunctions[partition] = funcID + } + } else if update.UpdateType == "drop" { + for partition := range update.Functions { + delete(sfs.FunctionsDetails.PartitionsFunctions, partition) + } + } + lock.Unlock() + } +} + func (sd *schemaDetails) handleSchemaUpdateInit(sui SchemaUpdateInit) { sd.name = sui.SchemaName sd.schemaType = sui.SchemaType