Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EP Merge: Wait for EP400 contentions #2700

Merged
merged 6 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
EP400_WITH_DIFFERENT_CLAIMANT_TEXT = 10003
EP400_WITH_MULTI_CONTENTION_ONE_DUPLICATE = 10004
EP400_WITH_MULTI_CONTENTION_NO_DUPLICATES = 10005
EP400_WITH_NO_CONTENTIONS = 10006

CLAIM_ID_ERROR_AT_GET_CLAIM_DETAILS = 5001
CLAIM_ID_ERROR_AT_CANCEL_CLAIM = 5002
Expand Down Expand Up @@ -133,6 +134,7 @@ class TestError:
pytest.param(
PENDING_CLAIM_ID, CLAIM_ID_ERROR_AT_GET_CONTENTIONS, JobState.GET_EP400_CLAIM_CONTENTIONS, 1, id="fail to get ep400 claim contentions"
),
pytest.param(PENDING_CLAIM_ID, EP400_WITH_NO_CONTENTIONS, JobState.GET_EP400_CLAIM_CONTENTIONS, 1, id="ep400 claim has zero contentions"),
pytest.param(PENDING_CLAIM_ID, CLAIM_ID_ERROR_AT_SET_TSOJ, JobState.SET_TEMP_STATION_OF_JURISDICTION, 1, id="fail to set tsoj on ep400"),
pytest.param(
CLAIM_ID_ERROR_AT_CREATE_CONTENTIONS,
Expand Down
2 changes: 2 additions & 0 deletions domain-ee/ee-ep-merge-app/end_to_end/test_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
EP400_WITH_DIFFERENT_CLAIMANT_TEXT = 10003
EP400_WITH_MULTI_CONTENTION_ONE_DUPLICATE = 10004
EP400_WITH_MULTI_CONTENTION_NO_DUPLICATES = 10005
EP400_WITH_NO_CONTENTIONS = 10006

CLAIM_ID_ERROR_AT_GET_CLAIM_DETAILS = 5001
CLAIM_ID_ERROR_AT_CANCEL_CLAIM = 5002
Expand Down Expand Up @@ -92,6 +93,7 @@ class TestError:
1,
id="fail to get ep400 claim contentions",
),
pytest.param(PENDING_CLAIM_ID, EP400_WITH_NO_CONTENTIONS, JobState.GET_EP400_CLAIM_CONTENTIONS, 1, id="ep400 claim has zero contentions"),
pytest.param(PENDING_CLAIM_ID, CLAIM_ID_ERROR_AT_SET_TSOJ, JobState.SET_TEMP_STATION_OF_JURISDICTION, 1, id="fail to set tsoj on ep400"),
pytest.param(
CLAIM_ID_ERROR_AT_CREATE_CONTENTIONS,
Expand Down
19 changes: 14 additions & 5 deletions domain-ee/ee-ep-merge-app/integration/mq_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from pika import BasicProperties


class MqEndpointConsumerException(Exception):
pass


class MqEndpoint:

def __init__(self, name, exchange, req_queue, response_queue):
Expand Down Expand Up @@ -45,11 +49,16 @@ def _on_message(self, _channel, properties, delivery_tag, _body):

self.consumer.acknowledge_message(properties, delivery_tag)

if self.auto_response_files:
with open(self.auto_response_files[self.index]) as f:
body = json.load(f)
self.publisher.publish_message(body, BasicProperties(app_id="Integration Test", content_type="application/json", correlation_id=correlation_id))
self.index = (self.index + 1) % len(self.auto_response_files)
if not self.auto_response_files:
raise MqEndpointConsumerException(f"{self.name}: Auto-responses is empty")

if self.index == len(self.auto_response_files):
raise MqEndpointConsumerException(f"{self.name}: There are not enough auto-responses defined")

with open(self.auto_response_files[self.index]) as f:
body = json.load(f)
self.publisher.publish_message(body, BasicProperties(app_id="Integration Test", content_type="application/json", correlation_id=correlation_id))
self.index += 1

def set_responses(self, auto_response_files=None):
if auto_response_files is None:
Expand Down
34 changes: 29 additions & 5 deletions domain-ee/ee-ep-merge-app/integration/test_merge_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,39 @@ async def test_completed_success(

@pytest.mark.asyncio(scope="session")
async def test_completed_success_with_duplicate_contention(
self, get_claim_endpoint: MqEndpoint, get_claim_contentions_endpoint: MqEndpoint, put_tsoj_endpoint: MqEndpoint, cancel_claim_endpoint: MqEndpoint
self,
get_claim_endpoint: MqEndpoint,
get_claim_contentions_endpoint: MqEndpoint,
put_tsoj_endpoint: MqEndpoint,
cancel_claim_endpoint: MqEndpoint,
add_claim_note_endpoint: MqEndpoint,
):
get_claim_endpoint.set_responses([pending_claim_200])
get_claim_contentions_endpoint.set_responses([pending_contentions_200, ep400_duplicate_contentions_200])
put_tsoj_endpoint.set_responses([response_200])
cancel_claim_endpoint.set_responses([response_200])
add_claim_note_endpoint.set_responses([response_200])

async with AsyncClient(app=app, base_url="http://test") as client:
response = await submit_request_and_process(client)
assert_successful_response(response)

@pytest.mark.asyncio(scope="session")
async def test_completed_no_ep400_contentions(
self, get_claim_endpoint: MqEndpoint, get_claim_contentions_endpoint: MqEndpoint, put_tsoj_endpoint: MqEndpoint, cancel_claim_endpoint: MqEndpoint
async def test_completed_no_ep400_contentions_on_first_attempt(
self,
get_claim_endpoint: MqEndpoint,
get_claim_contentions_endpoint: MqEndpoint,
put_tsoj_endpoint: MqEndpoint,
create_claim_contentions_endpoint: MqEndpoint,
cancel_claim_endpoint: MqEndpoint,
add_claim_note_endpoint: MqEndpoint,
):
get_claim_endpoint.set_responses([pending_claim_200])
get_claim_contentions_endpoint.set_responses([pending_contentions_200, response_204])
get_claim_contentions_endpoint.set_responses([pending_contentions_200, response_204, ep400_contentions_200])
dfitchett marked this conversation as resolved.
Show resolved Hide resolved
put_tsoj_endpoint.set_responses([response_200])
create_claim_contentions_endpoint.set_responses([response_201])
cancel_claim_endpoint.set_responses([response_200])
add_claim_note_endpoint.set_responses([response_200])

async with AsyncClient(app=app, base_url="http://test") as client:
response = await submit_request_and_process(client)
Expand Down Expand Up @@ -201,6 +215,16 @@ async def test(self, get_claim_endpoint: MqEndpoint, get_claim_contentions_endpo
response = await submit_request_and_process(client)
assert_error_response(response, JobState.GET_EP400_CLAIM_CONTENTIONS)

@pytest.mark.asyncio(scope="session")
async def test_no_contentions_found(self, get_claim_endpoint: MqEndpoint, get_claim_contentions_endpoint: MqEndpoint):
get_claim_endpoint.set_responses([pending_claim_200])
get_claim_contentions_endpoint.set_responses([pending_contentions_200, response_204, response_204])
# Note the second 204 is because the tests are set up to try to get the ep400 contentions twice in pyproject.toml

async with AsyncClient(app=app, base_url="http://test") as client:
response = await submit_request_and_process(client)
assert_error_response(response, JobState.GET_EP400_CLAIM_CONTENTIONS)


class TestErrorAtSetTemporaryStationOfJurisdiction(TestMergeRequestBase):

Expand Down Expand Up @@ -312,7 +336,7 @@ async def test(
):
get_claim_endpoint.set_responses([pending_claim_200])
get_claim_contentions_endpoint.set_responses([pending_contentions_200, ep400_contentions_200])
put_tsoj_endpoint.set_responses([response_200])
put_tsoj_endpoint.set_responses([response_200, response_200]) # Note the 200 to revert tsoj
create_claim_contentions_endpoint.set_responses([response_201])
cancel_claim_endpoint.set_responses([response_500])

Expand Down
4 changes: 3 additions & 1 deletion domain-ee/ee-ep-merge-app/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ testpaths = [
env = [
"REQUEST_TIMEOUT = 1",
"REQUEST_RETRIES = 1",
"EP_MERGE_SPECIAL_ISSUE_CODE = TEST"
"EP_MERGE_SPECIAL_ISSUE_CODE = TEST",
"EP400_CONTENTION_RETRIES = 2",
"EP400_CONTENTION_RETRY_RATE = 1"
]
14 changes: 3 additions & 11 deletions domain-ee/ee-ep-merge-app/src/python_src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,11 @@
from urllib.parse import quote, urlparse


def int_from_env(key, default):
val = os.environ.get(key)
if val is not None:
return int(val)
else:
return default


config = {
"app_id": os.environ.get("APP_ID") or "EP_MERGE",
"request_timeout": int_from_env("REQUEST_TIMEOUT", 30),
"request_retries": int_from_env("REQUEST_RETRIES", 3),
"response_delivery_attempts": int_from_env("RESPONSE_DELIVERY_ATTEMPTS", 3),
"request_timeout": int(os.getenv("REQUEST_TIMEOUT") or 30),
"request_retries": int(os.getenv("REQUEST_RETRIES") or 3),
"response_delivery_attempts": int(os.getenv("RESPONSE_DELIVERY_ATTEMPTS") or 3),
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import asyncio
import logging
import os
from enum import Enum
from typing import Type
from typing import Type, Callable

from fastapi.encoders import jsonable_encoder
from hoppy.async_hoppy_client import AsyncHoppyClient
Expand Down Expand Up @@ -42,6 +43,14 @@
CANCEL_TRACKING_EP = "60"
CANCELLATION_REASON_FORMAT = "Issues moved into or confirmed in pending EP{ep_code} - claim #{claim_id}"

# definitions for retrying to get contentions from EP400
EP400_CONTENTION_RETRIES = int(os.getenv("EP400_CONTENTION_RETRIES") or 30)
EP400_CONTENTION_RETRY_WAIT_TIME = int(os.getenv("EP400_CONTENTION_RETRY_WAIT_TIME") or 2)


def ep400_has_no_contentions(response: get_contentions.Response):
return not response.contentions


class Workflow(str, Enum):
PROCESS = 'process'
Expand Down Expand Up @@ -151,8 +160,7 @@ def on_get_pending_claim(self, event):

if response is not None and response.status_code == 200:
if response.claim is None or response.claim.end_product_code is None:
logging.info(self.job.state)
self.add_error(f"Pending claim #{self.job.pending_claim_id} does not have an end product code")
self.add_error(f"error='Pending claim #{self.job.pending_claim_id} does not have an end product code'")
else:
self.cancellation_reason = CANCELLATION_REASON_FORMAT.format(ep_code=response.claim.end_product_code, claim_id=self.job.pending_claim_id)
self.original_tsoj = response.claim.temp_station_of_jurisdiction
Expand All @@ -168,12 +176,19 @@ def on_get_pending_contentions(self, event):
@running_get_ep400_contentions.enter
def on_get_ep400_contentions(self, event, pending_contentions_response=None):
request = get_contentions.Request(claim_id=self.job.ep400_claim_id)
expected_responses = [200, 204]
response = self.make_request(
request=request,
hoppy_client=HOPPY.get_client(ClientName.GET_CLAIM_CONTENTIONS),
response_type=get_contentions.Response,
expected_statuses=[200, 204],
expected_statuses=expected_responses,
max_retries=EP400_CONTENTION_RETRIES,
retry_rate=EP400_CONTENTION_RETRY_WAIT_TIME,
will_retry_condition=ep400_has_no_contentions,
)
if response and (response.status_code in expected_responses and not response.contentions):
self.add_error(f"error='EP400 claim #{self.job.ep400_claim_id} does not have any contentions'")

self.send(event=event, pending_contentions_response=pending_contentions_response, ep400_contentions_response=response)

@running_set_temp_station_of_jurisdiction.enter
Expand Down Expand Up @@ -299,23 +314,52 @@ def log_metrics(self, job_duration):
if self.skipped_merge:
increment(JOB_SKIPPED_MERGE_METRIC)

def make_request(
self, request: GeneralRequest, hoppy_client: AsyncHoppyClient, response_type: Type[GeneralResponse], expected_statuses: list[int] | int = 200
async def make_hoppy_request(
self, hoppy_client, request_id, request_body, response_type: Type[GeneralResponse], expected_statuses, max_retries, retry_rate, will_retry_condition
):
if not isinstance(expected_statuses, list):
expected_statuses = [expected_statuses]
try:
loop = asyncio.new_event_loop()
req = hoppy_client.make_request(self.job.job_id, request.model_dump(by_alias=True))
response = loop.run_until_complete(req)
attempts = 0
while True:
response = await hoppy_client.make_request(request_id, request_body)
model = response_type.model_validate(response)
if model.status_code not in expected_statuses:
self.add_error(
model.messages
if model.messages
else f"client={hoppy_client.name} error='Unknown Downstream Error' status={model.status_code} status_message={model.status_message}"
)
return model
break

attempts += 1
if attempts == max_retries or not will_retry_condition(model):
break
await asyncio.sleep(retry_rate)
dfitchett marked this conversation as resolved.
Show resolved Hide resolved
return model

def make_request(
self,
request: GeneralRequest,
hoppy_client: AsyncHoppyClient,
response_type: Type[GeneralResponse],
expected_statuses: list[int] | int = 200,
max_retries: int = 1,
retry_rate: int = 2,
will_retry_condition: Callable[[Type[GeneralResponse]], bool] = lambda x: False,
):
if not isinstance(expected_statuses, list):
expected_statuses = [expected_statuses]
try:
loop = asyncio.new_event_loop()
req = self.make_hoppy_request(
hoppy_client,
self.job.job_id,
request.model_dump(by_alias=True),
response_type,
expected_statuses,
max_retries,
retry_rate,
will_retry_condition,
)
return loop.run_until_complete(req)
except ValidationError as e:
self.add_error(f"client={hoppy_client.name} error={e.errors(include_url=False, include_input=False)}")
except ResponseException as e:
Expand All @@ -333,7 +377,5 @@ def has_new_contentions(self, pending_contentions_response: get_contentions.Resp
return contentions

def add_error(self, error):
logging.warning(f"event=jobError job_id={self.job.job_id} state={self.job.state} {error}")
self.job.error(error)

def add_message(self, message):
self.job.add_message(message)
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ class TestUpToGetEp400Contentions:
pytest.param(load_response(response_400, get_contentions.Response), id="400"),
pytest.param(load_response(response_404, get_contentions.Response), id="404"),
pytest.param(load_response(response_500, get_contentions.Response), id="500"),
pytest.param(get_ep400_contentions_204, id="No contentions found"),
pytest.param(get_contentions.Response(status_code=200, status_message="OK", contentions=None), id="None"),
pytest.param(get_contentions.Response(status_code=200, status_message="OK", contentions=[]), id="Empty"),
],
)
def test_invalid_request_at_get_ep400_contentions(
Expand Down Expand Up @@ -337,29 +340,6 @@ def test_invalid_request(self, machine, mock_hoppy_async_client, metric_logger_d
)
assert_metrics_called(metric_logger_distribution, metric_logger_increment, JobState.COMPLETED_ERROR, JobState.SET_TEMP_STATION_OF_JURISDICTION)

@pytest.mark.parametrize(
"no_contentions_response",
[
pytest.param(get_contentions.Response(status_code=200, status_message="OK"), id="Implicit None"),
pytest.param(get_contentions.Response(status_code=200, status_message="OK", contentions=None), id="Explicit None"),
pytest.param(get_contentions.Response(status_code=200, status_message="OK", contentions=[]), id="Empty"),
],
)
def test_no_contentions_on_ep400_after_set_tsoj_failure(
self, machine, mock_hoppy_async_client, metric_logger_distribution, metric_logger_increment, no_contentions_response
):
mock_async_responses(mock_hoppy_async_client, [get_pending_claim_200, get_pending_contentions_200, no_contentions_response, ResponseException("Oops")])
process_and_assert(machine, JobState.COMPLETED_ERROR, JobState.SET_TEMP_STATION_OF_JURISDICTION, 1)
mock_hoppy_async_client.make_request.assert_has_calls(
[
call(machine.job.job_id, get_pending_claim_req),
call(machine.job.job_id, get_pending_contentions_req),
call(machine.job.job_id, get_ep400_contentions_req),
call(machine.job.job_id, update_temporary_station_of_jurisdiction_req),
]
)
assert_metrics_called(metric_logger_distribution, metric_logger_increment, JobState.COMPLETED_ERROR, JobState.SET_TEMP_STATION_OF_JURISDICTION)

@pytest.mark.parametrize(
"invalid_request",
[
Expand Down Expand Up @@ -734,26 +714,3 @@ def test_process_succeeds_with_duplicate_contention(self, machine, mock_hoppy_as
]
)
assert_metrics_called(metric_logger_distribution, metric_logger_increment, JobState.COMPLETED_SUCCESS, None, 0, True)

def test_process_succeeds_with_no_ep400_contentions(self, machine, mock_hoppy_async_client, metric_logger_distribution, metric_logger_increment):
mock_async_responses(
mock_hoppy_async_client,
[
get_pending_claim_200,
get_pending_contentions_increase_tinnitus_200,
get_ep400_contentions_204,
update_temporary_station_of_jurisdiction_200,
cancel_claim_200,
add_claim_note_200,
],
)
process_and_assert(machine, JobState.COMPLETED_SUCCESS)
mock_hoppy_async_client.make_request.assert_has_calls(
[
call(machine.job.job_id, get_pending_contentions_req),
call(machine.job.job_id, get_ep400_contentions_req),
call(machine.job.job_id, update_temporary_station_of_jurisdiction_req),
call(machine.job.job_id, cancel_ep400_claim_req),
]
)
assert_metrics_called(metric_logger_distribution, metric_logger_increment, JobState.COMPLETED_SUCCESS, None, 0, True)
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,8 @@
"phase": "Claim Received",
"endProductCode": "400",
"claimLifecycleStatus": "Open"
}
},
"contentions": []
},
{
"description": [
Expand Down