Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional Contention Classification database table columns #2187

Merged
merged 22 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
114df77
minor change
chengjie8 Nov 2, 2023
16619b2
refactor and populate new fields using faker
chengjie8 Nov 5, 2023
2321592
added backend code change
chengjie8 Nov 6, 2023
c13a767
WIP
chengjie8 Nov 8, 2023
291c5bc
refactor
chengjie8 Nov 27, 2023
2b93c19
merge from develop
chengjie8 Nov 28, 2023
c257841
added BieMessageUtils to simplify process of populating java object u…
chengjie8 Nov 29, 2023
0ceb82b
minor field and data type updates
chengjie8 Nov 29, 2023
8df10f6
mionr change and resolve some lint issues
chengjie8 Nov 29, 2023
6e6f0d0
Merge branch 'develop' into chengjie8/issue-2085
chengjie8 Nov 29, 2023
2b79bf4
Merge branch 'develop' of https://github.com/department-of-veterans-a…
chengjie8 Dec 4, 2023
19b060b
fix minor issues and added kafka script for local docker build
chengjie8 Dec 4, 2023
16c2d30
added in missing env variables for local kafka integration test
chengjie8 Dec 4, 2023
651b8ec
added null check
chengjie8 Dec 4, 2023
b93a974
Merge branch 'develop' into chengjie8/issue-2085
chengjie8 Dec 4, 2023
da8461f
Merge branch 'develop' into chengjie8/issue-2085
chengjie8 Dec 4, 2023
8eadff8
fix file name
chengjie8 Dec 5, 2023
dd464c4
Merge branch 'develop' of https://github.com/department-of-veterans-a…
chengjie8 Dec 5, 2023
06a163e
Merge branch 'develop' into chengjie8/issue-2085
Dec 6, 2023
c0866dc
minor change
chengjie8 Dec 6, 2023
c6ee69b
Merge branch 'chengjie8/issue-2085' of https://github.com/department-…
chengjie8 Dec 6, 2023
880a063
Merge branch 'develop' into chengjie8/issue-2085
chengjie8 Dec 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
ALTER TABLE "bie_contention_event"
ADD COLUMN "date_added" TIMESTAMP,
ADD COLUMN "date_completed" TIMESTAMP,
ADD COLUMN "date_updated" TIMESTAMP,
ADD COLUMN "actor_station" VARCHAR(255),
ADD COLUMN "automation_indicator" boolean,
ADD COLUMN "benefit_claim_type_code" VARCHAR(255),
ADD COLUMN "contention_status_type_code" VARCHAR(255),
ADD COLUMN "current_lifecycle_status" VARCHAR(255),
ADD COLUMN "clmnt_txt" VARCHAR(255),
ADD COLUMN "details" VARCHAR(255),
ADD COLUMN "event_time" TIMESTAMP,
ADD COLUMN "journal_status_type_code" VARCHAR(255),
ADD COLUMN "veteran_participant_id" VARCHAR(255),
ADD COLUMN "event_details" JSON;
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,27 @@ public ContentionEventEntity saveContentionEvent(final BieMessagePayload bieMess
contentionEventEntity.setDiagnosticTypeCode(bieMessagePayload.getDiagnosticTypeCode());
contentionEventEntity.setContentionClassificationName(
bieMessagePayload.getContentionClassificationName());
contentionEventEntity.setOccurredAt(convertTime(bieMessagePayload.getOccurredAt()));
contentionEventEntity.setOccurredAt(convertTime(bieMessagePayload.getEventTime()));
contentionEventEntity.setDateAdded(convertTime(bieMessagePayload.getDateAdded()));
contentionEventEntity.setDateCompleted(convertTime(bieMessagePayload.getDateCompleted()));
contentionEventEntity.setDateUpdated(convertTime(bieMessagePayload.getDateUpdated()));
contentionEventEntity.setActorStation(bieMessagePayload.getActorStation());
contentionEventEntity.setAutomationIndicator(bieMessagePayload.getAutomationIndicator());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not chain these set statements? that is

contentionEventEntity.setOccurredAt(...) .setDateAdded(...) . . .

contentionEventEntity.setBenefitClaimTypeCode(bieMessagePayload.getBenefitClaimTypeCode());
contentionEventEntity.setContentionStatusTypeCode(
bieMessagePayload.getContentionStatusTypeCode());
contentionEventEntity.setCurrentLifecycleStatus(bieMessagePayload.getCurrentLifecycleStatus());
contentionEventEntity.setDetails(bieMessagePayload.getDetails());
contentionEventEntity.setEventTime(convertTime(bieMessagePayload.getEventTime()));
contentionEventEntity.setJournalStatusTypeCode(bieMessagePayload.getJournalStatusTypeCode());
contentionEventEntity.setVeteranParticipantId(bieMessagePayload.getVeteranParticipantId());

return contentionEventRepository.save(contentionEventEntity);
}

