Skip to content

Commit

Permalink
nifi remote s3
Browse files Browse the repository at this point in the history
  • Loading branch information
marceloarocha committed Dec 8, 2024
1 parent 9ff6a89 commit 8f9ff2c
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 26 deletions.
2 changes: 2 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Config:
MAIL_SENDER = getenv("MAIL_SENDER") or "[email protected]"
MAIL_HOST = getenv("MAIL_HOST") or "localhost"

NIFI_BUCKET_NAME = getenv("NIFI_BUCKET_NAME") or ""

CACHE_BUCKET_NAME = getenv("CACHE_BUCKET_NAME") or ""
CACHE_BUCKET_ID = getenv("CACHE_BUCKET_ID") or ""
CACHE_BUCKET_KEY = getenv("CACHE_BUCKET_KEY") or ""
Expand Down
106 changes: 80 additions & 26 deletions services/admin/admin_integration_remote_service.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,101 @@
import re
import boto3
import dateutil as pydateutil
from utils import status
from markupsafe import escape
from datetime import datetime
from datetime import datetime, timedelta
from sqlalchemy import desc
from botocore.exceptions import ClientError

from models.main import db
from models.main import db, User
from models.appendix import NifiStatus, NifiQueue
from utils.dateutils import to_iso
from utils import dateutils
from models.enums import NifiQueueActionTypeEnum
from exception.validation_error import ValidationError
from decorators.has_permission_decorator import has_permission, Permission
from config import Config


@has_permission(Permission.ADMIN_INTEGRATION_REMOTE)
def get_template_date():
result = (
db.session.query(NifiStatus.updatedAt)
.filter(NifiStatus.nifi_diagnostics != None)
.filter(NifiStatus.nifi_template != None)
.filter(NifiStatus.nifi_status != None)
.first()
def get_file_url(schema: str, filename="template") -> tuple[str, str]:
client = boto3.client("s3")

cache_data = _get_cache_data(client=client, schema=schema, filename=filename)

if cache_data["exists"]:
return (
client.generate_presigned_url(
"get_object",
Params={
"Bucket": Config.NIFI_BUCKET_NAME,
"Key": _get_resource_name(schema=schema, filename=filename),
},
ExpiresIn=3600,
),
cache_data["updatedAt"],
)

return None, None


def _get_resource_name(schema, filename="current"):
return f"{schema}/{filename}.json"


def _get_cache_data(client, schema, filename="current"):
try:
resource_info = client.head_object(
Bucket=Config.NIFI_BUCKET_NAME,
Key=_get_resource_name(schema=schema, filename=filename),
)

resource_date = pydateutil.parser.parse(
resource_info["ResponseMetadata"]["HTTPHeaders"]["last-modified"],
) - timedelta(hours=3)

return {
"exists": True,
"updatedAt": resource_date.replace(tzinfo=None).isoformat(),
}
except ClientError:
return {"exists": False, "updatedAt": None}


@has_permission(Permission.ADMIN_INTEGRATION_REMOTE)
def get_template_date(user_context: User):
client = boto3.client("s3")
url, metadata = _get_cache_data(
client=client, schema=user_context.schema, filename="template"
)

if result != None:
return {"updatedAt": result.updatedAt.isoformat()}
if metadata != None:
return {"updatedAt": metadata["updatedAt"]}

return {"updatedAt": None}


@has_permission(Permission.ADMIN_INTEGRATION_REMOTE)
def get_template():
config: NifiStatus = db.session.query(NifiStatus).first()
def get_template(user_context: User):
template_url, template_updated_at = get_file_url(
schema=user_context.schema, filename="template"
)
status_url, status_updated_at = get_file_url(
schema=user_context.schema, filename="status"
)
diagnostics_url, diagnostics_updated_at = get_file_url(
schema=user_context.schema, filename="diagnostics"
)

if config == None:
if not template_url:
raise ValidationError(
"Registro não encontrado",
"Template encontrado",
"errors.businessRule",
status.HTTP_400_BAD_REQUEST,
)

if config.nifi_status == None or config.nifi_template == None:
if not status_url:
raise ValidationError(
"Template/Status não encontrado",
"Status não encontrado",
"errors.businessRule",
status.HTTP_400_BAD_REQUEST,
)
Expand All @@ -60,16 +114,16 @@ def get_template():
"responseCode": q.responseCode,
"response": q.response,
"extra": q.extra,
"responseAt": to_iso(q.responseAt),
"createdAt": to_iso(q.createdAt),
"responseAt": dateutils.to_iso(q.responseAt),
"createdAt": dateutils.to_iso(q.createdAt),
}
)

return {
"template": config.nifi_template,
"status": config.nifi_status,
"diagnostics": config.nifi_diagnostics,
"updatedAt": to_iso(config.updatedAt),
"template": template_url,
"status": status_url,
"diagnostics": diagnostics_url,
"updatedAt": dateutils.to_iso(template_updated_at),
"queue": queue_results,
}

Expand Down Expand Up @@ -186,8 +240,8 @@ def get_queue_status(id_queue_list):
"extra": q.extra,
"responseCode": q.responseCode,
"response": q.response,
"responseAt": to_iso(q.responseAt),
"createdAt": to_iso(q.createdAt),
"responseAt": dateutils.to_iso(q.responseAt),
"createdAt": dateutils.to_iso(q.createdAt),
}
)

Expand Down

0 comments on commit 8f9ff2c

Please sign in to comment.