diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/KafkaConnectService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/KafkaConnectService.java index bbee44c7a3..36e364fdcf 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/KafkaConnectService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/KafkaConnectService.java @@ -14,13 +14,9 @@ import io.aiven.klaw.clusterapi.models.enums.KafkaSupportedProtocol; import io.aiven.klaw.clusterapi.models.error.RestErrorResponse; import io.aiven.klaw.clusterapi.utils.ClusterApiUtils; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; @@ -38,9 +34,6 @@ @Slf4j public class KafkaConnectService { - private static final ParameterizedTypeReference>> - GET_CONNECTORS_TYPEREF = new ParameterizedTypeReference<>() {}; - private static final ParameterizedTypeReference> GET_CONNECTORS_STR_TYPEREF = new ParameterizedTypeReference<>() {}; private static final ParameterizedTypeReference> @@ -56,65 +49,74 @@ public KafkaConnectService(ClusterApiUtils clusterApiUtils) { } public ApiResponse deleteConnector(ClusterConnectorRequest clusterConnectorRequest) { - log.info("Into deleteConnector {}", clusterConnectorRequest); - String suffixUrl = - clusterConnectorRequest.getEnv() - + "/connectors/" - + clusterConnectorRequest.getConnectorName(); - Pair reqDetails = - clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); - HttpHeaders headers = - clusterApiUtils.createHeaders( - clusterConnectorRequest.getClusterIdentification(), KafkaClustersType.KAFKA_CONNECT); - HttpEntity request = new HttpEntity<>(headers); - - try { - reqDetails - .getRight() - .exchange( - reqDetails.getLeft(), - HttpMethod.DELETE, - request, - new ParameterizedTypeReference<>() {}); - } catch (HttpServerErrorException | HttpClientErrorException e) { - log.error("Error in deleting connector ", e); - return buildErrorResponseFromRestException(e, CLUSTER_API_ERR_3); - } catch (RestClientException ex) { - log.error("Error in deleting connector ", ex); - return ApiResponse.notOk(CLUSTER_API_ERR_3); + Set errMsgResponse = new HashSet<>(); + for (String envUrl : getEnvironment(clusterConnectorRequest.getEnv())) { + try { + log.info("Into deleteConnector {}", clusterConnectorRequest); + String suffixUrl = envUrl + "/connectors/" + clusterConnectorRequest.getConnectorName(); + Pair reqDetails = + clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); + HttpHeaders headers = + clusterApiUtils.createHeaders( + clusterConnectorRequest.getClusterIdentification(), + KafkaClustersType.KAFKA_CONNECT); + HttpEntity request = new HttpEntity<>(headers); + + reqDetails + .getRight() + .exchange( + reqDetails.getLeft(), + HttpMethod.DELETE, + request, + new ParameterizedTypeReference<>() {}); + return ApiResponse.SUCCESS; + } catch (HttpServerErrorException | HttpClientErrorException e) { + log.error("Rest Exception in deleting connector ", e); + errMsgResponse.add(getErrorMsgeFromRestException(e, CLUSTER_API_ERR_3)); + } catch (RestClientException ex) { + log.error("Error in deleting connector ", ex); + errMsgResponse.add(CLUSTER_API_ERR_3); + } } - return ApiResponse.SUCCESS; + return ApiResponse.notOk(StringUtils.join(errMsgResponse, ", ")); } public ApiResponse updateConnector(ClusterConnectorRequest clusterConnectorRequest) { - log.info("Into updateConnector {}", clusterConnectorRequest); - String suffixUrl = - clusterConnectorRequest.getEnv() - + "/connectors/" - + clusterConnectorRequest.getConnectorName() - + "/config"; - Pair reqDetails = - clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); + Set errMsgResponse = new HashSet<>(); + for (String envUrl : getEnvironment(clusterConnectorRequest.getEnv())) { + try { + log.info("Into updateConnector {}", clusterConnectorRequest); + String suffixUrl = + envUrl + "/connectors/" + clusterConnectorRequest.getConnectorName() + "/config"; + Pair reqDetails = + clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); + + HttpHeaders headers = + createKafkaConnectHeaders(clusterConnectorRequest.getClusterIdentification()); + HttpEntity request = + new HttpEntity<>(clusterConnectorRequest.getConnectorConfig(), headers); + + reqDetails.getRight().put(reqDetails.getLeft(), request, String.class); + return ApiResponse.SUCCESS; + } catch (HttpServerErrorException | HttpClientErrorException e) { + log.error("Error in updating connector ", e); + errMsgResponse.add(getErrorMsgeFromRestException(e, CLUSTER_API_ERR_2)); + } catch (Exception ex) { + log.error("Error in updating connector ", ex); + errMsgResponse.add(CLUSTER_API_ERR_2); + } + } + return ApiResponse.notOk(StringUtils.join(errMsgResponse, ", ")); + } + private HttpHeaders createKafkaConnectHeaders(String clusterIdentification) { HttpHeaders headers = - clusterApiUtils.createHeaders( - clusterConnectorRequest.getClusterIdentification(), KafkaClustersType.KAFKA_CONNECT); + clusterApiUtils.createHeaders(clusterIdentification, KafkaClustersType.KAFKA_CONNECT); headers.set("Content-Type", "application/json"); - HttpEntity request = - new HttpEntity<>(clusterConnectorRequest.getConnectorConfig(), headers); - - try { - reqDetails.getRight().put(reqDetails.getLeft(), request, String.class); - } catch (HttpServerErrorException | HttpClientErrorException e) { - log.error("Error in updating connector ", e); - return buildErrorResponseFromRestException(e, CLUSTER_API_ERR_2); - } catch (Exception ex) { - return ApiResponse.notOk(CLUSTER_API_ERR_2); - } - return ApiResponse.SUCCESS; + return headers; } - private static ApiResponse buildErrorResponseFromRestException( + private static String getErrorMsgeFromRestException( HttpStatusCodeException e, String defaultErrorMsg) { RestErrorResponse errorResponse = null; try { @@ -122,35 +124,40 @@ private static ApiResponse buildErrorResponseFromRestException( } catch (Exception ex) { log.error("Error caught trying to process the error response. ", ex); } - return ApiResponse.notOk(errorResponse == null ? defaultErrorMsg : errorResponse.getMessage()); + return errorResponse == null ? defaultErrorMsg : errorResponse.getMessage(); } - public ApiResponse postNewConnector(ClusterConnectorRequest clusterConnectorRequest) - throws Exception { + public ApiResponse postNewConnector(ClusterConnectorRequest clusterConnectorRequest) { log.info("Into postNewConnector clusterConnectorRequest {} ", clusterConnectorRequest); - - String suffixUrl = clusterConnectorRequest.getEnv() + "/connectors"; - Pair reqDetails = - clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); - - HttpHeaders headers = - clusterApiUtils.createHeaders( - clusterConnectorRequest.getClusterIdentification(), KafkaClustersType.KAFKA_CONNECT); - headers.set("Content-Type", "application/json"); - - HttpEntity request = - new HttpEntity<>(clusterConnectorRequest.getConnectorConfig(), headers); - ResponseEntity responseNew; - try { - responseNew = - reqDetails.getRight().postForEntity(reqDetails.getLeft(), request, String.class); - } catch (HttpServerErrorException | HttpClientErrorException e) { - - return buildErrorResponseFromRestException(e, CLUSTER_API_ERR_1); - } catch (Exception ex) { - return ApiResponse.notOk(CLUSTER_API_ERR_1); + Set errMsgResponse = new HashSet<>(); + ResponseEntity responseNew = null; + for (String envUrl : getEnvironment(clusterConnectorRequest.getEnv())) { + try { + String suffixUrl = envUrl + "/connectors"; + Pair reqDetails = + clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); + + HttpHeaders headers = + createKafkaConnectHeaders(clusterConnectorRequest.getClusterIdentification()); + + HttpEntity request = + new HttpEntity<>(clusterConnectorRequest.getConnectorConfig(), headers); + + responseNew = + reqDetails.getRight().postForEntity(reqDetails.getLeft(), request, String.class); + if (responseNew.getStatusCode().is2xxSuccessful()) { + return ApiResponse.SUCCESS; + } + } catch (HttpServerErrorException | HttpClientErrorException e) { + log.error("postNewConnector Rest Exception", e); + errMsgResponse.add(getErrorMsgeFromRestException(e, CLUSTER_API_ERR_1)); + } catch (Exception ex) { + log.error("postNewConnector Error", ex); + errMsgResponse.add(CLUSTER_API_ERR_1); + } } - return responseNew.getStatusCodeValue() == 201 ? ApiResponse.SUCCESS : ApiResponse.FAILURE; + return processResponseBody( + responseNew, ApiResponse.notOk(StringUtils.join(errMsgResponse, ", "))); } public ConnectorsStatus getConnectors( @@ -161,183 +168,209 @@ public ConnectorsStatus getConnectors( ConnectorsStatus connectorsStatus = new ConnectorsStatus(); List connectorStateList = new ArrayList<>(); connectorsStatus.setConnectorStateList(connectorStateList); + for (String envUrl : getEnvironment(environmentVal)) { + try { + log.info("Into getConnectors {} {}", environmentVal, protocol); + if (envUrl == null) { + return null; + } - try { - log.info("Into getConnectors {} {}", environmentVal, protocol); - if (environmentVal == null) { - return null; - } + String suffixUrl = envUrl + "/connectors"; - String suffixUrl = environmentVal + "/connectors"; + if (getConnectorStatuses) { + suffixUrl = suffixUrl + CONNECTOR_URI_EXPAND_STATUS; + } + + Pair reqDetails = + clusterApiUtils.getRequestDetails(suffixUrl, protocol); + + HttpHeaders headers = + clusterApiUtils.createHeaders(clusterIdentification, KafkaClustersType.KAFKA_CONNECT); + HttpEntity request = new HttpEntity<>(headers); + Map params = new HashMap<>(); + + if (!getConnectorStatuses) { + ResponseEntity> responseList = + reqDetails + .getRight() + .exchange( + reqDetails.getLeft(), + HttpMethod.GET, + request, + GET_CONNECTORS_STR_TYPEREF, + params); + log.info("connectors list " + responseList); + if (responseList.getBody() != null) { + for (String connectorName : responseList.getBody()) { + ConnectorState connectorState = new ConnectorState(); + connectorState.setConnectorName(connectorName); + connectorStateList.add(connectorState); + } + } - if (getConnectorStatuses) { - suffixUrl = suffixUrl + CONNECTOR_URI_EXPAND_STATUS; + return connectorsStatus; + } + + ResponseEntity>> responseEntity = + getConnectorStatus(reqDetails, request, params); + Map> responseBody = responseEntity.getBody(); + + for (String connectorName : Objects.requireNonNull(responseBody).keySet()) { + Map statusMap = responseBody.get(connectorName); + Status statusConnector = statusMap.get("status"); + long failedTasksCount = + statusConnector.getTasks().stream() + .filter(task -> task.getState().equals(FAILED_STATUS)) + .count(); + long runningTasksCount = + statusConnector.getTasks().stream() + .filter(task -> task.getState().equals(RUNNING_STATUS)) + .count(); + + ConnectorState connectorState = new ConnectorState(); + connectorState.setConnectorName(connectorName); + connectorState.setConnectorStatus(statusConnector.getConnector().getState()); + connectorState.setRunningTasks(runningTasksCount); + connectorState.setFailedTasks(failedTasksCount); + connectorStateList.add(connectorState); + } + connectorsStatus.setConnectorStateList(connectorStateList); + + log.info("connectors list " + responseEntity); + return connectorsStatus; + } catch (Exception e) { + log.error("Error in getting connectors " + e); } + } + return connectorsStatus; + } - Pair reqDetails = - clusterApiUtils.getRequestDetails(suffixUrl, protocol); + private static ResponseEntity>> getConnectorStatus( + Pair reqDetails, + HttpEntity request, + Map params) { + return reqDetails + .getRight() + .exchange( + reqDetails.getLeft(), + HttpMethod.GET, + request, + new ParameterizedTypeReference<>() {}, + params); + } - HttpHeaders headers = - clusterApiUtils.createHeaders(clusterIdentification, KafkaClustersType.KAFKA_CONNECT); - HttpEntity request = new HttpEntity<>(headers); - Map params = new HashMap<>(); + public Map getConnectorDetails( + String connector, + String environmentVal, + KafkaSupportedProtocol protocol, + String clusterIdentification) { + for (String envUrl : getEnvironment(environmentVal)) { + try { + log.info("Into getConnectorDetails {} {}", environmentVal, protocol); + if (envUrl == null) { + return null; + } - if (!getConnectorStatuses) { - ResponseEntity> responseList = + String suffixUrl = envUrl + "/connectors" + "/" + connector; + Pair reqDetails = + clusterApiUtils.getRequestDetails(suffixUrl, protocol); + + HttpHeaders headers = + clusterApiUtils.createHeaders(clusterIdentification, KafkaClustersType.KAFKA_CONNECT); + HttpEntity request = new HttpEntity<>(headers); + Map params = new HashMap<>(); + + ResponseEntity> responseList = reqDetails .getRight() .exchange( reqDetails.getLeft(), HttpMethod.GET, request, - GET_CONNECTORS_STR_TYPEREF, + GET_CONNECTOR_DETAILS_TYPEREF, params); log.info("connectors list " + responseList); - if (responseList.getBody() != null) { - for (String connectorName : responseList.getBody()) { - ConnectorState connectorState = new ConnectorState(); - connectorState.setConnectorName(connectorName); - connectorStateList.add(connectorState); - } - } - return connectorsStatus; + return responseList.getBody(); + } catch (Exception e) { + log.error("Error in getting connector detail ", e); } - - ResponseEntity>> responseEntity = - reqDetails - .getRight() - .exchange( - reqDetails.getLeft(), - HttpMethod.GET, - request, - new ParameterizedTypeReference<>() {}, - params); - Map> responseBody = responseEntity.getBody(); - - for (String connectorName : Objects.requireNonNull(responseBody).keySet()) { - Map statusMap = responseBody.get(connectorName); - Status statusConnector = statusMap.get("status"); - long failedTasksCount = - statusConnector.getTasks().stream() - .filter(task -> task.getState().equals(FAILED_STATUS)) - .count(); - long runningTasksCount = - statusConnector.getTasks().stream() - .filter(task -> task.getState().equals(RUNNING_STATUS)) - .count(); - - ConnectorState connectorState = new ConnectorState(); - connectorState.setConnectorName(connectorName); - connectorState.setConnectorStatus(statusConnector.getConnector().getState()); - connectorState.setRunningTasks(runningTasksCount); - connectorState.setFailedTasks(failedTasksCount); - connectorStateList.add(connectorState); - } - connectorsStatus.setConnectorStateList(connectorStateList); - - log.info("connectors list " + responseEntity); - return connectorsStatus; - } catch (Exception e) { - log.error("Error in getting connectors " + e); - return connectorsStatus; } + return Collections.emptyMap(); } - public Map getConnectorDetails( - String connector, - String environmentVal, - KafkaSupportedProtocol protocol, - String clusterIdentification) { - try { - log.info("Into getConnectorDetails {} {}", environmentVal, protocol); - if (environmentVal == null) { - return null; - } - - String suffixUrl = environmentVal + "/connectors" + "/" + connector; + protected ClusterStatus getKafkaConnectStatus( + String environment, KafkaSupportedProtocol protocol, String clusterIdentification) { + log.info( + "env : {} , protocol: {}, clusterIdentification: {}", + environment, + protocol, + clusterIdentification); + for (String env : getEnvironment(environment)) { + String suffixUrl = env + "/connectors"; Pair reqDetails = clusterApiUtils.getRequestDetails(suffixUrl, protocol); - HttpHeaders headers = clusterApiUtils.createHeaders(clusterIdentification, KafkaClustersType.KAFKA_CONNECT); HttpEntity request = new HttpEntity<>(headers); - Map params = new HashMap<>(); - - ResponseEntity> responseList = - reqDetails - .getRight() - .exchange( - reqDetails.getLeft(), - HttpMethod.GET, - request, - GET_CONNECTOR_DETAILS_TYPEREF, - params); - log.info("connectors list " + responseList); - - return responseList.getBody(); - } catch (Exception e) { - log.error("Error in getting connector detail ", e); - return Collections.emptyMap(); - } - } - protected ClusterStatus getKafkaConnectStatus( - String environment, KafkaSupportedProtocol protocol, String clusterIdentification) { - String suffixUrl = environment + "/connectors"; - Pair reqDetails = clusterApiUtils.getRequestDetails(suffixUrl, protocol); - HttpHeaders headers = - clusterApiUtils.createHeaders(clusterIdentification, KafkaClustersType.KAFKA_CONNECT); - HttpEntity request = new HttpEntity<>(headers); - - try { - reqDetails - .getRight() - .exchange( - reqDetails.getLeft(), HttpMethod.GET, request, new ParameterizedTypeReference<>() {}); - return ClusterStatus.ONLINE; - } catch (RestClientException e) { - log.error("Exception:", e); - return ClusterStatus.OFFLINE; + try { + reqDetails + .getRight() + .exchange( + reqDetails.getLeft(), + HttpMethod.GET, + request, + new ParameterizedTypeReference<>() {}); + return ClusterStatus.ONLINE; + } catch (RestClientException e) { + log.error("Exception Connectin to {} :", env, e); + } } + return ClusterStatus.OFFLINE; } public ApiResponse restartConnector(ClusterConnectorRequest clusterConnectorRequest) { log.info("Into restartConnector clusterConnectorRequest {} ", clusterConnectorRequest); - - String suffixUrl = - clusterConnectorRequest.getEnv() - + "/connectors" - + "/" - + clusterConnectorRequest.getConnectorName() - + "/" - + "restart"; - - suffixUrl = - suffixUrl - + "?includeTasks=true&onlyFailed=" - + clusterConnectorRequest.isIncludeFailedTasksOnly(); - - Pair reqDetails = - clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); - - HttpHeaders headers = - clusterApiUtils.createHeaders( - clusterConnectorRequest.getClusterIdentification(), KafkaClustersType.KAFKA_CONNECT); - headers.set("Content-Type", "application/json"); - HttpEntity request = new HttpEntity<>(headers); - ResponseEntity responseNew; - try { - responseNew = - reqDetails.getRight().postForEntity(reqDetails.getLeft(), request, String.class); - } catch (HttpServerErrorException | HttpClientErrorException e) { - return buildErrorResponseFromRestException(e, CLUSTER_API_ERR_1); - } catch (Exception ex) { - return ApiResponse.notOk(CLUSTER_API_ERR_1); + ResponseEntity responseNew = null; + for (String envUrl : getEnvironment(clusterConnectorRequest.getEnv())) { + try { + String suffixUrl = + envUrl + + "/connectors" + + "/" + + clusterConnectorRequest.getConnectorName() + + "/" + + "restart"; + + suffixUrl = + suffixUrl + + "?includeTasks=true&onlyFailed=" + + clusterConnectorRequest.isIncludeFailedTasksOnly(); + + Pair reqDetails = + clusterApiUtils.getRequestDetails(suffixUrl, clusterConnectorRequest.getProtocol()); + + HttpHeaders headers = + createKafkaConnectHeaders(clusterConnectorRequest.getClusterIdentification()); + HttpEntity request = new HttpEntity<>(headers); + + responseNew = + reqDetails.getRight().postForEntity(reqDetails.getLeft(), request, String.class); + } catch (HttpServerErrorException | HttpClientErrorException e) { + log.error("restartConnector Rest Exception", e); + } catch (Exception ex) { + log.error("restartConnector Error", ex); + } } - return responseNew.getStatusCode().is2xxSuccessful() + return processResponseBody(responseNew, ApiResponse.notOk("Unable to restart Connector.")); + } + + private static ApiResponse processResponseBody( + ResponseEntity responseNew, ApiResponse customFailureMessage) { + return responseNew != null && responseNew.getStatusCode().is2xxSuccessful() ? ApiResponse.SUCCESS - : ApiResponse.FAILURE; + : customFailureMessage; } public ApiResponse pauseConnector(ClusterConnectorRequest clusterConnectorRequest) { @@ -347,4 +380,8 @@ public ApiResponse pauseConnector(ClusterConnectorRequest clusterConnectorReques public ApiResponse resumeConnector(ClusterConnectorRequest clusterConnectorRequest) { return ApiResponse.notOk("To be implemented"); } + + public String[] getEnvironment(String environments) { + return environments.split(","); + } } diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/KafkaConnectServiceTest.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/KafkaConnectServiceTest.java index 7674ae4243..0232868baa 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/KafkaConnectServiceTest.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/KafkaConnectServiceTest.java @@ -64,7 +64,6 @@ public void getConnectors_returnList() throws JsonProcessingException { withSuccess( objectMapper.writeValueAsString(utilMethods.getConnectorsListMap()), MediaType.APPLICATION_JSON)); - assertThat( kafkaConnectService .getConnectors("env", KafkaSupportedProtocol.PLAINTEXT, "CLID1", true) @@ -75,8 +74,12 @@ public void getConnectors_returnList() throws JsonProcessingException { // TODO need to add proper return value @Test public void getConnectorDetails_returnMap() throws JsonProcessingException { - when(getAdminClient.getRequestDetails(any(), eq(KafkaSupportedProtocol.PLAINTEXT))) + + // Connector operations + when(getAdminClient.getRequestDetails( + eq("env/connectors/conn1"), eq(KafkaSupportedProtocol.PLAINTEXT))) .thenReturn(Pair.of("/env/connectors/conn1", restTemplate)); + this.mockRestServiceServer .expect(requestTo("/env/connectors/conn1")) .andRespond( @@ -93,7 +96,6 @@ public void getConnectorDetails_returnMap() throws JsonProcessingException { @Test public void createConnector_bad_request() throws Exception { ClusterConnectorRequest connectorRequest = stubCreateOrDeleteConnector(); - this.mockRestServiceServer .expect(requestTo("/env/connectors/conn1")) .andRespond( @@ -131,10 +133,10 @@ public void createConnector_fail() throws Exception { this.mockRestServiceServer .expect(requestTo("/env/connectors/conn1")) - .andRespond(withRawStatus(207).contentType(MediaType.APPLICATION_JSON)); + .andRespond(withRawStatus(404).contentType(MediaType.APPLICATION_JSON)); ApiResponse connectorResponse = kafkaConnectService.postNewConnector(connectorRequest); assertThat(connectorResponse.isSuccess()).isFalse(); - assertThat(connectorResponse.getMessage()).isEqualTo(ApiResultStatus.FAILURE.value); + assertThat(connectorResponse.getMessage()).isEqualTo("Unable to create Connector on Cluster."); } @Test @@ -209,6 +211,114 @@ public void deleteConnector_success() { assertThat(connectorResponse.getMessage()).isEqualTo(ApiResultStatus.SUCCESS.value); } + @Test + public void createConnectorMultiEnvs_success() throws Exception { + ClusterConnectorRequest connectorRequest = stubCreateOrDeleteConnectorMultiUrl(); + + this.mockRestServiceServer + .expect(requestTo("/env/connectors")) + .andRespond(withRawStatus(404).contentType(MediaType.APPLICATION_JSON)); + + this.mockRestServiceServer + .expect(requestTo("/env1/connectors")) + .andRespond(withRawStatus(201).contentType(MediaType.APPLICATION_JSON)); + ApiResponse connectorResponse = kafkaConnectService.postNewConnector(connectorRequest); + assertThat(connectorResponse.isSuccess()).isTrue(); + assertThat(connectorResponse.getMessage()).isEqualTo(ApiResultStatus.SUCCESS.value); + mockRestServiceServer.verify(); + } + + @Test + public void deleteConnectorMultiEnvs_success() throws Exception { + ClusterConnectorRequest connectorRequest = stubCreateOrDeleteConnectorMultiUrl(); + + this.mockRestServiceServer + .expect(requestTo("/env/connectors/conn1")) + .andRespond(withRawStatus(404).contentType(MediaType.APPLICATION_JSON)); + this.mockRestServiceServer + .expect(requestTo("/env1/connectors/conn1")) + .andRespond(withRawStatus(500).contentType(MediaType.APPLICATION_JSON)); + + this.mockRestServiceServer + .expect(requestTo("/env2/connectors/conn1")) + .andRespond(withRawStatus(201).contentType(MediaType.APPLICATION_JSON)); + + ApiResponse connectorResponse = kafkaConnectService.deleteConnector(connectorRequest); + assertThat(connectorResponse.getMessage()).isEqualTo(ApiResultStatus.SUCCESS.value); + mockRestServiceServer.verify(); + } + + @Test + public void updateConnector_WithMulti_url_success() { + ClusterConnectorRequest connectorRequest = stubUpdateConnectorMultiUrl(); + + this.mockRestServiceServer + .expect(requestTo("/env/connectors/conn1/config")) + .andRespond(withRawStatus(401).contentType(MediaType.APPLICATION_JSON)); + this.mockRestServiceServer + .expect(requestTo("/env1/connectors/conn1/config")) + .andRespond(withRawStatus(201).contentType(MediaType.APPLICATION_JSON)); + + ApiResponse connectorResponse = kafkaConnectService.updateConnector(connectorRequest); + assertThat(connectorResponse.isSuccess()).isTrue(); + assertThat(connectorResponse.getMessage()).isEqualTo(ApiResultStatus.SUCCESS.value); + } + + @Test + public void createConnectorMultiEnvs_failure() throws Exception { + ClusterConnectorRequest connectorRequest = stubCreateOrDeleteConnectorMultiUrl(); + + this.mockRestServiceServer + .expect(requestTo("/env/connectors")) + .andRespond(withRawStatus(404).contentType(MediaType.APPLICATION_JSON)); + + this.mockRestServiceServer + .expect(requestTo("/env1/connectors")) + .andRespond(withRawStatus(201).contentType(MediaType.APPLICATION_JSON)); + ApiResponse connectorResponse = kafkaConnectService.postNewConnector(connectorRequest); + assertThat(connectorResponse.isSuccess()).isTrue(); + assertThat(connectorResponse.getMessage()).isEqualTo(ApiResultStatus.SUCCESS.value); + mockRestServiceServer.verify(); + } + + @Test + public void deleteConnectorMultiEnvs_failure() throws Exception { + ClusterConnectorRequest connectorRequest = stubCreateOrDeleteConnectorMultiUrl(); + + this.mockRestServiceServer + .expect(requestTo("/env/connectors/conn1")) + .andRespond(withRawStatus(404).contentType(MediaType.APPLICATION_JSON)); + this.mockRestServiceServer + .expect(requestTo("/env1/connectors/conn1")) + .andRespond(withRawStatus(500).contentType(MediaType.APPLICATION_JSON)); + + this.mockRestServiceServer + .expect(requestTo("/env2/connectors/conn1")) + .andRespond(withRawStatus(401).contentType(MediaType.APPLICATION_JSON)); + ApiResponse connectorResponse = kafkaConnectService.deleteConnector(connectorRequest); + assertThat(connectorResponse.getMessage()).isEqualTo("Unable To Delete Connector on Cluster."); + mockRestServiceServer.verify(); + } + + @Test + public void updateConnector_WithMulti_url_failure() { + ClusterConnectorRequest connectorRequest = stubUpdateConnector(); + + this.mockRestServiceServer + .expect(requestTo("/env/connectors/conn1/config")) + .andRespond(withRawStatus(401).contentType(MediaType.APPLICATION_JSON)); + this.mockRestServiceServer + .expect(requestTo("/env1/connectors/conn1/config")) + .andRespond(withRawStatus(401).contentType(MediaType.APPLICATION_JSON)); + this.mockRestServiceServer + .expect(requestTo("/env2/connectors/conn1/config")) + .andRespond(withRawStatus(401).contentType(MediaType.APPLICATION_JSON)); + + ApiResponse connectorResponse = kafkaConnectService.updateConnector(connectorRequest); + assertThat(connectorResponse.isSuccess()).isFalse(); + assertThat(connectorResponse.getMessage()).isEqualTo("Unable to update Connector on Cluster"); + } + private ClusterConnectorRequest stubCreateOrDeleteConnector() { when(getAdminClient.getRequestDetails(any(), eq(KafkaSupportedProtocol.PLAINTEXT))) .thenReturn(Pair.of("/env/connectors/conn1", restTemplate)); @@ -224,8 +334,41 @@ private ClusterConnectorRequest stubCreateOrDeleteConnector() { return connectorRequest; } + private ClusterConnectorRequest stubCreateOrDeleteConnectorMultiUrl() { + when(getAdminClient.getRequestDetails( + eq("env/connectors"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env/connectors", restTemplate)); + when(getAdminClient.getRequestDetails( + eq("env1/connectors"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env1/connectors", restTemplate)); + when(getAdminClient.getRequestDetails( + eq("env2/connectors"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env2/connectors", restTemplate)); + // Connector operations + when(getAdminClient.getRequestDetails( + eq("env/connectors/conn1"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env/connectors/conn1", restTemplate)); + when(getAdminClient.getRequestDetails( + eq("env1/connectors/conn1"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env1/connectors/conn1", restTemplate)); + when(getAdminClient.getRequestDetails( + eq("env2/connectors/conn1"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env2/connectors/conn1", restTemplate)); + when(getAdminClient.createHeaders(eq("1"), eq(KafkaClustersType.KAFKA_CONNECT))) + .thenReturn(new HttpHeaders()); + ClusterConnectorRequest connectorRequest = + ClusterConnectorRequest.builder() + .connectorName("conn1") + .clusterIdentification("1") + .env("env,env1,env2") + .protocol(KafkaSupportedProtocol.PLAINTEXT) + .build(); + return connectorRequest; + } + private ClusterConnectorRequest stubUpdateConnector() { - when(getAdminClient.getRequestDetails(any(), eq(KafkaSupportedProtocol.PLAINTEXT))) + when(getAdminClient.getRequestDetails( + eq("env/connectors/conn1/config"), eq(KafkaSupportedProtocol.PLAINTEXT))) .thenReturn(Pair.of("/env/connectors/conn1/config", restTemplate)); when(getAdminClient.createHeaders(eq("1"), eq(KafkaClustersType.KAFKA_CONNECT))) .thenReturn(new HttpHeaders()); @@ -238,4 +381,27 @@ private ClusterConnectorRequest stubUpdateConnector() { .build(); return connectorRequest; } + + private ClusterConnectorRequest stubUpdateConnectorMultiUrl() { + when(getAdminClient.getRequestDetails( + eq("env/connectors/conn1/config"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env/connectors/conn1/config", restTemplate)); + when(getAdminClient.getRequestDetails( + eq("env1/connectors/conn1/config"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env1/connectors/conn1/config", restTemplate)); + when(getAdminClient.getRequestDetails( + eq("env2/connectors/conn1/config"), eq(KafkaSupportedProtocol.PLAINTEXT))) + .thenReturn(Pair.of("/env2/connectors/conn1/config", restTemplate)); + + when(getAdminClient.createHeaders(eq("1"), eq(KafkaClustersType.KAFKA_CONNECT))) + .thenReturn(new HttpHeaders()); + ClusterConnectorRequest connectorRequest = + ClusterConnectorRequest.builder() + .connectorName("conn1") + .clusterIdentification("1") + .env("env,env1,env2") + .protocol(KafkaSupportedProtocol.PLAINTEXT) + .build(); + return connectorRequest; + } }