自定义JmsListenerContainerFactory时,containerFactory字段解读
自定义JmsListenerContainerFactory时,containerFactory字段
最近项目中利用ActiveMQ作为消息中间件,JMS进行消息传递。在对@JmsListener注解研究时对“containerFactory”字段的填值产生了一些疑问。分享一下我的心得体会。
疑问
@Component public class TestMessageListener implements MessageListener { @Override @JmsListener(destination = "myQueue", containerFactory = "jmsListenerContainerFactory") public void onMessage(Message message) { ... 业务代码 ... } }
@Configuration @EnableJms public class ActiveMQConfig { /** * 发布-订阅模式的ListenerContainer */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory factory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory(); configurer.configure(jmsListenerContainerFactory, factory); jmsListenerContainerFactory.setConnectionFactory(factory); ... return jmsListenerContainerFactory; } /** * P2P模式的ListenerContainer */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory factory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory(); configurer.configure(jmsListenerContainerFactory, factory); jmsListenerContainerFactory.setConnectionFactory(factory); ... return jmsListenerContainerFactory; } }
我查找项目中,containerFactory的对应字段“jmsListenerContainerFactory”,并不存在被该字段修饰的@Bean注解,而最终项目运行起来后,却正确使用了jmsListenerContainerQueue()定义的配置属性。
按照@Bean注解的定义,在@Bean不自定义的情况下,以方法名作为标识,而若想要jmsListenerContainerQueue()被调用,containerFactory的对应字段应填写“jmsListenerContainerQueue”而不是“jmsListenerContainerFactory”。
对此我深感疑惑,由于对springboot项目也是初上手,对很多加载机制非常陌生,在翻阅了一些资料与查看了相关源码后,终于理解了其中的奥妙。
问题解析
翻找了一圈源码后,终于发现了被“jmsListenerContainerFactory”修饰的@Bean注解
@Bean @ConditionalOnMissingBean( name = {"jmsListenerContainerFactory"} ) public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; }
这是在JMS源码的JmsAnnotationDrivenConfiguration类中的一个方法,用于生成默认的JMS监听工厂。那么为何调用默认的工厂生成方法,最后返回的却是自定义的工厂?
其中最关键的技术实现基础,就是springboot的scope属性即默认的singleton模式以及@Bean注解的方法参数依赖。
首先,jmsListenerContainerFactory()被@Bean注解,该方法又需要参数configurer,而该configurer由于@Bean注解的方法参数依赖,依赖调用了以下方法生成
@Bean @ConditionalOnMissingBean public DefaultJmsListenerContainerFactoryConfigurer jmsListenerContainerFactoryConfigurer() { DefaultJmsListenerContainerFactoryConfigurer configurer = new DefaultJmsListenerContainerFactoryConfigurer(); configurer.setDestinationResolver((DestinationResolver)this.destinationResolver.getIfUnique()); configurer.setTransactionManager((JtaTransactionManager)this.transactionManager.getIfUnique()); configurer.setMessageConverter((MessageConverter)this.messageConverter.getIfUnique()); configurer.setJmsProperties(this.properties); return configurer; }
该方法同样被@Bean修饰,最终返回一个DefaultJmsListenerContainerFactoryConfigurer。
第一个关键点来了,项目中自定义的jmsListenerContainerQueue()中的configurer参数,同样由于@Bean的参数依赖,由jmsListenerContainerFactoryConfigurer()生成。因此,无论containerFactory字段填“jmsListenerContainerFactory”或“jmsListenerContainerQueue”,最终获得的DefaultJmsListenerContainerFactoryConfigurer是由同一个方法生成的。
但是问题又来了,jmsListenerContainerFactoryConfigurer()返回的DefaultJmsListenerContainerFactoryConfigurer对象,是每次新new出来的,为何两处拿到的对象是同一个呢。
那么引出第二个关键点,springboot中Bean加载的singleton机制,由于该机制,导致被@Bean所注解的方法,在项目启动时执行,并将该返回值放入BeanFactory中保存。后续若该方法被调用,不像普通java代码一样,重新执行一遍方法获取返回值,而是直接从BeanFactory中获取。
由此,无论containerFactory使用哪个字段,最终获得的Factory配置,均是同一个。
心得
这一次的疑问,使我对springboot的加载机制及JMS消息传递都有了深入的理解,还是那句话,实践出真知。
springboot自定义JMSListener.destination
情景
项目在组内开发人员电脑上经常跑本地,activemq的队列名写在配置文件上。由于代码分支不一样,导致消息经常被不正常得消费掉。想要改进这个问题,最简单的是将注解@JMSListener 改为动态加载监听BEAN,但是大家不想为了这个事改变开发习惯,所以定为动态修改入队和监听的地址。
开始工作
从JMSListener注解入手,注释中提到了JmsListenerContainerFactory和DestinationResolver。从JmsListenerContainerFactory开始查找,项目中用到的默认factory-DefaultJmsListenerContainerFactory,生产泛型为DefaultMessageListenerContainer的container,找到了最终监听内部类scheduledInvokers,但是没方法修改该类的内容。
换个路子看DestinationResolver。找到了DynamicDestinationResolver,看到了希望,查找包内引用查到了JmsDestinationAccessor,方法是new DynamicDestinationResolver(),开始想研究这个resolver并想办法替换。找了一路十八开,原来这玩意是个池子,把初始化好的放里边,你改变这里的东西其实没啥大用处,队列已经建好了,尝试stop旧的container然后new一个新的,证明也是徒劳。期间也尝试了EmbeddedValueResolver,发现更坑,直接就是spring底层的BeanFactory。
把org.springframework.jms.listener下面的包翻了一遍没结果,就要放弃的时候,发现annotation包里有个JmsListenerAnnotationBeanPostProcessor一下豁然开朗。我不能初始化之后改,为啥不在初始化之前改呢……继续折腾。
赫然看见救命注释@see JmsListenerConfigurer。后面百度重启这些小白操作被大风吹跑了。
总之最后结果就是,继承JmsListenerEndpointRegistry,重写registerListenerContainer方法,将endpoint的destincation加上自己的suffix,然后继续走父类逻辑,然后通过JmsListenerConfigurer替换JmsListenerEndpointRegistrar中的Registry。
最后,这东西就是为了本地启动有的,就不要影响线上了,排除prd环境。
代码贴出来
@Slf4j @Component @Profile("!prd") public class MyJmsListenerConfigurer implements JmsListenerConfigurer, ApplicationContextAware { private ConfigurableApplicationContext context; @Autowired private MyJmsListenerEndpointRegistry myJmsListenerEndpointRegistry; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (applicationContext instanceof ConfigurableApplicationContext) { this.context = (ConfigurableApplicationContext) applicationContext; } } @Override public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { try { InetAddress address = InetAddress.getLocalHost(); String localName = "-" + address.getHostName() + "@" + address.getHostAddress(); myJmsListenerEndpointRegistry.setSuffix(localName); registrar.setEndpointRegistry(myJmsListenerEndpointRegistry); } catch (UnknownHostException e) { log.error("获取本机端口号异常", e); if (context != null) { context.close(); } } } }
@Component @Profile("!prd") public class MyJmsListenerEndpointRegistry extends JmsListenerEndpointRegistry { private String suffix; public void setSuffix(String suffix) { this.suffix = suffix; } @Override public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory, boolean startImmediately) { MethodJmsListenerEndpoint methodEndpoint = (MethodJmsListenerEndpoint) endpoint; methodEndpoint.setDestination(methodEndpoint.getDestination() + suffix); super.registerListenerContainer(endpoint, factory, startImmediately); } }
入队的地址替换就很简单了,注入直接换就行:
@Component @Order @Profile(value = "!prd") public class LocalQueueBeanNameChanger implements CommandLineRunner { @Autowired private List<Queue> queueList; @Override public void run(String... args) throws Exception { InetAddress address = InetAddress.getLocalHost(); String localName = "-" + address.getHostName() + "@" + address.getHostAddress(); for (Queue queue : queueList) { if (queue instanceof ActiveMQQueue) { ActiveMQQueue activeMQQueue = (ActiveMQQueue) queue; activeMQQueue.setPhysicalName(activeMQQueue.getPhysicalName() + localName); } } } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
最新评论