Erlang Consumer Framework for Beanstalkd Work Queue.
This framework provides a simple and configurable solution for managing a pool of consumers that execute jobs from a Beanstalkd server.
- Easily configurable to launch a pool of consumers for job execution.
- Supports limiting the number of concurrent jobs.
- Ensures all in-progress jobs are completed before the consumer shuts down.
- Prevents duplicate execution of jobs.
- Reservation: When a job is reserved, it is first buried before being sent for processing.
- Successful Completion: If the job executes successfully without exceptions, it is deleted.
- Failed Execution: If the job execution fails (any exception occurred), based on the exception will behave in the following way:
- If a job throws a
{bad_argument, Reason::any()}
exception, it is deleted (useful for malformed job payloads). - If a job throws a
reschedule_job
exception, it is moved back to the ready state to be retried by the consumer. - For any other exception, it remains in the buried state for manual review.
- If a job throws a
- Delete/Kick Operations:All kick or delete operations are handled in a separate process and queued. If the connection to the server is lost, these operations are preserved and not discarded.
In case you are using the server fork from here you can also reschedule the job after a certain number of seconds or using a backoff exponential algorithm (0, 1, 2, 3, 4 seconds) limited to a specific number of attempts. You can do this using:
beanstalkd_consumer:throw_reschedule_job/1
beanstalkd_consumer:throw_reschedule_job_backoff/1
All consumers need to implement the beanstalkd_consumer
behaviour.
-callback init(TubeName::binary()) ->
{ok, State::any()}.
-callback process(JobId::non_neg_integer(), JobPayload::binary(), State::any()) ->
any().
Example:
-module(dummy_consumer).
-export([
init/1,
process/3
]).
init(TubeName) ->
io:format(<<"~p:init: ~p ~n">>, [?MODULE, TubeName]),
{ok, #{}}.
process(JobId, JobPayload, State) ->
io:format(<<"~p:process: ~p ~n">>, [?MODULE, {JobId, JobPayload, State}]).
You can define the consumer pools into sys.config
as fallow:
[
{beanstalkd_consumer, [
{servers, [
{default_server, [
{start_at_startup, true},
{connection_info, [{host, {127,0,0,1}}, {port, 11300}, {timeout, 5000}]},
{queues_number, 1},
{consumers, [
{consumer_silviu, [
{instances, 1},
{callbacks, [
{<<"tube_name">>, dummy_consumer}
]},
{concurrent_jobs, 1000}
]}
]}
]}
]}
]
}].
Where
start_at_startup
- specify if the consuming of messages should start right away when the application is started. In case you have thebeanstalkd_consumer
as dependency, and you need to load more other stuffs into your current app before starting consuming events, you can put this property onfalse
and usebeanstalkd_consumer_app:start_consumers/0
to start the consumers.connection_info
- connection details. See ebeanstalkd for details.queues_number
- number of processes that will handle thedelete
operations. Those are queued in case the connection to the server is not up and are sent again once connection is established.
For each consumer:
instances
- number of consumer instances.callbacks
-[{Tube, Module}]
. Each item in the list is formed from the tub name and the module that will handle the jobs for that tube.concurrent_jobs
- how many concurrent jobs can run in parallel.