학습일지/Knative

knative eventing github 코드 분석

inspirit941 2023. 4. 21. 22:23
반응형

https://github.com/knative-extensions/eventing-github

 

GitHub - knative-extensions/eventing-github: Github integration with Knative Eventing.

Github integration with Knative Eventing. Contribute to knative-extensions/eventing-github development by creating an account on GitHub.

github.com

 

 

knative eventing 활용해서

  • 다양한 외부 이벤트 (webhook)를
  • knative broker 또는 knative service로 전달하는 작업을 하기 위해
  • knative eventing 진영의 stable 컴포넌트인 eventing-github 로직을 분석한 내용.

여기 코드를 분석하고, knative sample-source 레포를 활용하면 다양한 외부 시스템을 knative eventing과 연동할 수 있다.

https://github.com/knative-extensions/eventing-github/blob/main/samples/githubsource.yaml 예시

apiVersion: sources.knative.dev/v1alpha1
kind: GitHubSource
metadata:
  name: githubsource-sample
spec:
  eventTypes:
  - pull_request
  ownerAndRepository: "<your GitHub org>/<your GitHub repo>"
  accessToken:
    secretKeyRef:
      name: githubsecret
      key: accessToken
  secretToken:
    secretKeyRef:
      name: githubsecret
      key: secretToken
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: destinationServer

---

apiVersion: v1
kind: Secret
metadata:
  name: githubsecret
type: Opaque
stringData:
  accessToken: "<your GitHub access token>"
  secretToken: "<your secret token>"

Custom Resource 기본 구조

다운로드 (29)



knative EventSource는 기본적으로 위와 같은 구성으로 되어 있다.

  • Custom Resource 상태를 보고 로직을 수행하는 Reconciler. eventing-github의 경우 사용자가 원하는 특정 github Repo에 webhook을 등록하는 역할을 한다.
  • 외부 시스템에서 webhook으로 이벤트가 들어오면, cloudevent 형식으로 변환한 뒤 적절한 목적지로 전달해주는 Adapter.
  • Webhook 컴포넌트도 있지만, MutatingWebhook / ValidationWebhook에 특별한 로직은 없었다.

Adapter 로직부터 파악해야 reconcile 쪽 이해가 빨라지기 때문에 adapter부터 설명.

Adapter

  • k8s Deployment 또는 knative service로 구성된 컴포넌트. 외부에서 http 요청을 받을 수 있는 URL이 주어진다.
  • 외부에서 들어온 이벤트를
    • target service (knative의 sink)가 이해할 수 있는 포맷으로 변환.
      • GithubSource: github webhook (json format) -> cloudevent
      • KafkaSource: kafka message -> cloudevent
      • RedisSource: redisStream -> cloudevent
    • 이벤트가 전달되어야 하는 target service로 전송

adapter를 보면, http Server를 실행하는 단순한 코드로 되어 있다.

https://github.com/knative-extensions/eventing-github/blob/f34a508f89e52fe4ebd5eba5f630a0ae07d923cb/pkg/mtadapter/adapter.go#L88-L108

func (a *gitHubAdapter) Start(ctx context.Context) error {
// Start our multi-tenant server receiving GitHub events
server := &http.Server{
ReadTimeout: 10 * time.Second,
ReadHeaderTimeout: 2 * time.Second,
Addr: fmt.Sprintf(":%d", a.port),
Handler: a.router,
}
done := make(chan bool, 1)
go common.GracefulShutdown(server, a.logger, ctx.Done(), done)
a.logger.Infof("Server is ready to handle requests at %s", server.Addr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("could not listen on %s: %v", server.Addr, err)
}
<-done
a.logger.Infof("Server stopped")
return nil
}
view raw adapter.go hosted with ❤ by GitHub

 

외부 이벤트를 받아서, 의도한 곳으로 이벤트를 전달해주는 컴포넌트가 a.router

  • router 정보를 업데이트하는 로직은 RegisterHandlerFor()라는 메소드를 보면 된다.

// RegisterHandlerFor implements MTAdapter.
func (a *gitHubAdapter) RegisterHandlerFor(ctx context.Context, src *v1alpha1.GitHubSource) error {
secretCli := a.secrGetter.Secrets(src.Namespace)
secretToken, err := common.SecretFrom(ctx, secretCli, src.Spec.SecretToken.SecretKeyRef)
if err != nil {
return fmt.Errorf("reading token from Secret: %w", err)
}
logger := logging.FromContext(ctx)
ceSrc := v1alpha1.GitHubEventSource(src.Spec.OwnerAndRepository)
handler := common.NewHandler(a.ceClient, src.Status.SinkURI.String(), ceSrc, secretToken, logger)
path := fmt.Sprintf("/%s/%s", src.Namespace, src.Name)
a.router.Register(src.Name, src.Namespace, path, handler)
return nil
}

 

router를 보니 common.NewHandler() 라는 메소드가 있다.

  • github webhook에서 들어오는 이벤트 (pull_request 같은 것) 를 받아서
  • GithubSource 라는 Custom Resource의 정보를 토대로
  • Knative Sink에 정의된 리소스 (example.yaml의 destinationServer) 로 전달한다.

