본문 바로가기

Development/Spring

Spring Framework 과 ActiveMQ 연동

 

Spring Framework 과 ActiveMQ 연동하기.

 

간단한 비동기화 로직은 스프링에서 제공하는 async annotation 을 사용해서 처리가 가능한데 대용량 데이터를 비동기로 안정적으로 처리하기 위해서는 메시지 서비스의 일종인 ActiveMQ 를 연동해서 사용한다.

 

전체적인 process 를 간략하게 설명하자면, 

Application Server 에서 listener container 를 구현하고 구현한 listener 를 통해서 ActiveMQ 와 통신한다.

Application Server 에서 listener container 로 domain object 를 던져주면 converter 에서 jms message 로 변환해서 ActiveMQ 에 전달하고 ActiveMQ 로 부터 받은 jms message 는 listener container 의 converter 에서 domain object 로 변환해서 사용한다.

ActiveMQ 의 queue 에 jms message 가 쌓이면, listener 로 등록된 class 의 method 가 자동으로 실행된다.

 

 

 

 

 

 

발환경.

JDK 1.8Spring Framework 4.3.7.RELEASEMaven3ActiveMQ 5.13.0 (for Windows)Windows 10 / MacOC




pom.xml

.... <dependency>            <groupId>org.springframework</groupId>            <artifactId>spring-jms</artifactId>            <version>4.3.7.RELEASE</version>        </dependency>         <dependency>            <groupId>org.apache.activemq</groupId>            <artifactId>activemq-spring</artifactId>            <version>4.3.7.RELEASE</version>        </dependency>
....


필요한 dependency 를 정의해준다.



context-jms.xml

....
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL">            <value>tcp://localhost:61616</value>        </property>    </bean>
    <!-- Connection Pooling 정의. -->    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">        <property name="connectionFactory" ref="connectionFactory"/>    </bean>
    <!-- ActiveMQ 에서 사용하는 Queue Destination 정의. -->    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg value="xxxx.mobile.queue"/>    </bean>
    <bean id="messageConverter" class="kr.co.xxxx.notification.jms.MessageConverterImpl"/>
    <bean id="pushNotification" class="kr.co.xxxx.notification.jms.PushNotification"/>
    <!--    Message 를 받으면 실행할 class, method 를 정의한다.    PushNotification 클래스의 handleMessage 메소드를 실행하며 messageConverter 를 이용해서 object 변환 처리한다.    -->    <bean id="purePojoPushNotification" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">        <property name="delegate" ref="pushNotification"/>        <property name="defaultListenerMethod" value="handleMessage"/>        <property name="messageConverter" ref="messageConverter"/>    </bean>
    <!--    Message 를 받으면 호출되는 id 를 정의한다.    여기서는 ajrentacar.mobile.queue 메시지가 도착하면 purePojoPushNotification 를 호출하도록 정의.    -->    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">        <property name="connectionFactory" ref="jmsFactory"/>        <property name="destination" ref="queueDestination"/>        <property name="messageListener" ref="purePojoPushNotification"/>    </bean>
    <!-- Spring JMS Template -->    <bean class="org.springframework.jms.core.JmsTemplate">        <property name="connectionFactory" ref="jmsFactory"/>        <property name="defaultDestination" ref="queueDestination"/>        <property name="messageConverter" ref="messageConverter"/>

 

    </bean>
....


ActiveMQ 의 url 을 정의한다. ActiveMQ 는 내부적으로 Jetty 서버위에 구동되고 있다.Listener Container 를 정의한다.connection factory, listener, converter 등으로 구성되어 있으며 ActiveMQ 와 통신하고 jms message 를 처리하는 구현제들이다.



MessageConverterImpl.java

....@Componentpublic class MessageConverterImpl implements MessageConverter {
    private Logger logger = LoggerFactory.getLogger(this.getClass().getName());    private String TAG = this.getClass().getName();
    /**     * ActiveMQ 로 메시지를 전달할때 domain object 를 jms message 로 변환한다.     *     * @param o     * @param session     * @return     * @throws JMSException     * @throws MessageConversionException     */    public Message toMessage(Object o, Session session) throws JMSException, MessageConversionException {        logger.debug("> toMessage", TAG);
        if ( !(o instanceof NotificationDto) ) {            throw new MessageConversionException("not NotificationDto");        }
        NotificationDto notificationDto = (NotificationDto)o;

 

        MapMessage mapMessage = session.createMapMessage();        mapMessage.setString("memberId", notificationDto.getMemberId());        mapMessage.setString("phoneNumber", notificationDto.getPhoneNumber());        mapMessage.setString("registrationKey", notificationDto.getRegistrationKey());        mapMessage.setString("content", notificationDto.getContent());
        return mapMessage;    }
    /**     * ActiveMQ 로 부터 메시지를 받을때 jms message 를 domain object 로 변환한다.     *     * @param message     * @return     * @throws JMSException     * @throws MessageConversionException     */    public Object fromMessage(Message message) throws JMSException, MessageConversionException {        logger.debug("> fromMessage", TAG);
        if( !(message instanceof MapMessage)) {            throw new MessageConversionException("not MapMessage");        }
        MapMessage mapMessage = (MapMessage)message;
        NotificationDto notificationDto = new NotificationDto();        notificationDto.setMemberId(mapMessage.getString("memberId"));        notificationDto.setPhoneNumber(mapMessage.getString("phoneNumber"));        notificationDto.setRegistrationKey(mapMessage.getString("registrationKey"));        notificationDto.setContent(mapMessage.getString("content"));

 

        return notificationDto;    }

 

}


domain object <--> jms message 변환하는 converter 를 구현한다.



PushNotification.java

....@Componentpublic class PushNotification {
    private Logger logger = LoggerFactory.getLogger(this.getClass().getName());    private String TAG = this.getClass().getName();
    @Autowired    private NotificationUtils notificationUtils;
    /**     * Queue 에 Message 가 들어오면 자동으로 실행된다.

     *

     * @param notificationDto     */    public void handleMessage(NotificationDto notificationDto) {        logger.debug("> handleMessage", TAG);        logger.debug("> memberId        : {}", notificationDto.getMemberId(), TAG);        logger.debug("> phoneNumber     : {}", notificationDto.getPhoneNumber(), TAG);        logger.debug("> registrationKey : {}", notificationDto.getRegistrationKey(), TAG);        logger.debug("> content         : {}", notificationDto.getContent(), TAG);
        notificationUtils.sendNotification(notificationDto);    }}


ActiveMQ 의 Queue 에 jms message 가 들어오면 자동으로 실행되는 listener 를 구현한다.POJO 로 구현하며 push notification 용도로 많이 사용되기 때문에 위의 예제로 구현했다. 상세 설정은 context-jms.xml 에 정의되어 있다.



NotificationServiceImpl.java

....
    public void send(List<NotificationDto> notificationDtoList) {        logger.debug("> send", TAG);
        for (NotificationDto param : notificationDtoList) {            jmsTemplate.convertAndSend(param);        }

 

    }
....


전형적인 service implement 이다.일반적으로 RESTful service 를 통해서 호출을 받으면 controller 에서 호출되는 method 이다.



ActiveMQ 에서 주고받는 jms message 등의 기타 정보들은 http://localhost;8161 에서 확인이 가능하다.(기본 관리자 정보는 admin / admin 이다.)

만약 윈도우에서 ActiveMQ 를 실행할때 Address already in use: JVM_Bind 오류가 발생하면 포트번호를 바꾸던지 방화벽에서 해당 포트를 열어주고 다시 실행하면 되는데 나같은 경우는 해당 포트를 열고 실행시켰다.