8000 Add fifo queue support by mumia · Pull Request #120 · uecode/qpush-bundle · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add fifo queue support #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 51 additions & 29 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,31 @@ Queue Options
Each queue can have its own options that determine how messages are published or received.
The options and their descriptions are listed below.

+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| Option | Description | Default Value |
+==========================+===========================================================================================+===============+
| ``queue_name`` | The name used to describe the queue on the Provider's side | ``null`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``notification_retries`` | How many attempts notifications are resent in case of errors - if supported | ``3`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``message_timeout`` | Time in seconds a worker has to delete a Message before it is available to other workers | ``30`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``message_expiration`` | Time in seconds that Messages may remain in the Queue before being removed | ``604800`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``messages_to_receive`` | Maximum amount of messages that can be received when polling the queue | ``1`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``receive_wait_time`` | If supported, time in seconds to leave the polling request open - for long polling | ``3`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``subscribers`` | An array of Subscribers, containing an ``endpoint`` and ``protocol`` | ``empty`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| Option | Description | Default Value |
+=================================+============================================================================================+===============+
| ``queue_name`` | The name used to describe the queue on the Provider's side | ``null`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``notification_retries`` | How many times notifications are resent in case of errors - if supported | ``3`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``message_timeout`` | Time in seconds a worker has to delete a Message before it is available to other workers | ``30`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``message_expiration`` | Time in seconds that Messages may remain in the Queue before being removed | ``604800`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``messages_to_receive`` | Maximum number of messages that can be received when polling the queue | ``1`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``receive_wait_time`` | If supported, time in seconds to leave the polling request open - for long polling | ``3`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``fifo`` | If supported (AWS only), puts the queue into FIFO mode | ``false`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``content_based_deduplication`` | If supported (AWS only), enables automatic deduplication IDs based on the message content | ``false`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``subscribers`` | An array of Subscribers, containing an ``endpoint`` and ``protocol`` | ``empty`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+

Symfony Application as a Subscriber
-----------------------------------
Expand Down Expand Up @@ -131,14 +135,32 @@ A working configuration would look like the following
my_queue_key:
provider: ironmq #or aws or in_band or another_aws_provider
options:
queue_name: my_actual_queue_name
push_notifications: true
notification_retries: 3
message_delay: 0
message_timeout: 30
message_expiration: 604800
messages_to_receive: 1
receive_wait_time: 3
queue_name: my_actual_queue_name
push_notifications: true
notification_retries: 3
message_delay: 0
message_timeout: 30
message_expiration: 604800
messages_to_receive: 1
receive_wait_time: 3
fifo: false
content_based_deduplication: false
subscribers:
- { endpoint: http://example1.com/, protocol: http }
- { endpoint: http://example2.com/, protocol: http }
my_fifo_queue_key:
provider: aws
options:
queue_name: my_actual_queue_name.fifo
push_notifications: true
notification_retries: 3
message_delay: 0
message_timeout: 30
message_expiration: 604800
messages_to_receive: 1
receive_wait_time: 3
fifo: true
content_based_deduplication: true
subscribers:
- { endpoint: http://example1.com/, protocol: http }
- { endpoint: http://example2.com/, protocol: http }
8 changes: 8 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ private function getQueuesNode()
->info('How many push requests per second will be triggered. -1 for unlimited, 0 disables push')
->example(1)
->end()
->booleanNode('fifo')
->defaultFalse()
->info('If the queue is FIFO (aws)')
->end()
->booleanNode('content_based_deduplication')
->defaultFalse()
->info('If the FIFO queue has content based deduplication (aws)')
->end()
->append($this->getSubscribersNode())
->end()
->end()
Expand Down
44 changes: 31 additions & 13 deletions src/DependencyInjection/UecodeQPushExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,22 @@
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException;
use RuntimeException;
use Exception;

/**
* @author Keith Kirk <kkirk@undergroundelephant.com>
*/
class UecodeQPushExtension extends Extension
{
/**
* @param array $configs
* @param ContainerBuilder $container
*
* @throws RuntimeException|InvalidArgumentException|ServiceNotFoundException
*/
public function load(array $configs, ContainerBuilder $container)
{
$configuration = new Configuration();
Expand Down Expand Up @@ -112,9 +122,10 @@ public function load(array $configs, ContainerBuilder $container)
]
);

if (!empty($values['options']['queue_name'])
&& $config['providers'][$provider]['driver'] == 'aws'
) {
$isProviderAWS = $config['providers'][$provider]['driver'] === 'aws';
$isQueueNameSet = isset($values['options']['queue_name']) && !empty($values['options']['queue_name']);

if ($isQueueNameSet && $isProviderAWS) {
$definition->addTag(
'uecode_qpush.event_listener',
[
Expand All @@ -123,6 +134,12 @@ public function load(array $configs, ContainerBuilder $container)
'priority' => 255
]
);

// Check queue name ends with ".fifo"
$isQueueNameFIFOReady = preg_match("/$(?<=(\.fifo))/", $values['options']['queue_name']) === 1;
if ($values['options']['fifo'] === true && !$isQueueNameFIFOReady) {
throw new InvalidArgumentException('Queue name must end with ".fifo" on AWS FIFO queues');
}
}

$name = sprintf('uecode_qpush.%s', $queue);
Expand All @@ -139,6 +156,8 @@ public function load(array $configs, ContainerBuilder $container)
* @param ContainerBuilder $container The container
* @param string $name The provider key
*
* @throws RuntimeException
*
* @return Reference
*/
private function createAwsClient($config, ContainerBuilder $container, $name)
Expand All @@ -150,9 +169,7 @@ private function createAwsClient($config, ContainerBuilder $container, $name)
$aws2 = class_exists('Aws\Common\Aws');
$aws3 = class_exists('Aws\Sdk');
if (!$aws2 && !$aws3) {
throw new \RuntimeException(
'You must require "aws/aws-sdk-php" to use the AWS provider.'
);
throw new RuntimeException('You must require "aws/aws-sdk-php" to use the AWS provider.');
}

$awsConfig = [
Expand Down Expand Up @@ -186,8 +203,7 @@ private function createAwsClient($config, ContainerBuilder $container, $name)

$aws->setArguments([$awsConfig]);

$container->setDefinition($service, $aws)
->setPublic(false);
$container->setDefinition($service, $aws)->setPublic(false);
}

return new Reference($service);
Expand All @@ -200,6 +216,8 @@ private function createAwsClient($config, ContainerBuilder $container, $name)
* @param ContainerBuilder $container The container
* @param string $name The provider key
*
* @throws RuntimeException
*
* @return Reference
*/
private function createIronMQClient($config, ContainerBuilder $container, $name)
Expand All @@ -209,9 +227,7 @@ private function createIronMQClient($config, ContainerBuilder $container, $name)
if (!$container->hasDefinition($service)) {

if (!class_exists('IronMQ\IronMQ')) {
throw new \RuntimeException(
'You must require "iron-io/iron_mq" to use the Iron MQ provider.'
);
throw new RuntimeException('You must require "iron-io/iron_mq" to use the Iron MQ provider.');
}

$ironmq = new Definition('IronMQ\IronMQ');
Expand All @@ -225,13 +241,15 @@ private function createIronMQClient($config, ContainerBuilder $container, $name)
]
]);

$container->setDefinition($service, $ironmq)
->setPublic(false);
$container->setDefinition($service, $ironmq)->setPublic(false);
}

return new Reference($service);
}

/**
* @return Reference
*/
private function createSyncClient()
{
return new Reference('event_dispatcher');
Expand Down
Loading
0