深入源码分析kubernetes informer机制(三)Resync

news/2024/9/29 14:28:26 标签: kubernetes, 容器, 云原生

[阅读指南]
这是该系列第三篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


文章目录

  • 为什么需要resync
  • resync做了什么

为什么需要resync

如果看过上一篇,大概能了解,client数据主要通过reflector 的list/watch进行同步。

回顾一下informer整体的数据同步逻辑。

  1. informer初始化时,调用list接口获取制定类型的全量资源数据,此时的resource version默认为0。假如指定资源类型为pod,那么就是获取所有pod数据
  2. list 获取到数据后,将全量数据同步到本地缓存。首次list完成后,informer后续都将通过watch来同步资源更新
  3. watcher监控到资源更新事件,将接收到的事件放入存储队列中(delta FIFO)
  4. informer 的另一个process会不断取出存储队列中的delta事件进行数据更新
  5. 缓存数据更新成功后,将数据变化通过回调函数同步至custom controller workqueue中
  6. custom controller顺序处理workqueue中的数据变更事件
    在这里插入图片描述

流程包括了三端的数据同步。

  • 首先api-server与informer中间通过sourceVersion可以保证数据的一致性
    client携带本地的sourceVersion请求api-server,api-server会将最新版本的增量变化通过事件返回给client。
    如图所示,在此期间,如果数据连接发生任何异常,informer会在重新建立watcher连接时,携带上个版本的sourceVersion,并再次更新所有的增量变化。
    在这里插入图片描述

  • 然后是本地informer与custom之间,通过workqueue来进行事件通知。
    informer的协程将FIFO队列中的事件取出更新至本地后,还会将事件同步回调至custom controller,加入到workqueue队列中。
    但是回看informer的代码,informer在处理回调事件时,并不会关注回调的结果。
    在这里插入图片描述

也就是说,如果custom controller侧的消费出现异常导致数据同步失败,informer是不知情的。

所以还需要引入别的机制来保障custom数据与本地缓存的一致性,以维持体的可靠性,也就是resync。
(当然如果controller本身也存在对比sourceVersion的逻辑,其实不需要这一机制也是可以确保数据一致的,resync相当于从框架层增加了一层保护,这篇博客有对相关的问题进行探讨)

resync做了什么

resync的逻辑非常简单,就是定时将本地缓存中所有的资源对象生成事件重新推送到FIFO中,重新触发controller的回调。
参考《Programming Kubernetes》一书中的概念,其实就是在边缘触发,水平驱动的基础上,附加了定时同步的能力。
在这里插入图片描述

具体来看下resync的代码实现。

informer在初始化时指定了resync执行间隔。

// informer创建方法
func NewIndexerInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration, // Resync执行周期
	h ResourceEventHandler,
	indexers Indexers,
) (Indexer, Controller) {}

// workqueue调用示例
// 0 代表不重复执行
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{...})

在informer初始化完成后,拉起一个协程进行定时resync

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
    
	go r.startResync(stopCh, cancelCh, resyncerrc)
	return r.watch(w, stopCh, resyncerrc)
}

该协程会按照informer配置的时间间隔定时调用存储对象的resync方法。
比较特殊的是,sharedIndexInformer类型的informer会另外有ShouldResync方法来轮询每个监听了当前资源对象的listener的是否需要进行resync操作。

func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
	resyncCh, cleanup := r.resyncChan() // 返回一个触发resync的信号,内部实现就是一个timer
	defer func() {
		cleanup() // Call the last one written into cleanup
	}()
	for {
		select {
		case <-resyncCh:
		case <-stopCh:
			return
		case <-cancelCh:
			return
		}

        // sharedIndexInformer 中用ShouldResync()来管理各个listener的resync
		if r.ShouldResync == nil || r.ShouldResync() {
			if err := r.store.Resync(); err != nil { 
				resyncerrc <- err 
				return
			}
		}
		cleanup()
		resyncCh, cleanup = r.resyncChan()
	}
}

