공부하고 기록하는, 경제학과 출신 개발자의 노트

학습일지/Autoscale

KEDA HTTP Add on 코드분석

inspirit941 2024. 12. 30. 16:39
반응형

KEDA http-add-on Autoscaling 로직 파악하기

작성시간 기준, KEDA add on의 가장 최신버전인 v0.9.0 기준으로 작성.

Design

arch



외부에서 HTTP 요청이 오면, Ingress가 k8s svc로 트래픽을 전달해준다.

  • keda-add-ons-http-interceptor-proxy 라는 k8s svc에서 트래픽을 받아서, interceptor 컴포넌트로 트래픽을 전달한다.
  • interceptor는 scale zero인 deployment의 pod가 올라올 때까지 HTTP request를 pending하고, 트래픽을 전달할 수 있는 상태가 되었을 때 routing하는 역할.
  • Scaler는 interceptor에게 주기적으로 RPC call해서 pending queue 상태를 확인하며, current traffic 정보와 desired pending request 정보에 따라 pod을 Scale up / down한다.

 

interceptor

https://github.com/kedacore/http-add-on/blob/v0.9.0/interceptor/main.go

interceptor의 역할은 크게 두 가지.

  • request pending 정보 / routing table을 관리한다.
  • 요청이 들어오면 pending해둔다.
func main() {
  // ...
  cfg := ctrl.GetConfigOrDie()

    cl, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        setupLog.Error(err, "creating new Kubernetes ClientSet")
        os.Exit(1)
    }
  // k8s 컴포넌트 상태이벤트를 받기 위한 shared informer 생성.
    k8sSharedInformerFactory := k8sinformers.NewSharedInformerFactory(cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))

  // informer로 받을 이벤트는 두 개. k8s service (core.v1.service) / endpoints (core.v1.endpoint)
  //// endpoint가 정상적으로 생성되었다면 서비스가 정상인 것으로 간주하고 scale up을 진행함.
  //// 따라서, endpoint가 정상이지만 pod는 트래픽을 받을 수 없는 경우 (imagePullError나 runtime exception)... 서비스 실패가 사용자에게 그대로 노출된다.
    svcCache := k8s.NewInformerBackedServiceCache(ctrl.Log, cl, k8sSharedInformerFactory)
    endpointsCache := k8s.NewInformerBackedEndpointsCache(ctrl.Log, cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
    if err != nil {
        setupLog.Error(err, "creating new endpoints cache")
        os.Exit(1)
    }

  // endpoint 개수가 1 이상이면 정상처리, 아닐 경우 wait하는 메소드
    waitFunc := newWorkloadReplicasForwardWaitFunc(ctrl.Log, endpointsCache)

  httpCl, err := clientset.NewForConfig(cfg)
    if err != nil {
        setupLog.Error(err, "creating new HTTP ClientSet")
        os.Exit(1)
    }

  // in-memory queue를 생성한다. traffic counter 역할.
    queues := queue.NewMemory()

    sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, servingCfg.ConfigMapCacheRsyncPeriod)

  // httpScaledObject CR의 이벤트를 informer로 받고, queues에서 traffic counter 정보를 받아서 routing table을 생성하는 역할.
    routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues)
    if err != nil {
        setupLog.Error(err, "fetching routing table")
        os.Exit(1)
    }

    setupLog.Info("Interceptor starting")

  // ...

  // start a proxy server without TLS.
    eg.Go(func() error {
        k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
        setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort)

        k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())

    // traffic을 routing할 proxy server를 띄운다.
        if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
            setupLog.Error(err, "proxy server failed")
            return err
        }

        return nil
    })

    build.PrintComponentInfo(ctrl.Log, "Interceptor")

    if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
        setupLog.Error(err, "fatal error")
        os.Exit(1)
    }

    setupLog.Info("Bye!")
}

Scale up 대기 / traffic forwarding 기능

upstream traffic을 서비스로 전달하는 router 역할 proxy 로직은 아래와 같다.

