一、自定义MyBatis
拦截器
我们首先需要自定义一个拦截器来拦截我们指定的sql
语句类型,比如query
或者update
。
对于拦截下来的sql
语句我们可以根据自己的需要进行处理,比如我们将某些指定的sql
语句发送到rocketmq
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Intercepts({ @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}), @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}) }) public class RocketMQInterceptor implements Interceptor { @Override public Object intercept(Invocation invocation) throws Throwable { MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0]; Object parameter = null; if (invocation.getArgs().length > 1) { parameter = invocation.getArgs()[1]; } BoundSql boundSql = mappedStatement.getBoundSql(parameter); String sql = boundSql.getSql(); RocketMQProducer producer = new RocketMQProducer("your-producer-group", "ip:9876"); producer.start(); producer.sendMessage("topic", sql); System.out.println("发送的sql语句为:\n" + sql); producer.shutdown(); return invocation.proceed(); }
@Override public Object plugin(Object target) { return Plugin.wrap(target, this); }
@Override public void setProperties(Properties properties) { } }
|
二、rocketmq
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public class RocketMQProducer {
private DefaultMQProducer producer;
public RocketMQProducer(String producerGroup, String namesrvAddr) { producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); }
public void start() throws Exception { producer.start(); }
public void shutdown() { producer.shutdown(); }
public void sendMessage(String topic, String message) throws Exception { Message msg = new Message(topic, message.getBytes()); producer.send(msg); }
}
|
三、注册拦截器
1 2 3 4 5 6 7 8
|
@Bean public RocketMQInterceptor myBatisInterceptor() { return new RocketMQInterceptor(); }
|
测试方法:我们选择单元测试启动一个Mapper的查询方法。消费者模块单独启动一个微服务进行消费。
结果:测试拦截成功。
四、踩坑记录
原本在生产者和消费者模块全部采用RocketTemplate
来处理,因为实现了自动装配,不需要指定namesrv、group组等,但是由于未知原因在拦截器中注入RocketTemplate
出现一个问题就是注入不进来,异常处理后就是报空指针异常,最后试了很多方法都不行。于是改变策略,将测试、本地、生产端不同的mq
配置均放在nacos
中。通过默认生产者代码进行处理,封装一个新的生产者,进而曲线救国,解决了问题。