반응형
스마트 메시지 서비스 개발기 (Kafka Streams)
https://if.kakao.com/session/22
스마트 메시지: 카카오톡 채널 광고메시지 서비스의 일종.
- 소재 최적화: 여러 광고시안 중 어떤 소재가 가장 반응률이 높을 것인지
- 유저 타겟팅: 반응률이 높을 것 같은 유저 타겟팅.
- 광고 등록을 요청하면, 스마트메시지 시스템에 요청이 등록됨.
- 스마트메시지 시스템은 카톡 채널 로그로부터 모델을 학습해서, 소재최적화 / 유저타겟팅 대상을 선정한다.
- 메시지를 보내면, 반응 로그가 kafka로 유입. 스마트메시지 시스템이 다시 학습에 사용함.
- 광고 집행이 끝나면 리포트 생성 -> 광고주가 확인할 수 있도록 전달.
프로덕트 아키텍처
- 광고 집행 담당하는 서버군
- 소재최적화 / 유저타겟팅을 위한 모델 학습 시스템
- 사용자 로그를 필요한 곳에 제공하는 파이프라인
Golang-based Server
- Go로 전환하면서 코드량 60% 감소, 메모리 사용량도 1/3 수준으로 감소했다고 함
Event Driven Architecture
- k8s pod는 항상 장애상황을 고려해야 하는데, http 기반 통신은 pod 장애발생 시 유실 / 중복처리 가능성이 있었음.
- kafka의 Txn 기능 -> 오류 발생 시 commit되지 않도록 해서 중복처리 가능성 방지.
- CQRS (명령과 쿼리 분리)
- REST CRUD에 비해 상태 변경요청이 DB에는 느리게 반영되지만, latency보다 정합성이 더 중요한 서비스이며 이벤트 발생 빈도가 낮으므로 괜찮음.
Delay Queue 기반 Job Scheduling
광고 집행방식
- 시스템 내부에서 학습모델이 갱신 -> 발송대상과 발송메시지가 주기적으로 갱신되는 Mini-batch 방식
- 따라서 집행 시작지점 / 실제 job 발생 시점 사이의 시차가 있음
- 스케줄링 서버에서 정해진 시간에 맞추어 Job을 요청하느 ㄴ식.
- 스케줄링 서버에 이슈가 생겼을 경우
- 서버 메모리에 저장하면 그대로 유실됨
- DB에 저장할 경우, 저장된 job 정보를 하나의 pod만 실행할 수 있도록 로직을 구성해야 함
- RabbitMQ의 MessageTTL + Dead Letter Exchange 사용해서 Delay Queue 구현 가능.
- 메시지의 TTL 설정시간이 끝나면 Delay Letter Exchange로 이동하게 됨.
- DLX를 별도의 queue와 바인딩하면, delay queue 형태로 사용 가능함.
- 예컨대 11:00에 시작하는 광고 스케줄을 등록하면
- 11:00, 11:10, 11:20 ... 에 시작되는 스케줄을 TTL 지정해서 Queue에 등록.
- TTL이 만료되면 DLX Binded Queue로 job이 이동하며, DLX로 들어온 job을 서버가 받아서 실행하도록 설정.
서버가 죽을 경우 k8s에서 다른 Pod을 띄워서 스케줄링 서버를 동작시킴.
- Delay Queue에서 필요한 정보를 받아오도록 설정.
Kafka Streams
고려해야 할 점은 세 가지.
- 사용자의 모든 로그는 일단 kafka로 모인다. 원천데이터를 저장하는 kafka와 완벽히 연동되는 애플리케이션이 필요함.
- 상태 기반 처리.
- 스트림 데이터 처리 종류는 두 가지. stateful / stateless.
- stateful : 여러 메시지를 시간단위로 묶어 처리. 사용자의 반응 로그데이터를 취합하는 과정이 필요했으므로 stateful.
- stateless : 각각의 메시지를 전부 별개의 메시지로 인식해 처리.
- 운영: 지속적으로 stream application을 운영할 수 있어야 함. SW지원, 트러블슈팅 등을 포함
kafka stream 선택 이유
- apache kafka에서 공식 REALESE
- masterless 배포 방식으로 배포유연성이 높음
- 어차피 Kafka로 원천데이터 받는 상황에서 streams 기능을 공식 지원하고 있음
특징
- 라이브러리. -> 스프링 쓰거나, public static void main 으로 시작하는 일반 자바 애플리케이션으로도 사용 가능.
- Task 단위로 데이터 처리. Partition 별로 task 만들어서 병렬처리 -> partition 개수만큼 streams application을 Scale out 가능.
- kafka Consumer가 partition단위로 consumer group 만들어서 데이터 처리하는 것과 비슷함
- rocksDB / changeLog Topic 활용. changeLog는 상태 백업 용도로 Topic에도 저장.
구현방법은 크게 두 가지. StreamsDSL / ProcessorAPI
- StreamDSL은 세 가지 데이터 처리방식 - Kstream, Ktable, GlobalKtable 제공
- ProcessorAPI는 streamDSL이 제공하지 않는 스케줄링 처리가 필요할 때 / msg header, timestamp 기준으로 처리가 필요할 경우 사용.
StreamDSL만 사용해서 구현했다고 함.
KStreams
- 레코드 단위로 메시지의 Key / value를 추상화한 객체.
- 기존의 Consumer에서 Record 단위로 처리하는 것과 유사함.
- partition으로 들어온 데이터를 순차적으로 하나씩 처리함.
KTable
- topic에서 record의 추가를 Insert로 보지 않고 update로 간주하는 방식.
- 동일한 KEY값으로 들어온 여러 메시지 중 가장 마지막으로 들어온 메시지를 유효한 값으로 판단한다.
- Task별로 각 Partition의 데이터를 일종의 View Table로 만든 느낌
- topic의 데이터를 batch / table 형태로 사용할 수 있다는 장점.
GlobalKTable
- 모든 Partition의 데이터를 각 Task마다 구체화된 View로 만든다.
적용 예시
로그 취합 및 저장 프로세스 (Aggregation Process)
- 원천로그 데이터는 최대 1000tps로 유입됨. 데이터량이 많기 때문에 mongoDB에 직접 데이터 조회를 요청하면 DB부하가 심하다.
- streams로 데이터 처리 -> 저장하는 방식.
사용자 정보 - 로그데이터 매핑이 필요하면 Stateless Mapping 담당하는 로직 실행.
- 사용자의 반응로그를 의미하는 topic을 선언한다.
- filter / map 등의 메소드로 stateless 로직 수행
- json여부를 filter 함수로 확인
- Redis에 있는 사용자 데이터와 결합해서 신규 KStream을 생성한다.
- mongoDB에 Record들을 저장한다.
원천로그를 Stateful Processing으로 Aggregate한 뒤 MongoDB에 저장하는 프로세스
- KStream을 Window 연산하면 시간 단위로 KTable 데이터가 리턴됨.
- 코드를 보면, 1분 단위 + Message Key 기반으로 Group + Aggregate.
- 레코드의 Group 단위로는 KEY를 사용하는 게 일반적.
- KTable -> Kstream으로 다시 변환한 뒤 MongoDB에 저장.
"kafka Stream은 Partition단위로 task가 나뉘고, 각 task는 partition의 데이터만 활용한다."
unit test 코드. TopologyTestDriver를 사용함.
반응형