TODAY 74
RabbitMQ 란

이해하기

RabbitMQ는 AMQP(Advanced Message Queing Protocol)을 구현한 메시지 브로커이다.AMQP라는 표준MQ 프로토콜로 만들어 져있고 Cluster구성이 쉽고 ManageUI가 제공되며 무엇보다 성능이 뛰어나다고 알려져 현재 많이 사용되고 있다. 또한 ManagementUI, Autocluster, MQTT Convert, STOMP 등의 plugin도 제공되어 확장성이 뛰어나며 Spring에서도 AMQP연동 라이브러리가 제공되어 편리하게 연동하여 사용가능하다.
 

특징

ISO 표준(ISO/IEC 19464) AMQP 구현
비동기처리를 위한 메시지큐 브로커
erlang과 java 언어로 만들어짐 (clustering 등을 위한 고속 데이터 전송하는 부분에 erlang이 사용된 것으로 추정)
분산처리를 고려한 MQ ( Cluster, Federation )
고가용성 보장 (High Availability)
Publish/Subscribe 방식 지원
다양한 plugin 지원
 

주요 용어

Producer 메시지를 보내는 Application

 

 
Publish Producer가 메시지를 보냄
Queue 메시지를 저장하는 버퍼 , Queue는 Exchange에 Binding된다.
Consumer 메시지 받는 User Application
Tip)동일 업무를 처리하는 Consumer는 보통 하나의 Queue를 바라본다. (중복방지)
Tip)동일 업무를 처리하는 Consumer가 여러개 인 경우 같은 Queue를 바라보게 하면 자동으로 메시지를 분배하여 전달함
Subscribe Consumer가 메세지를 수신하기 위해 Queue를 실시간으로 리스닝하도록 만듬.
ExchangeProducer가 전달한 메시지를 Queue에 전달하는 역할, 메시지가 Queue에 직접 전달되지 않고 exchange type 정의대로 동작
Exchange type동작방식 선택
type 설명
fanout 알려진 모든 Queue에 메시지 전달 함
direct 지정된 routingKey를 가진 Queue에만 메시지 전달 함
topic 지정된 패턴 바인딩 형태에 일치하는 Queue에만 메시지 전달. #(여러단어), *(한단어)를 통한 문자열 패턴 매칭
header 헤더에 포함된 key=value의 일치조건에 따라서 메시지 전달

 

Bindings Exchange와 Queue를 연결해주는 것

 

Routing Exchange가 Queue에 메시지를 전달하는 과정
 
RoutingKey Exchange와 Queue가 Binding될 때 Exchange가 Queue에 메시지를 전달할지를 결정하는 기준
Publish RoutingKey가 Binding시 설정된 RoutingKey값과 일치하거나 (exchange type=direct 경우) RoutingKey값이 Binding시 설정된 패턴에 매칭될 때
-> (exchange type=topic 경우)

 

기본 흐름

Exchange를 별도로 명시하지 않은 요청의 기본 흐름

 

 

 

  1. Producer가 메시지를 생성하여 전송
  2. Queue가 이 메시지를 순차적으로 쌓음
  3. Consumer가 Queue에 대한 Binding을 가지고 있다가 메시지를 Queue에서 수신

상세 흐름

  1. Producer가 메시지를 생성하여 전송
  2. Exchange가 어떤 Queue에 전달할지를 Routing
  3. Queue들이 메시지를 순차적으로 쌓음
  4. Consumer가 Queue에 대한 Binding을 가지고 있다가 메시지를 Queue에서 수신

 

메시지 분배 (Round-robin dispatching)

RabbitMQ는 Consumer가 병렬처리를 쉽게 할 수 있도록 같은 Queue를 바라보고 있는 Consumer에게 메시지를 균등 분배한다.

 

즉 첫번째 메시지는 Consumer1에게 두번째 메시지는 Consumer2에게 분배(중복 처리하지 않도록 1명에게만 줌)

 

