Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RND-322-check-all-authentication-types-in-kafka-connector-and-support-ssl-in-conf #1633

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3354,7 +3354,7 @@ func CountActiveConsumersInCG(consumersGroup string, stationId int) (int64, erro
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true AND type = 'application'`
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true`
stmt, err := conn.Conn().Prepare(ctx, "count_active_consumers_in_cg", query)
if err != nil {
return 0, err
Expand Down Expand Up @@ -8119,7 +8119,7 @@ func UpdatePermissions(tenantName, username string, readPermissions, writePermis
return nil
}

func CheckUserStationPermissions(rolesId []int, stationName string) (bool, error) {
func CheckUserStationPermissions(rolesId []int, stationName, operation string) (bool, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand All @@ -8131,18 +8131,18 @@ func CheckUserStationPermissions(rolesId []int, stationName string) (bool, error
query := `SELECT COUNT(*)
FROM permissions
WHERE role_id = ANY($1)
AND type = 'write'
AND type = $2
AND restriction_type = 'allow'
AND (
(position('*' in pattern) > 0 AND $2 ~ pattern) OR
(position('*' in pattern) = 0 AND $2 = pattern)
(position('*' in pattern) > 0 AND $3 ~ pattern) OR
(position('*' in pattern) = 0 AND $3 = pattern)
);`
stmt, err := conn.Conn().Prepare(ctx, "check_user_station_permissions", query)
if err != nil {
return false, err
}
var count int
err = conn.Conn().QueryRow(ctx, stmt.Name, rolesId, stationName).Scan(&count)
err = conn.Conn().QueryRow(ctx, stmt.Name, rolesId, operation, stationName).Scan(&count)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ require (
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/memphisdev/memphis.go v1.3.2-beta.1
github.com/memphisdev/memphis.go v1.3.1
github.com/moby/spdystream v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/memphisdev/memphis.go v1.3.1 h1:WbJ2iinvxtOZPZ6rLYFGIlHW27jU30nYqovBlS1md1Y=
github.com/memphisdev/memphis.go v1.3.1/go.mod h1:KurLqbBBZ5PMabJuOh3JX9VpSykRsog1QQKcwW5b9bU=
github.com/memphisdev/memphis.go v1.3.2-beta.1 h1:KsYWa3AcKHuvjc9AbSOAM19pIcjttmIYqrhJ+i8D1iA=
github.com/memphisdev/memphis.go v1.3.2-beta.1/go.mod h1:jZKJ82lyQHr01QissJUdDrnj0KT5wXjh2irhb+j0JH8=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func CreateDefaultStation(tenantName string, s *Server, sn StationName, user mod
return models.Station{}, false, err
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, sn.Ext(), user.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, sn.Ext(), user.TenantName, "write")
if err != nil {
return models.Station{}, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
allowed, _, err := ValidateStationPermissions(user.Roles, cStationName, user.TenantName)
allowed, _, err := ValidateStationPermissions(user.Roles, cStationName, user.TenantName, "read")
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createConsumerDirectCommon at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, cStationName, err.Error())
return []int{}, err
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
serv.Warnf("[tenant: %v]createProducerDirectCommon : Producer %v at station %v : %v", user.TenantName, pName, pStationName, err.Error())
return false, false, err, models.Station{}
}
allowed, _, err := ValidateStationPermissions(user.Roles, pStationName.Ext(), user.TenantName)
allowed, _, err := ValidateStationPermissions(user.Roles, pStationName.Ext(), user.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createProducerDirectCommon at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, pStationName.Ext(), err.Error())
return false, false, err, models.Station{}
Expand Down
4 changes: 2 additions & 2 deletions server/memphis_handlers_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
)

// the function returns a bool for is allowd to create and a bool for if a reload is needed
func ValidateStationPermissions(rolesId []int, stationName, tenantName string) (bool, bool, error) {
func ValidateStationPermissions(rolesId []int, stationName, tenantName, operation string) (bool, bool, error) {
// if the user dosent have a role len rolesId is 0 then he allowd to create
// TODO: add check if denied when we allow to deny
if len(rolesId) == 0 {
Expand All @@ -24,7 +24,7 @@ func ValidateStationPermissions(rolesId []int, stationName, tenantName string) (
}
return true, neededReload, nil
} else {
allowd, err := db.CheckUserStationPermissions(rolesId, stationName)
allowd, err := db.CheckUserStationPermissions(rolesId, stationName, operation)
if err != nil {
return false, false, err
}
Expand Down
6 changes: 3 additions & 3 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *Server) createStationDirectIntern(c *client,
return
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), csr.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), csr.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createStationDirect at ValidateStationPermissions: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error())
respondWithErr(s.MemphisGlobalAccountString(), s, reply, err)
Expand Down Expand Up @@ -939,7 +939,7 @@ func (sh StationsHandler) CreateStation(c *gin.Context) {
return
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user: %v]CreateStation at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, body.Name, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
Expand Down Expand Up @@ -1484,7 +1484,7 @@ func (s *Server) removeStationDirectIntern(c *client,
return
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user: %v]CreateStation at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, stationName.Ext(), err.Error())
respondWithErr(s.MemphisGlobalAccountString(), s, reply, err)
Expand Down
24 changes: 12 additions & 12 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,41 +716,43 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
} else {
consumerName = consumer.Name
}

consumerName = getInternalConsumerName(consumerName)

var maxAckTimeMs int64
if consumer.MaxAckTimeMs <= 0 {
maxAckTimeMs = 30000 // 30 sec
} else {
maxAckTimeMs = consumer.MaxAckTimeMs
}

var MaxMsgDeliveries int
if consumer.MaxMsgDeliveries <= 0 || consumer.MaxMsgDeliveries > 10 {
MaxMsgDeliveries = 10
} else {
MaxMsgDeliveries = consumer.MaxMsgDeliveries
}

stationName, err := StationNameFromStr(station.Name)
if err != nil {
return err
}

if len(partitionsList) > len(station.PartitionsList) {
partitionsList = station.PartitionsList
}

var deliveryPolicy DeliverPolicy
var optStartSeq uint64
// This check for case when the last message is 0 (in case StartConsumeFromSequence > 1 the LastMessages is 0 )
if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 0 {
if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 1 {
deliveryPolicy = DeliverNew
} else if consumer.LastMessages > 0 {
streamInfo, err := serv.memphisStreamInfo(tenantName, stationName.Intern())
if err != nil {
return err
var streamInfo *StreamInfo
if len(partitionsList) == 1 {
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$1.final")
if err != nil {
return err
}
} else {
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+".final")
if err != nil {
return err
}
}
lastSeq := streamInfo.State.LastSeq
lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1
Expand Down Expand Up @@ -779,7 +781,6 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
// RateLimit: ,// Bits per sec
// Heartbeat: // time.Duration,
}

if deliveryPolicy == DeliverByStartSequence {
consumerConfig.OptStartSeq = optStartSeq
}
Expand All @@ -800,7 +801,6 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
// RateLimit: ,// Bits per sec
// Heartbeat: // time.Duration,
}

if deliveryPolicy == DeliverByStartSequence {
consumerConfig.OptStartSeq = optStartSeq
}
Expand Down
Loading