Skip to content

Commit

Permalink
Merge pull request #9418 from rabbitmq/osiris-1.6.6
Browse files Browse the repository at this point in the history
Osiris 1.6.7
  • Loading branch information
michaelklishin authored Sep 15, 2023
2 parents 9ba3e3d + 739214b commit ed88e38
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 64 deletions.
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ bazel_dep(

bazel_dep(
name = "rabbitmq_osiris",
version = "1.6.4",
version = "1.6.7",
repo_name = "osiris",
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
PLT_APPS += mnesia

dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.4
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.7
dep_systemd = hex 0.6.1
dep_seshat = git https://github.com/rabbitmq/seshat v0.6.1

Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ writer_pid(StreamId) when is_list(StreamId) ->
MFA = {?MODULE, query_writer_pid, [StreamId]},
query_pid(StreamId, MFA).

-spec local_pid(string()) ->
{ok, pid()} | {error, not_found | term()}.
local_pid(StreamId) when is_list(StreamId) ->
MFA = {?MODULE, query_local_pid, [StreamId, node()]},
query_pid(StreamId, MFA).
Expand Down
114 changes: 52 additions & 62 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,11 @@ consume(Q, #{limiter_active := true}, _State)
when ?amqqueue_is_stream(Q) ->
{error, global_qos_not_supported_for_queue_type};
consume(Q, Spec,
#stream_client{filtering_supported = FilteringSupported} = QState0) when ?amqqueue_is_stream(Q) ->
#stream_client{filtering_supported = FilteringSupported} = QState0)
when ?amqqueue_is_stream(Q) ->
%% Messages should include the offset as a custom header.
case check_queue_exists_in_local_node(Q) of
ok ->
case get_local_pid(QState0) of
{LocalPid, QState} when is_pid(LocalPid) ->
#{no_ack := NoAck,
channel_pid := ChPid,
prefetch_count := ConsumerPrefetchCount,
Expand All @@ -250,30 +251,34 @@ consume(Q, Spec,
args := Args,
ok_msg := OkMsg} = Spec,
QName = amqqueue:get_name(Q),
case parse_offset_arg(rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
case parse_offset_arg(
rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
{error, _} = Err ->
Err;
{ok, OffsetSpec} ->
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
ConsumerPrefetchCount, false,
up, Args),
%% FIXME: reply needs to be sent before the stream begins sending
%% really it should be sent by the stream queue process like classic queues
%% do
maybe_send_reply(ChPid, OkMsg),
FilterSpec = filter_spec(Args),
case {FilterSpec, FilteringSupported} of
{#{filter_spec := _}, false} ->
{protocol_error, precondition_failed, "Filtering is not supported", []};
{protocol_error, precondition_failed,
"Filtering is not supported", []};
_ ->
begin_stream(QState0, ConsumerTag, OffsetSpec,
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag,
ExclusiveConsume,
not NoAck, QName,
ConsumerPrefetchCount,
false, up, Args),
%% reply needs to be sent before the stream
%% begins sending
maybe_send_reply(ChPid, OkMsg),
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
begin_stream(QState, ConsumerTag, OffsetSpec,
ConsumerPrefetchCount, FilterSpec)
end
end;
Err ->
Err
{undefined, _} ->
{protocol_error, precondition_failed,
"queue '~ts' does not have a running replica on the local node",
[rabbit_misc:rs(amqqueue:get_name(Q))]}
end.

-spec parse_offset_arg(undefined |
Expand Down Expand Up @@ -339,8 +344,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
get_local_pid(#stream_client{leader = Pid} = State)
when is_pid(Pid) andalso node(Pid) == node() ->
{Pid, State#stream_client{local_pid = Pid}};
get_local_pid(#stream_client{stream_id = StreamId,
local_pid = undefined} = State) ->
get_local_pid(#stream_client{stream_id = StreamId} = State) ->
%% query local coordinator to get pid
case rabbit_stream_coordinator:local_pid(StreamId) of
{ok, Pid} ->
Expand All @@ -349,34 +353,30 @@ get_local_pid(#stream_client{stream_id = StreamId,
{undefined, State}
end.

begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
Tag, Offset, Max, Options) ->
{LocalPid, State} = get_local_pid(State0),
case LocalPid of
undefined ->
{error, no_local_stream_replica_available};
_ ->
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
%% TODO: avoid double calls to the same process
StartOffset = case Offset of
first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
{timestamp, _} -> NextOffset;
_ -> Offset
end,
Str0 = #stream{credit = Max,
start_offset = StartOffset,
listening_offset = NextOffset,
log = Seg0,
max = Max,
reader_options = Options},
{ok, State#stream_client{local_pid = LocalPid,
readers = Readers0#{Tag => Str0}}}
end.
begin_stream(#stream_client{name = QName,
readers = Readers0,
local_pid = LocalPid} = State,
Tag, Offset, Max, Options)
when is_pid(LocalPid) ->
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
StartOffset = case Offset of
first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
{timestamp, _} -> NextOffset;
_ -> Offset
end,
Str0 = #stream{credit = Max,
start_offset = StartOffset,
listening_offset = NextOffset,
log = Seg0,
max = Max,
reader_options = Options},
{ok, State#stream_client{local_pid = LocalPid,
readers = Readers0#{Tag => Str0}}}.

cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
name = QName} = State) ->
Expand All @@ -396,8 +396,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
end.

credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
name = Name,
local_pid = LocalPid} = State) ->
name = Name,
local_pid = LocalPid} = State) ->
{Readers1, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
Expand All @@ -411,7 +411,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
true ->
case Readers1 of
#{CTag := #stream{credit = Credit1} = Str2} ->
{Readers0#{CTag => Str2#stream{credit = 0}}, [{send_drained, {CTag, Credit1}}]};
{Readers0#{CTag => Str2#stream{credit = 0}},
[{send_drained, {CTag, Credit1}}]};
_ ->
{Readers1, []}
end;
Expand Down Expand Up @@ -444,7 +445,7 @@ deliver0(MsgId, Msg,
soft_limit = SftLmt,
slow = Slow0,
filtering_supported = FilteringSupported} = State,
Actions0) ->
Actions0) ->
ok = osiris:write(LeaderPid, WriterId, Seq,
stream_message(Msg, FilteringSupported)),
Correlation = case MsgId of
Expand Down Expand Up @@ -1020,17 +1021,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
recover(Q) ->
{ok, Q}.

check_queue_exists_in_local_node(Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:local_pid(StreamId) of
{ok, Pid} when is_pid(Pid) ->
ok;
_ ->
{protocol_error, precondition_failed,
"queue '~ts' does not have a replica on the local node",
[rabbit_misc:rs(amqqueue:get_name(Q))]}
end.

maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).

Expand Down

0 comments on commit ed88e38

Please sign in to comment.