Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline binding to wrangler.toml #6674

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/angry-keys-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"wrangler": minor
---

Added new [[pipelines]] bindings. This creates a new binding that allows sending events to
the specified pipeline.

Example:

[[pipelines]]
binding = "MY_PIPELINE"
pipeline = "my-pipeline"
2 changes: 2 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ packages/create-cloudflare/templates/**/*.*
# but still exclude the worker-configuration.d.ts file, since it's generated
!packages/create-cloudflare/templates/hello-world/**/*.*
packages/create-cloudflare/templates/hello-world/**/worker-configuration.d.ts
# dist-functions are generated in the fixtures/vitest-pool-workers-examples/pages-functions-unit-integration-self folder
dist-functions

vscode.d.ts
vscode.*.d.ts
Expand Down
109 changes: 109 additions & 0 deletions packages/wrangler/src/__tests__/configuration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ describe("normalizeAndValidateConfig()", () => {
upload_source_maps: undefined,
placement: undefined,
tail_consumers: undefined,
pipelines: [],
});
expect(diagnostics.hasErrors()).toBe(false);
expect(diagnostics.hasWarnings()).toBe(false);
Expand Down Expand Up @@ -3181,6 +3182,114 @@ describe("normalizeAndValidateConfig()", () => {
});
});

describe("[pipelines]", () => {
it("should error if pipelines is an object", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: {} },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got {}."
`);
});

it("should error if pipelines is a string", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: "BAD" },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got \\"BAD\\"."
`);
});

it("should error if pipelines is a number", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: 999 },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got 999."
`);
});

it("should error if pipelines is null", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: null },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got null."
`);
});

it("should accept valid bindings", () => {
const { diagnostics } = normalizeAndValidateConfig(
{
pipelines: [
{
binding: "VALID",
pipeline: "343cd4f1d58c42fbb5bd082592fd7143",
},
],
} as unknown as RawConfig,
undefined,
{ env: undefined }
);

expect(diagnostics.hasErrors()).toBe(false);
});

it("should error if pipelines.bindings are not valid", () => {
const { diagnostics } = normalizeAndValidateConfig(
{
pipelines: [
{},
{
binding: "VALID",
pipeline: "343cd4f1d58c42fbb5bd082592fd7143",
},
{ binding: 2000, project: 2111 },
],
} as unknown as RawConfig,
undefined,
{ env: undefined }
);
expect(diagnostics.renderWarnings()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- Unexpected fields found in pipelines[2] field: \\"project\\""
`);
expect(diagnostics.hasWarnings()).toBe(true);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- \\"pipelines[0]\\" bindings should have a string \\"binding\\" field but got {}.
- \\"pipelines[0]\\" bindings must have a \\"pipeline\\" field but got {}.
- \\"pipelines[2]\\" bindings should have a string \\"binding\\" field but got {\\"binding\\":2000,\\"project\\":2111}.
- \\"pipelines[2]\\" bindings must have a \\"pipeline\\" field but got {\\"binding\\":2000,\\"project\\":2111}."
`);
});
});

