Open
Description
When an exception is thrown in the com.rabbitmq.client.Consumer#handleDelivery
method, the consumer doesn't receive messages afterwards because the delivery thread is dead.
The test throwKillsConsumer
, will throw an exception when receiving the first message. The second message won't be delivered. This was hand to debug because no exception is logged about the dying thread.
The test noThrowIsFine
, delivers all messages as expected.
Click to expand
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
public class MockConnectionTest3 {
private static final String EXCHANGE = "exchange";
private static final String QUEUE = "queue";
@Test
void throwKillsConsumer() throws Exception {
var factory = new MockConnectionFactory();
var conn = factory.newConnection();
var channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE, false, true, true, Map.of());
channel.queueBind(QUEUE, EXCHANGE, "");
channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "throw".getBytes());
channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());
var queue = readFromQueue(conn);
assertEquals("throw", queue.poll(1, TimeUnit.SECONDS));
assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
}
@Test
void noThrowIsFine() throws Exception {
var factory = new MockConnectionFactory();
var conn = factory.newConnection();
var channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE, false, true, true, Map.of());
channel.queueBind(QUEUE, EXCHANGE, "");
channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "first".getBytes());
channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());
var queue = readFromQueue(conn);
assertEquals("first", queue.poll(1, TimeUnit.SECONDS));
assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
}
ArrayBlockingQueue<String> readFromQueue(Connection conn) throws IOException {
var channel = conn.createChannel();
var queue = new ArrayBlockingQueue<String>(3);
var consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
queue.put(new String(body));
} catch (InterruptedException e) {}
if (new String(body).equals("throw")) {
throw new RuntimeException("oops");
}
}
};
channel.basicConsume(QUEUE, true, consumer);
return queue;
}
}
Metadata
Metadata
Assignees
Labels
No labels