handler 메소드 내부를 보면

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
payload, err := h.Hook.Parse(r, ValidEvents...)
if err != nil {
if err == gh.ErrEventNotFound {
w.WriteHeader(http.StatusNotFound)
h.Logger.Info("Event not found")
return
}
w.WriteHeader(http.StatusBadRequest)
h.Logger.Errorf("Error processing request: %v", err)
return
}
ctx := context.Background()
if len(h.SinkURI) > 0 {
ctx = cloudevents.ContextWithTarget(ctx, h.SinkURI)
}
err = h.handleEvent(ctx, payload, r.Header)
if err != nil {
h.Logger.Errorf("Event handler error: %v", err)
w.WriteHeader(400)
w.Write([]byte(err.Error()))
return
}
h.Logger.Infof("Event processed")
w.WriteHeader(202)
w.Write([]byte("accepted"))
}
func (h *Handler) handleEvent(ctx context.Context, payload interface{}, hdr http.Header) error {
gitHubEventType := hdr.Get(GHHeaderEvent)
if gitHubEventType == "" {
return fmt.Errorf("%q header is not set", GHHeaderEvent)
}
eventID := hdr.Get(GHHeaderDelivery)
if eventID == "" {
return fmt.Errorf("%q header is not set", GHHeaderDelivery)
}
h.Logger.Infof("Handling %s", gitHubEventType)
cloudEventType := sourcesv1alpha1.GitHubEventType(gitHubEventType)
subject, extensions := SubjectAndExtensionsFromGitHubEvent(gh.Event(gitHubEventType), payload, h.Logger)
event := cloudevents.NewEvent()
event.SetID(eventID)
event.SetType(cloudEventType)
event.SetSource(h.Source)
event.SetSubject(subject)
for k, v := range extensions {
event.SetExtension(k, v)
}
if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
return fmt.Errorf("failed to marshal event data: %w", err)
}
result := h.Client.Send(ctx, event)
if !cloudevents.IsACK(result) {
return result
}
return nil
}
view raw handler.go hosted with ❤ by GitHub

 

adapter url에서 payload로 들어온 이벤트 타입을 파싱한다 (h.Hook.Parse())

  • 잘못된 타입이면 BadRequest
  • context 정보에 이벤트를 전달할 목적지인 h.SinkURL 정보를 포함한다.

h.handleEvent()로 이벤트 핸들링 로직 수행

  • github event payload 파싱하고
  • cloudevent의 SetType / SetSource / SetSubject / SetData 등의 메소드로 cloudevent를 생성한다
  • h.Client.Send() 로 cloudevent를 h.SinkURL에 전달한다.
mt-adapter?

default adapter의 경우 Custom Resource 하나 만들어질 때마다 adapter 역할을 하는 서버도 하나씩 만들어지는 구조.

  • Custom Resource가 많아질수록 리소스 점유가 불필요하게 늘어난다
  • multi-tenant 구조를 사용하면, adapter URL 하나만 관리하고 path 기반 route를 사용해서 리소스 효율적으로 adapter를 관리할 수 있다.

// ServeHTTP implements http.Handler.
func (h *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Path-based dispatch
h.routersMu.RLock()
keyedHandler, ok := h.routers[r.URL.Path]
h.routersMu.RUnlock()
if ok {
// Check if source still exists.
_, err := h.lister.GitHubSources(keyedHandler.namespace).Get(keyedHandler.name)
if err == nil {
keyedHandler.handler.ServeHTTP(w, r)
return
}
h.Unregister(r.URL.Path)
}
http.NotFoundHandler().ServeHTTP(w, r)
}
// Register adds a new GitHub event handler for the given GitHubSource.
func (h *Router) Register(name, namespace, path string, handler http.Handler) {
h.routersMu.Lock()
defer h.routersMu.Unlock()
h.routers[path] = keyedHandler{
handler: handler,
namespace: namespace,
name: name,
}
}
// Unregister removes the GitHubSource served at the given path.
func (h *Router) Unregister(path string) {
h.routersMu.Lock()
defer h.routersMu.Unlock()
delete(h.routers, path)
}
view raw router.go hosted with ❤ by GitHub

 

k8s Resource의 API Path와 마찬가지로

GithubSource 이름과, 배포된 namespace를 사용해서 path 기반 route를 설정한다.

다시말해

adapter URL의 경우 eventing-github는 knative service를 사용하는데

  • knative service는 scale to zero 옵션까지 있으므로,
  • 이벤트가 들어오지 않을 경우 pod가 0으로 내려간다.
    • 자주 발생하는 이벤트가 아니라면, idle 리소스에서 발생하는 낭비를 줄일 수 있음.

Reconciler

https://github.com/knative-extensions/eventing-github/blob/main/pkg/reconciler/source/githubsource.go

두 개의 메소드가 중요하다.

  • 사용자가 Custom Resource를 배포했을 때, github repo에 webhook을 등록하는 ReconcileKind()
  • 사용자가 Custom Resource를 삭제했을 때, github repo에서 webhook을 삭제하는 FinalizeKind()

