diff --git a/db/db.go b/db/db.go index 038a829ae..921c70911 100644 --- a/db/db.go +++ b/db/db.go @@ -3354,7 +3354,7 @@ func CountActiveConsumersInCG(consumersGroup string, stationId int) (int64, erro return 0, err } defer conn.Release() - query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true AND type = 'application'` + query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true` stmt, err := conn.Conn().Prepare(ctx, "count_active_consumers_in_cg", query) if err != nil { return 0, err @@ -8119,7 +8119,7 @@ func UpdatePermissions(tenantName, username string, readPermissions, writePermis return nil } -func CheckUserStationPermissions(rolesId []int, stationName string) (bool, error) { +func CheckUserStationPermissions(rolesId []int, stationName, operation string) (bool, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -8131,18 +8131,18 @@ func CheckUserStationPermissions(rolesId []int, stationName string) (bool, error query := `SELECT COUNT(*) FROM permissions WHERE role_id = ANY($1) - AND type = 'write' + AND type = $2 AND restriction_type = 'allow' AND ( - (position('*' in pattern) > 0 AND $2 ~ pattern) OR - (position('*' in pattern) = 0 AND $2 = pattern) + (position('*' in pattern) > 0 AND $3 ~ pattern) OR + (position('*' in pattern) = 0 AND $3 = pattern) );` stmt, err := conn.Conn().Prepare(ctx, "check_user_station_permissions", query) if err != nil { return false, err } var count int - err = conn.Conn().QueryRow(ctx, stmt.Name, rolesId, stationName).Scan(&count) + err = conn.Conn().QueryRow(ctx, stmt.Name, rolesId, operation, stationName).Scan(&count) if err != nil { return false, err } diff --git a/go.mod b/go.mod index ffa1b9007..2206b9573 100644 --- a/go.mod +++ b/go.mod @@ -113,7 +113,7 @@ require ( github.com/leodido/go-urn v1.2.4 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.19 // indirect - github.com/memphisdev/memphis.go v1.3.2-beta.1 + github.com/memphisdev/memphis.go v1.3.1 github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect diff --git a/go.sum b/go.sum index 9a5e82c7b..a587f71dc 100644 --- a/go.sum +++ b/go.sum @@ -275,6 +275,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/memphisdev/memphis.go v1.3.1 h1:WbJ2iinvxtOZPZ6rLYFGIlHW27jU30nYqovBlS1md1Y= +github.com/memphisdev/memphis.go v1.3.1/go.mod h1:KurLqbBBZ5PMabJuOh3JX9VpSykRsog1QQKcwW5b9bU= github.com/memphisdev/memphis.go v1.3.2-beta.1 h1:KsYWa3AcKHuvjc9AbSOAM19pIcjttmIYqrhJ+i8D1iA= github.com/memphisdev/memphis.go v1.3.2-beta.1/go.mod h1:jZKJ82lyQHr01QissJUdDrnj0KT5wXjh2irhb+j0JH8= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/server/memphis_handlers.go b/server/memphis_handlers.go index 2cd1782b8..2aa9bd5c5 100644 --- a/server/memphis_handlers.go +++ b/server/memphis_handlers.go @@ -111,7 +111,7 @@ func CreateDefaultStation(tenantName string, s *Server, sn StationName, user mod return models.Station{}, false, err } - allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, sn.Ext(), user.TenantName) + allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, sn.Ext(), user.TenantName, "write") if err != nil { return models.Station{}, false, err } diff --git a/server/memphis_handlers_consumers.go b/server/memphis_handlers_consumers.go index e65bb408d..370678925 100644 --- a/server/memphis_handlers_consumers.go +++ b/server/memphis_handlers_consumers.go @@ -172,7 +172,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error()) return []int{}, err } - allowed, _, err := ValidateStationPermissions(user.Roles, cStationName, user.TenantName) + allowed, _, err := ValidateStationPermissions(user.Roles, cStationName, user.TenantName, "read") if err != nil { serv.Errorf("[tenant: %v][user:%v]createConsumerDirectCommon at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, cStationName, err.Error()) return []int{}, err diff --git a/server/memphis_handlers_producers.go b/server/memphis_handlers_producers.go index b7c3dc3a7..c0bfac7a4 100644 --- a/server/memphis_handlers_producers.go +++ b/server/memphis_handlers_producers.go @@ -119,7 +119,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection serv.Warnf("[tenant: %v]createProducerDirectCommon : Producer %v at station %v : %v", user.TenantName, pName, pStationName, err.Error()) return false, false, err, models.Station{} } - allowed, _, err := ValidateStationPermissions(user.Roles, pStationName.Ext(), user.TenantName) + allowed, _, err := ValidateStationPermissions(user.Roles, pStationName.Ext(), user.TenantName, "write") if err != nil { serv.Errorf("[tenant: %v][user:%v]createProducerDirectCommon at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, pStationName.Ext(), err.Error()) return false, false, err, models.Station{} diff --git a/server/memphis_handlers_rbac.go b/server/memphis_handlers_rbac.go index 06062dd1d..cb9317432 100644 --- a/server/memphis_handlers_rbac.go +++ b/server/memphis_handlers_rbac.go @@ -14,7 +14,7 @@ const ( ) // the function returns a bool for is allowd to create and a bool for if a reload is needed -func ValidateStationPermissions(rolesId []int, stationName, tenantName string) (bool, bool, error) { +func ValidateStationPermissions(rolesId []int, stationName, tenantName, operation string) (bool, bool, error) { // if the user dosent have a role len rolesId is 0 then he allowd to create // TODO: add check if denied when we allow to deny if len(rolesId) == 0 { @@ -24,7 +24,7 @@ func ValidateStationPermissions(rolesId []int, stationName, tenantName string) ( } return true, neededReload, nil } else { - allowd, err := db.CheckUserStationPermissions(rolesId, stationName) + allowd, err := db.CheckUserStationPermissions(rolesId, stationName, operation) if err != nil { return false, false, err } diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 28b9956bf..d599a5c10 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -280,7 +280,7 @@ func (s *Server) createStationDirectIntern(c *client, return } - allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), csr.TenantName) + allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), csr.TenantName, "write") if err != nil { serv.Errorf("[tenant: %v][user:%v]createStationDirect at ValidateStationPermissions: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error()) respondWithErr(s.MemphisGlobalAccountString(), s, reply, err) @@ -939,7 +939,7 @@ func (sh StationsHandler) CreateStation(c *gin.Context) { return } - allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName) + allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName, "write") if err != nil { serv.Errorf("[tenant: %v][user: %v]CreateStation at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, body.Name, err.Error()) c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) @@ -1484,7 +1484,7 @@ func (s *Server) removeStationDirectIntern(c *client, return } - allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName) + allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName, "write") if err != nil { serv.Errorf("[tenant: %v][user: %v]CreateStation at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, stationName.Ext(), err.Error()) respondWithErr(s.MemphisGlobalAccountString(), s, reply, err) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 968b16808..5e258ad1a 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -716,41 +716,43 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta } else { consumerName = consumer.Name } - consumerName = getInternalConsumerName(consumerName) - var maxAckTimeMs int64 if consumer.MaxAckTimeMs <= 0 { maxAckTimeMs = 30000 // 30 sec } else { maxAckTimeMs = consumer.MaxAckTimeMs } - var MaxMsgDeliveries int if consumer.MaxMsgDeliveries <= 0 || consumer.MaxMsgDeliveries > 10 { MaxMsgDeliveries = 10 } else { MaxMsgDeliveries = consumer.MaxMsgDeliveries } - stationName, err := StationNameFromStr(station.Name) if err != nil { return err } - if len(partitionsList) > len(station.PartitionsList) { partitionsList = station.PartitionsList } - var deliveryPolicy DeliverPolicy var optStartSeq uint64 // This check for case when the last message is 0 (in case StartConsumeFromSequence > 1 the LastMessages is 0 ) - if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 0 { + if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 1 { deliveryPolicy = DeliverNew } else if consumer.LastMessages > 0 { - streamInfo, err := serv.memphisStreamInfo(tenantName, stationName.Intern()) - if err != nil { - return err + var streamInfo *StreamInfo + if len(partitionsList) == 1 { + streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$1.final") + if err != nil { + return err + } + } else { + streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+".final") + if err != nil { + return err + } } lastSeq := streamInfo.State.LastSeq lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1 @@ -779,7 +781,6 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta // RateLimit: ,// Bits per sec // Heartbeat: // time.Duration, } - if deliveryPolicy == DeliverByStartSequence { consumerConfig.OptStartSeq = optStartSeq } @@ -800,7 +801,6 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta // RateLimit: ,// Bits per sec // Heartbeat: // time.Duration, } - if deliveryPolicy == DeliverByStartSequence { consumerConfig.OptStartSeq = optStartSeq } diff --git a/ui_src/src/connectors/kafka.js b/ui_src/src/connectors/kafka.js index e10e3315b..992e41e23 100644 --- a/ui_src/src/connectors/kafka.js +++ b/ui_src/src/connectors/kafka.js @@ -7,7 +7,7 @@ export const kafka = { required: true }, { - name: 'bootstrap.servers', + name: 'bootstrap_servers', display: 'Bootstrap servers', type: 'multi', options: [], @@ -15,7 +15,7 @@ export const kafka = { placeholder: 'kafka-1:9092,kafka-2:9092' }, { - name: 'security.protocol', + name: 'security_protocol', display: 'Security protocol', type: 'select', options: ['SSL', 'SASL_SSL', 'No authentication'], @@ -24,52 +24,103 @@ export const kafka = { children: true, SSL: [ { - name: 'ssl.mechanism', - display: 'SSL mechanism', - type: 'select', - options: ['GSSAPI', 'PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'], - required: true + name: 'ssl_key_pem', + display: 'SSL Key Pem', + type: 'string', + required: true, + placeholder: '-----BEGIN PRIVATE KEY----- \n...\n-----END PRIVATE KEY-----' }, { - name: 'ssl.certificate.pem', + name: 'ssl_certificate_pem', display: 'SSL certificate pem', type: 'string', required: true, placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' }, { - name: 'ssl.key.password', - display: 'SSL key password', + name: 'ssl_ca_pem', + display: 'SSL CA PEM', type: 'string', - required: true + required: true, + placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' + }, + { + name:'insecure_skip_verify', + display:'Insecure skip verify', + type:'select', + options:['true','false'], + required:true, + description:'true / false' } ], SASL_SSL: [ { - name: 'sasl.mechanism', + name: 'sasl_mechanism', display: 'SASL mechanism', type: 'select', - options: ['GSSAPI', 'PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'], + options: ['SCRAM-SHA-256', 'SCRAM-SHA-512', 'PLAIN'], required: true }, { - name: 'sasl.username', + name: 'sasl_username', display: 'SASL username', type: 'string', required: true }, { - name: 'sasl.password', + name: 'sasl_password', display: 'SASL password', type: 'string', required: true, placeholder: 'password' + }, + { + name: 'tls_enabled', + display: 'TLS enabled', + type: 'select', + options: ['custom', 'default', 'none'], + required: true, + description: 'custom / default / none (no tls)', + children: true, + custom: [ + { + name: 'ssl_key_pem', + display: 'SSL Key Pem', + type: 'string', + required: true, + placeholder: '-----BEGIN PRIVATE KEY----- \n...\n-----END PRIVATE KEY-----' + }, + { + name: 'ssl_certificate_pem', + display: 'SSL certificate pem', + type: 'string', + required: true, + placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' + }, + { + name: 'ssl_ca_pem', + display: 'SSL CA PEM', + type: 'string', + required: true, + placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' + }, + { + name:'insecure_skip_verify', + display:'Insecure skip verify', + type:'select', + options:['true','false'], + required:true, + description:'true / false' + } + ], + default: [], + none: [] } ], 'No authentication': [] }, { - name: 'group.id', + name: 'group_id', display: 'Group id', type: 'string', required: true, @@ -83,59 +134,28 @@ export const kafka = { description: 'Topic name' }, { - name: 'partition_strategy', - display: 'Partition strategy', + name: 'offset_strategy', + display: 'Offset strategy', type: 'select', - options: ['Partition Number', 'Any Partition'], + options: ['Newest', 'Oldest'], required: true, - description: 'Partition Number / Any Partition (round robin)', - children: true, - 'Partition Number': [ - { - name: 'partition_value', - display: 'Partition Value', - type: 'string', - required: true - }, - { - name: 'offset_strategy', - display: 'Offset strategy', - type: 'select', - options: ['Earliest', 'End', 'Specific offset'], - required: false, - description: 'choose offset strategy', - children: true, - Earliest: [], - End: [], - 'Specific offset': [ - { - name: 'offset_value', - description: 'choose offset value (int)', - display: 'Value', - type: 'string', - required: true, - placeholder: 0 - } - ] - } - ], - 'Any Partition': [ - { - name: 'offset_strategy', - display: 'Offset strategy', - type: 'select', - options: ['Earliest', 'End'], - required: true, - description: 'choose offset strategy' - } - ] + description: 'Newest / Oldest', }, { - name: 'timeout_duration_seconds', - display: 'Consumer timeout duration (seconds)', + name: 'fetch_size_bytes', + display: 'Fetch size (bytes)', type: 'string', required: false, - placeholder: 10 + placeholder: 1000, + description: 'The buffer size used by Kafka Consumer (in bytes)' + }, + { + name: 'fetch_max_wait_ms', + display: 'Fetch Timeout Duration (Milliseconds)', + placeholder: 1, + type: 'string', + required: false, + description: 'The wait time before fetching the buffer (in milliseconds)', }, { name: 'instances', @@ -153,11 +173,10 @@ export const kafka = { name: 'name', display: 'Connector name', type: 'string', - required: true, - description: 'Note that the name of the sink connector is also used as the name of the consumer group' + required: true }, { - name: 'bootstrap.servers', + name: 'bootstrap_servers', display: 'Bootstrap servers', type: 'multi', options: [], @@ -165,7 +184,7 @@ export const kafka = { placeholder: 'kafka-1:9092,kafka-2:9092' }, { - name: 'security.protocol', + name: 'security_protocol', display: 'Security protocol', type: 'select', options: ['SSL', 'SASL_SSL', 'No authentication'], @@ -174,45 +193,97 @@ export const kafka = { children: true, SSL: [ { - name: 'ssl.mechanism', - display: 'SSL mechanism', - type: 'select', - options: ['GSSAPI', 'PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'], - required: true + name: 'ssl_key_pem', + display: 'SSL Key Pem', + type: 'string', + required: true, + placeholder: '-----BEGIN PRIVATE KEY----- \n...\n-----END PRIVATE KEY-----' }, { - name: 'ssl.certificate.pem', + name: 'ssl_certificate_pem', display: 'SSL certificate pem', type: 'string', required: true, placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' }, { - name: 'ssl.key.password', - display: 'SSL key password', + name: 'ssl_ca_pem', + display: 'SSL CA PEM', type: 'string', - required: true + required: true, + placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' + }, + { + name:'insecure_skip_verify', + display:'Insecure skip verify', + type:'select', + options:['true','false'], + required:true, + description:'true / false' } ], SASL_SSL: [ { - name: 'sasl.mechanism', + name: 'sasl_mechanism', display: 'SASL mechanism', type: 'select', - options: ['GSSAPI', 'PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'], + options: ['SCRAM-SHA-256', 'SCRAM-SHA-512', 'PLAIN'], required: true }, { - name: 'sasl.username', + name: 'sasl_username', display: 'SASL username', type: 'string', required: true }, { - name: 'sasl.password', + name: 'sasl_password', display: 'SASL password', type: 'string', - required: true + required: true, + placeholder: 'password' + }, + { + name: 'tls_enabled', + display: 'TLS enabled', + type: 'select', + options: ['custom', 'default', 'none'], + required: true, + description: 'custom / default / none (no tls)', + children: true, + true: [ + { + name: 'ssl_key_pem', + display: 'SSL Key Pem', + type: 'string', + required: true, + placeholder: '-----BEGIN PRIVATE KEY----- \n...\n-----END PRIVATE KEY-----' + }, + { + name: 'ssl_certificate_pem', + display: 'SSL certificate pem', + type: 'string', + required: true, + placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' + }, + { + name: 'ssl_ca_pem', + display: 'SSL CA PEM', + type: 'string', + required: true, + placeholder: '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----' + }, + { + name:'insecure_skip_verify', + display:'Insecure skip verify', + type:'select', + options:['true','false'], + required:true, + description:'true / false' + } + ], + false: [], + none: [] } ], 'No authentication': [] @@ -228,28 +299,51 @@ export const kafka = { name: 'partition_strategy', display: 'Partition strategy', type: 'select', - options: ['Partition Key', 'Partition Number', 'Any Partition'], + options: ['Partition Number', 'Partition Key', 'Any Partition'], required: true, - description: 'Partition Key / Partition Number / Any Partition', + description: 'Partition Number / Partition Key / Any Partition (round robin)', children: true, - 'Partition Key': [ + 'Partition Number': [ { name: 'partition_value', - display: 'Value', + display: 'Partition Value', type: 'string', required: true } ], - 'Partition Number': [ + 'Partition Key': [ { name: 'partition_value', - display: 'Value', + display: 'Partition Key', type: 'string', required: true } ], 'Any Partition': [] }, + { + name: 'flush_msg_number', + display: 'Flush Message Number', + type: 'string', + required: false, + placeholder: 100, + description: 'The buffer size used by Kafka Producer (in messages)' + }, + { + name: 'flush_frequency', + display: 'Flush Timeout Duration (Milliseconds)', + placeholder: 1, + type: 'string', + required: false, + description: 'The wait time before flushing the buffer (in milliseconds)' + }, + { + name: 'consume_from', + display: 'Start consume from the beginning / end', + type: 'select', + options: ['Beginning', 'End'], + description: 'Beginning (oldest messages) / End (newest messages) of the station', + }, { name: 'memphis_batch_size', display: 'Batch size (messages)', @@ -260,7 +354,7 @@ export const kafka = { }, { name: 'memphis_max_time_wait', - display: 'Batch Message Timeout Duration (Seconds)', + display: 'Batch Message Timeout Duration (Milliseconds)', placeholder: 2, type: 'string', required: false, diff --git a/ui_src/src/connectors/memphis.js b/ui_src/src/connectors/memphis.js index 2d305422c..566d92ab3 100644 --- a/ui_src/src/connectors/memphis.js +++ b/ui_src/src/connectors/memphis.js @@ -34,6 +34,13 @@ export const memphis = { } ] }, + { + name: 'consume_from', + display: 'Start consume from the beginning / end', + type: 'select', + options: ['Beginning', 'End'], + description: 'Beginning (oldest messages) / End (newest messages) of the station', + }, { name: 'dest_station_config', display: 'Destination Station Config', @@ -93,6 +100,14 @@ export const memphis = { } ] }, + { + name: 'memphis_max_time_wait', + display: 'Max Time Wait (Milliseconds)', + type: 'string', + required: false, + placeholder: 500, + description: 'The maximum time to wait for messages batch to be filled' + }, { name: 'instances', display: 'Scale (instances)', diff --git a/ui_src/src/connectors/redis.js b/ui_src/src/connectors/redis.js index ace249bac..036f63ad2 100644 --- a/ui_src/src/connectors/redis.js +++ b/ui_src/src/connectors/redis.js @@ -70,6 +70,13 @@ export const redis = { type: 'string', required: true }, + { + name: 'consume_from', + display: 'Start consume from the beginning / end', + type: 'select', + options: ['Beginning', 'End'], + description: 'Beginning (oldest messages) / End (newest messages) of the station', + }, { name: 'memphis_batch_size', display: 'Batch size (messages)', @@ -80,7 +87,7 @@ export const redis = { }, { name: 'memphis_max_time_wait', - display: 'Batch Message Timeout Duration (Seconds)', + display: 'Batch Message Timeout Duration (Milliseconds)', placeholder: 2, type: 'string', required: false,