A Guide to Spring AMQP RabbitMQ

In this post, I will introduce the basic use of Spring AMQP RabbitMQ.

Dependencies

Before using Spring AMQP, you need to add the following dependencies to your Spring Boot project:

// Gradle (Kotlin DSL)
implementation("org.springframework.boot:spring-boot-starter-amqp")

or

<!-- Maven -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Running RabbitMQ with Docker

docker run -d --name rabbitmq1 -p 5672:5672 -p 15672:15672 rabbitmq:4-management

Spring AMQP Configurations

Using the spring boot configuration file application.yml or application.properties

spring:  
rabbitmq:
username: ${RABBITMQ_USER}
password: ${RABBITMQ_PASSWORD}
port: ${RABBITMQ_PORT}

Or using ConnectionFactory Java Bean

@Configuration  
public class RabbitMqConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
}

Or override application.yml configuration by ConnectionFactory Java Bean

@Configuration
public class RabbitMqConfig {
@Bean
public ConnectionFactory connectionFactory(RabbitProperties rabbitProperties) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
return connectionFactory;
}
}

Connections and Channels

Spring AMQP manages the connections to the RabbitMQ broker. It will create a single connection and a pool of channels.

Unlike the Rabbit client Java API, which uses Channel objects to publish messages, in Spring AMQP, you use RabbitTemplate objects to publish messages.

You don’t need to care about the connections and channels, just use the RabbitTemplate to do most operations.

Declare exchanges, queues and bindings

Declare exchanges

@Configuration
public class MyRabbitMqExchange {

public static final String DIRECT_EXCHANGE_NAME = "spring-amqp-direct-exchange";
public static final String TOPIC_EXCHANGE_NAME = "spring-amqp-topic-exchange";
public static final String FANOUT_EXCHANGE_NAME = "spring-amqp-fanout-exchange";

@Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE_NAME);
}

@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME);
}

@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE_NAME);
}
}

Declare queues

@Configuration
public class MyRabbitMqQueue {

public static final String QUEUE_1_NAME = "spring-amqp-queue.1";
public static final String QUEUE_2_NAME = "spring-amqp-queue.2";
public static final String QUEUE_3_NAME = "spring-amqp-queue.3";

@Bean
Queue queue1() {
return new Queue(QUEUE_1_NAME, true);
}

@Bean
Queue queue2() {
return new Queue(QUEUE_2_NAME, true);
}

@Bean
Queue queue3() {
return new Queue(QUEUE_3_NAME, true);
}
}

Bindings between exchanges and queues

@Configuration
public class MyRabbitMqBinding {

public static final String DIRECT_ROUTING_KEY = "spring-amqp-queue.1";
public static final String TOPIC_ROUTING_KEY = "spring-amqp-queue.*";

@Bean
Binding directQueue1Binding(Queue queue1, DirectExchange directExchange) {
return BindingBuilder.bind(queue1).to(directExchange).with(DIRECT_ROUTING_KEY);
}

@Bean
Binding topicQueue1Binding(Queue queue1, TopicExchange topicExchange) {
return BindingBuilder.bind(queue1).to(topicExchange).with(TOPIC_ROUTING_KEY);
}

@Bean
Binding topicQueue2Binding(Queue queue2, TopicExchange topicExchange) {
return BindingBuilder.bind(queue2).to(topicExchange).with(TOPIC_ROUTING_KEY);
}

@Bean
Binding fanoutQueue1Binding(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}

@Bean
Binding fanoutQueue2Binding(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}

@Bean
Binding fanoutQueue3Binding(Queue queue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue3).to(fanoutExchange);
}
}

Publishing Messages

Publish messages

@Autowired
private RabbitTemplate rabbitTemplate;
String exchange = MyRabbitMqExchange.DIRECT_EXCHANGE_NAME;
String routingKey = MyRabbitMqBinding.DIRECT_ROUTING_KEY;
rabbitTemplate.convertAndSend(exchange, routingKey, "Hello, World!");

Consuming Messages

Asynchronous Consumer with MessageListener

Using MessageListener

For asynchronous Message reception, a dedicated component is involved. That component is a container for a Message-consuming callback.

@Configuration
public class MessageListenerConfig {

private String queueName = MyRabbitMqQueue.QUEUE_1_NAME;

@Bean
public MessageListener messageListener() {
return message -> {
// Process the received message
System.out.println("Received message: " + new String(message.getBody()));
};
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(this.queueName);
container.setMessageListener(messageListener());
return container;
}
}

Using MessageListenerAdapter

If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework.

@Configuration
public class MessageListenerAdapterConfig {
String queueName = MyRabbitMqQueue.QUEUE_1_NAME;

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(this.queueName);
container.setMessageListener(listenerAdapter);
return container;
}
}
@Component
public class Receiver {
public void receiveMessage(String message) {
// This method will be called when a message is received
System.out.println("Received <" + message + ">");
}
}

Annotation-driven Listener Endpoints

The easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure. In a nutshell, it lets you expose a method of a managed bean as a Rabbit listener endpoint. The following example shows how to use the @RabbitListener annotation:

@Component
@Slf4j
public class MyConsumer {
@RabbitListener(queues = MyRabbitMqQueue.QUEUE_1_NAME)
public void receiveMessage(String message) {
// Process the received message
log.info("Received message from {}: {}", MyRabbitMqQueue.QUEUE_1_NAME, message);
}
}
@Component
@RequiredArgsConstructor
public class PollingConsumer {
private final AmqpTemplate amqpTemplate;

void consumeMessage(String queueName) {
String message = (String) amqpTemplate.receiveAndConvert(queueName);
if (message != null) {
System.out.println("Received <" + message + ">");
} else {
System.out.println("No message received");
}
}
}

Advanced features

Handling dead letters

Using x-message-ttl and x-dead-letter-exchange arguments to set the time-to-live of messages in the queue.

@Bean
Queue queueTtl() {
Map<String, Object> arguments = Map.of(
"x-message-ttl", 10000, // 10 seconds for testing
"x-dead-letter-exchange", "my-spring-amqp-queue.dlx");
return new Queue(QUEUE_NAME, true, false, false, arguments);
}

Settings

Specifying a consumer prefetch count

The default value of prefetch count is 250, which is defined by DEFAULT_PREFETCH_COUNT in the AbstractMessageListenerContainer class.

Change the prefetch count in MessageListenerContainer:

container.setPrefetchCount(yourPrefetchCount);

References

[1] Spring AMQP Referenece

[2] Messaging with RabbitMQ