공부하고 기록하는, 경제학과 출신 개발자의 노트

강연

if kakao 2021 - 스마트 메시지 서비스 개발기 (kafka Streams)

inspirit941 2022. 7. 12. 20:03
반응형

스마트 메시지 서비스 개발기 (Kafka Streams)

https://if.kakao.com/session/22

 

if(kakao)2021

함께 나아가는 더 나은 세상

if.kakao.com

 

스크린샷 2022-07-12 오후 2 53 51스크린샷 2022-07-12 오후 2 56 11

 

스크린샷 2022-07-12 오후 2 58 37

스마트 메시지: 카카오톡 채널 광고메시지 서비스의 일종.

  • 소재 최적화: 여러 광고시안 중 어떤 소재가 가장 반응률이 높을 것인지
  • 유저 타겟팅: 반응률이 높을 것 같은 유저 타겟팅.

스크린샷 2022-07-12 오후 2 55 39

  • 광고 등록을 요청하면, 스마트메시지 시스템에 요청이 등록됨.
  • 스마트메시지 시스템은 카톡 채널 로그로부터 모델을 학습해서, 소재최적화 / 유저타겟팅 대상을 선정한다.
  • 메시지를 보내면, 반응 로그가 kafka로 유입. 스마트메시지 시스템이 다시 학습에 사용함.
  • 광고 집행이 끝나면 리포트 생성 -> 광고주가 확인할 수 있도록 전달.

스크린샷 2022-07-12 오후 4 36 56

프로덕트 아키텍처

스크린샷 2022-07-12 오후 4 36 02

  • 광고 집행 담당하는 서버군
  • 소재최적화 / 유저타겟팅을 위한 모델 학습 시스템
  • 사용자 로그를 필요한 곳에 제공하는 파이프라인

스크린샷 2022-07-12 오후 4 40 49

Golang-based Server

스크린샷 2022-07-12 오후 4 40 59

 

  • Go로 전환하면서 코드량 60% 감소, 메모리 사용량도 1/3 수준으로 감소했다고 함

Event Driven Architecture

스크린샷 2022-07-12 오후 5 09 44

 

  • k8s pod는 항상 장애상황을 고려해야 하는데, http 기반 통신은 pod 장애발생 시 유실 / 중복처리 가능성이 있었음.

 

스크린샷 2022-07-12 오후 5 09 37

  • kafka의 Txn 기능 -> 오류 발생 시 commit되지 않도록 해서 중복처리 가능성 방지.
  • CQRS (명령과 쿼리 분리)
  • REST CRUD에 비해 상태 변경요청이 DB에는 느리게 반영되지만, latency보다 정합성이 더 중요한 서비스이며 이벤트 발생 빈도가 낮으므로 괜찮음.

Delay Queue 기반 Job Scheduling

스크린샷 2022-07-12 오후 5 22 16

광고 집행방식

  • 시스템 내부에서 학습모델이 갱신 -> 발송대상과 발송메시지가 주기적으로 갱신되는 Mini-batch 방식
  • 따라서 집행 시작지점 / 실제 job 발생 시점 사이의 시차가 있음
  • 스케줄링 서버에서 정해진 시간에 맞추어 Job을 요청하느 ㄴ식.

스크린샷 2022-07-12 오후 5 22 25

  • 스케줄링 서버에 이슈가 생겼을 경우
    • 서버 메모리에 저장하면 그대로 유실됨
    • DB에 저장할 경우, 저장된 job 정보를 하나의 pod만 실행할 수 있도록 로직을 구성해야 함

스크린샷 2022-07-12 오후 5 27 06스크린샷 2022-07-12 오후 5 30 21

  • RabbitMQ의 MessageTTL + Dead Letter Exchange 사용해서 Delay Queue 구현 가능.
    • 메시지의 TTL 설정시간이 끝나면 Delay Letter Exchange로 이동하게 됨.
    • DLX를 별도의 queue와 바인딩하면, delay queue 형태로 사용 가능함.
  • 예컨대 11:00에 시작하는 광고 스케줄을 등록하면
    • 11:00, 11:10, 11:20 ... 에 시작되는 스케줄을 TTL 지정해서 Queue에 등록.
    • TTL이 만료되면 DLX Binded Queue로 job이 이동하며, DLX로 들어온 job을 서버가 받아서 실행하도록 설정.

스크린샷 2022-07-12 오후 5 32 54

서버가 죽을 경우 k8s에서 다른 Pod을 띄워서 스케줄링 서버를 동작시킴.

  • Delay Queue에서 필요한 정보를 받아오도록 설정.

Kafka Streams

스크린샷 2022-07-12 오후 5 34 55스크린샷 2022-07-12 오후 5 35 09

