[MongoDB Atlas] Add support of mongodb database datastream #9539

Merged
merged 18 commits into from
Apr 18, 2024
Changes from all commits
32 changes: 23 additions & 9 deletions packages/mongodb_atlas/_dev/build/docs/README.md
@@ -6,25 +6,27 @@

Use the MongoDB Atlas integration to:

- Collect MongoDB Atlas mongod audit logs, mongod database logs, and process metrics for comprehensive monitoring and analysis.
- Create informative visualizations to track usage trends, measure key metrics, and derive actionable business insights.
- Set up alerts to minimize Mean Time to Detect (MTTD) and Mean Time to Resolve (MTTR) by quickly referencing relevant logs during troubleshooting.

## Data streams

The MongoDB Atlas integration collects logs and metrics.

Logs help you keep a record of events that happen on your machine. The `Log` data streams collected by the MongoDB Atlas integration are `mongod_audit` and `mongod_database`.

Metrics give you insight into the statistics of MongoDB Atlas. The `Metric` data stream collected by the MongoDB Atlas integration is `process`, which lets you monitor and troubleshoot the performance of your MongoDB Atlas instance.

Data streams:
- `mongod_audit`: The auditing facility allows administrators and users to track system activity for deployments with multiple users and applications. Mongod audit logs capture events related to database operations, such as insertions, updates, deletions, and user authentication, occurring within mongod instances.

- `mongod_database`: This data stream collects a running log of events, including entries such as incoming connections, commands run, and issues encountered. Generally, database log messages are useful for diagnosing issues, monitoring your deployment, and tuning performance.

- `process`: This data stream collects host metrics per process for all hosts in the specified group, such as CPU usage, number of I/O operations, and memory usage.

Note:
- Users can monitor and see the logs and metrics inside the ingested documents for MongoDB Atlas in the `logs-*` index pattern from `Discover`.

## Prerequisites

@@ -40,9 +42,11 @@ You can store and search your data using Elasticsearch and visualize and manage

### Steps to obtain Public Key, Private Key and GroupId

1. Generate programmatic API keys with `project owner` permissions by following the instructions in the Atlas [documentation](https://www.mongodb.com/docs/atlas/configure-api-access/#create-an-api-key-for-a-project). Then, copy the public and private keys, which function as a username and API key, respectively.
2. From the Atlas UI, go to Project Settings > Access Manager > API Keys, and then click Invite To Project to add the API key created above.
3. Assign the appropriate role to the API key under Project Settings > Access Manager > API Keys. This step ensures that the API key has the permissions required to access the data. The role required by each data stream is listed in its reference section below.
4. Enable Database Auditing for the Atlas project whose logs you want to monitor, following the instructions in this Atlas [document](https://www.mongodb.com/docs/atlas/database-auditing/#procedure).
5. You can find your Project ID (Group ID) in the Atlas UI: navigate to your project, click Settings, and copy the Project ID (Group ID). You can also find it programmatically using the Atlas Admin API or Atlas CLI as described in this Atlas [document](https://www.mongodb.com/docs/atlas/app-services/apps/metadata/#find-a-project-id). A quick way to confirm that the credentials and Project ID work is shown in the sketch below.
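
Before installing the integration, you can sanity-check the keys and Project ID by calling the same endpoint the integration polls. This is a minimal sketch under stated assumptions (Python with the `requests` package; the `/processes` endpoint and digest authentication mirror the integration's own requests), not part of the integration itself:

```python
# Sketch: verify the API keys and Project ID (Group ID) with HTTP digest auth.
# Assumes `pip install requests`. Values below are placeholders.
import requests
from requests.auth import HTTPDigestAuth

PUBLIC_KEY = "your-public-key"    # used as the username
PRIVATE_KEY = "your-private-key"  # used as the password/API key
GROUP_ID = "your-project-id"      # Project ID (Group ID) from the Atlas UI

resp = requests.get(
    f"https://cloud.mongodb.com/api/atlas/v2/groups/{GROUP_ID}/processes",
    auth=HTTPDigestAuth(PUBLIC_KEY, PRIVATE_KEY),
    headers={"Accept": "application/vnd.atlas.2023-01-01+json"},  # versioned media type
    params={"pageNum": 1, "itemsPerPage": 100},
)
resp.raise_for_status()
for process in resp.json().get("results", []):
    print(process.get("hostname"))
```

A `200` response listing your cluster hosts confirms both the key pair and the Project ID.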

### Important terms of MongoDB Atlas API

@@ -61,7 +65,9 @@
Note: Both of the above attributes can be set by using `period` in the configuration parameters.
6. Finally, save the integration.

Note:
- The `mongod_audit` and `mongod_database` data streams gather historical data spanning the previous 30 minutes (a download sketch follows this note).
- We recommend setting an interval of five minutes or higher for collecting mongod audit and database logs, because MongoDB Atlas refreshes logs from the cluster's backend infrastructure at five-minute intervals, as described in this Atlas [document](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Monitoring-and-Logs/operation/getHostLogs).
- Log collection from MongoDB Atlas does not support M0 free clusters, M2/M5 shared clusters, or serverless instances.
- Mongod: Mongod is the primary daemon process for the MongoDB system. It handles data requests, manages data access, and performs background management operations and other core database operations.
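
For reference, log collection corresponds to Atlas's `getHostLogs` endpoint linked above. Below is a minimal, hedged sketch of pulling one host's audit log for the last 30 minutes, assuming Python with the `requests` package; the log name `mongodb-audit-log.gz` is the one the integration requests, and `mongodb.gz` is the corresponding database log:

```python
# Sketch: download one host's gzipped audit log for the last 30 minutes.
# Assumes `pip install requests`; HOSTNAME is a hostname returned by the
# /processes call shown earlier. Use "mongodb.gz" for the database log.
import gzip
import time

import requests
from requests.auth import HTTPDigestAuth

PUBLIC_KEY = "your-public-key"
PRIVATE_KEY = "your-private-key"
GROUP_ID = "your-project-id"
HOSTNAME = "cluster0-shard-00-00.example.mongodb.net"  # hypothetical host

end_date = int(time.time())
start_date = end_date - 30 * 60  # mirrors the integration's initial lookback

resp = requests.get(
    f"https://cloud.mongodb.com/api/atlas/v2/groups/{GROUP_ID}"
    f"/clusters/{HOSTNAME}/logs/mongodb-audit-log.gz",
    auth=HTTPDigestAuth(PUBLIC_KEY, PRIVATE_KEY),
    headers={"Accept": "application/vnd.atlas.2023-01-01+gzip"},
    params={"startDate": start_date, "endDate": end_date},
)
resp.raise_for_status()
for line in gzip.decompress(resp.content).decode().splitlines():
    print(line)  # one JSON audit event per line
```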

## Troubleshooting
@@ -79,16 +85,24 @@ If you encounter an error while ingesting data, it might be due to the data coll

### Mongod Audit

This is the `mongod_audit` data stream. This data stream allows administrators and users to track system activity for deployments with multiple users and applications. To collect audit logs, the requesting API Key must have the `Project Data Access Read Only` or higher role.

{{event "mongod_audit"}}

{{fields "mongod_audit"}}

### Mongod Database

This is the `mongod_database` data stream. It collects a running log of events, such as incoming connections, commands run, and issues encountered; these messages are useful for diagnosing issues, monitoring your deployment, and tuning performance. To collect database logs, the requesting API Key must have the `Project Data Access Read Only` or higher role.

{{event "mongod_database"}}

{{fields "mongod_database"}}

## Metrics reference

### Process
This data stream collects host metrics per process for all hosts in the specified group, such as CPU usage, number of I/O operations, and memory usage. To collect process metrics, the requesting API Key must have the `Project Read Only` role.

{{event "process"}}

Empty file modified packages/mongodb_atlas/_dev/deploy/docker/Dockerfile
100644 → 100755
Empty file.
Empty file modified packages/mongodb_atlas/_dev/deploy/docker/docker-compose.yml
100644 → 100755
Empty file.
@@ -0,0 +1,5 @@
{ "t": { "$date": "2024-03-20T19:17:06.188+00:00" }, "s": "W", "c": "CONTROL", "id": 22120, "ctx": "initandlisten", "msg": "Access control is not enabled for the database. Read and write access to data and configuration is unrestricted", "tags": [ "startupWarnings" ] }
{ "t": { "$date": "2024-02-18T14:45:23.512+00:00" }, "s": "I", "c": "NETWORK", "id": 67890, "ctx": "conn123", "msg": "Client connection accepted", "tags": [ "connection" ] }
{ "t": { "$date": "2024-02-22T10:20:05.933+00:00" }, "s": "E", "c": "STORAGE", "id": 13579, "ctx": "journal", "msg": "Journal file not found", "tags": [ "journalError" ] }
{ "t": { "$date": "2024-02-25T16:55:36.124+00:00" }, "s": "I", "c": "NETWORK", "id": 24680, "ctx": "conn456", "msg": "Client disconnected", "tags": [ "connection" ] }
{ "t": { "$date": "2024-02-28T09:12:50.007+00:00" }, "s": "E", "c": "QUERY", "id": 98765, "ctx": "queryExecutor", "msg": "Query execution failed", "tags": [ "queryError" ] }
5 changes: 5 additions & 0 deletions packages/mongodb_atlas/changelog.yml
@@ -1,4 +1,9 @@
# newer versions go on top
- version: "0.0.3"
changes:
- description: MongoDB Atlas integration package with "mongod_database" data stream.
type: enhancement
link: https://github.com/elastic/integrations/pull/9539
- version: "0.0.2"
changes:
- description: MongoDB Atlas integration package with "mongod_audit" data stream.
@@ -28,80 +28,72 @@ auth.digest:
  password: {{private_key}}
resource.url: {{url}}
state:
  group_id: {{groupId}}
  want_more: false
  page_num: 1
redact:
  fields: ~
program: |
  (
    (
      has(state.hostlist) && size(state.hostlist) > 0 ?
        state
      :
        (
          state.page_num != 1 ?
            state
          :
            state.with({
              "startDate": state.?cursor.last_timestamp.orValue(int(now - duration("30m"))),
              "endDate": int(now)
            })
        ).as(state, state.with(request("GET", state.url + "/api/atlas/v2/groups/" + state.group_id + "/processes?pageNum=" + string(state.page_num) + "&itemsPerPage=100").with({
          "Header": {
            "Accept": ["application/vnd.atlas." + string(now.getFullYear()) + "-01-01+json"]
          }
        }).do_request().as(resp, bytes(resp.Body).decode_json().as(body, {
          "hostlist": body.results.map(e, e.hostname),
          "next": 0,
          "page_num": body.links.exists_one(res, res.rel == "next") ? (int(state.page_num)+1) : 1
        }))))
    ).as(state, state.next >= size(state.hostlist) ? {} :
      request("GET", state.url + "/api/atlas/v2/groups/" + state.group_id + "/clusters/" + state.hostlist[state.next] + "/logs/mongodb-audit-log.gz?startDate=" + string(int(state.startDate)) + "&endDate=" + string(int(state.endDate))
      ).with({
        "Header": {
          "Accept": ["application/vnd.atlas." + string(now.getFullYear()) + "-01-01+gzip"]
        }
      }).do_request().as(resp,
        resp.StatusCode == 200 && resp.ContentLength != 0
        ?
          string(bytes(resp.Body).mime('application/gzip')).as(body, {
            "events": body.trim_space().split("\n").map(value, {"message": value, "host_name": state.hostlist[state.next]}),
            "cursor": {
              "last_timestamp": state.endDate
            },
            "hostlist": (int(state.next)+1) < size(state.hostlist) ? state.hostlist : [],
            "next": (int(state.next)+1) < size(state.hostlist) ? (int(state.next)+1) : 0,
            "want_more": ((int(state.next)+1) < size(state.hostlist) || state.page_num != 1),
            "page_num": state.page_num,
            "startDate": state.startDate,
            "endDate": state.endDate,
            "group_id": state.group_id,
          })
        :
          {
            // If data is not available during this period, or if the host is unreachable,
            // an event is generated with the following message, and that event is dropped
            // during pipeline processing. Keep this message in sync with the pipeline,
            // which matches on it.
            "events": [{"message": "No data for given time period or host is unreachable"}],
            "cursor": {
              "last_timestamp": state.endDate
            },
            "hostlist": (int(state.next)+1) < size(state.hostlist) ? state.hostlist : [],
            "next": (int(state.next)+1) < size(state.hostlist) ? (int(state.next)+1) : 0,
            "want_more": ((int(state.next)+1) < size(state.hostlist) || state.page_num != 1),
            "page_num": state.page_num,
            "startDate": state.startDate,
            "endDate": state.endDate,
            "group_id": state.group_id,
          }
      )
    )
  )
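
For readers unfamiliar with CEL, the program above is a small state machine: it pages through the group's processes to build a host list, then fetches each host's gzipped audit log for the `[startDate, endDate]` window, carrying a cursor forward so the next poll resumes where this one ended. A rough, purely illustrative Python paraphrase of that control flow (the `fetch_process_page` and `fetch_audit_log` helpers are hypothetical stand-ins for the HTTP calls above):

```python
# Illustrative paraphrase of the CEL program's control flow; not part of the package.
# fetch_process_page and fetch_audit_log are hypothetical stand-ins for the
# /processes and /logs/mongodb-audit-log.gz requests made by the program.
import time

def poll(state, fetch_process_page, fetch_audit_log):
    state.setdefault("page_num", 1)
    if not state.get("hostlist"):
        if state["page_num"] == 1:
            # New collection window: resume from the saved cursor, or fall back
            # to a 30-minute lookback on the first run.
            state["endDate"] = int(time.time())
            state["startDate"] = state.get("cursor", {}).get(
                "last_timestamp", state["endDate"] - 30 * 60
            )
        page = fetch_process_page(state["group_id"], state["page_num"])
        state["hostlist"] = [r["hostname"] for r in page["results"]]
        state["next"] = 0
        state["page_num"] = state["page_num"] + 1 if page.get("has_next") else 1

    if state["next"] >= len(state["hostlist"]):
        return state, []  # nothing left in this window

    host = state["hostlist"][state["next"]]
    events = fetch_audit_log(state["group_id"], host,
                             state["startDate"], state["endDate"])
    done = state["next"] + 1 >= len(state["hostlist"])
    state["cursor"] = {"last_timestamp": state["endDate"]}
    state["hostlist"] = [] if done else state["hostlist"]
    state["next"] = 0 if done else state["next"] + 1
    # More work remains while hosts are pending or more process pages exist.
    state["want_more"] = (not done) or state["page_num"] != 1
    return state, events
```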
@@ -110,19 +110,31 @@ processors:
      field: json.uuid.$type
      target_field: mongodb_atlas.mongod_audit.uuid.type
      ignore_missing: true
  - convert:
      field: json.result
      type: string
      ignore_missing: true
  - script:
      lang: painless
      description: Maps error codes to descriptive values
      tag: informative_error_code
      on_failure:
        - append:
            field: error.message
            value: 'Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}'
      params:
        error_codes:
          "0": "Success"
          "13": "Unauthorized to perform the operation"
          "18": "Authentication Failed"
          "26": "Namespace Not Found"
          "276": "Index build aborted"
          "334": "Unauthorized to perform the operation"
      source: |
        String value = ctx.json?.result;
        if (value != null) {
          ctx.mongodb_atlas.mongod_audit.result = params.error_codes.getOrDefault(value, null);
        }
  - script:
      lang: painless
      source: |-
@@ -139,7 +151,7 @@
          return false;
        }
        drop(ctx);
      description: Drops null and empty values recursively from the Elasticsearch document context.
  - remove:
      field:
        - event.original
@@ -149,6 +161,7 @@
      field:
        - json
      ignore_missing: true
      description: Removes temporary fields.
  - set:
      field: event.kind
      value: pipeline_error
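
The error-code mapping in the pipeline above is easy to exercise in isolation with Elasticsearch's `_ingest/pipeline/_simulate` API. A hedged sketch, assuming a local Elasticsearch at `localhost:9200`, placeholder credentials, and Python with `requests` (the script body below also creates the `mongodb_atlas.mongod_audit` object, which the real pipeline builds in earlier processors):

```python
# Sketch: exercise the error-code script via the simulate-pipeline API.
# Assumes a reachable Elasticsearch and `pip install requests`.
import requests

body = {
    "pipeline": {
        "processors": [
            {
                "script": {
                    "lang": "painless",
                    "params": {
                        "error_codes": {
                            "0": "Success",
                            "13": "Unauthorized to perform the operation",
                        }
                    },
                    # Creates the mongodb_atlas.mongod_audit object inline, since
                    # the real pipeline builds it in earlier processors.
                    "source": (
                        "String value = ctx.json?.result;"
                        "if (value != null) {"
                        "  ctx.mongodb_atlas = ['mongod_audit': "
                        "['result': params.error_codes.getOrDefault(value, null)]];"
                        "}"
                    ),
                }
            }
        ]
    },
    "docs": [{"_source": {"json": {"result": "13"}}}],
}

resp = requests.post(
    "http://localhost:9200/_ingest/pipeline/_simulate",
    json=body,
    auth=("elastic", "changeme"),  # assumption: local test credentials
)
print(resp.json()["docs"][0]["doc"]["_source"]["mongodb_atlas"])
# -> {'mongod_audit': {'result': 'Unauthorized to perform the operation'}}
```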
@@ -10,9 +10,6 @@
- name: input.type
  type: keyword
  description: Type of Filebeat input.
- name: tags
  type: keyword
  description: List of keywords used to tag each event.
- name: '@timestamp'
  type: date
  description: Event timestamp.