Scheduling SQS messages
박대섭
2020-11-11
Overview
오늘은 브랜디에서 이미 소개된 기술들을 간단한 미션을 통해 활용하는 시간을 가져볼까 합니다.
브랜디는 상품 주문 또는 취소, 배송 상태 등에 따라 사용자에게 카카오톡 메시지, SMS 등 고객에게 알림 서비스를 제공하고 있습니다. 하지만, 국내 정보통신망법에 따라 야간 시간대(오후 9시 ~ 오전 8시 사이)에는 메시지를 보내지 않아야 하며, 메시지를 보내기 위해서는 별도의 동의가 필요합니다. [1]
그렇다면 당장 해야 할 미션에 대해 짐작이 가시나요?
동의를 받지 못한 알림 서비스는 야간 시간대를 피해 전달하라.
[시스템] 지령을 습득하였습니다.
응?!?!
Think
-
대량 메시지 발송을 위해 서버 부하를 고려하지 않을 수 없습니다. 이를 순차적으로 보내기 위한 저장소와 실행자가 필요합니다.
✔️ SQS + Lambda 포스팅 참고 [2]
-
야간 시간대에 보낸 메시지 저장소와 이 메시지들을 저장하기 위한 실행자가 필요합니다.
✔️ 이것도 SQS + Lambda 포스팅 참고 [2]
-
야간 시간대에 모여진 메시지들을, 매일 해당 시간대를 피해 전송하기 위해 쉬지 않는 기계와 메시지 전송을 위한 실행자가 필요합니다.
✔️ CloudWatch 포스팅 참고 [3]
포스팅을 참조하여 기본적으로 요소를 구성 했다면 그림을 그리며 퍼즐을 맞추어 봅시다.
Architecture(Scedule Message SQS)
- 미션은 야간 시간대를 피해 대량으로 메시지를 전송해야 하며, 이미 브랜디 알림 서비스에서 존재하는 단일 전송, 일괄 전송 기능에 최대한 영향을 주지 않도록 해야 합니다.
- 이에 별도 SQS(Schedule Message SQS)를 구성하여 기존 SQS를 분리 전송합시다.
Architecture(Wait Message SQS)
- 야간 시간대에 보낸 메시지는 별도 저장소에 저장합니다.
- 저장소는 SQS 외 Redis, Mongo DB 등을 이용해도 좋습니다.
- 다만, 필자의 경우 SQS가 웹 UI 설정으로 사용하기 간편하며 메시지 처리 오류 처리 등을 손쉽게 설정 가능하며, 메시지 보관 등을 비중 있게 생각하여 선택했으니 상황에 맞게 선택하시면 됩니다. 🙂
Architecture(Scheduling SQS messages)
- 앞서 구성한 기계(Cloud Watch)와 실행자(Schedule Lambda)로 SQS에 쌓인 메시지들을 조금씩 가져와 Schedule Message SQS에 메시지들을 전달합니다.
- 하나의 SQS를 구성하여 메시지 전송을 해도 되고, 메시지 전송 속도를 높이기 위해 기존 SQS를 사용하거나 별도의 SQS를 구성하는 것도 좋을 것 같습니다.
퍼즐을 맞추었다면 이제 코드를 만들어 봅시다.
Code
- scedule_manager/main.py : 스케줄러 메인
# -*- coding: UTF-8 -*-
# from module.order_message import OrderMessage
# from module.refund_message import RefundMessage
# from module.delivery_message import DeliveryMessage
from module.wait_message import WaitMessage
import json
sqs_list = [
# OrderMessage(),
# RefundMessage(),
# DeliveryMessage()
WaitMessage()
]
def handle():
send_result_map = {}
for sqs in sqs_list:
send_result_map[sqs.name()] = sqs.send()
return json.dumps(send_result_map)
- scedule_manager/test.py : 테스트를 위해 서버 구동
# -*- coding: UTF-8 -*-
"""
Examples:
# 서버 구동
python ./test.py
# 테스트 메세지 전송
http://localhost:5000/send 에 POST로 SQS에 넣을 메세지 전송
"""
from flask import Flask
import main
app = Flask(__name__)
@app.route('/test', methods=['GET'])
def send():
return main.handle()
if __name__ == '__main__':
app.run(debug=True, port=5000
- scedule_manager/requirements.txt : install 항목
pymysql
jinja2
pytz
requests
slacker
boto3
Flask
- 공통 - scedule_manager/common/constant.py : 전역변수
# -*- coding: utf-8 -*-
# 스케줄러 메시지 일괄 전송 SQS
SQS_QUEUE_URL_SCEDULE = 'https://sqs.ap-northeast-1.amazonaws.com/9999999999/sqs_scedule_message'
# 야간 시간대에 전송된 메시지 저장소 SQS
SQS_QUEUE_URL_WAIT = 'https://sqs.ap-northeast-1.amazonaws.com/9999999999/sqs_wait_message'
SLACK_TOKEN = 'xoxp-9999999999-9999999999-9999999999-999999999999999999999999999999'
- 모듈 - scedule_manager/module/wait_message.py : 대기 메시지 전송 SQS 모듈
# -*- coding: UTF-8 -*-
import json
from common import constant
from utils import schedule, slack, sqs
#############################
# 대기 메시지 전송 SQS 모듈
#############################
class WaitMessage:
schedule = {
'08:00': {'usePeriod': True}
}
# 이름
def name(self):
return 'WaitMessage SQS'
# 전송
def send(self):
print('=' * 30)
print('WaitMessage SQS Send Message!')
print('=' * 30)
# 스케줄 시간 확인
schedule_info = schedule.get_schedule_info(self.schedule)
if schedule_info.get('result'):
total = 0
try:
# 재전송 SQS에 큐가 비워질때까지 꺼내고 SQS 메세지 전송 후 메세지 삭제
for message in sqs.receive_after_delete(constant.SQS_QUEUE_URL_WAIT):
if message['Body']:
sqs.send(constant.SQS_QUEUE_URL_SCEDULE, (json.dumps(message['Body'])))
total += 1
except Exception as e:
print("SQS Fail : {}".format(e))
slack.send_slack_message('Exception: {}'.format(str(e)), 'SQS Fail!')
return {
'result': True, 'total': total,
}
else: # 실패시 실패 메시지 리턴
return schedule_info.get('message')
- 유틸 - scedule_manager/utils/schedule.py : Util Schedule
# -*- coding: UTF-8 -*-
import datetime
from pytz import timezone
def get_schedule_info(schedule):
"""슬랙 메세지 전송
Args:
schedule: json
Returns:
json
"""
# now_date = datetime.datetime.now(timezone('Asia/Seoul')) # 현재시간
now_date = datetime.datetime(2020, 9, 29, 8, 0, 0, 0) # 테스트 시간설정
refine_date = datetime.datetime(now_date.year, now_date.month, now_date.day, now_date.hour, 0, 0, 0) # 정리된 시간
limit_date = datetime.datetime(now_date.year, now_date.month, now_date.day, now_date.hour, 5, 0, 0) # 유효 시간
result = {
"now_date": now_date.strftime("%Y-%m-%d %H:%M:%S"),
"refine_date": refine_date.strftime("%Y-%m-%d %H:%M:%S"),
"limit_date": limit_date.strftime("%Y-%m-%d %H:%M:%S")
}
# 유효 시간 넘어가면
if now_date.time() > limit_date.time():
startTime = refine_date.strftime("%Y-%m-%d %H:%M:%S")
endTime = limit_date.strftime("%Y-%m-%d %H:%M:%S")
result.update({"result": False, "message": "유효시간 범위 [" + startTime + " ~ " + endTime + "] 초과"})
# 유효 시간 정보가 없으면
elif schedule.get(refine_date.strftime("%H:%M")) is None:
result.update({"result": False, "message": "no match schedule"})
# 성공
else:
result.update({"result": True})
if schedule.get("usePeriod"):
schedule_info = schedule.get(refine_date.strftime("%H:%M"))
result.update({"result": True, "info": schedule_info})
return result
- scedule_manager/utils/slack.py : Util Slack
# -*- coding: UTF-8 -*-
from slacker import Slacker
from common import constant
def send_slack_message(message, title, incoming_webhook_url='https://hooks.slack.com/services/TEST'):
"""슬랙 메세지 전송
Args:
incoming_webhook_url: slack webhook url
message: 전달할 메세지
title: 메시지 타이틀 이름
Returns:
response or False
"""
token = constant.SLACK_TOKEN
slack = Slacker(
token=token,
incoming_webhook_url=incoming_webhook_url,
timeout=300
)
attachments = list()
attachment = {
'title': title,
'fields': [
{
'value': message,
'short': False
}
],
'mrkdwn_in': ['text', 'title', 'fields']
}
attachments.append(attachment)
slack_data = {
'attachments': attachments
}
return slack.incomingwebhook.post(slack_data)
- scedule_manager/utils/sqs.py : Util SQS
# -*- coding: UTF-8 -*-
import boto3
def send(queue_url, body):
"""SQS 메세지 전송
Args:
queue_url: SQS URL
body : 전달할 SQS Body
Returns:
response or False
"""
sqs_client = boto3.client('sqs', 'ap-northeast-1')
response = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=body
)
return response
def receive_after_delete(queue_url):
"""재전송 SQS에 큐가 비워질때까지 꺼내고 메세지 삭제
Args:
queue_url: SQS URL
Returns:
response or False
"""
sqs_client = boto3.client('sqs', 'ap-northeast-1')
while True:
resp = sqs_client.receive_message(
QueueUrl=queue_url,
AttributeNames=['All'],
MaxNumberOfMessages=10
)
try:
yield from resp['Messages']
except KeyError:
return False
entries = [
{'Id': msg['MessageId'], 'ReceiptHandle': msg['ReceiptHandle']}
for msg in resp['Messages']
]
resp = sqs_client.delete_message_batch(
QueueUrl=queue_url, Entries=entries
)
if len(resp['Successful']) != len(entries):
raise RuntimeError(
f"Failed to delete messages: entries={entries!r} resp={resp!r}"
)
여기서 잠깐! 추가로 풀어나가야 할 과제가 남아 있습니다!
서비스를 이용하는 고객이 많아지면 한번에 전송해야 하는 Wait Message SQS도 그만큼 늘어나게 됩니다. 물론 Schedule Lambda 제한 시간 안에 완료된다면 문제가 없으나, 초과할 경우 남아 있는 Message는 어떻게 할지에 대한 고민하게 됩니다. 글 중간에 메시지 전송 속도에 대해 언급하여 그러한 일이 발생하지 않도록 SQS 추가 구성에 대해 말씀드렸는데요. ‘나는 제한 시간에 얽매이는 게 싫다!’ 하시는 분들은 Batch 구성을 한다면 좋을 것 같습니다.
Conclusion
이번 시간에는 Scheduling을 통해 SQS 메시지를 모았다가 다른 SQS로 메시지를 전달해보았습니다. 아직 고민해야 할 사항들은 존재하지만, 특정 일자에 전달해야 하는 데이터를 쉽고 빠르게, 더불어 유지보수도 편하도록 개발해야 할 때 Amazon SQS를 이용한 Scheduling 방법을 추천해드립니다.🙂
Reference
1. 불법스팸대응센터 - 야간시간대에 광고성 정보를 전송할 수 있는지?
https://spam.kisa.or.kr/customer/sub3_R.do?idx=8
2. SQS + Lambda - 이상근 실장님 작성 글
http://labs.brandi.co.kr/2018/02/16/leesg.html
3. CloudWatch에 대하여 - 곽정섭님 작성 글
http://labs.brandi.co.kr/2019/05/30/kwakjs.html