Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable remote pilot logging system #269

Open
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

martynia
Copy link
Contributor

@martynia martynia commented Jul 1, 2024

Enable remote logging system with open search. This PR only shows a router in its preliminary form.
The router defines following operations:

  1. @router.post("/") - bulk insert log record sent by a pilot. The method returns a pilotID, so it can use it next time to insert messaged w/o consulting PilotAgentsDB to get PilotID from PilotStamp.
  2. @router.get("/logs") - to retrieve logs for a given pilot_id
  3. @router.delete("/logs") - to delete logs by:
    - pilotID
    - from a given minimum date till now.
    - for a date range - not yet implemented

Policies allow pilots to store VO-aware logs and operators/admins to perform delete operations for all VOs. Pilots and normal users can only see (get) logs for their own VO.

@martynia
Copy link
Contributor Author

martynia commented Jul 1, 2024

@chrisburr I think we need a pilot logging router, even in its basic form. One of the tests fails because it is missing.

@martynia
Copy link
Contributor Author

@chrisburr What is a reason of test failures here ? The demo runs fine locally, can be contacted with a client etc. The test fails with dirac login, waiting for diracX to be installed. Could it be that I missed a setting somewhere ? The failure is persistent.

@martynia
Copy link
Contributor Author

martynia commented Nov 1, 2024

@chrisburr @chaen
I managed to reproduce the problem locally. There is the same failure on GitHub and locally. Since the demo runs fine (the DB is there), is it possible that integration tests fail (NotImplementedError: Cannot enable system_name='pilotlogs' as it requires missing_os_dbs={<class 'diracx.db.os.pilot_logs.PilotLogsDB'>} because of charts being no discoverable by the CI workflow ?

This type of error occurs (in the demo) where charts don't list a relevant DB

Both local and CI tests reach the same point - server installation failure.

@chrisburr
Copy link
Member

@martynia
Copy link
Contributor Author

@chaen I fixed the database problem, now integration tests are passing, however I'm getting gubbins errors in unit tests. Is it an internal gubbins problem ?

diracx-routers/pyproject.toml Outdated Show resolved Hide resolved
diracx-db/src/diracx/db/os/utils.py Outdated Show resolved Hide resolved
diracx-db/src/diracx/db/os/utils.py Outdated Show resolved Hide resolved
Comment on lines 96 to 108
# here, users with privileged properties will see logs from all VOs. Is it what we want ?
search_params = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}]
if _non_privileged(user_info):
search_params.append(
{"parameter": "VO", "operator": "eq", "value": user_info.vo}
)
result = await db.search(
["Message"],
search_params,
[{"parameter": "LineNumber", "direction": "asc"}],
)
if not result:
return [{"Message": f"No logs for pilot ID = {pilot_id}"}]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, this bloc of code should be a method of diracx-dbs/src/diracx/db/sql/pilots/pilot_agents.py PilotAgentsDB

"""Delete either logs for a specific PilotID or a creation date range.
Non-privileged users can only delete log files within their own VO.
"""
message = "no-op"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you initialize message as "no-op"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is a return message to the client. As far as I can see, the delete by query operation is silent. It does not complain when you delete something which doesn't not exist etc. Or I might have missed something in the docs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is a return message to the client.

But IIUC, this "no-op" can't be returned to the client, right?
Then may be you can just initialize it to None instead of "no-op"?

As far as I can see, the delete by query operation is silent. It does not complain when you delete something which doesn't not exist etc. Or I might have missed something in the docs.

You mean that if you pass a wrong pilot id or a wrong date to OpenSearch, it does not inform you whether pilot ids and/or dates are all valid?

I am not an expert of opensearch, I don't have much knowledge about it, but from what I can read, https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/ seems to provide the number of deleted items, but not the details (which ones).

One solution to identify them could consist in searching for the pilot ids first, and then return the states of each one after the delete query.

Something like (this is pseudo-code):

pilots_ids: [1,2,3,4,5]

result = search(pilots_ids)
# result = [1,2,3]

unknown_pilot_ids = set(pilot_ids) - set(result)
# unkown_pilots_ids = [4,5]

delete_by_query(result)

return {"success": result ,"failed": unknown_pilot_ids}

Comment on lines 152 to 156
def _non_privileged(user_info: AuthorizedUserInfo):
return (
SERVICE_ADMINISTRATOR not in user_info.properties
and OPERATOR not in user_info.properties
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this should be moved in the access_policies.py function? May be by adding an action type? Or by adding some logic like in:

job_owners = await job_db.summary(
["Owner", "VO"],
[{"parameter": "JobID", "operator": "in", "values": job_ids}],
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a logical problem what to do here. I would like to search only once (and delete by query for a delete operation w/o prior searching). This is a reason why I removed the DB argument from a policy call. The problem is more visible in the delete case:

  • I could search in the policy for a given client VO and if pilot IDs are not from the VO throw an exception (not sure what to do if some IDs are and some are not from the VO the client has - for a bulk PilotID search)
  • then delete by query (which searches again)

Or could I use the AgentsDB to match the VO ? Still an extra DB query, this time and SQL one.
I opted for a restricted silent delete and and a restricted search by adding the VO, but outside the policy.


@router.delete("/logs")
async def delete(
pilot_id: int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also tend to make bulk requests to the db to improve the performances of the system. I would take a list of pilot_ids as inputs instead of a single pilot_id.

Then you can return sucess and failures as it is done for the jobs for instance:

@router.delete("/")
async def remove_bulk_jobs(
job_ids: Annotated[list[int], Query()],
config: Config,
job_db: JobDB,
job_logging_db: JobLoggingDB,
sandbox_metadata_db: SandboxMetadataDB,
task_queue_db: TaskQueueDB,
background_task: BackgroundTasks,
check_permissions: CheckWMSPolicyCallable,
):
"""Fully remove a list of jobs from the WMS databases.
WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS
and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should
be removed, and a status change to Deleted (PATCH /jobs/status) should be used instead for any other purpose.
"""
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=job_ids)
return await remove_jobs(
job_ids,
config,
job_db,
job_logging_db,
sandbox_metadata_db,
task_queue_db,
background_task,
)


@router.get("/logs")
async def get_logs(
pilot_id: int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly,

Suggested change
pilot_id: int,
pilot_ids: List[int],

if _non_privileged(user_info):
search_params.append(non_privil_params)
await db.delete(search_params)
message = f"Logs for for pilots with submission data >='{data.min}' successfully deleted"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
message = f"Logs for for pilots with submission data >='{data.min}' successfully deleted"
message = f"Logs for pilots with submission data >='{data.min}' successfully deleted"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

non_privil_params = {"parameter": "VO", "operator": "eq", "value": user_info.vo}

# id pilot_id is provided we ignore data.min and data.max
if not pilot_ids and data.min and data.max:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be I didn't read carefully, but don't you mean:

Suggested change
if not pilot_ids and data.min and data.max:
if not pilot_ids and not (data.min or data.max):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic is correct. pilot_idhas priority, if it is not given then we have to cases:

  1. a range is given both min and max (this requires a (simple) change in DiracX query language - open search support range out of the box; also open range.)
  2. only max is given, then delete everything olde. that max.



@router.delete("/logs")
async def delete(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I am just curious, what is the use case of DELETE /pilots/logs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to delete old logs ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants