Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable client id #388

Merged
merged 2 commits into from
Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ config :kafka_ex,
# consumer groups, set this to :no_consumer_group (this is the
# only exception to the requirement that this value be a binary)
consumer_group: "kafka_ex",
# The client_id is the logical grouping of a set of kafka clients.
client_id: "kafka_ex",
# Set this value to true if you do not want the default
# `KafkaEx.Server` worker to start during application start-up -
# i.e., if you want to start your own set of named workers
Expand Down
5 changes: 5 additions & 0 deletions lib/kafka_ex/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ defmodule KafkaEx.Config do
Application.get_env(:kafka_ex, :disable_default_worker, false)
end

@doc false
def client_id do
Application.get_env(:kafka_ex, :client_id, "kafka_ex")
end

@doc false
def consumer_group do
Application.get_env(:kafka_ex, :consumer_group, "kafka_ex")
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule KafkaEx.New.Client do
the legacy KafkaEx API.
"""

alias KafkaEx.Config
alias KafkaEx.NetworkClient

alias KafkaEx.New.Broker
Expand Down Expand Up @@ -69,7 +70,6 @@ defmodule KafkaEx.New.Client do

# Default from GenServer
@default_call_timeout 5_000
@client_id "kafka_ex"
@retry_count 3
@sync_timeout 1_000

Expand Down Expand Up @@ -364,7 +364,7 @@ defmodule KafkaEx.New.Client do
defp client_request(request, state) do
%{
request
| client_id: @client_id,
| client_id: Config.client_id(),
correlation_id: state.correlation_id
}
end
Expand Down
10 changes: 5 additions & 5 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule KafkaEx.Server do
Defines the KafkaEx.Server behavior that all Kafka API servers must implement, this module also provides some common callback functions that are injected into the servers that `use` it.
"""

alias KafkaEx.Config
alias KafkaEx.NetworkClient
alias KafkaEx.Protocol.ConsumerMetadata
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
Expand Down Expand Up @@ -272,7 +273,6 @@ defmodule KafkaEx.Server do
alias KafkaEx.NetworkClient
alias KafkaEx.Protocol.Offset

@client_id "kafka_ex"
@retry_count 3
@wait_time 10
@min_bytes 1
Expand Down Expand Up @@ -392,7 +392,7 @@ defmodule KafkaEx.Server do

produce_request_data =
try do
Produce.create_request(correlation_id, @client_id, produce_request)
Produce.create_request(correlation_id, Config.client_id(), produce_request)
rescue
e in FunctionClauseError -> nil
end
Expand Down Expand Up @@ -507,7 +507,7 @@ defmodule KafkaEx.Server do
offset_request =
Offset.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
topic,
partition,
time
Expand Down Expand Up @@ -690,7 +690,7 @@ defmodule KafkaEx.Server do
metadata_request =
Metadata.create_request(
correlation_id,
@client_id,
Config.client_id(),
topic,
api_version
)
Expand Down Expand Up @@ -815,7 +815,7 @@ defmodule KafkaEx.Server do
defp client_request(request, state) do
%{
request
| client_id: @client_id,
| client_id: Config.client_id(),
correlation_id: state.correlation_id
}
end
Expand Down
9 changes: 5 additions & 4 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule KafkaEx.Server0P10AndLater do
Implements kafkaEx.Server behaviors for kafka 0.10.1 API.
"""
use KafkaEx.Server
alias KafkaEx.Config
alias KafkaEx.Protocol.CreateTopics
alias KafkaEx.Protocol.DeleteTopics
alias KafkaEx.Protocol.ApiVersions
Expand Down Expand Up @@ -185,7 +186,7 @@ defmodule KafkaEx.Server0P10AndLater do
def kafka_server_api_versions(state) do
response =
state.correlation_id
|> ApiVersions.create_request(@client_id)
|> ApiVersions.create_request(Config.client_id())
|> first_broker_response(state)
|> ApiVersions.parse_response()

Expand All @@ -205,7 +206,7 @@ defmodule KafkaEx.Server0P10AndLater do
main_request =
DeleteTopics.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
%DeleteTopics.Request{
topics: topics,
timeout: config_sync_timeout(network_timeout)
Expand Down Expand Up @@ -259,7 +260,7 @@ defmodule KafkaEx.Server0P10AndLater do
main_request =
CreateTopics.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
create_topics_request,
api_version
)
Expand Down Expand Up @@ -318,7 +319,7 @@ defmodule KafkaEx.Server0P10AndLater do
) do
response =
correlation_id
|> ConsumerMetadata.create_request(@client_id, consumer_group)
|> ConsumerMetadata.create_request(Config.client_id(), consumer_group)
|> first_broker_response(state)
|> ConsumerMetadata.parse_response()

Expand Down
7 changes: 4 additions & 3 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule KafkaEx.Server0P8P2 do
]

use KafkaEx.Server
alias KafkaEx.Config
alias KafkaEx.ConsumerGroupRequiredError
alias KafkaEx.InvalidConsumerGroupError
alias KafkaEx.Protocol.ConsumerMetadata
Expand Down Expand Up @@ -138,7 +139,7 @@ defmodule KafkaEx.Server0P8P2 do
offset_fetch = %{offset_fetch | consumer_group: consumer_group}

offset_fetch_request =
OffsetFetch.create_request(state.correlation_id, @client_id, offset_fetch)
OffsetFetch.create_request(state.correlation_id, Config.client_id(), offset_fetch)

{response, state} =
case broker do
Expand Down Expand Up @@ -235,7 +236,7 @@ defmodule KafkaEx.Server0P8P2 do
) do
response =
correlation_id
|> ConsumerMetadata.create_request(@client_id, consumer_group)
|> ConsumerMetadata.create_request(Config.client_id(), consumer_group)
|> first_broker_response(state)
|> ConsumerMetadata.parse_response()

Expand Down Expand Up @@ -302,7 +303,7 @@ defmodule KafkaEx.Server0P8P2 do
offset_commit_request_payload =
OffsetCommit.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
offset_commit_request
)

Expand Down
5 changes: 3 additions & 2 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule KafkaEx.Server0P9P0 do
]

use KafkaEx.Server
alias KafkaEx.Config
alias KafkaEx.ConsumerGroupRequiredError
alias KafkaEx.InvalidConsumerGroupError
alias KafkaEx.Protocol.ConsumerMetadata
Expand Down Expand Up @@ -212,7 +213,7 @@ defmodule KafkaEx.Server0P9P0 do
wire_request =
protocol_module.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
request
)

Expand Down Expand Up @@ -271,7 +272,7 @@ defmodule KafkaEx.Server0P9P0 do
) do
response =
correlation_id
|> ConsumerMetadata.create_request(@client_id, consumer_group)
|> ConsumerMetadata.create_request(Config.client_id(), consumer_group)
|> first_broker_response(state)
|> ConsumerMetadata.parse_response()

Expand Down