Skip to content

Commit

Permalink
Improve the implementation of diskspace-check to avoid performance de…
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Sep 12, 2017
1 parent 74ce7d0 commit c49cc62
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 82 deletions.
17 changes: 16 additions & 1 deletion include/leo_object_storage.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,14 @@
is_strict_check = false :: boolean(),
is_locked = false :: boolean(),
is_del_blocked = false :: boolean(),
threshold_slow_processing = ?DEF_THRESHOLD_SLOW_PROC :: non_neg_integer()
threshold_slow_processing = ?DEF_THRESHOLD_SLOW_PROC :: non_neg_integer(),
is_able_to_write = true :: boolean()
}).

%% For Disk space monitor
-record(avs_path_and_servers, {
path = [] :: string(),
servers = [] :: [atom()]
}).

%% apllication-env
Expand Down Expand Up @@ -628,6 +635,14 @@
end,_TargetContainers))
end).

-define(env_diskspace_check_intervals(),
case application:get_env(?APP_NAME, diskspace_check_intervals) of
{ok, EnvDiskSpaceCheckIntervals} ->
EnvDiskSpaceCheckIntervals;
_ ->
timer:minutes(1)
end).


%% custom-metadata's items for MDC-replication:
-define(PROP_CMETA_CLUSTER_ID, 'cluster_id').
Expand Down
168 changes: 168 additions & 0 deletions src/leo_object_storage_diskspace_mon.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
%%======================================================================
%%
%% Leo Object Storage
%%
%% Copyright (c) 2012-2017 Rakuten, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------
%% Leo Object Storage - Server
%%
%% @doc The object storage server
%% @reference https://github.com/leo-project/leo_object_storage/blob/master/src/leo_object_storage_server.erl
%% @end
%%======================================================================
-module(leo_object_storage_diskspace_mon).
-behaviour(gen_server).

-include("leo_object_storage.hrl").
-include_lib("eunit/include/eunit.hrl").

%% API
-export([start_link/2,
stop/0]).

%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).


-record(state, {
check_intervals = timer:minutes(1) :: pos_integer(),
avs_path_and_servers_list = [] :: [#avs_path_and_servers{}]
}).

-define(DEF_TIMEOUT, timer:seconds(30)).


%%====================================================================
%% API
%%====================================================================
%% @doc Starts the server with check intervals (sec)
%%
-spec(start_link(CheckIntervals, AvsPathAndServersList) ->
{ok, pid()} | {error, any()} when CheckIntervals::pos_integer(),
AvsPathAndServersList::[#avs_path_and_servers{}]).
start_link(CheckIntervals, AvsPathAndServersList) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [CheckIntervals, AvsPathAndServersList], []).


%% @doc Stop this server
%%
-spec(stop() ->
ok).
stop() ->
gen_server:call(?MODULE, stop, ?DEF_TIMEOUT).


%%====================================================================
%% GEN_SERVER CALLBACKS
%%====================================================================
%% @doc Initiates the server
init([CheckIntervals, AvsPathAndServersList]) ->
erlang:send_after(CheckIntervals, self(), trigger),
{ok, #state{check_intervals = CheckIntervals,
avs_path_and_servers_list = AvsPathAndServersList}}.


%% @doc gen_server callback - Module:handle_call(Request, From, State) -> Result
handle_call(stop,_From, State) ->
{stop, shutdown, ok, State};
handle_call(_,_From, State) ->
{reply, ok, State}.

%% @doc Handling cast message
%% <p>
%% gen_server callback - Module:handle_cast(Request, State) -> Result.
%% </p>
handle_cast(_Msg, State) ->
{noreply, State}.


%% @doc Handling all non call/cast messages
%% <p>
%% gen_server callback - Module:handle_info(Info, State) -> Result.
%% </p>
handle_info(trigger, #state{check_intervals = CheckIntervals,
avs_path_and_servers_list = AvsPathAndServersList} = State) ->
{ok, NewAvsPathAndServersList} = check_disk_space(AvsPathAndServersList),
erlang:send_after(CheckIntervals, self(), trigger),
{noreply, State#state{avs_path_and_servers_list = NewAvsPathAndServersList}};
handle_info(_Info, State) ->
{noreply, State}.

%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
terminate(_Reason,_State) ->
ok.

%% @doc Convert process state when code is changed
code_change(_OldVsn, State, _Extra) ->
{ok, State}.


%%====================================================================
%% Internal Functions
%%====================================================================
check_disk_space([]) ->
ok;
check_disk_space([#avs_path_and_servers{path = AVSPath,
servers = ServerList} | Rest]) ->
case is_freespace_lt_avs(AVSPath) of
{ok, Ret} ->
notify_diskspace_status(ServerList, Ret);
{error, Error} ->
error_logger:error_msg(
"~p,~p,~p,~p~n",
[{module, ?MODULE_STRING}, {function, "check_disk_space/2"},
{line, ?LINE},
{body, [{simple_reason, "Cannot retrieve a mount path of AVS"},
{cause, Error}]}])
end,
check_disk_space(Rest).


