In this post, I will introduce the basic usage of Spring AMQP with RabbitMQ.
Dependencies

Before using Spring AMQP, add the following dependency to your Spring Boot project. With Gradle:

implementation("org.springframework.boot:spring-boot-starter-amqp")

or, with Maven:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Running RabbitMQ with Docker

docker run -d --name rabbitmq1 -p 5672:5672 -p 15672:15672 rabbitmq:4-management
Spring AMQP Configurations

Using the Spring Boot configuration file application.yml or application.properties:

spring:
  rabbitmq:
    username: ${RABBITMQ_USER}
    password: ${RABBITMQ_PASSWORD}
    port: ${RABBITMQ_PORT}
Or using a ConnectionFactory Java bean:

@Configuration
public class RabbitMqConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
}
Or define your own ConnectionFactory bean that reads the application.yml values through RabbitProperties:

@Configuration
public class RabbitMqConfig {

    @Bean
    public ConnectionFactory connectionFactory(RabbitProperties rabbitProperties) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // Only host, username and password are copied from the properties here.
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        return connectionFactory;
    }
}
Connections and Channels

Spring AMQP manages the connections to the RabbitMQ broker. By default, CachingConnectionFactory opens a single connection and keeps a cache of channels on it.
Unlike the RabbitMQ Java client API, where you publish messages through Channel objects, in Spring AMQP you publish messages through a RabbitTemplate.
You rarely need to deal with connections and channels directly; RabbitTemplate covers most operations.
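
If the defaults ever need tuning, the channel cache can be adjusted on the connection factory. The following is a minimal sketch, not a recommendation: the class name ChannelCacheConfigSketch and the cache size of 50 are illustrative assumptions, and it is an alternative to the ConnectionFactory beans shown earlier rather than an addition to them.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, named only for this sketch.
@Configuration
public class ChannelCacheConfigSketch {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        // CHANNEL is the default cache mode: one shared connection, cached channels.
        factory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        // Illustrative value: number of idle channels kept open for reuse.
        factory.setChannelCacheSize(50);
        return factory;
    }
}
```

RabbitTemplate borrows a channel from this cache for each operation and returns it afterwards, which is why application code normally never touches a Channel.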
Declare exchanges, queues and bindings

Declare exchanges

@Configuration
public class MyRabbitMqExchange {

    public static final String DIRECT_EXCHANGE_NAME = "spring-amqp-direct-exchange";
    public static final String TOPIC_EXCHANGE_NAME = "spring-amqp-topic-exchange";
    public static final String FANOUT_EXCHANGE_NAME = "spring-amqp-fanout-exchange";

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE_NAME);
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE_NAME);
    }
}
Declare queues
@Configuration
public class MyRabbitMqQueue {

    public static final String QUEUE_1_NAME = "spring-amqp-queue.1";
    public static final String QUEUE_2_NAME = "spring-amqp-queue.2";
    public static final String QUEUE_3_NAME = "spring-amqp-queue.3";

    @Bean
    Queue queue1() {
        return new Queue(QUEUE_1_NAME, true);
    }

    @Bean
    Queue queue2() {
        return new Queue(QUEUE_2_NAME, true);
    }

    @Bean
    Queue queue3() {
        return new Queue(QUEUE_3_NAME, true);
    }
}
Bindings between exchanges and queues
@Configuration
public class MyRabbitMqBinding {

    public static final String DIRECT_ROUTING_KEY = "spring-amqp-queue.1";
    public static final String TOPIC_ROUTING_KEY = "spring-amqp-queue.*";

    @Bean
    Binding directQueue1Binding(Queue queue1, DirectExchange directExchange) {
        return BindingBuilder.bind(queue1).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

    @Bean
    Binding topicQueue1Binding(Queue queue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue1).to(topicExchange).with(TOPIC_ROUTING_KEY);
    }

    @Bean
    Binding topicQueue2Binding(Queue queue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue2).to(topicExchange).with(TOPIC_ROUTING_KEY);
    }

    @Bean
    Binding fanoutQueue1Binding(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    @Bean
    Binding fanoutQueue2Binding(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }

    @Bean
    Binding fanoutQueue3Binding(Queue queue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue3).to(fanoutExchange);
    }
}
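
The topic bindings above use the pattern spring-amqp-queue.*, so it helps to see which routing keys that pattern accepts. In AMQP topic patterns, words are separated by dots, * stands for exactly one word and # for zero or more words. The sketch below is a toy matcher written only to illustrate these rules; it is not RabbitMQ's implementation, and the class name TopicPatternSketch is made up for this example.

```java
/** Toy re-implementation of AMQP topic pattern matching, for illustration only. */
final class TopicPatternSketch {

    private TopicPatternSketch() {
    }

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Limit -1 keeps trailing empty words, so "a." is two words, not one.
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            // '*' consumes exactly one word; a literal must be equal.
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String pattern = "spring-amqp-queue.*";
        System.out.println(matches(pattern, "spring-amqp-queue.1"));       // true
        System.out.println(matches(pattern, "spring-amqp-queue.1.extra")); // false
        System.out.println(matches(pattern, "spring-amqp-queue"));         // false
    }
}
```

With the bindings declared above, a message published to the topic exchange with routing key spring-amqp-queue.1 or spring-amqp-queue.2 would therefore reach both queue1 and queue2, since both are bound with the same pattern.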
Publishing Messages

Publish messages

@Autowired
private RabbitTemplate rabbitTemplate;

String exchange = MyRabbitMqExchange.DIRECT_EXCHANGE_NAME;
String routingKey = MyRabbitMqBinding.DIRECT_ROUTING_KEY;
rabbitTemplate.convertAndSend(exchange, routingKey, "Hello, World!");
Consuming Messages

Asynchronous Consumer with MessageListener

Using MessageListener

For asynchronous Message reception, a dedicated component is involved. That component is a container for a Message-consuming callback.

@Configuration
public class MessageListenerConfig {

    private String queueName = MyRabbitMqQueue.QUEUE_1_NAME;

    @Bean
    public MessageListener messageListener() {
        return message -> {
            System.out.println("Received message: " + new String(message.getBody()));
        };
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(this.queueName);
        container.setMessageListener(messageListener());
        return container;
    }
}
Using MessageListenerAdapter

If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework.

@Configuration
public class MessageListenerAdapterConfig {

    String queueName = MyRabbitMqQueue.QUEUE_1_NAME;

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(this.queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}

@Component
public class Receiver {

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
    }
}
Annotation-driven Listener Endpoints

The easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure. In a nutshell, it lets you expose a method of a managed bean as a Rabbit listener endpoint. The following example shows how to use the @RabbitListener annotation:

@Component
@Slf4j
public class MyConsumer {

    @RabbitListener(queues = MyRabbitMqQueue.QUEUE_1_NAME)
    public void receiveMessage(String message) {
        log.info("Received message from {}: {}", MyRabbitMqQueue.QUEUE_1_NAME, message);
    }
}
Polling consumer (Not recommended)

@Component
@RequiredArgsConstructor
public class PollingConsumer {

    private final AmqpTemplate amqpTemplate;

    void consumeMessage(String queueName) {
        String message = (String) amqpTemplate.receiveAndConvert(queueName);
        if (message != null) {
            System.out.println("Received <" + message + ">");
        } else {
            System.out.println("No message received");
        }
    }
}
Advanced features

Handling dead letters

Use the x-message-ttl argument to set the time-to-live (in milliseconds) of messages in the queue, and the x-dead-letter-exchange argument to name the exchange that expired messages are republished to.
@Bean
Queue queueTtl() {
    Map<String, Object> arguments = Map.of(
            "x-message-ttl", 10000,
            "x-dead-letter-exchange", "my-spring-amqp-queue.dlx");
    return new Queue(QUEUE_NAME, true, false, false, arguments);
}
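
The queue arguments only name the dead-letter exchange; the broker does not create it, and messages dead-lettered to a missing exchange are dropped. A minimal sketch of the missing pieces could look like the following. It assumes a fanout exchange is acceptable for dead letters, and the class name DeadLetterConfigSketch and the queue name my-spring-amqp-queue.dlq are hypothetical, chosen only for this example.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, named only for this sketch.
@Configuration
public class DeadLetterConfigSketch {

    // Must equal the value of the x-dead-letter-exchange queue argument.
    public static final String DLX_NAME = "my-spring-amqp-queue.dlx";
    // Assumed queue name; the original post does not define one.
    public static final String DLQ_NAME = "my-spring-amqp-queue.dlq";

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_NAME);
    }

    @Bean
    Queue deadLetterQueue() {
        // Durable queue that collects the expired messages.
        return new Queue(DLQ_NAME, true);
    }

    @Bean
    Binding deadLetterBinding() {
        // A fanout exchange ignores routing keys, so no key is needed here.
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}
```

A fanout exchange is used here because it delivers every dead letter to the bound queue regardless of the original routing key; a direct or topic exchange would work too if dead letters need to be split by key.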
Settings

Specifying a consumer prefetch count

The default prefetch count is 250, which is defined by DEFAULT_PREFETCH_COUNT in the AbstractMessageListenerContainer class.
Change the prefetch count in MessageListenerContainer:
container.setPrefetchCount(yourPrefetchCount);
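
The call above applies to a container you build yourself. For @RabbitListener methods the container is created by a listener container factory, so the prefetch count would be set there instead. The following is a sketch under assumptions: the class name PrefetchConfigSketch and the value 10 are illustrative, and defining this bean replaces the factory that Spring Boot would otherwise auto-configure.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, named only for this sketch.
@Configuration
public class PrefetchConfigSketch {

    // "rabbitListenerContainerFactory" is the bean name @RabbitListener looks up by default.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Illustrative value: at most 10 unacknowledged messages per consumer.
        factory.setPrefetchCount(10);
        return factory;
    }
}
```

In a Spring Boot project, setting spring.rabbitmq.listener.simple.prefetch in application.yml is usually the lighter option, since it keeps the auto-configured factory and its other settings in place.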