K8s) kube-controller-manager

kube-controller-manager

API SeverInformerKCMControllerController ContextInformer FactoryindexercontrollerwatchqpushsharedProcessorhandlerqeventkeysync()listerSummery거의 모든 리소스 종류(Kind)마다 존재능동적인 상태 관리(Reconciliation)가필요한 리소스에만 존재

컨트롤러를 생성(초기화)하고 실행

CreateControllerContextBuildControllersInformerFactory.Start()RunControllers순서informer 생성 및 등록 (to factory)informer factory 생성핸들러 등록 (informer 구독)informer 실행- watch AP 서버Controller 실행- 리소스 상태 sync

핵심 초기화 (run 함수)

  • CreateControllerContext
    • 모든 컨트롤러가 공유할 SharedInformerFactory 와 Client 를 생성
  • BuildControllers
    • informer 생성 및 등록
    • 컨트롤러가 informer 에 핸들러 등록(구독)
  • InformerFactory.Start
    • InformerFactory 를 시작하여 캐시 동기화를 개시
  • RunControllers
    • 각 컨트롤러의 루프를 돌림

Controller

deployment.DeploymentControllerworkqueue.TypedRateLimitingInterface[string]queueappslisters.DeploymentListerdListerclientclientset.InterfaceInformerIndexer (cache)deploymentListerlisters.ResourceIndexer(Accessor)AddEventHandler(Subscribe)enqueueInformerhandlerdeployment.DeploymentControllerqueueenqueuedListerRun()indexergetprocessNextWorkItem()syncDeployment()e.g.addDeploymentController 동작

목표 상태와 현재 상태를 일치시키는 무한 루프

능동적인 상태 관리(Reconciliation)가 필요한 리소스에만 존재

역할

  • 다수의 컨트롤러(리소스별로 존재) 를 조율
  • 공유 의존성을 관리 (Informers, Clients)
  • 고가용성(Leader Election) 처리
Leader Election

Master Node 3대에 각각 하나씩 총 3개의 kube-controller-manager 프로세스(인스턴스)를 띄움

  • 이 중 하나만 리더로 선출(선출 기준: 선착순) 하여 모든 controller 를 생성 및 실행
  • 리더는 2초마다 Lease 라는 문서(주기적인 생존 신고용)를 갱신
  • 나머지는 대기조(Standby) 로 Leader 장애시 정권 교체
    • 모든 데이터의 원본(Truth)은 etcd에만 저장되어 있기 때문에 인수인계는 필요 없음

동작

queuekey 만 넣고 Worker 가 큐에서 꺼내서 비즈니스 로직(sync)을 실행

  • informer 에 EventHandler 등록
    • EventHandler 에서는 queue 에 key 를 넣는 로직을 실행
  • Rate Limiting Queue 사용
    • workqueue.TypedRateLimitingInterface[string]
    • 재시도가 많아질수록 더욱 천천히 재시도
  • key 형식
    • "Namespace/Name"
  • 데이터는 항상 처리 직전에 조회
    • 데이터는 informer 의 로컬 캐시를 lister 를 통해 조회

Informer

SharedInformerFactoryInformerAPI ServerListerWatcherWatchIndexersharedProcessorCachingControllerNotifyControllerupdate(Reflector)리소스 종류마다 하나씩 존재map[reflect.Type]cache.SharedIndexInformerdeploymentInformerLister()Informer()InformerIndexer (cache)InformerInformerInformer...InformerGET ?watch=trueAPI Server(Streaming)Transfer-Encoding: chunkedwatch.Eventqueueindexereventtypeobject infoprocessLoop()handlerHandleDeltas()PopprocessDeltas()ControllerupdatecallInformerRun()SharedProcessorProcessorListenerdistributeaddChnextChrunringbufferlisteners 목록 관리버퍼링(Queueing) & 핸들러 함수 실행Informerrequest(Watch)responseInformerFactory.Start()Informer 동작watch()API Serverconnectionresponse body+ DeltaTyped.reader.Read(d.buf[base:])framer.NewFrameReader(resp.Body)JSON 문법상 중괄호 { 와 } 의 짝이 맞을 때까지스트림을 읽어서 하나의 완벽한 JSON 객체가완성되는 시점을 데이터의 끝으로 인식streaming.NewDecoder

"DB 를 직접 조회하지 말고, 변경 사항만 구독해서 내 로컬 메모리(Cache)에 복사본을 갖고 있자."

거의 모든 리소스 종류(Kind)마다 존재

역할

  • API (Kubernetes API Server) Watch
    • 한 번의 연결 요청으로 커넥션을 만들고 response body 를 계속해서 받음
  • 캐시 구축
    • 로컬 캐시
      • cache.cacheStorage: treadSafeMap
        • lock (sync.RWMutex): 동시성 제어
        • items (map[string]any): 실제 데이터 저장소
        • index (storeIndex): 검색을 빠르게 하기 위한 보조 인덱스
  • 이벤트를 구독한 컨트롤러에게 알림
sharedInformerFactory

컨트롤러들이 공유하는 informer 들을 map 으로 관리하며
동일한 리소스에 대해 하나의 informer (watch, cache) 를 갖도록 함

Reference


포스트
카테고리
시리즈