Backlite provides type-safe, persistent and embedded task queues meant to run within your application as opposed to an external message broker. A task can be of any type and each type declares its own queue along with the configuration for the queue. Tasks are automatically executed in the background via a configurable worker pool.
This project started shortly after migrating Pagoda to SQLite from Postgres and Redis. Redis was previously used to handle task queues and I wanted to leverage SQLite instead. Originally, goqite was chosen, which is a great library and one that I took inspiration from, but I had a lot of different ideas for the overall approach and it lacked a number of features that I felt were needed.
River, an excellent, similar library built for Postgres, was also a major source of inspiration and ideas.
More to come as the web UI is under active development.
This project is under active development and is not yet recommended for production use though all features outlined below are available.
Install by simply running: go get github.com/mikestefanello/backlite
No need to deal with serialization and byte slices. By leveraging generics, tasks and queues are completely type-safe which means that you pass in your task type into a queue and your task processor callbacks will only receive that given type.
When tasks are added to a queue, they are inserted into a SQLite database table to ensure persistence.
Each queue can have completed tasks retained in a separate table for archiving, auditing, monitoring, etc. Options exist to retain all completed tasks or only those that failed all attempts. An option also exists to retain the task data for all tasks or only those that failed.
Each queue can be configured to retry tasks a certain number of times and to backoff a given amount of time between each attempt.
When adding a task to a queue, you can specify a duration or specific time to wait until executing the task.
Optionally log queue operations with a logger of your choice, as long as it implements the simple Logger
interface, which log/slog
does.
2024/07/21 14:08:13 INFO task processed id=0190d67a-d8da-76d4-8fb8-ded870d69151 queue=example duration=85.101µs attempt=1
While processing a given task, it's easy to create another task in the same or a different queue, which allows you to nest tasks to create workflows. Use FromContext()
with the provided context to get your initialized client from within the task processor, and add one or many tasks.
The task dispatcher, which handles sending tasks to the worker pool for execution, can be shutdown gracefully by calling Stop()
on the client. That will wait for all workers to finish for as long as the passed in context is not cancelled. The hard-stop the dispatcher, cancel the context passed in when calling Start()
. See usage below.
Task creation can be added to a given database transaction. If you are using SQLite as your primary database, this provides a simple, robust way to ensure data integrity. For example, using the eCommerce app example, when inserting a new order into your database, the same transaction to be used to add a task to send an order notification email, and they either both succeed or both fail. Use the chained method Tx()
to provide your transaction when adding one or multiple tasks.
Since SQLite only supports one writer, no continuous database polling is required. The task dispatcher is able to remain aware of new tasks and keep track of when future tasks are scheduled for, and thus only queries the database when it needs to.
Use any SQLite driver that you'd like. This library only includes go-sqlite3 since it is used in tests.
Insert one or many tasks across one or many queues in a single operation.
Each queue can be configured with an execution timeout for processing a given task. The provided context will cancel after the time elapses. If you want to respect that timeout, your processor code will have to listen for the context cancellation.
When creating a client, you can specify the amount of goroutines to use to build a worker pool. This pool is created and shutdown via the dispatcher by calling Start()
and Stop()
on the client. The worker pool is the only way to process tasks; they cannot be pulled manually.
A simple web UI to monitor running, upcoming, and completed tasks is provided but under active development.
To run, pass your *sql.DB
to ui.NewHandler()
and provide that to an HTTP server, for example:
err := http.ListenAndServe(":9000", ui.NewHandler(db))
Then visit the given port and/or domain in your browser (ie, localhost:9000
).
The web CSS and JS are provided by tabler.
First, open a connection to your SQLite database using the driver of your choice:
db, err := sql.Open("sqlite3", "data.db?_journal=WAL&_timeout=5000")
Second, initialize a client:
client, err := backlite.NewClient(backlite.ClientConfig{
DB: db,
Logger: slog.Default(),
ReleaseAfter: 10 * time.Minute,
NumWorkers: 10,
CleanupInterval: time.Hour,
})
The configuration options are:
- DB: The database connection.
- Logger: A logger that implements the
Logger
interface. Omit if you do not want to log. - ReleaseAfter: The duration after which tasks claimed and passed for execution should be added back to the queue if a response was never received.
- NumWorkers: The amount of goroutines to open which will process queued tasks.
- CleanupInterval: How often the completed tasks database table will attempt to remove expired rows.
Until a more robust system is provided, to install the database schema, call client.Install()
. This must be done prior to using the client. It is safe to call this if the schema was previously installed. The schema is currently defined in internal/query/schema.sql
.
Any type can be a task as long as it implements the Task
interface, which requires only the Config() QueueConfig
method, used to provide information about the queue that these tasks will be added to. As an example, this is a task used to send new order email notifications:
type NewOrderEmailTask struct {
OrderID string
EmailAddress string
}
Then implement the Task
interface by providing the queue configuration:
func (t NewOrderEmailTask) Config() backlite.QueueConfig {
return backlite.QueueConfig{
Name: "NewOrderEmail",
MaxAttempts: 5,
Backoff: 5 * time.Second,
Timeout: 10 * time.Second,
Retention: &backlite.Retention{
Duration: 6 * time.Hour,
OnlyFailed: false,
Data: &backlite.RetainData{
OnlyFailed: true,
},
},
}
}
The configuration options are:
- Name: The name of the queue. This must be unique otherwise registering the queue will fail.
- MaxAttempts: The maximum number of times to try executing this task before it's consider failed and marked as complete.
- Backoff: The amount of time to wait before retrying after a failed attempt at processing.
- Retention: If provided, completed tasks will be retained in the database in a separate table according to the included options.
- Duration: How long to retain completed tasks in the database for. Omit to never expire.
- OnlyFailed: If true, only failed tasks will be retained.
- Data: If provided, the task data (the serialized task itself) will be retained.
- OnlyFailed: If true, the task data will only be retained for failed tasks.
The easiest way to implement a queue and define the processor is to use backlite.NewQueue()
. This leverages generics in order to provide type-safety with a given task type. Using the example above:
processor := func(ctx context.Context, task NewOrderEmailTask) error {
return email.Send(ctx, task.EmailAddress, fmt.Sprintf("Order %s received", task.OrderID))
}
queue := backlite.NewQueue[NewOrderEmailTask](processor)
The parameter is the processor callback which is what will be called by the dispatcher worker pool to execute the task. If no error is returned, the task is considered successfully executed. If the task fails all attempts and the queue has retention enabled, the value of the error will be stored in the database.
The provided context will be set to timeout at the duration set in the queue settings, if provided. To get the client from the context, you can call client := backlite.FromContext(ctx)
.
You must register all queues with the client by calling client.Register(queue)
. This will panic if duplicate queue names are registered.
To add a task to the queue, simply pass one or many into client.Add()
. You can provide tasks of different types. This returns a chainable operation which contains many options, that can be used as follows:
err := client.
Add(task1, task2).
Ctx(ctx).
Tx(tx).
At(time.Date(2024, 1, 5, 12, 30, 00)).
Wait(15 * time.Minute).
Save()
Only Add()
and Save()
are required. Don't use At()
and Wait()
together as they override each other.
The options are:
- Ctx: Provide a context to use for the operation.
- Tx: Provide a database transaction to add the tasks to. You must commit this yourself then call
client.Notify()
to tell the dispatcher that the new task(s) were added. This may be improved in the future but for now it is required. - At: Don't execute this task until at least the given date and time.
- Wait: Wait at least the given duration before executing the task.
To start the dispatcher, which will spin up the worker pool and begin executing tasks in the background, call client.Start()
. The context you pass in must persist for as long as you want the dispatcher to continue working. If that is ever cancelled, the dispatcher will shutdown. See the next section for more details.
To gracefully shutdown the dispatcher, which will wait until all tasks currently being executed are finished, call client.Stop()
. You can provide a context with a given timeout in order to give the shutdown process a set amount of time to gracefully shutdown. For example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client.Stop(ctx)
This will wait up to 5 seconds for all workers to complete the task they are currently working on.
If you want to hard-stop the dispatcher, cancel the context that was provided when calling client.Start()
.
- Finish Web UI
- Hooks
- Expand processor context to store attempt number, other data
- Avoid needing to call Notify() when using transaction
- Queue priority
- Better handling of database schema, migrations
- Store queue stats in a separate table?
- Pause/resume queues
- Benchmarks
- Expand testing