%% @doc Return {ok, true} if the free disk space is less than(lt) the size of the current AVS,
%% otherwise {ok, false} or {error, Cause} if some error happened.
%% @private
is_freespace_lt_avs(AVSPath) ->
case leo_file:file_get_mount_path(AVSPath) of
{ok, {_MountPath, TotalSize, UsedPercentage}} ->
FreeSize = TotalSize * ((100 - UsedPercentage) / 100) * 1024,
AVSSize = filelib:file_size(AVSPath),
{ok, FreeSize < AVSSize};
Error ->
{error, Error}
end.


%% @private
notify_diskspace_status([],_) ->
ok;
notify_diskspace_status([Server|Rest], Ret) ->
Msg = case Ret of
true ->
'error';
false ->
'ok'
end,
leo_object_storage_server:update_diskspace_status(Server, Msg),
notify_diskspace_status(Rest, Ret).
111 changes: 53 additions & 58 deletions src/leo_object_storage_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
switch_container/4,
append_compaction_history/2,
get_compaction_worker/1,
get_eof_offset/1
get_eof_offset/1,
update_diskspace_status/2
]).

%% gen_server callbacks
Expand Down Expand Up @@ -301,6 +302,11 @@ get_eof_offset(Id) ->
gen_server:call(Id, get_eof_offset, ?DEF_TIMEOUT).


%% @doc Update each diskspace's status
update_diskspace_status(Id, DiskSpaceStatus) ->
gen_server:call(Id, {update_diskspace_status, DiskSpaceStatus}, ?DEF_TIMEOUT).


