Skip to content

Commit

Permalink
feat: Implement experimental AWS ECS resource attributes (#1083)
Browse files Browse the repository at this point in the history
* feat: implement experimental AWS ECS resource attributes

Implement the experimental AWS ECS resource attributes [1] using the Metadata v4 Endpoint.

[1] https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/ecs/

* test: increase test coverage using nock to test HTTP fetch

* bug: remove leftover code

* fix: lint

* fix: address review comments
  • Loading branch information
Michele Mancioppi authored Sep 29, 2022
1 parent 3522660 commit bea8a55
Show file tree
Hide file tree
Showing 8 changed files with 833 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,68 @@
*/

import { diag } from '@opentelemetry/api';
import {
Detector,
Resource,
ResourceDetectionConfig,
} from '@opentelemetry/resources';
import { Detector, Resource } from '@opentelemetry/resources';
import {
CloudProviderValues,
CloudPlatformValues,
SemanticResourceAttributes,
} from '@opentelemetry/semantic-conventions';
import * as http from 'http';
import * as util from 'util';
import * as fs from 'fs';
import * as os from 'os';
import { getEnv } from '@opentelemetry/core';

const HTTP_TIMEOUT_IN_MS = 1000;

interface AwsLogOptions {
readonly 'awslogs-region'?: string;
readonly 'awslogs-group'?: string;
readonly 'awslogs-stream'?: string;
}

/**
* The AwsEcsDetector can be used to detect if a process is running in AWS
* ECS and return a {@link Resource} populated with data about the ECS
* plugins of AWS X-Ray. Returns an empty Resource if detection fails.
*/
export class AwsEcsDetector implements Detector {
readonly CONTAINER_ID_LENGTH = 64;
readonly DEFAULT_CGROUP_PATH = '/proc/self/cgroup';
static readonly CONTAINER_ID_LENGTH = 64;
static readonly DEFAULT_CGROUP_PATH = '/proc/self/cgroup';

private static readFileAsync = util.promisify(fs.readFile);

async detect(_config?: ResourceDetectionConfig): Promise<Resource> {
async detect(): Promise<Resource> {
const env = getEnv();
if (!env.ECS_CONTAINER_METADATA_URI_V4 && !env.ECS_CONTAINER_METADATA_URI) {
diag.debug('AwsEcsDetector failed: Process is not on ECS');
return Resource.empty();
}

const hostName = os.hostname();
const containerId = await this._getContainerId();

return !hostName && !containerId
? Resource.empty()
: new Resource({
[SemanticResourceAttributes.CLOUD_PROVIDER]: CloudProviderValues.AWS,
[SemanticResourceAttributes.CLOUD_PLATFORM]:
CloudPlatformValues.AWS_ECS,
[SemanticResourceAttributes.CONTAINER_NAME]: hostName || '',
[SemanticResourceAttributes.CONTAINER_ID]: containerId || '',
});
let resource = new Resource({
[SemanticResourceAttributes.CLOUD_PROVIDER]: CloudProviderValues.AWS,
[SemanticResourceAttributes.CLOUD_PLATFORM]: CloudPlatformValues.AWS_ECS,
}).merge(await AwsEcsDetector._getContainerIdAndHostnameResource());

const metadataUrl = getEnv().ECS_CONTAINER_METADATA_URI_V4;
if (metadataUrl) {
const [containerMetadata, taskMetadata] = await Promise.all([
AwsEcsDetector._getUrlAsJson(metadataUrl),
AwsEcsDetector._getUrlAsJson(`${metadataUrl}/task`),
]);

const metadatav4Resource = await AwsEcsDetector._getMetadataV4Resource(
containerMetadata,
taskMetadata
);
const logsResource = await AwsEcsDetector._getLogResource(
containerMetadata
);

resource = resource.merge(metadatav4Resource).merge(logsResource);
}

return resource;
}

/**
Expand All @@ -68,22 +86,146 @@ export class AwsEcsDetector implements Detector {
* we do not throw an error but throw warning message
* and then return null string
*/
private async _getContainerId(): Promise<string | undefined> {
private static async _getContainerIdAndHostnameResource(): Promise<Resource> {
const hostName = os.hostname();

let containerId = '';
try {
const rawData = await AwsEcsDetector.readFileAsync(
this.DEFAULT_CGROUP_PATH,
AwsEcsDetector.DEFAULT_CGROUP_PATH,
'utf8'
);
const splitData = rawData.trim().split('\n');
for (const str of splitData) {
if (str.length > this.CONTAINER_ID_LENGTH) {
return str.substring(str.length - this.CONTAINER_ID_LENGTH);
if (str.length > AwsEcsDetector.CONTAINER_ID_LENGTH) {
containerId = str.substring(
str.length - AwsEcsDetector.CONTAINER_ID_LENGTH
);
break;
}
}
} catch (e) {
diag.warn(`AwsEcsDetector failed to read container ID: ${e.message}`);
diag.warn('AwsEcsDetector failed to read container ID', e);
}

if (hostName || containerId) {
return new Resource({
[SemanticResourceAttributes.CONTAINER_NAME]: hostName || '',
[SemanticResourceAttributes.CONTAINER_ID]: containerId || '',
});
}

return Resource.empty();
}

private static async _getMetadataV4Resource(
containerMetadata: any,
taskMetadata: any
): Promise<Resource> {
const launchType: string = taskMetadata['LaunchType'];
const taskArn: string = taskMetadata['TaskARN'];

const baseArn: string = taskArn.substring(0, taskArn.lastIndexOf(':'));
const cluster: string = taskMetadata['Cluster'];

const clusterArn = cluster.startsWith('arn:')
? cluster
: `${baseArn}:cluster/${cluster}`;

const containerArn: string = containerMetadata['ContainerARN'];

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/semantic_conventions/resource/cloud_provider/aws/ecs.yaml
return new Resource({
[SemanticResourceAttributes.AWS_ECS_CONTAINER_ARN]: containerArn,
[SemanticResourceAttributes.AWS_ECS_CLUSTER_ARN]: clusterArn,
[SemanticResourceAttributes.AWS_ECS_LAUNCHTYPE]:
launchType?.toLowerCase(),
[SemanticResourceAttributes.AWS_ECS_TASK_ARN]: taskArn,
[SemanticResourceAttributes.AWS_ECS_TASK_FAMILY]: taskMetadata['Family'],
[SemanticResourceAttributes.AWS_ECS_TASK_REVISION]:
taskMetadata['Revision'],
});
}

private static async _getLogResource(
containerMetadata: any
): Promise<Resource> {
if (
containerMetadata['LogDriver'] !== 'awslogs' ||
!containerMetadata['LogOptions']
) {
return Resource.EMPTY;
}
return undefined;

const containerArn = containerMetadata['ContainerARN']!;
const logOptions = containerMetadata['LogOptions'] as AwsLogOptions;

const logsRegion =
logOptions['awslogs-region'] ||
AwsEcsDetector._getRegionFromArn(containerArn);

const awsAccount = AwsEcsDetector._getAccountFromArn(containerArn);

const logsGroupName = logOptions['awslogs-group']!;
const logsGroupArn = `arn:aws:logs:${logsRegion}:${awsAccount}:log-group:${logsGroupName}`;
const logsStreamName = logOptions['awslogs-stream']!;
const logsStreamArn = `arn:aws:logs:${logsRegion}:${awsAccount}:log-group:${logsGroupName}:log-stream:${logsStreamName}`;

return new Resource({
[SemanticResourceAttributes.AWS_LOG_GROUP_NAMES]: [logsGroupName],
[SemanticResourceAttributes.AWS_LOG_GROUP_ARNS]: [logsGroupArn],
[SemanticResourceAttributes.AWS_LOG_STREAM_NAMES]: [logsStreamName],
[SemanticResourceAttributes.AWS_LOG_STREAM_ARNS]: [logsStreamArn],
});
}

private static _getAccountFromArn(containerArn: string): string {
const match = /arn:aws:ecs:[^:]+:([^:]+):.*/.exec(containerArn);
return match![1];
}

private static _getRegionFromArn(containerArn: string): string {
const match = /arn:aws:ecs:([^:]+):.*/.exec(containerArn);
return match![1];
}

private static _getUrlAsJson(url: string): Promise<any> {
return new Promise<string>((resolve, reject) => {
const request = http.get(url, (response: http.IncomingMessage) => {
if (response.statusCode && response.statusCode >= 400) {
reject(
new Error(
`Request to '${url}' failed with status ${response.statusCode}`
)
);
}
/*
* Concatenate the response out of chunks:
* https://nodejs.org/api/stream.html#stream_event_data
*/
let responseBody = '';
response.on(
'data',
(chunk: Buffer) => (responseBody += chunk.toString())
);
// All the data has been read, resolve the Promise
response.on('end', () => resolve(responseBody));
/*
* https://nodejs.org/api/http.html#httprequesturl-options-callback, see the
* 'In the case of a premature connection close after the response is received'
* case
*/
request.on('error', reject);
});

// Set an aggressive timeout to prevent lock-ups
request.setTimeout(HTTP_TIMEOUT_IN_MS, () => {
request.destroy();
});
// Connection error, disconnection, etc.
request.on('error', reject);
request.end();
}).then(responseBodyRaw => JSON.parse(responseBodyRaw));
}
}

Expand Down
Loading

0 comments on commit bea8a55

Please sign in to comment.