10000 Exception in Consumer breaks subscription · Issue #441 · fridujo/rabbitmq-mock · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
Exception in Consumer breaks subscription #441
Open
@jmattheis

Description

@jmattheis

When an exception is thrown in the com.rabbitmq.client.Consumer#handleDelivery method, the consumer doesn't receive messages afterwards because the delivery thread is dead.

The test throwKillsConsumer, will throw an exception when receiving the first message. The second message won't be delivered. This was hand to debug because no exception is logged about the dying thread.

The test noThrowIsFine, delivers all messages as expected.

Click to expand

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

public class MockConnectionTest3 {

    private static final String EXCHANGE = "exchange";
    private static final String QUEUE = "queue";

    @Test
    void throwKillsConsumer() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        var channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, false, true, true, Map.of());
        channel.queueBind(QUEUE, EXCHANGE, "");

        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "throw".getBytes());
        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());

        var queue = readFromQueue(conn);
        assertEquals("throw", queue.poll(1, TimeUnit.SECONDS));
        assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
    }

    @Test
    void noThrowIsFine() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        var channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, false, true, true, Map.of());
        channel.queueBind(QUEUE, EXCHANGE, "");

        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "first".getBytes());
        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());

        var queue = readFromQueue(conn);
        assertEquals("first", queue.poll(1, TimeUnit.SECONDS));
        assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
    }

    ArrayBlockingQueue<String> readFromQueue(Connection conn) throws IOException {
        var channel = conn.createChannel();
        var queue = new ArrayBlockingQueue<String>(3);

        var consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                try {
                    queue.put(new String(body));
                } catch (InterruptedException e) {}

                if (new String(body).equals("throw")) {
                    throw new RuntimeException("oops");
                }
            }
        };

        channel.basicConsume(QUEUE, true, consumer);
        return queue;
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions

      0