공부하고 기록하는, 경제학과 출신 개발자의 노트

학습일지/Knative

KnativeCon 2022 - Data Processing at Scale with Knative and Benthos

inspirit941 2024. 4. 3. 15:35
반응형

https://youtu.be/3OaRXwcRJJk?si=xB_M9TrXfPaVeSB8

스크린샷 2024-04-03 오후 1 55 05스크린샷 2024-04-03 오후 1 55 12

 

발표자

  • Mihai Todor: Optum 소속, Benthos Contributor.
  • Murugappan Chetty: Box 소속, Knative committee member

발표내용

  • Knative Autoscaling, Benthos와 Knative
  • Demo

Knative Autoscaling

스크린샷 2024-04-03 오후 1 59 16

 

Knative가 Autoscaling에 사용하는 Metric은 두 가지.

  • Concurrency: number of concurrent request your application receives <- default option. default value is 100
  • RPS: number of request per second

기본적으로는 Push-based Autoscaling. 설정한 기준치 이상의 요청이 knative에 들어오면 반응하는 식.

스크린샷 2024-04-03 오후 2 02 49스크린샷 2024-04-03 오후 2 05 18

 

Pull-based autoscaling이 필요할 때가 있다. batch 같은 거

  • Database / csv를 빠르게 처리할 수 있도록 autoscaling 설정하는 것
  • 이걸 하려면 special component가 필요함.

예컨대 Bespoke Component라고 해보자. 예컨대 100k 데이터 처리가 필요하면

  • DB에서 source data를 가져오고
  • 10 record per request로, 100개의 요청을 parallel하게 knative application에 전달하는 역할

스크린샷 2024-04-03 오후 2 07 32

 

위와 같은 일을 하는 Component가

  • Configurable
  • 많이 쓰는 data source와 호환
  • HTTP / gRPC 지원
  • Scalable / Resilient / Observable

와 같은 조건을 충족한다면 어떨까?

Benthos

스크린샷 2024-04-03 오후 2 09 23스크린샷 2024-04-03 오후 2 09 28

 

Benthos란 Streaming Process를 평범하게 관리할 수 있는 툴.

  • Fancy Stream Processing made operationally mundane.
  • Single Static Binary, supports many sinks and sources
    • source와 sink 설정하고
    • 전달되는 message에 다양한 설정이 가능하다
      • Transforms (ex. scheme migration for various messages)
      • Filtering (ex. drop messages by condition)
      • Hydration (import data into an Object라는 뜻인 듯?) ref. https://www.snaplogic.com/glossary/data-hydration
      • enrichment (ex. get data from some other sources / put it in original message / send it to output)

스크린샷 2024-04-03 오후 2 18 08

 

  • Written in Go
  • Performant and Simple
    • small workflows like lambda 에서도 유용함
  • yaml과 CUE config 지원
  • Stateless. inflight인 어떤 메시지도 로컬이나 다른 곳에 저장하지 않음
    • cache 세팅은 가능
  • Extendable
    • go에서 import하기 / custom benthos binary 생성하는 작업이 쉽다

스크린샷 2024-04-03 오후 2 29 35스크린샷 2024-04-03 오후 2 29 41

 

Benthos에 Knative를 결합해서, Autoscaling이 자유로운 streaming process를 Demo로 구현

Demo 환경

  • local k8s With Kind
  • knative serving으로 sentiment Analysis 처리하는 lambda 구현
  • Benthos는 separate process로 동작
    • local에서 실행중인 postgres DB. Source와 Sink라는 이름의 테이블
    • source에서 데이터를 batch로 읽어서 sentiment analysis App에 전달
    • 분석완료 결과물을 sink 테이블에 batch로 저장

Demo

스크린샷 2024-04-03 오후 2 33 53

 

데모 소스코드 https://github.com/mihaitodor/knative-benthos

install_knative.sh 를 실행해서, kind 클러스터에 Knative serving을 설치한다.

#!/usr/bin/env bash

set -eo pipefail

echo -e "✅ Checking dependencies... \033[0m"
STARTTIME=$(date +%s)

# The command below executes 01-kind.sh in the same bash shell so that exit commands are not swallowed
source 01-kind.sh
echo -e "🍿 Installing Knative Serving... \033[0m"
bash 02-serving.sh
echo -e "🔌 Installing Knative Serving Networking Layer kourier... \033[0m"
bash 02-kourier.sh

# Setup Knative DOMAIN DNS
INGRESS_HOST="127.0.0.1"
KNATIVE_DOMAIN=$INGRESS_HOST.sslip.io
kubectl patch configmap -n knative-serving config-domain -p "{\"data\": {\"$KNATIVE_DOMAIN\": \"\"}}"

DURATION=$(($(date +%s) - $STARTTIME))
echo -e "\033[0;92m 🚀 Knative install took: $(($DURATION / 60))m$(($DURATION % 60))s \033[0m"
echo -e "\033[0;92m 🎉 Now have some fun with Serverless and Event Driven Apps \033[0m"

sentiment analysis를 처리하는 workload를 생성한다.

  • custom benthos instance, which imports sentiment analysis library
kubectl create configmap benthos-vader-config --from-file=benthos.yaml=./benthos/benthos_vader.yaml
./deploy_benthos_vader.sh

deploy_benthos_vader.sh

  • knative service 배포하는 sh 파일
#!/usr/bin/env bash

set -eo pipefail
set -u

