SpringBoot动态创建绑定rabbitMq队列

一、介绍

在以前,我写过一篇如何使用SpringBoot整合rabbitMq的文章。

SpringBoot整合rabbitMq | 半月无霜 (banmoon.top)

上面这种方法,是自己创建队列,交换机,绑定。生成Bean,从而实现队列等等的创建。

这种方式太过于繁琐,有没有一种方法可以快速创建呢,我们只管使用就行了


还真的有,只需要在配置文件中配置队列、交换机等信息,就可以在服务启动的时候自动创建并绑定。

一次偶然间,在csdn上看到了,动态创建rabbitMq队列的文章。

拉出来魔改了一下,只要在配置文件中配置好相关信息,就可以实现队列、交换机的创建与绑定。

同时还解决了,多个开发连接同一个rabbitMq,导致自己生产的消息,被其他同事消费走的问题。

二、代码

1)读取配置的代码

这是RabbitModuleInfoProperties.java,读取配置文件中的信息,生成信息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package com.banmoon.config.properties;

import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import lombok.Data;

import java.util.List;
import java.util.Map;

/**
 * One RabbitMQ "module" read from configuration: a routing key, one or more queues,
 * and the exchange they are bound to.
 */
@Data
public class RabbitModuleInfoProperties {

/**
 * Routing key.
 */
private String routingKey;

/**
 * Single queue definition.
 */
private Queue queue;

/**
 * Multiple queue definitions.
 */
private List<Queue> queues;

/**
 * Exchange definition.
 */
private Exchange exchange;

/**
 * Configuration of a single queue.
 */
@Data
public static class Queue {
/**
 * Queue name.
 */
private String name;

/**
 * Whether the queue is durable; defaults to true (durable), so messages are not lost on restart.
 */
private boolean durable = true;

/**
 * Whether the queue is exclusive; defaults to false, so several consumers may consume the same queue.
 */
private boolean exclusive = false;

/**
 * Whether the queue is deleted automatically once all consumers have disconnected;
 * defaults to false so that messages are not discarded when consumers disconnect.
 */
private boolean autoDelete = false;

/**
 * Name of the dead-letter queue to bind.
 */
private String deadLetterQueue;

/**
 * Name of the exchange used for the dead-letter queue.
 */
private String deadLetterExchange;

/**
 * Routing key used for the dead-letter queue.
 */
private String deadLetterRoutingKey;

/**
 * Additional queue arguments.
 */
private Map<String, Object> arguments;
}

/**
 * Configuration of an exchange.
 */
@Data
public static class Exchange {
/**
 * Exchange type; defaults to a direct exchange.
 */
private String type = RabbitExchangeTypeEnum.DIRECT.getCode();

/**
 * Exchange name.
 */
private String name;

/**
 * Whether the exchange is durable; defaults to true (durable).
 */
private boolean durable = true;

/**
 * Whether the exchange is deleted automatically when none of its bound queues is in use any more.
 */
private boolean autoDelete = false;

/**
 * Whether this is a txl delayed exchange.
 */
private boolean txlDelay = false;

/**
 * Additional exchange arguments.
 */
private Map<String, Object> arguments;

/**
 * Header values used when binding to a headers exchange.
 */
private Map<String, Object> headersMap;

/**
 * Match mode for the headers-exchange values; defaults to requiring all of them to match.
 */
private String headersType = HeadersTypeEnum.ALL.getCode();
}

}

这是RabbitModuleProperties.java,上面有多个绑定配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.banmoon.config.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Binds the {@code spring.rabbitmq} configuration prefix; {@code modules} holds the list of
 * queue/exchange binding definitions.
 */
@Data
@Configuration
@ConfigurationProperties("spring.rabbitmq")
public class RabbitModuleProperties {

// One entry per configured routing-key/queue/exchange binding; null when nothing is configured.
private List<RabbitModuleInfoProperties> modules;

}

2)配置文件

