Skip to content

Commit

Permalink
Implement validation for optuna suggestion service (#1924)
Browse files Browse the repository at this point in the history
  • Loading branch information
tenzen-y authored Aug 8, 2022
1 parent 42bc6a9 commit 8d58b0a
Show file tree
Hide file tree
Showing 3 changed files with 513 additions and 129 deletions.
99 changes: 99 additions & 0 deletions pkg/suggestion/v1beta1/optuna/base_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright 2022 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import optuna
from collections import defaultdict

from pkg.suggestion.v1beta1.internal.constant import INTEGER, DOUBLE, CATEGORICAL, DISCRETE, MAX_GOAL
from pkg.suggestion.v1beta1.internal.trial import Assignment


class BaseOptunaService(object):
    """Drive a single Optuna study through an ask/tell cycle.

    The service hands out parameter assignments via ``study.ask`` and later
    reports the observed metric of finished trials via ``study.tell``.
    Finished trials are matched back to the Optuna trial that produced them
    through a key built from their assignments (see ``_get_assignments_key``).
    """

    def __init__(self,
                 algorithm_name="",
                 algorithm_config=None,
                 search_space=None):
        """Store the configuration and create the underlying Optuna study.

        Args:
            algorithm_name: One of "tpe", "multivariate-tpe", "cmaes", "random".
            algorithm_config: Keyword arguments forwarded to the Optuna sampler.
                ``None`` is treated as "no keyword arguments".
            search_space: Object exposing ``goal`` and ``params``; each param
                exposes ``name``, ``type`` and ``min``/``max`` or ``list``.

        Raises:
            ValueError: If ``algorithm_name`` is not a supported algorithm.
        """
        self.algorithm_name = algorithm_name
        self.algorithm_config = algorithm_config
        self.search_space = search_space
        # assignments key -> Optuna trial numbers issued for it, oldest first.
        self.assignments_to_optuna_number = defaultdict(list)
        # Names of the trials whose result was already handled by ``_tell``.
        self.recorded_trial_names = set()
        self.study = None
        self._create_study()

    def _create_study(self):
        """Create the Optuna study with the configured sampler and direction."""
        sampler = self._create_sampler()
        direction = "maximize" if self.search_space.goal == MAX_GOAL else "minimize"

        self.study = optuna.create_study(sampler=sampler, direction=direction)

    def _create_sampler(self):
        """Build the Optuna sampler that matches ``self.algorithm_name``.

        Raises:
            ValueError: If the algorithm name is not supported. Returning
                ``None`` here would make ``optuna.create_study`` fall back to
                its default sampler without any indication to the caller.
        """
        # ``**None`` raises TypeError, so a missing config means "no kwargs".
        config = self.algorithm_config or {}

        if self.algorithm_name in ("tpe", "multivariate-tpe"):
            return optuna.samplers.TPESampler(**config)
        if self.algorithm_name == "cmaes":
            return optuna.samplers.CmaEsSampler(**config)
        if self.algorithm_name == "random":
            return optuna.samplers.RandomSampler(**config)

        raise ValueError("Unknown algorithm name: {}".format(self.algorithm_name))

    def get_suggestions(self, trials, current_request_number):
        """Report finished ``trials`` to the study, then return new assignments.

        Args:
            trials: Finished trials; may be empty.
            current_request_number: Number of assignment sets to generate.

        Returns:
            A list with ``current_request_number`` lists of ``Assignment``.
        """
        if len(trials) != 0:
            self._tell(trials)
        return self._ask(current_request_number)

    def _ask(self, current_request_number):
        """Ask the study for ``current_request_number`` new sets of assignments."""
        # The search space does not change between iterations: build it once.
        optuna_search_space = self._get_optuna_search_space()

        list_of_assignments = []
        for _ in range(current_request_number):
            optuna_trial = self.study.ask(fixed_distributions=optuna_search_space)

            assignments = [Assignment(k, v) for k, v in optuna_trial.params.items()]
            list_of_assignments.append(assignments)

            # Remember which Optuna trial produced these assignments so that
            # ``_tell`` can report the result to the right trial later.
            assignments_key = self._get_assignments_key(assignments)
            self.assignments_to_optuna_number[assignments_key].append(optuna_trial.number)

        return list_of_assignments

    def _tell(self, trials):
        """Report the metric of every not-yet-recorded trial to the study.

        Raises:
            ValueError: If a trial's assignments match no pending Optuna trial.
        """
        for trial in trials:
            if trial.name in self.recorded_trial_names:
                continue
            # Recorded before reporting: a trial that fails below is skipped,
            # not retried, when it shows up again in a later request.
            self.recorded_trial_names.add(trial.name)

            value = float(trial.target_metric.value)
            assignments_key = self._get_assignments_key(trial.assignments)
            # ``.get`` instead of indexing, so that unknown keys do not leave
            # empty lists behind in the defaultdict.
            optuna_trial_numbers = self.assignments_to_optuna_number.get(assignments_key)
            if not optuna_trial_numbers:
                raise ValueError("An unknown trial has been passed in the GetSuggestion request.")

            # Identical assignments are resolved in the order they were issued.
            self.study.tell(optuna_trial_numbers.pop(0), value)

    @staticmethod
    def _get_assignments_key(assignments):
        """Return an order-independent string key ("name:value,...") for assignments."""
        assignments = sorted(assignments, key=lambda a: a.name)
        assignments_str = [f"{a.name}:{a.value}" for a in assignments]
        return ",".join(assignments_str)

    def _get_optuna_search_space(self):
        """Map every search-space parameter to an Optuna distribution.

        Parameters whose type is none of INTEGER, DOUBLE, CATEGORICAL or
        DISCRETE are left out of the result.
        """
        # NOTE(review): IntUniformDistribution / UniformDistribution appear to be
        # deprecated in newer Optuna releases - confirm against the pinned version.
        search_space = {}
        for param in self.search_space.params:
            if param.type == INTEGER:
                search_space[param.name] = optuna.distributions.IntUniformDistribution(int(param.min), int(param.max))
            elif param.type == DOUBLE:
                search_space[param.name] = optuna.distributions.UniformDistribution(float(param.min), float(param.max))
            elif param.type == CATEGORICAL or param.type == DISCRETE:
                search_space[param.name] = optuna.distributions.CategoricalDistribution(param.list)
        return search_space
260 changes: 135 additions & 125 deletions pkg/suggestion/v1beta1/optuna/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,159 +12,169 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
import threading

import optuna
import grpc
import logging

from pkg.apis.manager.v1beta1.python import api_pb2
from pkg.apis.manager.v1beta1.python import api_pb2_grpc
from pkg.suggestion.v1beta1.internal.constant import INTEGER, DOUBLE, CATEGORICAL, DISCRETE, MAX_GOAL
from pkg.suggestion.v1beta1.internal.constant import INTEGER, DOUBLE
from pkg.suggestion.v1beta1.internal.search_space import HyperParameterSearchSpace
from pkg.suggestion.v1beta1.internal.trial import Trial, Assignment
from pkg.suggestion.v1beta1.optuna.base_service import BaseOptunaService
from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer

logger = logging.getLogger(__name__)


class OptunaService(api_pb2_grpc.SuggestionServicer, HealthServicer):
    """gRPC suggestion servicer that delegates to ``BaseOptunaService``.

    The backing ``BaseOptunaService`` is created lazily on the first
    ``GetSuggestions`` call and reused afterwards.
    """

    def __init__(self):
        super(OptunaService, self).__init__()
        # Held for the whole of GetSuggestions so requests are handled one at a time.
        self.lock = threading.Lock()
        # Created on the first GetSuggestions request.
        self.base_service = None

    def GetSuggestions(self, request, context):
        """
        Main function to provide suggestion.

        Reports the trials carried by ``request`` to the backing service and
        returns ``request.current_request_number`` new parameter assignments.
        """
        with self.lock:
            if self.base_service is None:
                # The algorithm spec is only needed to build the backing
                # service, so it is converted once rather than per request.
                name, config = OptimizerConfiguration.convert_algorithm_spec(request.experiment.spec.algorithm)
                search_space = HyperParameterSearchSpace.convert(request.experiment)
                self.base_service = BaseOptunaService(
                    algorithm_name=name,
                    algorithm_config=config,
                    search_space=search_space)

            trials = Trial.convert(request.trials)
            list_of_assignments = self.base_service.get_suggestions(trials, request.current_request_number)
            return api_pb2.GetSuggestionsReply(
                parameter_assignments=Assignment.generate(list_of_assignments)
            )

    def ValidateAlgorithmSettings(self, request, context):
        """Validate the algorithm spec of ``request.experiment``.

        On an invalid spec the gRPC status is set to INVALID_ARGUMENT with the
        validation message as details, and the message is logged. An empty
        reply is returned in both cases.
        """
        is_valid, message = OptimizerConfiguration.validate_algorithm_spec(
            request.experiment.spec.algorithm)
        if not is_valid:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(message)
            logger.error(message)
        return api_pb2.ValidateAlgorithmSettingsReply()


class OptimizerConfiguration(object):
    """Convert and validate Katib algorithm specs for the Optuna samplers.

    An algorithm spec exposes ``algorithm_name`` and ``algorithm_settings``;
    every setting exposes a ``name`` and a string ``value``.
    """

    # Setting names whose sampler keyword argument is spelled differently.
    __setting_aliases = {
        "sigma": "sigma0",
        "random_state": "seed",
    }

    # Per algorithm: sampler keyword -> converter applied to the setting value.
    __conversion_dict = {
        "tpe": {
            "n_startup_trials": int,
            "n_ei_candidates": int,
            "seed": int,
        },
        "multivariate-tpe": {
            "n_startup_trials": int,
            "n_ei_candidates": int,
            "seed": int,
        },
        "cmaes": {
            "restart_strategy": lambda x: None if x == "None" or x == "none" else x,
            "sigma0": float,
            "seed": int,
        },
        "random": {
            "seed": int,
        },
    }

    # Per algorithm: keyword arguments that are always passed to the sampler.
    # Kept apart from the converters so a constant is never called like one.
    __fixed_kwargs = {
        "tpe": {"constant_liar": True},
        "multivariate-tpe": {"constant_liar": True, "multivariate": True},
    }

    @classmethod
    def convert_algorithm_spec(cls, algorithm_spec):
        """Translate an algorithm spec into sampler keyword arguments.

        Settings that the algorithm does not know are ignored; "sigma" and
        "random_state" are accepted as aliases of "sigma0" and "seed".

        Returns:
            A tuple ``(algorithm_name, config)`` where ``config`` is a dict of
            keyword arguments for the sampler.

        Raises:
            KeyError: If the algorithm name is not supported.
            ValueError: If a setting value cannot be converted.
        """
        algorithm_name = algorithm_spec.algorithm_name
        converters = cls.__conversion_dict[algorithm_name]

        config = {}
        for s in algorithm_spec.algorithm_settings:
            keyword = cls.__setting_aliases.get(s.name, s.name)
            if keyword in converters:
                config[keyword] = converters[keyword](s.value)

        config.update(cls.__fixed_kwargs.get(algorithm_name, {}))

        return algorithm_name, config

    @classmethod
    def validate_algorithm_spec(cls, algorithm_spec):
        """Check the settings of an algorithm spec.

        Returns:
            A tuple ``(is_valid, message)``; ``message`` is empty when valid.
            An unsupported algorithm name is reported as invalid, so callers
            can always unpack the result.
        """
        algorithm_name = algorithm_spec.algorithm_name
        algorithm_settings = algorithm_spec.algorithm_settings

        if algorithm_name == "tpe" or algorithm_name == "multivariate-tpe":
            return cls._validate_tpe_setting(algorithm_spec)
        if algorithm_name == "cmaes":
            return cls._validate_cmaes_setting(algorithm_settings)
        if algorithm_name == "random":
            return cls._validate_random_setting(algorithm_settings)

        return False, "unknown algorithm name {}".format(algorithm_name)

    @staticmethod
    def _validate_non_negative(setting, cast):
        """Check that ``cast(setting.value)`` is a number >= 0.

        Returns:
            ``(True, "")`` on success, otherwise ``(False, message)``.
        """
        try:
            if not cast(setting.value) >= 0:
                return False, "{} should be greater than or equal to zero".format(setting.name)
        except (TypeError, ValueError) as e:
            return False, "failed to validate {name}({value}): {exception}".format(
                name=setting.name, value=setting.value, exception=e)
        return True, ""

    @classmethod
    def _validate_tpe_setting(cls, algorithm_spec):
        """Validate settings of "tpe" / "multivariate-tpe": non-negative integers only."""
        algorithm_name = algorithm_spec.algorithm_name

        for s in algorithm_spec.algorithm_settings:
            if s.name not in ("n_startup_trials", "n_ei_candidates", "random_state"):
                return False, "unknown setting {} for algorithm {}".format(s.name, algorithm_name)

            is_valid, message = cls._validate_non_negative(s, int)
            if not is_valid:
                return False, message

        return True, ""

    @classmethod
    def _validate_cmaes_setting(cls, algorithm_settings):
        """Validate settings of "cmaes".

        NOTE(review): this used to reject specs with fewer than two *settings*
        with a message about the dimensionality of the search space. The number
        of settings says nothing about the search space, and the check refused
        cmaes whenever fewer than two settings were given. Checking the
        dimensionality needs the experiment parameters, which are not passed in.
        """
        for s in algorithm_settings:
            if s.name == "restart_strategy":
                if s.value not in ("ipop", "None", "none"):
                    return False, "restart_strategy {} is not supported in CMAES optimization".format(s.value)
                continue

            if s.name == "sigma":
                is_valid, message = cls._validate_non_negative(s, float)
            elif s.name == "random_state":
                is_valid, message = cls._validate_non_negative(s, int)
            else:
                return False, "unknown setting {} for algorithm cmaes".format(s.name)

            if not is_valid:
                return False, message

        return True, ""

    @classmethod
    def _validate_random_setting(cls, algorithm_settings):
        """Validate settings of "random": only a non-negative ``random_state``."""
        for s in algorithm_settings:
            if s.name != "random_state":
                return False, "unknown setting {} for algorithm random".format(s.name)

            is_valid, message = cls._validate_non_negative(s, int)
            if not is_valid:
                return False, message

        return True, ""
Loading

0 comments on commit 8d58b0a

Please sign in to comment.