diff --git a/include/leo_object_storage.hrl b/include/leo_object_storage.hrl index 9020d3f..f238045 100644 --- a/include/leo_object_storage.hrl +++ b/include/leo_object_storage.hrl @@ -478,7 +478,14 @@ is_strict_check = false :: boolean(), is_locked = false :: boolean(), is_del_blocked = false :: boolean(), - threshold_slow_processing = ?DEF_THRESHOLD_SLOW_PROC :: non_neg_integer() + threshold_slow_processing = ?DEF_THRESHOLD_SLOW_PROC :: non_neg_integer(), + is_able_to_write = true :: boolean() + }). + +%% For Disk space monitor +-record(avs_path_and_servers, { + path = [] :: string(), + servers = [] :: [atom()] }). %% apllication-env @@ -628,6 +635,14 @@ end,_TargetContainers)) end). +-define(env_diskspace_check_intervals(), + case application:get_env(?APP_NAME, diskspace_check_intervals) of + {ok, EnvDiskSpaceCheckIntervals} -> + EnvDiskSpaceCheckIntervals; + _ -> + timer:minutes(1) + end). + %% custom-metadata's items for MDC-replication: -define(PROP_CMETA_CLUSTER_ID, 'cluster_id'). diff --git a/src/leo_object_storage_diskspace_mon.erl b/src/leo_object_storage_diskspace_mon.erl new file mode 100644 index 0000000..051af89 --- /dev/null +++ b/src/leo_object_storage_diskspace_mon.erl @@ -0,0 +1,168 @@ +%%====================================================================== +%% +%% Leo Object Storage +%% +%% Copyright (c) 2012-2017 Rakuten, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% --------------------------------------------------------------------- +%% Leo Object Storage - Server +%% +%% @doc The object storage server +%% @reference https://github.com/leo-project/leo_object_storage/blob/master/src/leo_object_storage_server.erl +%% @end +%%====================================================================== +-module(leo_object_storage_diskspace_mon). +-behaviour(gen_server). + +-include("leo_object_storage.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%% API +-export([start_link/2, + stop/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + + +-record(state, { + check_intervals = timer:minutes(1) :: pos_integer(), + avs_path_and_servers_list = [] :: [#avs_path_and_servers{}] + }). + +-define(DEF_TIMEOUT, timer:seconds(30)). + + +%%==================================================================== +%% API +%%==================================================================== +%% @doc Starts the server with check intervals (sec) +%% +-spec(start_link(CheckIntervals, AvsPathAndServersList) -> + {ok, pid()} | {error, any()} when CheckIntervals::pos_integer(), + AvsPathAndServersList::[#avs_path_and_servers{}]). +start_link(CheckIntervals, AvsPathAndServersList) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [CheckIntervals, AvsPathAndServersList], []). + + +%% @doc Stop this server +%% +-spec(stop() -> + ok). +stop() -> + gen_server:call(?MODULE, stop, ?DEF_TIMEOUT). + + +%%==================================================================== +%% GEN_SERVER CALLBACKS +%%==================================================================== +%% @doc Initiates the server +init([CheckIntervals, AvsPathAndServersList]) -> + erlang:send_after(CheckIntervals, self(), trigger), + {ok, #state{check_intervals = CheckIntervals, + avs_path_and_servers_list = AvsPathAndServersList}}. + + +%% @doc gen_server callback - Module:handle_call(Request, From, State) -> Result +handle_call(stop,_From, State) -> + {stop, shutdown, ok, State}; +handle_call(_,_From, State) -> + {reply, ok, State}. + +%% @doc Handling cast message +%%
+%% gen_server callback - Module:handle_cast(Request, State) -> Result. +%%
+handle_cast(_Msg, State) -> + {noreply, State}. + + +%% @doc Handling all non call/cast messages +%%+%% gen_server callback - Module:handle_info(Info, State) -> Result. +%%
+handle_info(trigger, #state{check_intervals = CheckIntervals, + avs_path_and_servers_list = AvsPathAndServersList} = State) -> + {ok, NewAvsPathAndServersList} = check_disk_space(AvsPathAndServersList), + erlang:send_after(CheckIntervals, self(), trigger), + {noreply, State#state{avs_path_and_servers_list = NewAvsPathAndServersList}}; +handle_info(_Info, State) -> + {noreply, State}. + +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +terminate(_Reason,_State) -> + ok. + +%% @doc Convert process state when code is changed +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%==================================================================== +%% Internal Functions +%%==================================================================== +check_disk_space([]) -> + ok; +check_disk_space([#avs_path_and_servers{path = AVSPath, + servers = ServerList} | Rest]) -> + case is_freespace_lt_avs(AVSPath) of + {ok, Ret} -> + notify_diskspace_status(ServerList, Ret); + {error, Error} -> + error_logger:error_msg( + "~p,~p,~p,~p~n", + [{module, ?MODULE_STRING}, {function, "check_disk_space/2"}, + {line, ?LINE}, + {body, [{simple_reason, "Cannot retrieve a mount path of AVS"}, + {cause, Error}]}]) + end, + check_disk_space(Rest). + + +%% @doc Return {ok, true} if the free disk space is less than(lt) the size of the current AVS, +%% otherwise {ok, false} or {error, Cause} if some error happened. +%% @private +is_freespace_lt_avs(AVSPath) -> + case leo_file:file_get_mount_path(AVSPath) of + {ok, {_MountPath, TotalSize, UsedPercentage}} -> + FreeSize = TotalSize * ((100 - UsedPercentage) / 100) * 1024, + AVSSize = filelib:file_size(AVSPath), + {ok, FreeSize < AVSSize}; + Error -> + {error, Error} + end. + + +%% @private +notify_diskspace_status([],_) -> + ok; +notify_diskspace_status([Server|Rest], Ret) -> + Msg = case Ret of + true -> + 'error'; + false -> + 'ok' + end, + leo_object_storage_server:update_diskspace_status(Server, Msg), + notify_diskspace_status(Rest, Ret). diff --git a/src/leo_object_storage_server.erl b/src/leo_object_storage_server.erl index e1cd698..8237a84 100644 --- a/src/leo_object_storage_server.erl +++ b/src/leo_object_storage_server.erl @@ -48,7 +48,8 @@ switch_container/4, append_compaction_history/2, get_compaction_worker/1, - get_eof_offset/1 + get_eof_offset/1, + update_diskspace_status/2 ]). %% gen_server callbacks @@ -301,6 +302,11 @@ get_eof_offset(Id) -> gen_server:call(Id, get_eof_offset, ?DEF_TIMEOUT). +%% @doc Update each diskspace's status +update_diskspace_status(Id, DiskSpaceStatus) -> + gen_server:call(Id, {update_diskspace_status, DiskSpaceStatus}, ?DEF_TIMEOUT). + + -ifdef(TEST). %% @doc Store metadata and data %% @@ -412,22 +418,20 @@ handle_call({put,_,_}, _From, #obj_server_state{is_locked = true} = State) -> {reply, {error, ?ERROR_LOCKED_CONTAINER}, State}; handle_call({put,_,_}, _From, #obj_server_state{privilege = ?OBJ_PRV_READ_ONLY} = State) -> {reply, {error, ?ERROR_NOT_ALLOWED_ACCESS}, State}; + + +handle_call({put,_Object,_InTime}, _From, + #obj_server_state{is_able_to_write = false} = State) -> + {reply, {error, ?ERROR_FREESPACE_LT_AVS}, State}; handle_call({put = Method, #?OBJECT{addr_id = AddrId, key = Key} = Object, InTime}, _From, #obj_server_state{object_storage = StorageInfo} = State) -> - case is_freespace_lt_avs(StorageInfo#backend_info.file_path) of - {ok, true} -> - {reply, {error, ?ERROR_FREESPACE_LT_AVS}, State}; - {ok, false} -> - Key_1 = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur, - AddrId, Key), - {Reply, State_1} = put_1(Key_1, Object, State), - - erlang:garbage_collect(self()), - reply(Method, Key, Reply, InTime, State_1); - Error -> - {reply, {error, Error}, State} - end; + Key_1 = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur, + AddrId, Key), + {Reply, State_1} = put_1(Key_1, Object, State), + + erlang:garbage_collect(self()), + reply(Method, Key, Reply, InTime, State_1); %% Retrieve an object handle_call({get = Method, {AddrId, Key}, StartPos, EndPos, IsForcedCheck, InTime}, @@ -498,40 +502,37 @@ handle_call({store,_,_,_}, _From, #obj_server_state{is_locked = true} = State) - {reply, {error, ?ERROR_LOCKED_CONTAINER}, State}; handle_call({store,_,_,_}, _From, #obj_server_state{privilege = ?OBJ_PRV_READ_ONLY} = State) -> {reply, {error, ?ERROR_NOT_ALLOWED_ACCESS}, State}; +handle_call({store,_Metadata,_Bin,_InTime}, _From, + #obj_server_state{is_able_to_write = false} = State) -> + {reply, {error, ?ERROR_FREESPACE_LT_AVS}, State}; handle_call({store = Method, Metadata, Bin, InTime}, _From, #obj_server_state{object_storage = StorageInfo, is_del_blocked = IsDelBlocked} = State) -> - case is_freespace_lt_avs(StorageInfo#backend_info.file_path) of - {ok, true} -> - {reply, {error, ?ERROR_FREESPACE_LT_AVS}, State}; - {ok, false} -> - Metadata_1 = leo_object_storage_transformer:transform_metadata(Metadata), - Key = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur, - Metadata_1#?METADATA.addr_id, - Metadata_1#?METADATA.key), - Object = leo_object_storage_transformer:metadata_to_object(Bin, Metadata), - {Reply, State_1} = - case Metadata_1#?METADATA.del of - ?DEL_TRUE when IsDelBlocked == true -> - {{error, ?ERROR_LOCKED_CONTAINER}, State}; - ?DEL_TRUE -> - delete_1(Key, Object, State); - ?DEL_FALSE -> - put_1(Key, Object, State) - end, - Reply_1 = case Reply of - ok -> - ok; - {ok, _} -> - ok; - Other -> - Other - end, - erlang:garbage_collect(self()), - reply(Method, Metadata_1#?METADATA.key, Reply_1, InTime, State_1); - Error -> - {reply, {error, Error}, State} - end; + Metadata_1 = leo_object_storage_transformer:transform_metadata(Metadata), + Key = ?gen_backend_key(StorageInfo#backend_info.avs_ver_cur, + Metadata_1#?METADATA.addr_id, + Metadata_1#?METADATA.key), + Object = leo_object_storage_transformer:metadata_to_object(Bin, Metadata), + {Reply, State_1} = + case Metadata_1#?METADATA.del of + ?DEL_TRUE when IsDelBlocked == true -> + {{error, ?ERROR_LOCKED_CONTAINER}, State}; + ?DEL_TRUE -> + delete_1(Key, Object, State); + ?DEL_FALSE -> + put_1(Key, Object, State) + end, + Reply_1 = case Reply of + ok -> + ok; + {ok, _} -> + ok; + Other -> + Other + end, + erlang:garbage_collect(self()), + reply(Method, Metadata_1#?METADATA.key, Reply_1, InTime, State_1); + %% Retrieve the current status handle_call(get_stats, _From, #obj_server_state{storage_stats = StorageStats} = State) -> @@ -659,6 +660,13 @@ handle_call(get_eof_offset, Reply = leo_object_storage_haystack:get_eof_offset(StorageInfo), {reply, Reply, State}; +%% Update each diskspace's status +handle_call({update_diskspace_status, 'ok'},_From, State) -> + {reply, ok, State#obj_server_state{is_able_to_write = true}}; +handle_call({update_diskspace_status, 'error'},_From, State) -> + {reply, ok, State#obj_server_state{is_able_to_write = false}}; + + %% Put incorrect data for the unit-test handle_call({add_incorrect_data,_Bin}, _From, #obj_server_state{object_storage =_StorageInfo} = State) -> @@ -940,16 +948,3 @@ close_storage(Id, MetaDBId, StateFilePath, ok; close_storage(_,_,_,_,_,_,_) -> ok. - -%% @doc Return {ok, true} if the free disk space is less than(lt) the size of the current AVS, -%% otherwise {ok, false} or {error, Cause} if some error happened. -%% @private -is_freespace_lt_avs(AVSPath) -> - case leo_file:file_get_mount_path(AVSPath) of - {ok, {_MountPath, TotalSize, UsedPercentage}} -> - FreeSize = TotalSize * ((100 - UsedPercentage) / 100) * 1024, - AVSSize = filelib:file_size(AVSPath), - {ok, FreeSize < AVSSize}; - Error -> - {error, Error} - end. diff --git a/src/leo_object_storage_sup.erl b/src/leo_object_storage_sup.erl index 7a666c9..2e5aa88 100644 --- a/src/leo_object_storage_sup.erl +++ b/src/leo_object_storage_sup.erl @@ -115,9 +115,10 @@ start_child(ObjectStorageInfo, CallbackMod) -> BackendDBSupPid = start_child_1(), ok = start_child_2(), - {ok, ServerPairL} = start_child_3(ObjectStorageInfo, 0, - MetadataDB, BackendDBSupPid, - IsStrictCheck, []), + {ok, ServerPairL} = + start_child_3(ObjectStorageInfo, 0, + MetadataDB, BackendDBSupPid, + IsStrictCheck, [], []), ok = start_child_4(ServerPairL), ok = start_child_5(), ok = start_child_6(CallbackMod), @@ -130,14 +131,14 @@ start_child(ObjectStorageInfo, CallbackMod) -> start_child_1() -> case whereis(leo_backend_db_sup) of undefined -> - ChildSpec0 = {leo_backend_db_sup, - {leo_backend_db_sup, start_link, []}, - permanent, 2000, supervisor, [leo_backend_db_sup]}, - case supervisor:start_child(?MODULE, ChildSpec0) of + ChildSpec = {leo_backend_db_sup, + {leo_backend_db_sup, start_link, []}, + permanent, 2000, supervisor, [leo_backend_db_sup]}, + case supervisor:start_child(?MODULE, ChildSpec) of {ok, Pid} -> Pid; - {error, Cause0} -> - exit(Cause0) + {error, Cause} -> + exit(Cause) end; Pid -> Pid @@ -165,10 +166,20 @@ start_child_2() -> %% @doc Launch backend-db's processes %% under the leo_object_storage_sup %% @private -start_child_3([],_,_,_,_,Acc) -> +start_child_3([],_,_,_,_, AVSServerPairL, Acc) -> + ChildSpec = {leo_object_storage_diskspace_mon, + {leo_object_storage_diskspace_mon, start_link, + [?env_diskspace_check_intervals(), AVSServerPairL]}, + permanent, 2000, supervisor, [leo_object_storage_diskspace_mon]}, + case supervisor:start_child(?MODULE, ChildSpec) of + {ok, Pid} -> + Pid; + {error, Cause} -> + exit(Cause) + end, {ok, Acc}; start_child_3([{NumOfContainers, Path}|Rest], Index, - MetadataDB, BackendDBSupPid, IsStrictCheck, Acc) -> + MetadataDB, BackendDBSupPid, IsStrictCheck, AVSServerPairL, Acc) -> Path_1 = get_path(Path), Props = [{num_of_containers, NumOfContainers}, {path, Path_1}, @@ -179,20 +190,25 @@ start_child_3([{NumOfContainers, Path}|Rest], Index, ], true = ets:insert(?ETS_INFO_TABLE, {list_to_atom(?MODULE_STRING ++ integer_to_list(Index)), Props}), - {ok, Acc_1} = start_child_3_1(Index, NumOfContainers - 1, BackendDBSupPid, Props, Acc), - start_child_3(Rest, Index + 1, MetadataDB, BackendDBSupPid, IsStrictCheck, Acc_1). + {ok, {[{AVSPath, Servers}|_], Acc_1}} = start_child_3_1(Index, NumOfContainers - 1, + BackendDBSupPid, Props, dict:new(), Acc), + AVSServerPairL_1 = [#avs_path_and_servers{path = AVSPath, + servers = Servers}|AVSServerPairL], + start_child_3(Rest, Index + 1, MetadataDB, BackendDBSupPid, IsStrictCheck, AVSServerPairL_1, Acc_1). %% @doc Launch %% @private -start_child_3_1(_,-1,_,_,Acc) -> - {ok, Acc}; -start_child_3_1(DeviceIndex, ContainerIndex, BackendDBSupPid, Props, Acc) -> +start_child_3_1(_,-1,_,_, Dict, Acc) -> + {ok, {dict:to_list(Dict), Acc}}; +start_child_3_1(DeviceIndex, ContainerIndex, BackendDBSupPid, Props, Dict, Acc) -> Id = (DeviceIndex * ?DEVICE_ID_INTERVALS) + ContainerIndex, case add_container(BackendDBSupPid, Id, Props) of {ok, ServerPair} -> + Dict_1 = dict:append(leo_misc:get_value('path', Props), + gen_id(obj_storage, Id), Dict), start_child_3_1(DeviceIndex, ContainerIndex - 1, - BackendDBSupPid, Props, [ServerPair|Acc]); + BackendDBSupPid, Props, Dict_1, [ServerPair|Acc]); {error, Cause} -> exit(Cause) end. @@ -413,10 +429,10 @@ add_container_2(ChildIndex, Mod, BaseId, ObjStorageId, MetaDBId, CompactWorkerId, ObjServerState, Acc) -> ObjStorageId_R = gen_id(obj_storage_read, BaseId, ChildIndex), ChildSpec = {ObjStorageId_R, - {Mod, start_link, - [ObjServerState#obj_server_state{id = ObjStorageId_R, - privilege = ?OBJ_PRV_READ_ONLY}]}, - permanent, 2000, worker, [Mod]}, + {Mod, start_link, + [ObjServerState#obj_server_state{id = ObjStorageId_R, + privilege = ?OBJ_PRV_READ_ONLY}]}, + permanent, 2000, worker, [Mod]}, Ret = case supervisor:start_child(?MODULE, ChildSpec) of {ok,_} -> ok; @@ -425,7 +441,7 @@ add_container_2(ChildIndex, Mod, BaseId, {error, Cause} -> error_logger:error_msg("~p,~p,~p,~p~n", [{module, ?MODULE_STRING}, - {function, "add_container_2/8"}, + {function, "add_container_2/9"}, {line, ?LINE}, {body, Cause}]), {error, Cause} diff --git a/test/basho_bench_driver_leo_object_storage.erl b/test/basho_bench_driver_leo_object_storage.erl index b72964a..7317d3f 100644 --- a/test/basho_bench_driver_leo_object_storage.erl +++ b/test/basho_bench_driver_leo_object_storage.erl @@ -28,7 +28,7 @@ -export([new/1, run/4]). --include_lib("leo_object_storage/include/leo_object_storage.hrl"). +-include("leo_object_storage.hrl"). %% @doc initialize