这个是配置,请注意交换机,队列前缀,这个就是保证不同开发之间消息隔离的关键

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
spring:
  rabbitmq:
    host: rabbitMq服务地址
    port: rabbitMq服务端口
    username: 帐号
    password: 密码
    virtual-host: /
    # Confirm that a message reached the exchange
    publisher-confirm-type: correlated
    publisher-confirms: true
    # Confirm that a message was routed to a queue
    publisher-returns: true
    # Prefix applied to exchange, queue and routing-key names
    prefix: whc
    # Each entry is bound to RabbitModuleInfoProperties: a routing key, one queue
    # (queue) or several (queues), and the exchange they are bound to.
    modules:
      # Direct exchange (the default exchange type)
      - routingKey: test.direct.routingKey
        queue:
          name: test.direct.queue
        exchange:
          name: test.direct.exchange
      # Fanout exchange with three queues; the key matches RabbitmqConstant.FANOUT_TEST_ROUTER_KEY
      - routingKey: test.fanout.routerKey
        queues:
          - name: test.fanout.queue.a
          - name: test.fanout.queue.b
          - name: test.fanout.queue.c
        exchange:
          name: test.fanout.exchange
          type: fanout
      # Topic exchange: three queues bound to the same exchange with different patterns
      - routingKey: test.topic.routerKey.#
        queue:
          name: test.topic.queue.log
        exchange:
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.text
        queue:
          name: test.topic.queue.text
        exchange:
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.image
        queue:
          name: test.topic.queue.image
        exchange:
          name: test.topic.exchange
          type: topic
      # Headers exchange: binding matches on the header values below
      - routingKey: test.headers.routerKey
        queue:
          name: test.headers.queue
        exchange:
          name: test.headers.exchange
          type: headers
          headers-map:
            authentication: "半月无霜"
      # TTL queue whose expired messages go to a dead-letter queue
      - routingKey: test.ttl.routerKey
        queue:
          name: test.ttl.queue
          deadLetterQueue: test.ttl.death.queue
          deadLetterExchange: test.ttl.death.exchange
          deadLetterRoutingKey: test.ttl.death.routerKey
          arguments:
            x-message-ttl: 5000
        exchange:
          name: test.ttl.exchange
      # Delayed exchange (txl-delay)
      - routingKey: test.txl.routerKey
        queue:
          name: test.txl.queue
        exchange:
          name: test.txl.exchange
          txl-delay: true

3)初始化时创建队列、交换机

RabbitmqConfig.java;这是一个配置类,主要得到了AmqpAdmin对象、RabbitModuleProperties对象、以及定义的前缀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.banmoon.config;

import com.banmoon.config.init.RabbitModuleInitializer;
import com.banmoon.config.properties.RabbitModuleProperties;
import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the {@link RabbitModuleInitializer} bean, handing it the
 * {@link AmqpAdmin}, the resolved name prefix and the configured module list.
 */
@Slf4j
@Configuration
public class RabbitmqConfig {

// Resolved from the RabbitmqConstant.RABBITMQ_PREFIX expression.
@Value(RabbitmqConstant.RABBITMQ_PREFIX)
private String rabbitPrefix;

/**
 * Creates the initializer unless a bean of the same type is already defined.
 *
 * @param amqpAdmin              admin client passed through to the initializer
 * @param rabbitModuleProperties bound configuration; its module list is passed through
 * @return the initializer bean
 */
@Bean
@ConditionalOnMissingBean
public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
return new RabbitModuleInitializer(amqpAdmin, rabbitPrefix, rabbitModuleProperties.getModules());
}

}

RabbitModuleInitializer.java,初始化类,主要声明队列、交换机,以及绑定都在其中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package com.banmoon.config.init;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.banmoon.config.properties.RabbitModuleInfoProperties;
import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import com.banmoon.utils.stream.StreamUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Declares the queues, exchanges and bindings described by the configured modules once all
 * singleton beans have been instantiated.
 *
 * <p>Every queue name, exchange name and routing key is prefixed with {@code rabbitmqPrefix}.
 * The bound configuration objects are treated as read-only: argument maps and queue lists are
 * copied before being modified.
 */
@Slf4j
@AllArgsConstructor
public class RabbitModuleInitializer implements SmartInitializingSingleton {

    /** Admin client used for every declare call. */
    private final AmqpAdmin amqpAdmin;

    /** Prefix prepended to every queue name, exchange name and routing key. */
    private final String rabbitmqPrefix;

    /** Module definitions to declare; nothing is declared when null or empty. */
    private final List<RabbitModuleInfoProperties> modules;

    @Override
    public void afterSingletonsInstantiated() {
        log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");
        declareRabbitModule();
    }

