Skip to content

Commit

Permalink
feat(pipes-targets): add EventBridge (aws#30654)
Browse files Browse the repository at this point in the history
Add EventBridge event bus as a Pipes target.
  • Loading branch information
msambol authored Oct 7, 2024
1 parent 4ada3ea commit 842f49a
Show file tree
Hide file tree
Showing 16 changed files with 34,187 additions and 5 deletions.
40 changes: 36 additions & 4 deletions packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ Pipe targets are the end point of a EventBridge Pipe.

The following targets are supported:

1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
1. `targets.SqsTarget`: [Send event source to an SQS queue](#amazon-sqs)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions-state-machine)
3. `targets.LambdaFunction`: [Send event source to a Lambda Function](#aws-lambda-function)
4. `targets.ApiDestinationTarget`: [Send event source to an EventBridge API Destination](#amazon-eventbridge-api-destination)
3. `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
4. `targets.ApiDestinationTarget`: [Send event source to an EventBridge API destination](#amazon-eventbridge-api-destination)
5. `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
6. `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)

### Amazon SQS

Expand Down Expand Up @@ -100,7 +101,6 @@ const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
}
);


const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
Expand Down Expand Up @@ -241,3 +241,35 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: streamTarget,
});
```

### Amazon EventBridge Event Bus

An event bus can be used as a target for a pipe. The event bus will receive the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetEventBus: events.EventBus;

const eventBusTarget = new targets.EventBridgeTarget(targetEventBus);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: eventBusTarget,
});
```

The input to the target event bus can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetEventBus: events.EventBus;

const eventBusTarget = new targets.EventBridgeTarget(targetEventBus, {
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: eventBusTarget,
});
```
132 changes: 132 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/event-bridge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { Token } from 'aws-cdk-lib';
import { IEventBus } from 'aws-cdk-lib/aws-events';
import { IRole } from 'aws-cdk-lib/aws-iam';

/**
* EventBridge target properties.
*/
export interface EventBridgeTargetParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

/**
* A free-form string used to decide what fields to expect in the event detail.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-detailtype
* @default - none
*/
readonly detailType?: string;

/**
* The URL subdomain of the endpoint.
*
* @example
* // if the URL for the endpoint is https://abcde.veo.endpoints.event.amazonaws.com
* 'abcde.veo'
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-endpointid
* @default - none
*/
readonly endpointId?: string;

/**
* AWS resources, identified by Amazon Resource Name (ARN), which the event primarily concerns.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-resources
* @default - none
*/
readonly resources?: string[];

/**
* The source of the event.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-source
* @default - none
*/
readonly source?: string;

/**
* The time stamp of the event, per RFC3339.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargeteventbridgeeventbusparameters.html#cfn-pipes-pipe-pipetargeteventbridgeeventbusparameters-time
* @default - the time stamp of the PutEvents call
*/
readonly time?: string;
}

/**
* An EventBridge Pipes target that sends messages to an EventBridge event bus.
*/
export class EventBridgeTarget implements ITarget {
private eventBus: IEventBus;
private eventBridgeParameters?: EventBridgeTargetParameters;
public readonly targetArn: string;

constructor(eventBus: IEventBus, parameters?: EventBridgeTargetParameters) {
this.eventBus = eventBus;
this.targetArn = eventBus.eventBusArn;
if (parameters) {
this.eventBridgeParameters = parameters;
for (const validate of [validateDetailType, validateEndpointId, validateSource, validateTime]) {
validate(parameters);
}
}
}

grantPush(grantee: IRole): void {
this.eventBus.grantPutEventsTo(grantee);
}

bind(pipe: IPipe): TargetConfig {
if (!this.eventBridgeParameters) {
return {
targetParameters: {},
};
}

return {
targetParameters: {
inputTemplate: this.eventBridgeParameters.inputTransformation?.bind(pipe).inputTemplate,
eventBridgeEventBusParameters: this.eventBridgeParameters,
},
};
}
}

function validateDetailType({ detailType }: EventBridgeTargetParameters) {
if (detailType !== undefined && !Token.isUnresolved(detailType)) {
if (detailType.length < 1 || detailType.length > 128) {
throw new Error(`Detail type must be between 1 and 128 characters, received ${detailType.length}`);
}
}
}

function validateEndpointId({ endpointId }: EventBridgeTargetParameters) {
if (endpointId !== undefined && !Token.isUnresolved(endpointId)) {
if (endpointId.length < 1 || endpointId.length > 50) {
throw new Error(`Endpoint id must be between 1 and 50 characters, received ${endpointId.length}`);
}
}
}

function validateSource({ source }: EventBridgeTargetParameters) {
if (source !== undefined && !Token.isUnresolved(source)) {
if (source.length < 1 || source.length > 256) {
throw new Error(`Source must be between 1 and 256 characters, received ${source.length}`);
}
}
}

function validateTime({ time }: EventBridgeTargetParameters) {
if (time !== undefined && !Token.isUnresolved(time)) {
if (time.length < 1 || time.length > 256) {
throw new Error(`Time must be between 1 and 256 characters, received ${time.length}`);
}
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './api-destination';
export * from './event-bridge';
export * from './kinesis';
export * from './lambda';
export * from './sqs';
Expand Down
3 changes: 2 additions & 1 deletion packages/@aws-cdk/aws-pipes-targets-alpha/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@
"aws-cdk-lib": "0.0.0",
"constructs": "^10.0.0",
"@aws-cdk/aws-pipes-alpha": "0.0.0",
"@aws-cdk/integ-tests-alpha": "0.0.0"
"@aws-cdk/integ-tests-alpha": "0.0.0",
"@aws-cdk/aws-pipes-sources-alpha": "0.0.0"
},
"dependencies": {},
"peerDependencies": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`EventBridge should grant pipe role push access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`EventBridge should grant pipe role push access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "events:PutEvents",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyEventBus251E60F8",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
Loading

0 comments on commit 842f49a

Please sign in to comment.