Skip to content

Commit

Permalink
Avoid change feed rewinds after shard moves
Browse files Browse the repository at this point in the history
When shards are moved to new nodes, and the user supplies a change sequence
from the old shard map configuration, attempt to match missing nodes and ranges
by inspecting current shard uuids in order to avoid rewinds.

Previously, if a node and range was missing, we randomly picked a node in the
appropriate range, so 1/3 of the time we might have hit the exact node, but 2/3
of the time we would end up with a complete changes feed rewind to 0.

Unfortunately, this involves a fabric worker scatter gather operation to all
shard copies. This should only happen when we get an old sequence. We rely on
that happening rarely, mostly right after the shards moved, then users would
get new sequence from the recent shard map.
  • Loading branch information
nickva committed Aug 26, 2021
1 parent c632dd3 commit e83935c
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 9 deletions.
7 changes: 6 additions & 1 deletion src/fabric/src/fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
% miscellany
-export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
cleanup_index_files/1, cleanup_index_files_all_nodes/1, dbname/1,
inactive_index_files/1]).
inactive_index_files/1, db_uuids/1]).

-include_lib("fabric/include/fabric.hrl").

Expand Down Expand Up @@ -551,6 +551,11 @@ dbname(Db) ->
erlang:error({illegal_database_name, Db})
end.

%% @doc get db shard uuids
-spec db_uuids(dbname()) -> map().
db_uuids(DbName) ->
fabric_db_uuids:go(dbname(DbName)).

name(Thing) ->
couch_util:to_binary(Thing).

Expand Down
67 changes: 67 additions & 0 deletions src/fabric/src/fabric_db_uuids.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric_db_uuids).


-export([go/1]).


-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").


go(DbName) when is_binary(DbName) ->
Shards = mem3:live_shards(DbName, [node() | nodes()]),
Workers = fabric_util:submit_jobs(Shards, get_uuid, []),
RexiMon = fabric_util:create_monitors(Shards),
Acc0 = {fabric_dict:init(Workers, nil), []},
try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
{timeout, {WorkersDict, _}} ->
DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
fabric_util:log_timeout(DefunctWorkers, "db_uuids"),
{error, timeout};
Else ->
Else
after
rexi_monitor:stop(RexiMon)
end.


handle_message({rexi_DOWN, _, {_, NodeRef},_}, _Shard, {Cntrs, Res}) ->
case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of
{ok, Cntrs1} -> {ok, {Cntrs1, Res}};
error -> {error, {nodedown, <<"progress not possible">>}}
end;

handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) ->
case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
{ok, Cntrs1} -> {ok, {Cntrs1, Res}};
error -> {error, Reason}
end;

handle_message(Uuid, Shard, {Cntrs, Res}) when is_binary(Uuid) ->
case fabric_ring:handle_response(Shard, Uuid, Cntrs, Res, [all]) of
{ok, {Cntrs1, Res1}} ->
{ok, {Cntrs1, Res1}};
{stop, Res1} ->
Uuids = fabric_dict:fold(fun(#shard{} = S, Id, #{} = Acc) ->
Acc#{Id => S#shard{ref = undefined}}
end, #{}, Res1),
{stop, Uuids}
end;

handle_message(Reason, Shard, {Cntrs, Res}) ->
case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
{ok, Cntrs1} -> {ok, {Cntrs1, Res}};
error -> {error, Reason}
end.
54 changes: 51 additions & 3 deletions src/fabric/src/fabric_ring.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) ->
% a partitioned database. As soon as a result from any of the shards
% arrives, result collection stops.
%
% * When RingOpts is [all], responses are accepted until all the shards return
% results
%
handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
Workers1 = fabric_dict:erase(Shard, Workers),
case RingOpts of
Expand All @@ -130,7 +133,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
Responses1 = [{{B, E}, Shard, Response} | Responses],
handle_response_ring(Workers1, Responses1, CleanupCb);
[{any, Any}] ->
handle_response_any(Shard, Response, Workers1, Any, CleanupCb)
handle_response_any(Shard, Response, Workers1, Any, CleanupCb);
[all] ->
Responses1 = [{Shard, Response} | Responses],
handle_response_all(Workers1, Responses1)
end.


Expand Down Expand Up @@ -164,6 +170,15 @@ handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
end.


handle_response_all(Workers, Responses) ->
case fabric_dict:size(Workers) =:= 0 of
true ->
{stop, fabric_dict:from_list(Responses)};
false ->
{ok, {Workers, Responses}}
end.


% Check if workers still waiting and the already received responses could
% still form a continous range. The range won't always be the full ring, and
% the bounds are computed based on the minimum and maximum interval beginning
Expand All @@ -186,6 +201,9 @@ is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges,
mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];

