Spring Framework 과 ActiveMQ 연동하기.
간단한 비동기화 로직은 스프링에서 제공하는 async annotation 을 사용해서 처리가 가능한데 대용량 데이터를 비동기로 안정적으로 처리하기 위해서는 메시지 서비스의 일종인 ActiveMQ 를 연동해서 사용한다.
전체적인 process 를 간략하게 설명하자면,
Application Server 에서 listener container 를 구현하고 구현한 listener 를 통해서 ActiveMQ 와 통신한다.
Application Server 에서 listener container 로 domain object 를 던져주면 converter 에서 jms message 로 변환해서 ActiveMQ 에 전달하고 ActiveMQ 로 부터 받은 jms message 는 listener container 의 converter 에서 domain object 로 변환해서 사용한다.
ActiveMQ 의 queue 에 jms message 가 쌓이면, listener 로 등록된 class 의 method 가 자동으로 실행된다.
개발환경.
JDK 1.8Spring Framework 4.3.7.RELEASEMaven3ActiveMQ 5.13.0 (for Windows)Windows 10 / MacOC
pom.xml
.... <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.7.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>4.3.7.RELEASE</version> </dependency>....
필요한 dependency 를 정의해준다.
context-jms.xml
....
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean>
<!-- Connection Pooling 정의. --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory" ref="connectionFactory"/> </bean>
<!-- ActiveMQ 에서 사용하는 Queue Destination 정의. --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="xxxx.mobile.queue"/> </bean>
<bean id="messageConverter" class="kr.co.xxxx.notification.jms.MessageConverterImpl"/>
<bean id="pushNotification" class="kr.co.xxxx.notification.jms.PushNotification"/>
<!-- Message 를 받으면 실행할 class, method 를 정의한다. PushNotification 클래스의 handleMessage 메소드를 실행하며 messageConverter 를 이용해서 object 변환 처리한다. --> <bean id="purePojoPushNotification" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="pushNotification"/> <property name="defaultListenerMethod" value="handleMessage"/> <property name="messageConverter" ref="messageConverter"/> </bean>
<!-- Message 를 받으면 호출되는 id 를 정의한다. 여기서는 ajrentacar.mobile.queue 메시지가 도착하면 purePojoPushNotification 를 호출하도록 정의. --> <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="purePojoPushNotification"/> </bean>
<!-- Spring JMS Template --> <bean class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory"/> <property name="defaultDestination" ref="queueDestination"/> <property name="messageConverter" ref="messageConverter"/></bean>
....
ActiveMQ 의 url 을 정의한다. ActiveMQ 는 내부적으로 Jetty 서버위에 구동되고 있다.Listener Container 를 정의한다.connection factory, listener, converter 등으로 구성되어 있으며 ActiveMQ 와 통신하고 jms message 를 처리하는 구현제들이다.
MessageConverterImpl.java
....@Componentpublic class MessageConverterImpl implements MessageConverter {
private Logger logger = LoggerFactory.getLogger(this.getClass().getName()); private String TAG = this.getClass().getName();
/** * ActiveMQ 로 메시지를 전달할때 domain object 를 jms message 로 변환한다. * * @param o * @param session * @return * @throws JMSException * @throws MessageConversionException */ public Message toMessage(Object o, Session session) throws JMSException, MessageConversionException { logger.debug("> toMessage", TAG);
if ( !(o instanceof NotificationDto) ) { throw new MessageConversionException("not NotificationDto"); }
NotificationDto notificationDto = (NotificationDto)o;MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("memberId", notificationDto.getMemberId()); mapMessage.setString("phoneNumber", notificationDto.getPhoneNumber()); mapMessage.setString("registrationKey", notificationDto.getRegistrationKey()); mapMessage.setString("content", notificationDto.getContent());
return mapMessage; }
/** * ActiveMQ 로 부터 메시지를 받을때 jms message 를 domain object 로 변환한다. * * @param message * @return * @throws JMSException * @throws MessageConversionException */ public Object fromMessage(Message message) throws JMSException, MessageConversionException { logger.debug("> fromMessage", TAG);
if( !(message instanceof MapMessage)) { throw new MessageConversionException("not MapMessage"); }
MapMessage mapMessage = (MapMessage)message;
NotificationDto notificationDto = new NotificationDto(); notificationDto.setMemberId(mapMessage.getString("memberId")); notificationDto.setPhoneNumber(mapMessage.getString("phoneNumber")); notificationDto.setRegistrationKey(mapMessage.getString("registrationKey")); notificationDto.setContent(mapMessage.getString("content"));return notificationDto; }
}
domain object <--> jms message 변환하는 converter 를 구현한다.
PushNotification.java
....@Componentpublic class PushNotification {
private Logger logger = LoggerFactory.getLogger(this.getClass().getName()); private String TAG = this.getClass().getName();
@Autowired private NotificationUtils notificationUtils;
/** * Queue 에 Message 가 들어오면 자동으로 실행된다.*
* @param notificationDto */ public void handleMessage(NotificationDto notificationDto) { logger.debug("> handleMessage", TAG); logger.debug("> memberId : {}", notificationDto.getMemberId(), TAG); logger.debug("> phoneNumber : {}", notificationDto.getPhoneNumber(), TAG); logger.debug("> registrationKey : {}", notificationDto.getRegistrationKey(), TAG); logger.debug("> content : {}", notificationDto.getContent(), TAG);
notificationUtils.sendNotification(notificationDto); }}
ActiveMQ 의 Queue 에 jms message 가 들어오면 자동으로 실행되는 listener 를 구현한다.POJO 로 구현하며 push notification 용도로 많이 사용되기 때문에 위의 예제로 구현했다. 상세 설정은 context-jms.xml 에 정의되어 있다.
NotificationServiceImpl.java
....
public void send(List<NotificationDto> notificationDtoList) { logger.debug("> send", TAG);
for (NotificationDto param : notificationDtoList) { jmsTemplate.convertAndSend(param); }}
....
전형적인 service implement 이다.일반적으로 RESTful service 를 통해서 호출을 받으면 controller 에서 호출되는 method 이다.
ActiveMQ 에서 주고받는 jms message 등의 기타 정보들은 http://localhost;8161 에서 확인이 가능하다.(기본 관리자 정보는 admin / admin 이다.)
만약 윈도우에서 ActiveMQ 를 실행할때 Address already in use: JVM_Bind 오류가 발생하면 포트번호를 바꾸던지 방화벽에서 해당 포트를 열어주고 다시 실행하면 되는데 나같은 경우는 해당 포트를 열고 실행시켰다.
'Development > Spring' 카테고리의 다른 글
Spring Framework 과 Redis 연동 (0) | 2017.08.07 |
---|---|
Spring Framework 과 RMI 연동 (0) | 2017.08.04 |
Spring Security (0) | 2017.07.20 |
외부 인터넷이 연결되지 않은 경우 application-context 설정하기. (0) | 2016.07.08 |
Cause: java.util.NoSuchElementException 오류 수정.. (0) | 2013.03.06 |