Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamroditimemphis committed Dec 28, 2023
1 parent 04aded2 commit 1eb87e5
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 53 deletions.
76 changes: 48 additions & 28 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ func GetConsumerGroupMembers(cgName string, station models.Station) ([]models.Cg
}

func (s *Server) createConsumerDirectV0(c *client, reply, tenantName string, ccr createConsumerRequestV0, requestVersion int) {
_, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, requestVersion, 1, -1, ccr.ConnectionId)
_, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, requestVersion, 1, -1, ccr.ConnectionId, "")
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
}

func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationName, cGroup, cType, connectionId, tenantName, userName string, maxAckTime, maxMsgDeliveries, requestVersion int, startConsumeFromSequence uint64, lastMessages int64, appId string) ([]int, error) {
func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationName, cGroup, cType, connectionId, tenantName, userName string, maxAckTime, maxMsgDeliveries, requestVersion int, startConsumeFromSequence uint64, lastMessages int64, appId, sdkLang string) ([]int, error) {
name := strings.ToLower(consumerName)
err := validateConsumerName(name)
if err != nil {
Expand Down Expand Up @@ -179,9 +179,11 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
serv.Errorf("[tenant: %v]createConsumerDirectCommon at isConsumerGroupExist: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
sdkName := sdkLang
if sdkLang == "" {
sdkName = c.opts.Lang
}

splitted := strings.Split(c.opts.Lang, ".")
sdkName := splitted[len(splitted)-1]
var newConsumer models.Consumer
if strings.HasPrefix(user.Username, "$") {
newConsumer, err = db.InsertNewConsumer(name, station.ID, "connector", connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
Expand Down Expand Up @@ -266,7 +268,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
}

func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
var ccr createConsumerRequestV2
var ccr createConsumerRequestV3
var resp createConsumerResponse

tenantName, message, err := s.getTenantNameAndMessage(msg)
Expand All @@ -275,31 +277,49 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
return
}

if err := json.Unmarshal([]byte(message), &ccr); err != nil || ccr.RequestVersion < 3 {
var ccrV1 createConsumerRequestV1
if err := json.Unmarshal([]byte(message), &ccrV1); err != nil {
var ccrV0 createConsumerRequestV0
if err := json.Unmarshal([]byte(message), &ccrV0); err != nil {
s.Errorf("[tenant: %v]createConsumerDirect at json.Unmarshal: Failed creating consumer: %v: %v", tenantName, err.Error(), string(msg))
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
if err := json.Unmarshal([]byte(message), &ccr); err != nil || ccr.RequestVersion < 4 {
var ccrV2 createConsumerRequestV2
if err = json.Unmarshal([]byte(message), &ccrV2); err != nil {
var ccrV1 createConsumerRequestV1
if err = json.Unmarshal([]byte(message), &ccrV1); err != nil {
var ccrV0 createConsumerRequestV0
if err := json.Unmarshal([]byte(message), &ccrV0); err != nil {
s.Errorf("[tenant: %v]createConsumerDirect at json.Unmarshal: Failed creating consumer: %v: %v", tenantName, err.Error(), string(msg))
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
return
}
s.createConsumerDirectV0(c, reply, tenantName, ccrV0, ccr.RequestVersion)
return
}
s.createConsumerDirectV0(c, reply, tenantName, ccrV0, ccr.RequestVersion)
return

ccr = createConsumerRequestV3{
Name: ccrV1.Name,
StationName: ccrV1.StationName,
ConnectionId: ccrV1.ConnectionId,
ConsumerType: ccrV1.ConsumerType,
ConsumerGroup: ccrV1.ConsumerGroup,
MaxAckTimeMillis: ccrV1.MaxAckTimeMillis,
MaxMsgDeliveries: ccrV1.MaxMsgDeliveries,
Username: ccrV1.Username,
StartConsumeFromSequence: ccrV1.StartConsumeFromSequence,
LastMessages: ccrV1.LastMessages,
RequestVersion: ccrV1.RequestVersion,
AppId: ccrV1.ConnectionId,
}
}
ccr = createConsumerRequestV2{
Name: ccrV1.Name,
StationName: ccrV1.StationName,
ConnectionId: ccrV1.ConnectionId,
ConsumerType: ccrV1.ConsumerType,
ConsumerGroup: ccrV1.ConsumerGroup,
MaxAckTimeMillis: ccrV1.MaxAckTimeMillis,
MaxMsgDeliveries: ccrV1.MaxMsgDeliveries,
Username: ccrV1.Username,
StartConsumeFromSequence: ccrV1.StartConsumeFromSequence,
LastMessages: ccrV1.LastMessages,
RequestVersion: ccrV1.RequestVersion,
AppId: ccrV1.ConnectionId,
ccr = createConsumerRequestV3{
Name: ccrV2.Name,
StationName: ccrV2.StationName,
ConnectionId: ccrV2.ConnectionId,
ConsumerType: ccrV2.ConsumerType,
ConsumerGroup: ccrV2.ConsumerGroup,
MaxAckTimeMillis: ccrV2.MaxAckTimeMillis,
MaxMsgDeliveries: ccrV2.MaxMsgDeliveries,
Username: ccrV2.Username,
StartConsumeFromSequence: ccrV2.StartConsumeFromSequence,
LastMessages: ccrV2.LastMessages,
RequestVersion: ccrV2.RequestVersion,
AppId: ccrV2.ConnectionId,
}
}

Expand All @@ -325,7 +345,7 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
return
}

partitions, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, ccr.RequestVersion, ccr.StartConsumeFromSequence, ccr.LastMessages, ccr.AppId)
partitions, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, ccr.RequestVersion, ccr.StartConsumeFromSequence, ccr.LastMessages, ccr.AppId, ccr.SdkLang)
if err != nil {
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
}
Expand Down
4 changes: 2 additions & 2 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ const (
unhealthyStatus = "unhealthy"
dangerousStatus = "dangerous"
riskyStatus = "risky"
lastProducerCreationReqVersion = 3
lastConsumerCreationReqVersion = 3
lastProducerCreationReqVersion = 4
lastConsumerCreationReqVersion = 4
)

func clientSetClusterConfig() error {
Expand Down
63 changes: 40 additions & 23 deletions server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func validateProducerType(producerType string) error {
return nil
}

func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnectionId string, pStationName StationName, username string, tenantName string, version int, appId string) (bool, bool, error, models.Station) {
func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnectionId string, pStationName StationName, username string, tenantName string, version int, appId, sdkLang string) (bool, bool, error, models.Station) {
name := strings.ToLower(pName)
err := validateProducerName(name)
if err != nil {
Expand Down Expand Up @@ -127,8 +127,11 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
return false, false, err, models.Station{}
}

splitted := strings.Split(c.opts.Lang, ".")
sdkName := splitted[len(splitted)-1]
sdkName := sdkLang
if sdkLang == "" {
sdkName = c.opts.Lang
}

if strings.HasPrefix(user.Username, "$") && name != "gui" {
_, err := db.InsertNewProducer(name, station.ID, "connector", pConnectionId, station.TenantName, station.PartitionsList, version, sdkName, appId)
if err != nil {
Expand Down Expand Up @@ -180,12 +183,12 @@ func (s *Server) createProducerDirectV0(c *client, reply string, cpr createProdu
return
}
_, _, err, _ = s.createProducerDirectCommon(c, cpr.Name,
cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName, 0, cpr.ConnectionId)
cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName, 0, cpr.ConnectionId, "")
respondWithErr(s.MemphisGlobalAccountString(), s, reply, err)
}

func (s *Server) createProducerDirect(c *client, reply string, msg []byte) {
var cpr createProducerRequestV2
var cpr createProducerRequestV3
var resp createProducerResponse

tenantName, message, err := s.getTenantNameAndMessage(msg)
Expand All @@ -194,26 +197,40 @@ func (s *Server) createProducerDirect(c *client, reply string, msg []byte) {
return
}

if err := json.Unmarshal([]byte(message), &cpr); err != nil || cpr.RequestVersion < 3 {
var cprV1 createProducerRequestV1
if err := json.Unmarshal([]byte(message), &cprV1); err != nil {
var cprV0 createProducerRequestV0
if err := json.Unmarshal([]byte(message), &cprV0); err != nil {
s.Errorf("[tenant: %v]createProducerDirect: %v", tenantName, err.Error())
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
if err := json.Unmarshal([]byte(message), &cpr); err != nil || cpr.RequestVersion < 4 {
var cprV2 createProducerRequestV2
if err := json.Unmarshal([]byte(message), &cprV2); err != nil {
var cprV1 createProducerRequestV1
if err := json.Unmarshal([]byte(message), &cprV1); err != nil {
var cprV0 createProducerRequestV0
if err := json.Unmarshal([]byte(message), &cprV0); err != nil {
s.Errorf("[tenant: %v]createProducerDirect: %v", tenantName, err.Error())
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
return
}
s.createProducerDirectV0(c, reply, cprV0, tenantName)
return
}
s.createProducerDirectV0(c, reply, cprV0, tenantName)
return

cpr = createProducerRequestV3{
Name: cprV1.Name,
StationName: cprV1.StationName,
ConnectionId: cprV1.ConnectionId,
ProducerType: cprV1.ProducerType,
RequestVersion: cprV1.RequestVersion,
Username: cprV1.Username,
AppId: cprV1.ConnectionId,
}
}
cpr = createProducerRequestV2{
Name: cprV1.Name,
StationName: cprV1.StationName,
ConnectionId: cprV1.ConnectionId,
ProducerType: cprV1.ProducerType,
RequestVersion: cprV1.RequestVersion,
Username: cprV1.Username,
AppId: cprV1.ConnectionId,

cpr = createProducerRequestV3{
Name: cprV2.Name,
StationName: cprV2.StationName,
ConnectionId: cprV2.ConnectionId,
ProducerType: cprV2.ProducerType,
RequestVersion: cprV2.RequestVersion,
Username: cprV2.Username,
AppId: cprV2.ConnectionId,
}
}
cpr.TenantName = tenantName
Expand All @@ -224,7 +241,7 @@ func (s *Server) createProducerDirect(c *client, reply string, msg []byte) {
return
}

clusterSendNotification, schemaVerseToDls, err, station := s.createProducerDirectCommon(c, cpr.Name, cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName, cpr.RequestVersion, cpr.AppId)
clusterSendNotification, schemaVerseToDls, err, station := s.createProducerDirectCommon(c, cpr.Name, cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName, cpr.RequestVersion, cpr.AppId, cpr.SdkLang)
if err != nil {
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
return
Expand Down
29 changes: 29 additions & 0 deletions server/memphis_sdk_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ type createProducerRequestV2 struct {
AppId string `json:"app_id"`
}

type createProducerRequestV3 struct {
Name string `json:"name"`
StationName string `json:"station_name"`
ConnectionId string `json:"connection_id"`
ProducerType string `json:"producer_type"`
RequestVersion int `json:"req_version"`
Username string `json:"username"`
TenantName string `json:"tenant_name"`
AppId string `json:"app_id"`
SdkLang string `json:"sdk_lang"`
}

type createConsumerResponse struct {
Err string `json:"error"`
}
Expand Down Expand Up @@ -154,6 +166,23 @@ type createConsumerRequestV2 struct {
AppId string `json:"app_id"`
}

type createConsumerRequestV3 struct {
Name string `json:"name"`
StationName string `json:"station_name"`
ConnectionId string `json:"connection_id"`
ConsumerType string `json:"consumer_type"`
ConsumerGroup string `json:"consumers_group"`
MaxAckTimeMillis int `json:"max_ack_time_ms"`
MaxMsgDeliveries int `json:"max_msg_deliveries"`
Username string `json:"username"`
StartConsumeFromSequence uint64 `json:"start_consume_from_sequence"`
LastMessages int64 `json:"last_messages"`
RequestVersion int `json:"req_version"`
TenantName string `json:"tenant_name"`
AppId string `json:"app_id"`
SdkLang string `json:"sdk_lang"`
}

type attachSchemaRequest struct {
Name string `json:"name"`
StationName string `json:"station_name"`
Expand Down

0 comments on commit 1eb87e5

Please sign in to comment.