func (uh *Upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    r = util.RequestWithLoggerWithName(r, "UpstreamHandler")
    ctx := r.Context()

    stream := util.StreamFromContext(ctx)
    if stream == nil {
        sh := NewStatic(http.StatusInternalServerError, errNilStream)
        sh.ServeHTTP(w, r)

        return
    }

  // go httputil에서 제공하는 reverse proxy 서버를 띄우고, request를 전달한다.
    proxy := httputil.NewSingleHostReverseProxy(stream)
    superDirector := proxy.Director
    proxy.Transport = uh.roundTripper
    proxy.Director = func(req *http.Request) {
        superDirector(req)
        req.URL = stream
        req.URL.Path = r.URL.Path
        req.URL.RawPath = r.URL.RawPath
        req.URL.RawQuery = r.URL.RawQuery
        // delete the incoming X-Forwarded-For header so the proxy
        // puts its own in. This is also important to prevent IP spoofing
        req.Header.Del("X-Forwarded-For ")
    }
    proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
        sh := NewStatic(http.StatusBadGateway, err)
        sh.ServeHTTP(w, r)
    }

    proxy.ServeHTTP(w, r)
}

 

scale up 가능한지 확인하는 로직은 아래와 같다. 아래의 newWorkloadReplicasForwardWaitFunc() 메소드가 'WaitFunc' 라는 이름으로 사용될 예정

func workloadActiveEndpoints(endpoints v1.Endpoints) int {
    total := 0
    for _, subset := range endpoints.Subsets {
        total += len(subset.Addresses)
    }
    return total
}

func newWorkloadReplicasForwardWaitFunc(
    lggr logr.Logger,
    endpointCache k8s.EndpointsCache,
) forwardWaitFunc {
    return func(ctx context.Context, endpointNS, endpointName string) (bool, error) {
        // get a watcher & its result channel before querying the
        // endpoints cache, to ensure we don't miss events
        watcher, err := endpointCache.Watch(endpointNS, endpointName)
        if err != nil {
            return false, err
        }
        eventCh := watcher.ResultChan()
        defer watcher.Stop()

        endpoints, err := endpointCache.Get(endpointNS, endpointName)
        if err != nil {
            // if we didn't get the initial endpoints state, bail out
            return false, fmt.Errorf(
                "error getting state for endpoints %s/%s: %w",
                endpointNS,
                endpointName,
                err,
            )
        }
        // if there is 1 or more active endpoints, we're done waiting
        activeEndpoints := workloadActiveEndpoints(endpoints)
        if activeEndpoints > 0 {
            return false, nil
        }

        for {
            select {
            case event := <-eventCh:
                endpoints, ok := event.Object.(*v1.Endpoints)
                if !ok {
                    lggr.Info(
                        "Didn't get a endpoints back in event",
                    )
                } else if activeEndpoints := workloadActiveEndpoints(*endpoints); activeEndpoints > 0 {
                    return true, nil
                }
            case <-ctx.Done():
                // otherwise, if the context is marked done before
                // we're done waiting, fail.
                return false, fmt.Errorf(
                    "context marked done while waiting for workload reach > 0 replicas: %w",
                    ctx.Err(),
                )
            }
        }
    }
}

따라서, 두 가지 컴포넌트를 합치면 아래의 기능을 수행할 수 있다.

// newForwardingHandler takes in the service URL for the app backend
// and forwards incoming requests to it. Note that it isn't multitenant.
// It's intended to be deployed and scaled alongside the application itself.
//
// fwdSvcURL must have a valid scheme in it. The best way to do this is
// creating a URL with url.Parse("https://...")
func newForwardingHandler(
    lggr logr.Logger,
    dialCtxFunc kedanet.DialContextFunc,
    waitFunc forwardWaitFunc,
    fwdCfg forwardingConfig,
    tlsCfg *tls.Config,
) http.Handler {
    roundTripper := &http.Transport{
        Proxy:                 http.ProxyFromEnvironment,
        DialContext:           dialCtxFunc,
        ForceAttemptHTTP2:     fwdCfg.forceAttemptHTTP2,
        MaxIdleConns:          fwdCfg.maxIdleConns,
        IdleConnTimeout:       fwdCfg.idleConnTimeout,
        TLSHandshakeTimeout:   fwdCfg.tlsHandshakeTimeout,
        ExpectContinueTimeout: fwdCfg.expectContinueTimeout,
        ResponseHeaderTimeout: fwdCfg.respHeaderTimeout,
        TLSClientConfig:       tlsCfg,
    }
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()

    // context에서 httpScaledObject 리소스 정보를 가져온다. 
    // request를 받았을 때, context에 httpScaledObject 채워넣는 부분은 middleware에서 수행한다.
        httpso := util.HTTPSOFromContext(ctx)

        waitFuncCtx, done := context.WithTimeout(r.Context(), fwdCfg.waitTimeout)
        defer done()
        isColdStart, err := waitFunc(
            waitFuncCtx,
            httpso.GetNamespace(),
            httpso.Spec.ScaleTargetRef.Service,
        )
        if err != nil {
            lggr.Error(err, "wait function failed, not forwarding request")
            w.WriteHeader(http.StatusBadGateway)
            if _, err := w.Write([]byte(fmt.Sprintf("error on backend (%s)", err))); err != nil {
                lggr.Error(err, "could not write error response to client")
            }
            return
        }
        w.Header().Add("X-KEDA-HTTP-Cold-Start", strconv.FormatBool(isColdStart))

        uh := handler.NewUpstream(roundTripper)
        uh.ServeHTTP(w, r)
    })
}

