KnativeCon 2022 - Data Processing at Scale with Knative and Benthos
https://youtu.be/3OaRXwcRJJk?si=xB_M9TrXfPaVeSB8
발표자
- Mihai Todor: Optum 소속, Benthos Contributor.
- Murugappan Chetty: Box 소속, Knative committee member
발표내용
- Knative Autoscaling, Benthos와 Knative
- Demo
Knative Autoscaling
Knative가 Autoscaling에 사용하는 Metric은 두 가지.
- Concurrency: number of concurrent request your application receives <- default option. default value is 100
- RPS: number of request per second
기본적으로는 Push-based Autoscaling. 설정한 기준치 이상의 요청이 knative에 들어오면 반응하는 식.
Pull-based autoscaling이 필요할 때가 있다. batch 같은 거
- Database / csv를 빠르게 처리할 수 있도록 autoscaling 설정하는 것
- 이걸 하려면 special component가 필요함.
예컨대 Bespoke Component라고 해보자. 예컨대 100k 데이터 처리가 필요하면
- DB에서 source data를 가져오고
- 10 record per request로, 100개의 요청을 parallel하게 knative application에 전달하는 역할
위와 같은 일을 하는 Component가
- Configurable
- 많이 쓰는 data source와 호환
- HTTP / gRPC 지원
- Scalable / Resilient / Observable
와 같은 조건을 충족한다면 어떨까?
Benthos
Benthos란 Streaming Process를 평범하게 관리할 수 있는 툴.
- Fancy Stream Processing made operationally mundane.
- Single Static Binary, supports many sinks and sources
- source와 sink 설정하고
- 전달되는 message에 다양한 설정이 가능하다
- Transforms (ex. scheme migration for various messages)
- Filtering (ex. drop messages by condition)
- Hydration (import data into an Object라는 뜻인 듯?) ref. https://www.snaplogic.com/glossary/data-hydration
- enrichment (ex. get data from some other sources / put it in original message / send it to output)
- Written in Go
- Performant and Simple
- small workflows like lambda 에서도 유용함
- yaml과 CUE config 지원
- Stateless. inflight인 어떤 메시지도 로컬이나 다른 곳에 저장하지 않음
- cache 세팅은 가능
- Extendable
- go에서 import하기 / custom benthos binary 생성하는 작업이 쉽다
Benthos에 Knative를 결합해서, Autoscaling이 자유로운 streaming process를 Demo로 구현
Demo 환경
- local k8s With Kind
- knative serving으로 sentiment Analysis 처리하는 lambda 구현
- Benthos는 separate process로 동작
- local에서 실행중인 postgres DB. Source와 Sink라는 이름의 테이블
- source에서 데이터를 batch로 읽어서 sentiment analysis App에 전달
- 분석완료 결과물을 sink 테이블에 batch로 저장
Demo
데모 소스코드 https://github.com/mihaitodor/knative-benthos
install_knative.sh 를 실행해서, kind 클러스터에 Knative serving을 설치한다.
#!/usr/bin/env bash
set -eo pipefail
echo -e "✅ Checking dependencies... \033[0m"
STARTTIME=$(date +%s)
# The command below executes 01-kind.sh in the same bash shell so that exit commands are not swallowed
source 01-kind.sh
echo -e "🍿 Installing Knative Serving... \033[0m"
bash 02-serving.sh
echo -e "🔌 Installing Knative Serving Networking Layer kourier... \033[0m"
bash 02-kourier.sh
# Setup Knative DOMAIN DNS
INGRESS_HOST="127.0.0.1"
KNATIVE_DOMAIN=$INGRESS_HOST.sslip.io
kubectl patch configmap -n knative-serving config-domain -p "{\"data\": {\"$KNATIVE_DOMAIN\": \"\"}}"
DURATION=$(($(date +%s) - $STARTTIME))
echo -e "\033[0;92m 🚀 Knative install took: $(($DURATION / 60))m$(($DURATION % 60))s \033[0m"
echo -e "\033[0;92m 🎉 Now have some fun with Serverless and Event Driven Apps \033[0m"
sentiment analysis를 처리하는 workload를 생성한다.
- custom benthos instance, which imports sentiment analysis library
kubectl create configmap benthos-vader-config --from-file=benthos.yaml=./benthos/benthos_vader.yaml
./deploy_benthos_vader.sh
deploy_benthos_vader.sh
- knative service 배포하는 sh 파일
#!/usr/bin/env bash
set -eo pipefail
set -u
cat <<EOF | kubectl apply -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: benthos-vader
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/window: 6s
autoscaling.knative.dev/metric: "rps"
autoscaling.knative.dev/target: "200"
autoscaling.knative.dev/max-scale: "10"
spec:
containers:
- image: mihaitodor/benthos-vader:latest
args: [ "-c", "/opt/benthos/benthos.yaml" ]
ports:
- containerPort: 4195
volumeMounts:
- name: config-volume
mountPath: /opt/benthos
volumes:
- name: config-volume
configMap:
name: benthos-vader-config
EOF
echo "Downloading benthos-vader app container image..."
kubectl wait ksvc benthos-vader --all --timeout=-1s --for=condition=Ready > /dev/null
SERVICE_URL=$(kubectl get ksvc benthos-vader -o jsonpath='{.status.url}')
echo "The Knative Service benthos-vader endpoint is $SERVICE_URL"
curl -v $SERVICE_URL/ping
workload test 명령어curl -v -X POST -d '{"text": "I love Benthos!"}' http://benthos-vader.default.127.0.0.1.sslip.io/post
postgres DB 생성하고 populate Source Table
docker run --rm -it -p 5432:5432 -e POSTGRES_PASSWORD=password postgres
# Populate SOURCE table in DB
./populate_db.sh
# Connect to DB
pgcli postgres://postgres:password@localhost:5432/postgres
> SELECT COUNT(*) FROM SOURCE
시연한 결과는 아래와 같다.
- source 테이블에, 감정분석에 사용할 tweet 14670개가 저장되어 있음.
- demo github repo에 있는 tweets.csv파일을 저장한 것으로 보임
populate_db.sh
#!/usr/bin/env bash
psql postgresql://postgres:password@localhost:5432/postgres -c 'CREATE TABLE SOURCE(ID SERIAL, TEXT VARCHAR)'
psql postgresql://postgres:password@localhost:5432/postgres -c 'CREATE TABLE SINK(COMPOUND DOUBLE PRECISION, NEGATIVE DOUBLE PRECISION, NEUTRAL DOUBLE PRECISION, POSITIVE DOUBLE PRECISION)'
# Populate SOURCE table
benthos -c ./benthos/benthos_populate_db.yaml
benthos_populate_db.yaml은 아래와 같다.
input:
csv:
paths:
- Tweets.csv
parse_header_row: true
delimiter: ","
batch_count: 1
lazy_quotes: false
output:
sql_insert:
driver: postgres
dsn: postgres://postgres:password@localhost:5432/postgres?sslmode=disable
table: SOURCE
columns:
- TEXT
args_mapping: |
root = [
this.text
]
max_in_flight: 64
batching:
count: 100
byte_size: 0
period: "1s"
check: ""
processors: []
이제 benthos로 Batch 실행한다
time benthos -c ./benthos/benthos_batch.yaml
benthos_batch.yaml은 아래와 같다
## input 정의. postgres 정보 입력
input:
sql_select:
driver: postgres
dsn: postgres://postgres:password@localhost:5432/postgres?sslmode=disable
table: SOURCE
columns:
- "*"
## pipeline 설정
pipeline:
threads: 1
# threads: 6
processors:
- bloblang: root.text = this.text
- http:
url: http://benthos-vader.default.127.0.0.1.sslip.io/post
verb: POST
## sink로 전달. sql로 postgres에 저장.
output:
sql_insert:
driver: postgres
dsn: postgres://postgres:password@localhost:5432/postgres?sslmode=disable
table: SINK
columns:
- COMPOUND
- NEGATIVE
- NEUTRAL
- POSITIVE
args_mapping: |
root = [
this.Compound,
this.Negative,
this.Neutral,
this.Positive
]
batching:
count: 40
period: 500ms
## batch 정의
metrics:
prometheus: {}
shutdown_timeout: 3s
테스트중인 화면 얘시
- 오른쪽의 kubectl get pods 보면, RPS가 높아서 autoscaling이 적용된 걸 볼 수 있음 (deployment 2개)
- 왼쪽의 sql문에서는 Sink 테이블에 count 요청 보내면 값이 올라가는 걸 볼 수 있음
만약 benthos의 thread 개수를 6으로 올리면, 더 많은 knative service가 Autoscale up된다.
- 연산속도도 더 빠름.
데모에서는 Prometheus로 metric도 확인함
- thread 개수를 올려서 요청 보냈을 때 output_sent 값도 올라간 것을 확인할 수 있음
prometheus --config.file=prometheus.yaml
http://localhost:9090/graph?g0.expr=rate(output_sent{}[10s])&g0.tab=0&g0.range_input=5m
Summary
- Knative의 Autoscale은 push based.
- Pull based 작업을 하려면 bespoke component가 필요함
- Benthos는 knative autoscale을 활용한 bespoke component 구현체 중 하나.
세션 끝나고 질문
Q. 왜 knative autoscaling metric으로 RPS 썼는지
- rps는 local에서 시연을 쉽게 하기 위한 목적이었음.
Q. concurrency와 RPS 중 추천하는 방식은?
- case by case