    /**
     * Validates each configured module, then declares its queues, its exchange and the
     * bindings between them.
     */
    private void declareRabbitModule() {
        if (CollUtil.isEmpty(modules)) {
            return;
        }
        for (RabbitModuleInfoProperties rabbitModuleInfo : modules) {
            // Fail fast on incomplete configuration before touching the broker.
            configParamValidate(rabbitModuleInfo);

            List<Queue> queues = convertQueue(rabbitModuleInfo.getQueues(), rabbitModuleInfo.getQueue());
            RabbitModuleInfoProperties.Exchange exchangeInfo = rabbitModuleInfo.getExchange();
            Exchange exchange = convertExchange(exchangeInfo);
            String routingKey = rabbitmqPrefix + rabbitModuleInfo.getRoutingKey();

            // Queues and the exchange must exist before they can be bound.
            queues.forEach(amqpAdmin::declareQueue);
            amqpAdmin.declareExchange(exchange);
            for (Queue queue : queues) {
                amqpAdmin.declareBinding(buildBinding(queue, exchange, exchangeInfo, routingKey));
            }
        }
    }

    /**
     * Builds the binding of one queue to the module's exchange: a header match (all/any) for
     * a headers exchange, a routing-key binding for every other exchange.
     */
    private Binding buildBinding(Queue queue, Exchange exchange,
                                 RabbitModuleInfoProperties.Exchange exchangeInfo, String routingKey) {
        // instanceof instead of comparing the type string, so the cast below can never fail.
        if (exchange instanceof HeadersExchange) {
            HeadersExchange headersExchange = (HeadersExchange) exchange;
            if (HeadersTypeEnum.ALL.getCode().equals(exchangeInfo.getHeadersType())) {
                return BindingBuilder.bind(queue).to(headersExchange).whereAll(exchangeInfo.getHeadersMap()).match();
            }
            return BindingBuilder.bind(queue).to(headersExchange).whereAny(exchangeInfo.getHeadersMap()).match();
        }
        return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
    }

    /**
     * Validates one module definition: a routing key, an exchange with a name, and at least
     * one queue must be configured.
     *
     * @param rabbitModuleInfo module definition to check
     * @throws IllegalArgumentException if any required part is missing
     */
    public void configParamValidate(RabbitModuleInfoProperties rabbitModuleInfo) {
        String routingKey = rabbitModuleInfo.getRoutingKey();

        Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");

        Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);