httpScaledObject 정보를 context에 채워넣는 middleware 로직

func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    r = util.RequestWithLoggerWithName(r, "RoutingMiddleware")

  // request의 host, path 값으로 `bytes(//host/path)` 만들어서 key로 사용 -> route table에서 HttpScaledObject 조회한다.
    httpso := rm.routingTable.Route(r)
    if httpso == nil {
        if rm.isProbe(r) {
            rm.probeHandler.ServeHTTP(w, r)
            return
        }

        sh := handler.NewStatic(http.StatusNotFound, nil)
        sh.ServeHTTP(w, r)

        return
    }
    r = r.WithContext(util.ContextWithHTTPSO(r.Context(), httpso))

  // httpScaledObject 필드 사용해서 internal DNS url정보를 리턴한다. (http(s)://svc.namespace:port)
    stream, err := rm.streamFromHTTPSO(r.Context(), httpso)
    if err != nil {
        sh := handler.NewStatic(http.StatusInternalServerError, err)
        sh.ServeHTTP(w, r)

        return
    }
    r = r.WithContext(util.ContextWithStream(r.Context(), stream))

    rm.upstreamHandler.ServeHTTP(w, r)
}

func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
    port, err := rm.getPort(ctx, httpso)
    if err != nil {
        return nil, fmt.Errorf("failed to get port: %w", err)
    }
    if rm.tlsEnabled {
        return url.Parse(fmt.Sprintf(
            "https://%s.%s:%d",
            httpso.Spec.ScaleTargetRef.Service,
            httpso.GetNamespace(),
            port,
        ))
    }
    //goland:noinspection HttpUrlsUsage
    return url.Parse(fmt.Sprintf(
        "http://%s.%s:%d",
        httpso.Spec.ScaleTargetRef.Service,
        httpso.GetNamespace(),
        port,
    ))
}

Traffic Pending Queue / Routing 기능

in-memory queue의 경우 아래와 같이 생겼다.

// NewMemoryQueue creates a new empty in-memory queue
// RWMutex 적용, traffic counter 역할을 한다.
func NewMemory() *Memory {
    lock := new(sync.RWMutex)
    return &Memory{
        concurrentMap: make(map[string]int),
        rpsMap:        make(map[string]*RequestsBuckets),
        mut:           lock,
    }
}

queue에 traffic이 들어오고 나갈 때마다 count를 바꿔주는 로직

func (cm *Counting) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    r = util.RequestWithLoggerWithName(r, "CountingMiddleware")
    ctx := r.Context()

    defer cm.countAsync(ctx)()
  // defer로 count를 실행할 경우, countAsync 메소드 내부의 실행코드 순서가 변경됨.
  // 1. countAsync메소드 자체는 바로 실행된다. 따라서 go cm.count()는 바로 실행된다.
  // 2. countAsync 메소드에서 리턴하는 익명함수 func() { go signaler.Signal() } 은 defer 처리된다. 따라서 이 익명함수는 ServeHTTP 메소드가 끝난 뒤 실행된다.
  // 3. cm.upstreamHandler.ServeHTTP() 가 실행된다.

    cm.upstreamHandler.ServeHTTP(w, r)
}

func (cm *Counting) countAsync(ctx context.Context) func() {
    signaler := util.NewSignaler()

    go cm.count(ctx, signaler) // 여기는 cm.upstreamHandler.ServeHTTP() 이전에 실행된다.

    return func() {
        go signaler.Signal() // cm.upstreamHandler.ServeHTTP() 이후에 실행된다.
    }
}

