From 95452455fbced70dd78f45d39f310e02369c8f21 Mon Sep 17 00:00:00 2001 From: Ankita Ranjan Date: Fri, 15 Dec 2023 11:52:06 +0530 Subject: [PATCH] draft for callback --- .../api/definition/BatchTransactions.java | 14 +- .../BatchTransactionsController.java | 20 +- .../bulk/camel/config/CamelProperties.java | 2 + .../camel/routes/BatchAggregateRoute.java | 4 +- .../bulk/camel/routes/InitSubBatchRoute.java | 6 +- .../camel/routes/ProcessorStartRoute.java | 60 ++--- .../bulk/camel/routes/SendCallbackRoute.java | 79 +++--- .../mifos/processor/bulk/kafka/Consumers.java | 232 +++++++++--------- .../kafka/config/KafkaConsumerConfig.java | 78 +++--- .../kafka/config/KafkaProducerConfig.java | 68 ++--- .../bulk/kafka/config/KafkaTopicConfig.java | 80 +++--- .../mifos/processor/bulk/schema/BatchDTO.java | 6 +- .../processor/bulk/utility/PhaseUtils.java | 2 +- .../processor/bulk/zeebe/ZeebeVariables.java | 5 +- .../bulk/zeebe/worker/SendCallbackWorker.java | 25 +- src/main/resources/application.yaml | 48 ++-- 16 files changed, 374 insertions(+), 355 deletions(-) diff --git a/src/main/java/org/mifos/processor/bulk/api/definition/BatchTransactions.java b/src/main/java/org/mifos/processor/bulk/api/definition/BatchTransactions.java index 119ed31e..d94c8de8 100644 --- a/src/main/java/org/mifos/processor/bulk/api/definition/BatchTransactions.java +++ b/src/main/java/org/mifos/processor/bulk/api/definition/BatchTransactions.java @@ -1,5 +1,12 @@ package org.mifos.processor.bulk.api.definition; +import org.springframework.web.bind.annotation.*; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; + +import static org.mifos.processor.bulk.camel.config.CamelProperties.*; import static org.mifos.processor.bulk.camel.config.CamelProperties.HEADER_CLIENT_CORRELATION_ID; import static org.mifos.processor.bulk.camel.config.CamelProperties.HEADER_PLATFORM_TENANT_ID; import static org.mifos.processor.bulk.camel.config.CamelProperties.HEADER_PROGRAM_ID; @@ -19,8 +26,11 @@ public interface BatchTransactions { @PostMapping(value = "/batchtransactions", produces = "application/json") String batchTransactions(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, @RequestHeader(value = HEADER_CLIENT_CORRELATION_ID) String requestId, - @RequestHeader(value = FILE_NAME, required = false) String fileName, @RequestHeader(value = PURPOSE) String purpose, - @RequestHeader(value = HEADER_TYPE) String type, @RequestHeader(value = HEADER_PLATFORM_TENANT_ID) String tenant, + @RequestHeader(value = FILE_NAME, required = false) String fileName, + @RequestHeader(value = PURPOSE) String purpose, + @RequestParam(value = HEADER_TYPE) String type, + @RequestHeader(value = HEADER_PLATFORM_TENANT_ID) String tenant, + @RequestHeader(value = CALLBACK, required = false) String callbackUrl, @RequestHeader(value = HEADER_REGISTERING_INSTITUTE_ID, required = false) String registeringInstitutionId, @RequestHeader(value = HEADER_PROGRAM_ID, required = false) String programId) throws IOException; diff --git a/src/main/java/org/mifos/processor/bulk/api/implementation/BatchTransactionsController.java b/src/main/java/org/mifos/processor/bulk/api/implementation/BatchTransactionsController.java index 6a34612c..bde81b0a 100644 --- a/src/main/java/org/mifos/processor/bulk/api/implementation/BatchTransactionsController.java +++ b/src/main/java/org/mifos/processor/bulk/api/implementation/BatchTransactionsController.java @@ -2,6 +2,7 @@ import static org.mifos.processor.bulk.camel.config.CamelProperties.HEADER_PROGRAM_ID; import static org.mifos.processor.bulk.camel.config.CamelProperties.HEADER_REGISTERING_INSTITUTE_ID; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.FILE_NAME; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.HEADER_CLIENT_CORRELATION_ID; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.HEADER_PLATFORM_TENANT_ID; @@ -39,6 +40,21 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import static org.mifos.processor.bulk.camel.config.CamelProperties.*; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import static org.mifos.processor.bulk.camel.config.CamelProperties.HEADER_PROGRAM_ID; +import static org.mifos.processor.bulk.camel.config.CamelProperties.HEADER_REGISTERING_INSTITUTE_ID; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.HEADER_CLIENT_CORRELATION_ID; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.HEADER_PLATFORM_TENANT_ID; + @Slf4j @RestController public class BatchTransactionsController implements BatchTransactions { @@ -63,12 +79,12 @@ public class BatchTransactionsController implements BatchTransactions { @SneakyThrows @Override public String batchTransactions(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, String requestId, - String fileName, String purpose, String type, String tenant, String registeringInstitutionId, String programId) { + String fileName, String purpose, String type, String tenant, String callbackUrl, String registeringInstitutionId, String programId) throws IOException { log.info("Inside api logic"); Headers.HeaderBuilder headerBuilder = new Headers.HeaderBuilder().addHeader(HEADER_CLIENT_CORRELATION_ID, requestId) .addHeader(PURPOSE, purpose).addHeader(HEADER_TYPE, type).addHeader(HEADER_PLATFORM_TENANT_ID, tenant) - .addHeader(HEADER_REGISTERING_INSTITUTE_ID, registeringInstitutionId).addHeader(HEADER_PROGRAM_ID, programId); + .addHeader(CALLBACK, callbackUrl).addHeader(HEADER_REGISTERING_INSTITUTE_ID, registeringInstitutionId).addHeader(HEADER_PROGRAM_ID, programId); Optional validationResponse = isValidRequest(httpServletRequest, fileName, type); if (validationResponse.isPresent()) { diff --git a/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java b/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java index e4c71cd0..71a4adab 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java +++ b/src/main/java/org/mifos/processor/bulk/camel/config/CamelProperties.java @@ -83,4 +83,6 @@ private CamelProperties() {} public static final String EVENT_TYPE = "eventType"; public static final String DUPLICATE_TRANSACTION_LIST = "duplicateTransactionList"; public static final String ORIGINAL_TRANSACTION_LIST = "originalTransactionList"; + public static final String CALLBACK = "X-CallbackURL"; + } diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java index 5d597060..decd8e56 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java @@ -52,7 +52,9 @@ public void configure() throws Exception { .log("Starting route direct:batch-aggregate-response-handler") // .setBody(exchange -> exchange.getIn().getBody(String.class)) .choice().when(header("CamelHttpResponseCode").isEqualTo("200")).log(LoggingLevel.INFO, "Batch summary request successful") - .unmarshal().json(JsonLibrary.Jackson, BatchDTO.class).process(exchange -> { + .log("Response body: ${body}") + .unmarshal().json(JsonLibrary.Jackson, BatchDTO.class) + .process(exchange -> { BatchDTO batchAggregateResponse = exchange.getIn().getBody(BatchDTO.class); int percentage = (int) (((double) batchAggregateResponse.getSuccessful() / batchAggregateResponse.getTotal()) * 100); diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java index 96512210..a6611471 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java @@ -198,12 +198,12 @@ public void configure() throws Exception { .setHeader(HEADER_CLIENT_CORRELATION_ID, simple("${exchangeProperty." + REQUEST_ID + "}")) .setHeader(HEADER_REGISTERING_INSTITUTE_ID, simple("${exchangeProperty." + HEADER_REGISTERING_INSTITUTE_ID + "}")) .process(exchange -> { - log.debug("Variables: {}", exchange.getProperties()); - log.debug("Emergency: {}", exchange.getIn().getHeaders()); + log.info("Variables: {}", exchange.getProperties()); + log.info("Emergency: {}", exchange.getIn().getHeaders()); }) .toD(channelURL + "${exchangeProperty.extEndpoint}" + "?bridgeEndpoint=true&throwExceptionOnFailure=false") - .log(LoggingLevel.DEBUG, "Response body: ${body}").otherwise().endChoice(); + .log(LoggingLevel.INFO, "Response body: ${body}").otherwise().endChoice(); from("direct:validate-payment-mode").id("direct:validate-payment-mode").log("Starting route direct:validate-payment-mode") .process(exchange -> { diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java index 374ddc67..c4672a39 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java @@ -13,36 +13,7 @@ import static org.mifos.processor.bulk.camel.config.CamelProperties.RESULT_TRANSACTION_LIST; import static org.mifos.processor.bulk.camel.config.CamelProperties.TENANT_NAME; import static org.mifos.processor.bulk.camel.config.CamelProperties.TRANSACTION_LIST; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.APPROVAL_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.AUTHORIZATION_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_AGGREGATE_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BULK_NOTIF_FAILURE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BULK_NOTIF_SUCCESS; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_URL; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CLIENT_CORRELATION_ID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_THRESHOLD; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_THRESHOLD_CHECK_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.DE_DUPLICATION_ENABLE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.FILE_NAME; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.FORMATTING_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.IS_FILE_VALID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_CALLBACK_RETRY; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_STATUS_RETRY; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MERGE_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.NOTE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ORDERING_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PAYER_IDENTIFIER_TYPE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PAYER_IDENTIFIER_VALUE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASES; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PROGRAM_NAME; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PURPOSE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.REQUEST_ID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.SPLITTING_ENABLED; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TENANT_ID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.THRESHOLD_DELAY; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.BufferedReader; @@ -149,6 +120,9 @@ private void setup() { String requestId = exchange.getIn().getHeader("requestId", String.class); String purpose = exchange.getIn().getHeader("purpose", String.class); String batchId = UUID.randomUUID().toString(); + // extracting and setting callback Url + String callbackUrl = exchange.getIn().getHeader("X-CallbackURL", String.class); + exchange.setProperty(CALLBACK, callbackUrl); exchange.setProperty(BATCH_ID, batchId); exchange.setProperty(FILE_NAME, fileName); exchange.setProperty(REQUEST_ID, requestId); @@ -232,6 +206,7 @@ private void setup() { String requestId = exchange.getProperty(REQUEST_ID, String.class); String purpose = exchange.getProperty(PURPOSE, String.class); String batchId = exchange.getProperty(BATCH_ID, String.class); + String callbackUrl = exchange.getProperty(CALLBACK,String.class); String note = null; if (purpose == null || purpose.isEmpty()) { @@ -248,7 +223,7 @@ private void setup() { logger.debug("File absolute path: {}", file.getAbsolutePath()); boolean verifyData = verifyData(file); - logger.debug("Data verification result {}", verifyData); + logger.info("Data verification result {}", verifyData); if (!verifyData) { note = "Invalid data in file data processing stopped"; } @@ -257,9 +232,7 @@ private void setup() { logger.debug("File uploaded {}", nm); - // extracting and setting callback Url - String callbackUrl = exchange.getIn().getHeader("X-Callback-URL", String.class); - exchange.setProperty(CALLBACK_URL, callbackUrl); + List phases = phaseUtils.getValues(); logger.debug(phases.toString()); @@ -269,7 +242,7 @@ private void setup() { variables.put(REQUEST_ID, requestId); variables.put(PURPOSE, purpose); variables.put(TENANT_ID, exchange.getProperty(TENANT_NAME)); - variables.put(CALLBACK_URL, callbackUrl); + variables.put(CALLBACK, callbackUrl); variables.put(PHASES, phases); variables.put(PHASE_COUNT, phases.size()); variables.put(NOTE, note); @@ -322,10 +295,13 @@ private void setup() { String filename = exchange.getIn().getHeader("filename", String.class); String requestId = exchange.getIn().getHeader("X-CorrelationID", String.class); String purpose = exchange.getIn().getHeader("Purpose", String.class); - String type = exchange.getIn().getHeader("Type", String.class); + String type = exchange.getIn().getHeader("type", String.class); String clientCorrelationId = exchange.getIn().getHeader(HEADER_CLIENT_CORRELATION_ID, String.class); String registeringInstitutionId = exchange.getIn().getHeader(HEADER_REGISTERING_INSTITUTE_ID, String.class); String programId = exchange.getIn().getHeader(HEADER_PROGRAM_ID, String.class); + // extracting and setting callback Url + String callbackUrl = exchange.getIn().getHeader("X-CallbackURL", String.class); + exchange.setProperty(CALLBACK, callbackUrl); exchange.setProperty(FILE_NAME, filename); exchange.setProperty(REQUEST_ID, requestId); exchange.setProperty(PURPOSE, purpose); @@ -396,9 +372,9 @@ private boolean verifyData(File file) throws IOException { String line; br.readLine(); while ((line = br.readLine()) != null) { - String[] row = line.split(","); + String[] row = line.split(",", -1); if (row.length != columnNames.size()) { - logger.debug("Row invalid {} {}", row.length, columnNames.size()); + logger.info("Row invalid {} {}", row.length, columnNames.size()); return false; } if (!verifyRow(row)) { @@ -415,14 +391,14 @@ private boolean verifyRow(String[] row) { int j = row[i].indexOf("MSISDN"); if (!(j == row.length)) { if (!row[j + 1].matches("^[0-9]*$")) { - logger.debug("MSISDN invalid"); + logger.info("MSISDN invalid"); return false; } } } else if (row[i].contains("amount")) { int j = row[i].indexOf("amount"); if (!row[j].matches("^[0-9]*$")) { - logger.debug("Amount invalid"); + logger.info("Amount invalid"); return false; } @@ -437,12 +413,12 @@ private boolean verifyCsv(File csvData) throws IOException { String[] columns = new String[0]; if (header != null) { columns = header.split(","); - logger.debug("Columns in the csv file are {}", Arrays.toString(columns)); + logger.info("Columns in the csv file are {}", Arrays.toString(columns)); } int i = 0; while (i < columns.length) { if (columnNames.contains(columns[i])) { - logger.debug("Column name {} is at index {} ", columns[i], columnNames.indexOf(columns[i])); + logger.info("Column name {} is at index {} ", columns[i], columnNames.indexOf(columns[i])); i++; } else { diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java index 6810ca02..5f290d61 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/SendCallbackRoute.java @@ -1,19 +1,13 @@ package org.mifos.processor.bulk.camel.routes; import static org.mifos.processor.bulk.camel.config.CamelProperties.CALLBACK_RESPONSE_CODE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_RETRY; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_SUCCESS; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_URL; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_RATE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_CODE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_DESCRIPTION; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_CALLBACK_RETRY; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASES; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.*; import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; +import org.apache.camel.model.dataformat.JsonLibrary; +import org.mifos.connector.common.gsma.dto.GSMATransaction; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -34,55 +28,62 @@ public void configure() throws Exception { */ from(RouteId.SEND_CALLBACK.getValue()).id(RouteId.SEND_CALLBACK.getValue()).log("Starting route " + RouteId.SEND_CALLBACK.name()) - .log("Sending callback for Batch Processing").setBody(exchange -> { + .log("Sending callback for Batch Processing").process(exchange -> { String body = String.format("The Batch Aggregation API was complete with : %s", exchange.getProperty(COMPLETION_RATE).toString()); - callbackUrl = exchange.getProperty(CALLBACK_URL).toString(); - return body; - }).to("direct:set-jws-signature").toD("${header.callbackUrl}?bridgeEndpoint=true&throwExceptionOnFailure=false").choice() - .when(header("CamelHttpResponseCode").startsWith("2")).log(LoggingLevel.INFO, "Callback sending was successful") + callbackUrl = exchange.getProperty(CALLBACK, String.class); + logger.info("Callback URL: {}", callbackUrl); + logger.info("Callback Body: {}", body); + exchange.getIn().setBody(body); + }) + .marshal().json(JsonLibrary.Jackson, String.class) + .toD("${exchangeProperty.X-CallbackURL}?bridgeEndpoint=true&throwExceptionOnFailure=false") + .log(LoggingLevel.INFO, "Callback Response body: ${body}") + .choice() + .when(header(Exchange.HTTP_RESPONSE_CODE).regex("^2\\d{2}$")) + .log(LoggingLevel.INFO, "Callback sending was successful") .process(exchange -> { - List phases = (List) exchange.getProperty(PHASES); - + List phases = exchange.getProperty(PHASES, List.class); exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); exchange.setProperty(CALLBACK_RETRY, 1); exchange.setProperty(CALLBACK_SUCCESS, true); - eliminatePhases(exchange, phases, (Integer) exchange.getProperty(PHASE_COUNT), - (Integer) exchange.getProperty(COMPLETION_RATE)); - - }).otherwise().log(LoggingLevel.ERROR, "Callback request was unsuccessful").process(exchange -> { - if (exchange.getProperty(CALLBACK_RETRY).equals(exchange.getProperty(MAX_CALLBACK_RETRY))) { - List phases = (List) exchange.getProperty(PHASES); + eliminatePhases(exchange); + }) + .otherwise() + .log(LoggingLevel.ERROR, "Callback request was unsuccessful") + .process(exchange -> { + int retry = exchange.getProperty(CALLBACK_RETRY, Integer.class); + int maxRetry = exchange.getProperty(MAX_CALLBACK_RETRY, Integer.class); + if (retry >= maxRetry) { + List phases = exchange.getProperty(PHASES, List.class); logger.info("Retry Exhausted, setting Callback as Failed"); - eliminatePhases(exchange, phases, (Integer) exchange.getProperty(PHASE_COUNT), - (Integer) exchange.getProperty(COMPLETION_RATE)); + eliminatePhases(exchange); exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); + exchange.setProperty(CALLBACK_SUCCESS, false); + exchange.setProperty(ERROR_DESCRIPTION, exchange.getIn().getBody(String.class)); + exchange.setProperty(ERROR_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); } else { - int retry = (int) exchange.getProperty(CALLBACK_RETRY); retry++; logger.info("Retry Left {}, Setting Callback as Failed and Retrying...", - ((int) exchange.getProperty(MAX_CALLBACK_RETRY) - retry)); + (maxRetry - retry)); exchange.setProperty(CALLBACK_RETRY, retry); - exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); } + exchange.setProperty(CALLBACK_RESPONSE_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); exchange.setProperty(CALLBACK_SUCCESS, false); exchange.setProperty(ERROR_DESCRIPTION, exchange.getIn().getBody(String.class)); exchange.setProperty(ERROR_CODE, exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE)); - }); + }); } + public void eliminatePhases(Exchange exchange){ + List phases = exchange.getProperty(PHASES, List.class); + int completionRate = exchange.getProperty(COMPLETION_RATE, Integer.class); + + phases.removeIf(phase -> phase <= completionRate); - public void eliminatePhases(Exchange exchange, List phases, int phaseCount, int completionRate) { - int i = 0; - while (phases.size() > 0 && phases.size() > i) { - if (phases.get(i) <= completionRate) { - phases.remove(i); - } - i++; + exchange.setProperty(PHASES, phases); } - exchange.setProperty(PHASES, phases); - exchange.setProperty(PHASE_COUNT, phaseCount); - } -} + + } diff --git a/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java b/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java index 03769b77..b45588c6 100644 --- a/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java +++ b/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java @@ -1,116 +1,116 @@ -package org.mifos.processor.bulk.kafka; - -import static org.mifos.connector.common.mojaloop.type.InitiatorType.CONSUMER; -import static org.mifos.connector.common.mojaloop.type.Scenario.TRANSFER; -import static org.mifos.connector.common.mojaloop.type.TransactionRole.PAYER; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.GSMA_CHANNEL_REQUEST; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.INITIATOR_FSPID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.IS_RTP_REQUEST; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID_TYPE; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_FSPID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TENANT_ID; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TRANSACTION_TYPE; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.HashMap; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.mifos.connector.common.channel.dto.TransactionChannelRequestDTO; -import org.mifos.connector.common.gsma.dto.GSMATransaction; -import org.mifos.connector.common.gsma.dto.GsmaParty; -import org.mifos.connector.common.mojaloop.dto.MoneyData; -import org.mifos.connector.common.mojaloop.dto.Party; -import org.mifos.connector.common.mojaloop.dto.PartyIdInfo; -import org.mifos.connector.common.mojaloop.dto.TransactionType; -import org.mifos.connector.common.mojaloop.type.IdentifierType; -import org.mifos.processor.bulk.schema.TransactionOlder; -import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -@Service -@Slf4j -public class Consumers { - - @Value("${bpmn.flows.international-remittance-payer}") - private String internationalRemittancePayer; - - @Autowired - private ObjectMapper objectMapper; - - @Autowired - private ZeebeProcessStarter zeebeProcessStarter; - - @KafkaListener(topics = "${kafka.topic.gsma.name}", groupId = "group_id") - public void listenTopicGsma(String message) throws JsonProcessingException { - log.debug("Received Message in topic GSMA and group group_id: {}", message); - TransactionOlder transaction = objectMapper.readValue(message, TransactionOlder.class); - String tenantId = "ibank-usa"; - - GSMATransaction gsmaChannelRequest = new GSMATransaction(); - gsmaChannelRequest.setAmount(transaction.getAmount()); - gsmaChannelRequest.setCurrency(transaction.getCurrency()); - gsmaChannelRequest.setRequestingLei("ibank-usa"); - gsmaChannelRequest.setReceivingLei("ibank-india"); - GsmaParty creditParty = new GsmaParty(); - creditParty.setKey("msisdn"); - creditParty.setValue(transaction.getAccountNumber()); - GsmaParty debitParty = new GsmaParty(); - debitParty.setKey("msisdn"); - debitParty.setValue(transaction.getAccountNumber()); - gsmaChannelRequest.setCreditParty(new GsmaParty[] { creditParty }); - gsmaChannelRequest.setDebitParty(new GsmaParty[] { debitParty }); - // gsmaChannelRequest.setInternationalTransferInformation().setReceivingAmount(gsmaChannelRequest.getAmount()); - - TransactionChannelRequestDTO channelRequest = new TransactionChannelRequestDTO(); // Fineract Object - Party payee = new Party(new PartyIdInfo(IdentifierType.MSISDN, transaction.getAccountNumber())); - Party payer = new Party(new PartyIdInfo(IdentifierType.MSISDN, "7543010")); - - MoneyData moneyData = new MoneyData(); - moneyData.setAmount(transaction.getAmount()); - moneyData.setCurrency(transaction.getCurrency()); - - channelRequest.setPayer(payer); - channelRequest.setPayee(payee); - channelRequest.setAmount(moneyData); - - TransactionType transactionType = new TransactionType(); - transactionType.setInitiator(PAYER); - transactionType.setInitiatorType(CONSUMER); - transactionType.setScenario(TRANSFER); - - Map extraVariables = new HashMap<>(); - extraVariables.put(IS_RTP_REQUEST, false); - extraVariables.put(TRANSACTION_TYPE, "inttransfer"); - extraVariables.put(TENANT_ID, tenantId); - - extraVariables.put(BATCH_ID, transaction.getBatchId()); - - String tenantSpecificBpmn = internationalRemittancePayer.replace("{dfspid}", tenantId); - channelRequest.setTransactionType(transactionType); - - PartyIdInfo requestedParty = (boolean) extraVariables.get(IS_RTP_REQUEST) ? channelRequest.getPayer().getPartyIdInfo() - : channelRequest.getPayee().getPartyIdInfo(); - extraVariables.put(PARTY_ID_TYPE, requestedParty.getPartyIdType()); - extraVariables.put(PARTY_ID, requestedParty.getPartyIdentifier()); - - extraVariables.put(GSMA_CHANNEL_REQUEST, objectMapper.writeValueAsString(gsmaChannelRequest)); - extraVariables.put(PARTY_LOOKUP_FSPID, gsmaChannelRequest.getReceivingLei()); - extraVariables.put(INITIATOR_FSPID, gsmaChannelRequest.getRequestingLei()); - - String transactionId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificBpmn, objectMapper.writeValueAsString(channelRequest), - extraVariables); - - log.debug("GSMA Transaction Started with:{} ", transactionId); - } - - @KafkaListener(topics = "${kafka.topic.slcb.name}", groupId = "group_id") - public void listenTopicSlcb(String message) { - log.debug("Received Message in topic SLCB and group group_id:{} ", message); - } -} +//package org.mifos.processor.bulk.kafka; +// +//import static org.mifos.connector.common.mojaloop.type.InitiatorType.CONSUMER; +//import static org.mifos.connector.common.mojaloop.type.Scenario.TRANSFER; +//import static org.mifos.connector.common.mojaloop.type.TransactionRole.PAYER; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.GSMA_CHANNEL_REQUEST; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.INITIATOR_FSPID; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.IS_RTP_REQUEST; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID_TYPE; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_FSPID; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TENANT_ID; +//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TRANSACTION_TYPE; +// +//import com.fasterxml.jackson.core.JsonProcessingException; +//import com.fasterxml.jackson.databind.ObjectMapper; +//import java.util.HashMap; +//import java.util.Map; +//import lombok.extern.slf4j.Slf4j; +//import org.mifos.connector.common.channel.dto.TransactionChannelRequestDTO; +//import org.mifos.connector.common.gsma.dto.GSMATransaction; +//import org.mifos.connector.common.gsma.dto.GsmaParty; +//import org.mifos.connector.common.mojaloop.dto.MoneyData; +//import org.mifos.connector.common.mojaloop.dto.Party; +//import org.mifos.connector.common.mojaloop.dto.PartyIdInfo; +//import org.mifos.connector.common.mojaloop.dto.TransactionType; +//import org.mifos.connector.common.mojaloop.type.IdentifierType; +//import org.mifos.processor.bulk.schema.TransactionOlder; +//import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.kafka.annotation.KafkaListener; +//import org.springframework.stereotype.Service; +// +//@Service +//@Slf4j +//public class Consumers { +// +// @Value("${bpmn.flows.international-remittance-payer}") +// private String internationalRemittancePayer; +// +// @Autowired +// private ObjectMapper objectMapper; +// +// @Autowired +// private ZeebeProcessStarter zeebeProcessStarter; +// +// @KafkaListener(topics = "${kafka.topic.gsma.name}", groupId = "group_id") +// public void listenTopicGsma(String message) throws JsonProcessingException { +// log.debug("Received Message in topic GSMA and group group_id: {}", message); +// TransactionOlder transaction = objectMapper.readValue(message, TransactionOlder.class); +// String tenantId = "ibank-usa"; +// +// GSMATransaction gsmaChannelRequest = new GSMATransaction(); +// gsmaChannelRequest.setAmount(transaction.getAmount()); +// gsmaChannelRequest.setCurrency(transaction.getCurrency()); +// gsmaChannelRequest.setRequestingLei("ibank-usa"); +// gsmaChannelRequest.setReceivingLei("ibank-india"); +// GsmaParty creditParty = new GsmaParty(); +// creditParty.setKey("msisdn"); +// creditParty.setValue(transaction.getAccountNumber()); +// GsmaParty debitParty = new GsmaParty(); +// debitParty.setKey("msisdn"); +// debitParty.setValue(transaction.getAccountNumber()); +// gsmaChannelRequest.setCreditParty(new GsmaParty[] { creditParty }); +// gsmaChannelRequest.setDebitParty(new GsmaParty[] { debitParty }); +// // gsmaChannelRequest.setInternationalTransferInformation().setReceivingAmount(gsmaChannelRequest.getAmount()); +// +// TransactionChannelRequestDTO channelRequest = new TransactionChannelRequestDTO(); // Fineract Object +// Party payee = new Party(new PartyIdInfo(IdentifierType.MSISDN, transaction.getAccountNumber())); +// Party payer = new Party(new PartyIdInfo(IdentifierType.MSISDN, "7543010")); +// +// MoneyData moneyData = new MoneyData(); +// moneyData.setAmount(transaction.getAmount()); +// moneyData.setCurrency(transaction.getCurrency()); +// +// channelRequest.setPayer(payer); +// channelRequest.setPayee(payee); +// channelRequest.setAmount(moneyData); +// +// TransactionType transactionType = new TransactionType(); +// transactionType.setInitiator(PAYER); +// transactionType.setInitiatorType(CONSUMER); +// transactionType.setScenario(TRANSFER); +// +// Map extraVariables = new HashMap<>(); +// extraVariables.put(IS_RTP_REQUEST, false); +// extraVariables.put(TRANSACTION_TYPE, "inttransfer"); +// extraVariables.put(TENANT_ID, tenantId); +// +// extraVariables.put(BATCH_ID, transaction.getBatchId()); +// +// String tenantSpecificBpmn = internationalRemittancePayer.replace("{dfspid}", tenantId); +// channelRequest.setTransactionType(transactionType); +// +// PartyIdInfo requestedParty = (boolean) extraVariables.get(IS_RTP_REQUEST) ? channelRequest.getPayer().getPartyIdInfo() +// : channelRequest.getPayee().getPartyIdInfo(); +// extraVariables.put(PARTY_ID_TYPE, requestedParty.getPartyIdType()); +// extraVariables.put(PARTY_ID, requestedParty.getPartyIdentifier()); +// +// extraVariables.put(GSMA_CHANNEL_REQUEST, objectMapper.writeValueAsString(gsmaChannelRequest)); +// extraVariables.put(PARTY_LOOKUP_FSPID, gsmaChannelRequest.getReceivingLei()); +// extraVariables.put(INITIATOR_FSPID, gsmaChannelRequest.getRequestingLei()); +// +// String transactionId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificBpmn, objectMapper.writeValueAsString(channelRequest), +// extraVariables); +// +// log.debug("GSMA Transaction Started with:{} ", transactionId); +// } +// +// @KafkaListener(topics = "${kafka.topic.slcb.name}", groupId = "group_id") +// public void listenTopicSlcb(String message) { +// log.debug("Received Message in topic SLCB and group group_id:{} ", message); +// } +//} diff --git a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java index e9694b89..bb04a764 100644 --- a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java +++ b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java @@ -1,39 +1,39 @@ -package org.mifos.processor.bulk.kafka.config; - -import java.util.HashMap; -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; - -@EnableKafka -@Configuration -public class KafkaConsumerConfig { - - @Value(value = "${kafka.bootstrapAddress}") - private String bootstrapAddress; - - @Bean - public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return new DefaultKafkaConsumerFactory<>(props); - } - - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); - return factory; - } - -} +//package org.mifos.processor.bulk.kafka.config; +// +//import java.util.HashMap; +//import java.util.Map; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +// +//@EnableKafka +//@Configuration +//public class KafkaConsumerConfig { +// +// @Value(value = "${kafka.bootstrapAddress}") +// private String bootstrapAddress; +// +// @Bean +// public ConsumerFactory consumerFactory() { +// Map props = new HashMap<>(); +// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); +// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id"); +// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// return new DefaultKafkaConsumerFactory<>(props); +// } +// +// @Bean +// public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// return factory; +// } +// +//} diff --git a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java index 11b9aa2a..5caa219d 100644 --- a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java +++ b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java @@ -1,34 +1,34 @@ -package org.mifos.processor.bulk.kafka.config; - -import java.util.HashMap; -import java.util.Map; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; - -@Configuration -public class KafkaProducerConfig { - - @Value(value = "${kafka.bootstrapAddress}") - private String bootstrapAddress; - - @Bean - public ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); - configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - return new DefaultKafkaProducerFactory<>(configProps); - } - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } - -} +//package org.mifos.processor.bulk.kafka.config; +// +//import java.util.HashMap; +//import java.util.Map; +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.common.serialization.StringSerializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.core.DefaultKafkaProducerFactory; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.kafka.core.ProducerFactory; +// +//@Configuration +//public class KafkaProducerConfig { +// +// @Value(value = "${kafka.bootstrapAddress}") +// private String bootstrapAddress; +// +// @Bean +// public ProducerFactory producerFactory() { +// Map configProps = new HashMap<>(); +// configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); +// configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// return new DefaultKafkaProducerFactory<>(configProps); +// } +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } +// +//} diff --git a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java index 7d2f2ff7..f7dd2735 100644 --- a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java +++ b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java @@ -1,40 +1,40 @@ -package org.mifos.processor.bulk.kafka.config; - -import java.util.HashMap; -import java.util.Map; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.NewTopic; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.KafkaAdmin; - -@Configuration -public class KafkaTopicConfig { - - @Value(value = "${kafka.bootstrapAddress}") - private String bootstrapAddress; - - @Value(value = "${kafka.topic.gsma.name}") - private String gsmaTopicName; - - @Value(value = "${kafka.topic.slcb.name}") - private String slcbTopicName; - - @Bean - public KafkaAdmin kafkaAdmin() { - Map configs = new HashMap<>(); - configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); - return new KafkaAdmin(configs); - } - - @Bean - public NewTopic gsmaTopic() { - return new NewTopic(gsmaTopicName, 1, (short) 1); - } - - @Bean - public NewTopic slcbTopic() { - return new NewTopic(slcbTopicName, 1, (short) 1); - } -} +//package org.mifos.processor.bulk.kafka.config; +// +//import java.util.HashMap; +//import java.util.Map; +//import org.apache.kafka.clients.admin.AdminClientConfig; +//import org.apache.kafka.clients.admin.NewTopic; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.core.KafkaAdmin; +// +//@Configuration +//public class KafkaTopicConfig { +// +// @Value(value = "${kafka.bootstrapAddress}") +// private String bootstrapAddress; +// +// @Value(value = "${kafka.topic.gsma.name}") +// private String gsmaTopicName; +// +// @Value(value = "${kafka.topic.slcb.name}") +// private String slcbTopicName; +// +// @Bean +// public KafkaAdmin kafkaAdmin() { +// Map configs = new HashMap<>(); +// configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); +// return new KafkaAdmin(configs); +// } +// +// @Bean +// public NewTopic gsmaTopic() { +// return new NewTopic(gsmaTopicName, 1, (short) 1); +// } +// +// @Bean +// public NewTopic slcbTopic() { +// return new NewTopic(slcbTopicName, 1, (short) 1); +// } +//} diff --git a/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java b/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java index 4212c75c..2bd0402a 100644 --- a/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java +++ b/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java @@ -12,9 +12,9 @@ @NoArgsConstructor public class BatchDTO { - private String batchId; + private String batch_id; - private String requestId; + private String request_id; private Long total; @@ -36,7 +36,7 @@ public class BatchDTO { private String notes; - private String createdAt; + private String created_at; private String status; diff --git a/src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java b/src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java index ccbe3539..b281f8a9 100644 --- a/src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java +++ b/src/main/java/org/mifos/processor/bulk/utility/PhaseUtils.java @@ -5,7 +5,7 @@ import org.springframework.stereotype.Component; @Component -@ConfigurationProperties(prefix = "callbackphases") +@ConfigurationProperties(prefix = "callback-phases") public class PhaseUtils { private List values; diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java b/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java index b08d94ee..278d76e7 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/ZeebeVariables.java @@ -107,7 +107,8 @@ private ZeebeVariables() {} public static final String CALLBACK_SUCCESS = "callbackSuccessful"; - public static final String CALLBACK_URL = "callbackUrl"; + public static final String CALLBACK_URL = "X-CallbackURL"; + public static final String MAX_CALLBACK_RETRY = "maxCallbackRetry"; @@ -135,7 +136,7 @@ private ZeebeVariables() {} public static final String PAYER_IDENTIFIER_TYPE = "payerIdentifierType"; public static final String PAYER_IDENTIFIER_VALUE = "payerIdentifier"; public static final String HEADER_CLIENT_CORRELATION_ID = "X-CorrelationID"; - public static final String HEADER_TYPE = "Type"; + public static final String HEADER_TYPE = "type"; public static final String HEADER_PLATFORM_TENANT_ID = "Platform-TenantId"; public static final String AUTHORIZATION_SUCCESSFUL = "authorizationSuccessful"; diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java index cb362b06..4bdf04ff 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java @@ -3,7 +3,7 @@ import static org.mifos.processor.bulk.camel.config.CamelProperties.CALLBACK_RESPONSE_CODE; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_SUCCESS; -import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_URL; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_RATE; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_CODE; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_DESCRIPTION; @@ -30,14 +30,21 @@ public void setup() { int retry = variables.getOrDefault(CALLBACK_RETRY, 0).equals(variables.get(MAX_STATUS_RETRY)) ? 0 : (int) variables.getOrDefault(CALLBACK_RETRY, 0); Exchange exchange = new DefaultExchange(camelContext); - exchange.setProperty(MAX_CALLBACK_RETRY, variables.get(MAX_CALLBACK_RETRY)); - exchange.setProperty(CALLBACK_RETRY, retry); - exchange.setProperty(CALLBACK_URL, variables.get(CALLBACK_URL)); - exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE)); - exchange.setProperty(PHASES, variables.get(PHASES)); - exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT)); - sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); - + if(variables.get(CALLBACK_RETRY) != null + && variables.get(CALLBACK_RETRY).equals(variables.get(MAX_CALLBACK_RETRY))){ + exchange.setProperty(CALLBACK_SUCCESS,false); + exchange.setProperty(CALLBACK_RESPONSE_CODE,variables.get(CALLBACK_RESPONSE_CODE)); + } + else { + exchange = new DefaultExchange(camelContext); + exchange.setProperty(MAX_CALLBACK_RETRY, variables.get(MAX_CALLBACK_RETRY)); + exchange.setProperty(CALLBACK_RETRY, variables.getOrDefault(CALLBACK_RETRY, 0)); + exchange.setProperty(CALLBACK, variables.get(CALLBACK)); + exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE)); + exchange.setProperty(PHASES, variables.get(PHASES)); + exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT)); + sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); + } Boolean callbackSuccess = exchange.getProperty(CALLBACK_SUCCESS, Boolean.class); if (callbackSuccess == null || !callbackSuccess) { variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE)); diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 879f43bf..74a869ca 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -1,6 +1,6 @@ camel: - server-port: 5000 - disable-ssl: false + server-port: 5001 + disable-ssl: true springboot: main-run-controller: true dataformat: @@ -8,7 +8,7 @@ camel: auto-discover-object-mapper: true kafka: - bootstrapAddress: "kafka:9092" +# bootstrapAddress: "kafka:9092" topic: gsma: name: gsma @@ -26,10 +26,10 @@ zeebe: # number-of-workers: 8 # evenly-allocated-max-jobs: "#{${zeebe.client.max-execution-threads} / ${zeebe.client.number-of-workers}}" broker: - contactpoint: "zeebe-zeebe-gateway:26500" + contactpoint: "localhost:26500" operations-app: - contactpoint: "https://ops-bk.sandbox.mifos.io" + contactpoint: "http://localhost:57170" username: "mifos" password: "password" endpoints: @@ -44,15 +44,15 @@ mock-payment-schema: authorization: "/batches/" channel: - hostname: "https://ph-ee-connector-channel:8443" + hostname: "http://localhost:8444" cloud: aws: enabled: true s3-base-url: "https://paymenthub-ee.s3.ap-south-1.amazonaws.com" credentials: - access-key: ${AWS_ACCESS_KEY:access_key_from_aws} - secret-key: ${AWS_SECRET_KEY:secret_key_from_aws} + access-key: AKIAWDP6SN37EABSGFWS + secret-key: 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 region: static: ap-south-1 @@ -89,31 +89,31 @@ config: batchAggregate: enable: true partylookup: - enable: true + enable: false authorization: - enabled: true + enabled: false approval: - enable: true + enable: false ordering: - enable: true + enable: false field: "payerIdentifier" splitting: - enable: false + enable: true sub-batch-size: 5 formatting: enable: false standard: "DEFAULT" mergeback: - enable: false + enable: true backpressure: enable: false completion-threshold-check: - enable: false + enable: true completion-threshold: 95 # in percentage max-retry: 4 #can be as high as 30 delay: 2 # in seconds deduplication: - enabled: true + enabled: false callback: max-retry: 3 @@ -131,12 +131,12 @@ callback-phases: - 100 server: - ssl: - key-alias: "tomcat-https" - key-store: "classpath:keystore.jks" - key-store-type: JKS - key-password: "password" - key-store-password: "password" +# ssl: +# key-alias: "tomcat-https" +# key-store: "classpath:keystore.jks" +# key-store-type: JKS +# key-password: "password" +# key-store-password: "password" port: 8443 @@ -213,3 +213,7 @@ security-server: gov-stack-client: header-key: "X-GovStack-Client" header-value: "PAYMENT-BB" + +logging: + level: + root: INFO