Skip to content

Commit

Permalink
Add 2-step verification for POST requests. (#582)
Browse files Browse the repository at this point in the history
  • Loading branch information
efeg authored Mar 20, 2019
1 parent 2b98cdb commit d35af1b
Show file tree
Hide file tree
Showing 26 changed files with 1,055 additions and 186 deletions.
12 changes: 12 additions & 0 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,15 @@ webserver.accesslog.path=access.log

# HTTP Request Log retention days
webserver.accesslog.retention.days=14

# Configurations for servlet
# ==========================

# Enable two-step verification for processing POST requests.
two.step.verification.enabled=false

# The maximum time in milliseconds to retain the requests in two-step (verification) purgatory.
two.step.purgatory.retention.time.ms=1209600000

# The maximum number of requests in two-step (verification) purgatory.
two.step.purgatory.max.requests=25
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.response.AdminResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.BootstrapResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.KafkaClusterState;
import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState;
import com.linkedin.kafka.cruisecontrol.servlet.response.PauseSamplingResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.ResumeSamplingResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.StopProposalExecutionResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.StopProposalResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.TrainResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.InputStream;
Expand Down Expand Up @@ -843,11 +845,12 @@ private void executeDemotion(Collection<ExecutionProposal> proposals,
/**
* Stop the executor if it is executing the proposals.
*
* @param parameters Stop proposal parameters (not used -- added for standardization and for extensibility).
* @return Stop proposal execution result.
*/
public StopProposalExecutionResult stopProposalExecution() {
public StopProposalResult stopProposalExecution(StopProposalParameters parameters) {
_executor.userTriggeredStopExecution();
return new StopProposalExecutionResult();
return new StopProposalResult();
}

/**
Expand All @@ -872,8 +875,11 @@ public CruiseControlState state(OperationProgress operationProgress,

/**
* Get the cluster state for Kafka.
*
* @param parameters Kafka cluster state parameters (not used -- added for standardization and for extensibility).
* @return Kafka cluster state.
*/
public KafkaClusterState kafkaClusterState() {
public KafkaClusterState kafkaClusterState(KafkaClusterStateParameters parameters) {
return new KafkaClusterState(_loadMonitor.kafkaCluster());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,9 @@ public static void main(String[] args) throws Exception {
context.addServlet(holderWebapp, webuiPathPrefix);

// Kafka Cruise Control servlet data
long maxBlockMs = config.getLong(KafkaCruiseControlConfig.WEBSERVER_REQUEST_MAX_BLOCK_TIME_MS);
long sessionExpiryMs = config.getLong(KafkaCruiseControlConfig.WEBSERVER_SESSION_EXPIRY_MS);
String apiUrlPrefix = config.getString(KafkaCruiseControlConfig.WEBSERVER_API_URLPREFIX);
KafkaCruiseControlServlet kafkaCruiseControlServlet =
new KafkaCruiseControlServlet(kafkaCruiseControl, maxBlockMs, sessionExpiryMs, dropwizardMetricsRegistry, config);
KafkaCruiseControlServlet kafkaCruiseControlServlet = new KafkaCruiseControlServlet(kafkaCruiseControl,
dropwizardMetricsRegistry);
ServletHolder servletHolder = new ServletHolder(kafkaCruiseControlServlet);
context.addServlet(servletHolder, apiUrlPrefix);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,25 @@ public class KafkaCruiseControlConfig extends AbstractConfig {
public static final String WEBSERVER_ACCESSLOG_RETENTION_DAYS = "webserver.accesslog.retention.days";
private static final String WEBSERVER_ACCESSLOG_RETENTION_DAYS_DOC = "HTTP Request log retention days";

/**
* <code>two.step.verification.enabled</code>
*/
public static final String TWO_STEP_VERIFICATION_ENABLED_CONFIG = "two.step.verification.enabled";
private static final String TWO_STEP_VERIFICATION_ENABLED_DOC = "Enable two-step verification for processing POST requests.";

/**
* <code>two.step.purgatory.retention.time.ms</code>
*/
public static final String TWO_STEP_PURGATORY_RETENTION_TIME_MS_CONFIG = "two.step.purgatory.retention.time.ms";
private static final String TWO_STEP_PURGATORY_RETENTION_TIME_MS_DOC = "The maximum time in milliseconds to "
+ "retain the requests in two-step (verification) purgatory.";

/**
* <code>two.step.purgatory.max.requests</code>
*/
public static final String TWO_STEP_PURGATORY_MAX_REQUESTS_CONFIG = "two.step.purgatory.max.requests";
private static final String TWO_STEP_PURGATORY_MAX_REQUESTS_DOC = "The maximum number of requests in two-step "
+ "(verification) purgatory.";

static {
CONFIG = new ConfigDef()
Expand All @@ -707,6 +726,8 @@ public class KafkaCruiseControlConfig extends AbstractConfig {
WEBSERVER_HTTP_ADDRESS_DOC)
.define(WEBSERVER_HTTP_CORS_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW,
WEBSERVER_HTTP_CORS_ENABLED_DOC)
.define(TWO_STEP_VERIFICATION_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM,
TWO_STEP_VERIFICATION_ENABLED_DOC)
.define(WEBSERVER_HTTP_CORS_ORIGIN_CONFIG, ConfigDef.Type.STRING, "*", ConfigDef.Importance.LOW,
WEBSERVER_HTTP_CORS_ORIGIN_DOC)
.define(WEBSERVER_HTTP_CORS_ALLOWMETHODS_CONFIG, ConfigDef.Type.STRING, "OPTIONS, GET, POST", ConfigDef.Importance.HIGH,
Expand Down Expand Up @@ -808,6 +829,16 @@ public class KafkaCruiseControlConfig extends AbstractConfig {
atLeast(0),
ConfigDef.Importance.MEDIUM,
REMOVAL_HISTORY_RETENTION_TIME_MS_DOC)
.define(TWO_STEP_PURGATORY_RETENTION_TIME_MS_CONFIG,
ConfigDef.Type.LONG,
TimeUnit.HOURS.toMillis(336),
atLeast(TimeUnit.HOURS.toMillis(1)),
ConfigDef.Importance.MEDIUM, TWO_STEP_PURGATORY_RETENTION_TIME_MS_DOC)
.define(TWO_STEP_PURGATORY_MAX_REQUESTS_CONFIG,
ConfigDef.Type.INT,
25,
atLeast(1),
ConfigDef.Importance.MEDIUM, TWO_STEP_PURGATORY_MAX_REQUESTS_DOC)
.define(MAX_CACHED_COMPLETED_USER_TASKS_CONFIG,
ConfigDef.Type.INT,
100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public enum EndPoint {
KAFKA_CLUSTER_STATE,
DEMOTE_BROKER,
USER_TASKS,
ADMIN;
ADMIN,
REVIEW;

private static final List<EndPoint> GET_ENDPOINT = Arrays.asList(BOOTSTRAP,
TRAIN,
Expand All @@ -45,7 +46,8 @@ public enum EndPoint {
PAUSE_SAMPLING,
RESUME_SAMPLING,
DEMOTE_BROKER,
ADMIN);
ADMIN,
REVIEW);
private static final List<EndPoint> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));

public static List<EndPoint> getEndpoint() {
Expand Down
Loading

0 comments on commit d35af1b

Please sign in to comment.