dowork is a general purpose task queueing system for Go programs. It queues, executes, and reschedules tasks in a Goroutine in-process.
A global task queue is provided for simple use-cases. To use it:
import (
"context"
"git.sr.ht/~sircmpwn/dowork"
)
work.Submit(func(ctx context.Context) error {
// ...do work...
return nil
})
This task will be executed in the background. The first time a task is submitted to the global queue, it will be initialized and start running in the background.
To customize options like maximum retries and timeouts, use work.Enqueue:
task := work.NewTask(func(ctx context.Context) error {
// ...
}).Retries(5).MaxTimeout(10 * time.Minute)
work.Enqueue(task)
task := work.NewTask(func(ctx context.Context) error {
// ...
}).
Retries(5). // Maximum number of attempts
MaxTimeout(10 * time.Minute). // Maximum timeout between attempts
Within(10 * time.Second). // Deadline for each attempt
After(func(ctx context.Context, task *work.Task) {
// Executed once the task completes, successful or not
})
work.Enqueue(task)
Retries are conducted with an exponential backoff.
You may also manage your own work queues. Use NewQueue()
to obtain a queue,
(*Queue).Dispatch()
to execute all overdue tasks, and (*Queue).Start()
to
spin up a goroutine and start dispatching tasks automatically.
Use work.Shutdown()
or (*Queue).Shutdown()
to perform a soft shutdown of the
queue, which will stop accepting new tasks and block until all already-queued
tasks complete.
No such functionality is provided OOTB, but you may find this package useful in doing the actual queueing work for your own distributed work queue. Such an integeration is left as an exercise to the reader.
Instrumentation is provided via Prometheus and the
client_golang library. Relevant metrics are prefixed with
queue_
in the metric name.
A common pattern for soft restarting web servers is to shut down the http.Server, allowing the new web server process to start up and begin accepting new connections, then allow the queue to finish executing any pending tasks before terminating the process. If this describes your program, note that you may want to provide Prometheus metrics on a secondary http.Server on a random port, so that you may monitor the queue shutdown. Something similar to the following will set up a secondary HTTP server for this purpose:
import (
"log"
"net"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
mux := &http.ServeMux{}
mux.Handle("/metrics", promhttp.Handler())
server := &http.Server{Handler: mux}
listen, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
log.Printf("Prometheus listening on :%d", listen.Addr().(*net.TCPAddr).Port)
go server.Serve(listen)