Skip to content

Commit

Permalink
DP-6 working prototype of API entry points coverage, http and pb API
Browse files Browse the repository at this point in the history
There is a matching commit in a newly created branch
feature/az/api-entrypoints-coverage, for the pb part to become
functional.

The following HTTP requests are accepted (values for parameter P are
"http", "pbc", assuming the latter by default):

1) /ring/coverage/bucket/B/key/K/?proto=P

  returning a JSON of the form:

   {Host:Port}

indicating Host:Port is where an entry point is for optimal data access
to the key K in bucket B, via the given protocol API.

2) /ring/coverage/bucket/B/key/K/?proto=P

  returning a JSON of the form:

  [{Host:Port}]

providing a complete API entry point map for access to the entire
cluster via that protocol.

Responses are cached for (a default of) 15 secs, configurable via
parameter 'ring_vnodes_cache_expiry_time'.

For pb, two new messages are provided:

 RpbApiEpReq (code 90),
 RpbApiEpResp(code 91),
 RpbApiEpMapReq (code 92),
 RpbApiEpMapResp(code 93),

with the relevant snippet in riak_pb/src/riak_kv.proto:

 enum RpbApiProto {
     pbc = 0;
     http = 1;
 }
 message RpbApiEp {
     required bytes host = 1;
     required int32 port = 2;
 }

 message RpbApiEpReq {
     required bytes bucket = 1;
     required bytes key = 2;
     optional RpbApiProto proto = 3 [default = pbc];
 }
 message RpbApiEpResp {
     optional RpbApiEp ep = 1;
 }

 message RpbApiEpMapReq {
     optional RpbApiProto proto = 1 [default = pbc];
 }

 message RpbApiEpMapResp {
     repeated RpbApiEp eplist= 1;
 }

The coverage information is obtained in two steps:

1. using riak_core_apl:get_apl_ann(), get active preflist for given
   Bucket and Key, and extract riak nodes from it;

2. make a rpc call on those nodes to determine which of the relevant
   listener apps (i.e., riak_api_pb_listener, riak_api_web) are running
   on those nodes.

The corresponding new riak python client methods are

* client.get_api_entry_point(bucket, key, proto),
* client.get_api_entry_points_map(proto)

included in
github.com/basho/riak-python-client/tree/feature/az/api-entrypoints-coverage
  • Loading branch information
hmmr committed Feb 28, 2015
1 parent 9bfa7f1 commit d5feabb
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 4 deletions.
114 changes: 114 additions & 0 deletions src/riak_kv_apiep.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_apiep: Common functions for ring/coverage protobuff
%% callback and Webmachine resource
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc Supporting functions shared between riak_kv_{wm,pb}_apiep.

-module(riak_kv_apiep).

-export([get_entrypoints/1, get_entrypoints_json/1,
get_entrypoints/2, get_entrypoints_json/2]).

-type proto() :: http|pbc.
-type ep() :: {string(), non_neg_integer()}.
-type bkey() :: {Bucket::binary(), Key::binary()}.

-define(RPC_TIMEOUT, 10000).


-spec get_entrypoints_json(proto(), bkey()) -> iolist().
%% @doc Produce API entry points for a given Bucket and Key, in a JSON form.
get_entrypoints_json(Proto, BKey) ->
mochijson2:encode(
{struct, get_entrypoints(Proto, BKey)}).

-spec get_entrypoints_json(proto()) -> iolist().
%% @doc Produce all API entry points in a JSON form.
get_entrypoints_json(Proto) ->
mochijson2:encode(
{struct, get_entrypoints(Proto)}).


-spec get_entrypoints(proto(), bkey()) -> [ep()].
%% @doc For a given protocol, determine host:port entry points of riak
%% nodes containing requested bucket and key.
get_entrypoints(Proto, BKey) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
UpNodes = riak_core_ring:all_members(Ring),
Preflist = riak_core_apl:get_apl_ann(BKey, UpNodes),
Nodes =
lists:usort(
[N || {{_Index, N}, _Type} <- Preflist]), %% filter on type?
case Proto of
http ->
get_http_entrypoints(Nodes);
pbc ->
get_pb_entrypoints(Nodes)
end.

-spec get_entrypoints(proto()) -> [ep()].
%% @doc Returns all API entry points for a given protocol.
get_entrypoints(Proto) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Nodes = riak_core_ring:all_members(Ring),
case Proto of
http ->
get_http_entrypoints(Nodes);
pbc ->
get_pb_entrypoints(Nodes)
end.


%% ===================================================================
%% Local functions
%% ===================================================================

