加载中...
“水杉在线”-集成RocketMQ(二、自定义拦截器)
发表于:2023-05-29 |

一、自定义MyBatis拦截器

我们首先需要自定义一个拦截器来拦截我们指定的sql语句类型,比如query或者update

对于拦截下来的sql语句我们可以根据自己的需要进行处理,比如我们将某些指定的sql语句发送到rocketmq中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
})
public class RocketMQInterceptor implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
Object parameter = null;
if (invocation.getArgs().length > 1) {
parameter = invocation.getArgs()[1];
}
BoundSql boundSql = mappedStatement.getBoundSql(parameter);
String sql = boundSql.getSql();
RocketMQProducer producer = new RocketMQProducer("your-producer-group", "ip:9876");
producer.start();
producer.sendMessage("topic", sql);
System.out.println("发送的sql语句为:\n" + sql);
producer.shutdown();
return invocation.proceed();
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {
}
}

二、rocketmq生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author Lucky
* @description: TODO
* @date 2023/5/9 14:37
*/

public class RocketMQProducer {

private DefaultMQProducer producer;

public RocketMQProducer(String producerGroup, String namesrvAddr) {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
}

public void start() throws Exception {
producer.start();
}

public void shutdown() {
producer.shutdown();
}

public void sendMessage(String topic, String message) throws Exception {
Message msg = new Message(topic, message.getBytes());
producer.send(msg);
}

}

三、注册拦截器

1
2
3
4
5
6
7
8
/**
* RocketMQ-拦截器
* @return
*/
@Bean
public RocketMQInterceptor myBatisInterceptor() {
return new RocketMQInterceptor();
}

测试方法:我们选择单元测试启动一个Mapper的查询方法。消费者模块单独启动一个微服务进行消费。

结果:测试拦截成功。

消息发送

消息消费

四、踩坑记录

原本在生产者和消费者模块全部采用RocketTemplate来处理,因为实现了自动装配,不需要指定namesrv、group组等,但是由于未知原因在拦截器中注入RocketTemplate出现一个问题就是注入不进来,异常处理后就是报空指针异常,最后试了很多方法都不行。于是改变策略,将测试、本地、生产端不同的mq配置均放在nacos中。通过默认生产者代码进行处理,封装一个新的生产者,进而曲线救国,解决了问题。

上一篇:
Ubuntu下修改docker默认数据存储路径
下一篇:
Github Actions 简易使用教程