-ifdef(TEST).
%% @doc Store metadata and data
%%
Expand Down Expand Up @@ -412,22 +418,20 @@ handle_call({put,_,_}, _From, #obj_server_state{is_locked = true} = State) ->
{reply, {error, ?ERROR_LOCKED_CONTAINER}, State};
handle_call({put,_,_}, _From, #obj_server_state{privilege = ?OBJ_PRV_READ_ONLY} = State) ->
{reply, {error, ?ERROR_NOT_ALLOWED_ACCESS}, State};


handle_call({put,_Object,_InTime}, _From,
#obj_server_state{is_able_to_write = false} = State) ->
{reply, {error, ?ERROR_FREESPACE_LT_AVS}, State};
handle_call({put = Method, #?OBJECT{addr_id = AddrId,
key = Key} = Object, InTime}, _From,
#obj_server_state{object_storage = StorageInfo} = State) ->
case is_freespace_lt_avs(StorageInfo#backend_info.file_path) of
{ok, true} ->
{reply, {error, ?ERROR_FREESPACE_LT_AVS}, State};
{ok, false} ->
Key_1 = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur,
AddrId, Key),
{Reply, State_1} = put_1(Key_1, Object, State),

erlang:garbage_collect(self()),
reply(Method, Key, Reply, InTime, State_1);
Error ->
{reply, {error, Error}, State}
end;
Key_1 = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur,
AddrId, Key),
{Reply, State_1} = put_1(Key_1, Object, State),

erlang:garbage_collect(self()),
reply(Method, Key, Reply, InTime, State_1);

%% Retrieve an object
handle_call({get = Method, {AddrId, Key}, StartPos, EndPos, IsForcedCheck, InTime},
Expand Down Expand Up @@ -498,40 +502,37 @@ handle_call({store,_,_,_}, _From, #obj_server_state{is_locked = true} = State) -
{reply, {error, ?ERROR_LOCKED_CONTAINER}, State};
handle_call({store,_,_,_}, _From, #obj_server_state{privilege = ?OBJ_PRV_READ_ONLY} = State) ->
{reply, {error, ?ERROR_NOT_ALLOWED_ACCESS}, State};
handle_call({store,_Metadata,_Bin,_InTime}, _From,
#obj_server_state{is_able_to_write = false} = State) ->
{reply, {error, ?ERROR_FREESPACE_LT_AVS}, State};
handle_call({store = Method, Metadata, Bin, InTime}, _From,
#obj_server_state{object_storage = StorageInfo,
is_del_blocked = IsDelBlocked} = State) ->
case is_freespace_lt_avs(StorageInfo#backend_info.file_path) of
{ok, true} ->
{reply, {error, ?ERROR_FREESPACE_LT_AVS}, State};
{ok, false} ->
Metadata_1 = leo_object_storage_transformer:transform_metadata(Metadata),
Key = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur,
Metadata_1#?METADATA.addr_id,
Metadata_1#?METADATA.key),
Object = leo_object_storage_transformer:metadata_to_object(Bin, Metadata),
{Reply, State_1} =
case Metadata_1#?METADATA.del of
?DEL_TRUE when IsDelBlocked == true ->
{{error, ?ERROR_LOCKED_CONTAINER}, State};
?DEL_TRUE ->
delete_1(Key, Object, State);
?DEL_FALSE ->
put_1(Key, Object, State)
end,
Reply_1 = case Reply of
ok ->
ok;
{ok, _} ->
ok;
Other ->
Other
end,
erlang:garbage_collect(self()),
reply(Method, Metadata_1#?METADATA.key, Reply_1, InTime, State_1);
Error ->
{reply, {error, Error}, State}
end;
Metadata_1 = leo_object_storage_transformer:transform_metadata(Metadata),
Key = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur,
Metadata_1#?METADATA.addr_id,
Metadata_1#?METADATA.key),
Object = leo_object_storage_transformer:metadata_to_object(Bin, Metadata),
{Reply, State_1} =
case Metadata_1#?METADATA.del of
?DEL_TRUE when IsDelBlocked == true ->
{{error, ?ERROR_LOCKED_CONTAINER}, State};
?DEL_TRUE ->
delete_1(Key, Object, State);
?DEL_FALSE ->
put_1(Key, Object, State)
end,
Reply_1 = case Reply of
ok ->
ok;
{ok, _} ->
ok;
Other ->
Other
end,
erlang:garbage_collect(self()),
reply(Method, Metadata_1#?METADATA.key, Reply_1, InTime, State_1);


%% Retrieve the current status
handle_call(get_stats, _From, #obj_server_state{storage_stats = StorageStats} = State) ->
Expand Down Expand Up @@ -659,6 +660,13 @@ handle_call(get_eof_offset,
Reply = leo_object_storage_haystack:get_eof_offset(StorageInfo),
{reply, Reply, State};

%% Update each diskspace's status
handle_call({update_diskspace_status, 'ok'},_From, State) ->
{reply, ok, State#obj_server_state{is_able_to_write = true}};
handle_call({update_diskspace_status, 'error'},_From, State) ->
{reply, ok, State#obj_server_state{is_able_to_write = false}};


%% Put incorrect data for the unit-test
handle_call({add_incorrect_data,_Bin},
_From, #obj_server_state{object_storage =_StorageInfo} = State) ->
Expand Down Expand Up @@ -940,16 +948,3 @@ close_storage(Id, MetaDBId, StateFilePath,
ok;
close_storage(_,_,_,_,_,_,_) ->
ok.

%% @doc Return {ok, true} if the free disk space is less than(lt) the size of the current AVS,
%% otherwise {ok, false} or {error, Cause} if some error happened.
%% @private
is_freespace_lt_avs(AVSPath) ->
case leo_file:file_get_mount_path(AVSPath) of
{ok, {_MountPath, TotalSize, UsedPercentage}} ->
FreeSize = TotalSize * ((100 - UsedPercentage) / 100) * 1024,
AVSSize = filelib:file_size(AVSPath),
{ok, FreeSize < AVSSize};
Error ->
{error, Error}
end.
Loading

0 comments on commit c49cc62

Please sign in to comment.