diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index bbeb19455..e958f506c 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -183,46 +183,48 @@ do_discovery(QueueName, PeerInfo, Type) -> {SnkWorkerCount, PerPeerLimit} = riak_kv_replrtq_snk:get_worker_counts(), StartDelayMS = riak_kv_replrtq_snk:starting_delay(), CurrentPeers = - case Type of - count_change -> + case {Type, riak_kv_replrtq_snk:current_peers(QueueName)} of + {count_change, _} -> %% Ignore current peers, to update worker counts, so all %% discovered peers will have their worker counts updated as %% the list returned from discover_peers/2 will never match %% the atom count_change. count_change; - _ -> + {_, PeerList} when is_list(PeerList) -> lists:usort( lists:map( fun({ID, _D, H, P, Prot}) -> {ID, StartDelayMS, H, P, Prot} end, - riak_kv_replrtq_snk:current_peers(QueueName))) + PeerList)); + {_, SnkResponse} -> + lager:info( + "Peer Discovery disabled as snk status ~w", [SnkResponse]), + SnkResponse end, - case discover_peers(PeerInfo, StartDelayMS) of - CurrentPeers -> + case {discover_peers(PeerInfo, StartDelayMS), CurrentPeers} of + {CurrentPeers, CurrentPeers} -> lager:info("Type=~w discovery led to no change", [Type]), false; - [] -> + {[], CurrentPeers} when is_list(CurrentPeers) -> lager:info("Type=~w discovery led to reset of peers", [Type]), - riak_kv_replrtq_snk:add_snkqueue(QueueName, - PeerInfo, - SnkWorkerCount, - PerPeerLimit), + riak_kv_replrtq_snk:add_snkqueue( + QueueName, PeerInfo, SnkWorkerCount, PerPeerLimit), false; - DiscoveredPeers -> - case CurrentPeers of - count_change -> - ok; - CurrentPeers when is_list(CurrentPeers) -> - lager:info( - "Type=~w discovery old_peers=~w new_peers=~w", - [Type, length(CurrentPeers), length(DiscoveredPeers)]) - end, - riak_kv_replrtq_snk:add_snkqueue(QueueName, - DiscoveredPeers, - SnkWorkerCount, - PerPeerLimit), - true + {DiscoveredPeers, count_change} -> + riak_kv_replrtq_snk:add_snkqueue( + QueueName, DiscoveredPeers, SnkWorkerCount, PerPeerLimit), + true; + {DiscoveredPeers, _} when is_list(CurrentPeers) -> + lager:info( + "Type=~w discovery old_peers=~w new_peers=~w", + [Type, length(CurrentPeers), length(DiscoveredPeers)]), + riak_kv_replrtq_snk:add_snkqueue( + QueueName, DiscoveredPeers, SnkWorkerCount, PerPeerLimit), + true; + {_, _} -> + lager:info("Type=~w discovery led to no change", [Type]), + false end. -spec discover_peers(list(riak_kv_replrtq_snk:peer_info()), pos_integer()) diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index 4803d39a1..827312e05 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -222,7 +222,7 @@ add_snkqueue( %% Return the current list of peers being used by this snk host, and the %% settings currently being used for this host and he workers per peer. %% Returns undefined if there are currently no peers defined. --spec current_peers(queue_name()) -> list(peer_info())|undefined. +-spec current_peers(queue_name()) -> list(peer_info())|suspended|disabled. current_peers(QueueName) -> gen_server:call(?MODULE, {current_peers, QueueName}, infinity). @@ -251,6 +251,7 @@ set_workercount( infinity ). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -339,11 +340,21 @@ handle_call({worker_count, QueueN, WorkerCount, PerPeerLimit}, _From, State) -> {reply, ok, State#state{work = W0, iteration = Iteration}} end; handle_call({current_peers, QueueN}, _From, State) -> - case lists:keyfind(QueueN, 1, State#state.work) of - false -> - {reply, undefined, State}; - {QueueN, _I, SinkWork} -> - {reply, SinkWork#sink_work.peer_list, State} + case State#state.enabled of + true -> + case lists:keyfind(QueueN, 1, State#state.work) of + false -> + {reply, [], State}; + {QueueN, _I, SinkWork} -> + case SinkWork#sink_work.suspended of + false -> + {reply, SinkWork#sink_work.peer_list, State}; + _ -> + {reply, suspended, State} + end + end; + _ -> + {reply, disabled, State} end.