        Assert.isTrue(Objects.nonNull(rabbitModuleInfo.getQueue()) || CollUtil.isNotEmpty(rabbitModuleInfo.getQueues()), "routingKey:{}未配置queue", routingKey);
    }

    /**
     * Converts the configured queues of a module into AMQP queues.
     *
     * @param queues    the {@code queues} list of the module, may be null or empty
     * @param queueInfo the single {@code queue} of the module, may be null when {@code queues} is non-empty
     * @return the converted queues: all of {@code queues} followed by {@code queueInfo} when
     *         both are present, otherwise just the single queue
     */
    public List<Queue> convertQueue(List<RabbitModuleInfoProperties.Queue> queues, RabbitModuleInfoProperties.Queue queueInfo) {
        if (CollUtil.isNotEmpty(queues)) {
            // Work on a copy: appending to the bound configuration list would leave a
            // duplicate entry behind every time this method runs.
            List<RabbitModuleInfoProperties.Queue> allQueues = CollUtil.newArrayList(queues);
            if (Objects.nonNull(queueInfo)) {
                allQueues.add(queueInfo);
            }
            return StreamUtil.listToList(allQueues, this::convertQueue);
        }
        Queue queue = convertQueue(queueInfo);
        return CollUtil.newArrayList(queue);
    }

    /**
     * Converts one configured queue into an AMQP queue.
     *
     * <p>Side effect: when the dead-letter queue, exchange and routing key are all configured,
     * the dead-letter queue, its direct exchange and their binding are declared on the broker
     * immediately.
     *
     * @param queue queue definition
     * @return the queue to declare, with normalised arguments and, if configured, the
     *         dead-letter arguments
     */
    public Queue convertQueue(RabbitModuleInfoProperties.Queue queue) {
        String name = rabbitmqPrefix + queue.getName();

        // Copy so the bound configuration map is never modified.
        Map<String, Object> arguments = new HashMap<>();
        if (queue.getArguments() != null) {
            arguments.putAll(queue.getArguments());
        }
        // Normalise the numeric arguments to long (message TTL and max priority).
        convertArgumentToLong(arguments, "x-message-ttl");
        convertArgumentToLong(arguments, "x-max-priority");

        String deadLetterQueue = queue.getDeadLetterQueue();
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterQueue) && StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
            deadLetterQueue = rabbitmqPrefix + deadLetterQueue;
            deadLetterExchange = rabbitmqPrefix + deadLetterExchange;
            deadLetterRoutingKey = rabbitmqPrefix + deadLetterRoutingKey;
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);

            // The dead-letter queue must NOT inherit the source queue's arguments: with the
            // TTL and dead-letter settings it would dead-letter into itself, and RabbitMQ
            // drops messages caught in such a cycle.
            // NOTE(review): a dead-letter queue already declared with the old arguments has
            // to be deleted once, otherwise the broker rejects the changed declaration.
            Queue deadQueue = new Queue(deadLetterQueue, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete());
            amqpAdmin.declareQueue(deadQueue);
            // Durable, auto-delete direct exchange (unchanged from the original settings).
            Exchange deadExchange = new DirectExchange(deadLetterExchange, true, true, null);
            amqpAdmin.declareExchange(deadExchange);
            Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange).with(deadLetterRoutingKey).noargs();
            amqpAdmin.declareBinding(binding);
        }
        return new Queue(name, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }

    /** Replaces the value stored under {@code key}, if present, by its long conversion. */
    private void convertArgumentToLong(Map<String, Object> arguments, String key) {
        if (arguments.containsKey(key)) {
            arguments.put(key, Convert.toLong(arguments.get(key)));
        }
    }

    /**
     * Converts a configured exchange into an AMQP exchange, using the delayed-exchange
     * factory when {@code txlDelay} is set.
     *
     * @param exchange exchange definition
     * @return the exchange to declare
     */
    public Exchange convertExchange(RabbitModuleInfoProperties.Exchange exchange) {
        String type = exchange.getType();
        String exchangeName = rabbitmqPrefix + exchange.getName();
        boolean isDurable = exchange.isDurable();
        boolean isAutoDelete = exchange.isAutoDelete();
        Map<String, Object> arguments = exchange.getArguments();
        if (exchange.isTxlDelay()) {
            return RabbitExchangeTypeEnum.getTxlDelayExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
        }
        return RabbitExchangeTypeEnum.getExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
    }

}

4)其它代码

4.1)常量

这是一个常量类,里面记录着相关的队列名称,主要是给生产者、消费者使用的。太杂乱了不好打理,故专门弄了一个常量类来进行管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.banmoon.constant;

/**
 * Names of the RabbitMQ queues, exchanges and routing keys used by producers and consumers.
 *
 * <p>Every name is the {@link #RABBITMQ_PREFIX} expression followed by the plain name, so the
 * constants are expression templates rather than final names.
 *
 * @author banmoon
 * @date 2024/02/27 12:22:13
 */
public interface RabbitmqConstant {

    /**
     * Expression for the configured name prefix: empty when {@code spring.rabbitmq.prefix}
     * is empty or unset, otherwise the prefix followed by a dot.
     */
    String RABBITMQ_PREFIX = "#{'${spring.rabbitmq.prefix:}'.empty ? '' : '${spring.rabbitmq.prefix:}' + '.'}";

    // ---- direct exchange test ----

    /** Queue of the direct test. */
    String DIRECT_TEST_QUEUE = RABBITMQ_PREFIX + "test.direct.queue";
    /** Exchange of the direct test. */
    String DIRECT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.direct.exchange";
    /** Routing key of the direct test. */
    String DIRECT_TEST_ROUTING_KEY = RABBITMQ_PREFIX + "test.direct.routingKey";

    // ---- fanout exchange test ----

    /** Fanout test queue A. */
    String FANOUT_TEST_QUEUE_A = RABBITMQ_PREFIX + "test.fanout.queue.a";
    /** Fanout test queue B. */
    String FANOUT_TEST_QUEUE_B = RABBITMQ_PREFIX + "test.fanout.queue.b";
    /** Fanout test queue C. */
    String FANOUT_TEST_QUEUE_C = RABBITMQ_PREFIX + "test.fanout.queue.c";
    /** Exchange of the fanout test. */
    String FANOUT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.fanout.exchange";
    /** Routing key of the fanout test. */
    String FANOUT_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.fanout.routerKey";

    // ---- topic exchange test ----