MQ의 존재 이유를 보여주는 매우 중요한 개념이다. 이로 인해 메시지를 받아 처리하는 프로그램들은 수평 확장이 가능하다.물론 Producer도 수평 확장이 가능하다. (같은 Queue에 메시지를 던져주기만 하면 됨)

 

Fair dispatch(공평한 분배)

여러 consumer에게 round robin할 때 번갈아가면서 메시지를 전달하지만 완전히 공평하진 않음(매홀수는 데이터크기가 크고, 매짝수는 데이터크기가 작은 등)

 

때문에 busy한 서버에게 메시지를 계속 전달하지 않도록 prefetchCount라는 개념사용.prefetchCount가 1일때는 아직 ack를 받지 못한 메시지가 1개라도 있으면 다시 그 consumer에게 메시지 할당하지 않음.즉 prefetchCount는 동시에 보내는 메시지 양임

 

 

메시지 수신 통보 (Acknowledgment)

많은 프로토콜들이 메시지 전달 보장을 위해 Acknowlegment(ACK)라는 개념을 사용하여 메시지에 대한 응답을 보내주도록 되어있다.MQTT의 경우는 QoS(Quality of Service) level이라고해서 0=안보냄, 1=전달완료확인, 2=최종목적지까지 처리완료 확인의 개념이 있는데RabbitMQ의 경우는 Acknowledgment(Consumer전달확인)와 Confirm(Publish전달확인)을 이용한 level 1만 지원하는 듯 하다.(추정)

 

ACK가 중요한 이유는 Queue는 Consumer에게 데이터를 전달하고나면 Queue에서 메시지를 삭제하므로, Consumer가 전달받았지만 처리도중 오류가 발생하여 재처리해야하는 경우를 위해 보관 유예기간을 두는 용도로 이용된다. 즉 ACK가 온 메시지만 삭제처리하도록 하는 것이다.

 

※ 주의 basicQos라는 설정은 prefetchCount의 숫자 설정임(!! QoS level 0,1,2의 개념 아님!!)

 

 

여러 Client(Consumer) 중 일부가 죽었을시 대응방법

앞서 설명했듯이 Consumer가 ACK를 보내지 않으면 Queue에 쌓인 메시지는 지워지지 않고 남아있다.메시지를 받은 Consumer가 느려서 아직 Processing 중일 수 있어서 다른 Consumer에게 Round Robin하여 재전송 하자니 중복처리 우려가 있고,그대로 Unacknowledged 상태로 마냥 처리 안된채로 남겨 둘 수도 없고 곤란하다.

 

이 경우에 어떻게 동작하는지 확인해볼 필요가 있다.

 

테스트 예제

같은 메시지에 대하 바인딩되어있는 Consumer가 2개가 있다. 하나는 ACK를 재대로 보내는 Consumer고 하나는 오류 상황이 발생하여 ACK를 보내지 못하는 상황이다. prefetchCount는 1로 하나라도 Unacknowledge 상태라면 재전송을 하지 않도록 설정한다.

 

 

 

위에서 보는 것 처럼 Acknowledge 메시지가 돌아오지 않으면 Queue는 메시지를 삭제하지 않고 보관하고 있다가, Consumer가 이를 처리하지 못한다고 판단했을 때 (Disconnected 되는 등의 상황) 메시지를 다음 순번의 Consumer(Worker)에게 라운드 로빈한다.

 

RabbitMQ가 재기동 됐을때 대응책

메시지를 Queue에 넣은 뒤 Consumer에게 전달하기 전에 RabittMQ 서버가 죽는다면기본적으로 해당 메시지는 날라가버리게 된다.이런 상황을 방지 하기 위해 durable이라는 개념을 가지고 있다.

 

Message durability

메시지는 Queue에 보관할 때 file에도 같이 쓰도록 만드는 방법이다.

 

아래와 같은 방법으로 설정해야 동작한다.

 

