TODAY 181
Spring 4, RabbitMQ, SockJS, Stomp 연동

개념도

 

 

동작방식

맨처음 파란색 Pub에서 MQTT 요청을 보내면 그 요청을 /device 라는 큐로 받아서 Spring 에게 전달해 줍니다.
Spring은 알람이 필요할것으로 보이는것이면, 로직 처리후 알람을 보내기위해 RabbitMQ에게 알람을 보낼 내용을 전달해줍니다.
전달받은 RabbitMQ는 /noti 라는 토픽에 넣어서 해당 토픽을 구독중인 큐 들에게 요청을 보냅니다.

 

(1) Spring 서버측 받는요청, 보내는 요청을 처리할 설정파일을 설정합니다.

context-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
        xmlns="http://www.springframework.org/schema/beans"; xmlns:rabbit="http://www.springframework.org/schema/rabbit";
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd";>
 
    <!-- RabbitMQ 연결 설정 -->
     <rabbit:connection-factory id="rabbitConnectionFactory" host="10.0.0.103" username="아이디" password="비밀번호" port="5672" />
     <rabbit:admin connection-factory="rabbitConnectionFactory" />
 
    <!-- Queue 등록 -->
     <rabbit:queue name="device" />
 
    <!-- Queue 바인딩 -->
     <rabbit:direct-exchange name="amq.direct">
         <rabbit:bindings>
             <rabbit:binding key="#" queue="device" />
         </rabbit:bindings>
     </rabbit:direct-exchange>
 
     <!-- 메시지를 보내기위한 template 설정, 재시도할 경우 retryTemplate을 이용한다 -->
     <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory" exchange="amq.topic" routing-key="noti" retry-template="retryTemplate" />
 
     <!-- 메시지 리스너 설정 -->
     <rabbit:listener-container connection-factory="rabbitConnectionFactory">
 
         <!-- 큐 myQueue 메시지는 bean id consumer의 method handleMessage가 처리한다 -->
         <rabbit:listener queues="device" ref="consumer" method="handleMessage" />
 
     </rabbit:listener-container>
 
     <!-- 재시도를 위한 template 설정 -->
     <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
 
         <!-- 재시도 시 간격 설정 -->
         <property name="backOffPolicy">
             <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                 <property name="initialInterval" value="500" />
                 <property name="multiplier" value="2.0" />
                 <property name="maxInterval" value="20000" />
             </bean>
         </property>
 
         <!-- 실패 시 재시도 횟수 설정 -->
         <property name="retryPolicy">
             <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                 <property name="maxAttempts" value="5"/>
             </bean>
         </property>
     </bean>
 </beans>

 

(2) 메세지를 받아줄 Consumer 코드를 작성합니다.

Consumer.java
package com.chemi.cheminoti.rabbit;
 
import com.chemi.cheminoti.framework.common.BaseObject;
import com.chemi.cheminoti.service.dashboard.MonitorService;
import com.chemi.cheminoti.service.login.SequencesService;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * Created by Skibis on 2018-02-27 오후 5:33
 */
@Component
public class Consumer extends BaseObject{
 
    @Autowired
    MonitorService monitorService;
 
    @Autowired
    SequencesService sequencesService;
 