    /** Topic test queue for logs. */
    String TOPIC_TEST_QUEUE_LOG = RABBITMQ_PREFIX + "test.topic.queue.log";
    /** Topic test queue for text. */
    String TOPIC_TEST_QUEUE_TEXT = RABBITMQ_PREFIX + "test.topic.queue.text";
    /** Topic test queue for images. */
    String TOPIC_TEST_QUEUE_IMAGE = RABBITMQ_PREFIX + "test.topic.queue.image";
    /** Exchange of the topic test. */
    String TOPIC_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.topic.exchange";
    /** Wildcard routing key of the topic test. */
    String TOPIC_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.topic.routerKey.#";
    /** Text routing key of the topic test. */
    String TOPIC_TEST_ROUTER_KEY_TEXT = RABBITMQ_PREFIX + "test.topic.routerKey.text";
    /** Image routing key of the topic test. */
    String TOPIC_TEST_ROUTER_KEY_IMAGE = RABBITMQ_PREFIX + "test.topic.routerKey.image";

    // ---- headers exchange test ----

    /** Queue of the headers test. */
    String HEADERS_TEST_QUEUE = RABBITMQ_PREFIX + "test.headers.queue";
    /** Exchange of the headers test. */
    String HEADERS_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.headers.exchange";
    /** Routing key of the headers test. */
    String HEADERS_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.headers.routerKey";

    // ---- TTL / dead-letter test ----

    /** Queue of the TTL test. */
    String TTL_TEST_QUEUE = RABBITMQ_PREFIX + "test.ttl.queue";
    /** Exchange of the TTL test. */
    String TTL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.exchange";
    /** Routing key of the TTL test. */
    String TTL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.routerKey";
    /** Dead-letter queue of the TTL test. */
    String TTL_TEST_DEATH_QUEUE = RABBITMQ_PREFIX + "test.ttl.death.queue";
    /** Dead-letter exchange of the TTL test. */
    String TTL_TEST_DEATH_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.death.exchange";
    /** Dead-letter routing key of the TTL test. */
    String TTL_TEST_DEATH_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.death.routerKey";

    // ---- TXL (delayed exchange) test ----

    /** Queue of the TXL test. */
    String TXL_TEST_QUEUE = RABBITMQ_PREFIX + "test.txl.queue";
    /** Exchange of the TXL test. */
    String TXL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.txl.exchange";
    /** Routing key of the TXL test. */
    String TXL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.txl.routerKey";

}

4.2)枚举代码

在上面的创建中,我们用到了两个枚举类,没什么可说的,直接贴出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.banmoon.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Match modes for headers-exchange bindings: {@code any} or {@code all} of the configured
 * header values.
 *
 * @author banmoon
 * @date 2024/03/04 16:35:27
 */
@Getter
@AllArgsConstructor
public enum HeadersTypeEnum {

// code "any"
ANY("any", "任一"),
// code "all"
ALL("all", "所有"),
;

// Value used in configuration to select this mode.
private final String code;
// Human-readable description.
private final String msg;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.banmoon.enums;

import cn.hutool.core.map.MapUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.amqp.core.*;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

/**
 * Supported exchange types, plus factories that build the matching Spring AMQP exchange
 * from a configured type code.
 */
@Getter
@AllArgsConstructor
public enum RabbitExchangeTypeEnum {

    DIRECT("direct", "直连交换机"),
    FANOUT("fanout", "扇形交换机"),
    TOPIC("topic", "主题交换机"),
    HEADERS("headers", "头部交换机"),
    ;

    /** Type code as written in configuration. */
    private final String code;
    /** Human-readable description. */
    private final String msg;

    /**
     * Looks up a type by its code, ignoring case.
     *
     * @param code type code, may be null
     * @return the matching type, or null when no type has this code
     */
    public static RabbitExchangeTypeEnum getByCode(String code) {
        return getByCode(code, null);
    }

    /**
     * Looks up a type by its code, ignoring case.
     *
     * @param code        type code, may be null
     * @param defaultEnum value returned when no type has this code
     * @return the matching type, or {@code defaultEnum}
     */
    public static RabbitExchangeTypeEnum getByCode(String code, RabbitExchangeTypeEnum defaultEnum) {
        return Arrays.stream(values())
                .filter(e -> e.getCode().equalsIgnoreCase(code))
                .findFirst()
                .orElse(defaultEnum);
    }