cat <<EOF | kubectl apply -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: benthos-vader
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/window: 6s
        autoscaling.knative.dev/metric: "rps"
        autoscaling.knative.dev/target: "200"
        autoscaling.knative.dev/max-scale: "10"
    spec:
      containers:
        - image: mihaitodor/benthos-vader:latest
          args: [ "-c", "/opt/benthos/benthos.yaml" ]
          ports:
            - containerPort: 4195
          volumeMounts:
            - name: config-volume
              mountPath: /opt/benthos
      volumes:
        - name: config-volume
          configMap:
            name: benthos-vader-config
EOF

echo "Downloading benthos-vader app container image..."
kubectl wait ksvc benthos-vader --all --timeout=-1s --for=condition=Ready > /dev/null
SERVICE_URL=$(kubectl get ksvc benthos-vader -o jsonpath='{.status.url}')
echo "The Knative Service benthos-vader endpoint is $SERVICE_URL"
curl -v $SERVICE_URL/ping

workload test 명령어
curl -v -X POST -d '{"text": "I love Benthos!"}' http://benthos-vader.default.127.0.0.1.sslip.io/post

postgres DB 생성하고 populate Source Table

docker run --rm -it -p 5432:5432 -e POSTGRES_PASSWORD=password postgres
# Populate SOURCE table in DB
./populate_db.sh
# Connect to DB
pgcli postgres://postgres:password@localhost:5432/postgres
> SELECT COUNT(*) FROM SOURCE

시연한 결과는 아래와 같다.

  • source 테이블에, 감정분석에 사용할 tweet 14670개가 저장되어 있음.
  • demo github repo에 있는 tweets.csv파일을 저장한 것으로 보임

스크린샷 2024-04-03 오후 2 52 20스크린샷 2024-04-03 오후 2 52 25

 

populate_db.sh

#!/usr/bin/env bash

psql postgresql://postgres:password@localhost:5432/postgres -c 'CREATE TABLE SOURCE(ID SERIAL, TEXT VARCHAR)'
psql postgresql://postgres:password@localhost:5432/postgres -c 'CREATE TABLE SINK(COMPOUND DOUBLE PRECISION, NEGATIVE DOUBLE PRECISION, NEUTRAL DOUBLE PRECISION, POSITIVE DOUBLE PRECISION)'
# Populate SOURCE table
benthos -c ./benthos/benthos_populate_db.yaml

benthos_populate_db.yaml은 아래와 같다.

input:
  csv:
    paths:
      - Tweets.csv
    parse_header_row: true
    delimiter: ","
    batch_count: 1
    lazy_quotes: false

output:
  sql_insert:
    driver: postgres
    dsn: postgres://postgres:password@localhost:5432/postgres?sslmode=disable
    table: SOURCE
    columns:
      - TEXT
    args_mapping: |
      root = [
        this.text
      ]
    max_in_flight: 64
    batching:
      count: 100
      byte_size: 0
      period: "1s"
      check: ""
      processors: []

이제 benthos로 Batch 실행한다

time benthos -c ./benthos/benthos_batch.yaml

benthos_batch.yaml은 아래와 같다

## input 정의. postgres 정보 입력
input:
  sql_select:
    driver: postgres
    dsn: postgres://postgres:password@localhost:5432/postgres?sslmode=disable
    table: SOURCE
    columns:
      - "*"

## pipeline 설정
pipeline:
  threads: 1
  # threads: 6
  processors:
    - bloblang: root.text = this.text
    - http:
        url: http://benthos-vader.default.127.0.0.1.sslip.io/post
        verb: POST

## sink로 전달. sql로 postgres에 저장.
output:
  sql_insert:
    driver: postgres
    dsn: postgres://postgres:password@localhost:5432/postgres?sslmode=disable
    table: SINK
    columns:
      - COMPOUND
      - NEGATIVE
      - NEUTRAL
      - POSITIVE
    args_mapping: |
      root = [
        this.Compound,
        this.Negative,
        this.Neutral,
        this.Positive
      ]
    batching:
      count: 40
      period: 500ms
    ## batch 정의

metrics:
  prometheus: {}

shutdown_timeout: 3s

스크린샷 2024-04-03 오후 3 18 49

 

테스트중인 화면 얘시

  • 오른쪽의 kubectl get pods 보면, RPS가 높아서 autoscaling이 적용된 걸 볼 수 있음 (deployment 2개)
  • 왼쪽의 sql문에서는 Sink 테이블에 count 요청 보내면 값이 올라가는 걸 볼 수 있음

스크린샷 2024-04-03 오후 3 21 04

 

만약 benthos의 thread 개수를 6으로 올리면, 더 많은 knative service가 Autoscale up된다.

  • 연산속도도 더 빠름.

스크린샷 2024-04-03 오후 3 22 28

 

데모에서는 Prometheus로 metric도 확인함

  • thread 개수를 올려서 요청 보냈을 때 output_sent 값도 올라간 것을 확인할 수 있음
prometheus --config.file=prometheus.yaml
http://localhost:9090/graph?g0.expr=rate(output_sent{}[10s])&g0.tab=0&g0.range_input=5m

Summary

스크린샷 2024-04-03 오후 2 33 57

  • Knative의 Autoscale은 push based.
  • Pull based 작업을 하려면 bespoke component가 필요함
  • Benthos는 knative autoscale을 활용한 bespoke component 구현체 중 하나.

세션 끝나고 질문

Q. 왜 knative autoscaling metric으로 RPS 썼는지

  • rps는 local에서 시연을 쉽게 하기 위한 목적이었음.

Q. concurrency와 RPS 중 추천하는 방식은?

  • case by case
반응형