diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 035be9b63e22..0dd3a5a2fea3 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971] - Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961] - Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770] +- Add support for Kafka 2.0.0 in the kafka output {pull}8399[8399] *Auditbeat* @@ -113,6 +114,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Added 'died' PID state to process_system metricset on system module {pull}8275[8275] - Add `metrics` metricset to MongoDB module. {pull}7611[7611] - Added `ccr` metricset to Elasticsearch module. {pull}8335[8335] +- Add support for Kafka 2.0.0 {pull}8399[8399] - Added support for query params in configuration {issue}8286[8286] {pull}8292[8292] *Packetbeat* diff --git a/NOTICE.txt b/NOTICE.txt index b1eb9c7eece8..8af108741821 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -2175,8 +2175,8 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------- Dependency: github.com/Shopify/sarama -Version: v1.17.0/enh/offset-replica-id -Revision: d1575e4abe04acbbe8ac766320585cdf271dd189 +Version: =v1.18.0/enh/offset-replica-id +Revision: 0143592836b090a1b481def4d902cfb3c5c05ae5 License type (autodetected): MIT ./vendor/github.com/Shopify/sarama/LICENSE: -------------------------------------------------------------------- diff --git a/libbeat/outputs/kafka/version.go b/libbeat/common/kafka/version.go similarity index 75% rename from libbeat/outputs/kafka/version.go rename to libbeat/common/kafka/version.go index 884e67616a20..6cc8d9cb369a 100644 --- a/libbeat/outputs/kafka/version.go +++ b/libbeat/common/kafka/version.go @@ -17,7 +17,14 @@ package kafka -import "github.com/Shopify/sarama" +import ( + "fmt" + + "github.com/Shopify/sarama" +) + +// Version is a kafka version +type Version string // TODO: remove me.
// Compat version overwrite for missing versions in sarama @@ -31,8 +38,6 @@ var ( v1_1_1 = parseKafkaVersion("1.1.1") kafkaVersions = map[string]sarama.KafkaVersion{ - "": sarama.V1_0_0_0, - "0.8.2.0": sarama.V0_8_2_0, "0.8.2.1": sarama.V0_8_2_1, "0.8.2.2": sarama.V0_8_2_2, @@ -68,6 +73,10 @@ var ( "1.1.1": v1_1_1, "1.1": v1_1_1, "1": v1_1_1, + + "2.0.0": sarama.V2_0_0_0, + "2.0": sarama.V2_0_0_0, + "2": sarama.V2_0_0_0, } ) @@ -78,3 +87,29 @@ func parseKafkaVersion(s string) sarama.KafkaVersion { } return v } + +// Validate that a kafka version is among the possible options +func (v *Version) Validate() error { + if _, ok := kafkaVersions[string(*v)]; !ok { + return fmt.Errorf("unknown/unsupported kafka version '%v'", *v) + } + + return nil +} + +// Unpack a kafka version +func (v *Version) Unpack(s string) error { + tmp := Version(s) + if err := tmp.Validate(); err != nil { + return err + } + + *v = tmp + return nil +} + +// Get a sarama kafka version +func (v Version) Get() (sarama.KafkaVersion, bool) { + kv, ok := kafkaVersions[string(v)] + return kv, ok +} diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 735a5cecb005..5155775dc4a3 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -674,7 +674,7 @@ NOTE: Events bigger than <> will be ==== Compatibility -This output works with all Kafka in between 0.11 and 1.1.1. Older versions +This output works with all Kafka versions between 0.11 and 2.0.0. Older versions might work as well, but are not supported. ==== Configuration options diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 956a8bd8cb4b..9cf81d342611 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/common/kafka" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -48,7 +49,7 @@ type kafkaConfig struct { BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` Compression string `config:"compression"` CompressionLevel int `config:"compression_level"` - Version string `config:"version"` + Version kafka.Version `config:"version"` BulkMaxSize int `config:"bulk_max_size"` MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` ClientID string `config:"client_id"` @@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig { BrokerTimeout: 10 * time.Second, Compression: "gzip", CompressionLevel: 4, - Version: "1.0.0", + Version: kafka.Version("1.0.0"), MaxRetries: 3, ClientID: "beats", ChanBufferSize: 256, @@ -114,8 +115,8 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("compression mode '%v' unknown", c.Compression) } - if _, ok := kafkaVersions[c.Version]; !ok { - return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version) + if err := c.Version.Validate(); err != nil { + return err } if c.Username != "" && c.Password == "" { @@ -200,7 +201,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { // configure client ID k.ClientID = config.ClientID - version, ok := kafkaVersions[config.Version] + version, ok := config.Version.Get() if !ok { return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version) } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 039d8b8000ce..d9f9e86ac18c 100644
--- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -33,13 +33,6 @@ import ( "github.com/elastic/beats/libbeat/outputs/outil" ) -type kafka struct { - config kafkaConfig - topic outil.Selector - - partitioner sarama.PartitionerConstructor -} - const ( defaultWaitRetry = 1 * time.Second diff --git a/metricbeat/docker-compose.yml b/metricbeat/docker-compose.yml index 7c461b9d4ba9..9758e4236a0a 100644 --- a/metricbeat/docker-compose.yml +++ b/metricbeat/docker-compose.yml @@ -98,12 +98,20 @@ services: kafka: build: context: ./module/kafka/_meta - dockerfile: Dockerfile.1.1.0 + args: + KAFKA_VERSION: 2.0.0 + + kafka_1_1_0: + build: + context: ./module/kafka/_meta + args: + KAFKA_VERSION: 1.1.0 kafka_0_10_2: build: context: ./module/kafka/_meta - dockerfile: Dockerfile.0.10.2 + args: + KAFKA_VERSION: 0.10.2.1 kibana: build: ./module/kibana/_meta diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc index b9b472645fe7..c83a1f850e26 100644 --- a/metricbeat/docs/modules/kafka.asciidoc +++ b/metricbeat/docs/modules/kafka.asciidoc @@ -14,7 +14,7 @@ The default metricsets are `consumergroup` and `partition`. [float] === Compability -This module is tested with Kafka 0.10.2 and 1.1.0. +This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.0.0. [float] diff --git a/metricbeat/module/kafka/_meta/Dockerfile.1.1.0 b/metricbeat/module/kafka/_meta/Dockerfile similarity index 97% rename from metricbeat/module/kafka/_meta/Dockerfile.1.1.0 rename to metricbeat/module/kafka/_meta/Dockerfile index 54f0d105f12e..aab10dbed24b 100644 --- a/metricbeat/module/kafka/_meta/Dockerfile.1.1.0 +++ b/metricbeat/module/kafka/_meta/Dockerfile @@ -1,9 +1,10 @@ FROM debian:stretch +ARG KAFKA_VERSION=2.0.0 + ENV KAFKA_HOME /kafka # The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it ENV KAFKA_LOGS_DIR="/kafka-logs" -ENV KAFKA_VERSION 1.1.0 ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true" ENV TERM=linux diff --git a/metricbeat/module/kafka/_meta/Dockerfile.0.10.2 b/metricbeat/module/kafka/_meta/Dockerfile.0.10.2 deleted file mode 100644 index 52ae3ff0339b..000000000000 --- a/metricbeat/module/kafka/_meta/Dockerfile.0.10.2 +++ /dev/null @@ -1,25 +0,0 @@ -FROM debian:stretch - -ENV KAFKA_HOME /kafka -# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it -ENV KAFKA_LOGS_DIR="/kafka-logs" -ENV KAFKA_VERSION 0.10.2.1 -ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true" -ENV TERM=linux - -RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat dnsutils - -RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && curl -s -o $INSTALL_DIR/kafka.tgz \ - "http://ftp.wayne.edu/apache/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \ - tar xzf ${INSTALL_DIR}/kafka.tgz -C ${KAFKA_HOME} --strip-components 1 - -ADD run.sh /run.sh -ADD healthcheck.sh /healthcheck.sh - -EXPOSE 9092 -EXPOSE 2181 - -# Healthcheck creates an empty topic foo. As soon as a topic is created, it assumes broke is available -HEALTHCHECK --interval=1s --retries=90 CMD /healthcheck.sh - -ENTRYPOINT ["/run.sh"] diff --git a/metricbeat/module/kafka/_meta/docs.asciidoc b/metricbeat/module/kafka/_meta/docs.asciidoc index 05e52c5369ce..aa228583dbc4 100644 --- a/metricbeat/module/kafka/_meta/docs.asciidoc +++ b/metricbeat/module/kafka/_meta/docs.asciidoc @@ -5,4 +5,4 @@ The default metricsets are `consumergroup` and `partition`. 
[float] === Compability -This module is tested with Kafka 0.10.2 and 1.1.0. +This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.0.0. diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index dd5a15b47a88..1efff5cd1f2c 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -29,8 +29,14 @@ import ( "github.com/Shopify/sarama" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kafka" ) +// Version returns a kafka version from its string representation +func Version(version string) kafka.Version { + return kafka.Version(version) +} + // Broker provides functionality for communicating with a single kafka broker type Broker struct { broker *sarama.Broker @@ -50,7 +56,7 @@ type BrokerSettings struct { Backoff time.Duration TLS *tls.Config Username, Password string - Version Version + Version kafka.Version } type GroupDescription struct { @@ -83,7 +89,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { cfg.Net.SASL.User = user cfg.Net.SASL.Password = settings.Password } - cfg.Version = settings.Version.get() + cfg.Version, _ = settings.Version.Get() return &Broker{ broker: sarama.NewBroker(host), diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index 2aec964f2e91..713e27e5df95 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -86,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { Password: config.Password, // consumer groups API requires at least 0.9.0.0 - Version: kafka.Version{String: "0.9.0.0"}, + Version: kafka.Version("0.9.0.0"), } return &MetricSet{ diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index a2318fcfc90a..e9cbbf9154ec 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -83,6 +83,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { TLS: tls, Username: config.Username, Password: config.Password, + Version: kafka.Version("0.8.2.0"), } return &MetricSet{ diff --git a/metricbeat/module/kafka/version.go b/metricbeat/module/kafka/version.go deleted file mode 100644 index 01ba381a962e..000000000000 --- a/metricbeat/module/kafka/version.go +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package kafka - -import ( - "fmt" - - "github.com/Shopify/sarama" -) - -type Version struct { - String string -} - -var ( - minVersion = sarama.V0_8_2_0 - - kafkaVersions = map[string]sarama.KafkaVersion{ - "": sarama.V0_8_2_0, - - "0.8.2.0": sarama.V0_8_2_0, - "0.8.2.1": sarama.V0_8_2_1, - "0.8.2.2": sarama.V0_8_2_2, - "0.8.2": sarama.V0_8_2_2, - "0.8": sarama.V0_8_2_2, - - "0.9.0.0": sarama.V0_9_0_0, - "0.9.0.1": sarama.V0_9_0_1, - "0.9.0": sarama.V0_9_0_1, - "0.9": sarama.V0_9_0_1, - - "0.10.0.0": sarama.V0_10_0_0, - "0.10.0.1": sarama.V0_10_0_1, - "0.10.0": sarama.V0_10_0_1, - "0.10.1.0": sarama.V0_10_1_0, - "0.10.1": sarama.V0_10_1_0, - "0.10": sarama.V0_10_1_0, - } -) - -func (v *Version) Validate() error { - if _, ok := kafkaVersions[v.String]; !ok { - return fmt.Errorf("unknown/unsupported kafka vesion '%v'", v.String) - } - return nil -} - -func (v *Version) Unpack(s string) error { - tmp := Version{s} - if err := tmp.Validate(); err != nil { - return err - } - - *v = tmp - return nil -} - -func (v *Version) get() sarama.KafkaVersion { - if v, ok := kafkaVersions[v.String]; ok { - return v - } - - return minVersion -} diff --git a/metricbeat/tests/system/test_kafka.py b/metricbeat/tests/system/test_kafka.py index 4a21ee3d0605..a7ba2d26e288 100644 --- a/metricbeat/tests/system/test_kafka.py +++ b/metricbeat/tests/system/test_kafka.py @@ -7,6 +7,7 @@ class KafkaTest(metricbeat.BaseTest): COMPOSE_SERVICES = ['kafka'] + VERSION = "2.0.0" @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") def test_partition(self): @@ -20,7 +21,8 @@ def test_partition(self): "name": "kafka", "metricsets": ["partition"], "hosts": self.get_hosts(), - "period": "1s" + "period": "1s", + "version": self.VERSION, }]) proc = self.start_beat() self.wait_until(lambda: self.output_lines() > 0, max_timeout=20) @@ -45,5 +47,11 @@ def get_hosts(self): os.getenv('KAFKA_PORT', '9092')] +class Kafka_1_1_0_Test(KafkaTest): + COMPOSE_SERVICES = ['kafka_1_1_0'] + VERSION = "1.1.0" + + class Kafka_0_10_2_Test(KafkaTest): COMPOSE_SERVICES = ['kafka_0_10_2'] + VERSION = "0.10.2" diff --git a/testing/environments/docker/kafka/Dockerfile b/testing/environments/docker/kafka/Dockerfile index cda937e25873..581513efcbda 100644 --- a/testing/environments/docker/kafka/Dockerfile +++ b/testing/environments/docker/kafka/Dockerfile @@ -4,7 +4,7 @@ ENV KAFKA_HOME /kafka # The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it ENV KAFKA_ADVERTISED_HOST kafka ENV KAFKA_LOGS_DIR="/kafka-logs" -ENV KAFKA_VERSION 1.1.1 +ENV KAFKA_VERSION 2.0.0 ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true" ENV TERM=linux diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md index 16d5829c9953..57a2bf9ba20c 100644 --- a/vendor/github.com/Shopify/sarama/CHANGELOG.md +++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md @@ -1,5 +1,41 @@ # Changelog +#### Version 1.18.0 (2018-09-07) + +New Features: + - Make `Partitioner.RequiresConsistency` vary per-message + ([#1112](https://github.com/Shopify/sarama/pull/1112)). + - Add customizable partitioner + ([#1118](https://github.com/Shopify/sarama/pull/1118)). + - Add `ClusterAdmin` support for `CreateTopic`, `DeleteTopic`, `CreatePartitions`, + `DeleteRecords`, `DescribeConfig`, `AlterConfig`, `CreateACL`, `ListAcls`, `DeleteACL` + ([#1055](https://github.com/Shopify/sarama/pull/1055)). 
+ +Improvements: + - Add support for Kafka 2.0.0 + ([#1149](https://github.com/Shopify/sarama/pull/1149)). + - Allow setting `LocalAddr` when dialing an address to support multi-homed hosts + ([#1123](https://github.com/Shopify/sarama/pull/1123)). + - Simpler offset management + ([#1127](https://github.com/Shopify/sarama/pull/1127)). + +Bug Fixes: + - Fix mutation of `ProducerMessage.MetaData` when producing to Kafka + ([#1110](https://github.com/Shopify/sarama/pull/1110)). + - Fix consumer block when response did not contain all the + expected topic/partition blocks + ([#1086](https://github.com/Shopify/sarama/pull/1086)). + - Fix consumer block when response contains only control messages + ([#1115](https://github.com/Shopify/sarama/pull/1115)). + - Add timeout config for ClusterAdmin requests + ([#1142](https://github.com/Shopify/sarama/pull/1142)). + - Add version check when producing message with headers + ([#1117](https://github.com/Shopify/sarama/pull/1117)). + - Fix `MetadataRequest` for empty list of topics + ([#1132](https://github.com/Shopify/sarama/pull/1132)). + - Fix producer topic metadata on-demand fetch when topic error happens in metadata response + ([#1125](https://github.com/Shopify/sarama/pull/1125)). #### Version 1.17.0 (2018-05-30) New Features: diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go index 1eff81cbf621..897225540928 100644 --- a/vendor/github.com/Shopify/sarama/async_producer.go +++ b/vendor/github.com/Shopify/sarama/async_producer.go @@ -145,8 +145,9 @@ type ProducerMessage struct { // least version 0.10.0. Timestamp time.Time - retries int - flags flagSet + retries int + flags flagSet + expectation chan *ProducerError } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -270,6 +271,9 @@ func (p *asyncProducer) dispatcher() { version := 1 if p.conf.Version.IsAtLeast(V0_11_0_0) { version = 2 + } else if msg.Headers != nil { + p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11")) + continue } if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes { p.returnError(msg, ErrMessageSizeTooLarge) @@ -343,7 +347,14 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error { var partitions []int32 err := tp.breaker.Run(func() (err error) { - if tp.partitioner.RequiresConsistency() { + var requiresConsistency = false + if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok { + requiresConsistency = ep.MessageRequiresConsistency(msg) + } else { + requiresConsistency = tp.partitioner.RequiresConsistency() + } + + if requiresConsistency { partitions, err = tp.parent.client.Partitions(msg.Topic) } else { partitions, err = tp.parent.client.WritablePartitions(msg.Topic) diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go index d836bee6d86a..26f63d51d6d8 100644 --- a/vendor/github.com/Shopify/sarama/broker.go +++ b/vendor/github.com/Shopify/sarama/broker.go @@ -86,6 +86,7 @@ func (b *Broker) Open(conf *Config) error { dialer := net.Dialer{ Timeout: conf.Net.DialTimeout, KeepAlive: conf.Net.KeepAlive, + LocalAddr: conf.Net.LocalAddr, } if conf.Net.TLS.Enable { @@ -386,8 +387,8 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, return response, nil } -func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { - response := new(CreatePartitionsResponse) +func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { + response := new(CreateTopicsResponse) err := b.sendAndReceive(request, response) if err != nil { @@ -397,8 +398,8 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart return response, nil } -func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { - response := new(CreateTopicsResponse) +func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { + response := new(DeleteTopicsResponse) err := b.sendAndReceive(request, response) if err != nil { @@ -408,8 +409,8 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon return response, nil } -func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { - response := new(DeleteTopicsResponse) +func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { + response := new(CreatePartitionsResponse) err := b.sendAndReceive(request, response) if err != nil { diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go index 019cb43735ae..ad805346b4b3 100644 --- a/vendor/github.com/Shopify/sarama/client.go +++ b/vendor/github.com/Shopify/sarama/client.go @@ -17,7 +17,7 @@ type Client interface { // altered after it has been created. Config() *Config - // Controller returns the cluster controller broker. + // Controller returns the cluster controller broker. Requires Kafka 0.10 or higher. Controller() (*Broker, error) // Brokers returns the current set of active brokers as retrieved from cluster metadata. 
@@ -100,10 +100,11 @@ type client struct { seedBrokers []*Broker deadSeeds []*Broker - controllerID int32 // cluster controller broker id - brokers map[int32]*Broker // maps broker ids to brokers - metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata - coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs + controllerID int32 // cluster controller broker id + brokers map[int32]*Broker // maps broker ids to brokers + metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata + metadataTopics map[string]none // topics that need to collect metadata + coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs // If the number of partitions is large, we can get some churn calling cachedPartitions, // so the result is cached. It is important to update this value whenever metadata is changed @@ -136,6 +137,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) { closed: make(chan none), brokers: make(map[int32]*Broker), metadata: make(map[string]map[int32]*PartitionMetadata), + metadataTopics: make(map[string]none), cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32), coordinators: make(map[string]int32), } @@ -207,6 +209,7 @@ func (client *client) Close() error { client.brokers = nil client.metadata = nil + client.metadataTopics = nil return nil } @@ -231,6 +234,22 @@ func (client *client) Topics() ([]string, error) { return ret, nil } +func (client *client) MetadataTopics() ([]string, error) { + if client.Closed() { + return nil, ErrClosedClient + } + + client.lock.RLock() + defer client.lock.RUnlock() + + ret := make([]string, 0, len(client.metadataTopics)) + for topic := range client.metadataTopics { + ret = append(ret, topic) + } + + return ret, nil +} + func (client *client) Partitions(topic string) ([]int32, error) { if client.Closed() { return nil, ErrClosedClient @@ -388,6 +407,10 @@ func (client *client) Controller() (*Broker, error) { return nil, ErrClosedClient } + if !client.conf.Version.IsAtLeast(V0_10_0_0) { + return nil, ErrUnsupportedVersion + } + controller := client.cachedController() if controller == nil { if err := client.refreshMetadata(); err != nil { @@ -645,7 +668,7 @@ func (client *client) refreshMetadata() error { topics := []string{} if !client.conf.Metadata.Full { - if specificTopics, err := client.Topics(); err != nil { + if specificTopics, err := client.MetadataTopics(); err != nil { return err } else if len(specificTopics) == 0 { return ErrNoTopicsToUpdateMetadata @@ -728,9 +751,16 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo if allKnownMetaData { client.metadata = make(map[string]map[int32]*PartitionMetadata) + client.metadataTopics = make(map[string]none) client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32) } for _, topic := range data.Topics { + // topics must be added firstly to `metadataTopics` to guarantee that all + // requested topics must be recorded to keep them trackable for periodically + // metadata refresh. 
+ if _, exists := client.metadataTopics[topic.Name]; !exists { + client.metadataTopics[topic.Name] = none{} + } delete(client.metadata, topic.Name) delete(client.cachedPartitionsResults, topic.Name) diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go index a564b5c23e4b..08f53308411b 100644 --- a/vendor/github.com/Shopify/sarama/config.go +++ b/vendor/github.com/Shopify/sarama/config.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "io/ioutil" + "net" "regexp" "time" @@ -17,6 +18,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) // Config is used to pass multiple configuration options to Sarama's constructors. type Config struct { + // Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client. + Admin struct { + // The maximum duration the administrative Kafka client will wait for ClusterAdmin operations, + // including topics, brokers, configurations and ACLs (defaults to 3 seconds). + Timeout time.Duration + } + // Net is the namespace for network-level properties used by the Broker, and // shared by the Client/Producer/Consumer. Net struct { @@ -58,6 +66,12 @@ type Config struct { // KeepAlive specifies the keep-alive period for an active network connection. // If zero, keep-alives are disabled. (default is 0: disabled). KeepAlive time.Duration + + // LocalAddr is the local address to use when dialing an + // address. The address must be of a compatible type for the + // network being dialed. + // If nil, a local address is automatically chosen. + LocalAddr net.Addr } // Metadata is the namespace for metadata management properties used by the @@ -248,6 +262,12 @@ type Config struct { // broker version 0.9.0 or later. // (default is 0: disabled). Retention time.Duration + + Retry struct { + // The total number of times to retry failing commit + // requests during OffsetManager shutdown (default 3). 
+ Max int + } } } @@ -279,6 +299,8 @@ type Config struct { func NewConfig() *Config { c := &Config{} + c.Admin.Timeout = 3 * time.Second + c.Net.MaxOpenRequests = 5 c.Net.DialTimeout = 30 * time.Second c.Net.ReadTimeout = 30 * time.Second @@ -307,6 +329,7 @@ func NewConfig() *Config { c.Consumer.Return.Errors = false c.Consumer.Offsets.CommitInterval = 1 * time.Second c.Consumer.Offsets.Initial = OffsetNewest + c.Consumer.Offsets.Retry.Max = 3 c.ClientID = defaultClientID c.ChannelBufferSize = 256 @@ -377,6 +400,12 @@ func (c *Config) Validate() error { return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled") } + // validate the Admin values + switch { + case c.Admin.Timeout <= 0: + return ConfigurationError("Admin.Timeout must be > 0") + } + // validate the Metadata values switch { case c.Metadata.Retry.Max < 0: @@ -443,7 +472,8 @@ func (c *Config) Validate() error { return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0") case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest: return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest") - + case c.Consumer.Offsets.Retry.Max < 0: + return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0") } // validate misc shared values diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go index 96226ac5bf52..33d9d143f917 100644 --- a/vendor/github.com/Shopify/sarama/consumer.go +++ b/vendor/github.com/Shopify/sarama/consumer.go @@ -531,7 +531,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes child.offset = offset + 1 } if len(messages) == 0 { - return nil, ErrIncompleteResponse + child.offset += 1 } return messages, nil } @@ -579,10 +579,6 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu messages := []*ConsumerMessage{} for _, records := range block.RecordsSet { - if control, err := records.isControl(); err != nil || control { - continue - } - switch records.recordsType { case legacyRecords: messageSetMessages, err := child.parseMessages(records.MsgSet) @@ -596,6 +592,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu if err != nil { return nil, err } + if control, err := records.isControl(); err != nil || control { + continue + } messages = append(messages, recordBatchMessages...) 
default: diff --git a/vendor/github.com/Shopify/sarama/dev.yml b/vendor/github.com/Shopify/sarama/dev.yml index 294fcdb413b8..bc8c4e45294f 100644 --- a/vendor/github.com/Shopify/sarama/dev.yml +++ b/vendor/github.com/Shopify/sarama/dev.yml @@ -2,7 +2,7 @@ name: sarama up: - go: - version: '1.9' + version: '1.10' commands: test: diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go index ae91bb9eb097..dade1c47dac2 100644 --- a/vendor/github.com/Shopify/sarama/fetch_response.go +++ b/vendor/github.com/Shopify/sarama/fetch_response.go @@ -104,15 +104,26 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) return err } - // If we have at least one full records, we skip incomplete ones - if partial && len(b.RecordsSet) > 0 { - break + n, err := records.numRecords() + if err != nil { + return err } - b.RecordsSet = append(b.RecordsSet, records) + if n > 0 || (partial && len(b.RecordsSet) == 0) { + b.RecordsSet = append(b.RecordsSet, records) + + if b.Records == nil { + b.Records = records + } + } - if b.Records == nil { - b.Records = records + overflow, err := records.isOverflow() + if err != nil { + return err + } + + if partial || overflow { + break } } diff --git a/vendor/github.com/Shopify/sarama/length_field.go b/vendor/github.com/Shopify/sarama/length_field.go index 576b1a6f6f8d..da199a70a0e5 100644 --- a/vendor/github.com/Shopify/sarama/length_field.go +++ b/vendor/github.com/Shopify/sarama/length_field.go @@ -5,6 +5,19 @@ import "encoding/binary" // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths. type lengthField struct { startOffset int + length int32 +} + +func (l *lengthField) decode(pd packetDecoder) error { + var err error + l.length, err = pd.getInt32() + if err != nil { + return err + } + if l.length > int32(pd.remaining()) { + return ErrInsufficientData + } + return nil } func (l *lengthField) saveOffset(in int) { @@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error { } func (l *lengthField) check(curOffset int, buf []byte) error { - if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) { + if int32(curOffset-l.startOffset-4) != l.length { return PacketDecodingError{"length field invalid"} } diff --git a/vendor/github.com/Shopify/sarama/message_set.go b/vendor/github.com/Shopify/sarama/message_set.go index 27db52fdf1f7..600c7c4dfb7b 100644 --- a/vendor/github.com/Shopify/sarama/message_set.go +++ b/vendor/github.com/Shopify/sarama/message_set.go @@ -47,6 +47,7 @@ func (msb *MessageBlock) decode(pd packetDecoder) (err error) { type MessageSet struct { PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock + OverflowMessage bool // whether the set on the wire contained an overflow message Messages []*MessageBlock } @@ -85,7 +86,12 @@ func (ms *MessageSet) decode(pd packetDecoder) (err error) { case ErrInsufficientData: // As an optimization the server is allowed to return a partial message at the // end of the message set. Clients should handle this case. So we just ignore such things. 
- ms.PartialTrailingMessage = true + if msb.Offset == -1 { + // This is an overflow message caused by chunked down conversion + ms.OverflowMessage = true + } else { + ms.PartialTrailingMessage = true + } return nil default: return err diff --git a/vendor/github.com/Shopify/sarama/metadata_request.go b/vendor/github.com/Shopify/sarama/metadata_request.go index 48adfa28cb93..17dc4289a3a9 100644 --- a/vendor/github.com/Shopify/sarama/metadata_request.go +++ b/vendor/github.com/Shopify/sarama/metadata_request.go @@ -10,7 +10,7 @@ func (r *MetadataRequest) encode(pe packetEncoder) error { if r.Version < 0 || r.Version > 5 { return PacketEncodingError{"invalid or unsupported MetadataRequest version field"} } - if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 { + if r.Version == 0 || len(r.Topics) > 0 { err := pe.putArrayLength(len(r.Topics)) if err != nil { return err diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go index 5541d32ec69c..1720441996fd 100644 --- a/vendor/github.com/Shopify/sarama/mockresponses.go +++ b/vendor/github.com/Shopify/sarama/mockresponses.go @@ -538,3 +538,190 @@ func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder { } return res } + +type MockCreateTopicsResponse struct { + t TestReporter +} + +func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse { + return &MockCreateTopicsResponse{t: t} +} + +func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*CreateTopicsRequest) + res := &CreateTopicsResponse{} + res.TopicErrors = make(map[string]*TopicError) + + for topic, _ := range req.TopicDetails { + res.TopicErrors[topic] = &TopicError{Err: ErrNoError} + } + return res +} + +type MockDeleteTopicsResponse struct { + t TestReporter +} + +func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse { + return &MockDeleteTopicsResponse{t: t} +} + +func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DeleteTopicsRequest) + res := &DeleteTopicsResponse{} + res.TopicErrorCodes = make(map[string]KError) + + for _, topic := range req.Topics { + res.TopicErrorCodes[topic] = ErrNoError + } + return res +} + +type MockCreatePartitionsResponse struct { + t TestReporter +} + +func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse { + return &MockCreatePartitionsResponse{t: t} +} + +func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*CreatePartitionsRequest) + res := &CreatePartitionsResponse{} + res.TopicPartitionErrors = make(map[string]*TopicPartitionError) + + for topic, _ := range req.TopicPartitions { + res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError} + } + return res +} + +type MockDeleteRecordsResponse struct { + t TestReporter +} + +func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse { + return &MockDeleteRecordsResponse{t: t} +} + +func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DeleteRecordsRequest) + res := &DeleteRecordsResponse{} + res.Topics = make(map[string]*DeleteRecordsResponseTopic) + + for topic, deleteRecordRequestTopic := range req.Topics { + partitions := make(map[int32]*DeleteRecordsResponsePartition) + for partition, _ := range deleteRecordRequestTopic.PartitionOffsets { + partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError} + } + res.Topics[topic] = 
&DeleteRecordsResponseTopic{Partitions: partitions} + } + return res +} + +type MockDescribeConfigsResponse struct { + t TestReporter +} + +func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse { + return &MockDescribeConfigsResponse{t: t} +} + +func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DescribeConfigsRequest) + res := &DescribeConfigsResponse{} + + var configEntries []*ConfigEntry + configEntries = append(configEntries, &ConfigEntry{Name: "my_topic", + Value: "my_topic", + ReadOnly: true, + Default: true, + Sensitive: false, + }) + + for _, r := range req.Resources { + res.Resources = append(res.Resources, &ResourceResponse{Name: r.Name, Configs: configEntries}) + } + return res +} + +type MockAlterConfigsResponse struct { + t TestReporter +} + +func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse { + return &MockAlterConfigsResponse{t: t} +} + +func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*AlterConfigsRequest) + res := &AlterConfigsResponse{} + + for _, r := range req.Resources { + res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name, + Type: TopicResource, + ErrorMsg: "", + }) + } + return res +} + +type MockCreateAclsResponse struct { + t TestReporter +} + +func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse { + return &MockCreateAclsResponse{t: t} +} + +func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*CreateAclsRequest) + res := &CreateAclsResponse{} + + for range req.AclCreations { + res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError}) + } + return res +} + +type MockListAclsResponse struct { + t TestReporter +} + +func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse { + return &MockListAclsResponse{t: t} +} + +func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DescribeAclsRequest) + res := &DescribeAclsResponse{} + + res.Err = ErrNoError + acl := &ResourceAcls{} + acl.Resource.ResourceName = *req.ResourceName + acl.Resource.ResourceType = req.ResourceType + acl.Acls = append(acl.Acls, &Acl{}) + res.ResourceAcls = append(res.ResourceAcls, acl) + + return res +} + +type MockDeleteAclsResponse struct { + t TestReporter +} + +func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse { + return &MockDeleteAclsResponse{t: t} +} + +func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DeleteAclsRequest) + res := &DeleteAclsResponse{} + + for range req.Filters { + response := &FilterResponse{Err: ErrNoError} + response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError}) + res.FilterResponses = append(res.FilterResponses, response) + } + return res +} diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go index 6c01f959e99f..2a1a89f3affb 100644 --- a/vendor/github.com/Shopify/sarama/offset_manager.go +++ b/vendor/github.com/Shopify/sarama/offset_manager.go @@ -25,10 +25,17 @@ type offsetManager struct { client Client conf *Config group string + ticker *time.Ticker - lock sync.Mutex - poms map[string]map[int32]*partitionOffsetManager - boms map[*Broker]*brokerOffsetManager + broker *Broker + brokerLock sync.RWMutex + + poms map[string]map[int32]*partitionOffsetManager + pomsLock sync.Mutex + + closeOnce sync.Once + 
closing chan none + closed chan none } // NewOffsetManagerFromClient creates a new OffsetManager from the given client. @@ -39,13 +46,18 @@ func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, err return nil, ErrClosedClient } + conf := client.Config() om := &offsetManager{ client: client, - conf: client.Config(), + conf: conf, group: group, + ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval), poms: make(map[string]map[int32]*partitionOffsetManager), - boms: make(map[*Broker]*brokerOffsetManager), + + closing: make(chan none), + closed: make(chan none), } + go withRecover(om.mainLoop) return om, nil } @@ -56,8 +68,8 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti return nil, err } - om.lock.Lock() - defer om.lock.Unlock() + om.pomsLock.Lock() + defer om.pomsLock.Unlock() topicManagers := om.poms[topic] if topicManagers == nil { @@ -74,53 +86,293 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti } func (om *offsetManager) Close() error { + om.closeOnce.Do(func() { + // exit the mainLoop + close(om.closing) + <-om.closed + + // mark all POMs as closed + om.asyncClosePOMs() + + // flush one last time + for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ { + om.flushToBroker() + if om.releasePOMs(false) == 0 { + break + } + } + + om.releasePOMs(true) + om.brokerLock.Lock() + om.broker = nil + om.brokerLock.Unlock() + }) return nil } -func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager { - om.lock.Lock() - defer om.lock.Unlock() +func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) { + broker, err := om.coordinator() + if err != nil { + if retries <= 0 { + return 0, "", err + } + return om.fetchInitialOffset(topic, partition, retries-1) + } + + req := new(OffsetFetchRequest) + req.Version = 1 + req.ConsumerGroup = om.group + req.AddPartition(topic, partition) + + resp, err := broker.FetchOffset(req) + if err != nil { + if retries <= 0 { + return 0, "", err + } + om.releaseCoordinator(broker) + return om.fetchInitialOffset(topic, partition, retries-1) + } + + block := resp.GetBlock(topic, partition) + if block == nil { + return 0, "", ErrIncompleteResponse + } + + switch block.Err { + case ErrNoError: + return block.Offset, block.Metadata, nil + case ErrNotCoordinatorForConsumer: + if retries <= 0 { + return 0, "", block.Err + } + om.releaseCoordinator(broker) + return om.fetchInitialOffset(topic, partition, retries-1) + case ErrOffsetsLoadInProgress: + if retries <= 0 { + return 0, "", block.Err + } + select { + case <-om.closing: + return 0, "", block.Err + case <-time.After(om.conf.Metadata.Retry.Backoff): + } + return om.fetchInitialOffset(topic, partition, retries-1) + default: + return 0, "", block.Err + } +} + +func (om *offsetManager) coordinator() (*Broker, error) { + om.brokerLock.RLock() + broker := om.broker + om.brokerLock.RUnlock() + + if broker != nil { + return broker, nil + } + + om.brokerLock.Lock() + defer om.brokerLock.Unlock() + + if broker := om.broker; broker != nil { + return broker, nil + } + + if err := om.client.RefreshCoordinator(om.group); err != nil { + return nil, err + } + + broker, err := om.client.Coordinator(om.group) + if err != nil { + return nil, err + } + + om.broker = broker + return broker, nil +} + +func (om *offsetManager) releaseCoordinator(b *Broker) { + om.brokerLock.Lock() + if om.broker == b { + om.broker = nil + } + 
om.brokerLock.Unlock() +} + +func (om *offsetManager) mainLoop() { + defer om.ticker.Stop() + defer close(om.closed) + + for { + select { + case <-om.ticker.C: + om.flushToBroker() + om.releasePOMs(false) + case <-om.closing: + return + } + } +} + +func (om *offsetManager) flushToBroker() { + req := om.constructRequest() + if req == nil { + return + } + + broker, err := om.coordinator() + if err != nil { + om.handleError(err) + return + } + + resp, err := broker.CommitOffset(req) + if err != nil { + om.handleError(err) + om.releaseCoordinator(broker) + _ = broker.Close() + return + } + + om.handleResponse(broker, req, resp) +} + +func (om *offsetManager) constructRequest() *OffsetCommitRequest { + var r *OffsetCommitRequest + var perPartitionTimestamp int64 + if om.conf.Consumer.Offsets.Retention == 0 { + perPartitionTimestamp = ReceiveTime + r = &OffsetCommitRequest{ + Version: 1, + ConsumerGroup: om.group, + ConsumerGroupGeneration: GroupGenerationUndefined, + } + } else { + r = &OffsetCommitRequest{ + Version: 2, + RetentionTime: int64(om.conf.Consumer.Offsets.Retention / time.Millisecond), + ConsumerGroup: om.group, + ConsumerGroupGeneration: GroupGenerationUndefined, + } + + } + + om.pomsLock.Lock() + defer om.pomsLock.Unlock() - bom := om.boms[broker] - if bom == nil { - bom = om.newBrokerOffsetManager(broker) - om.boms[broker] = bom + for _, topicManagers := range om.poms { + for _, pom := range topicManagers { + pom.lock.Lock() + if pom.dirty { + r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata) + } + pom.lock.Unlock() + } } - bom.refs++ + if len(r.blocks) > 0 { + return r + } - return bom + return nil } -func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) { - om.lock.Lock() - defer om.lock.Unlock() +func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) { + om.pomsLock.Lock() + defer om.pomsLock.Unlock() + + for _, topicManagers := range om.poms { + for _, pom := range topicManagers { + if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil { + continue + } + + var err KError + var ok bool + + if resp.Errors[pom.topic] == nil { + pom.handleError(ErrIncompleteResponse) + continue + } + if err, ok = resp.Errors[pom.topic][pom.partition]; !ok { + pom.handleError(ErrIncompleteResponse) + continue + } + + switch err { + case ErrNoError: + block := req.blocks[pom.topic][pom.partition] + pom.updateCommitted(block.offset, block.metadata) + case ErrNotLeaderForPartition, ErrLeaderNotAvailable, + ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer: + // not a critical error, we just need to redispatch + om.releaseCoordinator(broker) + case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize: + // nothing we can do about this, just tell the user and carry on + pom.handleError(err) + case ErrOffsetsLoadInProgress: + // nothing wrong but we didn't commit, we'll get it next time round + break + case ErrUnknownTopicOrPartition: + // let the user know *and* try redispatching - if topic-auto-create is + // enabled, redispatching should trigger a metadata req and create the + // topic; if not then re-dispatching won't help, but we've let the user + // know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706) + fallthrough + default: + // dunno, tell the user and try redispatching + pom.handleError(err) + om.releaseCoordinator(broker) + } + } + } +} - bom.refs-- +func (om *offsetManager) handleError(err error) { + 
om.pomsLock.Lock() + defer om.pomsLock.Unlock() - if bom.refs == 0 { - close(bom.updateSubscriptions) - if om.boms[bom.broker] == bom { - delete(om.boms, bom.broker) + for _, topicManagers := range om.poms { + for _, pom := range topicManagers { + pom.handleError(err) } } } -func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) { - om.lock.Lock() - defer om.lock.Unlock() +func (om *offsetManager) asyncClosePOMs() { + om.pomsLock.Lock() + defer om.pomsLock.Unlock() - delete(om.boms, bom.broker) + for _, topicManagers := range om.poms { + for _, pom := range topicManagers { + pom.AsyncClose() + } + } } -func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) { - om.lock.Lock() - defer om.lock.Unlock() +// Releases/removes closed POMs once they are clean (or when forced) +func (om *offsetManager) releasePOMs(force bool) (remaining int) { + om.pomsLock.Lock() + defer om.pomsLock.Unlock() + + for topic, topicManagers := range om.poms { + for partition, pom := range topicManagers { + pom.lock.Lock() + releaseDue := pom.done && (force || !pom.dirty) + pom.lock.Unlock() - delete(om.poms[pom.topic], pom.partition) - if len(om.poms[pom.topic]) == 0 { - delete(om.poms, pom.topic) + if releaseDue { + pom.release() + + delete(om.poms[topic], partition) + if len(om.poms[topic]) == 0 { + delete(om.poms, topic) + } + } + } + remaining += len(om.poms[topic]) } + return } // Partition Offset Manager @@ -187,138 +439,26 @@ type partitionOffsetManager struct { offset int64 metadata string dirty bool - clean sync.Cond - broker *brokerOffsetManager + done bool - errors chan *ConsumerError - rebalance chan none - dying chan none + releaseOnce sync.Once + errors chan *ConsumerError } func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) { - pom := &partitionOffsetManager{ + offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max) + if err != nil { + return nil, err + } + + return &partitionOffsetManager{ parent: om, topic: topic, partition: partition, errors: make(chan *ConsumerError, om.conf.ChannelBufferSize), - rebalance: make(chan none, 1), - dying: make(chan none), - } - pom.clean.L = &pom.lock - - if err := pom.selectBroker(); err != nil { - return nil, err - } - - if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil { - return nil, err - } - - pom.broker.updateSubscriptions <- pom - - go withRecover(pom.mainLoop) - - return pom, nil -} - -func (pom *partitionOffsetManager) mainLoop() { - for { - select { - case <-pom.rebalance: - if err := pom.selectBroker(); err != nil { - pom.handleError(err) - pom.rebalance <- none{} - } else { - pom.broker.updateSubscriptions <- pom - } - case <-pom.dying: - if pom.broker != nil { - select { - case <-pom.rebalance: - case pom.broker.updateSubscriptions <- pom: - } - pom.parent.unrefBrokerOffsetManager(pom.broker) - } - pom.parent.abandonPartitionOffsetManager(pom) - close(pom.errors) - return - } - } -} - -func (pom *partitionOffsetManager) selectBroker() error { - if pom.broker != nil { - pom.parent.unrefBrokerOffsetManager(pom.broker) - pom.broker = nil - } - - var broker *Broker - var err error - - if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil { - return err - } - - if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil { - return err - } - - pom.broker = pom.parent.refBrokerOffsetManager(broker) - return nil -} - -func (pom *partitionOffsetManager) 
fetchInitialOffset(retries int) error { - request := new(OffsetFetchRequest) - request.Version = 1 - request.ConsumerGroup = pom.parent.group - request.AddPartition(pom.topic, pom.partition) - - response, err := pom.broker.broker.FetchOffset(request) - if err != nil { - return err - } - - block := response.GetBlock(pom.topic, pom.partition) - if block == nil { - return ErrIncompleteResponse - } - - switch block.Err { - case ErrNoError: - pom.offset = block.Offset - pom.metadata = block.Metadata - return nil - case ErrNotCoordinatorForConsumer: - if retries <= 0 { - return block.Err - } - if err := pom.selectBroker(); err != nil { - return err - } - return pom.fetchInitialOffset(retries - 1) - case ErrOffsetsLoadInProgress: - if retries <= 0 { - return block.Err - } - time.Sleep(pom.parent.conf.Metadata.Retry.Backoff) - return pom.fetchInitialOffset(retries - 1) - default: - return block.Err - } -} - -func (pom *partitionOffsetManager) handleError(err error) { - cErr := &ConsumerError{ - Topic: pom.topic, - Partition: pom.partition, - Err: err, - } - - if pom.parent.conf.Consumer.Return.Errors { - pom.errors <- cErr - } else { - Logger.Println(cErr) - } + offset: offset, + metadata: metadata, + }, nil } func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError { @@ -353,7 +493,6 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string if pom.offset == offset && pom.metadata == metadata { pom.dirty = false - pom.clean.Signal() } } @@ -369,16 +508,9 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) { } func (pom *partitionOffsetManager) AsyncClose() { - go func() { - pom.lock.Lock() - defer pom.lock.Unlock() - - for pom.dirty { - pom.clean.Wait() - } - - close(pom.dying) - }() + pom.lock.Lock() + pom.done = true + pom.lock.Unlock() } func (pom *partitionOffsetManager) Close() error { @@ -395,166 +527,22 @@ func (pom *partitionOffsetManager) Close() error { return nil } -// Broker Offset Manager - -type brokerOffsetManager struct { - parent *offsetManager - broker *Broker - timer *time.Ticker - updateSubscriptions chan *partitionOffsetManager - subscriptions map[*partitionOffsetManager]none - refs int -} - -func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager { - bom := &brokerOffsetManager{ - parent: om, - broker: broker, - timer: time.NewTicker(om.conf.Consumer.Offsets.CommitInterval), - updateSubscriptions: make(chan *partitionOffsetManager), - subscriptions: make(map[*partitionOffsetManager]none), - } - - go withRecover(bom.mainLoop) - - return bom -} - -func (bom *brokerOffsetManager) mainLoop() { - for { - select { - case <-bom.timer.C: - if len(bom.subscriptions) > 0 { - bom.flushToBroker() - } - case s, ok := <-bom.updateSubscriptions: - if !ok { - bom.timer.Stop() - return - } - if _, ok := bom.subscriptions[s]; ok { - delete(bom.subscriptions, s) - } else { - bom.subscriptions[s] = none{} - } - } - } -} - -func (bom *brokerOffsetManager) flushToBroker() { - request := bom.constructRequest() - if request == nil { - return - } - - response, err := bom.broker.CommitOffset(request) - - if err != nil { - bom.abort(err) - return - } - - for s := range bom.subscriptions { - if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil { - continue - } - - var err KError - var ok bool - - if response.Errors[s.topic] == nil { - s.handleError(ErrIncompleteResponse) - delete(bom.subscriptions, s) - s.rebalance <- none{} - continue - } - if err, ok = response.Errors[s.topic][s.partition]; !ok { 
- s.handleError(ErrIncompleteResponse) - delete(bom.subscriptions, s) - s.rebalance <- none{} - continue - } - - switch err { - case ErrNoError: - block := request.blocks[s.topic][s.partition] - s.updateCommitted(block.offset, block.metadata) - case ErrNotLeaderForPartition, ErrLeaderNotAvailable, - ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer: - // not a critical error, we just need to redispatch - delete(bom.subscriptions, s) - s.rebalance <- none{} - case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize: - // nothing we can do about this, just tell the user and carry on - s.handleError(err) - case ErrOffsetsLoadInProgress: - // nothing wrong but we didn't commit, we'll get it next time round - break - case ErrUnknownTopicOrPartition: - // let the user know *and* try redispatching - if topic-auto-create is - // enabled, redispatching should trigger a metadata request and create the - // topic; if not then re-dispatching won't help, but we've let the user - // know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706) - fallthrough - default: - // dunno, tell the user and try redispatching - s.handleError(err) - delete(bom.subscriptions, s) - s.rebalance <- none{} - } +func (pom *partitionOffsetManager) handleError(err error) { + cErr := &ConsumerError{ + Topic: pom.topic, + Partition: pom.partition, + Err: err, } -} -func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest { - var r *OffsetCommitRequest - var perPartitionTimestamp int64 - if bom.parent.conf.Consumer.Offsets.Retention == 0 { - perPartitionTimestamp = ReceiveTime - r = &OffsetCommitRequest{ - Version: 1, - ConsumerGroup: bom.parent.group, - ConsumerGroupGeneration: GroupGenerationUndefined, - } + if pom.parent.conf.Consumer.Return.Errors { + pom.errors <- cErr } else { - r = &OffsetCommitRequest{ - Version: 2, - RetentionTime: int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond), - ConsumerGroup: bom.parent.group, - ConsumerGroupGeneration: GroupGenerationUndefined, - } - - } - - for s := range bom.subscriptions { - s.lock.Lock() - if s.dirty { - r.AddBlock(s.topic, s.partition, s.offset, perPartitionTimestamp, s.metadata) - } - s.lock.Unlock() - } - - if len(r.blocks) > 0 { - return r + Logger.Println(cErr) } - - return nil } -func (bom *brokerOffsetManager) abort(err error) { - _ = bom.broker.Close() // we don't care about the error this might return, we already have one - bom.parent.abandonBroker(bom) - - for pom := range bom.subscriptions { - pom.handleError(err) - pom.rebalance <- none{} - } - - for s := range bom.updateSubscriptions { - if _, ok := bom.subscriptions[s]; !ok { - s.handleError(err) - s.rebalance <- none{} - } - } - - bom.subscriptions = make(map[*partitionOffsetManager]none) +func (pom *partitionOffsetManager) release() { + pom.releaseOnce.Do(func() { + go close(pom.errors) + }) } diff --git a/vendor/github.com/Shopify/sarama/partitioner.go b/vendor/github.com/Shopify/sarama/partitioner.go index 972932728a54..6a708e729ee5 100644 --- a/vendor/github.com/Shopify/sarama/partitioner.go +++ b/vendor/github.com/Shopify/sarama/partitioner.go @@ -22,11 +22,51 @@ type Partitioner interface { RequiresConsistency() bool } +// DynamicConsistencyPartitioner can optionally be implemented by Partitioners +// in order to allow more flexibility than is originally allowed by the +// RequiresConsistency method in the Partitioner interface. This allows +// partitioners to require consistency sometimes, but not all times. 
It's useful +// for, e.g., the HashPartitioner, which does not require consistency if the +// message key is nil. +type DynamicConsistencyPartitioner interface { + Partitioner + + // MessageRequiresConsistency is similar to Partitioner.RequiresConsistency, + // but takes in the message being partitioned so that the partitioner can + // make a per-message determination. + MessageRequiresConsistency(message *ProducerMessage) bool +} + // PartitionerConstructor is the type for a function capable of constructing new Partitioners. type PartitionerConstructor func(topic string) Partitioner type manualPartitioner struct{} +// HashPartitionOption lets you modify default values of the partitioner +type HashPartitionerOption func(*hashPartitioner) + +// WithAbsFirst means that the partitioner handles absolute values +// in the same way as the reference Java implementation +func WithAbsFirst() HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.referenceAbs = true + } +} + +// WithCustomHashFunction lets you specify what hash function to use for the partitioning +func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.hasher = hasher() + } +} + +// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty +func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.random = hp + } +} + // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided // ProducerMessage's Partition field as the partition to produce to. func NewManualPartitioner(topic string) Partitioner { @@ -83,8 +123,9 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool { } type hashPartitioner struct { - random Partitioner - hasher hash.Hash32 + random Partitioner + hasher hash.Hash32 + referenceAbs bool } // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. @@ -95,6 +136,21 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) p.hasher = hasher() + p.referenceAbs = false + return p + } +} + +// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options +func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor { + return func(topic string) Partitioner { + p := new(hashPartitioner) + p.random = NewRandomPartitioner(topic) + p.hasher = fnv.New32a() + p.referenceAbs = false + for _, option := range options { + option(p) + } return p } } @@ -107,6 +163,19 @@ func NewHashPartitioner(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) p.hasher = fnv.New32a() + p.referenceAbs = false + return p +} + +// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values +// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do +// that but it had a mistake and now there are people depending on both behaviours. This will +// all go away on the next major version bump. 
+// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
+// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
+// that but it had a mistake and now there are people depending on both behaviours. This will
+// all go away on the next major version bump.
+func NewReferenceHashPartitioner(topic string) Partitioner {
+	p := new(hashPartitioner)
+	p.random = NewRandomPartitioner(topic)
+	p.hasher = fnv.New32a()
+	p.referenceAbs = true
 	return p
 }
 
@@ -123,9 +192,18 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
 	if err != nil {
 		return -1, err
 	}
-	partition := int32(p.hasher.Sum32()) % numPartitions
-	if partition < 0 {
-		partition = -partition
+	var partition int32
+	// Turns out we were doing our absolute value in a subtly different way from the upstream
+	// implementation, but now we need to maintain backwards compat for people who started using
+	// the old version; if referenceAbs is set we are compatible with the reference java client
+	// but not past Sarama versions
+	if p.referenceAbs {
+		partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
+	} else {
+		partition = int32(p.hasher.Sum32()) % numPartitions
+		if partition < 0 {
+			partition = -partition
+		}
 	}
 	return partition, nil
 }
@@ -133,3 +211,7 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
 func (p *hashPartitioner) RequiresConsistency() bool {
 	return true
 }
+
+func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
+	return message.Key != nil
+}
diff --git a/vendor/github.com/Shopify/sarama/records.go b/vendor/github.com/Shopify/sarama/records.go
index 301055bb070c..192f5927b21c 100644
--- a/vendor/github.com/Shopify/sarama/records.go
+++ b/vendor/github.com/Shopify/sarama/records.go
@@ -163,6 +163,27 @@ func (r *Records) isControl() (bool, error) {
 	return false, fmt.Errorf("unknown records type: %v", r.recordsType)
 }
 
+func (r *Records) isOverflow() (bool, error) {
+	if r.recordsType == unknownRecords {
+		if empty, err := r.setTypeFromFields(); err != nil || empty {
+			return false, err
+		}
+	}
+
+	switch r.recordsType {
+	case unknownRecords:
+		return false, nil
+	case legacyRecords:
+		if r.MsgSet == nil {
+			return false, nil
+		}
+		return r.MsgSet.OverflowMessage, nil
+	case defaultRecords:
+		return false, nil
+	}
+	return false, fmt.Errorf("unknown records type: %v", r.recordsType)
+}
+
 func magicValue(pd packetDecoder) (int8, error) {
 	dec, err := pd.peek(magicOffset, magicLength)
 	if err != nil {
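The subtle part of the partitioner change is the referenceAbs branch. A small self-contained sketch of both absolute-value styles shows why the flag must default to the old behaviour: whenever the 32-bit hash has its sign bit set, masking before the modulo (Java style) and negating after it (legacy Sarama style) generally pick different partitions. The key below is only illustrative; any key whose hash sets the sign bit will show the divergence.

package main

import (
	"fmt"
	"hash/fnv"
)

// bothPartitions mirrors the two branches of Partition above: the legacy
// Sarama absolute value (negate the modulo result) and the reference Java
// behaviour (clear the sign bit before the modulo).
func bothPartitions(key []byte, numPartitions int32) (legacy, reference int32) {
	h := fnv.New32a()
	h.Write(key)
	v := int32(h.Sum32())

	legacy = v % numPartitions
	if legacy < 0 {
		legacy = -legacy
	}
	reference = (v & 0x7fffffff) % numPartitions
	return legacy, reference
}

func main() {
	// When v is negative the two results usually differ, which is why
	// existing users need the old code path preserved.
	l, r := bothPartitions([]byte("example-key"), 12)
	fmt.Println(l, r)
}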
diff --git a/vendor/github.com/Shopify/sarama/sync_producer.go b/vendor/github.com/Shopify/sarama/sync_producer.go
index dd096b6db671..021c5a010323 100644
--- a/vendor/github.com/Shopify/sarama/sync_producer.go
+++ b/vendor/github.com/Shopify/sarama/sync_producer.go
@@ -90,13 +90,8 @@ func verifyProducerConfig(config *Config) error {
 }
 
 func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
-	oldMetadata := msg.Metadata
-	defer func() {
-		msg.Metadata = oldMetadata
-	}()
-
 	expectation := make(chan *ProducerError, 1)
-	msg.Metadata = expectation
+	msg.expectation = expectation
 	sp.producer.Input() <- msg
 
 	if err := <-expectation; err != nil {
@@ -107,21 +102,11 @@ func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offs
 }
 
 func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
-	savedMetadata := make([]interface{}, len(msgs))
-	for i := range msgs {
-		savedMetadata[i] = msgs[i].Metadata
-	}
-	defer func() {
-		for i := range msgs {
-			msgs[i].Metadata = savedMetadata[i]
-		}
-	}()
-
 	expectations := make(chan chan *ProducerError, len(msgs))
 	go func() {
 		for _, msg := range msgs {
 			expectation := make(chan *ProducerError, 1)
-			msg.Metadata = expectation
+			msg.expectation = expectation
 			sp.producer.Input() <- msg
 			expectations <- expectation
 		}
 	}()
@@ -144,7 +129,7 @@ func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
 func (sp *syncProducer) handleSuccesses() {
 	defer sp.wg.Done()
 	for msg := range sp.producer.Successes() {
-		expectation := msg.Metadata.(chan *ProducerError)
+		expectation := msg.expectation
 		expectation <- nil
 	}
 }
@@ -152,7 +137,7 @@ func (sp *syncProducer) handleSuccesses() {
 func (sp *syncProducer) handleErrors() {
 	defer sp.wg.Done()
 	for err := range sp.producer.Errors() {
-		expectation := err.Msg.Metadata.(chan *ProducerError)
+		expectation := err.Msg.expectation
 		expectation <- err
 	}
 }
diff --git a/vendor/github.com/Shopify/sarama/utils.go b/vendor/github.com/Shopify/sarama/utils.go
index 702e22627015..1bb00d761a47 100644
--- a/vendor/github.com/Shopify/sarama/utils.go
+++ b/vendor/github.com/Shopify/sarama/utils.go
@@ -155,6 +155,7 @@ var (
 	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
 	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
 	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
+	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
 
 	SupportedVersions = []KafkaVersion{
 		V0_8_2_0,
@@ -173,9 +174,10 @@ var (
 		V0_11_0_2,
 		V1_0_0_0,
 		V1_1_0_0,
+		V2_0_0_0,
 	}
 	MinVersion = V0_8_2_0
-	MaxVersion = V1_1_0_0
+	MaxVersion = V2_0_0_0
 )
 
 func ParseKafkaVersion(s string) (KafkaVersion, error) {
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 819cb955707e..7a8f79d3e401 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -15,14 +15,14 @@
 			"revisionTime": "2018-07-23T16:30:02Z"
 		},
 		{
-			"checksumSHA1": "xSwVjXDGIMoADDte4hBjra6ldGk=",
+			"checksumSHA1": "8uhvsMSF1+QTzPYxASJGv548nBU=",
 			"origin": "github.com/urso/sarama",
 			"path": "github.com/Shopify/sarama",
-			"revision": "d1575e4abe04acbbe8ac766320585cdf271dd189",
+			"revision": "0143592836b090a1b481def4d902cfb3c5c05ae5",
 			"revisionTime": "2016-11-23T00:27:23Z",
 			"tree": true,
-			"version": "v1.17.0/enh/offset-replica-id",
-			"versionExact": "v1.17.0/enh/offset-replica-id"
+			"version": "=v1.18.0/enh/offset-replica-id",
+			"versionExact": "v1.18.0/enh/offset-replica-id"
 		},
 		{
 			"checksumSHA1": "DYv6Q1+VfnUVxMwvk5IshAClLvw=",
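End to end, the utils.go change means a "2.0.0" version string is now accepted wherever sarama parses versions, which is what lets this patch advertise Kafka 2.0.0 support. A minimal sketch of the client-side effect:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// "2.0.0" parses now that V2_0_0_0 is in SupportedVersions and
	// MaxVersion has been bumped.
	v, err := sarama.ParseKafkaVersion("2.0.0")
	if err != nil {
		log.Fatal(err)
	}

	cfg := sarama.NewConfig()
	cfg.Version = v
	fmt.Println(cfg.Version == sarama.V2_0_0_0) // true
}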