func (cm *Counting) count(ctx context.Context, signaler util.Signaler) {
    logger := util.LoggerFromContext(ctx)
    httpso := util.HTTPSOFromContext(ctx)

    key := k8s.NamespacedNameFromObject(httpso).String()

    if !cm.inc(logger, key) {
        return
    }

    if err := signaler.Wait(ctx); err != nil && err != context.Canceled {
        logger.Error(err, "failed to wait signal")
    }

    cm.dec(logger, key)
}

func (cm *Counting) inc(logger logr.Logger, key string) bool {
    if err := cm.queueCounter.Increase(key, 1); err != nil {
        logger.Error(err, "error incrementing queue counter", "key", key)

        return false
    }

    metrics.RecordPendingRequestCount(key, int64(1))

    return true
}

queue의 값을 확인해서, traffic 정보를 관리할 table 정보는 아래와 같다.

type table struct {
    httpScaledObjectInformer                 sharedIndexInformer
    httpScaledObjectEventHandlerRegistration cache.ResourceEventHandlerRegistration
    httpScaledObjects                        map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject
    httpScaledObjectsMutex                   sync.RWMutex
    memoryHolder                             util.AtomicValue[TableMemory]
    memorySignaler                           util.Signaler
    queueCounter                             queue.Counter
}

func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter) (Table, error) {
    httpScaledObjects := informershttpv1alpha1.New(sharedInformerFactory, namespace, nil).HTTPScaledObjects()

    t := table{
        httpScaledObjects: make(map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject),
        memorySignaler:    util.NewSignaler(),
    }

    informer, ok := httpScaledObjects.Informer().(sharedIndexInformer)
    if !ok {
        return nil, errUnknownSharedIndexInformer
    }
    t.httpScaledObjectInformer = informer

    registration, err := informer.AddEventHandler(&t) // table struct에 onAdd, onUpdate, onDelete 메소드가 구현되어 있다.
    if err != nil {
        return nil, err
    }
    t.httpScaledObjectEventHandlerRegistration = registration
    t.queueCounter = counter
    return &t, nil
}

Scaler

scale metric 수집 / 연산기능

scaler의 handler를 보면, request queue size 정보를 받아서 user app을 scale하기 위한 인터페이스 구현체라는 설명이 쓰여 있다.

func (e *impl) GetMetrics(
    _ context.Context,
    metricRequest *externalscaler.GetMetricsRequest,
) (*externalscaler.GetMetricsResponse, error) {
    lggr := e.lggr.WithName("GetMetrics")
    sor := metricRequest.ScaledObjectRef

    namespacedName := k8s.NamespacedNameFromScaledObjectRef(sor)
    metricName := MetricName(namespacedName)

    scalerMetadata := sor.GetScalerMetadata()
    httpScaledObjectName, ok := scalerMetadata[k8s.HTTPScaledObjectKey]
    if !ok {
        if scalerMetadata := sor.GetScalerMetadata(); scalerMetadata != nil {
            if _, ok := scalerMetadata[keyInterceptorTargetPendingRequests]; ok {
                return e.interceptorMetrics(metricName)
            }
        }
        err := fmt.Errorf("unable to get HTTPScaledObject reference")
        lggr.Error(err, "unable to get the linked HTTPScaledObject for ScaledObject", "name", sor.Name, "namespace", sor.Namespace)
        return nil, err
    }

    httpso, err := e.httpsoInformer.Lister().HTTPScaledObjects(sor.Namespace).Get(httpScaledObjectName)
    if err != nil {
        lggr.Error(err, "unable to get HTTPScaledObject", "name", httpScaledObjectName, "namespace", sor.Namespace)
        return nil, err
    }

    key := namespacedName.String()
    count := e.pinger.counts()[key] // 메모리의 routing table에서 특정 svc의 count 정보를 가져온다.

  // 적절한 연산을 수행해서 concurrency / RPS 계산한다
    var metricValue int
    if httpso.Spec.ScalingMetric != nil &&
        httpso.Spec.ScalingMetric.Rate != nil {
        metricValue = int(math.Ceil(count.RPS))
        lggr.V(1).Info(fmt.Sprintf("%d rps for %s", metricValue, httpso.GetName()))
    } else {
        metricValue = count.Concurrency
        lggr.V(1).Info(fmt.Sprintf("%d concurrent requests for %s", metricValue, httpso.GetName()))
    }

    res := &externalscaler.GetMetricsResponse{
        MetricValues: []*externalscaler.MetricValue{
            {
                MetricName:  metricName,
                MetricValue: int64(metricValue),
            },
        },
    }
    return res, nil
}

