Skip to content

Commit

Permalink
Worker agent demo update (#101)
Browse files Browse the repository at this point in the history
- v0.0.7
- Fixed runbook
- Code cleanups
- Dropped the validation_context.duckdb (path issue fixed)
  • Loading branch information
kariharju authored Nov 13, 2024
1 parent 84ad9cb commit b3274e1
Show file tree
Hide file tree
Showing 36 changed files with 4,621 additions and 2,271 deletions.
13 changes: 13 additions & 0 deletions demos/payment-remittance-reconciliation-agent/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.env
.idea
.venv
.vscode
.pyc
__pycache__/
*.exe
*.zip
*.log
**/.DS_STORE

actions/MyActions/payment-remittance-reconcile-actions/context/reconciliation_context.duckdb
actions/MyActions/payment-remittance-validate-actions/context/validation_context.duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def handle_discrepancies(self, result: ReconciliationResult):
report = self.generate_discrepancy_report(result)

# Update work item status
self.update_work_item_status(
self.update_work_item_with_reconciliation_success(
status="DISCREPANCY_FOUND",
details=report
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@
from datetime import datetime
import json
from models.reconciliation_models import (
ReconciliationPhase, ReconciliationAgentInsightContext,
ReconciliationContext, ProcessingEvent, ValidationResults
ReconciliationPhase,
ReconciliationAgentInsightContext,
ReconciliationContext,
ProcessingEvent,
)
from reconciliation_ledger.reconciliation_constants import DatabaseConstants
from utils.context.base_agent_context_manager import BaseAgentContextManager


class ReconciliationAgentContextManager(BaseAgentContextManager):
"""Manages context for payment reconciliation processing."""

def __init__(self,
document_id: str,
document_name: str,
customer_id: str,
db_path: Optional[Union[str, Path]] = None,
load_existing: bool = False):

def __init__(
self,
document_id: str,
document_name: str,
customer_id: str,
db_path: Optional[Union[str, Path]] = None,
load_existing: bool = False,
):
"""
Initialize ReconciliationAgentContextManager.
Args:
document_id: Document identifier
document_name: Document name
Expand All @@ -30,28 +35,32 @@ def __init__(self,
"""
if not db_path:
db_path = DatabaseConstants.get_default_reconciliation_context_db_path()

db_path = str(db_path) if isinstance(db_path, Path) else db_path
super().__init__(document_id, document_name, db_path)
self.customer_id = customer_id

if load_existing:
self.logger.info(f"Loading existing reconciliation context for document_id: {document_id}")
self.logger.info(
f"Loading existing reconciliation context for document_id: {document_id}"
)
self.agent_context = self.load_context()
if self.agent_context is None:
self.logger.warning(f"No existing context found. Creating new reconciliation context.")
self.logger.warning(
"No existing context found. Creating new reconciliation context."
)
self.agent_context = ReconciliationAgentInsightContext(
document_id=document_id,
document_name=document_name,
customer_id=customer_id
customer_id=customer_id,
)
else:
self.agent_context = ReconciliationAgentInsightContext(
document_id=document_id,
document_name=document_name,
customer_id=customer_id
customer_id=customer_id,
)

self.current_phase: Optional[ReconciliationPhase] = None

def _create_tables(self, conn):
Expand Down Expand Up @@ -91,27 +100,37 @@ def store_context(self):
context_data = EXCLUDED.context_data,
updated_at = NOW()
"""
conn.execute(query, [
self.document_id,
self.customer_id,
self.document_name,
context_json
])
self.logger.info(f"Stored reconciliation context for document_id: {self.document_id}")
conn.execute(
query,
[
self.document_id,
self.customer_id,
self.document_name,
context_json,
],
)
self.logger.info(
f"Stored reconciliation context for document_id: {self.document_id}"
)
except Exception as e:
self.logger.error(f"Error storing reconciliation context for document_id {self.document_id}: {str(e)}")
self.logger.error(
f"Error storing reconciliation context for document_id {self.document_id}: {str(e)}"
)
raise

def load_context(self) -> Optional[ReconciliationAgentInsightContext]:
"""Load reconciliation context from database."""
try:
with self.duckdb_connection() as conn:
result = conn.execute("""
result = conn.execute(
"""
SELECT context_data
FROM reconciliation_context
WHERE document_id = ? AND customer_id = ?
""", [self.document_id, self.customer_id]).fetchone()

""",
[self.document_id, self.customer_id],
).fetchone()

if result:
context_data = json.loads(result[0])
return ReconciliationAgentInsightContext(**context_data)
Expand All @@ -123,10 +142,7 @@ def load_context(self) -> Optional[ReconciliationAgentInsightContext]:
def start_phase(self, phase: ReconciliationPhase):
"""Start a reconciliation processing phase."""
self.current_phase = phase
context = ReconciliationContext(
phase=phase,
start_time=datetime.utcnow()
)
context = ReconciliationContext(phase=phase, start_time=datetime.utcnow())
self.agent_context.set_phase_context(phase, context)
self.logger.info(f"Started reconciliation phase: {phase}")

Expand All @@ -138,18 +154,23 @@ def end_phase(self):
context.end_time = datetime.utcnow()
duration = (context.end_time - context.start_time).total_seconds()
self.agent_context.overall_processing_time += duration
self.logger.info(f"Ended reconciliation phase: {self.current_phase}. Duration: {self._format_duration(duration)}")
self.logger.info(
f"Ended reconciliation phase: {self.current_phase}. Duration: {self._format_duration(duration)}"
)

def add_event(self, event_type: str, description: str, details: Optional[Dict[str, Any]] = None):
def add_event(
self,
event_type: str,
description: str,
details: Optional[Dict[str, Any]] = None,
):
"""Add event to current reconciliation phase."""
if self.current_phase:
context = self.agent_context.get_phase_context(self.current_phase)
if context:
context.events.append(
ProcessingEvent(
event_type=event_type,
description=description,
details=details
event_type=event_type, description=description, details=details
)
)
self._log_event(event_type, description, details)
Expand All @@ -165,4 +186,4 @@ def update_metrics(self, metrics_update: Dict[str, Any]):

def get_reconciliation_context(self) -> ReconciliationAgentInsightContext:
"""Get current reconciliation context."""
return self.agent_context
return self.agent_context
Loading

0 comments on commit b3274e1

Please sign in to comment.