우선 두 DB의 SYNC를 맞춰주기 위해 먼저 order로 REST API가 불러져서DB의 상태가 바뀌게 되면, boss에도 RabbitMQ를 통해 이를 알려주고 이의 DB도 바꾸어 보는 실습을 해보겠습니다.
실습
이를 위해서는 먼저 order의 producer.py의 publish함수를 수정해줘야 합니다. API를 부를 떄, boss에서 필요한 method, body정보를 받아와야 하기 때문입니다.
이번 실습에서 사용할 pika의 중요구문입니다. 이를 기준으로 진행하겠습니다.
/order/user_order/producer.py
from pika import URLParameters, BlockingConnection, BaseProperties
import json
# 파라미터 설정
params = URLParameters('amqps://vjmczupx:MOf1ZIEhCOj58rh-oSQd0Mrg_BpiVfUD@dingo.rmq.cloudamqp.com/vjmczupx')
# 커넥션 ( 인스턴스에 접근 하는 것 )
connection = BlockingConnection(params)
# 통로 ( 채널 )을 뚫어주는 것
channel = connection.channel()
def publish(method, body):
# 가장 기본적인 메세지를 전달하고 싶다. ( order라는 곳에 hello라는 텍스트를 전달하고 싶다 )
properties = BaseProperties(method)
channel.basic_publish(exchange='', routing_key='boss',
body=json.dumps(body), properties=properties)
''' Publish a message to an exchange
Aeguments:
- `exchange` - The exchange to publish to.
- `routing_key` - The routing key to bind on.
- `message` - The message to send. If this is not a string be serialized as JSON,
and the properties.content_type will be forced to be application/json
- `properties` - Dict of AMQP message properties
'''
우선 properties를 pika의 BaseProperties를 통해 주어주었습니다. 여기에 properties를 넣으면 properties.content_type으로 consumer에서 접근이 가능하게 됩니다.
이제 이렇게 했다면 당연히 views.py에서도 이 publish의 사용방법을 바꾸어 주어야 합니다.
/order/user_order/views.py
from rest_framework import viewsets, status
from .models import Shop, Order
from .serializers import ShopSerializer, OrderSerializer
from rest_framework.response import Response
from rest_framework.parsers import JSONParser
from .producer import publish
class ShopViewSet(viewsets.ViewSet):
def list(self, request): # /api/shop/
shops = Shop.objects.all()
serializer = ShopSerializer(shops, many=True)
# GET방식에서는 publish함수가 필요없다. -> 값이 바뀌거나 그런게 아니기 때문이다.
return Response(serializer.data)
def create(self, request): # /api/shop/
data = JSONParser().parse(request);
serializer = ShopSerializer(data=data)
serializer.is_valid(raise_exception=True)
serializer.save()
publish('shop_created', serializer.data)
return Response(serializer.data, status.HTTP_201_CREATED)
def retrieve(self, request, pk=None): # /api/shop/<str:idx>/
shop = Shop.objects.get(id=pk)
serializer = ShopSerializer(shop)
return Response(serializer.data)
def update(self, request, pk=None): # /api/shop/<str:idx>/
shop = Shop.objects.get(id=pk)
data = JSONParser().parse(request);
serializer = ShopSerializer(instance=shop, data=data)
serializer.is_valid(raise_exception=True)
serializer.save()
publish('shop_updated', serializer.data)
return Response(serializer.data, status.HTTP_202_ACCEPTED)
def destroy(self, request, pk=None): # /api/shop/<str:idx>/
shop = Shop.objects.get(id=pk)
shop.delete()
publish('shop_deleted', pk)
return Response(status.HTTP_204_NO_CONTENT)
class OrderViewSet(viewsets.ViewSet):
def list(self, request): # /api/order/
orders = Order.objects.all()
serializer = OrderSerializer(orders, many=True)
return Response(serializer.data)
def create(self, request): # /api/order/
data = JSONParser().parse(request);
serializer = OrderSerializer(data=data)
serializer.is_valid(raise_exception=True)
serializer.save()
publish('order_created', serializer.data)
return Response(serializer.data, status.HTTP_201_CREATED)
def retrieve(self, request, pk=None): # /api/order/<str:idx>/
order = Order.objects.get(id=pk)
serializer = OrderSerializer(order)
return Response(serializer.data)
def update(self, request, pk=None): # /api/order/<str:idx>/
order = Order.objects.get(id=pk)
data = JSONParser().parse(request);
serializer = OrderSerializer(instance=order, data=data)
serializer.is_valid(raise_exception=True)
serializer.save()
publish('order_updated', serializer.data)
return Response(serializer.data, status.HTTP_202_ACCEPTED)
def destroy(self, request, pk=None): # /api/order/<str:idx>/
order = Order.objects.get(id=pk)
order.delete()
publish('order_deleted', pk)
return Response(status.HTTP_204_NO_CONTENT)
여기에서 GET요청은 publish함수가 필요 없습니다. 왜냐하면 직접적으로 DB의 값만 가져다 쓰는 것이기 때문에, 상태가 변경되었다고 boss의 다른 서비스에 보낼 필요가 없습니다. 따라서 총 6개를 각각의 함수의 목적에 맞게 publish함수를 활용해 주었습니다.
다음으로는 boss에서 이 정보를 가지고 consumer에서 properties.content_type의 정보를 보고, DB의 값을 바꾸어 주어야 합니다. 즉 callback함수를 바꾸어 주면 됩니다.
/boss/consumer.py
from pika import URLParameters, BlockingConnection
import json
from main import Shop, Order, db
# 파라미터 설정
params = URLParameters('amqps://vjmczupx:MOf1ZIEhCOj58rh-oSQd0Mrg_BpiVfUD@dingo.rmq.cloudamqp.com/vjmczupx')
# 커넥션 ( 인스턴스에 접근 하는 것 )
connection = BlockingConnection(params)
# 통로 ( 채널 )을 뚫어주는 것
channel = connection.channel()
# 나는 order로 보낸 메세지만 받겠다라는 으미
channel.queue_declare(queue='boss')
def callback(ch, method, properties, body):
print('Received in boss')
data = json.loads(body)
print(data)
if properties.content_type == 'shop_created':
shop = Shop(id=data['id'], shop_name=data['shop_name'], shop_address=data['shop_address'])
db.session.add(shop)
db.session.commit()
elif properties.content_type == 'shop_updated':
shop = Shop.query.get(data['id'])
shop.shop_name = data['shop_name']
shop.shop_address = data['shop_address']
db.session.commit()
elif properties.content_type == 'shop_deleted':
shop = Shop.query.get(data)
db.session.delete(shop)
db.session.commit()
elif properties.content_type == 'order_created':
order = Order(id=data['id'], shop=data['shop'], address=data['address'])
db.session.add(order)
db.session.commit()
elif properties.content_type == 'order_updated':
order = Order.query.get(data['id'])
order.shop = data['shop']
order.address = data['address']
db.session.commit()
elif properties.content_type == 'order_deleted':
order = Order.query.get(data)
db.session.delete(order)
db.session.commit()
# channel을 basic한 consume을 하겠다는 의미 ( order로 보낸 메세지만 받겠다 )
channel.basic_consume(queue='boss', on_message_callback=callback, auto_ack=True)
print('Started consuming')
channel.start_consuming()
channel.close()
다음과 같이 Flask문법을 사용해서 properties.content_type의 값에따라 적절히 작성해 주었습니다.
이를 MSA의 막바지에 이른것입니다. 다음에는 REST API처리 방법에 대해 간단히 실습하겠습니다.