    /**
     * Builds the exchange for the given type code.
     *
     * @param type         type code (case-insensitive)
     * @param exchangeName exchange name
     * @param durable      whether the exchange is durable
     * @param autoDelete   whether the exchange is auto-deleted
     * @param arguments    additional exchange arguments, may be null
     * @return the exchange of the requested type
     * @throws IllegalArgumentException if {@code type} is not a supported type code
     */
    public static Exchange getExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        switch (requireByCode(type)) {
            case DIRECT:
                return new DirectExchange(exchangeName, durable, autoDelete, arguments);
            case TOPIC:
                return new TopicExchange(exchangeName, durable, autoDelete, arguments);
            case FANOUT:
                return new FanoutExchange(exchangeName, durable, autoDelete, arguments);
            case HEADERS:
                return new HeadersExchange(exchangeName, durable, autoDelete, arguments);
            default:
                // Only reachable if a constant is added without a matching case above.
                throw new IllegalArgumentException("Unsupported RabbitMQ exchange type: " + type);
        }
    }

    /**
     * Builds an {@code x-delayed-message} exchange whose {@code x-delayed-type} argument is
     * the given type code.
     *
     * @param type         type code (case-insensitive) used as the delayed type
     * @param exchangeName exchange name
     * @param durable      whether the exchange is durable
     * @param autoDelete   whether the exchange is auto-deleted
     * @param arguments    additional exchange arguments, may be null; never modified
     * @return the delayed exchange
     * @throws IllegalArgumentException if {@code type} is not a supported type code
     */
    public static Exchange getTxlDelayExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        RabbitExchangeTypeEnum typeEnum = requireByCode(type);
        // Copy the caller's arguments so that adding x-delayed-type does not leak into the
        // caller's (configuration) map.
        Map<String, Object> argMap = MapUtil.newHashMap(2);
        Optional.ofNullable(arguments).ifPresent(argMap::putAll);
        argMap.put("x-delayed-type", typeEnum.getCode());
        return new CustomExchange(exchangeName, "x-delayed-message", durable, autoDelete, argMap);
    }

    /** Resolves a type code or fails with a descriptive error instead of a bare NPE. */
    private static RabbitExchangeTypeEnum requireByCode(String code) {
        RabbitExchangeTypeEnum typeEnum = getByCode(code);
        if (typeEnum == null) {
            throw new IllegalArgumentException("Unsupported RabbitMQ exchange type: " + code);
        }
        return typeEnum;
    }
}

三、生产者、消费者

1)生产者

这是一个生产者抽象类,我自己写的生产者都需要继承它

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.banmoon.queues;

import cn.hutool.extra.spring.SpringUtil;
import com.banmoon.utils.JsonUtil;
import lombok.Data;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Base class for producers: serialises the payload to JSON and publishes it to the
 * configured exchange with the configured routing key.
 *
 * @author banmoon
 * @date 2024/02/28 11:44:28
 */
@Data
public abstract class AbstractProducer {

    /** Template used to publish messages. */
    private AmqpTemplate amqpTemplate;

    /** Name of the target queue (not used when publishing). */
    private String queueName;

    /** Exchange that messages are published to. */
    private String exchangeName;

    /** Routing key that messages are published with. */
    private String routingKey;

    /**
     * Creates a producer whose names are looked up through {@code SpringUtil.getProperty},
     * i.e. the three string arguments are used as property keys.
     *
     * @param amqpTemplate template used to publish messages
     * @param queueName    property key of the queue name
     * @param exchangeName property key of the exchange name
     * @param routingKey   property key of the routing key
     */
    public AbstractProducer(AmqpTemplate amqpTemplate, String queueName, String exchangeName, String routingKey) {
        this.amqpTemplate = amqpTemplate;
        // TODO: 2024/3/2 this lookup still needs to be reworked
        this.queueName = SpringUtil.getProperty(queueName);
        this.exchangeName = SpringUtil.getProperty(exchangeName);
        this.routingKey = SpringUtil.getProperty(routingKey);
    }