func (r *Reconciler) ReconcileKind(ctx context.Context, source *sourcesv1alpha1.GitHubSource) pkgreconciler.Event {
source.Status.InitializeConditions()
accessToken, err := r.secretFrom(ctx, source.Namespace, source.Spec.AccessToken.SecretKeyRef)
if err != nil {
source.Status.MarkNoSecrets("AccessTokenNotFound", "%s", err)
return err
}
secretToken, err := r.secretFrom(ctx, source.Namespace, source.Spec.SecretToken.SecretKeyRef)
if err != nil {
source.Status.MarkNoSecrets("SecretTokenNotFound", "%s", err)
return err
}
source.Status.MarkSecrets()
dest := source.Spec.Sink.DeepCopy()
if dest.Ref != nil {
// To call URIFromDestination(), dest.Ref must have a Namespace. If there is
// no Namespace defined in dest.Ref, we will use the Namespace of the source
// as the Namespace of dest.Ref.
if dest.Ref.Namespace == "" {
dest.Ref.Namespace = source.GetNamespace()
}
}
uri, err := r.sinkResolver.URIFromDestinationV1(ctx, *dest, source)
if err != nil {
source.Status.MarkNoSink("NotFound", "%s", err)
return err
}
source.Status.MarkSink(uri)
ksvc, err := r.reconcileReceiveAdapter(ctx, source)
if err != nil {
source.Status.MarkWebhookNotConfigured("MissingReceiveAdapter", err.Error())
return err
}
if ksvc.Status.GetCondition(apis.ConditionReady).IsTrue() && ksvc.Status.URL != nil {
withPath := *ksvc.Status.URL
if r.receiveAdapterImage == "" {
withPath.Path = fmt.Sprintf("/%s/%s", source.Namespace, source.Name)
}
args := &webhookArgs{
source: source,
url: &withPath,
accessToken: accessToken,
secretToken: secretToken,
alternateGitHubAPIURL: source.Spec.GitHubAPIURL,
}
// source.Status.MarkServiceDeployed(ra)
// TODO: Mark Deployed for the ksvc
if source.Status.WebhookIDKey == "" {
hookID, err := r.createWebhook(ctx, args)
if err != nil {
source.Status.MarkWebhookNotConfigured("CreationFailed", err.Error())
return err
}
source.Status.WebhookIDKey = hookID
} else {
err := r.reconcileWebhook(ctx, args, source.Status.WebhookIDKey)
if err != nil {
source.Status.MarkWebhookNotConfigured("ReconciliationFailed", err.Error())
return err
}
}
source.Status.MarkWebhookConfigured()
}
source.Status.CloudEventAttributes = r.createCloudEventAttributes(source)
source.Status.ObservedGeneration = source.Generation
return nil
}

 

ReconcileKind()의 경우

  • github repo에 접근하기 위한 access Token 정보를 k8s secret에서 조회
  • reconcileReceiveAdapter() 메소드로 adapter에서 새로운 종류의 이벤트 핸들러 등록
  • 원하는 종류의 이벤트와, 이벤트 전달받을 adapter URL 정보를 github webhook에 등록
    • webhookID 결과를 Status 필드에 기록
  • Custom Resource가 수정되면, 수정사항을 github webhook에도 반영
    • 새로운 이벤트 타입 등록 or 기존 이벤트 타입 삭제 -> github API로 변경사항 반영

func (r *Reconciler) FinalizeKind(ctx context.Context, source *sourcesv1alpha1.GitHubSource) pkgreconciler.Event {
// If a webhook was created, try to delete it
if source.Status.WebhookIDKey != "" {
// Get access token
accessToken, err := r.secretFrom(ctx, source.Namespace, source.Spec.AccessToken.SecretKeyRef)
if apierrors.IsNotFound(err) {
source.Status.MarkNoSecrets("AccessTokenNotFound", "%s", err)
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning,
"WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err)
// return EventTypeNormal to avoid finalize loop
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err)
} else if err != nil {
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning,
"WebhookDeletionFailed", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err)
return fmt.Errorf("error getting secret: %v", err)
}
args := &webhookArgs{
source: source,
accessToken: accessToken,
alternateGitHubAPIURL: source.Spec.GitHubAPIURL,
hookID: source.Status.WebhookIDKey,
}
// Delete the webhook using the access token and stored webhook ID
err = r.deleteWebhook(ctx, args)
var gherr *ghclient.ErrorResponse
if errors.As(err, &gherr) {
if gherr.Response.StatusCode == http.StatusNotFound {
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning, "WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err)
// return EventTypeNormal to avoid finalize loop
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err)
}
} else {
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning,
"WebhookDeletionFailed", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err)
return fmt.Errorf("error deleting webhook: %v", err)
}
// Webhook deleted, clear ID
source.Status.WebhookIDKey = ""
}
return nil
}
view raw finalizekind.go hosted with ❤ by GitHub

 

FinalizedKind()

  • Custom Resource에 등록된 정보가 삭제되므로, 외부 리소스에도 삭제 사항을 반영한다
    • status의 webhookID 정보를 github API로 삭제
반응형