knative eventing github 코드 분석
https://github.com/knative-extensions/eventing-github
GitHub - knative-extensions/eventing-github: Github integration with Knative Eventing.
Github integration with Knative Eventing. Contribute to knative-extensions/eventing-github development by creating an account on GitHub.
github.com
knative eventing 활용해서
- 다양한 외부 이벤트 (webhook)를
- knative broker 또는 knative service로 전달하는 작업을 하기 위해
- knative eventing 진영의 stable 컴포넌트인 eventing-github 로직을 분석한 내용.
여기 코드를 분석하고, knative sample-source 레포를 활용하면 다양한 외부 시스템을 knative eventing과 연동할 수 있다.
https://github.com/knative-extensions/eventing-github/blob/main/samples/githubsource.yaml 예시
apiVersion: sources.knative.dev/v1alpha1
kind: GitHubSource
metadata:
name: githubsource-sample
spec:
eventTypes:
- pull_request
ownerAndRepository: "<your GitHub org>/<your GitHub repo>"
accessToken:
secretKeyRef:
name: githubsecret
key: accessToken
secretToken:
secretKeyRef:
name: githubsecret
key: secretToken
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: destinationServer
---
apiVersion: v1
kind: Secret
metadata:
name: githubsecret
type: Opaque
stringData:
accessToken: "<your GitHub access token>"
secretToken: "<your secret token>"
Custom Resource 기본 구조
knative EventSource는 기본적으로 위와 같은 구성으로 되어 있다.
- Custom Resource 상태를 보고 로직을 수행하는 Reconciler. eventing-github의 경우 사용자가 원하는 특정 github Repo에 webhook을 등록하는 역할을 한다.
- 외부 시스템에서 webhook으로 이벤트가 들어오면, cloudevent 형식으로 변환한 뒤 적절한 목적지로 전달해주는 Adapter.
- Webhook 컴포넌트도 있지만, MutatingWebhook / ValidationWebhook에 특별한 로직은 없었다.
Adapter 로직부터 파악해야 reconcile 쪽 이해가 빨라지기 때문에 adapter부터 설명.
Adapter
- k8s Deployment 또는 knative service로 구성된 컴포넌트. 외부에서 http 요청을 받을 수 있는 URL이 주어진다.
- 외부에서 들어온 이벤트를
- target service (knative의 sink)가 이해할 수 있는 포맷으로 변환.
- GithubSource: github webhook (json format) -> cloudevent
- KafkaSource: kafka message -> cloudevent
- RedisSource: redisStream -> cloudevent
- 이벤트가 전달되어야 하는 target service로 전송
- target service (knative의 sink)가 이해할 수 있는 포맷으로 변환.
adapter를 보면, http Server를 실행하는 단순한 코드로 되어 있다.
func (a *gitHubAdapter) Start(ctx context.Context) error { | |
// Start our multi-tenant server receiving GitHub events | |
server := &http.Server{ | |
ReadTimeout: 10 * time.Second, | |
ReadHeaderTimeout: 2 * time.Second, | |
Addr: fmt.Sprintf(":%d", a.port), | |
Handler: a.router, | |
} | |
done := make(chan bool, 1) | |
go common.GracefulShutdown(server, a.logger, ctx.Done(), done) | |
a.logger.Infof("Server is ready to handle requests at %s", server.Addr) | |
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { | |
return fmt.Errorf("could not listen on %s: %v", server.Addr, err) | |
} | |
<-done | |
a.logger.Infof("Server stopped") | |
return nil | |
} |
외부 이벤트를 받아서, 의도한 곳으로 이벤트를 전달해주는 컴포넌트가 a.router
- router 정보를 업데이트하는 로직은 RegisterHandlerFor()라는 메소드를 보면 된다.
// RegisterHandlerFor implements MTAdapter. | |
func (a *gitHubAdapter) RegisterHandlerFor(ctx context.Context, src *v1alpha1.GitHubSource) error { | |
secretCli := a.secrGetter.Secrets(src.Namespace) | |
secretToken, err := common.SecretFrom(ctx, secretCli, src.Spec.SecretToken.SecretKeyRef) | |
if err != nil { | |
return fmt.Errorf("reading token from Secret: %w", err) | |
} | |
logger := logging.FromContext(ctx) | |
ceSrc := v1alpha1.GitHubEventSource(src.Spec.OwnerAndRepository) | |
handler := common.NewHandler(a.ceClient, src.Status.SinkURI.String(), ceSrc, secretToken, logger) | |
path := fmt.Sprintf("/%s/%s", src.Namespace, src.Name) | |
a.router.Register(src.Name, src.Namespace, path, handler) | |
return nil | |
} |
router를 보니 common.NewHandler() 라는 메소드가 있다.
- github webhook에서 들어오는 이벤트 (pull_request 같은 것) 를 받아서
- GithubSource 라는 Custom Resource의 정보를 토대로
- Knative Sink에 정의된 리소스 (example.yaml의 destinationServer) 로 전달한다.
handler 메소드 내부를 보면
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
payload, err := h.Hook.Parse(r, ValidEvents...) | |
if err != nil { | |
if err == gh.ErrEventNotFound { | |
w.WriteHeader(http.StatusNotFound) | |
h.Logger.Info("Event not found") | |
return | |
} | |
w.WriteHeader(http.StatusBadRequest) | |
h.Logger.Errorf("Error processing request: %v", err) | |
return | |
} | |
ctx := context.Background() | |
if len(h.SinkURI) > 0 { | |
ctx = cloudevents.ContextWithTarget(ctx, h.SinkURI) | |
} | |
err = h.handleEvent(ctx, payload, r.Header) | |
if err != nil { | |
h.Logger.Errorf("Event handler error: %v", err) | |
w.WriteHeader(400) | |
w.Write([]byte(err.Error())) | |
return | |
} | |
h.Logger.Infof("Event processed") | |
w.WriteHeader(202) | |
w.Write([]byte("accepted")) | |
} | |
func (h *Handler) handleEvent(ctx context.Context, payload interface{}, hdr http.Header) error { | |
gitHubEventType := hdr.Get(GHHeaderEvent) | |
if gitHubEventType == "" { | |
return fmt.Errorf("%q header is not set", GHHeaderEvent) | |
} | |
eventID := hdr.Get(GHHeaderDelivery) | |
if eventID == "" { | |
return fmt.Errorf("%q header is not set", GHHeaderDelivery) | |
} | |
h.Logger.Infof("Handling %s", gitHubEventType) | |
cloudEventType := sourcesv1alpha1.GitHubEventType(gitHubEventType) | |
subject, extensions := SubjectAndExtensionsFromGitHubEvent(gh.Event(gitHubEventType), payload, h.Logger) | |
event := cloudevents.NewEvent() | |
event.SetID(eventID) | |
event.SetType(cloudEventType) | |
event.SetSource(h.Source) | |
event.SetSubject(subject) | |
for k, v := range extensions { | |
event.SetExtension(k, v) | |
} | |
if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil { | |
return fmt.Errorf("failed to marshal event data: %w", err) | |
} | |
result := h.Client.Send(ctx, event) | |
if !cloudevents.IsACK(result) { | |
return result | |
} | |
return nil | |
} |
adapter url에서 payload로 들어온 이벤트 타입을 파싱한다 (h.Hook.Parse())
- 잘못된 타입이면 BadRequest
- context 정보에 이벤트를 전달할 목적지인 h.SinkURL 정보를 포함한다.
h.handleEvent()로 이벤트 핸들링 로직 수행
- github event payload 파싱하고
- cloudevent의 SetType / SetSource / SetSubject / SetData 등의 메소드로 cloudevent를 생성한다
- h.Client.Send() 로 cloudevent를 h.SinkURL에 전달한다.
mt-adapter?
default adapter의 경우 Custom Resource 하나 만들어질 때마다 adapter 역할을 하는 서버도 하나씩 만들어지는 구조.
- Custom Resource가 많아질수록 리소스 점유가 불필요하게 늘어난다
- multi-tenant 구조를 사용하면, adapter URL 하나만 관리하고 path 기반 route를 사용해서 리소스 효율적으로 adapter를 관리할 수 있다.
// ServeHTTP implements http.Handler. | |
func (h *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
// Path-based dispatch | |
h.routersMu.RLock() | |
keyedHandler, ok := h.routers[r.URL.Path] | |
h.routersMu.RUnlock() | |
if ok { | |
// Check if source still exists. | |
_, err := h.lister.GitHubSources(keyedHandler.namespace).Get(keyedHandler.name) | |
if err == nil { | |
keyedHandler.handler.ServeHTTP(w, r) | |
return | |
} | |
h.Unregister(r.URL.Path) | |
} | |
http.NotFoundHandler().ServeHTTP(w, r) | |
} | |
// Register adds a new GitHub event handler for the given GitHubSource. | |
func (h *Router) Register(name, namespace, path string, handler http.Handler) { | |
h.routersMu.Lock() | |
defer h.routersMu.Unlock() | |
h.routers[path] = keyedHandler{ | |
handler: handler, | |
namespace: namespace, | |
name: name, | |
} | |
} | |
// Unregister removes the GitHubSource served at the given path. | |
func (h *Router) Unregister(path string) { | |
h.routersMu.Lock() | |
defer h.routersMu.Unlock() | |
delete(h.routers, path) | |
} |
k8s Resource의 API Path와 마찬가지로
GithubSource 이름과, 배포된 namespace를 사용해서 path 기반 route를 설정한다.
다시말해
- github에서 이벤트 보내는 webhook (= adapter) URL: https://adapter-url.example.com/default/githubsource-sample
- router 로직: default/githubsource-sample 이라는 key값에 매핑된 handler 로직을 실행한다.
adapter URL의 경우 eventing-github는 knative service를 사용하는데
- knative service는 scale to zero 옵션까지 있으므로,
- 이벤트가 들어오지 않을 경우 pod가 0으로 내려간다.
- 자주 발생하는 이벤트가 아니라면, idle 리소스에서 발생하는 낭비를 줄일 수 있음.
Reconciler
두 개의 메소드가 중요하다.
- 사용자가 Custom Resource를 배포했을 때, github repo에 webhook을 등록하는 ReconcileKind()
- 사용자가 Custom Resource를 삭제했을 때, github repo에서 webhook을 삭제하는 FinalizeKind()
func (r *Reconciler) ReconcileKind(ctx context.Context, source *sourcesv1alpha1.GitHubSource) pkgreconciler.Event { | |
source.Status.InitializeConditions() | |
accessToken, err := r.secretFrom(ctx, source.Namespace, source.Spec.AccessToken.SecretKeyRef) | |
if err != nil { | |
source.Status.MarkNoSecrets("AccessTokenNotFound", "%s", err) | |
return err | |
} | |
secretToken, err := r.secretFrom(ctx, source.Namespace, source.Spec.SecretToken.SecretKeyRef) | |
if err != nil { | |
source.Status.MarkNoSecrets("SecretTokenNotFound", "%s", err) | |
return err | |
} | |
source.Status.MarkSecrets() | |
dest := source.Spec.Sink.DeepCopy() | |
if dest.Ref != nil { | |
// To call URIFromDestination(), dest.Ref must have a Namespace. If there is | |
// no Namespace defined in dest.Ref, we will use the Namespace of the source | |
// as the Namespace of dest.Ref. | |
if dest.Ref.Namespace == "" { | |
dest.Ref.Namespace = source.GetNamespace() | |
} | |
} | |
uri, err := r.sinkResolver.URIFromDestinationV1(ctx, *dest, source) | |
if err != nil { | |
source.Status.MarkNoSink("NotFound", "%s", err) | |
return err | |
} | |
source.Status.MarkSink(uri) | |
ksvc, err := r.reconcileReceiveAdapter(ctx, source) | |
if err != nil { | |
source.Status.MarkWebhookNotConfigured("MissingReceiveAdapter", err.Error()) | |
return err | |
} | |
if ksvc.Status.GetCondition(apis.ConditionReady).IsTrue() && ksvc.Status.URL != nil { | |
withPath := *ksvc.Status.URL | |
if r.receiveAdapterImage == "" { | |
withPath.Path = fmt.Sprintf("/%s/%s", source.Namespace, source.Name) | |
} | |
args := &webhookArgs{ | |
source: source, | |
url: &withPath, | |
accessToken: accessToken, | |
secretToken: secretToken, | |
alternateGitHubAPIURL: source.Spec.GitHubAPIURL, | |
} | |
// source.Status.MarkServiceDeployed(ra) | |
// TODO: Mark Deployed for the ksvc | |
if source.Status.WebhookIDKey == "" { | |
hookID, err := r.createWebhook(ctx, args) | |
if err != nil { | |
source.Status.MarkWebhookNotConfigured("CreationFailed", err.Error()) | |
return err | |
} | |
source.Status.WebhookIDKey = hookID | |
} else { | |
err := r.reconcileWebhook(ctx, args, source.Status.WebhookIDKey) | |
if err != nil { | |
source.Status.MarkWebhookNotConfigured("ReconciliationFailed", err.Error()) | |
return err | |
} | |
} | |
source.Status.MarkWebhookConfigured() | |
} | |
source.Status.CloudEventAttributes = r.createCloudEventAttributes(source) | |
source.Status.ObservedGeneration = source.Generation | |
return nil | |
} |
ReconcileKind()의 경우
- github repo에 접근하기 위한 access Token 정보를 k8s secret에서 조회
- reconcileReceiveAdapter() 메소드로 adapter에서 새로운 종류의 이벤트 핸들러 등록
- 원하는 종류의 이벤트와, 이벤트 전달받을 adapter URL 정보를 github webhook에 등록
- webhookID 결과를 Status 필드에 기록
- Custom Resource가 수정되면, 수정사항을 github webhook에도 반영
- 새로운 이벤트 타입 등록 or 기존 이벤트 타입 삭제 -> github API로 변경사항 반영
func (r *Reconciler) FinalizeKind(ctx context.Context, source *sourcesv1alpha1.GitHubSource) pkgreconciler.Event { | |
// If a webhook was created, try to delete it | |
if source.Status.WebhookIDKey != "" { | |
// Get access token | |
accessToken, err := r.secretFrom(ctx, source.Namespace, source.Spec.AccessToken.SecretKeyRef) | |
if apierrors.IsNotFound(err) { | |
source.Status.MarkNoSecrets("AccessTokenNotFound", "%s", err) | |
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning, | |
"WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err) | |
// return EventTypeNormal to avoid finalize loop | |
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err) | |
} else if err != nil { | |
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning, | |
"WebhookDeletionFailed", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err) | |
return fmt.Errorf("error getting secret: %v", err) | |
} | |
args := &webhookArgs{ | |
source: source, | |
accessToken: accessToken, | |
alternateGitHubAPIURL: source.Spec.GitHubAPIURL, | |
hookID: source.Status.WebhookIDKey, | |
} | |
// Delete the webhook using the access token and stored webhook ID | |
err = r.deleteWebhook(ctx, args) | |
var gherr *ghclient.ErrorResponse | |
if errors.As(err, &gherr) { | |
if gherr.Response.StatusCode == http.StatusNotFound { | |
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning, "WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err) | |
// return EventTypeNormal to avoid finalize loop | |
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "WebhookDeletionSkipped", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err) | |
} | |
} else { | |
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning, | |
"WebhookDeletionFailed", "Could not delete webhook %q: %v", source.Status.WebhookIDKey, err) | |
return fmt.Errorf("error deleting webhook: %v", err) | |
} | |
// Webhook deleted, clear ID | |
source.Status.WebhookIDKey = "" | |
} | |
return nil | |
} |
FinalizedKind()
- Custom Resource에 등록된 정보가 삭제되므로, 외부 리소스에도 삭제 사항을 반영한다
- status의 webhookID 정보를 github API로 삭제