    /**
     * Creates a producer whose queue, exchange and routing key are supplied later through
     * the setters.
     *
     * @param amqpTemplate template used to publish messages
     */
    public AbstractProducer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    /**
     * Publishes the payload as JSON.
     *
     * @param obj payload
     */
    public void send(Object obj) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg);
    }

    /**
     * Publishes the payload with a per-message time-to-live.
     *
     * <p>The TTL is set through the message's {@code expiration} property. A header named
     * {@code x-message-ttl} has no effect on a message; that name is a queue argument.
     *
     * @param obj              payload
     * @param delayMillisecond time-to-live in milliseconds; null sends without a per-message TTL
     */
    public void sendTtlMesssage(Object obj, Integer delayMillisecond) {
        send(obj, message -> {
            if (delayMillisecond != null) {
                message.getMessageProperties().setExpiration(String.valueOf(delayMillisecond));
            }
            return message;
        });
    }

    /**
     * Publishes the payload with a delivery delay, for use with a delayed exchange.
     *
     * @param obj              payload
     * @param delayMillisecond delay in milliseconds
     */
    public void sendTxlMesssage(Object obj, Integer delayMillisecond) {
        send(obj, message -> {
            MessageProperties properties = message.getMessageProperties();
            properties.setDelay(delayMillisecond);
            return message;
        });
    }

    /**
     * Publishes the payload with additional message headers.
     *
     * @param obj     payload
     * @param headers headers to add to the message; null or empty adds none
     */
    public void send(Object obj, Map<String, Object> headers) {
        send(obj, message -> {
            if (headers != null) {
                MessageProperties properties = message.getMessageProperties();
                headers.forEach(properties::setHeader);
            }
            return message;
        });
    }

    /**
     * Publishes the payload as JSON after applying the given post-processor.
     *
     * @param obj                  payload
     * @param messagePostProcessor hook applied to the message before it is sent
     */
    public void send(Object obj, MessagePostProcessor messagePostProcessor) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);
    }

}

比如说直连交换机队列的生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.banmoon.queues.producer;

import com.banmoon.constant.RabbitmqConstant;
import com.banmoon.queues.AbstractProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Producer for the direct test queue. The queue, exchange and routing key are injected
 * through the overridden setters from the {@link RabbitmqConstant} expressions.
 *
 * @author banmoon
 * @date 2024/02/28 16:39:13
 */
@Slf4j
@Component
public class TestDirectProducer extends AbstractProducer {

public TestDirectProducer(AmqpTemplate amqpTemplate) {
super(amqpTemplate);
}

// Injects the direct test queue name.
@Override
@Value(RabbitmqConstant.DIRECT_TEST_QUEUE)
public void setQueueName(String queueName) {
super.setQueueName(queueName);
}

// Injects the direct test exchange name.
@Override
@Value(RabbitmqConstant.DIRECT_TEST_EXCHANGE)
public void setExchangeName(String exchangeName) {
super.setExchangeName(exchangeName);
}

// Injects the direct test routing key.
@Override
@Value(RabbitmqConstant.DIRECT_TEST_ROUTING_KEY)
public void setRoutingKey(String routingKey) {
super.setRoutingKey(routingKey);
}
}

2)消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.banmoon.queues.consumer;

import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the direct test queue.
 *
 * @author banmoon
 * @date 2024/02/27 12:08:12
 */
@Slf4j
@Component
public class TestDirectConsumer {

/**
 * Logs every message received from the direct test queue.
 *
 * @param message message body
 */
@RabbitListener(queues = RabbitmqConstant.DIRECT_TEST_QUEUE)
public void test(String message) {
log.info("直连测试队列消费者:{}", message);
}

}

四、最后

关于上面几种交换机类型,以及TTL死信队列、TXL延迟队列,都做了配置示例。

只是除了直连交换机之外,其余类型没有给出生产者、消费者的代码示例,相信大家都知道怎么写了。

那个,关于生产者的那个抽象类AbstractProducer.java

有一个地方一直没有调通,就是如何在代码中解析SpEL表达式,从而获取配置文件中的配置信息

只能退而求其次,使用@Value注解来进行获取了。

相信注解能获取的,一定有注解解析器,这边也一定可以的。

又要看源码喽!

还有那个开发环境队列隔离问题

有些公司开发使用的是同一个配置文件,这样会导致前缀都是同一个,那样设置前缀就没有意义了。

其实可以这样,如果是使用nacos的远端配置的,可以创建自己的命名空间,修改前缀。

如果是在本地resources文件夹里面,可以使用maven编译后替换变量的那个功能。

如何读取到maven中profile设置的参数 | 半月无霜 (banmoon.top)

上面两种方法,都是可以实现的

我是半月,你我一同共勉!!!