Skip to content

Commit

Permalink
convert queue worker to Service
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuawright11 committed Jul 23, 2024
1 parent 1486024 commit 6044e05
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 111 deletions.
23 changes: 10 additions & 13 deletions Alchemy/HTTP/Commands/ServeCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ struct ServeCommand: Command {
/// If enabled, handled requests won't be logged.
@Flag var quiet: Bool = false

private var bindAddress: BindAddress {
if let socket {
.unixDomainSocket(path: socket)
} else {
.hostname(host, port: port)
}
}

// MARK: Command

func run() async throws {
Expand All @@ -53,19 +61,8 @@ struct ServeCommand: Command {

// 3. start serving

@Inject var app: Application

app.addHTTPListener(
address: {
if let socket {
.unixDomainSocket(path: socket)
} else {
.hostname(host, port: port)
}
}(),
logResponses: !quiet
)
App.addHTTPListener(address: bindAddress, logResponses: !quiet)

try await app.lifecycle.runServices()
try await Life.runServices()
}
}
7 changes: 5 additions & 2 deletions Alchemy/Queue/Commands/WorkCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ struct WorkCommand: Command {
/// Should the scheduler run in process, scheduling any recurring work.
@Flag var schedule: Bool = false

var queue: Queue {
Container.require(id: name)
}

// MARK: Command

func run() async throws {
if schedule {
Schedule.start()
}

let queue: Queue = Container.require(id: name)
for _ in 0..<workers {
queue.startWorker(for: channels.components(separatedBy: ","))
}

try await gracefulShutdown()
try await Life.runServices()
}
}
2 changes: 1 addition & 1 deletion Alchemy/Queue/JobData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public struct JobData: Codable, Equatable {
payload: Data,
jobName: String,
channel: String,
attempts: Int,
attempts: Int = 0,
recoveryStrategy: Job.RecoveryStrategy,
backoff: TimeAmount,
backoffUntil: Date? = nil
Expand Down
104 changes: 9 additions & 95 deletions Alchemy/Queue/Queue.swift
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import AsyncAlgorithms
import ServiceLifecycle

/// Queue lets you run queued jobs to be processed in the background. Jobs are
/// persisted by the given `QueueProvider`.
public final class Queue: IdentifiedService {
public typealias Identifier = ServiceIdentifier<Queue>

/// The default channel to dispatch jobs on for all queues.
public static var defaultChannel = "default"
/// The default rate at which workers poll queues.
public static var defaultPollRate: TimeAmount = .seconds(1)

/// The ids of any workers associated with this queue and running in this
/// process.
public var workers: [String] = []
/// The provider backing this queue.
private let provider: QueueProvider

Expand All @@ -31,7 +29,6 @@ public final class Queue: IdentifiedService {
let data = JobData(payload: payload,
jobName: J.name,
channel: channel,
attempts: 0,
recoveryStrategy: job.recoveryStrategy,
backoff: job.retryBackoff)
try await provider.enqueue(data)
Expand All @@ -47,101 +44,18 @@ public final class Queue: IdentifiedService {
return nil
}

if let job = try await provider.dequeue(from: channel) {
return job
} else {
guard let job = try await provider.dequeue(from: channel) else {
return try await dequeue(from: Array(channels.dropFirst()))
}
}

public func shutdown() async throws {
try await provider.shutdown()
}

// MARK: Workers

/// Start a worker that dequeues and runs jobs from this queue.
///
/// - Parameters:
/// - channels: The channels this worker should monitor for
/// work. Defaults to `Queue.defaultChannel`.
/// - pollRate: The rate at which this worker should poll the
/// queue for new work. Defaults to `Queue.defaultPollRate`.
/// - untilEmpty: If true, workers will run all available jobs before
/// waiting to poll the queue again.
/// - eventLoop: The loop this worker will run on. Defaults to
/// your apps next available loop.
public func startWorker(for channels: [String] = [Queue.defaultChannel], pollRate: TimeAmount = Queue.defaultPollRate, untilEmpty: Bool = true, on eventLoop: EventLoop = LoopGroup.next()) {
let worker = eventLoop.queueId
Log.info("Starting worker \(worker)")
workers.append(worker)
_startWorker(for: channels, pollRate: pollRate, untilEmpty: untilEmpty, on: eventLoop)
return job
}

private func _startWorker(for channels: [String] = [Queue.defaultChannel], pollRate: TimeAmount = Queue.defaultPollRate, untilEmpty: Bool, on eventLoop: EventLoop = LoopGroup.next()) {
eventLoop.asyncSubmit { try await self.runNext(from: channels, untilEmpty: untilEmpty) }
.whenComplete { _ in
// Run check again in the `pollRate`.
eventLoop.scheduleTask(in: pollRate) {
self._startWorker(for: channels, pollRate: pollRate, untilEmpty: untilEmpty, on: eventLoop)
}
}
func complete(_ job: JobData, outcome: JobOutcome) async throws {
try await provider.complete(job, outcome: outcome)
}

private func runNext(from channels: [String], untilEmpty: Bool) async throws {
do {
guard let jobData = try await dequeue(from: channels) else {
return
}

Log.debug("Dequeued job \(jobData.jobName) from queue \(jobData.channel)")
try await execute(jobData)

if untilEmpty {
try await runNext(from: channels, untilEmpty: untilEmpty)
}
} catch {
Log.error("Error running job \(name(of: Self.self)) from `\(channels)`. \(error)")
throw error
}
}

private func execute(_ jobData: JobData) async throws {
var jobData = jobData
jobData.attempts += 1

func retry(ignoreAttempt: Bool = false) async throws {
if ignoreAttempt { jobData.attempts -= 1 }
jobData.backoffUntil = jobData.nextRetryDate
try await provider.complete(jobData, outcome: .retry)
}

var job: Job?
do {
job = try await Jobs.createJob(from: jobData)
let context = JobContext(queue: self, channel: jobData.channel, jobData: jobData)
try await job?.handle(context: context)
try await provider.complete(jobData, outcome: .success)
job?.finished(result: .success(()))
} catch where jobData.canRetry {
try await retry()
job?.failed(error: error)
} catch JobError.unknownJob(let name) {
let error = JobError.unknownJob(name)
// So that an old worker won't fail new, unrecognized jobs.
try await retry(ignoreAttempt: true)
job?.failed(error: error)
throw error
} catch {
try await provider.complete(jobData, outcome: .failed)
job?.finished(result: .failure(error))
job?.failed(error: error)
}
}
}

extension EventLoop {
fileprivate var queueId: String {
String(ObjectIdentifier(self).debugDescription.dropLast().suffix(6))
public func shutdown() async throws {
try await provider.shutdown()
}
}
96 changes: 96 additions & 0 deletions Alchemy/Queue/QueueWorker.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import AsyncAlgorithms

struct QueueWorker: Service, @unchecked Sendable {
let queue: Queue
var channels: [String] = [Queue.defaultChannel]
var pollRate: Duration = .seconds(1)
var untilEmpty: Bool = false

private var timer: some AsyncSequence {
AsyncTimerSequence(interval: pollRate, clock: ContinuousClock())
.cancelOnGracefulShutdown()
}

func run() async throws {
Log.info("Starting worker...")
for try await _ in timer {
try await runNext()
}
}

fileprivate func runNext() async throws {
do {
guard var jobData = try await queue.dequeue(from: channels) else {
return
}

Log.debug("Dequeued job \(jobData.jobName) from queue \(jobData.channel)")
try await execute(&jobData)
if untilEmpty {
try await runNext()
}
} catch {
Log.error("Error running job from `\(channels)`. \(error)")
throw error
}
}

private func execute(_ jobData: inout JobData) async throws {
var job: Job?
do {
jobData.attempts += 1
job = try await Jobs.createJob(from: jobData)
let context = JobContext(queue: queue, channel: jobData.channel, jobData: jobData)
try await job!.handle(context: context)
try await success(job: job!, jobData: jobData)
} catch where jobData.canRetry {
try await retry(jobData: &jobData)
job?.failed(error: error)
} catch JobError.unknownJob(let name) {
let error = JobError.unknownJob(name)
// So that an old worker won't fail new, unrecognized jobs.
try await retry(jobData: &jobData, ignoreAttempt: true)
job?.failed(error: error)
throw error
} catch {
try await queue.complete(jobData, outcome: .failed)
job?.finished(result: .failure(error))
job?.failed(error: error)
}
}

private func success(job: Job, jobData: JobData) async throws {
try await queue.complete(jobData, outcome: .success)
job.finished(result: .success(()))
}

private func retry(jobData: inout JobData, ignoreAttempt: Bool = false) async throws {
if ignoreAttempt { jobData.attempts -= 1 }
jobData.backoffUntil = jobData.nextRetryDate
try await queue.complete(jobData, outcome: .retry)
}
}

extension Queue {
/// Start a worker that dequeues and runs jobs from this queue.
///
/// - Parameters:
/// - channels: The channels this worker should monitor for
/// work. Defaults to `Queue.defaultChannel`.
/// - pollRate: The rate at which this worker should poll the
/// queue for new work. Defaults to `Queue.defaultPollRate`.
/// - untilEmpty: If true, workers will run all available jobs before
/// waiting to poll the queue again.
public func startWorker(for channels: [String] = [Queue.defaultChannel],
pollRate: Duration = .seconds(1),
untilEmpty: Bool = true) {
Life.addService(
QueueWorker(
queue: self,
channels: channels,
pollRate: pollRate,
untilEmpty: untilEmpty
)
)
}
}
3 changes: 3 additions & 0 deletions Alchemy/Services/Aliases.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ var Routes: Router { Container.require() }

/// Job registration.
var Jobs: JobRegistry { Container.require() }

/// The current application
var App: Application { Container.require() }

0 comments on commit 6044e05

Please sign in to comment.