queue생성시 durable속성을 true로 주고 만든다.

message publish할때 MessageProperties.PERSISTENT_TEXT_PLAIN을 설정함

 

1,2번 모두 만족해야 메시지가 Queue에 남아있을 때 restart해도 날라가지 않는다.

 

 

※ 메시지의 persistent는 완변히 보장되진 않음. 메번 메시지마다 fsync 로 동기화히지 않기 때문에 짧은시간이나마 아직 Disk에 쓰여지지 않았을 경우가 있다. 좀더 강력한 방법을 보장하기 위해서는 publisher confirms를 사용

 

Publish/Subscribe

한 메시지를 여러 Consumer가 동시에 받아 사용해야 하는 경우가 있다. 이러한 Publish/Subscribe 구조를 RabbitMQ로 구성하여 사용 할 수 있다.

 

기본 exchanges

: RabbitMQ설치시 기본적으로 세팅된 exchanges
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic

 

이름없는 exchange

Producer가 메시지를 보낼 때 exchange를 지정해서 보낼 수 있는데, 이름이없다면 routingKey라는 특정 key값으로 Queue에 라우팅 된다.즉 exchange명이 있거나 routingKey명이 있어야 한다.
channel.basicPublish("", "hello", null, message.getBytes());
channel.basicPublish( "logs", "", null, message.getBytes());

 

임시 Queue

이전에 사용한 Queue를 기억하고 계속적으로 사용하기 위해서는 같은 이름의 Queue를 Worker가 바라보도록 하는 것이 중요하다.그러나 fanout을 받아 처리하는 Queue인 경우 Consumer가 임시 이름이라도 지정해줘야하는데 다음 2가지가 고려 되어야 한다.

RabbitMQ 연결시마다 서버에서 발급한 random Queue 이름의 사용

연결이 종료 되었을 때 Queue의 자동삭제

 

 

String queueName = channel.queueDeclare().getQueue();
이런식의 amq.gen-JzTY20BRgKO-HjmUJj0wLg Random Queue가 사용된다.

 

바인딩 (Bindings)

생성한 exchange를 queue에 매핑을 해줘야 exchange가 메시지를 받으면 Queue에 메시지를 보낼 수 있는데이를 Bindings 이라고 함

 

channel.queueBind(queueName, "logs", "");

 

Routing

모든 메시지를 받는 것이 아니라 선별정으로 특정 종류의 메시지를 받도록 subset을 설정하여 멀티케스트 함

 

Direct exchange

exchange type 중 direct을 이용해서 exchange를 만들면 부분적으로 메시지를 라우팅 하도록 처리 할 수 있다.

 

// 특정 EXCHANGE_NAME 으로 "direct' exchange를 생성
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

 

Producer

메시지를 publish할 때 routingKey값을 주어 선별적인 Queue에만 메시지를 전달 한다.

 

// 특정 EXCHANGE_NAME에 특정 "routingKey"로 binding된 queue에게만 메시지 전송
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

 

Consumer

메시지를 받는 Consumer, Worker에서 queue bind시 routingKey값을 주어 선별적으로 수신 한다.

 

// 특정 EXCHANGE_NAME의 특정 routingKey만 전달 받도록 binding 함
// 여러 routingKey와 bind 될 수 있다.
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);

 

Multiple binding

Exchange와 Queue간에 n:m binding이 가능하다.
1개의 Queue는 여러 routingKey와 binging 할 수 있다.여러개의 Queue는 같은 routingKey와 binding 할 수 있다.

 

패턴매칭 라우팅(Topics)

RabbitMQ는 Topic이라는 개념을 통해 패턴 매칭식 멀티케스트 라우팅을 지원한다.
topic exchange를 생성하여 특정 문자열 패턴에 일치하는 Queue에만 데이터를 보내주도록 구성할 수 있다.

 

Topic exchange