    //메시지를 받아서 처리한다.
    public void handleMessage(byte[] message) throws ParseException {
        String receivedMessage = new String(message);
        info("=== Rabbit Consumer === ");
        info(receivedMessage);
 
        JSONParser jsonParser = new JSONParser();
        JSONObject jsonObj = (JSONObject) jsonParser.parse(receivedMessage);
        JSONArray receivedMessageArray = (JSONArray) jsonObj.get("device");
 
        Map<String, String> receivedMap = new HashMap<>();
        String receivedAddress;
        String receivedSensorState;
        String receivedBatteryPower;
        int result;
 
        info("=== receivedMessage START ===");
        for(int i=0 ; i<receivedMessageArray.size() ; i++) {
            JSONObject tempObj = (JSONObject) receivedMessageArray.get(i);
 
            receivedAddress = (String) tempObj.get("addr");
            receivedSensorState =  (String) tempObj.get("sensor status");
            receivedBatteryPower = (String) tempObj.get("pwr");
            receivedMap.put(REC.REC_CODE,receivedAddress);
            receivedMap.put(REC.REC_MOD,receivedSensorState);
            receivedMap.put(REC.REC_PWR,receivedBatteryPower);
 
            info("receivedAddress: ["+receivedAddress+"]");
            info("receivedSensorState: ["+receivedSensorState+"]");
            info("receivedBatteryPower: ["+receivedBatteryPower+"]");
 
            result = monitorService.handleMessage(receivedMap);
            info(String.valueOf(result));
        }
        info("=== receivedMessage END ===");
 
    }
 
}
메세지를 받아서 Json 타입으로 처리후 해당 데이터를 DB에 저장시킵니다.

 

(3) 알람을 보낼수있는 클래스를 만들어 줍시다.

Producer.java
package com.chemi.cheminoti.rabbit;
 
import com.chemi.cheminoti.framework.common.BaseObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
/**
 * Created by Skibis on 2018-02-27 오후 5:33
 */
@Component
public class Producer extends BaseObject {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    public void sendMessage(Object message) {
        info("=== Rabbit Producer Class START ===");
        String sendMessage = (String) message;
        info(sendMessage);
        rabbitTemplate.convertAndSend(sendMessage);
    }
 
}

 

(4) 특정 로직을 거친후... 알람을 보내봅시다. ( 현재는 특정 버튼을 누르면 알림을 보내도록 설정 )

@RequestMapping(value = "/dashboard/TEST.ajax", method = )
@ResponseBody
public void notitest(@RequestParam Map<String, Object> param, HttpServletRequest request) {
  producer.sendMessage("Send test data :" + System.currentTimeMillis() );
}

 

(5) 클라이언트 측에서 요청을 받기위해서는 2개의 라이브러리가 필요합니다.

<script src="//cdnjs.cloudflare.com/ajax/libs/sockjs-client/0.3.4/sockjs.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
해당 JS들 가져온 이후에 접속할 JS를 만들어 봅시다. ( SockJS 사용 )

 

custom.js
// Create a WebSocket connection. Replace with your hostname
var ws = new SockJS("http://10.0.0.103:15674/stomp");
var client = Stomp.over(ws);
 
// RabbitMQ Web-Stomp does not support heartbeats so disable them
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
 
client.debug = onDebug;
 
// Make sure the user has limited access rights
client.connect("아이디", "비밀번호", onConnect, onError, "/");
 
//Start subscribing to the chat queue
function onConnect() {
    var id = client.subscribe("/topic/noti", function(d) {
 
/*
        var node = document.createTextNode(d.body + '\n');
        document.getElementById('chat').appendChild(node);
*/
 
    });
}
 
//Send a message to the chat queue
function sendMsg() {
    console.log("!sendMsg!");
    var msg = document.getElementById('msg').value;
    client.send('/amq/noti', { "content-type": "text/plain" }, msg);
}
 
function onError(e) {
    console.log("STOMP ERROR", e);
}
 
function onDebug(m) {
    console.log("STOMP DEBUG", m);
}

 

Tip) heartbeat 부분을 0으로 넣지 않으면, ping pong 후 연결을 끊게 됩니다.

 

실행 테스트

RabbitMQ GUI 콘솔에서 기존에 존재하는 큐를 확인합니다. 이떄 큐는 device 한개만 존재해야 합니다.
(-> 큐방식으로 사용할 예정이기 때문)

 

 

이후에 WAS를 시작하고 2개의 테스트 계정으로 접속시 큐가 생기는것을 볼수있습니다.

 

크롬 콘솔로 두개의 브라우저를 확인하면 정상적으로 접속한 모습이 확인됩니다.

 

한쪽에서 메세지를 보내면 양쪽에 동시에 받아집니다.