Skip to content

Commit

Permalink
Add features to task queue functions (#1423)
Browse files Browse the repository at this point in the history
* augment task context interface & pass in headers

* update changelog

* update docstrings & handle header edge cases
  • Loading branch information
blidd-google authored Jun 26, 2023
1 parent 3e1b5ca commit aa55ea6
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Add features to task queue functions. (#1423)
49 changes: 47 additions & 2 deletions spec/common/providers/tasks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ describe("onEnqueueHandler", () => {
function mockEnqueueRequest(
data: unknown,
contentType = "application/json",
context: { authorization?: string } = { authorization: "Bearer abc" }
context: { authorization?: string } = { authorization: "Bearer abc" },
headers: Record<string, string> = {}
): ReturnType<typeof mockRequest> {
return mockRequest(data, contentType, context);
return mockRequest(data, contentType, context, headers);
}

before(() => {
Expand Down Expand Up @@ -194,6 +195,50 @@ describe("onEnqueueHandler", () => {
});
});

it("should populate context with values from header", () => {
const headers = {
"x-cloudtasks-queuename": "x",
"x-cloudtasks-taskname": "x",
"x-cloudtasks-taskretrycount": "1",
"x-cloudtasks-taskexecutioncount": "1",
"x-cloudtasks-tasketa": "timestamp",
"x-cloudtasks-taskpreviousresponse": "400",
"x-cloudtasks-taskretryreason": "something broke",
};
const expectedContext = {
queueName: "x",
id: "x",
retryCount: 1,
executionCount: 1,
scheduledTime: "timestamp",
previousResponse: 400,
retryReason: "something broke",
};

const projectId = getApp().options.projectId;
const idToken = generateIdToken(projectId);
return runTaskTest({
httpRequest: mockEnqueueRequest(
{},
"application/json",
{ authorization: "Bearer " + idToken },
headers
),
expectedData: {},
taskFunction: (data, context) => {
checkAuthContext(context, projectId, mocks.user_id);
expect(context).to.include(expectedContext);
return null;
},
taskFunction2: (request) => {
checkAuthContext(request, projectId, mocks.user_id);
expect(request).to.include(expectedContext);
return null;
},
expectedStatus: 204,
});
});

it("should handle auth", async () => {
const projectId = getApp().options.projectId;
const idToken = generateIdToken(projectId);
Expand Down
5 changes: 5 additions & 0 deletions spec/v1/providers/tasks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ describe("#onDispatch", () => {
uid: "abc",
token: "token" as any,
},
queueName: "fn",
id: "task0",
retryCount: 0,
executionCount: 0,
scheduledTime: "timestamp",
};
let done = false;
const cf = taskQueue().onDispatch((d, c) => {
Expand Down
91 changes: 82 additions & 9 deletions src/common/providers/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,72 @@ export interface TaskContext {
* The result of decoding and verifying an ODIC token.
*/
auth?: AuthData;

/**
* The name of the queue.
* Populated via the `X-CloudTasks-QueueName` header.
*/
queueName: string;

/**
* The "short" name of the task, or, if no name was specified at creation, a unique
* system-generated id.
* This is the my-task-id value in the complete task name, ie, task_name =
* projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
* Populated via the `X-CloudTasks-TaskName` header.
*/
id: string;

/**
* The number of times this task has been retried.
* For the first attempt, this value is 0. This number includes attempts where the task failed
* due to 5XX error codes and never reached the execution phase.
* Populated via the `X-CloudTasks-TaskRetryCount` header.
*/
retryCount: number;

/**
* The total number of times that the task has received a response from the handler.
* Since Cloud Tasks deletes the task once a successful response has been received, all
* previous handler responses were failures. This number does not include failures due to 5XX
* error codes.
* Populated via the `X-CloudTasks-TaskExecutionCount` header.
*/
executionCount: number;

/**
* The schedule time of the task, as an RFC 3339 string in UTC time zone.
* Populated via the `X-CloudTasks-TaskETA` header, which uses seconds since January 1 1970.
*/
scheduledTime: string;

/**
* The HTTP response code from the previous retry.
* Populated via the `X-CloudTasks-TaskPreviousResponse` header
*/
previousResponse?: number;

/**
* The reason for retrying the task.
* Populated via the `X-CloudTasks-TaskRetryReason` header.
*/
retryReason?: string;

/**
* Raw request headers.
*/
headers?: Record<string, string>;
}

/**
* The request used to call a Task Queue function.
* The request used to call a task queue function.
*/
export interface Request<T = any> {
export type Request<T = any> = TaskContext & {
/**
* The parameters used by a client when calling this function.
*/
data: T;

/**
* The result of decoding and verifying an ODIC token.
*/
auth?: AuthData;
}
};

type v1TaskHandler = (data: any, context: TaskContext) => void | Promise<void>;
type v2TaskHandler<Req> = (request: Request<Req>) => void | Promise<void>;
Expand All @@ -119,7 +169,30 @@ export function onDispatchHandler<Req = any>(
throw new https.HttpsError("invalid-argument", "Bad Request");
}

const context: TaskContext = {};
const headers: Record<string, string> = {};
for (const [key, value] of Object.entries(req.headers)) {
if (!Array.isArray(value)) {
headers[key] = value;
}
}

const context: TaskContext = {
queueName: req.header("X-CloudTasks-QueueName"),
id: req.header("X-CloudTasks-TaskName"),
retryCount: req.header("X-CloudTasks-TaskRetryCount")
? Number(req.header("X-CloudTasks-TaskRetryCount"))
: undefined,
executionCount: req.header("X-CloudTasks-TaskExecutionCount")
? Number(req.header("X-CloudTasks-TaskExecutionCount"))
: undefined,
scheduledTime: req.header("X-CloudTasks-TaskETA"),
previousResponse: req.header("X-CloudTasks-TaskPreviousResponse")
? Number(req.header("X-CloudTasks-TaskPreviousResponse"))
: undefined,
retryReason: req.header("X-CloudTasks-TaskRetryReason"),
headers,
};

if (!process.env.FUNCTIONS_EMULATOR) {
const authHeader = req.header("Authorization") || "";
const token = authHeader.match(/^Bearer (.*)$/)?.[1];
Expand Down

0 comments on commit aa55ea6

Please sign in to comment.