is_progress_possible(Counters, _Responses, _, _, [all]) ->
fabric_dict:size(Counters) > 0;

is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
Expand Down Expand Up @@ -305,7 +323,7 @@ is_progress_possible_with_responses_test() ->
?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])).


is_progress_possible_with_ring_opts_test() ->
is_progress_possible_with_ring_opts_any_test() ->
Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}],
C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
RS1 = mk_resps([{"n1", 3, 10, 42}]),
Expand All @@ -323,6 +341,12 @@ is_progress_possible_with_ring_opts_test() ->
?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)).


is_progress_possible_with_ring_opts_all_test() ->
C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [all])),
?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, [all])).


get_shard_replacements_test() ->
Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
{"n1", 11, 20}, {"n1", 21, ?RING_END},
Expand Down Expand Up @@ -422,7 +446,7 @@ handle_response_backtracking_test() ->
?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).


handle_response_ring_opts_test() ->
handle_response_ring_opts_any_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n2", [0, 1]),
Shard3 = mk_shard("n3", [0, 1]),
Expand All @@ -446,6 +470,30 @@ handle_response_ring_opts_test() ->
?assertEqual({stop, [{Shard3, 44}]}, Result3).


handle_response_ring_opts_all_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n2", [0, 1]),
Shard3 = mk_shard("n3", [0, 1]),

ShardList = [Shard1, Shard2, Shard3],
[W1, W2, W3] = WithRefs = [S#shard{ref = make_ref()} || S <- ShardList],
Workers1 = fabric_dict:init(WithRefs, nil),

Result1 = handle_response(W1, 42, Workers1, [], [all], undefined),
?assertMatch({ok, {_, _}}, Result1),
{ok, {Workers2, _}} = Result1,

% Even though n2 and n3 cover the same range, with 'all' option we wait for
% all workers to return.
Result2 = handle_response(W2, 43, Workers2, [], [all], undefined),
?assertMatch({ok, {_, _}}, Result2),
{ok, {Workers3, _}} = Result2,

% Stop only after all the shards respond
Result3 = handle_response(W3, 44, Workers3, [], [all], undefined),
?assertMatch({stop, [_ | _]}, Result3).


handle_error_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n1", [10, ?RING_END]),
Expand Down
10 changes: 6 additions & 4 deletions src/fabric/src/fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

-export([get_db_info/2, get_doc_count/2, get_design_doc_count/2,
get_update_seq/2, changes/4, map_view/5, reduce_view/5,
group_info/3, update_mrview/4]).
group_info/3, update_mrview/4, get_uuid/1]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("couch/include/couch_db.hrl").
Expand Down Expand Up @@ -318,6 +318,9 @@ compact(ShardName, DesignName) ->
Ref = erlang:make_ref(),
Pid ! {'$gen_call', {self(), Ref}, compact}.

get_uuid(DbName) ->
with_db(DbName, [], {couch_db, get_uuid, []}).

%%
%% internal
%%
Expand Down Expand Up @@ -637,10 +640,9 @@ calculate_start_seq(Db, Node, Seq) ->

uuid(Db) ->
Uuid = couch_db:get_uuid(Db),
binary:part(Uuid, {0, uuid_prefix_len()}).
Prefix = fabric_util:get_uuid_prefix_len(),
binary:part(Uuid, {0, Prefix}).

uuid_prefix_len() ->
list_to_integer(config:get("fabric", "uuid_prefix_len", "7")).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand Down
55 changes: 55 additions & 0 deletions src/fabric/src/fabric_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
-export([validate_all_docs_args/2, validate_args/3]).
-export([upgrade_mrargs/1]).
-export([worker_ranges/1]).
-export([get_uuid_prefix_len/0]).
-export([isolate/1, isolate/2]).


-compile({inline, [{doc_id_and_rev,1}]}).

Expand Down Expand Up @@ -345,3 +348,55 @@ worker_ranges(Workers) ->
[{X, Y} | Acc]
end, [], Workers),
lists:usort(Ranges).


get_uuid_prefix_len() ->
config:get_integer("fabric", "uuid_prefix_len", 7).