고려해야 할 점은 세 가지.

  • 사용자의 모든 로그는 일단 kafka로 모인다. 원천데이터를 저장하는 kafka와 완벽히 연동되는 애플리케이션이 필요함.
  • 상태 기반 처리.
    • 스트림 데이터 처리 종류는 두 가지. stateful / stateless.
    • stateful : 여러 메시지를 시간단위로 묶어 처리. 사용자의 반응 로그데이터를 취합하는 과정이 필요했으므로 stateful.
    • stateless : 각각의 메시지를 전부 별개의 메시지로 인식해 처리.
  • 운영: 지속적으로 stream application을 운영할 수 있어야 함. SW지원, 트러블슈팅 등을 포함

스크린샷 2022-07-12 오후 5 40 12

kafka stream 선택 이유

  • apache kafka에서 공식 REALESE
  • masterless 배포 방식으로 배포유연성이 높음
  • 어차피 Kafka로 원천데이터 받는 상황에서 streams 기능을 공식 지원하고 있음

 

스크린샷 2022-07-12 오후 5 42 36

 

특징

  • 라이브러리. -> 스프링 쓰거나, public static void main 으로 시작하는 일반 자바 애플리케이션으로도 사용 가능.
  • Task 단위로 데이터 처리. Partition 별로 task 만들어서 병렬처리 -> partition 개수만큼 streams application을 Scale out 가능.
    • kafka Consumer가 partition단위로 consumer group 만들어서 데이터 처리하는 것과 비슷함
  • rocksDB / changeLog Topic 활용. changeLog는 상태 백업 용도로 Topic에도 저장.

 

스크린샷 2022-07-12 오후 5 46 58

구현방법은 크게 두 가지. StreamsDSL / ProcessorAPI

  • StreamDSL은 세 가지 데이터 처리방식 - Kstream, Ktable, GlobalKtable 제공
  • ProcessorAPI는 streamDSL이 제공하지 않는 스케줄링 처리가 필요할 때 / msg header, timestamp 기준으로 처리가 필요할 경우 사용.

StreamDSL만 사용해서 구현했다고 함.

 

스크린샷 2022-07-12 오후 7 25 31

KStreams

  • 레코드 단위로 메시지의 Key / value를 추상화한 객체.
  • 기존의 Consumer에서 Record 단위로 처리하는 것과 유사함.
  • partition으로 들어온 데이터를 순차적으로 하나씩 처리함.

 

스크린샷 2022-07-12 오후 7 27 58

KTable

  • topic에서 record의 추가를 Insert로 보지 않고 update로 간주하는 방식.
  • 동일한 KEY값으로 들어온 여러 메시지 중 가장 마지막으로 들어온 메시지를 유효한 값으로 판단한다.
    • Task별로 각 Partition의 데이터를 일종의 View Table로 만든 느낌
  • topic의 데이터를 batch / table 형태로 사용할 수 있다는 장점.

 

스크린샷 2022-07-12 오후 7 30 30

GlobalKTable

  • 모든 Partition의 데이터를 각 Task마다 구체화된 View로 만든다.

 

적용 예시

스크린샷 2022-07-12 오후 7 40 29

 

로그 취합 및 저장 프로세스 (Aggregation Process)

  • 원천로그 데이터는 최대 1000tps로 유입됨. 데이터량이 많기 때문에 mongoDB에 직접 데이터 조회를 요청하면 DB부하가 심하다.
    • streams로 데이터 처리 -> 저장하는 방식.

스크린샷 2022-07-12 오후 7 44 20

사용자 정보 - 로그데이터 매핑이 필요하면 Stateless Mapping 담당하는 로직 실행.

  1. 사용자의 반응로그를 의미하는 topic을 선언한다.
  2. filter / map 등의 메소드로 stateless 로직 수행
    1. json여부를 filter 함수로 확인
    2. Redis에 있는 사용자 데이터와 결합해서 신규 KStream을 생성한다.
    3. mongoDB에 Record들을 저장한다.

 

스크린샷 2022-07-12 오후 7 53 45

 

원천로그를 Stateful Processing으로 Aggregate한 뒤 MongoDB에 저장하는 프로세스

  • KStream을 Window 연산하면 시간 단위로 KTable 데이터가 리턴됨.
  • 코드를 보면, 1분 단위 + Message Key 기반으로 Group + Aggregate.
    • 레코드의 Group 단위로는 KEY를 사용하는 게 일반적.
  • KTable -> Kstream으로 다시 변환한 뒤 MongoDB에 저장.

"kafka Stream은 Partition단위로 task가 나뉘고, 각 task는 partition의 데이터만 활용한다."

스크린샷 2022-07-12 오후 7 58 44

unit test 코드. TopologyTestDriver를 사용함.

반응형