简介
Kubernetes的client-go库介绍client-go是一个调用kubernetes集群资源对象http API的客户端(是一个典型的web服务客户端库),即通过client-go实现对kubernetes集群中资源对象(包括deployment、service、ingress、replicaSet、pod、namespace、node等)的增删改查等操作。
使用实例
以node resource 为例,展示使用client-go 对 resource 进行查询和更新
请求node 数据
clientset, err := kubernetes.NewForConfig(config)
// 直接获取node 列表
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
// 通过informer 获取node 列表
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
nodeInformer := factory.Core().V1().Nodes()
go nodeInformer.Informer().Run(stopCh)
if !cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
nodes, err := nodeInformer.Lister().List(labels.NewSelector())
更新node
// 直接更新
_, err = a.client.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
// 发送patch 指令
patchTemplate := map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
labelkey: labelvaule,
},
},
}
patchdata, _ := json.Marshal(patchTemplate)
_, err := clientset.CoreV1().Nodes().Patch(ctx, Nodes[i].Name, types.StrategicMergePatchType, patchdata, metav1.PatchOptions{})
资源对象访问
k8s.io/client-go
/rest // 底层rest client 定义 RESTClient struct
/kubernetes // 访问 Kubernetes API的一系列的clientset
/typed
/core/v1
/pod.go // pod 相关api
/extensions/v1beta1
/deployment.go // deployment 相关api
/dynamic // 对任意Kubernetes对象执行通用操作的动态client
/informer
k8s.io/api
/core/v1
/types.go // 定义了pod service 等struct
类似于 /core/v1
和 ` /extensions/v1beta1 这些GroupVersion 在
k8s.io/client-go 和
k8s.io/api` 都有对应目录。
config,err := clientcmd.BuildConfigFromFlags("",kubeconfig)
clientset, err := kubernetes.NewForConfig(config)
pod,err := clientset
.CoreV1() // 选择APIGroupVersion 即 /api/v1
.Pods("book") // 命名空间
.Get("example",metav1.GetOptions{}) // 访问 /api/v1/namespaces/book/pods/example
从上到下来说:Clientset是调用Kubernetes资源对象最常用的client,可以操作所有的资源对象。需要指定Group、Version,然后根据Resource获取。下文从下到上来 描述下 api resource 访问的基本原理:
可以通过 kubectl proxy
(监听127.0.0.1:8001
) 来访问 apiserver http://127.0.0.1:8001/api/v1/namespaces/default/pods/$podName
。也可以 kubectl get --raw /api/v1/namespaces/default/pods/$podName
访问http 接口拿到pod 数据,为了方便 数据的访问, client-go 提供了RESTClient 来进行初步的封装。类似于 java 的HttpUtils 类。
// k8s.io/client-go/rest/client.go
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
// 你可以直接 根据 rest.Config 初始化RESTClient,从apiServer 中拿到数据
type RESTClient struct {
// base is the root URL for all invocations of the client
base *url.URL
// versionedAPIPath is a path segment connecting the base URL to the resource root
versionedAPIPath string
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
...
}
// k8s.io/client-go/rest/request.go
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
client := r.c.Client // http.Client
// Right now we make about ten retry attempts if we get a Retry-After response.
retries := 0
for {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
req = req.WithContext(ctx)
req.Header = r.headers
resp, err := client.Do(req)
done := func() bool {
...
}()
if done {
return nil
}
}
}
以pod 为例,对外提供了 PodInterface 封装了对Pod 的api。 Pod 的schema 数据 k8s.io/api
对应GroupVesion 路径下的 register.go 文件中 注册到 统一的 Schema 中,schema 数据在client-go 中用于 http 数据的解封装。
// k8s.io/client-go/deprecated/typed/core/v1/pod.go
type PodInterface interface {
Create(*v1.Pod) (*v1.Pod, error)
Update(*v1.Pod) (*v1.Pod, error)
Delete(name string, options *metav1.DeleteOptions) error
Get(name string, options metav1.GetOptions) (*v1.Pod, error)
List(opts metav1.ListOptions) (*v1.PodList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error)
...
}
// pods implements PodInterface
type pods struct {
client rest.Interface
ns string
}
func (c *pods) Get(name string, options metav1.GetOptions) (result *v1.Pod, err error) {
result = &v1.Pod{}
err = c.client.Get(). // 新建Request 对象
Namespace(c.ns). // 设置Request.namespace
Resource("pods"). // 设置Request.resource
Name(name). // 设置Request.resourceName
VersionedParams(&options, scheme.ParameterCodec).
Do(context.TODO()). // 执行Request.request
Into(result)
return
}
pod /node 等API Resource 按GroupVersion(CoreV1/ExtensionsV1beta1) 进行了聚合,对外提供CoreV1Client/ExtensionsV1beta1Client,各个GroupVersion Interface 聚合为 clientset
type CoreV1Interface interface {
RESTClient() rest.Interface
ConfigMapsGetter
EventsGetter
NamespacesGetter
NodesGetter
PersistentVolumesGetter
PersistentVolumeClaimsGetter
PodsGetter
PodTemplatesGetter
ReplicationControllersGetter
SecretsGetter
ServicesGetter
...
}
type CoreV1Client struct {
restClient rest.Interface // 通用的REST 客户端
}
client-go 包含了 k8s 一些核心对象的访问,此外一些非核心对象 或用户crd 对象可以独立提供类似 client-go 功能
- 比如metric 机制相关的 PodMetrics/NodeMetrics对象,其代码都在
k8s.io/metrics
包里。 - controller-runtime 为cr 生成对应的client,scheme中 包含了cr 的信息。
更新status,以Deployment 为例,/apis/apps/v1beta1/namespaces/${ns}/deployments/${name}
只能更新deployment 的 spec。/apis/apps/v1beta1/namespaces/${ns}/deployments/${name}/status
只能更新 deployment 的status。
发展历程
informer 在client-go 库中,k8s 多个组件用informer 来减少对api-server的访问压力。 从 Kubernetes 资源控制到开放应用模型,控制器的进化之旅
- Controller 一直访问API Server 导致API Server 压力太大,于是有了Informer
- 由 Informer 代替Controller去访问 API Server,而Controller不管是查状态还是对资源进行伸缩都和 Informer 进行交接。而且 Informer 不需要每次都去访问 API Server,它只要在初始化的时候通过 LIST API 获取所有资源的最新状态,然后再通过 WATCH API 去监听这些资源状态的变化,整个过程被称作 ListAndWatch。
- Informer 也有一个助手叫 Reflector,上面所说的 ListAndWatch 事实上是由 Reflector 一手操办的。这使 API Server 的压力大大减少。
- 后来,WATCH 数据的太多了,Informer/Reflector去 API Server 那里 WATCH 状态的时候,只 WATCH 特定资源的状态,不要一股脑儿全 WATCH。
- 一个controller 一个informer 还是压力大,于是针对每个(受多个控制器管理的)资源弄一个 Informer。比如 Pod 同时受 Deployment 和 StatefulSet 管理。这样当多个控制器同时想查 Pod 的状态时,只需要访问一个 Informer 就行了。
- 但这又引来了新的问题,SharedInformer 无法同时给多个控制器提供信息,这就需要每个控制器自己排队和重试。为了配合控制器更好地实现排队和重试,SharedInformer 搞了一个 Delta FIFO Queue(增量先进先出队列),每当资源被修改时,它的助手 Reflector 就会收到事件通知,并将对应的事件放入 Delta FIFO Queue 中。与此同时,SharedInformer 会不断从 Delta FIFO Queue 中读取事件,然后更新本地缓存的状态。
入口对象——informer
k8s.io/client-go
/rest
/informer
/core
/v1
/pod.go
/interface.go
/interface.go
/factory.go // 定义sharedInformerFactory struct
/tools
/cache // informer 机制的的重点在cache 包里
/shared_informer.go // 定义了 sharedIndexInformer struct
/controller.go
/reflector.go
/delta_fifo.go
Kubernetes: Controllers, Informers, Reflectors and StoresKubernetes offers these powerful structures to get a local representation of the API server’s resources.The Informer just a convenient wrapper to automagically syncs the upstream data to a downstream store and even offers you some handy event hooks.
“高冷”的 Kubernetes Informer 一探究竟为了让 Client-go 更快地返回 List/Get 请求的结果、减少对 Kubenetes API 的直接调用,Informer 被设计实现为一个依赖(并且只依赖) Kubernetes List/Watch API 、可监听事件并触发回调函数的二级缓存工具包。PS:这点zk/etcd 等client 也提供类似能力,只是zk/etcd client 存储的是通用数据,没有封装资源对象。
实际使用中 每一个资源对象(比如Pod、Deployment)都对应一个Informer,底层都用到了SharedIndexInformer
整体设计
Informer 中主要包含 Controller、Reflector、DeltaFIFO、LocalStore、Lister 和 Processor 六个组件,
- Controller 并不是 Kubernetes Controller,这两个 Controller 并没有任何联系;
- Reflector 的主要作用是通过 Kubernetes Watch API 监听某种 resource 下的所有事件;
- DeltaFIFO 和 LocalStore 是 Informer 的两级缓存;
- Lister 主要是被调用 List/Get 方法;
- Processor 中记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。
informer 机制主要两个流程
- Reflector 通过ListWatcher 同步apiserver 数据(只启动时搞一次),并watch apiserver ,将event 加入到 Queue 中
- controller 从 Queue中获取event,更新存储,并触发Processor 业务层注册的 ResourceEventHandler
Kubernetes Informer 详解 Informer 只会调用 Kubernetes List 和 Watch 两种类型的 API,Informer 在初始化的时,先调用 Kubernetes List API 获得某种 resource 的全部 Object,缓存在内存中; 然后,调用 Watch API 去 watch 这种 resource,去维护这份缓存; 之后,Informer 就不再调用 Kubernetes 的任何 API。
Reflector
// k8s.io/client-go/tools/cache/reflector.go
// Reflector(反射器) 监听指定的资源,将所有的变化都反射到给定的存储中去
type Reflector struct {
// name 标识这个反射器的名称,默认为 文件:行数(比如reflector.go:125)
// 默认名字通过 k8s.io/apimachinery/pkg/util/naming/from_stack.go 下面的 GetNameFromCallsite 函数生成
name string
// 期望放到 Store 中的类型名称,如果提供,则是 expectedGVK 的字符串形式
// 否则就是 expectedType 的字符串,它仅仅用于显示,不用于解析或者比较。
expectedTypeName string
// 我们放到 Store 中的对象类型
expectedType reflect.Type
// 如果是非结构化的,我们期望放在 Sotre 中的对象的 GVK
expectedGVK *schema.GroupVersionKind
// 与 watch 源同步的目标 Store
store Store
// 用来执行 lists 和 watches 操作的 listerWatcher 接口(最重要的)
listerWatcher ListerWatcher
WatchListPageSize int64
...
Reflector 对象通过 Run 函数来启动监控并处理监控事件
// k8s.io/client-go/tools/cache/reflector.go
// Run 函数反复使用反射器的 ListAndWatch 函数来获取所有对象和后续的 deltas。
// 当 stopCh 被关闭的时候,Run函数才会退出。
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
首先通过反射器的 relistResourceVersion 函数获得反射器 relist 的资源版本,如果资源版本非 0,则表示根据资源版本号继续获取,当传输过程中遇到网络故障或者其他原因导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。
ResourceVersion(资源版本号)非常重要,Kubernetes 中所有的资源都拥有该字段,它标识当前资源对象的版本号,每次修改(CUD)当前资源对象时,Kubernetes API Server 都会更改 ResourceVersion,这样 client-go 执行 Watch 操作时可以根据ResourceVersion 来确定当前资源对象是否发生了变化。
两级缓存
二级缓存属于Informer的底层缓存机制,这两级缓存分别是DeltaFIFO和LocalStore。DeltaFIFO用来存储Watch API返回的各种事件 ,LocalStore只会被Lister的List/Get方法访问 。
watch 到的event(包含资源对象数据),先看本地cache 有没有,有就是更新,没有就是新增。
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {...}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {...}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {...}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
虽然Informer和Kubernetes之间没有resync机制,但Informer内部的这两级缓存之间存在resync机制。
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{} // 保存所有数据的 map 结构
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
cache 中为resource 建立了索引,类似于mysql 的聚簇索引和非聚簇索引,对每一个资源可以提供keyFunc 和 indexFunc 返回资源相关的key 和index 由cache 自动建立索引。
processor 是如何处理数据的
两条主线
- sharedIndexInformer.Run ==> sharedProcessor.run ==> sharedProcessor.run/pop 从channel 读取数据并执行
- sharedIndexInformer.HandleDeltas ==> sharedProcessor.distribute ==> processorListener.addCh 往channel 里塞数据
// k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
// 加入到processorListener 的addCh 中,随后进入pendingNotifications,因为这里不能阻塞
listener.add(obj)
}
}
}
// k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
pendingNotifications buffer.RingGrowing
...
}
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run) // 消费nextCh
p.wg.Start(listener.pop) // 消费addCh 经过 mq 转到 nextCh
}
p.listenersStarted = true
}()
...
}
消息流转的具体路径:addCh ==> notificationToAdd ==> pendingNotifications ==> notification ==> nextCh
func (p *processorListener) pop() {
var nextCh chan<- interface{}
var notification interface{} // 用来做消息的中转,并在最开始的时候标记pendingNotifications 为空
for {
// select case channel 更多是事件驱动的感觉,哪个channel 来数据了或者可以 接收数据了就处理哪个 case 内逻辑
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
可以看到,对于handler 来说,除非特殊场景,否则一般不需要另起协程了。
watch 是如何实现的?
client-go 请求分为 长时间运行和非长时间运行,watch、日志流、exec等 是长时间运行请求,而GET/LIST/UPDATE 等不是长时间运行的请求。informers 通过使用新连接从断开连接中恢复,应用程序代码通常不会发出通知。informer 针对watch 提供更高级别的编程接口:缓存以及按Name或其它属性 对对象快速索引查找。
从k8s.io/apimachinery/pkg/watch
返回的watch.Interface
type Interface interface{
Stop()
ResultChan() <- Event
}
type Event struct{
Type EventType // ADDED/MODIFIED/DELETED/ERROR
Object runtime.Object
}
理解 K8S 的设计精髓之 List-Watch机制和Informer模块HTTP 分块传输编码允许服务器为动态生成的内容维持 HTTP 持久链接。通常,持久链接需要服务器在开始发送消息体前发送Content-Length消息头字段,但是对于动态生成的内容来说,在内容创建完之前是不可知的。使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样服务器可以发送数据而不需要预先知道发送内容的总大小。
当客户端调用watch API时,apiserver 在response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便和服务端保持该链接,并等待下一个数据块,即资源的事件信息。例如:
$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/pods?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 02 Jan 2019 20:22:59 GMT
Transfer-Encoding: chunked
{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Pod","apiVersion":"v1",...}}
与clientset 关联
sharedIndexInformer 中有个属性 listerWatcher,以Pod 为例,其创建如下,从中可以看到informer 通过ListWatcher 接口与 clientset 建立了关联
func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil)
}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, ...) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
...
return client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
...
return client.CoreV1().Pods(namespace).Watch(options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
其它
工作队列
client-go 在k8s.io/client-go/util/workqueue
提供了一个workqueue 的高效实现,用于实现controller。该package 包含许多用于不同目的的队列变种(DelayingInterface/RateLimitingInterface),基本接口如下
type Interface interface{
Add(item interface{})
Len() int
Get()(item interface{},shutdown bool)
// 当controller 处理完成后,每个Get 返回的item都要显式 Done(item) 调用
Done(item interface)
ShutDown()
ShuttingDown() bool
}
Dynamic client
Dynamic client 是一种动态的 client,它能处理 kubernetes 所有的资源。不同于 clientset,dynamic client 对GVK 一无所知, 返回的对象unstructured.Unstructured(在k8s.io/apimachinery 中定义,并注册到了schema 中) 是一个 map[string]interface{}
,如果一个 controller 中需要控制所有的 API,可以使用dynamic client,目前它在 garbage collector 和 namespace controller中被使用。
k8s.io/client-go
/dynamic
/dynamicinformer
/dynamiclister
/interface.go
相比底层的 RESTClient,基于 unstructured.Unstructured 实现了 数据的解封装 及watch 机制。
// k8s.io/client-go/dynamic/interface.go
type ResourceInterface interface {
Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
...
}
// k8s.io/client-go/dynamic/simple.go
func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) {
// 这里直接拼接了 api resource 的请求路径
result := c.client.client.Get().AbsPath(append(c.makeURLSegments(name), subresources...)...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
retBytes, err := result.Raw()
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
return uncastObj.(*unstructured.Unstructured), nil
}