diff --git a/config/cruisecontrol.properties b/config/cruisecontrol.properties index 40efc9eb12..179d801dc3 100644 --- a/config/cruisecontrol.properties +++ b/config/cruisecontrol.properties @@ -287,3 +287,15 @@ webserver.accesslog.path=access.log # HTTP Request Log retention days webserver.accesslog.retention.days=14 + +# Configurations for servlet +# ========================== + +# Enable two-step verification for processing POST requests. +two.step.verification.enabled=false + +# The maximum time in milliseconds to retain the requests in two-step (verification) purgatory. +two.step.purgatory.retention.time.ms=1209600000 + +# The maximum number of requests in two-step (verification) purgatory. +two.step.purgatory.max.requests=25 diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index c3fe311693..d37bfd5093 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -32,7 +32,9 @@ import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef; import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters; import com.linkedin.kafka.cruisecontrol.servlet.response.AdminResult; import com.linkedin.kafka.cruisecontrol.servlet.response.BootstrapResult; @@ -40,7 +42,7 @@ import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState; import 
com.linkedin.kafka.cruisecontrol.servlet.response.PauseSamplingResult; import com.linkedin.kafka.cruisecontrol.servlet.response.ResumeSamplingResult; -import com.linkedin.kafka.cruisecontrol.servlet.response.StopProposalExecutionResult; +import com.linkedin.kafka.cruisecontrol.servlet.response.StopProposalResult; import com.linkedin.kafka.cruisecontrol.servlet.response.TrainResult; import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats; import java.io.InputStream; @@ -843,11 +845,12 @@ private void executeDemotion(Collection proposals, /** * Stop the executor if it is executing the proposals. * + * @param parameters Stop proposal parameters (not used -- added for standardization and for extensibility). * @return Stop proposal execution result. */ - public StopProposalExecutionResult stopProposalExecution() { + public StopProposalResult stopProposalExecution(StopProposalParameters parameters) { _executor.userTriggeredStopExecution(); - return new StopProposalExecutionResult(); + return new StopProposalResult(); } /** @@ -872,8 +875,11 @@ public CruiseControlState state(OperationProgress operationProgress, /** * Get the cluster state for Kafka. + * + * @param parameters Kafka cluster state parameters (not used -- added for standardization and for extensibility). + * @return Kafka cluster state. 
*/ - public KafkaClusterState kafkaClusterState() { + public KafkaClusterState kafkaClusterState(KafkaClusterStateParameters parameters) { return new KafkaClusterState(_loadMonitor.kafkaCluster()); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlMain.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlMain.java index 6173dca3a3..d5a5582c98 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlMain.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlMain.java @@ -104,11 +104,9 @@ public static void main(String[] args) throws Exception { context.addServlet(holderWebapp, webuiPathPrefix); // Kafka Cruise Control servlet data - long maxBlockMs = config.getLong(KafkaCruiseControlConfig.WEBSERVER_REQUEST_MAX_BLOCK_TIME_MS); - long sessionExpiryMs = config.getLong(KafkaCruiseControlConfig.WEBSERVER_SESSION_EXPIRY_MS); String apiUrlPrefix = config.getString(KafkaCruiseControlConfig.WEBSERVER_API_URLPREFIX); - KafkaCruiseControlServlet kafkaCruiseControlServlet = - new KafkaCruiseControlServlet(kafkaCruiseControl, maxBlockMs, sessionExpiryMs, dropwizardMetricsRegistry, config); + KafkaCruiseControlServlet kafkaCruiseControlServlet = new KafkaCruiseControlServlet(kafkaCruiseControl, + dropwizardMetricsRegistry); ServletHolder servletHolder = new ServletHolder(kafkaCruiseControlServlet); context.addServlet(servletHolder, apiUrlPrefix); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java index dfc179ec44..45f894771b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java @@ -698,6 +698,25 @@ public class 
KafkaCruiseControlConfig extends AbstractConfig { public static final String WEBSERVER_ACCESSLOG_RETENTION_DAYS = "webserver.accesslog.retention.days"; private static final String WEBSERVER_ACCESSLOG_RETENTION_DAYS_DOC = "HTTP Request log retention days"; + /** + * two.step.verification.enabled + */ + public static final String TWO_STEP_VERIFICATION_ENABLED_CONFIG = "two.step.verification.enabled"; + private static final String TWO_STEP_VERIFICATION_ENABLED_DOC = "Enable two-step verification for processing POST requests."; + + /** + * two.step.purgatory.retention.time.ms + */ + public static final String TWO_STEP_PURGATORY_RETENTION_TIME_MS_CONFIG = "two.step.purgatory.retention.time.ms"; + private static final String TWO_STEP_PURGATORY_RETENTION_TIME_MS_DOC = "The maximum time in milliseconds to " + + "retain the requests in two-step (verification) purgatory."; + + /** + * two.step.purgatory.max.requests + */ + public static final String TWO_STEP_PURGATORY_MAX_REQUESTS_CONFIG = "two.step.purgatory.max.requests"; + private static final String TWO_STEP_PURGATORY_MAX_REQUESTS_DOC = "The maximum number of requests in two-step " + + "(verification) purgatory."; static { CONFIG = new ConfigDef() @@ -707,6 +726,8 @@ public class KafkaCruiseControlConfig extends AbstractConfig { WEBSERVER_HTTP_ADDRESS_DOC) .define(WEBSERVER_HTTP_CORS_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, WEBSERVER_HTTP_CORS_ENABLED_DOC) + .define(TWO_STEP_VERIFICATION_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, + TWO_STEP_VERIFICATION_ENABLED_DOC) .define(WEBSERVER_HTTP_CORS_ORIGIN_CONFIG, ConfigDef.Type.STRING, "*", ConfigDef.Importance.LOW, WEBSERVER_HTTP_CORS_ORIGIN_DOC) .define(WEBSERVER_HTTP_CORS_ALLOWMETHODS_CONFIG, ConfigDef.Type.STRING, "OPTIONS, GET, POST", ConfigDef.Importance.HIGH, @@ -808,6 +829,16 @@ public class KafkaCruiseControlConfig extends AbstractConfig { atLeast(0), ConfigDef.Importance.MEDIUM, 
REMOVAL_HISTORY_RETENTION_TIME_MS_DOC) + .define(TWO_STEP_PURGATORY_RETENTION_TIME_MS_CONFIG, + ConfigDef.Type.LONG, + TimeUnit.HOURS.toMillis(336), + atLeast(TimeUnit.HOURS.toMillis(1)), + ConfigDef.Importance.MEDIUM, TWO_STEP_PURGATORY_RETENTION_TIME_MS_DOC) + .define(TWO_STEP_PURGATORY_MAX_REQUESTS_CONFIG, + ConfigDef.Type.INT, + 25, + atLeast(1), + ConfigDef.Importance.MEDIUM, TWO_STEP_PURGATORY_MAX_REQUESTS_DOC) .define(MAX_CACHED_COMPLETED_USER_TASKS_CONFIG, ConfigDef.Type.INT, 100, diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/EndPoint.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/EndPoint.java index 41fbff9740..b46a1598a8 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/EndPoint.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/EndPoint.java @@ -28,7 +28,8 @@ public enum EndPoint { KAFKA_CLUSTER_STATE, DEMOTE_BROKER, USER_TASKS, - ADMIN; + ADMIN, + REVIEW; private static final List GET_ENDPOINT = Arrays.asList(BOOTSTRAP, TRAIN, @@ -45,7 +46,8 @@ public enum EndPoint { PAUSE_SAMPLING, RESUME_SAMPLING, DEMOTE_BROKER, - ADMIN); + ADMIN, + REVIEW); private static final List CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values())); public static List getEndpoint() { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServlet.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServlet.java index d214fed995..781aa37c1a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServlet.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServlet.java @@ -14,18 +14,23 @@ import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters; import 
com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils; import com.linkedin.kafka.cruisecontrol.servlet.parameters.PartitionLoadParameters; -import com.linkedin.kafka.cruisecontrol.servlet.parameters.BaseParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.ProposalsParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters; import com.linkedin.kafka.cruisecontrol.async.AsyncKafkaCruiseControl; import com.linkedin.kafka.cruisecontrol.async.OperationFuture; import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; +import com.linkedin.kafka.cruisecontrol.servlet.purgatory.Purgatory; +import com.linkedin.kafka.cruisecontrol.servlet.purgatory.RequestInfo; import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlResponse; +import com.linkedin.kafka.cruisecontrol.servlet.response.PurgatoryOrReviewResult; import com.linkedin.kafka.cruisecontrol.servlet.response.UserTaskState; import java.io.IOException; import java.util.HashMap; @@ -63,21 +68,17 @@ public class KafkaCruiseControlServlet extends HttpServlet { private final Map _requestMeter = new HashMap<>(); private final Map _successfulRequestExecutionTimer = new HashMap<>(); private final boolean _corsEnabled; + private final boolean _twoStepVerification; + private final Purgatory _purgatory; - public KafkaCruiseControlServlet(AsyncKafkaCruiseControl 
asynckafkaCruiseControl, - long maxBlockMs, - long sessionExpiryMs, - MetricRegistry dropwizardMetricRegistry, - KafkaCruiseControlConfig kafkaCruiseControlConfig) { - _config = kafkaCruiseControlConfig; + public KafkaCruiseControlServlet(AsyncKafkaCruiseControl asynckafkaCruiseControl, MetricRegistry dropwizardMetricRegistry) { + _config = asynckafkaCruiseControl.config(); _corsEnabled = _config.getBoolean(KafkaCruiseControlConfig.WEBSERVER_HTTP_CORS_ENABLED_CONFIG); _asyncKafkaCruiseControl = asynckafkaCruiseControl; - KafkaCruiseControlConfig config = asynckafkaCruiseControl.config(); - _userTaskManager = new UserTaskManager(sessionExpiryMs, config.getInt(KafkaCruiseControlConfig.MAX_ACTIVE_USER_TASKS_CONFIG), - config.getLong(KafkaCruiseControlConfig.COMPLETED_USER_TASK_RETENTION_TIME_MS_CONFIG), - config.getInt(KafkaCruiseControlConfig.MAX_CACHED_COMPLETED_USER_TASKS_CONFIG), - dropwizardMetricRegistry, _successfulRequestExecutionTimer); - _maxBlockMs = maxBlockMs; + _twoStepVerification = _config.getBoolean(KafkaCruiseControlConfig.TWO_STEP_VERIFICATION_ENABLED_CONFIG); + _purgatory = _twoStepVerification ? 
new Purgatory(_config) : null; + _userTaskManager = new UserTaskManager(_config, dropwizardMetricRegistry, _successfulRequestExecutionTimer, _purgatory); + _maxBlockMs = _config.getLong(KafkaCruiseControlConfig.WEBSERVER_REQUEST_MAX_BLOCK_TIME_MS); _asyncOperationStep = new ThreadLocal<>(); _asyncOperationStep.set(0); @@ -93,6 +94,7 @@ public KafkaCruiseControlServlet(AsyncKafkaCruiseControl asynckafkaCruiseControl public void destroy() { super.destroy(); _userTaskManager.close(); + _purgatory.close(); } /** @@ -136,11 +138,11 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t switch (endPoint) { case BOOTSTRAP: syncRequest(() -> new BootstrapParameters(request), _asyncKafkaCruiseControl::bootstrapLoadMonitor, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + request, response, endPoint); break; case TRAIN: syncRequest(() -> new TrainParameters(request), _asyncKafkaCruiseControl::trainLoadModel, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + request, response, endPoint); break; case LOAD: getClusterLoad(request, response); @@ -156,11 +158,10 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t break; case KAFKA_CLUSTER_STATE: syncRequest(() -> new KafkaClusterStateParameters(request), _asyncKafkaCruiseControl::kafkaClusterState, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + request, response, endPoint); break; case USER_TASKS: - syncRequest(() -> new UserTasksParameters(request), this::userTaskState, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + syncRequest(() -> new UserTasksParameters(request), this::userTaskState, request, response, endPoint); break; default: throw new UserRequestException("Invalid URL for GET"); @@ -181,6 +182,51 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t } } + private void sanityCheckSubmittedRequest(HttpServletRequest request, RequestInfo 
requestInfo) { + if (requestInfo.accessToAlreadySubmittedRequest() + && _userTaskManager.getUserTaskByUserTaskId(_userTaskManager.getUserTaskId(request), request) == null) { + throw new UserRequestException( + String.format("Attempt to start a new user task with an already submitted review. If you are trying to retrieve" + + " the result of a submitted execution, please use its UUID in your request header via %s flag." + + " If you are starting a new execution with the same parameters, please submit a new review " + + "request and get approval for it.", UserTaskManager.USER_TASK_HEADER_NAME)); + } + } + + @SuppressWarnings("unchecked") + private

P maybeAddToPurgatory(HttpServletRequest request, + HttpServletResponse response, + Supplier

paramSupplier) throws IOException { + Integer reviewId = ParameterUtils.reviewId(request, _twoStepVerification); + if (reviewId != null) { + // Submit the request with reviewId that should already be in the purgatory associated with the request endpoint. + RequestInfo requestInfo = _purgatory.submit(reviewId, request); + // Ensure that if the request has already been submitted, the user is not attempting to create another user task + // with the same parameters and endpoint. + sanityCheckSubmittedRequest(request, requestInfo); + + return (P) requestInfo.parameters(); + } else { + P parameters = paramSupplier.get(); + if (!parameters.parseParameters(response)) { + // Add request to purgatory and return PurgatoryOrReviewResult. + PurgatoryOrReviewResult purgatoryOrReviewResult = _purgatory.addRequest(request, parameters); + purgatoryOrReviewResult.writeSuccessResponse(parameters, response); + LOG.info("Added request {} (parameters: {}) to purgatory.", request.getPathInfo(), request.getParameterMap()); + } + + return null; + } + } + + private

P evaluateReviewableParams(HttpServletRequest request, + HttpServletResponse response, + Supplier

paramSupplier) + throws IOException { + // Do not add to the purgatory if the two-step verification is disabled. + return !_twoStepVerification ? paramSupplier.get() : maybeAddToPurgatory(request, response, paramSupplier); + } + /** * The POST method allows user to perform the following actions: * @@ -188,11 +234,12 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t * 1. Decommission a broker (See {@link AddedOrRemovedBrokerParameters}). * 2. Add a broker (See {@link AddedOrRemovedBrokerParameters}). * 3. Trigger a workload balance (See {@link RebalanceParameters}). - * 4. Stop the proposal execution (See {@link BaseParameters}). + * 4. Stop the proposal execution (See {@link StopProposalParameters}). * 5. Pause metrics sampling (See {@link PauseResumeParameters}). * 6. Resume metrics sampling (See {@link PauseResumeParameters}). * 7. Demote a broker (See {@link DemoteBrokerParameters}). * 8. Admin operations on Cruise Control (See {@link AdminParameters}). + * 9. Review requests for two-step verification (See {@link ReviewParameters}). 
* */ @Override @@ -202,32 +249,62 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) EndPoint endPoint = getValidEndpoint(request, response); if (endPoint != null && hasValidParameters(request, response)) { _requestMeter.get(endPoint).mark(); + CruiseControlParameters reviewableParams; switch (endPoint) { case ADD_BROKER: case REMOVE_BROKER: - addOrRemoveBroker(request, response, endPoint); + reviewableParams = evaluateReviewableParams(request, response, + () -> new AddedOrRemovedBrokerParameters(request, _config)); + if (reviewableParams != null) { + addOrRemoveBroker(request, response, endPoint, () -> (AddedOrRemovedBrokerParameters) reviewableParams); + } break; case REBALANCE: - rebalance(request, response); + reviewableParams = evaluateReviewableParams(request, response, () -> new RebalanceParameters(request, _config)); + if (reviewableParams != null) { + rebalance(request, response, () -> (RebalanceParameters) reviewableParams); + } break; case STOP_PROPOSAL_EXECUTION: - syncRequest(() -> new BaseParameters(request), _asyncKafkaCruiseControl::stopProposalExecution, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + reviewableParams = evaluateReviewableParams(request, response, + () -> new StopProposalParameters(request, _twoStepVerification)); + if (reviewableParams != null) { + syncRequest(() -> (StopProposalParameters) reviewableParams, _asyncKafkaCruiseControl::stopProposalExecution, + request, response, endPoint); + } break; case PAUSE_SAMPLING: - syncRequest(() -> new PauseResumeParameters(request), _asyncKafkaCruiseControl::pauseLoadMonitorActivity, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + reviewableParams = evaluateReviewableParams(request, response, + () -> new PauseResumeParameters(request, _twoStepVerification)); + if (reviewableParams != null) { + syncRequest(() -> (PauseResumeParameters) reviewableParams, _asyncKafkaCruiseControl::pauseLoadMonitorActivity, + 
request, response, endPoint); + } break; case RESUME_SAMPLING: - syncRequest(() -> new PauseResumeParameters(request), _asyncKafkaCruiseControl::resumeLoadMonitorActivity, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + reviewableParams = evaluateReviewableParams(request, response, + () -> new PauseResumeParameters(request, _twoStepVerification)); + if (reviewableParams != null) { + syncRequest(() -> (PauseResumeParameters) reviewableParams, _asyncKafkaCruiseControl::resumeLoadMonitorActivity, + request, response, endPoint); + } break; case DEMOTE_BROKER: - demoteBroker(request, response); + reviewableParams = evaluateReviewableParams(request, response, () -> new DemoteBrokerParameters(request, _config)); + if (reviewableParams != null) { + demoteBroker(request, response, () -> (DemoteBrokerParameters) reviewableParams); + } break; case ADMIN: - syncRequest(() -> new AdminParameters(request), _asyncKafkaCruiseControl::handleAdminRequest, - request, response, _successfulRequestExecutionTimer.get(endPoint)); + reviewableParams = evaluateReviewableParams(request, response, + () -> new AdminParameters(request, _twoStepVerification)); + if (reviewableParams != null) { + syncRequest(() -> (AdminParameters) reviewableParams, _asyncKafkaCruiseControl::handleAdminRequest, + request, response, endPoint); + } + break; + case REVIEW: + syncRequest(() -> new ReviewParameters(request), this::handleReviewRequest, request, response, endPoint); break; default: throw new UserRequestException("Invalid URL for POST"); @@ -248,8 +325,18 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) } } - private UserTaskState userTaskState() { - return new UserTaskState(_userTaskManager.getActiveUserTasks(), _userTaskManager.getCompletedUserTasks()); + /** + * Get the user task state. + * + * @param parameters User task state parameters (not used -- added for standardization and for extensibility). + * @return User task state. 
+ */ + private UserTaskState userTaskState(UserTasksParameters parameters) { + return new UserTaskState(_userTaskManager); + } + + private synchronized PurgatoryOrReviewResult handleReviewRequest(ReviewParameters parameters) { + return _purgatory.applyReview(parameters.reviewRequests(), parameters.reason()); } private void getClusterLoad(HttpServletRequest request, HttpServletResponse response) @@ -280,21 +367,22 @@ private void getState(HttpServletRequest request, HttpServletResponse response) request, response); } - private void rebalance(HttpServletRequest request, HttpServletResponse response) + private void rebalance(HttpServletRequest request, HttpServletResponse response, Supplier paramSupplier) throws IOException, ExecutionException, InterruptedException { - asyncRequest(() -> new RebalanceParameters(request, _config), - parameters -> (uuid -> _asyncKafkaCruiseControl.rebalance(parameters, uuid)), + asyncRequest(paramSupplier, parameters -> (uuid -> _asyncKafkaCruiseControl.rebalance(parameters, uuid)), request, response); } - private void demoteBroker(HttpServletRequest request, HttpServletResponse response) + private void demoteBroker(HttpServletRequest request, HttpServletResponse response, Supplier paramSupplier) throws IOException, ExecutionException, InterruptedException { - asyncRequest(() -> new DemoteBrokerParameters(request, _config), - parameters -> (uuid -> _asyncKafkaCruiseControl.demoteBrokers(uuid, parameters)), + asyncRequest(paramSupplier, parameters -> (uuid -> _asyncKafkaCruiseControl.demoteBrokers(uuid, parameters)), request, response); } - private void addOrRemoveBroker(HttpServletRequest request, HttpServletResponse response, EndPoint endPoint) + private void addOrRemoveBroker(HttpServletRequest request, + HttpServletResponse response, + EndPoint endPoint, + Supplier paramSupplier) throws IOException, ExecutionException, InterruptedException { Function> function; if (endPoint == ADD_BROKER) { @@ -303,7 +391,7 @@ private void 
addOrRemoveBroker(HttpServletRequest request, HttpServletResponse r function = parameters -> (uuid -> _asyncKafkaCruiseControl.decommissionBrokers(parameters, uuid)); } - asyncRequest(() -> new AddedOrRemovedBrokerParameters(request, _config), function, request, response); + asyncRequest(paramSupplier, function, request, response); } /** @@ -336,39 +424,11 @@ private

void asyncRequest(Supplier

paramS LOG.info("Computation is completed for async request: {}.", request.getPathInfo()); } - private

void syncRequest(Supplier

paramSupplier, - Supplier resultSupplier, - HttpServletRequest request, - HttpServletResponse response, - Timer successfulRequestExecutionTimer) - throws ExecutionException, InterruptedException, IOException { - long requestExecutionStartTime = System.nanoTime(); - P parameters = paramSupplier.get(); - - if (!parameters.parseParameters(response)) { - // Successfully parsed parameters. - int step = 0; - - OperationFuture resultFuture = _userTaskManager.getOrCreateUserTask(request, response, uuid -> { - OperationFuture future = new OperationFuture(String.format("%s request", parameters.endPoint().toString())); - future.complete(resultSupplier.get()); - return future; - }, step, false).get(step); - - CruiseControlResponse result = resultFuture.get(); - - result.writeSuccessResponse(parameters, response); - successfulRequestExecutionTimer.update(System.nanoTime() - requestExecutionStartTime, TimeUnit.NANOSECONDS); - } else { - LOG.warn("Failed to parse parameters: {} for sync request: {}.", request.getParameterMap(), request.getPathInfo()); - } - } - private

void syncRequest(Supplier

paramSupplier, Function resultFunction, HttpServletRequest request, HttpServletResponse response, - Timer successfulRequestExecutionTimer) + EndPoint endPoint) throws ExecutionException, InterruptedException, IOException { long requestExecutionStartTime = System.nanoTime(); P parameters = paramSupplier.get(); @@ -386,7 +446,7 @@ private

voi CruiseControlResponse result = resultFuture.get(); result.writeSuccessResponse(parameters, response); - successfulRequestExecutionTimer.update(System.nanoTime() - requestExecutionStartTime, TimeUnit.NANOSECONDS); + _successfulRequestExecutionTimer.get(endPoint).update(System.nanoTime() - requestExecutionStartTime, TimeUnit.NANOSECONDS); } else { LOG.warn("Failed to parse parameters: {} for sync request: {}.", request.getParameterMap(), request.getPathInfo()); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java index 771ae357f2..82faebecfc 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java @@ -99,4 +99,8 @@ static String handleException(Exception e, HttpServletRequest request, HttpServl writeErrorResponse(response, sw.toString(), errorMessage, SC_INTERNAL_SERVER_ERROR, wantJSON(request)); return errorMessage; } + + public static String httpServletRequestToString(HttpServletRequest request) { + return String.format("%s %s", request.getMethod(), request.getRequestURI()); + } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/UserTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/UserTaskManager.java index ce6ef39d8c..841b30e616 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/UserTaskManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/UserTaskManager.java @@ -9,7 +9,9 @@ import com.codahale.metrics.Timer; import com.linkedin.kafka.cruisecontrol.async.OperationFuture; import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory; +import 
com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; import com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils; +import com.linkedin.kafka.cruisecontrol.servlet.purgatory.Purgatory; import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; @@ -34,6 +36,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.httpServletRequestToString; +import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.getClientIpAddress; +import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.REVIEW_ID_PARAM; + /** * {@link UserTaskManager} keeps track of Sync and Async user tasks. When a {@link HttpServletRequest} comes in, the servlet @@ -62,15 +68,16 @@ public class UserTaskManager implements Closeable { private final UUIDGenerator _uuidGenerator; private final Map _successfulRequestExecutionTimer; private final long _completedUserTaskRetentionTimeMs; + private final Purgatory _purgatory; - public UserTaskManager(long sessionExpiryMs, - long maxActiveUserTasks, - long completedUserTaskRetentionTimeMs, - int maxCachedCompletedUserTasks, + public UserTaskManager(KafkaCruiseControlConfig config, MetricRegistry dropwizardMetricRegistry, - Map successfulRequestExecutionTimer) { + Map successfulRequestExecutionTimer, + Purgatory purgatory) { + _purgatory = purgatory; _sessionKeyToUserTaskIdMap = new HashMap<>(); Map activeUserTaskIdToFuturesMap = new LinkedHashMap<>(); + int maxCachedCompletedUserTasks = config.getInt(KafkaCruiseControlConfig.MAX_CACHED_COMPLETED_USER_TASKS_CONFIG); // completedUserTaskIdToFuturesMap stores tasks completed either successfully or exceptionally. 
Map completedUserTaskIdToFuturesMap = new LinkedHashMap() { @Override @@ -81,10 +88,9 @@ protected boolean removeEldestEntry(Map.Entry eldest) { _allUuidToUserTaskInfoMap = new HashMap<>(2); _allUuidToUserTaskInfoMap.put(TaskState.ACTIVE, activeUserTaskIdToFuturesMap); _allUuidToUserTaskInfoMap.put(TaskState.COMPLETED, completedUserTaskIdToFuturesMap); - - _sessionExpiryMs = sessionExpiryMs; - _maxActiveUserTasks = maxActiveUserTasks; - _completedUserTaskRetentionTimeMs = completedUserTaskRetentionTimeMs; + _sessionExpiryMs = config.getLong(KafkaCruiseControlConfig.WEBSERVER_SESSION_EXPIRY_MS); + _maxActiveUserTasks = config.getInt(KafkaCruiseControlConfig.MAX_ACTIVE_USER_TASKS_CONFIG); + _completedUserTaskRetentionTimeMs = config.getLong(KafkaCruiseControlConfig.COMPLETED_USER_TASK_RETENTION_TIME_MS_CONFIG); _time = Time.SYSTEM; _uuidGenerator = new UUIDGenerator(); _userTaskScannerExecutor.scheduleAtFixedRate(new UserTaskScanner(), @@ -105,6 +111,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { int maxCachedCompletedUserTasks, Time time, UUIDGenerator uuidGenerator) { + _purgatory = null; _sessionKeyToUserTaskIdMap = new HashMap<>(); Map activeUserTaskIdToFuturesMap = new LinkedHashMap<>(); Map completedUserTaskIdToFuturesMap = new LinkedHashMap() { @@ -139,10 +146,6 @@ protected boolean removeEldestEntry(Map.Entry eldest) { this(sessionExpiryMs, maxActiveUserTasks, completedUserTaskRetentionTimeMs, maxCachedCompletedUserTasks, time, new UUIDGenerator()); } - private static String httpServletRequestToString(HttpServletRequest request) { - return String.format("%s %s", request.getMethod(), request.getRequestURI()); - } - /** * This method creates a {@link UserTaskInfo} reference for a new sync or async request and a UUID to map to it. For async request * a {@link SessionKey} is also created to map to the UUID. 
For async request, both UUID and {@link HttpSession} can be used to fetch @@ -295,10 +298,33 @@ synchronized void checkActiveUserTasks() { } } + private synchronized void removeFromPurgatory(UserTaskInfo userTaskInfo) { + // Purgatory is null if the two-step verification is disabled. + if (_purgatory != null) { + String parameterString = ParameterUtils.caseSensitiveParameterName(userTaskInfo.queryParams(), REVIEW_ID_PARAM); + if (parameterString != null) { + int reviewId = Integer.parseInt(userTaskInfo.queryParams().get(parameterString)[0]); + // Remove submitted request from purgatory. + try { + _purgatory.removeSubmitted(reviewId); + LOG.info("Successfully removed submitted request corresponding to review id {} from purgatory.", reviewId); + } catch (IllegalStateException ise) { + LOG.error("Should never attempt to remove this request from purgatory.", ise); + } + } + } + } + private synchronized void removeOldUserTasks() { LOG.debug("Remove old user tasks"); - _allUuidToUserTaskInfoMap.get(TaskState.COMPLETED).entrySet().removeIf(entry -> (entry.getValue().startMs() - + _completedUserTaskRetentionTimeMs < _time.milliseconds())); + for (Iterator> iterator = + _allUuidToUserTaskInfoMap.get(TaskState.COMPLETED).entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry entry = iterator.next(); + if (entry.getValue().startMs() + _completedUserTaskRetentionTimeMs < _time.milliseconds()) { + removeFromPurgatory(entry.getValue()); + iterator.remove(); + } + } } synchronized UserTaskInfo getUserTaskByUserTaskId(UUID userTaskId, HttpServletRequest httpServletRequest) { @@ -475,9 +501,8 @@ public UserTaskInfo(HttpServletRequest httpServletRequest, long startMs, UUID userTaskId, TaskState state) { - this(futures, httpServletRequestToString(httpServletRequest), - KafkaCruiseControlServletUtils.getClientIpAddress(httpServletRequest), startMs, userTaskId, - httpServletRequest.getParameterMap(), ParameterUtils.endPoint(httpServletRequest), state); + this(futures, 
httpServletRequestToString(httpServletRequest), getClientIpAddress(httpServletRequest), startMs, + userTaskId, httpServletRequest.getParameterMap(), ParameterUtils.endPoint(httpServletRequest), state); } public UserTaskInfo(List futures, @@ -594,10 +619,6 @@ public enum TaskState { _type = type; } - public String type() { - return _type; - } - @Override public String toString() { return _type; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AbstractParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AbstractParameters.java index db35479831..de00f1b6fe 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AbstractParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AbstractParameters.java @@ -55,6 +55,11 @@ public boolean json() { return _json; } + @Override + public void setReviewId(int reviewId) { + // Relevant to parameters with review process. + } + @Override public EndPoint endPoint() { return _endPoint; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java index 0b9e60f404..3a59499583 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java @@ -15,6 +15,8 @@ * Parameters for {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#ADD_BROKER} and * {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#REMOVE_BROKER}. * + *

  • Note that "review_id" is mutually exclusive with the other parameters -- i.e. they cannot be used together.
  • + * *
      * 1. Decommission a broker
      *    POST /kafkacruisecontrol/remove_broker?brokerid=[id1,id2...]&dryRun=[true/false]
    @@ -23,6 +25,7 @@
      *    &json=[true/false]&skip_hard_goal_check=[true/false]&excluded_topics=[pattern]
      *    &use_ready_default_goals=[true/false]&verbose=[true/false]&exclude_recently_demoted_brokers=[true/false]
      *    &exclude_recently_removed_brokers=[true/false]&replica_movement_strategies=[strategy1,strategy2...]
    + *    &review_id=[id]
      *
      * 2. Add a broker
      *    POST /kafkacruisecontrol/add_broker?brokerid=[id1,id2...]&dryRun=[true/false]
    @@ -31,6 +34,7 @@
      *    &json=[true/false]&skip_hard_goal_check=[true/false]&excluded_topics=[pattern]
      *    &use_ready_default_goals=[true/false]&verbose=[true/false]&exclude_recently_demoted_brokers=[true/false]
      *    &exclude_recently_removed_brokers=[true/false]&replica_movement_strategies=[strategy1,strategy2...]
    + *    &review_id=[id]
      * 
    */ public class AddedOrRemovedBrokerParameters extends GoalBasedOptimizationParameters { @@ -41,6 +45,7 @@ public class AddedOrRemovedBrokerParameters extends GoalBasedOptimizationParamet private boolean _throttleAddedOrRemovedBrokers; private boolean _skipHardGoalCheck; private ReplicaMovementStrategy _replicaMovementStrategy; + private Integer _reviewId; private final KafkaCruiseControlConfig _config; public AddedOrRemovedBrokerParameters(HttpServletRequest request, KafkaCruiseControlConfig config) { @@ -58,6 +63,17 @@ protected void initParameters() throws UnsupportedEncodingException { _concurrentLeaderMovements = ParameterUtils.concurrentMovements(_request, false); _skipHardGoalCheck = ParameterUtils.skipHardGoalCheck(_request); _replicaMovementStrategy = ParameterUtils.getReplicaMovementStrategy(_request, _config); + boolean twoStepVerificationEnabled = _config.getBoolean(KafkaCruiseControlConfig.TWO_STEP_VERIFICATION_ENABLED_CONFIG); + _reviewId = ParameterUtils.reviewId(_request, twoStepVerificationEnabled); + } + + @Override + public void setReviewId(int reviewId) { + _reviewId = reviewId; + } + + public Integer reviewId() { + return _reviewId; } public List brokerIds() { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AdminParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AdminParameters.java index b333a74702..c17d0da0fe 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AdminParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AdminParameters.java @@ -14,10 +14,12 @@ /** * Parameters for {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#ADMIN} * + *
  • Note that "review_id" is mutually exclusive with the other parameters -- i.e. they cannot be used together.
  • + * *
      *    POST /kafkacruisecontrol/admin?json=[true/false]&disable_self_healing_for=[Set-of-{@link AnomalyType}]
      *    &enable_self_healing_for=[Set-of-{@link AnomalyType}]&concurrent_partition_movements_per_broker=[POSITIVE-INTEGER]
    - *    &concurrent_leader_movements=[POSITIVE-INTEGER]
    + *    &concurrent_leader_movements=[POSITIVE-INTEGER]&review_id=[id]
      * 
    */ public class AdminParameters extends AbstractParameters { @@ -25,9 +27,12 @@ public class AdminParameters extends AbstractParameters { private Set _enableSelfHealingFor; private Integer _concurrentPartitionMovements; private Integer _concurrentLeaderMovements; + private Integer _reviewId; + private boolean _twoStepVerificationEnabled; - public AdminParameters(HttpServletRequest request) { + public AdminParameters(HttpServletRequest request, boolean twoStepVerificationEnabled) { super(request); + _twoStepVerificationEnabled = twoStepVerificationEnabled; } @Override @@ -38,6 +43,16 @@ protected void initParameters() throws UnsupportedEncodingException { _disableSelfHealingFor = selfHealingFor.get(false); _concurrentPartitionMovements = ParameterUtils.concurrentMovements(_request, true); _concurrentLeaderMovements = ParameterUtils.concurrentMovements(_request, false); + _reviewId = ParameterUtils.reviewId(_request, _twoStepVerificationEnabled); + } + + @Override + public void setReviewId(int reviewId) { + _reviewId = reviewId; + } + + public Integer reviewId() { + return _reviewId; } public Set disableSelfHealingFor() { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/BaseParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/BaseParameters.java deleted file mode 100644 index f2fc4edda6..0000000000 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/BaseParameters.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. - */ - -package com.linkedin.kafka.cruisecontrol.servlet.parameters; - -import javax.servlet.http.HttpServletRequest; - - -/** - * A concrete class for endpoints with base parameters -- e.g. - * {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#STOP_PROPOSAL_EXECUTION} - * - *
    - * Stop the proposal execution.
    - *    POST /kafkacruisecontrol/stop_proposal_execution?json=[true/false]
    - * 
    - */ -public class BaseParameters extends AbstractParameters { - public BaseParameters(HttpServletRequest request) { - super(request); - } -} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/CruiseControlParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/CruiseControlParameters.java index 537acd5c4f..7d703f662d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/CruiseControlParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/CruiseControlParameters.java @@ -25,4 +25,9 @@ public interface CruiseControlParameters { * @return True if requested response is in JSON, false otherwise. */ boolean json(); + + /** + * @param reviewId The review id. + */ + void setReviewId(int reviewId); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/DemoteBrokerParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/DemoteBrokerParameters.java index c41046d4a1..0bf1506f1e 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/DemoteBrokerParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/DemoteBrokerParameters.java @@ -14,12 +14,15 @@ /** * Parameters for {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#DEMOTE_BROKER} * + *
  • Note that "review_id" is mutually exclusive with the other parameters -- i.e. they cannot be used together.
  • + * *
      * Demote a broker
      *    POST /kafkacruisecontrol/demote_broker?brokerid=[id1,id2...]&dryRun=[true/false]
      *    &concurrent_leader_movements=[POSITIVE-INTEGER]&allow_capacity_estimation=[true/false]&json=[true/false]
      *    &skip_urp_demotion=[true/false]&exclude_follower_demotion=[true/false]&verbose=[true/false]
      *    &exclude_recently_demoted_brokers=[true/false]&replica_movement_strategies=[strategy1,strategy2...]
    + *    &review_id=[id]
      * 
    */ public class DemoteBrokerParameters extends KafkaOptimizationParameters { @@ -30,6 +33,7 @@ public class DemoteBrokerParameters extends KafkaOptimizationParameters { private boolean _excludeFollowerDemotion; private ReplicaMovementStrategy _replicaMovementStrategy; private KafkaCruiseControlConfig _config; + private Integer _reviewId; public DemoteBrokerParameters(HttpServletRequest request, KafkaCruiseControlConfig config) { super(request); @@ -46,6 +50,17 @@ protected void initParameters() throws UnsupportedEncodingException { _skipUrpDemotion = ParameterUtils.skipUrpDemotion(_request); _excludeFollowerDemotion = ParameterUtils.excludeFollowerDemotion(_request); _replicaMovementStrategy = ParameterUtils.getReplicaMovementStrategy(_request, _config); + boolean twoStepVerificationEnabled = _config.getBoolean(KafkaCruiseControlConfig.TWO_STEP_VERIFICATION_ENABLED_CONFIG); + _reviewId = ParameterUtils.reviewId(_request, twoStepVerificationEnabled); + } + + @Override + public void setReviewId(int reviewId) { + _reviewId = reviewId; + } + + public Integer reviewId() { + return _reviewId; } public boolean dryRun() { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java index 1aa948231c..54219fd5f9 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java @@ -11,6 +11,7 @@ import com.linkedin.kafka.cruisecontrol.servlet.EndPoint; import com.linkedin.kafka.cruisecontrol.servlet.UserRequestException; import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager; +import com.linkedin.kafka.cruisecontrol.servlet.purgatory.ReviewStatus; import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState; import 
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal; import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal; @@ -40,6 +41,8 @@ import static com.linkedin.kafka.cruisecontrol.servlet.EndPoint.*; import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.REQUEST_URI; import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.getClientIpAddress; +import static com.linkedin.kafka.cruisecontrol.servlet.purgatory.ReviewStatus.APPROVED; +import static com.linkedin.kafka.cruisecontrol.servlet.purgatory.ReviewStatus.DISCARDED; import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.writeErrorResponse; import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; @@ -64,6 +67,7 @@ public class ParameterUtils { public static final String MAX_LOAD_PARAM = "max_load"; public static final String GOALS_PARAM = "goals"; public static final String BROKER_ID_PARAM = "brokerid"; + public static final String REVIEW_ID_PARAM = "review_id"; public static final String TOPIC_PARAM = "topic"; public static final String PARTITION_PARAM = "partition"; public static final String DRY_RUN_PARAM = "dryrun"; @@ -89,6 +93,9 @@ public class ParameterUtils { public static final String EXCLUDE_RECENTLY_DEMOTED_BROKERS_PARAM = "exclude_recently_demoted_brokers"; public static final String EXCLUDE_RECENTLY_REMOVED_BROKERS_PARAM = "exclude_recently_removed_brokers"; public static final String REPLICA_MOVEMENT_STRATEGIES_PARAM = "replica_movement_strategies"; + public static final String APPROVE_PARAM = "approve"; + public static final String DISCARD_PARAM = "discard"; + private static final int MAX_REASON_LENGTH = 50; private static final Map> VALID_ENDPOINT_PARAM_NAMES; @@ -159,6 +166,7 @@ public class ParameterUtils { addOrRemoveBroker.add(EXCLUDE_RECENTLY_DEMOTED_BROKERS_PARAM); 
addOrRemoveBroker.add(EXCLUDE_RECENTLY_REMOVED_BROKERS_PARAM); addOrRemoveBroker.add(REPLICA_MOVEMENT_STRATEGIES_PARAM); + addOrRemoveBroker.add(REVIEW_ID_PARAM); Set addBroker = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); addBroker.add(THROTTLE_ADDED_BROKER_PARAM); @@ -179,6 +187,7 @@ public class ParameterUtils { demoteBroker.add(EXCLUDE_FOLLOWER_DEMOTION_PARAM); demoteBroker.add(EXCLUDE_RECENTLY_DEMOTED_BROKERS_PARAM); demoteBroker.add(REPLICA_MOVEMENT_STRATEGIES_PARAM); + demoteBroker.add(REVIEW_ID_PARAM); Set rebalance = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); rebalance.add(DRY_RUN_PARAM); @@ -197,19 +206,25 @@ public class ParameterUtils { rebalance.add(EXCLUDE_RECENTLY_REMOVED_BROKERS_PARAM); rebalance.add(REPLICA_MOVEMENT_STRATEGIES_PARAM); rebalance.add(IGNORE_PROPOSAL_CACHE_PARAM); + rebalance.add(REVIEW_ID_PARAM); Set kafkaClusterState = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); kafkaClusterState.add(VERBOSE_PARAM); kafkaClusterState.add(JSON_PARAM); Set pauseSampling = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + pauseSampling.add(REASON_PARAM); pauseSampling.add(JSON_PARAM); + pauseSampling.add(REVIEW_ID_PARAM); Set resumeSampling = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + resumeSampling.add(REASON_PARAM); resumeSampling.add(JSON_PARAM); + resumeSampling.add(REVIEW_ID_PARAM); Set stopProposalExecution = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); stopProposalExecution.add(JSON_PARAM); + stopProposalExecution.add(REVIEW_ID_PARAM); Set userTasks = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); userTasks.add(JSON_PARAM); @@ -225,6 +240,14 @@ public class ParameterUtils { admin.add(ENABLE_SELF_HEALING_FOR_PARAM); admin.add(CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_PARAM); admin.add(CONCURRENT_LEADER_MOVEMENTS_PARAM); + admin.add(REVIEW_ID_PARAM); + + Set review = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + review.add(APPROVE_PARAM); + review.add(DISCARD_PARAM); + review.add(REASON_PARAM); + review.add(JSON_PARAM); + // TODO: Add 
support to filter reviews by reviewID validParamNames.put(BOOTSTRAP, Collections.unmodifiableSet(bootstrap)); validParamNames.put(TRAIN, Collections.unmodifiableSet(train)); @@ -242,6 +265,7 @@ public class ParameterUtils { validParamNames.put(KAFKA_CLUSTER_STATE, Collections.unmodifiableSet(kafkaClusterState)); validParamNames.put(USER_TASKS, Collections.unmodifiableSet(userTasks)); validParamNames.put(ADMIN, Collections.unmodifiableSet(admin)); + validParamNames.put(REVIEW, Collections.unmodifiableSet(review)); VALID_ENDPOINT_PARAM_NAMES = Collections.unmodifiableMap(validParamNames); } @@ -300,17 +324,17 @@ public static boolean hasValidParameters(HttpServletRequest request, HttpServlet /** * Returns the case sensitive request parameter name, or null if the parameter does not exist. */ - private static String caseSensitiveParameterName(HttpServletRequest request, String parameter) { - return request.getParameterMap().keySet().stream().filter(parameter::equalsIgnoreCase).findFirst().orElse(null); + public static String caseSensitiveParameterName(Map parameterMap, String parameter) { + return parameterMap.keySet().stream().filter(parameter::equalsIgnoreCase).findFirst().orElse(null); } private static boolean getBooleanParam(HttpServletRequest request, String parameter, boolean defaultIfMissing) { - String parameterString = caseSensitiveParameterName(request, parameter); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), parameter); return parameterString == null ? defaultIfMissing : Boolean.parseBoolean(request.getParameter(parameterString)); } private static List getListParam(HttpServletRequest request, String parameter) throws UnsupportedEncodingException { - String parameterString = caseSensitiveParameterName(request, parameter); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), parameter); List retList = parameterString == null ? 
new ArrayList<>() : Arrays.asList(urlDecode(request.getParameter(parameterString)).split(",")); retList.removeIf(String::isEmpty); @@ -337,7 +361,7 @@ private static boolean excludeBrokers(HttpServletRequest request, String paramet private static boolean getBooleanExcludeGiven(HttpServletRequest request, String getParameter, String excludeParameter) { boolean booleanParam = getBooleanParam(request, getParameter, false); - if (booleanParam && caseSensitiveParameterName(request, excludeParameter) != null) { + if (booleanParam && caseSensitiveParameterName(request.getParameterMap(), excludeParameter) != null) { throw new UserRequestException("Cannot set " + getParameter + " parameter to true when explicitly specifying " + excludeParameter + " in the request."); } @@ -396,7 +420,7 @@ static boolean throttleAddedOrRemovedBrokers(HttpServletRequest request, EndPoin } static long time(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, TIME_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), TIME_PARAM); if (parameterString == null) { return System.currentTimeMillis(); } @@ -406,22 +430,22 @@ static long time(HttpServletRequest request) { } static Long startMs(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, START_MS_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), START_MS_PARAM); return parameterString == null ? null : Long.valueOf(request.getParameter(parameterString)); } static Long endMs(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, END_MS_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), END_MS_PARAM); return parameterString == null ? 
null : Long.valueOf(request.getParameter(parameterString)); } static Pattern topic(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, TOPIC_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), TOPIC_PARAM); return parameterString == null ? null : Pattern.compile(request.getParameter(parameterString)); } static Double minValidPartitionRatio(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, MIN_VALID_PARTITION_RATIO_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), MIN_VALID_PARTITION_RATIO_PARAM); if (parameterString == null) { return null; } else { @@ -435,19 +459,23 @@ static Double minValidPartitionRatio(HttpServletRequest request) { } static String resourceString(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, RESOURCE_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), RESOURCE_PARAM); return parameterString == null ? DEFAULT_PARTITION_LOAD_RESOURCE : request.getParameter(parameterString); } - static String reason(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, REASON_PARAM); + public static String reason(HttpServletRequest request) { + String parameterString = caseSensitiveParameterName(request.getParameterMap(), REASON_PARAM); + if (parameterString != null && parameterString.length() > MAX_REASON_LENGTH) { + throw new IllegalArgumentException(String.format("Reason cannot be longer than %d characters (attempted: %d).", + MAX_REASON_LENGTH, parameterString.length())); + } String ip = getClientIpAddress(request); return String.format("%s (Client: %s, Date: %s)", parameterString == null ? 
"No reason provided" : request.getParameter(parameterString), ip, currentUtcDate()); } private static Set parseParamToStringSet(HttpServletRequest request, String param) throws UnsupportedEncodingException { - String parameterString = caseSensitiveParameterName(request, param); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), param); Set paramsString = parameterString == null ? new HashSet<>(0) : new HashSet<>(Arrays.asList(urlDecode(request.getParameter(parameterString)).split(","))); @@ -455,6 +483,14 @@ private static Set parseParamToStringSet(HttpServletRequest request, Str return paramsString; } + private static Set parseParamToIntegerSet(HttpServletRequest request, String param) throws UnsupportedEncodingException { + String parameterString = caseSensitiveParameterName(request.getParameterMap(), param); + + return parameterString == null ? new HashSet<>(0) + : Arrays.stream(urlDecode(request.getParameter(parameterString)).split(",")) + .map(Integer::parseInt).collect(Collectors.toSet()); + } + /** * Empty parameter means all substates are requested. */ @@ -475,7 +511,8 @@ static Set substates(HttpServletRequest request) th } private static Set anomalyTypes(HttpServletRequest request, boolean isEnable) throws UnsupportedEncodingException { - String parameterString = caseSensitiveParameterName(request, isEnable ? ENABLE_SELF_HEALING_FOR_PARAM : DISABLE_SELF_HEALING_FOR_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), isEnable ? ENABLE_SELF_HEALING_FOR_PARAM + : DISABLE_SELF_HEALING_FOR_PARAM); Set selfHealingForString = parameterString == null ? 
new HashSet<>(0) : new HashSet<>(Arrays.asList(urlDecode(request.getParameter(parameterString)).split(","))); @@ -508,7 +545,7 @@ static Map> selfHealingFor(HttpServletRequest request) intersection.retainAll(disableSelfHealingFor); if (!intersection.isEmpty()) { throw new IllegalArgumentException(String.format("The same anomaly cannot be specified in both disable and" - + "enable parameters. Intersection: %s", intersection)); + + "enable parameters. Intersection: %s.", intersection)); } Map> selfHealingFor = new HashMap<>(2); @@ -574,16 +611,40 @@ static List getGoals(HttpServletRequest request) throws UnsupportedEncod } public static int entries(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, ENTRIES_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), ENTRIES_PARAM); return parameterString == null ? Integer.MAX_VALUE : Integer.parseInt(request.getParameter(parameterString)); } + /** + * Mutually exclusive with the other parameters and can only be used if two step verification is enabled. + */ + public static Integer reviewId(HttpServletRequest request, boolean twoStepVerificationEnabled) { + String parameterString = caseSensitiveParameterName(request.getParameterMap(), REVIEW_ID_PARAM); + if (parameterString == null) { + return null; + } else if (!twoStepVerificationEnabled) { + throw new UserRequestException( + String.format("%s parameter is not relevant when two-step verification is disabled.", REVIEW_ID_PARAM)); + } + + Integer reviewId = Integer.parseInt(request.getParameter(parameterString)); + // Sanity check: Ensure that if a review id is provided, no other parameter is in the request. 
+ if (request.getParameterMap().size() != 1) { + throw new UserRequestException( + String.format("%s parameter must be mutually exclusive with other parameters (Request parameters: %s).", + REVIEW_ID_PARAM, request.getParameterMap())); + } + + return reviewId; + } + /** * @param isPartitionMovement True if partition movement per broker, false if the total leader movement. */ static Integer concurrentMovements(HttpServletRequest request, boolean isPartitionMovement) { - String parameterString = caseSensitiveParameterName(request, isPartitionMovement ? CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_PARAM - : CONCURRENT_LEADER_MOVEMENTS_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), isPartitionMovement + ? CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_PARAM + : CONCURRENT_LEADER_MOVEMENTS_PARAM); if (parameterString == null) { return null; } @@ -597,7 +658,7 @@ static Integer concurrentMovements(HttpServletRequest request, boolean isPartiti } static Pattern excludedTopics(HttpServletRequest request) { - String parameterString = caseSensitiveParameterName(request, EXCLUDED_TOPICS_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), EXCLUDED_TOPICS_PARAM); return parameterString == null ? null : Pattern.compile(request.getParameter(parameterString)); } @@ -605,7 +666,7 @@ static Pattern excludedTopics(HttpServletRequest request) { * @param isUpperBound True if upper bound, false if lower bound. */ static int partitionBoundary(HttpServletRequest request, boolean isUpperBound) { - String parameterString = caseSensitiveParameterName(request, PARTITION_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), PARTITION_PARAM); if (parameterString == null) { return isUpperBound ? 
Integer.MAX_VALUE : Integer.MIN_VALUE; } @@ -622,22 +683,51 @@ static int partitionBoundary(HttpServletRequest request, boolean isUpperBound) { } static List brokerIds(HttpServletRequest request) throws UnsupportedEncodingException { - List brokerIds = new ArrayList<>(); - String parameterString = caseSensitiveParameterName(request, BROKER_ID_PARAM); - if (parameterString != null) { - brokerIds = Arrays.stream(urlDecode(request.getParameter(parameterString)).split(",")).map(Integer::parseInt).collect(Collectors.toList()); - } + Set brokerIds = parseParamToIntegerSet(request, BROKER_ID_PARAM); if (brokerIds.isEmpty()) { throw new IllegalArgumentException("Target broker ID is not provided."); } - return Collections.unmodifiableList(brokerIds); + return Collections.unmodifiableList(new ArrayList<>(brokerIds)); + } + + /** + * Default: An empty set. + */ + private static Set review(HttpServletRequest request, boolean isApprove) throws UnsupportedEncodingException { + Set parsedReview = parseParamToIntegerSet(request, isApprove ? APPROVE_PARAM : DISCARD_PARAM); + return Collections.unmodifiableSet(parsedReview); + } + + /** + * Get {@link ReviewStatus#APPROVED} and {@link ReviewStatus#DISCARDED} requests via {@link #APPROVE_PARAM} and + * {@link #DISCARD_PARAM}. + * + * Sanity check ensures that the same request cannot be specified in both configs. + */ + static Map> reviewRequests(HttpServletRequest request) throws UnsupportedEncodingException { + Set approve = review(request, true); + Set discard = review(request, false); + + // Sanity check: Ensure that the same + Set intersection = new HashSet<>(approve); + intersection.retainAll(discard); + if (!intersection.isEmpty()) { + throw new IllegalArgumentException(String.format("The same request cannot be specified in both approve and" + + "discard parameters. 
Intersection: %s.", intersection)); + } + + Map> reviewRequest = new HashMap<>(2); + reviewRequest.put(APPROVED, approve); + reviewRequest.put(DISCARDED, discard); + + return reviewRequest; } /** * Default: An empty set. */ public static Set userTaskIds(HttpServletRequest request) throws UnsupportedEncodingException { - String parameterString = caseSensitiveParameterName(request, USER_TASK_IDS_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), USER_TASK_IDS_PARAM); return parameterString == null ? Collections.emptySet() : Arrays.stream(urlDecode(request.getParameter(parameterString)).split(",")).map(UUID::fromString).collect(Collectors.toSet()); @@ -687,7 +777,7 @@ public static Set types(HttpServletRequest request) t */ static DataFrom getDataFrom(HttpServletRequest request) { DataFrom dataFrom = DataFrom.VALID_WINDOWS; - String parameterString = caseSensitiveParameterName(request, DATA_FROM_PARAM); + String parameterString = caseSensitiveParameterName(request.getParameterMap(), DATA_FROM_PARAM); if (parameterString != null) { dataFrom = DataFrom.valueOf(request.getParameter(parameterString).toUpperCase()); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/PauseResumeParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/PauseResumeParameters.java index 8243b0b8b9..88fd1087aa 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/PauseResumeParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/PauseResumeParameters.java @@ -12,28 +12,43 @@ * Parameters for {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#PAUSE_SAMPLING} and * {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#RESUME_SAMPLING}. * + *
  • Note that "review_id" is mutually exclusive with the other parameters -- i.e. they cannot be used together.
  • + * *
      * 1. Pause metrics sampling. (RUNNING -> PAUSED).
    - *    POST /kafkacruisecontrol/pause_sampling?json=[true/false]&reason=[reason-for-pause]
    + *    POST /kafkacruisecontrol/pause_sampling?json=[true/false]&reason=[reason-for-pause]&review_id=[id]
      *
      * 2. Resume metrics sampling. (PAUSED -> RUNNING).
    - *    POST /kafkacruisecontrol/resume_sampling?json=[true/false]&reason=[reason-for-resume]
    + *    POST /kafkacruisecontrol/resume_sampling?json=[true/false]&reason=[reason-for-resume]&review_id=[id]
      * 
    */ public class PauseResumeParameters extends AbstractParameters { private String _reason; + private Integer _reviewId; + private boolean _twoStepVerificationEnabled; - public PauseResumeParameters(HttpServletRequest request) { + public PauseResumeParameters(HttpServletRequest request, boolean twoStepVerificationEnabled) { super(request); + _twoStepVerificationEnabled = twoStepVerificationEnabled; } @Override protected void initParameters() throws UnsupportedEncodingException { super.initParameters(); _reason = ParameterUtils.reason(_request); + _reviewId = ParameterUtils.reviewId(_request, _twoStepVerificationEnabled); } public String reason() { return _reason; } + + @Override + public void setReviewId(int reviewId) { + _reviewId = reviewId; + } + + public Integer reviewId() { + return _reviewId; + } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java index 7641dbbb6b..d1d9e27813 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java @@ -13,6 +13,8 @@ /** * Parameters for {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#REBALANCE} * + *
  • Note that "review_id" is mutually exclusive with the other parameters -- i.e. they cannot be used together.
  • + * *
      * Trigger a workload balance.
      *    POST /kafkacruisecontrol/rebalance?dryRun=[true/false]&goals=[goal1,goal2...]
    @@ -21,7 +23,7 @@
      *    &excluded_topics=[pattern]&use_ready_default_goals=[true/false]&verbose=[true/false]
      *    &exclude_recently_demoted_brokers=[true/false]&exclude_recently_removed_brokers=[true/false]
      *    &replica_movement_strategies=[strategy1,strategy2...]
    - *    &ignore_proposal_cache=[true/false]
    + *    &ignore_proposal_cache=[true/false]&review_id=[id]
      * 
    */ public class RebalanceParameters extends GoalBasedOptimizationParameters { @@ -32,6 +34,7 @@ public class RebalanceParameters extends GoalBasedOptimizationParameters { private ReplicaMovementStrategy _replicaMovementStrategy; private final KafkaCruiseControlConfig _config; private boolean _ignoreProposalCache; + private Integer _reviewId; public RebalanceParameters(HttpServletRequest request, KafkaCruiseControlConfig config) { super(request); @@ -47,6 +50,17 @@ protected void initParameters() throws UnsupportedEncodingException { _skipHardGoalCheck = ParameterUtils.skipHardGoalCheck(_request); _replicaMovementStrategy = ParameterUtils.getReplicaMovementStrategy(_request, _config); _ignoreProposalCache = ParameterUtils.ignoreProposalCache(_request); + boolean twoStepVerificationEnabled = _config.getBoolean(KafkaCruiseControlConfig.TWO_STEP_VERIFICATION_ENABLED_CONFIG); + _reviewId = ParameterUtils.reviewId(_request, twoStepVerificationEnabled); + } + + @Override + public void setReviewId(int reviewId) { + _reviewId = reviewId; + } + + public Integer reviewId() { + return _reviewId; } public boolean dryRun() { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ReviewParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ReviewParameters.java new file mode 100644 index 0000000000..5d2451dada --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ReviewParameters.java @@ -0,0 +1,44 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. 
+ */ + +package com.linkedin.kafka.cruisecontrol.servlet.parameters; + +import com.linkedin.kafka.cruisecontrol.servlet.purgatory.ReviewStatus; +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.Set; +import javax.servlet.http.HttpServletRequest; + + +/** + * Parameters for {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#REVIEW}. + * + *
    + *    POST /kafkacruisecontrol/review?json=[true/false]&approve=[id1,id2,...]&discard=[id1,id2,...]
    + *    &reason=[reason-for-review]
    + * 
    + */ +public class ReviewParameters extends AbstractParameters { + private String _reason; + private Map> _reviewRequests; + + public ReviewParameters(HttpServletRequest request) { + super(request); + } + + @Override + protected void initParameters() throws UnsupportedEncodingException { + super.initParameters(); + _reason = ParameterUtils.reason(_request); + _reviewRequests = ParameterUtils.reviewRequests(_request); + } + + public String reason() { + return _reason; + } + + public Map> reviewRequests() { + return _reviewRequests; + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/StopProposalParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/StopProposalParameters.java new file mode 100644 index 0000000000..661656d85f --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/StopProposalParameters.java @@ -0,0 +1,44 @@ +/* + * Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.servlet.parameters; + +import java.io.UnsupportedEncodingException; +import javax.servlet.http.HttpServletRequest; + + +/** + * Parameters for {@link com.linkedin.kafka.cruisecontrol.servlet.EndPoint#STOP_PROPOSAL_EXECUTION}. + * + *
  • Note that "review_id" is mutually exclusive with the other parameters -- i.e. they cannot be used together.
  • + * + *
    + * Stop the proposal execution.
    + *    POST /kafkacruisecontrol/stop_proposal_execution?json=[true/false]&review_id=[id]
    + * 
    + */ +public class StopProposalParameters extends AbstractParameters { + private Integer _reviewId; + private boolean _twoStepVerificationEnabled; + + public StopProposalParameters(HttpServletRequest request, boolean twoStepVerificationEnabled) { + super(request); + _twoStepVerificationEnabled = twoStepVerificationEnabled; + } + + @Override + protected void initParameters() throws UnsupportedEncodingException { + super.initParameters(); + _reviewId = ParameterUtils.reviewId(_request, _twoStepVerificationEnabled); + } + + @Override + public void setReviewId(int reviewId) { + _reviewId = reviewId; + } + + public Integer reviewId() { + return _reviewId; + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/Purgatory.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/Purgatory.java new file mode 100644 index 0000000000..adab86cb9d --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/Purgatory.java @@ -0,0 +1,202 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. 
+ */ + +package com.linkedin.kafka.cruisecontrol.servlet.purgatory; + +import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory; +import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; +import com.linkedin.kafka.cruisecontrol.servlet.EndPoint; +import com.linkedin.kafka.cruisecontrol.servlet.UserRequestException; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils; +import com.linkedin.kafka.cruisecontrol.servlet.response.PurgatoryOrReviewResult; +import java.io.Closeable; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServletRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.linkedin.kafka.cruisecontrol.servlet.EndPoint.REVIEW; +import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.httpServletRequestToString; + + +/** + * A Class to keep POST requests that are awaiting review if two-step verification is enabled. + * + * The Purgatory is thread-safe. 
+ */ +public class Purgatory implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(Purgatory.class); + private static final long PURGATORY_CLEANER_PERIOD_SECONDS = 10; + private static final long PURGATORY_CLEANER_INITIAL_DELAY_SECONDS = 0; + private int _requestId; + private final long _purgatoryRetentionTimeMs; + private final Map _requestInfoById; + private final ScheduledExecutorService _purgatoryCleaner = + Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("PurgatoryCleaner", true, null)); + + public Purgatory(KafkaCruiseControlConfig config) { + _requestId = 0; + _purgatoryRetentionTimeMs = config.getLong(KafkaCruiseControlConfig.TWO_STEP_PURGATORY_RETENTION_TIME_MS_CONFIG); + int purgatoryMaxCachedRequests = config.getInt(KafkaCruiseControlConfig.TWO_STEP_PURGATORY_MAX_REQUESTS_CONFIG); + + _requestInfoById = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return this.size() > purgatoryMaxCachedRequests; + } + }; + + _purgatoryCleaner.scheduleAtFixedRate(new PurgatoryCleaner(), + PURGATORY_CLEANER_INITIAL_DELAY_SECONDS, + PURGATORY_CLEANER_PERIOD_SECONDS, + TimeUnit.SECONDS); + } + + /** + * Add request to the purgatory and return the {@link PurgatoryOrReviewResult} for the request that has been added to + * the purgatory. + * + * @param request Http Servlet Request to add to the purgatory. + * @param parameters Request parameters. + * @param
<P>
    Type corresponding to the request parameters. + * @return The result showing the {@link PurgatoryOrReviewResult} for the request that has been added to the purgatory. + */ + public synchronized
<P extends CruiseControlParameters>
    PurgatoryOrReviewResult addRequest(HttpServletRequest request, + P parameters) { + if (!request.getMethod().equals("POST")) { + throw new IllegalArgumentException(String.format("Purgatory can only contain POST request (Attempted to add: %s).", + httpServletRequestToString(request))); + } + RequestInfo requestInfo = new RequestInfo(request, parameters); + _requestInfoById.put(_requestId, requestInfo); + + PurgatoryOrReviewResult result = new PurgatoryOrReviewResult(Collections.singletonMap(_requestId, requestInfo), + Collections.singleton(_requestId)); + _requestId++; + return result; + } + + /** + * Ensure that: + *

      + *
    • A request with the given review id exists in the purgatory.
    • + *
    • The request with the given review id matches the given request.
    • + *
    • The request with the given review id is approved in the purgatory.
    • + *
    + * + * Then mark the review status as submitted. + * + * @param reviewId The review id for which the corresponding request is requested to be submitted. + * @param request The request to submit. + * @return Submitted request info. + */ + public synchronized RequestInfo submit(int reviewId, HttpServletRequest request) { + RequestInfo requestInfo = _requestInfoById.get(reviewId); + // 1. Ensure that a request with the given review id exists in the purgatory. + if (requestInfo == null) { + throw new UserRequestException( + String.format("No request with review id %d exists in purgatory. Please use %s endpoint to check for the " + + "current requests awaiting review in purgatory.", reviewId, REVIEW)); + } + + // 2. Ensure that the request with the given review id matches the given request. + EndPoint endpoint = ParameterUtils.endPoint(request); + if (requestInfo.endPoint() != endpoint) { + throw new UserRequestException( + String.format("Request with review id %d is associated with %s endpoint, but the given request has %s endpoint." + + "Please use %s endpoint to check for the current requests awaiting review in purgatory.", + reviewId, requestInfo.endPoint(), endpoint, REVIEW)); + } + + if (requestInfo.status() == ReviewStatus.SUBMITTED) { + LOG.info("Request {} has already been submitted (review: {}).", requestInfo.endpointWithParams(), reviewId); + requestInfo.setAccessToAlreadySubmittedRequest(); + } else { + // 3. Ensure that the request with the given review id is approved in the purgatory, and mark the status as submitted. + requestInfo.submitReview(reviewId); + LOG.info("Submitted request {} for execution (review: {}).", requestInfo.endpointWithParams(), reviewId); + } + return requestInfo; + } + + /** + * Remove the {@link ReviewStatus#SUBMITTED} request associated with the given review id from purgatory. + * + * @param reviewId Review id of the request to be removed from the purgatory. 
+ * @return Removed submitted request if exists in purgatory, null otherwise. + */ + public synchronized RequestInfo removeSubmitted(int reviewId) { + RequestInfo requestInfo = _requestInfoById.get(reviewId); + if (requestInfo == null) { + return null; + } else if (requestInfo.status() != ReviewStatus.SUBMITTED) { + throw new IllegalStateException( + String.format("Attempt to remove request associated with review id %d from purgatory. Status (current %s, " + + "expected: %s).", reviewId, requestInfo.status(), ReviewStatus.SUBMITTED)); + } + + return _requestInfoById.remove(reviewId); + } + + /** + * Apply the given target states to review the corresponding requests. Get the post-review result of the purgatory. + * + * @param requestIdsByTargetState Request Ids by target review state for requests in purgatory. + * @param reason Common reason for applying the review to the requests. + * @return The result showing the current purgatory state after the review. + */ + public synchronized PurgatoryOrReviewResult applyReview(Map> requestIdsByTargetState, String reason) { + // Sanity check if all request ids in the review exists in the purgatory. + Set reviewedRequestIds = new HashSet<>(); + for (Map.Entry> entry : requestIdsByTargetState.entrySet()) { + Set requestIds = entry.getValue(); + if (!_requestInfoById.keySet().containsAll(requestIds)) { + throw new IllegalStateException(String.format("Review contains request ids (%s) that do not exist in purgatory.", + requestIds.removeAll(_requestInfoById.keySet()))); + } + // Apply review to each Request Info + ReviewStatus targetReviewStatus = entry.getKey(); + requestIds.forEach(requestId -> _requestInfoById.get(requestId).applyReview(targetReviewStatus, reason)); + reviewedRequestIds.addAll(requestIds); + } + + // Return the post-review result of the purgatory. 
+ return new PurgatoryOrReviewResult(new HashMap<>(_requestInfoById), reviewedRequestIds); + } + + private synchronized void removeOldRequests() { + LOG.debug("Remove old requests from purgatory."); + _requestInfoById.entrySet().removeIf(entry -> (entry.getValue().submissionTimeMs() + + _purgatoryRetentionTimeMs < System.currentTimeMillis())); + } + + @Override + public void close() { + _purgatoryCleaner.shutdownNow(); + _requestInfoById.clear(); + } + + /** + * A runnable class to remove expired requests. + */ + private class PurgatoryCleaner implements Runnable { + @Override + public void run() { + try { + removeOldRequests(); + } catch (Throwable t) { + LOG.warn("Received exception when trying to remove old requests from purgatory.", t); + } + } + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/RequestInfo.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/RequestInfo.java new file mode 100644 index 0000000000..111936ca9c --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/RequestInfo.java @@ -0,0 +1,148 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. 
+ */ + +package com.linkedin.kafka.cruisecontrol.servlet.purgatory; + +import com.linkedin.kafka.cruisecontrol.servlet.EndPoint; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.servlet.http.HttpServletRequest; + +import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.getClientIpAddress; +import static com.linkedin.kafka.cruisecontrol.servlet.purgatory.ReviewStatus.*; + + +/** + * A class to represent request information in purgatory. Possible status of requests, with supported transitions: + *
      + *
    • {@link ReviewStatus#PENDING_REVIEW} -> {@link ReviewStatus#APPROVED}, {@link ReviewStatus#DISCARDED}
    • + *
    • {@link ReviewStatus#APPROVED} -> {@link ReviewStatus#DISCARDED}, {@link ReviewStatus#SUBMITTED}
    • + *
    + */ +public class RequestInfo { + private static final String INIT_REASON = "Awaiting review."; + private static final String FINAL_REASON = "Submitted approved request."; + private static final Map> VALID_TRANSFER = new HashMap<>(); + static { + VALID_TRANSFER.put(PENDING_REVIEW, new HashSet<>(Arrays.asList(APPROVED, DISCARDED))); + VALID_TRANSFER.put(APPROVED, new HashSet<>(Arrays.asList(DISCARDED, SUBMITTED))); + } + private final String _submitterAddress; + private final long _submissionTimeMs; + private final Map _parameterMap; + private final EndPoint _endPoint; + private final CruiseControlParameters _parameters; + private volatile ReviewStatus _status; + private volatile String _reason; + private volatile boolean _accessToAlreadySubmittedRequest; + + public
<P extends CruiseControlParameters>
    RequestInfo(HttpServletRequest request, P parameters) { + _submitterAddress = getClientIpAddress(request); + _submissionTimeMs = System.currentTimeMillis(); + _parameterMap = request.getParameterMap(); + _endPoint = ParameterUtils.endPoint(request); + _parameters = parameters; + _status = PENDING_REVIEW; + _reason = INIT_REASON; + _accessToAlreadySubmittedRequest = false; + } + + public CruiseControlParameters parameters() { + return _parameters; + } + + public String submitterAddress() { + return _submitterAddress; + } + + public long submissionTimeMs() { + return _submissionTimeMs; + } + + public String reason() { + return _reason; + } + + public Map parameterMap() { + return _parameterMap; + } + + public String endpointWithParams() { + StringBuilder sb = new StringBuilder(_endPoint.toString()); + String queryParamDelimiter = "?"; + for (Map.Entry paramSet : _parameterMap.entrySet()) { + for (String paramValue : paramSet.getValue()) { + sb.append(queryParamDelimiter).append(paramSet.getKey()).append("=").append(paramValue); + if (queryParamDelimiter.equals("?")) { + queryParamDelimiter = "&"; + } + } + } + return sb.toString(); + } + + public EndPoint endPoint() { + return _endPoint; + } + + public ReviewStatus status() { + return _status; + } + + /** + * Update review status and the corresponding reason for change to apply the review. + * + * @param targetStatus The status after applying the review. + * @param reason The reason for the status change upon review. + */ + void applyReview(ReviewStatus targetStatus, String reason) { + if (!canTransferToStatus(targetStatus)) { + throw new IllegalStateException("Cannot mark a task in " + _status + " to " + targetStatus + " status. The " + + "valid target statuses are " + validTargetStatus()); + } + _status = targetStatus; + _reason = reason; + } + + /** + * Submit the review to indicate that it is . + * + * @param reviewId The review id for which the corresponding request is requested to be submitted. 
+ */ + void submitReview(int reviewId) { + applyReview(SUBMITTED, FINAL_REASON); + _parameters.setReviewId(reviewId); + } + + public void setAccessToAlreadySubmittedRequest() { + _accessToAlreadySubmittedRequest = true; + } + + public boolean accessToAlreadySubmittedRequest() { + return _accessToAlreadySubmittedRequest; + } + + /** + * Check if the status transfer is possible. + * + * @param targetStatus The status to transfer to. + * @return True if the transfer is valid, false otherwise. + */ + private boolean canTransferToStatus(ReviewStatus targetStatus) { + return VALID_TRANSFER.get(_status).contains(targetStatus); + } + + /** + * @return The valid target status to transfer to. + */ + private Set validTargetStatus() { + return Collections.unmodifiableSet(VALID_TRANSFER.get(_status)); + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/ReviewStatus.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/ReviewStatus.java new file mode 100644 index 0000000000..69354d678e --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/purgatory/ReviewStatus.java @@ -0,0 +1,27 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.servlet.purgatory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + + +/** + * Possible status of requests in {@link Purgatory}. + */ +public enum ReviewStatus { + PENDING_REVIEW, APPROVED, SUBMITTED, DISCARDED; + + private static final List CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values())); + + /** + * Use this instead of values() because values() creates a new array each time. 
+ * @return enumerated values in the same order as values() + */ + public static List cachedValues() { + return CACHED_VALUES; + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/PurgatoryOrReviewResult.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/PurgatoryOrReviewResult.java new file mode 100644 index 0000000000..c15eae0504 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/PurgatoryOrReviewResult.java @@ -0,0 +1,135 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.servlet.response; + +import com.google.gson.Gson; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlParameters; +import com.linkedin.kafka.cruisecontrol.servlet.purgatory.RequestInfo; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.DATE_FORMAT; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.TIME_ZONE; +import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.VERSION; +import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.JSON_VERSION; + + +public class PurgatoryOrReviewResult extends AbstractCruiseControlResponse { + private static final String ID = "Id"; + private static final String SUBMITTER_ADDRESS = "SubmitterAddress"; + private static final String SUBMISSION_TIME_MS = "SubmissionTimeMs"; + private static final String STATUS = "Status"; + private static final String ENDPOINT_WITH_PARAMS = "EndpointWithParams"; + private static final String REASON = "Reason"; + private static final String REQUEST_INFO = 
"RequestInfo"; + private final Map _requestInfoById; + private final Set _filteredRequestIds; + + /** + * @param requestInfoById Request info by Id. + * @param filteredRequestIds Requests for which the result is requested, empty set implies all requests in purgatory + */ + public PurgatoryOrReviewResult(Map requestInfoById, Set filteredRequestIds) { + _requestInfoById = requestInfoById; + _filteredRequestIds = filteredRequestIds.isEmpty() ? _requestInfoById.keySet() : filteredRequestIds; + } + + private String getPlaintext() { + StringBuilder sb = new StringBuilder(); + int padding = 2; + int idLabelSize = ID.length(); + int submitterAddressLabelSize = SUBMITTER_ADDRESS.length(); + int submissionTimeLabelSize = 15; // Plaintext response, returns submission time -- i.e. not submission time ms. + int statusLabelSize = STATUS.length(); + int endpointWithParamsLabelSize = ENDPOINT_WITH_PARAMS.length(); + int reasonLabelSize = REASON.length(); + + for (Map.Entry entry : _requestInfoById.entrySet()) { + if (_filteredRequestIds.contains(entry.getKey())) { + // ID + idLabelSize = Math.max(idLabelSize, (int) (Math.log10(entry.getKey()) + 1)); + RequestInfo requestInfo = entry.getValue(); + // SUBMITTER_ADDRESS + submitterAddressLabelSize = Math.max(submitterAddressLabelSize, requestInfo.submitterAddress().length()); + // SUBMISSION_TIME_MS + String dateFormatted = KafkaCruiseControlUtils.toDateString(requestInfo.submissionTimeMs(), DATE_FORMAT, TIME_ZONE); + submissionTimeLabelSize = Math.max(submissionTimeLabelSize, dateFormatted.length()); + // STATUS + statusLabelSize = Math.max(statusLabelSize, requestInfo.status().toString().length()); + // ENDPOINT_WITH_PARAMS + endpointWithParamsLabelSize = Math.max(endpointWithParamsLabelSize, requestInfo.endpointWithParams().length()); + // REASON + reasonLabelSize = Math.max(reasonLabelSize, requestInfo.reason().length()); + } + } + + // Populate header. 
+ StringBuilder formattingStringBuilder = new StringBuilder("%n%-"); + formattingStringBuilder.append(idLabelSize + padding) + .append("d%-") + .append(submitterAddressLabelSize + padding) + .append("s%-") + .append(submissionTimeLabelSize + padding) + .append("s%-") + .append(statusLabelSize + padding) + .append("s%-") + .append(endpointWithParamsLabelSize + padding) + .append("s%-") + .append(reasonLabelSize + padding) + .append("s"); + sb.append(String.format(formattingStringBuilder.toString(), "ID", "SUBMITTER ADDRESS", "SUBMISSION TIME", "STATUS", + "ENDPOINT WITH PARAMS", "REASON")); + + // Populate values. + for (Map.Entry entry : _requestInfoById.entrySet()) { + if (_filteredRequestIds.contains(entry.getKey())) { + RequestInfo requestInfo = entry.getValue(); + String dateFormatted = KafkaCruiseControlUtils.toDateString(requestInfo.submissionTimeMs(), DATE_FORMAT, TIME_ZONE); + sb.append(String.format(formattingStringBuilder.toString(), entry.getKey(), requestInfo.submitterAddress(), + dateFormatted, requestInfo.status(), requestInfo.endpointWithParams(), requestInfo.reason())); + } + } + + return sb.toString(); + } + + private String getJSONString() { + List> jsonRequestInfoList = new ArrayList<>(_filteredRequestIds.size()); + for (Map.Entry entry : _requestInfoById.entrySet()) { + if (_filteredRequestIds.contains(entry.getKey())) { + addJSONRequestInfo(jsonRequestInfoList, entry); + } + } + Map jsonResponse = new HashMap<>(2); + jsonResponse.put(REQUEST_INFO, jsonRequestInfoList); + jsonResponse.put(VERSION, JSON_VERSION); + return new Gson().toJson(jsonResponse); + } + + private void addJSONRequestInfo(List> jsonRequestInfoList, Map.Entry entry) { + Map jsonObjectMap = new HashMap<>(); + RequestInfo requestInfo = entry.getValue(); + jsonObjectMap.put(ID, entry.getKey()); + jsonObjectMap.put(SUBMITTER_ADDRESS, requestInfo.submitterAddress()); + jsonObjectMap.put(SUBMISSION_TIME_MS, requestInfo.submissionTimeMs()); + jsonObjectMap.put(STATUS, 
requestInfo.status().toString()); + jsonObjectMap.put(ENDPOINT_WITH_PARAMS, requestInfo.endpointWithParams()); + jsonObjectMap.put(REASON, requestInfo.reason()); + jsonRequestInfoList.add(jsonObjectMap); + } + + @Override + protected void discardIrrelevantAndCacheRelevant(CruiseControlParameters parameters) { + // Cache relevant response. + _cachedResponse = parameters.json() ? getJSONString() : getPlaintext(); + // Discard irrelevant response. + _requestInfoById.clear(); + _filteredRequestIds.clear(); + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/StopProposalExecutionResult.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/StopProposalResult.java similarity index 89% rename from cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/StopProposalExecutionResult.java rename to cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/StopProposalResult.java index ce05a735c0..3b22b6ee14 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/StopProposalExecutionResult.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/StopProposalResult.java @@ -9,7 +9,7 @@ import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.getBaseJSONString; -public class StopProposalExecutionResult extends AbstractCruiseControlResponse { +public class StopProposalResult extends AbstractCruiseControlResponse { @Override protected void discardIrrelevantAndCacheRelevant(CruiseControlParameters parameters) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/UserTaskState.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/UserTaskState.java index df4aa136f0..e9fdebcf81 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/UserTaskState.java +++ 
b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/UserTaskState.java @@ -20,16 +20,13 @@ import java.util.function.Consumer; import java.util.function.Predicate; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.DATE_FORMAT; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.TIME_ZONE; import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.JSON_VERSION; import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.VERSION; public class UserTaskState extends AbstractCruiseControlResponse { - private static final String DATE_FORMAT = "YYYY-MM-dd_HH:mm:ss z"; - private static final String TIME_ZONE = "UTC"; - private static final String ACTIVE_TASK_LABEL_VALUE = UserTaskManager.TaskState.ACTIVE.type(); - private static final String COMPLETED_TASK_LABEL_VALUE = UserTaskManager.TaskState.COMPLETED.type(); - private static final String COMPLETED_WITH_ERROR_TASK_LABEL_VALUE = UserTaskManager.TaskState.COMPLETED_WITH_ERROR.type(); private static final String USER_TASK_ID = "UserTaskId"; private static final String REQUEST_URL = "RequestURL"; private static final String CLIENT_ID = "ClientIdentity"; @@ -38,11 +35,10 @@ public class UserTaskState extends AbstractCruiseControlResponse { private static final String USER_TASKS = "userTasks"; private final Map> _userTasksByTaskState; - public UserTaskState(List activeUserTasks, - List completedUserTasks) { + public UserTaskState(UserTaskManager userTaskManager) { _userTasksByTaskState = new HashMap<>(2); - _userTasksByTaskState.put(UserTaskManager.TaskState.ACTIVE, activeUserTasks); - _userTasksByTaskState.put(UserTaskManager.TaskState.COMPLETED, completedUserTasks); + _userTasksByTaskState.put(UserTaskManager.TaskState.ACTIVE, userTaskManager.getActiveUserTasks()); + _userTasksByTaskState.put(UserTaskManager.TaskState.COMPLETED, userTaskManager.getCompletedUserTasks()); } private String 
getJSONString(CruiseControlParameters parameters) { @@ -69,23 +65,10 @@ public List prepareResultList(CruiseControlParamet return resultList.subList(0, Math.min(entries, resultList.size())); } - private static String getStatus(UserTaskManager.UserTaskInfo userTaskInfo) { - switch (userTaskInfo.state()) { - case ACTIVE: - return ACTIVE_TASK_LABEL_VALUE; - case COMPLETED: - return COMPLETED_TASK_LABEL_VALUE; - case COMPLETED_WITH_ERROR: - return COMPLETED_WITH_ERROR_TASK_LABEL_VALUE; - default: - throw new IllegalStateException("Unrecognized state " + userTaskInfo.state()); - } - } - private void addJSONTask(List> jsonUserTaskList, UserTaskManager.UserTaskInfo userTaskInfo) { Map jsonObjectMap = new HashMap<>(); - String status = getStatus(userTaskInfo); + String status = userTaskInfo.state().toString(); jsonObjectMap.put(USER_TASK_ID, userTaskInfo.userTaskId().toString()); jsonObjectMap.put(REQUEST_URL, userTaskInfo.requestWithParams()); jsonObjectMap.put(CLIENT_ID, userTaskInfo.clientIdentity()); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletEndpointTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletEndpointTest.java index 0e3946fa10..7169c7fd0f 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletEndpointTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletEndpointTest.java @@ -144,7 +144,7 @@ public void testUserTaskParameters() throws UnsupportedEncodingException { EasyMock.replay(_mockUUIDGenerator, _mockHttpSession, _mockHttpServletResponse); populateUserTaskManager(_mockHttpServletResponse, _userTaskManager); - UserTaskState userTaskState = new UserTaskState(_userTaskManager.getActiveUserTasks(), _userTaskManager.getCompletedUserTasks()); + UserTaskState userTaskState = new UserTaskState(_userTaskManager); // Test Case 1: Get all 
PROPOSAL or REBALANCE tasks Map answerQueryParam1 = new HashMap<>(); @@ -202,7 +202,7 @@ public void testUserTaskParameters() throws UnsupportedEncodingException { // Update task manager active vs completed state _userTaskManager.checkActiveUserTasks(); // Now the UserTaskManager state has changed, so we reload the states - UserTaskState userTaskState2 = new UserTaskState(_userTaskManager.getActiveUserTasks(), _userTaskManager.getCompletedUserTasks()); + UserTaskState userTaskState2 = new UserTaskState(_userTaskManager); // Test Case 5: Get all LOAD or REMOVE_BROKER tasks that's completed and with user task id repeatUUID Map answerQueryParam5 = new HashMap<>();