diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index 9e5ac234d..e05177ad3 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -1337,13 +1337,20 @@ {default, "q1_ttaaefs:block_rtq"} ]}. -%% @doc Enable this node zlib compress objects over the wire +%% @doc Enable this node to zlib compress objects over the wire {mapping, "replrtq_compressonwire", "riak_kv.replrtq_compressonwire", [ {datatype, {flag, enabled, disabled}}, {default, disabled}, {commented, enabled} ]}. +%% @doc Enable this node to replicate reap requests to other clusters +{mapping, "repl_reap", "riak_kv.repl_reap", [ + {datatype, {flag, enabled, disabled}}, + {default, disabled}, + {commented, enabled} +]}. + %% @doc Enable this node to act as a sink and consume from a src cluster {mapping, "replrtq_enablesink", "riak_kv.replrtq_enablesink", [ {datatype, {flag, enabled, disabled}}, diff --git a/src/riak_client.erl b/src/riak_client.erl index 4a524808e..e0f414a20 100644 --- a/src/riak_client.erl +++ b/src/riak_client.erl @@ -196,6 +196,7 @@ replrtq_reset_all_workercounts(WorkerC, PerPeerL) -> {ok, riak_object:riak_object()} | {ok, queue_empty} | {ok, {deleted, vclock:vclock(), riak_object:riak_object()}} | + {ok, {reap, {riak_object:bucket(), riak_object:key(), vclock:vclock(), erlang:timestamp()}}}| {error, timeout} | {error, not_yet_implemented} | {error, Err :: term()}. @@ -223,10 +224,11 @@ fetch(QueueName, {?MODULE, [Node, _ClientId]}) -> -spec push(riak_object:riak_object()|binary(), boolean(), list(), riak_client()) -> {ok, erlang:timestamp()} | + {ok, reap} | {error, too_many_fails} | {error, timeout} | {error, {n_val_violation, N::integer()}}. -push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) -> +push(RObjMaybeBin, IsDeleted, Opts, RiakClient) -> RObj = case riak_object:is_robject(RObjMaybeBin) of % May get pushed a riak object, or a riak object as a binary, but @@ -236,6 +238,25 @@ push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) -> false -> riak_object:nextgenrepl_decode(RObjMaybeBin) end, + case RObj of + {reap, {B, K, TC, LMD}} -> + {repl_reap(B, K, TC), LMD}; + RObj -> + repl_push(RObj, IsDeleted, Opts, RiakClient) + end. + +-spec repl_reap( + riak_object:bucket(), riak_object:key(), vclock:vclock()) -> ok. +repl_reap(B, K, TC) -> + riak_kv_reaper:request_reap({{B, K}, TC, false}). + +-spec repl_push(riak_object:riak_object()|binary(), + boolean(), list(), riak_client()) -> + {ok, erlang:timestamp()} | + {error, too_many_fails} | + {error, timeout} | + {error, {n_val_violation, N::integer()}}. +repl_push(RObj, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) -> Bucket = riak_object:bucket(RObj), Key = riak_object:key(RObj), Me = self(), @@ -579,26 +600,30 @@ consistent_delete(Bucket, Key, Options, _Timeout, {?MODULE, [Node, _ClientId]}) end. --spec reap(riak_object:bucket(), riak_object:key(), riak_client()) - -> boolean(). +-spec reap( + riak_object:bucket(), riak_object:key(), riak_client()) -> boolean(). reap(Bucket, Key, Client) -> case normal_get(Bucket, Key, [deletedvclock], Client) of {error, {deleted, TombstoneVClock}} -> - DeleteHash = riak_object:delete_hash(TombstoneVClock), - reap(Bucket, Key, DeleteHash, Client); + reap(Bucket, Key, TombstoneVClock, Client); _Unexpected -> false end. --spec reap(riak_object:bucket(), riak_object:key(), pos_integer(), - riak_client()) -> boolean(). -reap(Bucket, Key, DeleteHash, {?MODULE, [Node, _ClientId]}) -> +-spec reap( + riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) + -> boolean(). +reap(Bucket, Key, TombClock, {?MODULE, [Node, _ClientId]}) -> case node() of Node -> - riak_kv_reaper:direct_reap({{Bucket, Key}, DeleteHash}); + riak_kv_reaper:direct_reap({{Bucket, Key}, TombClock, true}); _ -> - riak_core_util:safe_rpc(Node, riak_kv_reaper, direct_reap, - [{{Bucket, Key}, DeleteHash}]) + riak_core_util:safe_rpc( + Node, + riak_kv_reaper, + direct_reap, + [{{Bucket, Key}, TombClock, true}] + ) end. %% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) -> diff --git a/src/riak_kv_clusteraae_fsm.erl b/src/riak_kv_clusteraae_fsm.erl index d27dcaed1..c47aaf703 100644 --- a/src/riak_kv_clusteraae_fsm.erl +++ b/src/riak_kv_clusteraae_fsm.erl @@ -585,8 +585,8 @@ json_encode_results(find_keys, Result) -> Keys = {struct, [{<<"results">>, [{struct, encode_find_key(Key, Int)} || {_Bucket, Key, Int} <- Result]} ]}, mochijson2:encode(Keys); -json_encode_results(find_tombs, Result) -> - json_encode_results(find_keys, Result); +json_encode_results(find_tombs, KeysNClocks) -> + encode_keys_and_clocks(KeysNClocks); json_encode_results(reap_tombs, Count) -> mochijson2:encode({struct, [{<<"dispatched_count">>, Count}]}); json_encode_results(erase_keys, Count) -> @@ -616,7 +616,7 @@ pb_encode_results(merge_branch_nval, _QD, Branches) -> level_two = L2 }; pb_encode_results(fetch_clocks_nval, _QD, KeysNClocks) -> - #rpbaaefoldkeyvalueresp{ + #rpbaaefoldkeyvalueresp{ response_type = atom_to_binary(clock, unicode), keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)}; pb_encode_results(merge_tree_range, QD, Tree) -> @@ -662,8 +662,10 @@ pb_encode_results(find_keys, _QD, Results) -> end, #rpbaaefoldkeycountresp{response_type = <<"find_keys">>, keys_count = lists:map(KeyCountMap, Results)}; -pb_encode_results(find_tombs, QD, Results) -> - pb_encode_results(find_keys, QD, Results); +pb_encode_results(find_tombs, _QD, KeysNClocks) -> + #rpbaaefoldkeyvalueresp{ + response_type = atom_to_binary(clock, unicode), + keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)}; pb_encode_results(reap_tombs, _QD, Count) -> #rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>, keys_count = diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 8dd1937fa..a9e4b5f39 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -237,6 +237,13 @@ queue_fetch(timeout, StateData) -> Msg = {ReqID, {ok, {deleted, ExpectedClock, Obj}}}, Pid ! Msg, ok = riak_kv_stat:update(ngrfetch_prefetch), + {stop, normal, StateData}; + {Bucket, Key, TombClock, {reap, LMD}} -> + % A reap request was queued - so there is no need to fetch + % A tombstone was queued - so there is no need to fetch + Msg = {ReqID, {ok, {reap, {Bucket, Key, TombClock, LMD}}}}, + Pid ! Msg, + ok = riak_kv_stat:update(ngrfetch_prefetch), {stop, normal, StateData} end. diff --git a/src/riak_kv_pb_object.erl b/src/riak_kv_pb_object.erl index 4d577db26..b266f8fc9 100644 --- a/src/riak_kv_pb_object.erl +++ b/src/riak_kv_pb_object.erl @@ -202,6 +202,19 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin}, case Result of {ok, queue_empty} -> {reply, #rpbfetchresp{queue_empty = true}, State}; + {ok, {reap, {B, K, TC, LMD}}} -> + EncObj = + riak_object:nextgenrepl_encode( + repl_v1, {reap, {B, K, TC, LMD}}, false), + CRC32 = erlang:crc32(EncObj), + Resp = + #rpbfetchresp{ + queue_empty = false, + replencoded_object = EncObj, + crc_check = CRC32}, + {reply, + encode_nextgenrepl_response(Encoding, Resp, {B, K, TC}), + State}; {ok, {deleted, TombClock, RObj}} -> % Never bother compressing tombstones, they're practically empty EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, false), @@ -212,18 +225,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin}, replencoded_object = EncObj, crc_check = CRC32, deleted_vclock = pbify_rpbvc(TombClock)}, - case Encoding of - internal -> - {reply, Resp, State}; - internal_aaehash -> - BK = make_binarykey(RObj), - {SegID, SegHash} = - leveled_tictac:tictac_hash(BK, lists:sort(TombClock)), - {reply, - Resp#rpbfetchresp{segment_id = SegID, - segment_hash = SegHash}, - State} - end; + {reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State}; {ok, RObj} -> EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, ToCompress), CRC32 = erlang:crc32(EncObj), @@ -232,19 +234,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin}, deleted = false, replencoded_object = EncObj, crc_check = CRC32}, - case Encoding of - internal -> - {reply, Resp, State}; - internal_aaehash -> - BK = make_binarykey(RObj), - Clock = lists:sort(riak_object:vclock(RObj)), - {SegID, SegHash} = - leveled_tictac:tictac_hash(BK, Clock), - {reply, - Resp#rpbfetchresp{segment_id = SegID, - segment_hash = SegHash}, - State} - end; + {reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State}; {error, Reason} -> {error, {format, Reason}, State} end; @@ -443,7 +433,35 @@ process_stream(_,_,State) -> %% Internal functions %% =================================================================== --spec make_binarykey(riak_object:riak_object()) -> binary(). +-spec encode_nextgenrepl_response( + intenal|internal_aaehash, + #rpbfetchresp{}, + riak_object:riak_object()| + {riak_object:bucket(), riak_object:key(), vclock:vclock()}) + -> #rpbfetchresp{}. +encode_nextgenrepl_response(Encoding, Resp, RObj) -> + case Encoding of + internal -> + Resp; + internal_aaehash -> + {SegID, SegHash} = + case RObj of + {B, K, TC} -> + BK = make_binarykey({B, K}), + leveled_tictac:tictac_hash(BK, lists:sort(TC)); + RObj -> + BK = make_binarykey(RObj), + leveled_tictac:tictac_hash( + BK, lists:sort(riak_object:vclock(RObj))) + end, + Resp#rpbfetchresp{segment_id = SegID, segment_hash = SegHash} + end. + +-spec make_binarykey( + riak_object:riak_object()|{riak_object:bucket(), riak_object:key()}) + -> binary(). +make_binarykey({B, K}) -> + make_binarykey(B, K); make_binarykey(RObj) -> make_binarykey(riak_object:bucket(RObj), riak_object:key(RObj)). diff --git a/src/riak_kv_reaper.erl b/src/riak_kv_reaper.erl index ef04ce582..08cae2b32 100644 --- a/src/riak_kv_reaper.erl +++ b/src/riak_kv_reaper.erl @@ -63,7 +63,12 @@ redo/0]). -type reap_reference() :: - {{riak_object:bucket(), riak_object:key()}, non_neg_integer()}. + {{riak_object:bucket(), riak_object:key()}, vclock:vclock(), boolean()}. + %% the reap reference is {Bucket, Key, Clock (of tombstone), Forward}. The + %% Forward boolean() indicates if this reap should be replicated if + %% riak_kv.repl_reap is true. When a reap is received via replication + %% Forward should be set to false, to prevent reaps from perpetually + %% circulating -type job_id() :: pos_integer(). -export_type([reap_reference/0, job_id/0]). @@ -149,7 +154,7 @@ get_limits() -> %% we will not redo - redo is only to handle the failure related to unavailable %% primaries -spec action(reap_reference(), boolean()) -> boolean(). -action({{Bucket, Key}, DeleteHash}, Redo) -> +action({{Bucket, Key}, TombClock, ToRepl}, Redo) -> BucketProps = riak_core_bucket:get_bucket(Bucket), DocIdx = riak_core_util:chash_key({Bucket, Key}, BucketProps), {n_val, N} = lists:keyfind(n_val, 1, BucketProps), @@ -160,7 +165,11 @@ action({{Bucket, Key}, DeleteHash}, Redo) -> PL0 = lists:map(fun({Target, primary}) -> Target end, PrefList), case check_all_mailboxes(PL0) of ok -> - riak_kv_vnode:reap(PL0, {Bucket, Key}, DeleteHash), + riak_kv_vnode:reap( + PL0, + {Bucket, Key}, + riak_object:delete_hash(TombClock)), + maybe_repl_reap(Bucket, Key, TombClock, ToRepl), timer:sleep(TombPause), true; soft_loaded -> @@ -171,6 +180,7 @@ action({{Bucket, Key}, DeleteHash}, Redo) -> if Redo -> false; true -> true end end. + -spec redo() -> boolean(). redo() -> true. @@ -180,6 +190,18 @@ redo() -> true. -type preflist_entry() :: {non_neg_integer(), node()}. +-spec maybe_repl_reap( + riak_object:bucket(), riak_object:key(), vclock:vclock(), boolean()) -> ok. +maybe_repl_reap(Bucket, Key, TombClock, ToReap) -> + case application:get_env(riak_kv, repl_reap, false) and ToReap of + true -> + riak_kv_replrtq_src:replrtq_reap( + Bucket, Key, TombClock, os:timestamp()); + false -> + ok + end. + + %% Protect against overloading the system when not reaping should any %% mailbox be in soft overload state -spec check_all_mailboxes(list(preflist_entry())) -> ok|soft_loaded. diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index e0328c84d..4803d39a1 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -139,10 +139,10 @@ % Modified time by bucket - second, minute, hour, day, longer} -type reply_tuple() :: - {queue_empty, non_neg_integer()} | - {tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()} | - {object, non_neg_integer(), non_neg_integer(), non_neg_integer()} | - {error, any(), any()}. + {queue_empty, non_neg_integer()}| + {tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()}| + {object, non_neg_integer(), non_neg_integer(), non_neg_integer()}| + {error, any(), any()}. -export_type([peer_info/0, queue_name/0]). @@ -723,9 +723,9 @@ repl_fetcher(WorkItem) -> SWFetched = os:timestamp(), {ok, LMD} = riak_client:push(RObj, false, [], LocalClient), SWPushed = os:timestamp(), - ModSplit = timer:now_diff(SWPushed, LMD), FetchSplit = timer:now_diff(SWFetched, SW), PushSplit = timer:now_diff(SWPushed, SWFetched), + ModSplit = timer:now_diff(SWPushed, LMD), ok = riak_kv_stat:update(ngrrepl_object), done_work(WorkItem, true, {object, FetchSplit, PushSplit, ModSplit}); @@ -750,9 +750,16 @@ repl_fetcher(WorkItem) -> done_work(UpdWorkItem, false, {error, error, remote_error}) end catch - Type:Exception -> - lager:warning("Snk worker failed at Peer ~w due to ~w error ~w", - [Peer, Type, Exception]), + Type:Exception:Stk -> + lager:warning( + "Snk worker failed at Peer ~w due to ~w error ~w", + [Peer, Type, Exception]), + case app_helper:get_env(riak_kv, log_snk_stacktrace, false) of + true -> + lager:warning("Snk worker failed due to ~p", [Stk]); + _ -> + ok + end, RemoteFun(close), UpdWorkItem0 = setelement(3, WorkItem, RenewClientFun()), ok = riak_kv_stat:update(ngrrepl_error), @@ -797,8 +804,9 @@ add_success({{success, Success}, F, FT, PT, RT, MT}) -> add_failure({S, {failure, Failure}, FT, PT, RT, MT}) -> {S, {failure, Failure + 1}, FT, PT, RT, MT}. --spec add_repltime(queue_stats(), - {integer(), integer(), integer()}) -> queue_stats(). +-spec add_repltime( + queue_stats(), {non_neg_integer(), non_neg_integer(), non_neg_integer()}) + -> queue_stats(). add_repltime({S, F, {replfetch_time, FT}, {replpush_time, PT}, {replmod_time, RT}, @@ -810,7 +818,7 @@ add_repltime({S, {replmod_time, RT + RT0}, MT}. --spec add_modtime(queue_stats(), integer()) -> queue_stats(). +-spec add_modtime(queue_stats(), non_neg_integer()) -> queue_stats(). add_modtime({S, F, FT, PT, RT, MT}, ModTime) -> E = mod_split_element(ModTime div 1000) + 1, C = element(E, MT), diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index fa15ff0db..829a4f6b0 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -35,6 +35,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3, format_status/2]). @@ -44,6 +45,7 @@ replrtq_aaefold/2, replrtq_ttaaefs/2, replrtq_coordput/1, + replrtq_reap/4, register_rtq/2, delist_rtq/1, suspend_rtq/1, @@ -114,9 +116,13 @@ -type object_ref() :: {tomb, riak_object:riak_object()}| {object, riak_object:riak_object()}| + {reap, erlang:timestamp()}| to_fetch. % The object reference can be the actual object or a request to fetch the % actual object using the Bucket, Key and Clock in the repl_entry + % If the replicated operation is a reap the future vector clock should be + % the mpty list (as there will be no object) and the delete hash should be + % passed for validation (as required by riak_kv_vnode:final_delete/3). -type repl_entry() :: {riak_object:bucket(), riak_object:key(), vclock:vclock(), object_ref()}. % If the object is a tombstone which had been PUT, then the actual @@ -222,12 +228,23 @@ replrtq_ttaaefs(QueueName, ReplEntries) -> %% @doc %% Add a single repl_entry associated with a PUT coordinated on this node. -%% Never wait for the response or backoff - replictaion should be asynchronous +%% Never wait for the response or backoff - replication should be asynchronous %% and never slow the PUT path on the src cluster. -spec replrtq_coordput(repl_entry()) -> ok. replrtq_coordput({Bucket, _, _, _} = ReplEntry) when is_binary(Bucket); is_tuple(Bucket) -> gen_server:cast(?MODULE, {rtq_coordput, Bucket, ReplEntry}). + +%% @doc +%% Add a single reference to a reap to be replicated. This is a call to +%% prevent queued reaps from overloading the mailbox of the real-time queue. +-spec replrtq_reap( + riak_object:bucket(), riak_object:key(), + vclock:vclock(), erlang:timestamp()) -> ok. +replrtq_reap(Bucket, Key, TombClock, LMD) -> + gen_server:call( + ?MODULE, {rtq_reap, Bucket, Key, TombClock, LMD}, infinity). + %% @doc %% Setup a queue with a given queuename, which will take coordput repl_entries %% that pass the given filter. @@ -391,6 +408,10 @@ handle_call({bulk_add, Priority, QueueName, ReplEntries}, _From, State) -> _ -> {reply, ok, State} end; +handle_call({rtq_reap, Bucket, Key, TombClock, LMD}, _From, State) -> + QueueNames = find_queues(Bucket, State#state.queue_filtermap, []), + ReapRef = {Bucket, Key, TombClock, {reap, LMD}}, + {reply, ok, State, {continue, {repl, ReapRef, QueueNames}}}; handle_call({length_rtq, QueueName}, _From, State) -> case lists:keyfind(QueueName, 1, State#state.queue_local) of {QueueName, LocalQueues} -> @@ -601,61 +622,9 @@ handle_call(stop, _From, State) -> handle_cast({rtq_coordput, Bucket, ReplEntry}, State) -> - QueueNames = - find_queues(Bucket, State#state.queue_filtermap, []), - AddFun = - fun(QueueName, AccState) -> - {QueueName, LQ} = - lists:keyfind(QueueName, 1, AccState#state.queue_local), - case element(?RTQ_PRIORITY, LQ) of - {_Q, LC, OC} when (LC + OC) >= State#state.queue_limit -> - _ = riak_kv_stat:update({ngrrepl_srcdiscard, 1}), - AccState; - {Q, LC, OC} when LC >= State#state.object_limit; OC > 0 -> - {QueueName, OverflowQ} = - lists:keyfind( - QueueName, - 1, - AccState#state.queue_overflow), - UpdOverflowQ = - riak_kv_overflow_queue:addto_queue( - ?RTQ_PRIORITY, - filter_on_objectlimit(ReplEntry), - OverflowQ), - UpdOverflowQueues = - lists:keyreplace( - QueueName, - 1, - AccState#state.queue_overflow, - {QueueName, UpdOverflowQ}), - UpdLQs = - lists:keyreplace( - QueueName, - 1, - AccState#state.queue_local, - {QueueName, - setelement( - ?RTQ_PRIORITY, - LQ, - {Q, LC, OC + 1})}), - AccState#state{ - queue_overflow = UpdOverflowQueues, - queue_local = UpdLQs}; - {Q, LC, 0} -> - UpdLQs = - lists:keyreplace( - QueueName, - 1, - AccState#state.queue_local, - {QueueName, - setelement( - ?RTQ_PRIORITY, - LQ, - {queue:in(ReplEntry, Q), LC + 1, 0})}), - AccState#state{queue_local = UpdLQs} - end - end, - {noreply, lists:foldl(AddFun, State, QueueNames)}. + QueueNames = find_queues(Bucket, State#state.queue_filtermap, []), + {noreply, State, {continue, {repl, ReplEntry, QueueNames}}}. + handle_info(log_queue, State) -> LogFun = @@ -678,6 +647,55 @@ handle_info(log_queue, State) -> erlang:send_after(State#state.log_frequency_in_ms, self(), log_queue), {noreply, State}. + +handle_continue({repl, _ReplEntry, []}, State) -> + {noreply, State}; +handle_continue({repl, ReplEntry, [QueueName|OtherQueues]}, State) -> + {QueueName, LQ} = lists:keyfind(QueueName, 1, State#state.queue_local), + case element(?RTQ_PRIORITY, LQ) of + {_Q, LC, OC} when (LC + OC) >= State#state.queue_limit -> + _ = riak_kv_stat:update({ngrrepl_srcdiscard, 1}), + {noreply, State, {continue, {repl, ReplEntry, OtherQueues}}}; + {Q, LC, OC} when LC >= State#state.object_limit; OC > 0 -> + {QueueName, OverflowQ} = + lists:keyfind(QueueName, 1, State#state.queue_overflow), + UpdOverflowQ = + riak_kv_overflow_queue:addto_queue( + ?RTQ_PRIORITY, + filter_on_objectlimit(ReplEntry), + OverflowQ), + UpdOverflowQueues = + lists:keyreplace( + QueueName, + 1, + State#state.queue_overflow, + {QueueName, UpdOverflowQ}), + UpdCount = + {QueueName, setelement(?RTQ_PRIORITY, LQ, {Q, LC, OC + 1})}, + UpdLQs = + lists:keyreplace( + QueueName, 1, State#state.queue_local, UpdCount), + {noreply, + State#state{ + queue_overflow = UpdOverflowQueues, + queue_local = UpdLQs}, + {continue, {repl, ReplEntry, OtherQueues}}}; + {Q, LC, 0} -> + UpdLQs = + lists:keyreplace( + QueueName, + 1, + State#state.queue_local, + {QueueName, + setelement( + ?RTQ_PRIORITY, + LQ, + {queue:in(ReplEntry, Q), LC + 1, 0})}), + {noreply, + State#state{queue_local = UpdLQs}, + {continue, {repl, ReplEntry, OtherQueues}}} + end. + format_status(normal, [_PDict, State]) -> State; format_status(terminate, [_PDict, State]) -> diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 93282deee..cf151886d 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1961,14 +1961,14 @@ handle_aaefold({find_tombs, IndexNs, Filtered, ReturnFun, Cntrl, Sender, State) -> FoldFun = - fun(BF, KF, EFs, TombHashAcc) -> + fun(BF, KF, EFs, TombClockAcc) -> {md, MD} = lists:keyfind(md, 1, EFs), case riak_object:is_aae_object_deleted(MD, false) of {true, undefined} -> {clock, VV} = lists:keyfind(clock, 1, EFs), - [{BF, KF, riak_object:delete_hash(VV)}|TombHashAcc]; + [{BF, KF, VV}|TombClockAcc]; {false, undefined} -> - TombHashAcc + TombClockAcc end end, WrappedFoldFun = aaefold_withcoveragecheck(FoldFun, IndexNs, Filtered), @@ -1997,25 +1997,24 @@ handle_aaefold({reap_tombs, case riak_object:is_aae_object_deleted(MD, false) of {true, undefined} -> {clock, VV} = lists:keyfind(clock, 1, EFs), - DH = riak_object:delete_hash(VV), case TombHashAcc of {BatchList, Count, local} -> NewCount = Count + 1, case NewCount rem ?REAPER_BATCH_SIZE of 0 -> riak_kv_reaper:bulk_request_reap( - [{{BF, KF}, DH}|BatchList] + [{{BF, KF}, VV, true}|BatchList] ), {[], NewCount, local}; _ -> - {[{{BF, KF}, DH}|BatchList], + {[{{BF, KF}, VV, true}|BatchList], NewCount, local} end; {BatchList, Count, count} -> {BatchList, Count + 1, count}; {BatchList, Count, Job} -> - {[{{BF, KF}, DH}|BatchList], Count + 1, Job} + {[{{BF, KF}, VV, true}|BatchList], Count + 1, Job} end; {false, undefined} -> TombHashAcc diff --git a/src/riak_kv_wm_queue.erl b/src/riak_kv_wm_queue.erl index 1f5e836ce..02e686482 100644 --- a/src/riak_kv_wm_queue.erl +++ b/src/riak_kv_wm_queue.erl @@ -309,6 +309,16 @@ format_response(_, {ok, queue_empty}, RD, Ctx) -> format_response(_, {error, Reason}, RD, Ctx) -> lager:warning("Fetch error ~w", [Reason]), {{error, Reason}, RD, Ctx}; +format_response(internal_aaehash, {ok, {reap, {B, K, TC, LMD}}}, RD, Ctx) -> + BK = make_binarykey(B, K), + {SegmentID, SegmentHash} = + leveled_tictac:tictac_hash(BK, lists:sort(TC)), + SuccessMark = <<1:8/integer>>, + IsTombstone = <<0:8/integer>>, + ObjBin = encode_riakobject({reap, {B, K, TC, LMD}}), + {<>, RD, Ctx}; format_response(internal_aaehash, {ok, {deleted, TombClock, RObj}}, RD, Ctx) -> BK = make_binarykey(riak_object:bucket(RObj), riak_object:key(RObj)), {SegmentID, SegmentHash} = @@ -332,6 +342,12 @@ format_response(internal_aaehash, {ok, RObj}, RD, Ctx) -> {<>, RD, Ctx}; +format_response(internal, {ok, {reap, {B, K, TC, LMD}}}, RD, Ctx) -> + SuccessMark = <<1:8/integer>>, + IsTombstone = <<0:8/integer>>, + ObjBin = encode_riakobject({reap, {B, K, TC, LMD}}), + {<>, RD, Ctx}; format_response(internal, {ok, {deleted, TombClock, RObj}}, RD, Ctx) -> SuccessMark = <<1:8/integer>>, IsTombstone = <<1:8/integer>>, diff --git a/src/riak_object.erl b/src/riak_object.erl index 9cc4820d5..6a63f83aa 100644 --- a/src/riak_object.erl +++ b/src/riak_object.erl @@ -1188,24 +1188,26 @@ to_binary_version(Vsn, _B, _K, Obj = #r_object{}) -> binary_version(<<131,_/binary>>) -> v0; binary_version(<>) -> v1. +-type repl_ref() :: + {reap, + {riak_object:bucket(), riak_object:key(), + vclock:vclock(), erlang:timestamp()}}. + %% @doc Encode for nextgen_repl --spec nextgenrepl_encode(repl_v1, riak_object(), boolean()) -> binary(). +-spec nextgenrepl_encode( + repl_v1, riak_object()|repl_ref(), boolean()) -> binary(). +nextgenrepl_encode(repl_v1, {reap, {B, K, TC, LMD}}, _ToCompress) -> + ObjBK = nextgenrepl_binarykey(B, K), + TCBin = term_to_binary(TC), + {Mega, Secs, Micro} = LMD, + <<1:4/integer, 0:1/integer, 1:1/integer, 0:2/integer, + ObjBK/binary, + Mega:32/integer, Secs:32/integer, Micro:32/integer, + TCBin/binary>>; nextgenrepl_encode(repl_v1, RObj, ToCompress) -> B = riak_object:bucket(RObj), K = riak_object:key(RObj), - KS = byte_size(K), - ObjBK = - case B of - {T, B0} -> - TS = byte_size(T), - B0S = byte_size(B0), - <>; - B0 -> - B0S = byte_size(B0), - <<0:32/integer, B0S:32/integer, B0/binary, - KS:32/integer, K/binary>> - end, + ObjBK = nextgenrepl_binarykey(B, K), {Version, ObjBin} = case ToCompress of true -> @@ -1217,22 +1219,40 @@ nextgenrepl_encode(repl_v1, RObj, ToCompress) -> end, <>. -%% @doc Deocde for nextgen_repl --spec nextgenrepl_decode(binary()) -> riak_object(). -nextgenrepl_decode(<<1:4/integer, C:1/integer, _:3/integer, +-spec nextgenrepl_binarykey( + riak_object:bucket(), riak_object:key()) -> binary(). +nextgenrepl_binarykey({T, B}, K) -> + TS = byte_size(T), + BS = byte_size(B), + KS = byte_size(K), + <>; +nextgenrepl_binarykey(B, K) -> + BS = byte_size(B), + KS = byte_size(K), + <<0:32/integer, BS:32/integer, B/binary, KS:32/integer, K/binary>>. + +%% @doc Deocde for nextgenrepl +-spec nextgenrepl_decode(binary()) -> riak_object()|repl_ref(). +nextgenrepl_decode(<<1:4/integer, C:1/integer, R:1/integer, _:2/integer, 0:32/integer, BL:32/integer, B:BL/binary, KL:32/integer, K:KL/binary, ObjBin/binary>>) -> - nextgenrepl_decode(B, K, C == 1, ObjBin); -nextgenrepl_decode(<<1:4/integer, C:1/integer, _:3/integer, + nextgenrepl_decode(B, K, C == 1, R == 1, ObjBin); +nextgenrepl_decode(<<1:4/integer, C:1/integer, R:1/integer, _:2/integer, TL:32/integer, T:TL/binary, BL:32/integer, B:BL/binary, KL:32/integer, K:KL/binary, ObjBin/binary>>) -> - nextgenrepl_decode({T, B}, K, C == 1, ObjBin). - -nextgenrepl_decode(B, K, true, ObjBin) -> - nextgenrepl_decode(B, K, false, zlib:uncompress(ObjBin)); -nextgenrepl_decode(B, K, false, ObjBin) -> + nextgenrepl_decode({T, B}, K, C == 1, R == 1, ObjBin). + +nextgenrepl_decode(B, K, _, true, MetaBin) -> + <> = + MetaBin, + {reap, {B, K, binary_to_term(TClockBin), {Mega, Secs, Micro}}}; +nextgenrepl_decode(B, K, true, false, ObjBin) -> + nextgenrepl_decode(B, K, false, false, zlib:uncompress(ObjBin)); +nextgenrepl_decode(B, K, false, false, ObjBin) -> riak_object:from_binary(B, K, ObjBin). %% @doc Convert binary object to riak object @@ -2257,7 +2277,17 @@ nextgenrepl() -> ACZ = riak_object:reconcile([A, C, Z], true), ACZ0 = nextgenrepl_decode(nextgenrepl_encode(repl_v1, ACZ, false)), ACZ0 = nextgenrepl_decode(nextgenrepl_encode(repl_v1, ACZ, true)), - ?assertEqual(ACZ0, ACZ). + ?assertEqual(ACZ, ACZ0), + LMD = os:timestamp(), + {reap, {B, K, ACZ1, LMD}} = + nextgenrepl_decode( + nextgenrepl_encode(repl_v1, {reap, {B, K, ACZ, LMD}}, false)), + {reap, {B, K, ACZ1, LMD}} = + nextgenrepl_decode( + nextgenrepl_encode(repl_v1, {reap, {B, K, ACZ, LMD}}, true)), + ?assertEqual(ACZ, ACZ1). + + verify_contents([], []) ->