describe("[unsafe.bindings]", () => {
it("should error if unsafe is an array", () => {
const { diagnostics } = normalizeAndValidateConfig(
Expand Down
37 changes: 37 additions & 0 deletions packages/wrangler/src/__tests__/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10524,6 +10524,43 @@ export default{
});
});

describe("pipelines", () => {
it("should upload pipelines bindings", async () => {
writeWranglerToml({
pipelines: [
{
binding: "MY_PIPELINE",
pipeline: "0123456789ABCDEF0123456789ABCDEF",
},
],
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest({
expectedBindings: [
{
type: "pipelines",
name: "MY_PIPELINE",
id: "0123456789ABCDEF0123456789ABCDEF",
},
],
});

await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Worker Startup Time: 100 ms
Your worker has access to the following bindings:
- Pipelines:
- MY_PIPELINE: 0123456789ABCDEF0123456789ABCDEF
Uploaded test-name (TIMINGS)
Deployed test-name triggers (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Current Version ID: Galaxy-Class"
`);
});
});

describe("--keep-vars", () => {
it("should send keepVars when keep-vars is passed in", async () => {
vi.stubEnv("CLOUDFLARE_API_TOKEN", "hunter2");
Expand Down
1 change: 1 addition & 0 deletions packages/wrangler/src/__tests__/type-generation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ const bindingsConfigMock: Omit<
},
{ type: "CompiledWasm", globs: ["**/*.wasm"], fallthrough: true },
],
pipelines: [],
};

describe("generateTypes()", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ function createWorkerBundleFormData(
text_blobs: undefined,
data_blobs: undefined,
dispatch_namespaces: undefined,
pipelines: undefined,
logfwdr: undefined,
unsafe: undefined,
experimental_assets: undefined,
Expand Down
2 changes: 2 additions & 0 deletions packages/wrangler/src/api/startDevWorker/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
CfLogfwdrBinding,
CfModule,
CfMTlsCertificate,
CfPipeline,
CfQueue,
CfR2Bucket,
CfScriptFormat,
Expand Down Expand Up @@ -261,6 +262,7 @@ export type Binding =
| ({ type: "analytics_engine" } & Omit<CfAnalyticsEngineDataset, "binding">)
| ({ type: "dispatch_namespace" } & Omit<CfDispatchNamespace, "binding">)
| ({ type: "mtls_certificate" } & Omit<CfMTlsCertificate, "binding">)
| ({ type: "pipeline" } & Omit<CfPipeline, "binding">)
| ({ type: "logfwdr" } & Omit<CfLogfwdrBinding, "name">)
| { type: `unsafe_${string}` }
| { type: "assets" };
Expand Down
10 changes: 10 additions & 0 deletions packages/wrangler/src/api/startDevWorker/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ export function convertCfWorkerInitBindingstoBindings(
output[info["binding"]] = { type: "assets" };
break;
}
case "pipelines": {
for (const { binding, ...x } of info) {
output[binding] = { type: "pipeline", ...x };
}
break;
}
default: {
assertNever(type);
}
Expand Down Expand Up @@ -282,6 +288,7 @@ export async function convertBindingsToCfWorkerInitBindings(
logfwdr: undefined,
unsafe: undefined,
experimental_assets: undefined,
pipelines: undefined,
};

const fetchers: Record<string, ServiceFetch> = {};
Expand Down Expand Up @@ -354,6 +361,9 @@ export async function convertBindingsToCfWorkerInitBindings(
} else if (binding.type === "mtls_certificate") {
bindings.mtls_certificates ??= [];
bindings.mtls_certificates.push({ ...binding, binding: name });
} else if (binding.type === "pipeline") {
bindings.pipelines ??= [];
bindings.pipelines.push({ ...binding, binding: name });
} else if (binding.type === "logfwdr") {
bindings.logfwdr ??= { bindings: [] };
bindings.logfwdr.bindings.push({ ...binding, name: name });
Expand Down
1 change: 1 addition & 0 deletions packages/wrangler/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,4 +402,5 @@ export const defaultWranglerConfig: Config = {
},
mtls_certificates: [],
tail_consumers: undefined,
pipelines: [],
};
17 changes: 17 additions & 0 deletions packages/wrangler/src/config/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,23 @@ export interface EnvironmentNonInheritable {
/** Details about the outbound Worker which will handle outbound requests from your namespace */
outbound?: DispatchNamespaceOutbound;
}[];

/**
* Specifies list of Pipelines bound to this Worker environment
*
* NOTE: This field is not automatically inherited from the top level environment,
* and so must be specified in every named environment.
*
* @default `[]`
* @nonInheritable
*/
pipelines: {
/** The binding name used to refer to the bound service. */
binding: string;

/** Name of the Pipeline to bind */
pipeline: string;
}[];
}

/**
Expand Down
15 changes: 13 additions & 2 deletions packages/wrangler/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import type { NormalizeAndValidateConfigArgs } from "./validation";

export type {
Config,
RawConfig,
ConfigFields,
DevConfig,
RawConfig,
RawDevConfig,
} from "./config";
export type {
ConfigModuleRuleType,
Environment,
RawEnvironment,
ConfigModuleRuleType,
} from "./environment";

type ReadConfigCommandArgs = NormalizeAndValidateConfigArgs & {
Expand Down Expand Up @@ -232,6 +232,7 @@ export function printBindings(bindings: CfWorkerInit["bindings"]) {
wasm_modules,
dispatch_namespaces,
mtls_certificates,
pipelines,
} = bindings;

if (data_blobs !== undefined && Object.keys(data_blobs).length > 0) {
Expand Down Expand Up @@ -443,6 +444,16 @@ export function printBindings(bindings: CfWorkerInit["bindings"]) {
});
}

if (pipelines?.length) {
output.push({
type: "Pipelines",
entries: pipelines.map(({ binding, pipeline }) => ({
key: binding,
value: pipeline,
})),
});
}

if (version_metadata !== undefined) {
output.push({
type: "Worker Version Metadata",
Expand Down
45 changes: 45 additions & 0 deletions packages/wrangler/src/config/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,16 @@ function normalizeAndValidateEnvironment(
validateAIBinding(envName),
undefined
),
pipelines: notInheritable(
diagnostics,
topLevelEnv,
rawConfig,
rawEnv,
envName,
"pipelines",
validateBindingArray(envName, validatePipelineBinding),
[]
),
version_metadata: notInheritable(
diagnostics,
topLevelEnv,
Expand Down Expand Up @@ -2201,6 +2211,7 @@ const validateUnsafeBinding: ValidatorFn = (diagnostics, field, value) => {
"service",
"logfwdr",
"mtls_certificate",
"pipeline",
];

if (safeBindings.includes(value.type)) {
Expand Down Expand Up @@ -3103,6 +3114,40 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => {
return isValid;
};

const validatePipelineBinding: ValidatorFn = (diagnostics, field, value) => {
if (typeof value !== "object" || value === null) {
diagnostics.errors.push(
`"pipeline" bindings should be objects, but got ${JSON.stringify(value)}`
);
return false;
}
let isValid = true;
// Pipeline bindings must have a binding and a pipeline.
if (!isRequiredProperty(value, "binding", "string")) {
diagnostics.errors.push(
`"${field}" bindings should have a string "binding" field but got ${JSON.stringify(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`"${field}" bindings should have a string "binding" field but got ${JSON.stringify(
`"${field}" bindings must have a string "binding" field but got ${JSON.stringify(

value
)}.`
);
isValid = false;
}
if (!isRequiredProperty(value, "pipeline", "string")) {
diagnostics.errors.push(
`"${field}" bindings must have a "pipeline" field but got ${JSON.stringify(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`"${field}" bindings must have a "pipeline" field but got ${JSON.stringify(
`"${field}" bindings must have a string "pipeline" field but got ${JSON.stringify(

value
)}.`
);
isValid = false;
}

validateAdditionalProperties(diagnostics, field, Object.keys(value), [
"binding",
"pipeline",
]);

return isValid;
};

function normalizeAndValidateLimits(
diagnostics: Diagnostics,
topLevelEnv: Environment | undefined,
Expand Down
Loading
Loading