Merge pull request basho#501 from basho/feature/csm/aae-repl-develop
Port of 1.4 AAE replication fixes to 2.0.
cmeiklejohn committed Jan 6, 2014
2 parents 9bb9d76 + a18464f commit 03d6508
Showing 7 changed files with 213 additions and 138 deletions.
2 changes: 2 additions & 0 deletions include/riak_repl.hrl
@@ -24,6 +24,8 @@
-define(DEFAULT_SOURCE_PER_NODE, 1).
-define(DEFAULT_SOURCE_PER_CLUSTER, 5).
-define(DEFAULT_MAX_SINKS_NODE, 1).
%% How many times to retry a partition during a fullsync before giving up on it
-define(DEFAULT_SOURCE_RETRIES, infinity).
%% 20 seconds. Sources should claim within 5 seconds, but give them a little more time.
-define(RESERVATION_TIMEOUT, (20 * 1000)).
-define(DEFAULT_MAX_FS_BUSIES_TOLERATED, 10).
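The new `DEFAULT_SOURCE_RETRIES` default preserves the old behaviour (retry forever); the coordinator reads the effective limit from the riak_repl application environment under `max_fssource_retries` (see the fscoordinator change below). A minimal sketch of how an operator might cap retries at 5, assuming an advanced.config-style file; the filename and the value 5 are illustrative, not part of this commit:

%% advanced.config (illustrative): cap fullsync source retries per partition.
%% Overrides ?DEFAULT_SOURCE_RETRIES (infinity), which is only the fallback
%% passed to app_helper:get_env(riak_repl, max_fssource_retries, ...).
[
 {riak_repl, [
   {max_fssource_retries, 5}
 ]}
].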
29 changes: 21 additions & 8 deletions src/riak_repl2_fs_node_reserver.erl
@@ -51,11 +51,21 @@ reserve(Partition) ->
down
end.

%% @doc Release a reservation for the given partition on the correct node.
-spec unreserve(Partition :: any()) -> 'ok'.
%% @doc Release a reservation for the given partition on the correct
%% node. If the node is down or a reservation process is not running,
%% `down' is returned.
-spec unreserve(Partition :: any()) -> 'ok' | 'busy' | 'down'.
unreserve(Partition) ->
Node = get_partition_node(Partition),
gen_server:cast({?SERVER, Node}, {unreserve, Partition}).
try gen_server:call({?SERVER, Node}, {unreserve, Partition}) of
Out ->
Out
catch
exit:{noproc, _} ->
down;
exit:{{nodedown, _}, _} ->
down
end.

%% @doc Indicates a reservation has been converted to a running sink. Usually
%% used by a sink.
@@ -85,20 +95,23 @@ handle_call({reserve, Partition}, _From, State) ->
Reserved2 = [{Partition, Tref} | State#state.reservations],
{reply, ok, State#state{reservations = Reserved2}};
true ->
lager:debug("Node busy for partition ~p. running=~p reserved=~p max=~p",
lager:info("Node busy for partition ~p. running=~p reserved=~p max=~p",
[Partition, Running, Reserved, Max]),
{reply, busy, State}
end;

%% @hidden
%% `unreserve' is handled as a call (rather than a cast) to prevent
%% unreserve/reserve races and to detect failed processes.
handle_call({unreserve, Partition}, _From, State) ->
Reserved2 = cancel_reservation_timeout(Partition, State#state.reservations),
{reply, ok, State#state{reservations = Reserved2}};

handle_call(_Request, _From, State) ->
{reply, ok, State}.


%% @hidden
handle_cast({unreserve, Partition}, State) ->
Reserved2 = cancel_reservation_timeout(Partition, State#state.reservations),
{noreply, State#state{reservations = Reserved2}};

handle_cast({claim_reservation, Partition}, State) ->
Reserved2 = cancel_reservation_timeout(Partition, State#state.reservations),
{noreply, State#state{reservations = Reserved2}};
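Because unreserve/1 is now a synchronous call that can report `down', a caller can treat a missing reserver as a no-op instead of silently losing a cast. A hypothetical caller sketch, not part of this diff:

%% Hypothetical helper: release a reservation, tolerating a sink node that
%% is down or has no reserver process running.
release_reservation(Partition) ->
    case riak_repl2_fs_node_reserver:unreserve(Partition) of
        ok ->
            ok;
        Other ->
            lager:debug("unreserve of ~p returned ~p", [Partition, Other]),
            ok
    end.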
127 changes: 83 additions & 44 deletions src/riak_repl2_fscoordinator.erl
@@ -38,11 +38,13 @@
owners = [],
connection_ref,
partition_queue = queue:new(),
retries = dict:new(),
whereis_waiting = [],
busy_nodes = sets:new(),
running_sources = [],
successful_exits = 0,
error_exits = 0,
retry_exits = 0,
pending_fullsync = false,
dirty_nodes = ordsets:new(), % these nodes should run fullsync
dirty_nodes_during_fs = ordsets:new(), % these nodes reported realtime errors
@@ -213,6 +215,7 @@ handle_call(status, _From, State = #state{socket=Socket}) ->
{starting, length(State#state.whereis_waiting)},
{successful_exits, State#state.successful_exits},
{error_exits, State#state.error_exits},
{retry_exits, State#state.retry_exits},
{busy_nodes, sets:size(State#state.busy_nodes)},
{running_stats, SourceStats},
{socket, SocketStats},
@@ -299,8 +302,10 @@ handle_cast(start_fullsync, State) ->
largest_n = N,
owners = riak_core_ring:all_owners(Ring),
partition_queue = queue:from_list(Partitions),
retries = dict:new(),
successful_exits = 0,
error_exits = 0,
retry_exits = 0,
fullsync_start_time = riak_core_util:moment()
},
State3 = start_up_reqs(State2),
@@ -319,6 +324,7 @@ handle_cast(stop_fullsync, State) ->
largest_n = undefined,
owners = [],
partition_queue = queue:new(),
retries = dict:new(),
whereis_waiting = [],
running_sources = []
},
@@ -330,79 +336,83 @@ handle_cast(_Msg, State) ->


%% @hidden
handle_info({'EXIT', Pid, Cause}, State) when Cause =:= normal; Cause =:= shutdown ->
handle_info({'EXIT', Pid, Cause},
#state{socket=Socket, transport=Transport}=State) when Cause =:= normal; Cause =:= shutdown ->
lager:debug("fssource ~p exited normally", [Pid]),
PartitionEntry = lists:keytake(Pid, 1, State#state.running_sources),
case PartitionEntry of
false ->
% late exit or otherwise non-existent
{noreply, State};
{value, {Pid, Partition}, Running} ->
{value, {Pid, {Index, _, _}=Partition}, Running} ->

% likely a slot on the remote node opened up, so re-enable that
% remote node for whereis requests.
{_, _, Node} = Partition,
NewBusies = sets:del_element(Node, State#state.busy_nodes),

% ensure we unreserve the partition on the remote node
% instead of waiting for a timeout.
Transport:send(Socket, term_to_binary({unreserve, Index})),

% stats
Sucesses = State#state.successful_exits + 1,
State2 = State#state{successful_exits = Sucesses},
State2 = State#state{successful_exits = Sucesses,
busy_nodes = NewBusies},

% are we done?
EmptyRunning = Running == [],
QEmpty = queue:is_empty(State#state.partition_queue),
Waiting = State#state.whereis_waiting,
case {EmptyRunning, QEmpty, Waiting} of
{true, true, []} ->
MyClusterName = riak_core_connection:symbolic_clustername(),
lager:info("Fullsync complete from ~s to ~s",
[MyClusterName, State#state.other_cluster]),
% clear the "rt dirty" stat if it's set,
% otherwise, don't do anything
State3 = notify_rt_dirty_nodes(State2),
%% update legacy stats too! some riak_tests depend on them.
riak_repl_stats:server_fullsyncs(),
TotalFullsyncs = State#state.fullsyncs_completed + 1,
Finish = riak_core_util:moment(),
ElapsedSeconds = Finish - State#state.fullsync_start_time,
riak_repl_util:schedule_cluster_fullsync(State#state.other_cluster),
{noreply, State3#state{running_sources = Running,
busy_nodes = NewBusies,
fullsyncs_completed = TotalFullsyncs,
fullsync_start_time = undefined,
last_fullsync_duration=ElapsedSeconds
}};
_ ->
% there's something waiting for a response.
State3 = start_up_reqs(State2#state{running_sources = Running,
busy_nodes = NewBusies}),
{noreply, State3}
end
maybe_complete_fullsync(Running, State2)
end;

handle_info({'EXIT', Pid, _Cause}, State) ->
lager:warning("fssource ~p exited abnormally", [Pid]),
handle_info({'EXIT', Pid, Cause},
#state{socket=Socket, transport=Transport}=State) ->
lager:info("fssource ~p exited abnormally: ~p", [Pid, Cause]),
PartitionEntry = lists:keytake(Pid, 1, State#state.running_sources),
case PartitionEntry of
false ->
% late exit
{noreply, State};
{value, {Pid, Partition}, Running} ->
{value, {Pid, {Index, _, _}=Partition}, Running} ->

% even a bad exit opens a slot on the remote node
{_, _, Node} = Partition,
NewBusies = sets:del_element(Node, State#state.busy_nodes),

% stats
ErrorExits = State#state.error_exits + 1,
#state{partition_queue = PQueue} = State,
% ensure we unreserve the partition on the remote node
% instead of waiting for a timeout.
Transport:send(Socket, term_to_binary({unreserve, Index})),

% reset for retry later
PQueue2 = queue:in(Partition, PQueue),
State2 = State#state{partition_queue = PQueue2, busy_nodes = NewBusies,
running_sources = Running, error_exits = ErrorExits},
State3 = start_up_reqs(State2),
{noreply, State3}
% stats
#state{partition_queue = PQueue, retries = Retries0} = State,

RetryLimit = app_helper:get_env(riak_repl, max_fssource_retries,
?DEFAULT_SOURCE_RETRIES),
Retries = dict:update_counter(Partition, 1, Retries0),

case dict:fetch(Partition, Retries) of
N when N > RetryLimit, is_integer(RetryLimit) ->
lager:warning("fssource dropping partition: ~p, ~p failed"
"retries", [Partition, RetryLimit]),
ErrorExits = State#state.error_exits + 1,
State2 = State#state{busy_nodes = NewBusies,
retries = Retries,
running_sources = Running,
error_exits = ErrorExits},
maybe_complete_fullsync(Running, State2);
_ -> %% have not run out of retries yet
% reset for retry later
lager:info("fssource rescheduling partition: ~p",
[Partition]),
PQueue2 = queue:in(Partition, PQueue),
RetryExits = State#state.retry_exits + 1,
State2 = State#state{partition_queue = PQueue2,
retries = Retries,
busy_nodes = NewBusies,
running_sources = Running,
retry_exits = RetryExits},
State3 = start_up_reqs(State2),
{noreply, State3}
end
end;

handle_info({Partition, whereis_timeout}, State) ->
@@ -756,6 +766,35 @@ is_fullsync_in_progress(State) ->
true
end.

maybe_complete_fullsync(Running, State) ->
EmptyRunning = Running == [],
QEmpty = queue:is_empty(State#state.partition_queue),
Waiting = State#state.whereis_waiting,
case {EmptyRunning, QEmpty, Waiting} of
{true, true, []} ->
MyClusterName = riak_core_connection:symbolic_clustername(),
lager:info("Fullsync complete from ~s to ~s",
[MyClusterName, State#state.other_cluster]),
% clear the "rt dirty" stat if it's set,
% otherwise, don't do anything
State2 = notify_rt_dirty_nodes(State),
%% update legacy stats too! some riak_tests depend on them.
riak_repl_stats:server_fullsyncs(),
TotalFullsyncs = State#state.fullsyncs_completed + 1,
Finish = riak_core_util:moment(),
ElapsedSeconds = Finish - State#state.fullsync_start_time,
riak_repl_util:schedule_cluster_fullsync(State#state.other_cluster),
{noreply, State2#state{running_sources = Running,
fullsyncs_completed = TotalFullsyncs,
fullsync_start_time = undefined,
last_fullsync_duration=ElapsedSeconds
}};
_ ->
% there's something waiting for a response.
State2 = start_up_reqs(State#state{running_sources = Running}),
{noreply, State2}
end.

% dirty_nodes is the set of nodes that are marked "dirty"
% due to a realtime repl issue while fullsync isn't running.
% dirty_nodes_during_fs is the set of nodes that are marked "dirty"
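The retry accounting added above boils down to a per-partition counter in a dict compared against a possibly infinite limit. A standalone sketch of that pattern, with hypothetical function and variable names, isolated from the coordinator state:

%% Minimal sketch of the retry bookkeeping: bump the counter for a failed
%% partition and decide whether to requeue or drop it. An integer limit
%% caps retries; a non-integer limit (e.g. infinity) means always requeue.
note_failure(Partition, RetryLimit, Retries0) ->
    Retries = dict:update_counter(Partition, 1, Retries0),
    case dict:fetch(Partition, Retries) of
        N when is_integer(RetryLimit), N > RetryLimit ->
            {drop, Retries};
        _ ->
            {requeue, Retries}
    end.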
18 changes: 12 additions & 6 deletions src/riak_repl2_fscoordinator_serv.erl
@@ -246,12 +246,18 @@ get_partition_node(Partition, Ring) ->
get_node_ip_port(Node, NormIP) ->
{ok, {_IP, Port}} = rpc:call(Node, application, get_env, [riak_core, cluster_mgr]),
{ok, IfAddrs} = inet:getifaddrs(),
CIDR = riak_repl2_ip:determine_netmask(IfAddrs, NormIP),
case get_matching_address(Node, NormIP, CIDR) of
{ok, {ListenIP, _}} ->
{ok, {ListenIP, Port}};
Else ->
Else
case riak_repl2_ip:determine_netmask(IfAddrs, NormIP) of
undefined ->
lager:warning("Can't determine netmask for ~p, please ensure you have NAT configured correctly.",
[NormIP]),
{error, ip_not_local};
CIDR ->
case get_matching_address(Node, NormIP, CIDR) of
{ok, {ListenIP, _}} ->
{ok, {ListenIP, Port}};
Else ->
Else
end
end.

get_matching_address(Node, NormIP, Masked) when Node =:= node() ->
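With this change get_node_ip_port/2 can return {error, ip_not_local} instead of crashing on an undefined netmask, so callers should match on both shapes. A hypothetical caller sketch, not part of this diff:

%% Hypothetical caller: fall back gracefully when no local interface
%% matches the normalized peer IP.
sink_location(Node, NormIP) ->
    case get_node_ip_port(Node, NormIP) of
        {ok, {ListenIP, Port}} ->
            {ok, {ListenIP, Port}};
        {error, Reason} ->
            lager:debug("no usable address for ~p: ~p", [Node, Reason]),
            {error, Reason}
    end.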
32 changes: 18 additions & 14 deletions src/riak_repl2_fssource.erl
@@ -68,7 +68,12 @@ init([Partition, IP]) ->
case riak_repl_util:maybe_get_vnode_lock(Partition) of
ok ->
%% got the lock, or ignored it.
connect(IP, SupportedStrategy, Partition);
case connect(IP, SupportedStrategy, Partition) of
{error, Reason} ->
{stop, Reason};
Result ->
Result
end;
{error, Reason} ->
%% the vnode is probably busy. Try again later.
{stop, Reason}
@@ -113,8 +118,6 @@ handle_call({connected, Socket, Transport, _Endpoint, Proto, Props},
Transport, Socket,
Partition,
self()),
%% We want a 'DOWN' message when the aae worker stops itself for not_responsible
erlang:monitor(process, FullsyncWorker),
%% Give control of socket to AAE worker. It will consume all TCP messages.
ok = Transport:controlling_process(Socket, FullsyncWorker),
riak_repl_aae_source:start_exchange(FullsyncWorker),
@@ -170,6 +173,17 @@ handle_call(cluster_name, _From, State) ->
handle_call(_Msg, _From, State) ->
{reply, ok, State}.

handle_cast(not_responsible, State=#state{partition=Partition}) ->
lager:info("Fullsync of partition ~p stopped because AAE trees can't be compared.", [Partition]),
lager:info("Probable cause is one or more differing bucket n_val properties between source and sink clusters."),
lager:info("Restarting fullsync connection for partition ~p with keylist strategy.", [Partition]),
Strategy = keylist,
case connect(State#state.ip, Strategy, Partition) of
{ok, State2} ->
{noreply, State2};
{error, Reason} ->
{stop, Reason, State}
end;
handle_cast(fullsync_complete, State=#state{partition=Partition}) ->
%% sent from AAE fullsync worker
lager:info("Fullsync for partition ~p complete.", [Partition]),
@@ -182,16 +196,6 @@ handle_cast({connect_failed, _Pid, Reason},
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'DOWN', Ref, process, _Pid, not_responsible}, State=#state{partition=Partition}) ->
erlang:demonitor(Ref),
lager:info("Fullsync of partition ~p stopped because AAE trees can't be compared.", [Partition]),
lager:info("Probable cause is one or more differing bucket n_val properties between source and sink clusters."),
lager:info("Restarting fullsync connection for partition ~p with keylist strategy.", [Partition]),
Strategy = keylist,
case connect(State#state.ip, Strategy, Partition) of
{ok, State2} -> {noreply, State2};
Error -> Error
end;
handle_info({Closed, Socket}, State=#state{socket=Socket})
when Closed == tcp_closed; Closed == ssl_closed ->
lager:info("Connection for site ~p closed", [State#state.cluster]),
@@ -307,5 +311,5 @@ connect(IP, Strategy, Partition) ->
connection_ref = Ref, partition=Partition}};
{error, Reason}->
lager:warning("Error connecting to remote"),
{stop, Reason}
{error, Reason}
end.
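The old 'DOWN'-monitor signalling has been replaced by an explicit cast, so the AAE source worker is expected to tell its owning fssource process directly when it cannot compare trees. A sketch of the worker side, assuming the worker keeps the fssource pid it was started with; the function name is illustrative and not part of this commit:

%% Illustrative worker-side helper: ask the owning riak_repl2_fssource to
%% retry the partition with the keylist strategy instead of exiting with
%% reason not_responsible.
signal_not_responsible(OwnerPid) ->
    gen_server:cast(OwnerPid, not_responsible).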
(Diffs for the remaining 2 changed files are not shown.)
