KEDA http-add-on Autoscaling 로직 파악하기
작성시간 기준, KEDA add on의 가장 최신버전인 v0.9.0 기준으로 작성.
Design
외부에서 HTTP 요청이 오면, Ingress가 k8s svc로 트래픽을 전달해준다.
- keda-add-ons-http-interceptor-proxy 라는 k8s svc에서 트래픽을 받아서, interceptor 컴포넌트로 트래픽을 전달한다.
- interceptor는 scale zero인 deployment의 pod가 올라올 때까지 HTTP request를 pending하고, 트래픽을 전달할 수 있는 상태가 되었을 때 routing하는 역할.
- Scaler는 interceptor에게 주기적으로 RPC call해서 pending queue 상태를 확인하며, current traffic 정보와 desired pending request 정보에 따라 pod을 Scale up / down한다.
interceptor
https://github.com/kedacore/http-add-on/blob/v0.9.0/interceptor/main.go
interceptor의 역할은 크게 두 가지.
- request pending 정보 / routing table을 관리한다.
- 요청이 들어오면 pending해둔다.
func main() {
// ...
cfg := ctrl.GetConfigOrDie()
cl, err := kubernetes.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "creating new Kubernetes ClientSet")
os.Exit(1)
}
// k8s 컴포넌트 상태이벤트를 받기 위한 shared informer 생성.
k8sSharedInformerFactory := k8sinformers.NewSharedInformerFactory(cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
// informer로 받을 이벤트는 두 개. k8s service (core.v1.service) / endpoints (core.v1.endpoint)
//// endpoint가 정상적으로 생성되었다면 서비스가 정상인 것으로 간주하고 scale up을 진행함.
//// 따라서, endpoint가 정상이지만 pod는 트래픽을 받을 수 없는 경우 (imagePullError나 runtime exception)... 서비스 실패가 사용자에게 그대로 노출된다.
svcCache := k8s.NewInformerBackedServiceCache(ctrl.Log, cl, k8sSharedInformerFactory)
endpointsCache := k8s.NewInformerBackedEndpointsCache(ctrl.Log, cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
if err != nil {
setupLog.Error(err, "creating new endpoints cache")
os.Exit(1)
}
// endpoint 개수가 1 이상이면 정상처리, 아닐 경우 wait하는 메소드
waitFunc := newWorkloadReplicasForwardWaitFunc(ctrl.Log, endpointsCache)
httpCl, err := clientset.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "creating new HTTP ClientSet")
os.Exit(1)
}
// in-memory queue를 생성한다. traffic counter 역할.
queues := queue.NewMemory()
sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, servingCfg.ConfigMapCacheRsyncPeriod)
// httpScaledObject CR의 이벤트를 informer로 받고, queues에서 traffic counter 정보를 받아서 routing table을 생성하는 역할.
routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues)
if err != nil {
setupLog.Error(err, "fetching routing table")
os.Exit(1)
}
setupLog.Info("Interceptor starting")
// ...
// start a proxy server without TLS.
eg.Go(func() error {
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort)
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
// traffic을 routing할 proxy server를 띄운다.
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
setupLog.Error(err, "proxy server failed")
return err
}
return nil
})
build.PrintComponentInfo(ctrl.Log, "Interceptor")
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
setupLog.Error(err, "fatal error")
os.Exit(1)
}
setupLog.Info("Bye!")
}
Scale up 대기 / traffic forwarding 기능
upstream traffic을 서비스로 전달하는 router 역할 proxy 로직은 아래와 같다.
func (uh *Upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = util.RequestWithLoggerWithName(r, "UpstreamHandler")
ctx := r.Context()
stream := util.StreamFromContext(ctx)
if stream == nil {
sh := NewStatic(http.StatusInternalServerError, errNilStream)
sh.ServeHTTP(w, r)
return
}
// go httputil에서 제공하는 reverse proxy 서버를 띄우고, request를 전달한다.
proxy := httputil.NewSingleHostReverseProxy(stream)
superDirector := proxy.Director
proxy.Transport = uh.roundTripper
proxy.Director = func(req *http.Request) {
superDirector(req)
req.URL = stream
req.URL.Path = r.URL.Path
req.URL.RawPath = r.URL.RawPath
req.URL.RawQuery = r.URL.RawQuery
// delete the incoming X-Forwarded-For header so the proxy
// puts its own in. This is also important to prevent IP spoofing
req.Header.Del("X-Forwarded-For ")
}
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
sh := NewStatic(http.StatusBadGateway, err)
sh.ServeHTTP(w, r)
}
proxy.ServeHTTP(w, r)
}
scale up 가능한지 확인하는 로직은 아래와 같다. 아래의 newWorkloadReplicasForwardWaitFunc() 메소드가 'WaitFunc' 라는 이름으로 사용될 예정
func workloadActiveEndpoints(endpoints v1.Endpoints) int {
total := 0
for _, subset := range endpoints.Subsets {
total += len(subset.Addresses)
}
return total
}
func newWorkloadReplicasForwardWaitFunc(
lggr logr.Logger,
endpointCache k8s.EndpointsCache,
) forwardWaitFunc {
return func(ctx context.Context, endpointNS, endpointName string) (bool, error) {
// get a watcher & its result channel before querying the
// endpoints cache, to ensure we don't miss events
watcher, err := endpointCache.Watch(endpointNS, endpointName)
if err != nil {
return false, err
}
eventCh := watcher.ResultChan()
defer watcher.Stop()
endpoints, err := endpointCache.Get(endpointNS, endpointName)
if err != nil {
// if we didn't get the initial endpoints state, bail out
return false, fmt.Errorf(
"error getting state for endpoints %s/%s: %w",
endpointNS,
endpointName,
err,
)
}
// if there is 1 or more active endpoints, we're done waiting
activeEndpoints := workloadActiveEndpoints(endpoints)
if activeEndpoints > 0 {
return false, nil
}
for {
select {
case event := <-eventCh:
endpoints, ok := event.Object.(*v1.Endpoints)
if !ok {
lggr.Info(
"Didn't get a endpoints back in event",
)
} else if activeEndpoints := workloadActiveEndpoints(*endpoints); activeEndpoints > 0 {
return true, nil
}
case <-ctx.Done():
// otherwise, if the context is marked done before
// we're done waiting, fail.
return false, fmt.Errorf(
"context marked done while waiting for workload reach > 0 replicas: %w",
ctx.Err(),
)
}
}
}
}
따라서, 두 가지 컴포넌트를 합치면 아래의 기능을 수행할 수 있다.
- 요청이 들어왔을 때
- 트래픽 받을 수 있는 상태인지 체크
- 트래픽을 적절한 애플리케이션으로 Routing
- https://github.com/kedacore/http-add-on/blob/v0.9.0/interceptor/proxy_handlers.go
// newForwardingHandler takes in the service URL for the app backend
// and forwards incoming requests to it. Note that it isn't multitenant.
// It's intended to be deployed and scaled alongside the application itself.
//
// fwdSvcURL must have a valid scheme in it. The best way to do this is
// creating a URL with url.Parse("https://...")
func newForwardingHandler(
lggr logr.Logger,
dialCtxFunc kedanet.DialContextFunc,
waitFunc forwardWaitFunc,
fwdCfg forwardingConfig,
tlsCfg *tls.Config,
) http.Handler {
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialCtxFunc,
ForceAttemptHTTP2: fwdCfg.forceAttemptHTTP2,
MaxIdleConns: fwdCfg.maxIdleConns,
IdleConnTimeout: fwdCfg.idleConnTimeout,
TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout,
ExpectContinueTimeout: fwdCfg.expectContinueTimeout,
ResponseHeaderTimeout: fwdCfg.respHeaderTimeout,
TLSClientConfig: tlsCfg,
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// context에서 httpScaledObject 리소스 정보를 가져온다.
// request를 받았을 때, context에 httpScaledObject 채워넣는 부분은 middleware에서 수행한다.
httpso := util.HTTPSOFromContext(ctx)
waitFuncCtx, done := context.WithTimeout(r.Context(), fwdCfg.waitTimeout)
defer done()
isColdStart, err := waitFunc(
waitFuncCtx,
httpso.GetNamespace(),
httpso.Spec.ScaleTargetRef.Service,
)
if err != nil {
lggr.Error(err, "wait function failed, not forwarding request")
w.WriteHeader(http.StatusBadGateway)
if _, err := w.Write([]byte(fmt.Sprintf("error on backend (%s)", err))); err != nil {
lggr.Error(err, "could not write error response to client")
}
return
}
w.Header().Add("X-KEDA-HTTP-Cold-Start", strconv.FormatBool(isColdStart))
uh := handler.NewUpstream(roundTripper)
uh.ServeHTTP(w, r)
})
}
httpScaledObject 정보를 context에 채워넣는 middleware 로직
func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = util.RequestWithLoggerWithName(r, "RoutingMiddleware")
// request의 host, path 값으로 `bytes(//host/path)` 만들어서 key로 사용 -> route table에서 HttpScaledObject 조회한다.
httpso := rm.routingTable.Route(r)
if httpso == nil {
if rm.isProbe(r) {
rm.probeHandler.ServeHTTP(w, r)
return
}
sh := handler.NewStatic(http.StatusNotFound, nil)
sh.ServeHTTP(w, r)
return
}
r = r.WithContext(util.ContextWithHTTPSO(r.Context(), httpso))
// httpScaledObject 필드 사용해서 internal DNS url정보를 리턴한다. (http(s)://svc.namespace:port)
stream, err := rm.streamFromHTTPSO(r.Context(), httpso)
if err != nil {
sh := handler.NewStatic(http.StatusInternalServerError, err)
sh.ServeHTTP(w, r)
return
}
r = r.WithContext(util.ContextWithStream(r.Context(), stream))
rm.upstreamHandler.ServeHTTP(w, r)
}
func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
port, err := rm.getPort(ctx, httpso)
if err != nil {
return nil, fmt.Errorf("failed to get port: %w", err)
}
if rm.tlsEnabled {
return url.Parse(fmt.Sprintf(
"https://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
port,
))
}
//goland:noinspection HttpUrlsUsage
return url.Parse(fmt.Sprintf(
"http://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
port,
))
}
Traffic Pending Queue / Routing 기능
in-memory queue의 경우 아래와 같이 생겼다.
// NewMemoryQueue creates a new empty in-memory queue
// RWMutex 적용, traffic counter 역할을 한다.
func NewMemory() *Memory {
lock := new(sync.RWMutex)
return &Memory{
concurrentMap: make(map[string]int),
rpsMap: make(map[string]*RequestsBuckets),
mut: lock,
}
}
queue에 traffic이 들어오고 나갈 때마다 count를 바꿔주는 로직
- https://github.com/kedacore/http-add-on/blob/v0.9.0/interceptor/middleware/counting.go
- http 요청을 받는 server (serverHttp)
- count 바꿔주는 로직
func (cm *Counting) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = util.RequestWithLoggerWithName(r, "CountingMiddleware")
ctx := r.Context()
defer cm.countAsync(ctx)()
// defer로 count를 실행할 경우, countAsync 메소드 내부의 실행코드 순서가 변경됨.
// 1. countAsync메소드 자체는 바로 실행된다. 따라서 go cm.count()는 바로 실행된다.
// 2. countAsync 메소드에서 리턴하는 익명함수 func() { go signaler.Signal() } 은 defer 처리된다. 따라서 이 익명함수는 ServeHTTP 메소드가 끝난 뒤 실행된다.
// 3. cm.upstreamHandler.ServeHTTP() 가 실행된다.
cm.upstreamHandler.ServeHTTP(w, r)
}
func (cm *Counting) countAsync(ctx context.Context) func() {
signaler := util.NewSignaler()
go cm.count(ctx, signaler) // 여기는 cm.upstreamHandler.ServeHTTP() 이전에 실행된다.
return func() {
go signaler.Signal() // cm.upstreamHandler.ServeHTTP() 이후에 실행된다.
}
}
func (cm *Counting) count(ctx context.Context, signaler util.Signaler) {
logger := util.LoggerFromContext(ctx)
httpso := util.HTTPSOFromContext(ctx)
key := k8s.NamespacedNameFromObject(httpso).String()
if !cm.inc(logger, key) {
return
}
if err := signaler.Wait(ctx); err != nil && err != context.Canceled {
logger.Error(err, "failed to wait signal")
}
cm.dec(logger, key)
}
func (cm *Counting) inc(logger logr.Logger, key string) bool {
if err := cm.queueCounter.Increase(key, 1); err != nil {
logger.Error(err, "error incrementing queue counter", "key", key)
return false
}
metrics.RecordPendingRequestCount(key, int64(1))
return true
}
queue의 값을 확인해서, traffic 정보를 관리할 table 정보는 아래와 같다.
type table struct {
httpScaledObjectInformer sharedIndexInformer
httpScaledObjectEventHandlerRegistration cache.ResourceEventHandlerRegistration
httpScaledObjects map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject
httpScaledObjectsMutex sync.RWMutex
memoryHolder util.AtomicValue[TableMemory]
memorySignaler util.Signaler
queueCounter queue.Counter
}
func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter) (Table, error) {
httpScaledObjects := informershttpv1alpha1.New(sharedInformerFactory, namespace, nil).HTTPScaledObjects()
t := table{
httpScaledObjects: make(map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject),
memorySignaler: util.NewSignaler(),
}
informer, ok := httpScaledObjects.Informer().(sharedIndexInformer)
if !ok {
return nil, errUnknownSharedIndexInformer
}
t.httpScaledObjectInformer = informer
registration, err := informer.AddEventHandler(&t) // table struct에 onAdd, onUpdate, onDelete 메소드가 구현되어 있다.
if err != nil {
return nil, err
}
t.httpScaledObjectEventHandlerRegistration = registration
t.queueCounter = counter
return &t, nil
}
Scaler
scale metric 수집 / 연산기능
scaler의 handler를 보면, request queue size 정보를 받아서 user app을 scale하기 위한 인터페이스 구현체라는 설명이 쓰여 있다.
- https://github.com/kedacore/http-add-on/blob/v0.9.0/scaler/handlers.go
- 정확히는 keda external_scaler의 interface 구현체.
func (e *impl) GetMetrics(
_ context.Context,
metricRequest *externalscaler.GetMetricsRequest,
) (*externalscaler.GetMetricsResponse, error) {
lggr := e.lggr.WithName("GetMetrics")
sor := metricRequest.ScaledObjectRef
namespacedName := k8s.NamespacedNameFromScaledObjectRef(sor)
metricName := MetricName(namespacedName)
scalerMetadata := sor.GetScalerMetadata()
httpScaledObjectName, ok := scalerMetadata[k8s.HTTPScaledObjectKey]
if !ok {
if scalerMetadata := sor.GetScalerMetadata(); scalerMetadata != nil {
if _, ok := scalerMetadata[keyInterceptorTargetPendingRequests]; ok {
return e.interceptorMetrics(metricName)
}
}
err := fmt.Errorf("unable to get HTTPScaledObject reference")
lggr.Error(err, "unable to get the linked HTTPScaledObject for ScaledObject", "name", sor.Name, "namespace", sor.Namespace)
return nil, err
}
httpso, err := e.httpsoInformer.Lister().HTTPScaledObjects(sor.Namespace).Get(httpScaledObjectName)
if err != nil {
lggr.Error(err, "unable to get HTTPScaledObject", "name", httpScaledObjectName, "namespace", sor.Namespace)
return nil, err
}
key := namespacedName.String()
count := e.pinger.counts()[key] // 메모리의 routing table에서 특정 svc의 count 정보를 가져온다.
// 적절한 연산을 수행해서 concurrency / RPS 계산한다
var metricValue int
if httpso.Spec.ScalingMetric != nil &&
httpso.Spec.ScalingMetric.Rate != nil {
metricValue = int(math.Ceil(count.RPS))
lggr.V(1).Info(fmt.Sprintf("%d rps for %s", metricValue, httpso.GetName()))
} else {
metricValue = count.Concurrency
lggr.V(1).Info(fmt.Sprintf("%d concurrent requests for %s", metricValue, httpso.GetName()))
}
res := &externalscaler.GetMetricsResponse{
MetricValues: []*externalscaler.MetricValue{
{
MetricName: metricName,
MetricValue: int64(metricValue),
},
},
}
return res, nil
}
여기서 Interceptor가 수집하는 'TargetPendingRequests' 정보를 수집한다.
func (e *impl) interceptorMetricSpec(metricName string, interceptorTargetPendingRequests string) (*externalscaler.GetMetricSpecResponse, error) {
lggr := e.lggr.WithName("interceptorMetricSpec")
targetPendingRequests, err := strconv.ParseInt(interceptorTargetPendingRequests, 10, 64)
if err != nil {
lggr.Error(err, "unable to parse interceptorTargetPendingRequests", "value", interceptorTargetPendingRequests)
return nil, err
}
res := &externalscaler.GetMetricSpecResponse{
MetricSpecs: []*externalscaler.MetricSpec{
{
MetricName: metricName,
TargetSize: targetPendingRequests,
},
},
}
return res, nil
}
이렇게 수집한 metric을 Prometheus / OTEL로 export하는 로직도 있는 거 같은데, 이건 autoscale과는 직접적인 관련이 없으므로 당장은 분석하지 않는다.
interceptor로부터 Queue count정보 받아오는 기능
interceptor는 adminServer에 queue 정보를 알려준다.
func runAdminServer(
ctx context.Context,
lggr logr.Logger,
q queue.Counter,
port int,
) error {
lggr = lggr.WithName("runAdminServer")
adminServer := http.NewServeMux()
queue.AddCountsRoute( // 여기
lggr,
adminServer,
q,
)
addr := fmt.Sprintf("0.0.0.0:%d", port)
lggr.Info("admin server starting", "address", addr)
return kedahttp.ServeContext(ctx, addr, adminServer, nil)
}
AddCountsRoute 메소드는 /queue
라는 endpoint로 호출하면, 지금 Message queue에 데이터가 얼마나 쌓였는지를 응답해준다.
const countsPath = "/queue"
func AddCountsRoute(lggr logr.Logger, mux *http.ServeMux, q CountReader) {
lggr = lggr.WithName("pkg.queue.AddCountsRoute")
lggr.Info("adding queue counts route", "path", countsPath)
mux.Handle(countsPath, newSizeHandler(lggr, q))
}
// q.Current() 로 값을 받아와서, Http response를 응답하는 handler.
func newSizeHandler(
lggr logr.Logger,
q CountReader,
) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
cur, err := q.Current()
if err != nil {
lggr.Error(err, "getting queue size")
w.WriteHeader(500)
if _, err := w.Write([]byte(
"error getting queue size",
)); err != nil {
lggr.Error(
err,
"could not send error message to client",
)
}
return
}
if err := json.NewEncoder(w).Encode(cur); err != nil {
lggr.Error(err, "encoding QueueCounts")
w.WriteHeader(500)
if _, err := w.Write([]byte(
"error encoding queue counts",
)); err != nil {
lggr.Error(
err,
"could not send error message to client",
)
}
return
}
})
}
그러면 scaler가 /queue 라는 Path로 요청을 보낸다는 뜻. scaler는 queuePinger라는 컴포넌트가 주기적으로 요청을 보내서 값을 확인한다.
// fetchAndSaveCounts calls fetchCounts, and then
// saves them to internal state in q
func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error {
q.pingMut.Lock() // concurrency safe 위해 lock 걸어두고
defer q.pingMut.Unlock()
// 대상 pod (= endpointsFn), interceptor 컴포넌트 정보, adminPort 정보를 토대로 queue count를 받아온다
counts, err := fetchCounts(
ctx,
q.lggr,
q.getEndpointsFn,
q.interceptorNS,
q.interceptorSvcName,
q.adminPort,
)
if err != nil {
q.lggr.Error(err, "getting request counts")
q.status = PingerERROR
return err
}
// 정보를 받아오면, status / allCounts / lastPingTime을 업데이트한다.
q.status = PingerACTIVE
q.allCounts = counts
q.lastPingTime = time.Now()
return nil
}
// fetchCounts fetches all counts from every endpoint returned
// by endpointsFn for the given service named svcName on the
// port adminPort, in namespace ns.
//
// Requests to fetch endpoints are made concurrently and
// aggregated when all requests return successfully.
//
// Upon any failure, a non-nil error is returned and the
// other two return values are nil and 0, respectively.
func fetchCounts(
ctx context.Context,
lggr logr.Logger,
endpointsFn k8s.GetEndpointsFunc,
ns,
svcName,
adminPort string,
) (map[string]queue.Count, error) {
lggr = lggr.WithName("queuePinger.requestCounts")
endpointURLs, err := k8s.EndpointsForService(
ctx,
ns,
svcName,
adminPort,
endpointsFn,
)
if err != nil {
return nil, err
}
if len(endpointURLs) == 0 {
return nil, fmt.Errorf("there isn't any valid interceptor endpoint")
}
countsCh := make(chan *queue.Counts)
var wg sync.WaitGroup
fetchGrp, _ := errgroup.WithContext(ctx)
for _, endpoint := range endpointURLs {
// capture the endpoint in a loop-local
// variable so that the goroutine can
// use it
u := endpoint
// have the errgroup goroutine send to
// a "private" goroutine, which we'll
// then forward on to countsCh
ch := make(chan *queue.Counts)
wg.Add(1)
fetchGrp.Go(func() error {
counts, err := queue.GetCounts(
http.DefaultClient,
*u,
)
if err != nil {
lggr.Error(
err,
"getting queue counts from interceptor",
"interceptorAddress",
u.String(),
)
return err
}
ch <- counts
return nil
})
// forward the "private" goroutine
// on to countsCh separately
go func() {
defer wg.Done()
res := <-ch
countsCh <- res
}()
}
// close countsCh after all goroutines are done sending
// to their "private" channels, so that we can range
// over countsCh normally below
go func() {
wg.Wait()
close(countsCh)
}()
if err := fetchGrp.Wait(); err != nil {
lggr.Error(err, "fetching all counts failed")
return nil, err
}
totalCounts := make(map[string]queue.Count)
// range through the result of each endpoint
for count := range countsCh {
// each endpoint returns a map of counts, one count
// per host. add up the counts for each host
for host, val := range count.Counts {
var responseCount queue.Count
var ok bool
if responseCount, ok = totalCounts[host]; !ok {
responseCount = queue.Count{}
}
responseCount.Concurrency += val.Concurrency
responseCount.RPS += val.RPS
totalCounts[host] = responseCount
}
}
return totalCounts, nil
}
위 scaler가 호출하는 Counts 메소드는 interceptor에 정의되어 있다.
// GetQueueCounts issues an RPC call to get the queue counts
// from the given hostAndPort. Note that the hostAndPort should
// not end with a "/" and shouldn't include a path.
func GetCounts(
httpCl *http.Client,
interceptorURL url.URL,
) (*Counts, error) {
interceptorURL.Path = countsPath // `/queue`
resp, err := httpCl.Get(interceptorURL.String()) // newSizeHandler 호출. -> 메모리에 저장된 모든 routing table 정보를 응답한다.
if err != nil {
return nil, fmt.Errorf("requesting the queue counts from %s: %w", interceptorURL.String(), err)
}
defer resp.Body.Close()
counts := NewCounts()
if err := json.NewDecoder(resp.Body).Decode(counts); err != nil {
return nil, fmt.Errorf("decoding response from the interceptor at %s: %w", interceptorURL.String(), err)
}
return counts, nil
}
현 구조의 한계점
routing table이 전부 인스턴스의 in-memory에서만 관리되는 구조 = Multi-Tenancy가 고려되지 않은 아키텍처.
- request handler와 queue counter 양쪽에서 공통적으로 주석으로 지적된 부분
- 즉 k8s에서 Pod를 여러 개 만드는 식으로 scale out을 시도해도, 개별 pod의 메모리에 routing table이 복사되어야 함. 즉, scale out이 되는 구조가 아니라, memory scale up만이 해결책이다.
- cf. 사실 이건 istio의 sidecar에도 똑같은 문제가 있다. envoy config을 동적으로 모든 sidecar에 전파하기 때문
개선법이 있을까?
Redis와 같은 외부 시스템으로 Queue 또는 대기열 시스템을 적용하면 될 거 같은데... 가능한지 구현해볼 예정
기록용 레퍼런스