-spec get_http_entrypoints([node()]) -> [ep()].
%% @private
get_http_entrypoints(Nodes) ->
{ResL, FailedNodes} =
rpc:multicall(
Nodes, riak_api_web, get_listeners, [], ?RPC_TIMEOUT),
case FailedNodes of
[] ->
fine;
FailedNodes ->
lagger:warning(
self(), "Failed to get http riak api listeners at node(s) ~9999p", [FailedNodes])
end,
[HP || {_Proto, HP} <- lists:flatten(ResL)].


-spec get_pb_entrypoints([node()]) -> [ep()].
%% @private
get_pb_entrypoints(Nodes) ->
{ResL, FailedNodes} =
rpc:multicall(
Nodes, riak_api_pb_listener, get_listeners, [], ?RPC_TIMEOUT),
case FailedNodes of
[] ->
fine;
FailedNodes ->
lagger:warning(
self(), "Failed to get pb riak api listeners at node(s) ~9999p", [FailedNodes])
end,
lists:flatten(ResL).
5 changes: 3 additions & 2 deletions src/riak_kv_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%%
%% riak_app: application startup for Riak
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
Expand Down Expand Up @@ -36,7 +36,8 @@
{riak_kv_pb_bucket_key_apl, 33, 34}, %% (Active) Preflist requests
{riak_kv_pb_csbucket, 40, 41}, %% CS bucket folding support
{riak_kv_pb_counter, 50, 53}, %% counter requests
{riak_kv_pb_crdt, 80, 83} %% CRDT requests
{riak_kv_pb_crdt, 80, 83}, %% CRDT requests
{riak_kv_pb_apiep, 90, 93} %% API entry points coverage
]).
-define(MAX_FLUSH_PUT_FSM_RETRIES, 10).

Expand Down
95 changes: 95 additions & 0 deletions src/riak_kv_pb_apiep.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
%% -------------------------------------------------------------------
%%
%% riak_api_pb_apiep: Protobuff callbacks providing a `location service'
%% to external clients for optimal access to hosts
%% with partitions containing known buckets/key
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc Protobuff callbacks providing a `location service'
%% to external clients for optimal access to hosts
%% with partitions containing certain buckets/key
%%
%% This module serves requests (code), returning response (code):
%% RpbApiEpReq (90) -> RpbApiEpResp (91),
%% RpbApiEpMapReq (92) -> RpbApiEpMapResp (93)

-module(riak_kv_pb_apiep).

-behaviour(riak_api_pb_service).

-export([init/0,
decode/2,
encode/1,
process/2,
process_stream/3]).

-include_lib("riak_pb/include/riak_kv_pb.hrl").

-spec init() -> undefined.
init() ->
undefined.

decode(Code, Bin) when Code == 90 ->
Msg = riak_pb_codec:decode(Code, Bin),
case Msg of
#rpbapiepreq{bucket = B, key = K, proto = P} ->
{ok, Msg, {"riak_kv.apiep", {B, K, P}}}
end;

decode(Code, Bin) when Code == 92 ->
Msg = riak_pb_codec:decode(Code, Bin),
case Msg of
#rpbapiepmapreq{proto = P} ->
{ok, Msg, {"riak_kv.apiepmap", {P}}}
end.


encode(Message) ->
{ok, riak_pb_codec:encode(Message)}.


process(#rpbapiepreq{bucket = Bucket, key = Key, proto = Proto}, State) ->
{Host, Port} =
case riak_kv_apiep:get_entrypoints(Proto, {Bucket, Key}) of
[] ->
{"", 0};
[HP={_H,_P}|_] ->
%% there's a call to underlying function get_listeners(),
%% suggesting multiple entry points are possible, but
%% effectively there is just one
HP
end,
{reply, #rpbapiepresp{
ep = #rpbapiep{
host = list_to_binary(Host),
port = Port}},
State};

process(#rpbapiepmapreq{proto = Proto}, State) ->
HPList = riak_kv_apiep:get_entrypoints(Proto),
{reply, #rpbapiepmapresp{
eplist = [#rpbapiep{
host = list_to_binary(Host),
port = Port} || {Host, Port} <- HPList]},
State}.


process_stream(_, _, State) ->
{ignore, State}.
10 changes: 8 additions & 2 deletions src/riak_kv_web.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%%
%% riak_kv_web: setup Riak's KV HTTP interface
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
Expand Down Expand Up @@ -119,7 +119,13 @@ raw_dispatch(Name) ->
riak_kv_wm_link_walker, Props},

{Prefix ++ ["buckets", bucket, "index", field, '*'],
riak_kv_wm_index, Props}
riak_kv_wm_index, Props},

{Prefix ++ ["ring", "coverage"],
riak_kv_wm_apiep, Props},

{Prefix ++ ["ring", "coverage", "bucket", bucket, "key", key],
riak_kv_wm_apiep, Props}

] || {Prefix, Props} <- Props2 ]).

Expand Down
Loading

0 comments on commit d5feabb

Please sign in to comment.