public LocalDateTime convertTime(long time) {
public LocalDateTime convertTime(Long time) {
if(time == null) return null;

return LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void saveContentionEvent() {
bieMessagePayload.getContentionClassificationName(),
entity.getContentionClassificationName());
assertEquals(bieMessagePayload.getDiagnosticTypeCode(), entity.getDiagnosticTypeCode());
assertEquals(dbHelper.convertTime(bieMessagePayload.getOccurredAt()), entity.getOccurredAt());
assertEquals(dbHelper.convertTime(bieMessagePayload.getEventTime()), entity.getOccurredAt());
assertEquals(dbHelper.convertTime(bieMessagePayload.getNotifiedAt()), entity.getNotifiedAt());
}
}
10 changes: 10 additions & 0 deletions scripts/kafka-service.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

export COMPOSE_PROFILES='kafka'
source scripts/setenv.sh

./gradlew docker
./gradlew :dockerComposeUp
./gradlew app:dockerComposeUp
./gradlew -p mocks docker
./gradlew -p mocks :dockerComposeUp
3 changes: 3 additions & 0 deletions scripts/setenv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ export BIP_EVIDENCE_SECRET=daSecret
#export BIP_APPLICATION_ID=VRO
export BIP_STATION_ID=456

export KEYSTORE_FILE="keystore.p12"
export TRUSTSTORE_FILE="truststore.p12"