% If we issue multiple fabric calls from the same process we have to isolate
% them so in case of error they don't pollute the processes dictionary or the
% mailbox

isolate(Fun) ->
isolate(Fun, infinity).


isolate(Fun, Timeout) ->
{Pid, Ref} = erlang:spawn_monitor(fun() -> exit(do_isolate(Fun)) end),
receive
{'DOWN', Ref, _, _, {'$isolres', Res}} ->
Res;
{'DOWN', Ref, _, _, {'$isolerr', Tag, Reason, Stack}} ->
erlang:raise(Tag, Reason, Stack)
after Timeout ->
erlang:demonitor(Ref, [flush]),
exit(Pid, kill),
erlang:error(timeout)
end.


% OTP_RELEASE is defined in OTP 21+ only
-ifdef(OTP_RELEASE).


do_isolate(Fun) ->
try
{'$isolres', Fun()}
catch Tag:Reason:Stack ->
{'$isolerr', Tag, Reason, Stack}
end.


-else.


do_isolate(Fun) ->
try
{'$isolres', Fun()}
catch ?STACKTRACE(Tag, Reason, Stack)
{'$isolerr', Tag, Reason, Stack}
end.


-endif.
57 changes: 56 additions & 1 deletion src/fabric/src/fabric_view_changes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,14 @@ do_unpack_seqs(Opaque, DbName) ->
true ->
Unpacked;
false ->
Uuids = get_db_uuid_shards(DbName),
PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) ->
case mem3:get_shard(DbName, Node, [A, B]) of
{ok, Shard} ->
{Shard, Seq};
{error, not_found} ->
{#shard{node = Node, range = [A, B]}, Seq}
Shard = replace_moved_shard(Node, [A, B], Seq, Uuids),
{Shard, Seq}
end
end, Deduped),
Shards = mem3:shards(DbName),
Expand Down Expand Up @@ -495,6 +497,59 @@ get_old_seq(#shard{range=R}=Shard, SinceSeqs) ->
end.


get_db_uuid_shards(DbName) ->
% Need to use an isolated process as we are performing a fabric call from
% another fabric call and there is a good chance we'd polute the mailbox
% with returned messages
Timeout = fabric_util:request_timeout(),
IsolatedFun = fun() -> fabric:db_uuids(DbName) end,
try fabric_util:isolate(IsolatedFun, Timeout) of
{ok, Uuids} ->
% Trim uuids so we match exactly based on the currently configured
% uuid_prefix_len. The assumption is that we are getting an older
% sequence from the same cluster and we didn't tweak that
% relatively obscure config option in the meantime.
PrefixLen = fabric_util:get_uuid_prefix_len(),
maps:fold(fun(Uuid, Shard, Acc) ->
TrimmedUuid = binary:part(Uuid, {0, PrefixLen}),
Acc#{TrimmedUuid => Shard}
end, #{}, Uuids);
{error, Error} ->
% Since we are doing a best-effort approach to match moved shards,
% tolerate and log errors. This should also handle cases when the
% cluster is partially upgraded, as some nodes will not have the
% newer get_uuid fabric_rpc handler.
ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p",
couch_log:error(ErrMsg, [?MODULE, DbName, Error]),
#{}
catch
_Tag:Error ->
ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p",
couch_log:error(ErrMsg, [?MODULE, DbName, Error]),
#{}
end.


%% Determine if the missing shard moved to a new node. Do that by matching the
%% uuids from the current shard map. If we cannot find a moved shard, we return
%% the original node and range as a shard and hope for the best.
replace_moved_shard(Node, Range, Seq, #{} = _UuidShards) when is_number(Seq) ->
% Cannot figure out shard moves without uuid matching
#shard{node = Node, range = Range};
replace_moved_shard(Node, Range, {Seq, Uuid}, #{} = UuidShards) ->
% Compatibility case for an old seq format which didn't have epoch nodes
replace_moved_shard(Node, Range, {Seq, Uuid, Node}, UuidShards);
replace_moved_shard(Node, Range, {_Seq, Uuid, _EpochNode}, #{} = UuidShards) ->
case UuidShards of
#{Uuid := #shard{range = Range} = Shard} ->
% Found a moved shard by matching both the uuid and the range
Shard;
#{} ->
% Did not find a moved shard, use the original node
#shard{node = Node, range = Range}
end.


changes_row(Props0) ->
Props1 = case couch_util:get_value(deleted, Props0) of
true ->
Expand Down
Loading

0 comments on commit e83935c

Please sign in to comment.