resync只做一件事,将本地缓存里的资源对象全部重新添加到FIFO队列中,再触发contronller处理一次。
不过,为了避免与最新的变更发生冲突,FIFO队列中已有delta且还没有处理的资源对象,不会被重新添加。

func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}

    // f.knownObjects 可以获取到本地缓存中所有资源对象的列表
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
        // 过滤掉已经有新的事件在队列中等待处理的资源对象
        // 把所有资源对象以resync类型添加到队列中
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

参考:
https://www.kubernetes.org.cn/2693.html
https://github.com/cloudnativeto/sig-kubernetes/issues/11
https://www.cnblogs.com/WisWang/p/13897782.html


http://www.niftyadmin.cn/n/4942387.html

相关文章

Python程序的执行方式,是否需要main函数

Python程序的执行方式有两种&#xff0c;一种是交互式编程&#xff0c;一种是文件执行方式。交互式编程是在命令行或者IDLE中直接输入代码&#xff0c;按回车键就可以运行代码&#xff0c;并立即看到输出结果。文件执行方式是创建一个源文件&#xff0c;将所有代码放在源文件中…

远程仓库上创建一个新的分支 `b` 并将远程分支 `a` 的内容克隆到 `b` 分支上

一、需求&#xff1a; 要在远程仓库上创建一个新的分支 b 并将远程分支 a 的内容克隆到 b 分支上&#xff0c;你可以按照以下步骤进行操作&#xff1a; 二、解决方案&#xff1a; 1. 首先&#xff0c;使用 git clone 命令克隆远程仓库到本地。例如&#xff0c;要克隆一个名为…

【Rust日报】2023-08-13 安全验证工具 Kani 0.34.0已经发布了!

Pipelight - 自托管自动化管道 -> v0.6.14 嗨&#xff0c;大家好&#xff01;距离我和大家分享这个项目已经一个多月了。你们中一些人对这个项目的热情欢迎让我深受感动。感谢你们中许多人一直支持和帮助我改进它。 从那以后发生了什么变化&#xff1f; 您现在可以在文件修改…

基于Yolov5与LabelImg训练自己数据的完整流程

基于Yolov5与LabelImg训练自己数据的完整流程 1. 创建虚拟环境2. 通过git 安装 ultralytics3. 下载yolov54. 安装labelImg标注软件5. 使用labelImg进行标注&#xff0c;图片使用上面的coco1285.1 点击“打开目录”选择存储图像的文件夹进行标注&#xff0c;右下角会出现图像列表…

JVM编译优化

即时编译器 HotSpot虚拟机中内置了两个即时编译器,分别称为Client Compiler和Server Compiler,或者简称为C1编译器和C2编译器。Java8默认开启Server模式。用户可以使用“-client”或“-server”参数去指定编译模式。 C1编译器启动速度快,关注局部简单可靠的优化,比如方法…

Python学习笔记_基础篇(六)_Set集合,函数,深入拷贝,浅入拷贝,文件处理

1、Set基本数据类型 a、set集合&#xff0c;是一个无序且不重复的元素集合 class set(object):"""set() -> new empty set objectset(iterable) -> new set objectBuild an unordered collection of unique elements."""def add(self, *a…

操作符和表达式求值

目录 1.运算符的优先级和结合性 1.1运算符的优先级 1.2结合性 2.操作符的使用最终带来的是一个表达式的值 2.1.隐式类型转换&#xff08;整型提升&#xff09; 2.1.1整形提升的例子 2.2算术转换 1.运算符的优先级和结合性 运算符是编程语言中的基本元素之一&#xff0c;主…

运行nvidia-smi 和nvcc -V 出现的版本不一致问题详解

遇到的问题 虽然网上有很多的博客是关于这个问题的&#xff0c;但是我感觉还是非常的不清楚&#xff0c;这个问题整整困扰了我一天。 先说一下我的问题&#xff0c;我的服务器镜像已经安装了GPU驱动和cuda10.2(cuda toolkit) &#xff0c;但是RTX 3090 的算力是sm_86&#xf…