Skip to content

Commit

Permalink
Merge pull request #92 from kafka4beam/api-timeout-and-deadline
Browse files Browse the repository at this point in the history
Api timeout and deadline
  • Loading branch information
zmstone authored Jul 11, 2021
2 parents 94643e1 + a7b5f5e commit 60e14c5
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 28 deletions.
10 changes: 4 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
name: kafka_protocol
on:
push:
branches:
- '*'
pull_request:
branches:
- master

jobs:
build:
strategy:
fail-fast: false
matrix:
include:
- platform: ubuntu-20.04
Expand All @@ -22,7 +20,7 @@ jobs:
kafka-version: 1.1
- platform: ubuntu-20.04
lsb_release: focal
otp-version: 21.3.8.17-1
otp-version: 24.0.2-1
kafka-version: 0.11
runs-on: ${{ matrix.platform }}
steps:
Expand Down Expand Up @@ -61,5 +59,5 @@ jobs:
- name: Run tests
run: |
export KAFKA_VERSION=${{ matrix.kafka-version }}
sudo make testbed
make eunit
make test-env
make eunit || (cd scripts && docker-compose logs)
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ erl_crash.dump
rebar.lock
.gradle/
*.log
*.iml
.idea/*
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ which is generated from `priv/kafka.bnf`.
The root level `schema` is always a `struct`.
A `struct` consists of fields having lower level (maybe nested) `schema`

Struct fileds are documented in `priv/kafka.bnf` as comments,
Struct fields are documented in `priv/kafka.bnf` as comments,
but the comments are not generated as Erlang comments in `kpro_schema.erl`

Take `produce` API for example
Expand Down
4 changes: 4 additions & 0 deletions scripts/setup-test-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ TD="$(cd "$(dirname "$0")" && pwd)"
docker-compose -f $TD/docker-compose.yml down || true
docker-compose -f $TD/docker-compose.yml up -d

# give kafka some time
sleep 5

n=0
while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do
if [ $n -gt 4 ]; then
Expand Down Expand Up @@ -67,3 +70,4 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l
if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
fi

2 changes: 1 addition & 1 deletion src/kpro_brokers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ discover_partition_leader(Connection, Topic, Partition, Timeout) ->
{ok, {Host, Port}}
end
],
kpro_lib:ok_pipe(FL).
kpro_lib:ok_pipe(FL, Timeout).

%% @doc Discover group or transactional coordinator.
-spec discover_coordinator(connection(), coordinator_type(),
Expand Down
24 changes: 15 additions & 9 deletions src/kpro_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,12 @@ init(Parent, Host, Port, Config) ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(State, Debug).

%% Connect to the given endpoint, then initalize connection.
%% Connect to the given endpoint, then initialize connection.
%% Raise an error exception for any failure.
-spec connect(pid(), hostname(), portnum(), config()) -> state().
connect(Parent, Host, Port, Config) ->
Timeout = get_connect_timeout(Config),
Deadline = deadline(Timeout),
%% initial active opt should be 'false' before upgrading to ssl
SockOpts = [{active, false}, binary] ++ get_extra_sock_opts(Config),
case gen_tcp:connect(Host, Port, SockOpts, Timeout) of
Expand All @@ -202,7 +203,7 @@ connect(Parent, Host, Port, Config) ->
, remote = {Host, Port}
, sock = Sock
},
init_connection(State, Config);
init_connection(State, Config, Deadline);
{error, Reason} ->
erlang:error(Reason)
end.
Expand All @@ -214,8 +215,7 @@ connect(Parent, Host, Port, Config) ->
init_connection(#state{ client_id = ClientId
, sock = Sock
, remote = {Host, _}
} = State, Config) ->
Timeout = get_connect_timeout(Config),
} = State, Config, Deadline) ->
%% adjusting buffer size as per recommendation at
%% http://erlang.org/doc/man/inet.html#setopts-2
%% idea is from github.com/epgsql/epgsql
Expand All @@ -224,26 +224,26 @@ init_connection(#state{ client_id = ClientId
ok = inet:setopts(Sock, [{buffer, max(RecBufSize, SndBufSize)}]),
SslOpts = maps:get(ssl, Config, false),
Mod = get_tcp_mod(SslOpts),
NewSock = maybe_upgrade_to_ssl(Sock, Mod, SslOpts, Host, Timeout),
NewSock = maybe_upgrade_to_ssl(Sock, Mod, SslOpts, Host, timeout(Deadline)),
%% from now on, it's all packet-4 messages
ok = setopts(NewSock, Mod, [{packet, 4}]),
Versions =
case Config of
#{query_api_versions := false} -> ?undef;
_ -> query_api_versions(NewSock, Mod, ClientId, Timeout)
_ -> query_api_versions(NewSock, Mod, ClientId, Deadline)
end,
HandshakeVsn = case Versions of
#{sasl_handshake := {_, V}} -> V;
_ -> 0
end,
SaslOpts = get_sasl_opt(Config),
ok = kpro_sasl:auth(Host, NewSock, Mod, ClientId,
Timeout, SaslOpts, HandshakeVsn),
timeout(Deadline), SaslOpts, HandshakeVsn),
State#state{mod = Mod, sock = NewSock, api_vsns = Versions}.

query_api_versions(Sock, Mod, ClientId, Timeout) ->
query_api_versions(Sock, Mod, ClientId, Deadline) ->
Req = kpro_req_lib:make(api_versions, 0, []),
Rsp = kpro_lib:send_and_recv(Req, Sock, Mod, ClientId, Timeout),
Rsp = kpro_lib:send_and_recv(Req, Sock, Mod, ClientId, timeout(Deadline)),
ErrorCode = find(error_code, Rsp),
case ErrorCode =:= ?no_error of
true ->
Expand Down Expand Up @@ -608,6 +608,12 @@ get_client_id(Config) ->

find(FieldName, Struct) -> kpro_lib:find(FieldName, Struct).

%% @private Compute an absolute deadline on the monotonic clock:
%% the instant (in milliseconds) that is `Timeout' ms from now.
deadline(Timeout) ->
    Now = erlang:monotonic_time(millisecond),
    Now + Timeout.

%% @private Milliseconds remaining until the monotonic-clock `Deadline'.
%% Clamped at zero so an already-expired deadline never yields a
%% negative timeout value.
timeout(Deadline) ->
    case Deadline - erlang:monotonic_time(millisecond) of
        Remaining when Remaining > 0 -> Remaining;
        _ -> 0
    end.

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
6 changes: 3 additions & 3 deletions src/kpro_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ send_and_recv(#kpro_req{api = API, vsn = Vsn} = Req,
end.

%% @doc Function pipeline.
%% The first function takes no args, all succeeding ones shoud be arity-0 or 1
%% The first function takes no args, all succeeding ones should be arity-0 or 1
functions. All functions should return
%% `ok' | `{ok, Result}' | `{error, Reason}'.
%% where `Result' is the input arg of the next function,
%% or the result of pipeline if it's the last pipe node.
%%
%% NOTE: If a funcition returns `ok' the next should be an arity-0 function.
%% NOTE: If a function returns `ok' the next should be an arity-0 function.
%% Any `{error, Reason}' return value would cause the pipeline to abort.
%%
%% NOTE: The pipe funcions are delegated to an agent process to evaluate,
%% NOTE: The pipe functions are delegated to an agent process to evaluate,
%% only exceptions and process links are propagated back to caller
%% other side-effects like monitor references are not handled.
ok_pipe(FunList, Timeout) ->
Expand Down
11 changes: 6 additions & 5 deletions test/kpro_group_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@
%% 4. heartbeat-cycle, to tell broker that it is still alive
%% 5. leave_group
%% EUnit generator: one titled test case per supported Kafka version,
%% each wrapped with a 60-second timeout so slow brokers do not hit
%% EUnit's default 5-second limit.
full_flow_test_() ->
    MakeCase =
        fun(KafkaVsn) ->
            Title = atom_to_list(KafkaVsn),
            Run = fun() -> test_full_flow(KafkaVsn) end,
            {timeout, 60, {Title, Run}}
        end,
    lists:map(MakeCase, kafka_vsns()).

test_full_flow(KafkaVsn) ->
GroupId = make_group_id(full_flow_test),
Expand Down Expand Up @@ -157,7 +158,7 @@ sync_group(Connection, GroupId, MemberId, Generation,
ok.

describe_groups(Connection, GroupId, KafkaVsn) ->
Groups = [GroupId, <<"unknown-group">>],
Groups = [GroupId],
Body = #{groups => Groups, include_authorized_operations => true},
Rsp = request_sync(Connection, describe_groups, Body, KafkaVsn),
#{groups := RspGroups} = Rsp,
Expand Down Expand Up @@ -206,7 +207,7 @@ heartbeat_loop(SendFun) ->
stop ->
exit(normal)
after
100 ->
1000 ->
heartbeat_loop(SendFun)
end.

Expand Down
6 changes: 3 additions & 3 deletions test/kpro_txn_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
%% basic test of begin -> write -> commit
%% EUnit generator: one titled case per produce version, run `inorder'
%% so transactional produce/fetch rounds do not interleave.
txn_produce_test_() ->
    {Vsns, FetchVsn} = produce_fetch_versions(),
    Cases =
        lists:map(
          fun(Vsn) ->
              Title = "vsn=" ++ integer_to_list(Vsn),
              {Title, fun() -> test_txn_produce(Vsn, FetchVsn) end}
          end, Vsns),
    {inorder, Cases}.

test_txn_produce(ProduceVsn, FetchVsn) ->
Topic = topic(),
Expand Down

0 comments on commit 60e14c5

Please sign in to comment.