exchange type 중 topic을 이용해서 exchange를 만들면 특정 패턴에 일치하는 routingKey로 메시지가 전송된 경우 메시지를 수신 할 수 있다.
해당 패턴은 다음의 특수 문자를 사용하여 구성한다.
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

 

Topic Pattern

*

여러 Word를 나타냄 (1 Word 는 .로 구분됨 )

#

여러 Word를 나타냄 (1 Word 는 .로 구분됨 )

 

 

※ Topic이 # 으로만 구성되면 fanout과 동일하다.※ Topic에 *나 # 문자가 하나도 없다면 direct와 동일하다.

 

예제

quick으로 시작되는 .으로 구분되는 4개의 Word로 구성된 문자열
quick.*.*.*
quick.test.good.job => 매칭
front.quick.test.good.job => 비매칭
#.end
front.mid.end => 매칭
front.end => 매칭
front.mid.end.last => 비매칭

 

Producer

메시지를 publish할 때 routingKey값을 주어 선별적인 Queue에만 메시지를 전달 한다.

 

// 특정 EXCHANGE_NAME에 특정 "routingKey"로 binding된 queue에게만 메시지 전송 (메시지 전송시는 direct와 별반 다를바 없음)
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

 

Consumer

메시지를 받는 Consumer, Worker에서 queue bind시 routingKey의 패턴 값을 주어 선별적으로 수신 한다.

 

// 특정 EXCHANGE_NAME의 특정 패턴을 가진 문자열로 routingKey만 전달 받도록 binding 함
// 여러 routingKey와 bind 될 수 있다.
String patternedRoutingKey1 = "kern.*";
String patternedRoutingKey1 = "*.critical";

 

channel.queueBind(queueName, EXCHANGE_NAME, patternedRoutingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, patternedRoutingKey2);

 

원격 프로시져 호출 (RPC)

RabbitMQ는 Request-Response로 Client와 Server를 이어주기 위해 RPC라는 개념으로 기능을 제공한다.
RPC라는 거창한 이름을 사용하였지만 실제로는 Client의 request를 Server에 전달하고, Server가 처리한 결과를 알맞은 Client 요청에 대한 응답으로 전달 할 수 있는 방법을 말한다.

 

Message Properties 설명

DeliveryMode : persistent인지 transient인지 표시 (휘발성인지 비휘발성인지 구분자)
ContentType : 내용물의 mime-type
ReplyTo : Callback Queue의 이름
CorrelationID : 요청을 구분할 수 있는 유일값

 

처리 흐름

  1. Client가 CorrelationID, ReplyTo 주어서 RabbitMQ의 특정 Request보관용 Queue에 데이터를 Push한다.
  2. Request용 Queue에 데이터를 Server에서 Consume하여 요청을 처리한다.
  3. 요청처리 후 Request에서 받은 CorrelationID 와 ReplyTo를 추출하여, 요청ID를 속성으로 갖는 Response를 ReplyTo Queue에 Push한다.
  4. Client는 ReplyTo Queue를 subscribe하고 있다가 Response가 오면 CorrelationID를 보고 어떤 요청에 대한 응답인지를 구분하여 처리한다.

RPC의 이점

서버처리 이점 : 서버 처리속도가 느려서 성능을 증가 시키려고 할 때, RPC 서버를 하나 더 두고 같은 Request Queue를 바라보게 하면 됨 ( Round Robin 하므로 )

 

Client 이점 : 하나의 메시지를 개별 Round Trip으로 처리를 위해 queueDeclare같은 동기처리 요청이 필요없다. 

 -> (임시 Queue를 생성하여 Client마다 다른 Queue를 사용하므로)

 

RPC 구성시 고려할 점

돌아가는 서버가 없을 때 Client 처리

요청 Timeout시 Client 처리

서버 Exception이나 오동작시 Client에게 이를 어떻게 전달할지

Invalid한 데이터가 서버로 전달 되었을 때의 처리