###
### Integration with BIE's Kafka ###
##
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,32 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode
public class BieMessagePayload {
// these are vro fields
private Integer status;
private String statusMessage;
private ContentionEvent eventType;
private long claimId;
private long contentionId;
private Long notifiedAt;

// populated from kafka topic payload
private String benefitClaimTypeCode;
private String actorStation;
private String details;
private Long veteranParticipantId;
private Long claimId;
private Long contentionId;
private String contentionTypeCode;
private String contentionClassificationName;
private String diagnosticTypeCode;
private String actionName;
private String actionResultName;
private long notifiedAt;
private long occurredAt;
private Boolean automationIndicator;
private String contentionStatusTypeCode;
private String currentLifecycleStatus;
private Long eventTime;
private String clmntTxt;
private String journalStatusTypeCode;
private Long dateAdded;
private Long dateCompleted;
private Long dateUpdated;
private String eventDetails;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,65 @@
import java.util.concurrent.TimeUnit;

public class BieMessagePayloadFactory {

private static final Faker faker = new Faker();

public static BieMessagePayload create() {
private static BieMessagePayload createPayload(ContentionEvent eventType) {
return BieMessagePayload.builder()
.eventType(ContentionEvent.CONTENTION_ASSOCIATED_TO_CLAIM)
.eventType(eventType)
.claimId(faker.random().nextLong())
.contentionClassificationName(faker.lorem().word())
.contentionTypeCode(faker.lorem().characters(10))
.contentionId(faker.random().nextLong())
.diagnosticTypeCode(faker.lorem().characters(10))
.occurredAt(faker.date().past(60, TimeUnit.DAYS).getTime())
.eventTime(faker.date().past(60, TimeUnit.DAYS).getTime())
.notifiedAt(faker.date().past(60, TimeUnit.DAYS).getTime())
.actionName(faker.lorem().characters(10))
.actionResultName(faker.lorem().characters(10))
.status(200)
.build();
}

private static void setCommonPayloadValues(BieMessagePayload payload) {
payload.setBenefitClaimTypeCode(faker.lorem().characters(10));
payload.setActorStation(faker.lorem().characters(200));
payload.setDetails(faker.lorem().characters(200));
payload.setVeteranParticipantId(faker.random().nextLong());
payload.setAutomationIndicator(faker.bool().bool());
payload.setContentionStatusTypeCode(faker.lorem().characters(5));
payload.setCurrentLifecycleStatus(faker.lorem().characters(8));
payload.setEventTime(faker.date().past(60, TimeUnit.DAYS).getTime());
}

private static void setDeletedPayloadValues(BieMessagePayload payload) {
payload.setAutomationIndicator(faker.bool().bool());
payload.setContentionStatusTypeCode(faker.lorem().characters(5));
payload.setCurrentLifecycleStatus(faker.lorem().characters(8));
payload.setEventTime(faker.date().past(60, TimeUnit.DAYS).getTime());
}

public static BieMessagePayload create() {
// generate a random contention classification event type
ContentionEvent eventType = faker.options().option(ContentionEvent.values());

BieMessagePayload payload = createPayload(eventType);

switch (eventType) {
case CONTENTION_ASSOCIATED_TO_CLAIM:
case CONTENTION_CLASSIFIED:
setCommonPayloadValues(payload);
break;
case CONTENTION_UPDATED:
setCommonPayloadValues(payload);
payload.setJournalStatusTypeCode(faker.lorem().characters(5));
break;
case CONTENTION_DELETED:
setDeletedPayloadValues(payload);
break;
case CONTENTION_COMPLETED:
// No special handling for CONTENTION_COMPLETED
break;
}

return payload;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,43 @@ public class ContentionEventEntity extends BaseEntity {

@Column(name = "diagnostic_type_code")
private String diagnosticTypeCode;

@Column(name = "date_added")
private LocalDateTime dateAdded;

@Column(name = "date_completed")
private LocalDateTime dateCompleted;

@Column(name = "date_updated")
private LocalDateTime dateUpdated;

@Column(name = "actor_station")
private String actorStation;

@Column(name = "automation_indicator")
private boolean automationIndicator;

@Column(name = "benefit_claim_type_code")
private String benefitClaimTypeCode;

@Column(name = "contention_status_type_code")
private String contentionStatusTypeCode;

@Column(name = "current_lifecycle_status")
private String currentLifecycleStatus;

@Column(name = "details")
private String details;

@Column(name = "event_time")
private LocalDateTime eventTime;

@Column(name = "journal_status_type_code")
private String journalStatusTypeCode;

@Column(name = "veteran_participant_id")
private long veteranParticipantId;

@Column(name = "event_details")
private String eventDetails;
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void sendEventToKafkaTopic() throws InterruptedException, IOException {
// Message 2 comes through Kafka
BieMessagePayload kafkaEventBody = BieMessagePayloadFactory.create();
kafkaEventBody.setEventType(null);
kafkaEventBody.setContentionId(1234567890);
kafkaEventBody.setContentionId(1234567890L);

ObjectMapper objectMapper = new ObjectMapper();
val kafkaSentMessage = objectMapper.writeValueAsString(kafkaEventBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import gov.va.vro.model.biekafka.ContentionEvent;
import gov.va.vro.services.bie.config.BieProperties;
import gov.va.vro.services.bie.service.AmqpMessageSender;
import gov.va.vro.services.bie.utils.BieMessageUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -31,6 +32,7 @@ public void consume(ConsumerRecord<String, Object> record) {
log.info("Topic name: {}", topicName);
if (record.value() instanceof GenericRecord) {
payload = this.handleGenericRecord(record);
log.info("kafka payload: {}", payload);
} else if (record.value() instanceof String stringPayload) {
log.info("Consumed message string value (before) json conversion: {}", stringPayload);
payload = this.handleStringRecord(record);
Expand All @@ -56,28 +58,12 @@ private BieMessagePayload handleStringRecord(ConsumerRecord<String, Object> reco

private BieMessagePayload handleGenericRecord(ConsumerRecord<String, Object> record) {
GenericRecord messageValue = (GenericRecord) record.value();
String KEY_CONTENTION_CLASSIFICATION_NAME = "ContentionClassificationName";
String KEY_DIAGNOSTIC_TYPE_CODE = "DiagnosticTypeCode";
String KEY_CLAIM_ID = "ClaimId";
String KEY_CONTENTION_ID = "ContentionId";
String KEY_CONTENTION_TYPE_CODE = "ContentionTypeCode";
String KEY_EVENT_TIME = "EventTime";
String ACTION_NAME = "ActionName";
String ACTION_RESULT_NAME = "ActionResultName";

return BieMessagePayload.builder()
.eventType(
ContentionEvent.valueOf(ContentionEvent.mapTopicToEvent(record.topic()).toString()))
.claimId((long) messageValue.get(KEY_CLAIM_ID))
.contentionId((long) messageValue.get(KEY_CONTENTION_ID))
.contentionClassificationName((String) messageValue.get(KEY_CONTENTION_CLASSIFICATION_NAME))
.contentionTypeCode((String) messageValue.get(KEY_CONTENTION_TYPE_CODE))
.diagnosticTypeCode((String) messageValue.get(KEY_DIAGNOSTIC_TYPE_CODE))
.occurredAt((Long) messageValue.get(KEY_EVENT_TIME))
.notifiedAt(record.timestamp())
.actionName((String) messageValue.get(ACTION_NAME))
.actionResultName((String) messageValue.get(ACTION_RESULT_NAME))
.status(200)
.build();
BieMessagePayload payload = BieMessageUtils.processBieMessagePayloadFields(messageValue);
payload.setEventType(
ContentionEvent.valueOf(ContentionEvent.mapTopicToEvent(record.topic()).toString()));
payload.setNotifiedAt(record.timestamp());

return payload;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package gov.va.vro.services.bie.utils;

import gov.va.vro.model.biekafka.BieMessagePayload;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.springframework.util.StringUtils;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;

@Slf4j
public class BieMessageUtils {
private static final Set<String> IGNORED_FIELDS = new HashSet<>();

static {
// Add the names of fields to be ignored
IGNORED_FIELDS.add("status");
IGNORED_FIELDS.add("statusMessage");
IGNORED_FIELDS.add("eventType");
IGNORED_FIELDS.add("notifiedAt");
}

public static BieMessagePayload processBieMessagePayloadFields(GenericRecord genericRecord) {
BieMessagePayload payload = BieMessagePayload.builder().status(200).build();

for (Field field : BieMessagePayload.class.getDeclaredFields()) {
String fieldName = field.getName();

// Skip the field if it's in the ignored list
if (IGNORED_FIELDS.contains(fieldName)) {
continue;
}

String capitalizedFieldName = StringUtils.capitalize(fieldName);
Object value = genericRecord.get(capitalizedFieldName);

if (value == null) {
continue; // Skip setting the field if the value is null
}

try {
String setterMethodName = "set" + capitalizedFieldName;
Method setterMethod = BieMessagePayload.class.getMethod(setterMethodName, field.getType());

if (field.getType().isAssignableFrom(value.getClass())) {
setterMethod.invoke(payload, value);
} else {
log.warn(
"Type mismatch for field '{}'. Expected type: '{}', Actual value: '{}'",
fieldName,
field.getType().getSimpleName(),
value);
}
} catch (Exception e) {
log.error("Error setting value for field '{}': {}", fieldName, e.getMessage(), e);
}
}

return payload;
}
}
2 changes: 1 addition & 1 deletion svc-bie-kafka/src/main/resources/application-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ spring:
schema.registry.ssl.truststore.password: "${BIE_KAFKA_TRUSTSTORE_PASSWORD}"
schema.registry.ssl.truststore.type: "PKCS12"
consumer:
group-id: "${BIE_KAFKA_PLACEHOLDERS_GROUP_ID:vro-bie-tst-vro-2}"
group-id: "${BIE_KAFKA_PLACEHOLDERS_GROUP_ID:vro-bie-tst-vro-3}"
key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
value-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
properties:
Expand Down
Loading