여기서 Interceptor가 수집하는 'TargetPendingRequests' 정보를 수집한다.

func (e *impl) interceptorMetricSpec(metricName string, interceptorTargetPendingRequests string) (*externalscaler.GetMetricSpecResponse, error) {
    lggr := e.lggr.WithName("interceptorMetricSpec")

    targetPendingRequests, err := strconv.ParseInt(interceptorTargetPendingRequests, 10, 64)
    if err != nil {
        lggr.Error(err, "unable to parse interceptorTargetPendingRequests", "value", interceptorTargetPendingRequests)
        return nil, err
    }

    res := &externalscaler.GetMetricSpecResponse{
        MetricSpecs: []*externalscaler.MetricSpec{
            {
                MetricName: metricName,
                TargetSize: targetPendingRequests,
            },
        },
    }
    return res, nil
}

이렇게 수집한 metric을 Prometheus / OTEL로 export하는 로직도 있는 거 같은데, 이건 autoscale과는 직접적인 관련이 없으므로 당장은 분석하지 않는다.

interceptor로부터 Queue count정보 받아오는 기능

interceptor는 adminServer에 queue 정보를 알려준다.

func runAdminServer(
    ctx context.Context,
    lggr logr.Logger,
    q queue.Counter,
    port int,
) error {
    lggr = lggr.WithName("runAdminServer")
    adminServer := http.NewServeMux()
    queue.AddCountsRoute( // 여기
        lggr,
        adminServer,
        q,
    )

    addr := fmt.Sprintf("0.0.0.0:%d", port)
    lggr.Info("admin server starting", "address", addr)
    return kedahttp.ServeContext(ctx, addr, adminServer, nil)
}

AddCountsRoute 메소드는 /queue 라는 endpoint로 호출하면, 지금 Message queue에 데이터가 얼마나 쌓였는지를 응답해준다.

const countsPath = "/queue"

func AddCountsRoute(lggr logr.Logger, mux *http.ServeMux, q CountReader) {
    lggr = lggr.WithName("pkg.queue.AddCountsRoute")
    lggr.Info("adding queue counts route", "path", countsPath)
    mux.Handle(countsPath, newSizeHandler(lggr, q))
}

// q.Current() 로 값을 받아와서, Http response를 응답하는 handler.
func newSizeHandler(
    lggr logr.Logger,
    q CountReader,
) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
        cur, err := q.Current()
        if err != nil {
            lggr.Error(err, "getting queue size")
            w.WriteHeader(500)
            if _, err := w.Write([]byte(
                "error getting queue size",
            )); err != nil {
                lggr.Error(
                    err,
                    "could not send error message to client",
                )
            }
            return
        }
        if err := json.NewEncoder(w).Encode(cur); err != nil {
            lggr.Error(err, "encoding QueueCounts")
            w.WriteHeader(500)
            if _, err := w.Write([]byte(
                "error encoding queue counts",
            )); err != nil {
                lggr.Error(
                    err,
                    "could not send error message to client",
                )
            }
            return
        }
    })
}

그러면 scaler가 /queue 라는 Path로 요청을 보낸다는 뜻. scaler는 queuePinger라는 컴포넌트가 주기적으로 요청을 보내서 값을 확인한다.

// fetchAndSaveCounts calls fetchCounts, and then
// saves them to internal state in q
func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error {
    q.pingMut.Lock() // concurrency safe 위해 lock 걸어두고
    defer q.pingMut.Unlock()

  // 대상 pod (= endpointsFn), interceptor 컴포넌트 정보, adminPort 정보를 토대로 queue count를 받아온다
    counts, err := fetchCounts(
        ctx,
        q.lggr,
        q.getEndpointsFn,
        q.interceptorNS,
        q.interceptorSvcName,
        q.adminPort,
    )
    if err != nil {
        q.lggr.Error(err, "getting request counts")
        q.status = PingerERROR
        return err
    }

  // 정보를 받아오면, status / allCounts / lastPingTime을 업데이트한다.
    q.status = PingerACTIVE
    q.allCounts = counts
    q.lastPingTime = time.Now()

    return nil
}

