우선 RabbitMQ설정을 하기 위해 이를 설정하기 쉽게 도와주는 CloudAMQP에 가입하고 인스턴스 하나를 만들도록 하겠습니다.
여기서 지역은 ap-northeast-2로 바꾸어주어 하나 생성했습니다.
실습
이제 본격적으로 RabbitMQ의 pika를 이용해서 publisher와consumer를 만들어서 브로커를 통해서 이들간의 메세지 전달을 간단히 테스팅해보는 시간을 가져보도록 하겠습니다.
RabbitMQ는 AMQP(Advanced Message Queuing Protocol)을 구현되었습니다.
AMQP는 queue, exchange, binding이라는 세가지 개념으로 구성됩니다.
- queue는 메세지를 보관하고 소비자가 메시지를 선택하기를 기다립니다.
- exchange는 게시자가 새 메시지를 추가하기 위한 진입점입니다.
- binding은 메시지가 exchange로 queue로 라우팅되는 방법을 정의합니다.
RabbitMQ를 연동하기 위해 파이썬 RPC클라이언트인 Pika를 사용합니다. Pika는 Pythond에서 AMQP 0-9-1 프로토콜을 사용하게 해주는 라이브러리입니다.
https://pika.readthedocs.io/en/stable/intro.html
/user_order/producer.py
from pika import URLParameters, BlockingConnection
# 파라미터 설정
params = URLParameters('amqps://vjmczupx:MOf1ZIEhCOj58rh-oSQd0Mrg_BpiVfUD@dingo.rmq.cloudamqp.com/vjmczupx')
# 커넥션 ( 인스턴스에 접근 하는 것 )
connection = BlockingConnection(params)
# 통로 ( 채널 )을 뚫어주는 것
channel = connection.channel()
def publish():
# 가장 기본적인 메세지를 전달하고 싶다. ( order라는 곳에 hello라는 텍스트를 전달하고 싶다 )
channel.basic_publish(exchange='', routing_key='order', body='hello')
우선 이전에 CloudAMPQ의 URL을 URLParameters에 넣고, 인스턴스와 연결합니다. 그리고 통로를 뚫어주고 publish라는 함수 하나를 정의해 줍니다. 그리고 가장 기본적인 basic_publish함수를 통해서 order로 보내고, body에는 'hello'라는 문자열을 담습니다.
/consumer.py
from pika import URLParameters, BlockingConnection
# 파라미터 설정
params = URLParameters('amqps://vjmczupx:MOf1ZIEhCOj58rh-oSQd0Mrg_BpiVfUD@dingo.rmq.cloudamqp.com/vjmczupx')
# 커넥션 ( 인스턴스에 접근 하는 것 )
connection = BlockingConnection(params)
# 통로 ( 채널 )을 뚫어주는 것
channel = connection.channel()
# 나는 order로 보낸 메세지만 받겠다라는 으미
channel.queue_declare(queue='order')
def callback(ch, method, properties, body):
print('Received in order')
print(body)
# channel을 basic한 consume을 하겠다는 의미 ( order로 보낸 메세지만 받겠다 )
channel.basic_consume(queue='order', on_message_callback=callback)
print('Started consuming')
channel.start_consuming()
channel.close()
그 다음에는 이 producer로부터 받은 메세지를 브로커로부터 받아 소비하기 위한 구문을 작성해 줍니다. 우선 위와 동일하게 인스턴스와 연결하고, 통로를 뚫어줍니다. 그리고 queue_declare를 통해 order로 로 오는 메세지만 받겠다고 정읳바니다. 그리고 나중에 받을 떄 쓰이는 callback함수를 하나 정의해줍니다.
그후에 기본적인 basic_consume을 통해 order로 오는 메세지를 callback에 넘깁니다. 그리고 callback에서 body를 출력하게 되는 것이죠. 이는 start_consuming()을 통해서 실행됩니다.
즉 실습을 위해 이 publish 함수를 GET /api/shop에 붙여보도록 하겠습니다.
/user_order/views.py
...
class ShopViewSet(viewsets.ViewSet):
def list(self, request): # /api/shop/
shops = Shop.objects.all()
serializer = ShopSerializer(shops, many=True)
publish()
return Response(serializer.data)
...
그리고 docker-compose up으로 컨테이너 구성을 한다음에 backend bash쉘로 들어가서 consumer를 실행해보겠습니다. 그리고 포스트맨으로 GET /api/shop으로 요청을 보내 잘 메시지가 브로커를 통해 오는지 확인해 보도록 하겠습니다.
정상적으로 작동하는 것을 확인할 수 있습니다.
그리고 또하나 의문점이 들 수도 있습니다. 왜 consumer는 root폴더에 넣었고, producer는 user_order안에 넣었는지 말이죠. microservice라고 하면, 원래는 user_order외에도 다양한 앱이 존재할 겁니다. 그런데, 만약에 consumer가 각 앱 안에 있다면 다른 서비스에서 여기로 producing즉 메세지를 전달하고 싶을 때, 어디로 가야할지 모르기 때문입니다.
따라서 여기서는 order를 대표하는 root에 consumer를 삽입했고, producing같은 경우에는 여러가지 앱에 따라서 다양한 producing을 할 수 있기 때문에, 각 폴더 앱 안에 작성해 주어도 상관없게 되는 것입니다.
다음 포스팅에서는 Flask안에서 이를 활용해보겠습니다.