beanstalkd-consumer

Erlang Consumer Framework for Beanstalkd Work Queue.

Project Overview

This framework provides a simple and configurable solution for managing a pool of consumers that execute jobs from a Beanstalkd server.

Key Features

  • Easily configurable to launch a pool of consumers for job execution.
  • Supports limiting the number of concurrent jobs.
  • Ensures all in-progress jobs are completed before the consumer shuts down.
  • Prevents duplicate execution of jobs.

Job Lifecycle

  • Reservation: When a job is reserved, it is first buried before being sent for processing.
  • Successful Completion: If the job executes successfully without exceptions, it is deleted.
  • Failed Execution: If the job execution fails (an exception is raised), the framework behaves as follows, depending on the exception (a sketch follows this list):
    • If the job throws a {bad_argument, Reason::any()} exception, it is deleted (useful for malformed job payloads).
    • If the job throws a reschedule_job exception, it is moved back to the ready state to be retried by the consumer.
    • For any other exception, it remains in the buried state for manual review.
  • Delete/Kick Operations: All kick or delete operations are handled in a separate process and queued. If the connection to the server is lost, these operations are preserved and not discarded.
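
As a concrete illustration, here is a minimal sketch of how a consumer's process/3 callback (described in the Quick start below) can trigger these outcomes. decode/1 and do_work/2 are hypothetical helpers standing in for your own payload handling:

process(JobId, JobPayload, _State) ->
    case decode(JobPayload) of
        {ok, Work} ->
            %% returning normally means the framework deletes the job
            do_work(JobId, Work);
        {error, Reason} ->
            %% malformed payload: the job is deleted
            throw({bad_argument, Reason});
        retry_later ->
            %% transient condition: the job is moved back to the ready state
            throw(reschedule_job)
    end.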

In case you are using the server fork from here, you can also reschedule a job after a certain number of seconds or using an exponential backoff algorithm (0, 1, 2, 3, 4 seconds) limited to a specific number of attempts. You can do this using the following functions (a short sketch follows the list):

  • beanstalkd_consumer:throw_reschedule_job/1
  • beanstalkd_consumer:throw_reschedule_job_backoff/1
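
A hedged sketch of how these helpers could be called from inside process/3. The meaning of each argument (delay in seconds for throw_reschedule_job/1, maximum number of attempts for throw_reschedule_job_backoff/1) is an assumption here, so check the module for the exact contract; call_backend/0 is a hypothetical helper:

process(_JobId, _JobPayload, _State) ->
    case call_backend() of
        ok ->
            ok;
        {error, busy} ->
            %% assumed: put the job back and retry it after 30 seconds
            beanstalkd_consumer:throw_reschedule_job(30);
        {error, transient} ->
            %% assumed: retry with exponential backoff, at most 5 attempts
            beanstalkd_consumer:throw_reschedule_job_backoff(5)
    end.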

Quick start

All consumers need to implement the beanstalkd_consumer behaviour.

-callback init(TubeName::binary()) ->
    {ok, State::any()}.

-callback process(JobId::non_neg_integer(), JobPayload::binary(), State::any()) ->
    any().

Example:

-module(dummy_consumer).

-behaviour(beanstalkd_consumer).

-export([
    init/1,
    process/3
]).

init(TubeName) ->
    io:format(<<"~p:init: ~p ~n">>, [?MODULE, TubeName]),
    {ok, #{}}.

process(JobId, JobPayload, State) ->
    io:format(<<"~p:process: ~p ~n">>, [?MODULE, {JobId, JobPayload, State}]).

You can define the consumer pools in sys.config as follows:

[
    {beanstalkd_consumer, [
        {servers, [
            {default_server, [
                {start_at_startup, true},
                {connection_info, [{host, {127,0,0,1}}, {port, 11300}, {timeout, 5000}]},
                {queues_number, 1},
                {consumers, [
                    {consumer_silviu, [
                        {instances, 1},
                        {callbacks, [
                            {<<"tube_name">>, dummy_consumer}
                        ]},
                        {concurrent_jobs, 1000}
                    ]}
                ]}
            ]}
        ]}
    ]}
].

Where

  • start_at_startup - specifies whether message consumption should start as soon as the application starts. If you have beanstalkd_consumer as a dependency and need to initialize other parts of your application before consuming events, set this property to false and call beanstalkd_consumer_app:start_consumers/0 to start the consumers (see the sketch after this list).
  • connection_info - connection details. See ebeanstalkd for details.
  • queues_number - number of processes that will handle the delete operations. These operations are queued while the connection to the server is down and sent again once the connection is re-established.
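
For the deferred-start case above, a minimal sketch of the startup sequence (with start_at_startup set to false in sys.config; init_my_app/0 is a hypothetical placeholder for your own initialization) looks like this:

start() ->
    {ok, _Started} = application:ensure_all_started(beanstalkd_consumer),
    ok = init_my_app(),
    beanstalkd_consumer_app:start_consumers().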

For each consumer:

  • instances - number of consumer instances.
  • callbacks - [{Tube, Module}]. Each item in the list consists of the tube name and the module that will handle the jobs for that tube.
  • concurrent_jobs - the maximum number of jobs that can run in parallel.