// fetchCounts fetches all counts from every endpoint returned
// by endpointsFn for the given service named svcName on the
// port adminPort, in namespace ns.
//
// Requests to fetch endpoints are made concurrently and
// aggregated when all requests return successfully.
//
// Upon any failure, a non-nil error is returned and the
// other two return values are nil and 0, respectively.
func fetchCounts(
    ctx context.Context,
    lggr logr.Logger,
    endpointsFn k8s.GetEndpointsFunc,
    ns,
    svcName,
    adminPort string,
) (map[string]queue.Count, error) {
    lggr = lggr.WithName("queuePinger.requestCounts")


    endpointURLs, err := k8s.EndpointsForService(
        ctx,
        ns,
        svcName,
        adminPort,
        endpointsFn,
    )
    if err != nil {
        return nil, err
    }

    if len(endpointURLs) == 0 {
        return nil, fmt.Errorf("there isn't any valid interceptor endpoint")
    }

    countsCh := make(chan *queue.Counts)
    var wg sync.WaitGroup
    fetchGrp, _ := errgroup.WithContext(ctx)
    for _, endpoint := range endpointURLs {
        // capture the endpoint in a loop-local
        // variable so that the goroutine can
        // use it
        u := endpoint
        // have the errgroup goroutine send to
        // a "private" goroutine, which we'll
        // then forward on to countsCh
        ch := make(chan *queue.Counts)
        wg.Add(1)
        fetchGrp.Go(func() error {
            counts, err := queue.GetCounts(
                http.DefaultClient,
                *u,
            )
            if err != nil {
                lggr.Error(
                    err,
                    "getting queue counts from interceptor",
                    "interceptorAddress",
                    u.String(),
                )
                return err
            }
            ch <- counts
            return nil
        })
        // forward the "private" goroutine
        // on to countsCh separately
        go func() {
            defer wg.Done()
            res := <-ch
            countsCh <- res
        }()
    }

    // close countsCh after all goroutines are done sending
    // to their "private" channels, so that we can range
    // over countsCh normally below
    go func() {
        wg.Wait()
        close(countsCh)
    }()

    if err := fetchGrp.Wait(); err != nil {
        lggr.Error(err, "fetching all counts failed")
        return nil, err
    }

    totalCounts := make(map[string]queue.Count)
    // range through the result of each endpoint
    for count := range countsCh {
        // each endpoint returns a map of counts, one count
        // per host. add up the counts for each host
        for host, val := range count.Counts {
            var responseCount queue.Count
            var ok bool
            if responseCount, ok = totalCounts[host]; !ok {
                responseCount = queue.Count{}
            }
            responseCount.Concurrency += val.Concurrency
            responseCount.RPS += val.RPS
            totalCounts[host] = responseCount
        }
    }

    return totalCounts, nil
}

위 scaler가 호출하는 Counts 메소드는 interceptor에 정의되어 있다.

// GetQueueCounts issues an RPC call to get the queue counts
// from the given hostAndPort. Note that the hostAndPort should
// not end with a "/" and shouldn't include a path.
func GetCounts(
    httpCl *http.Client,
    interceptorURL url.URL,
) (*Counts, error) {
    interceptorURL.Path = countsPath // `/queue`
    resp, err := httpCl.Get(interceptorURL.String()) // newSizeHandler 호출. -> 메모리에 저장된 모든 routing table 정보를 응답한다.
    if err != nil {
        return nil, fmt.Errorf("requesting the queue counts from %s: %w", interceptorURL.String(), err)
    }
    defer resp.Body.Close()
    counts := NewCounts()
    if err := json.NewDecoder(resp.Body).Decode(counts); err != nil {
        return nil, fmt.Errorf("decoding response from the interceptor at %s: %w", interceptorURL.String(), err)
    }

    return counts, nil
}

현 구조의 한계점

routing table이 전부 인스턴스의 in-memory에서만 관리되는 구조 = Multi-Tenancy가 고려되지 않은 아키텍처.

개선법이 있을까?

Redis와 같은 외부 시스템으로 Queue 또는 대기열 시스템을 적용하면 될 거 같은데... 가능한지 구현해볼 예정

 

기록용 레퍼런스

반응형