From 37f25ab6343ab0a4d03f5177fbdcc4e699ba763c Mon Sep 17 00:00:00 2001 From: lilinjiang <36615541+lilinjiang@users.noreply.github.com> Date: Mon, 4 Mar 2024 14:17:28 +0800 Subject: [PATCH] [ISSUE #632 ] Fix NPE caused by using @ ExtRocketMQTemplateConfiguration annotation extension to send messages in v5 Co-authored-by: lilinjiang --- ...ketMQMessageListenerBeanPostProcessor.java | 33 ++++++++++++++++++- .../ListenerContainerConfiguration.java | 26 +++++++++++---- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java index 61f3e1d2..53d5cabd 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java @@ -23,6 +23,7 @@ import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import org.springframework.context.SmartLifecycle; import org.springframework.core.OrderComparator; import org.springframework.core.annotation.AnnotationUtils; @@ -32,7 +33,7 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; -public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean { +public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle { private ApplicationContext applicationContext; @@ -40,6 +41,8 @@ public class RocketMQMessageListenerBeanPostProcessor implements ApplicationCont private ListenerContainerConfiguration listenerContainerConfiguration; + private boolean running = false; + @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; @@ -58,6 +61,34 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw return bean; } + @Override + public int getPhase() { + return Integer.MAX_VALUE - 2000; + } + + @Override + public void start() { + if (!isRunning()) { + this.setRunning(true); + listenerContainerConfiguration.startContainer(); + } + } + + @Override + public void stop() { + + } + + public void setRunning(boolean running) { + this.running = running; + } + + + @Override + public boolean isRunning() { + return running; + } + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index 81c5b091..bfbb7f9a 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -32,6 +32,8 @@ import org.springframework.util.Assert; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; @Configuration @@ -48,6 +50,8 @@ public class ListenerContainerConfiguration implements ApplicationContextAware { private RocketMQMessageConverter rocketMQMessageConverter; + private final List containers = new ArrayList<>(); + public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) { this.rocketMQMessageConverter = rocketMQMessageConverter; @@ -68,15 +72,23 @@ public void registerContainer(String beanName, Object bean, RocketMQMessageListe genericApplicationContext.registerBean(containerBeanName, DefaultListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultListenerContainer.class); - if (!container.isRunning()) { - try { - container.start(); - } catch (Exception e) { - log.error("Started container failed. {}", container, e); - throw new RuntimeException(e); + + containers.add(container); + + log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); + } + + public void startContainer() { + for (DefaultListenerContainer container : containers) { + if (!container.isRunning()) { + try { + container.start(); + } catch (Exception e) { + log.error("Started container failed. {}", container, e); + throw new RuntimeException(e); + } } } - log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {