diff --git a/changelog.md b/changelog.md index e0259de5..d5e14e40 100644 --- a/changelog.md +++ b/changelog.md @@ -1,7 +1,19 @@ * 3.16.2 + * Update kafka_protocol from 4.0.1 to 4.0.3. + Prior to this change the actual time spent in establishing a + Kafka connection might be longer than desired due to the timeout + being used in SSL upgrade (if enabled), then API version query. + This has been fixed by turning the given timeout config + into a deadline, and the sub-steps will try to meet the deadline. + see more details here: https://github.com/kafka4beam/kafka_protocol/pull/9 + * Catch `timeout` and other `DOWN` reasons when making `gen_server` call to + `brod_client`, `brod_consumer` and producer/consumer supervisor, + and return as `Reason` in `{error, Reason}`. + Previously only `noproc` reaon is caught. (#492) * Propagate `connect_timeout` config to `kpro` API functions as `timeout` arg affected APIs: connect_group_coordinator, create_topics, delete_topics, - resolve_offset, fetch, fold, fetch_committed_offsets + resolve_offset, fetch, fold, fetch_committed_offsets (#458) + * Fix bad field name in group describe request (#486) * 3.16.1 * Fix `brod` script in `brod-cli` in release. * Support `rebalance_timeout` consumer group option diff --git a/rebar.config b/rebar.config index 47fe6434..42755e24 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,5 @@ {deps, [ {supervisor3, "1.1.11"} - , {kafka_protocol, "4.0.1"} + , {kafka_protocol, "4.0.3"} , {snappyer, "1.2.8"} ]}. {edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}. diff --git a/src/brod.erl b/src/brod.erl index d8182f17..52ebc7da 100644 --- a/src/brod.erl +++ b/src/brod.erl @@ -455,7 +455,8 @@ get_partitions_count(Client, Topic) -> -spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason} when Reason :: client_down - | {consumer_down, noproc} + | {client_down, any()} + | {consumer_down, any()} | {consumer_not_found, topic()} | {consumer_not_found, topic(), partition()}. get_consumer(Client, Topic, Partition) -> @@ -465,7 +466,8 @@ get_consumer(Client, Topic, Partition) -> -spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason} when Reason :: client_down - | {producer_down, noproc} + | {client_down, any()} + | {producer_down, any()} | {producer_not_found, topic()} | {producer_not_found, topic(), partition()}. get_producer(Client, Topic, Partition) -> diff --git a/src/brod_client.erl b/src/brod_client.erl index 04245ff7..7b2e0ad6 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -83,14 +83,14 @@ | ?CONSUMER_KEY(topic(), partition()). -type get_producer_error() :: client_down - | {producer_down, noproc} + | {client_down, any()} + | {producer_down, any()} | {producer_not_found, topic()} - | { producer_not_found - , topic() - , partition()}. + | {producer_not_found, topic(), partition()}. -type get_consumer_error() :: client_down - | {consumer_down, noproc} + | {client_down, any()} + | {consumer_down, any()} | {consumer_not_found, topic()} | {consumer_not_found, topic(), partition()}. @@ -829,16 +829,20 @@ ensure_partition_workers(TopicName, State, F) -> end end). -%% Catch noproc exit exception when making gen_server:call. +%% Catches exit exceptions when making gen_server:call. -spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return when Call :: term(), Timeout :: infinity | integer(), - Return :: ok | {ok, term()} | {error, client_down | term()}. + Return :: ok | {ok, term()} | {error, Reason}, + Reason :: client_down | {client_down, any()} | any(). safe_gen_call(Server, Call, Timeout) -> try gen_server:call(Server, Call, Timeout) - catch exit : {noproc, _} -> - {error, client_down} + catch + exit : {noproc, _} -> + {error, client_down}; + exit : {Reason, _} -> + {error, {client_down, Reason}} end. -spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value(). diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 3c154920..2a045f73 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -794,16 +794,17 @@ reset_buffer(#state{ pending_acks = #pending_acks{queue = Queue} , last_req_ref = ?undef }. -%% Catch noproc exit exception when making gen_server:call. +%% Catch exit exceptions when making gen_server:call. -spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return when Call :: term(), Timeout :: infinity | integer(), - Return :: ok | {ok, term()} | {error, consumer_down | term()}. + Return :: ok | {ok, term()} | {error, any()}. safe_gen_call(Server, Call, Timeout) -> try gen_server:call(Server, Call, Timeout) - catch exit : {noproc, _} -> - {error, consumer_down} + catch + exit : {Reason, _} -> + {error, Reason} end. %% Init payload connection regardless of subscriber state. diff --git a/src/brod_consumers_sup.erl b/src/brod_consumers_sup.erl index ba876ae4..f70ed51c 100644 --- a/src/brod_consumers_sup.erl +++ b/src/brod_consumers_sup.erl @@ -67,7 +67,7 @@ stop_consumer(SupPid, TopicName) -> {ok, pid()} | {error, Reason} when Reason :: {consumer_not_found, brod:topic()} | {consumer_not_found, brod:topic(), brod:partition()} - | {consumer_down, noproc}. + | {consumer_down, any()}. find_consumer(SupPid, Topic, Partition) -> case supervisor3:find_child(SupPid, Topic) of [] -> @@ -83,8 +83,8 @@ find_consumer(SupPid, Topic, Partition) -> [Pid] -> {ok, Pid} end - catch exit : {noproc, _} -> - {error, {consumer_down, noproc}} + catch exit : {Reason, _} -> + {error, {consumer_down, Reason}} end end. diff --git a/src/brod_producers_sup.erl b/src/brod_producers_sup.erl index 284282bf..02ee8531 100644 --- a/src/brod_producers_sup.erl +++ b/src/brod_producers_sup.erl @@ -71,7 +71,7 @@ stop_producer(SupPid, TopicName) -> {ok, pid()} | {error, Reason} when Reason :: {producer_not_found, brod:topic()} | {producer_not_found, brod:topic(), brod:partition()} - | {producer_down, noproc}. + | {producer_down, any()}. find_producer(SupPid, Topic, Partition) -> case supervisor3:find_child(SupPid, Topic) of [] -> @@ -87,8 +87,8 @@ find_producer(SupPid, Topic, Partition) -> [Pid] -> {ok, Pid} end - catch exit : {noproc, _} -> - {error, {producer_down, noproc}} + catch exit : {Reason, _} -> + {error, {producer_down, Reason}} end end.