
A Source Code Walkthrough of the HAMi GPU Pod Scheduling Flow

· 1 min read

Author:

@elrond.wang

1. Overview

When using HAMi, Pods are frequently created but fail to start, most notably with the following two problems:

  • Pod UnexpectedAdmissionError
  • Pod Pending

Given that, this article walks through the relevant code at a coarse level. The goal is to explain how the components interact during scheduling and how resources are calculated; other details are left out.

2. Scheduling Flow

Before reading the code, it is worth looking at the official documentation, which lays out the big picture fairly clearly: flowchart

In detail, the process can be divided into three phases:

  • Preparation phase: as the flowchart shows, there are several prerequisites, such as the mutating webhook and the device plugin. This phase covers how those prerequisites are set up; it is only needed when the services first start.

Preparation work done before the Pod is created

  • Pod scheduling phase: once preparation is complete, the Pod enters the processing pipeline and gets scheduled
  • Pod startup phase: how the Pod interacts with the GPUs on the node, and so on

This article concentrates on the preparation and scheduling phases; the main content is the scheduling analysis.

3. Pod Scheduling Flow

  • The user sends a Pod creation request to kube-apiserver
  • The admission webhook is triggered and updates the schedulerName in the Pod
  • kube-apiserver hands the Pod to the scheduler selected by schedulerName
  • The scheduler processes it:
    • Collect node device information -- read from node annotations, which are periodically written by the hami-device-plugin DaemonSet
    • Score the nodes against the device information and the Pod's limits, and pick the highest-scoring node
    • Bind the Pod to that node and let Pod creation proceed

Troubleshooting and handling common problems

Pod UnexpectedAdmissionError

The Pod's status after creation shows UnexpectedAdmissionError.

Once you understand the flow, you can see that this error means kube-apiserver failed to call the extender scheduler. There are usually two causes; for anything else, check the kube-apiserver logs.

  • Connectivity failure: the extender scheduler's HTTPS port is unreachable from kube-apiserver. Possible reasons:
    • DNS cannot be resolved
    • Cross-node communication is broken
    • The extender scheduler service itself is down
  • TLS verification error: usually shown as webhook x509: certificate signed by unknown authority. The Helm chart runs a jobs.batch hami-vgpu.admission-patch; if that job has not completed, this error appears

Scheduling problems

The Pod stays in Pending; kubectl describe shows the concrete reason, mainly one of the following:

  • card Insufficient remaining memory
  • calcScore:node not fit pod

The root cause is usually either a genuine resource shortage or a misconfiguration. Misconfiguration here means that deviceMemoryScaling is not what you expect. It can be configured in two places, and the node-level configuration takes precedence over the global one. A common pitfall is that the name in the node-level configuration must match the node name shown by kubectl get node, otherwise it does not take effect. A minimal sketch of this precedence rule follows the two examples below.

  • Global configuration: k get cm hami-scheduler-device
  deviceMemoryScaling: 3
  • Node-level configuration: k get cm hami-device-plugin
{
  "nodeconfig": [
    {
      "name": "node1",
      "devicememoryscaling": 3,
      "devicesplitcount": 10,
      "migstrategy": "none",
      "filterdevices": {
        "uuid": [],
        "index": []
      }
    }
  ]
}
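
The precedence rule above can be summarized with a short sketch (this is not HAMi's actual code; the struct and helper names are invented for illustration):

package main

import "fmt"

// NodeConfig mirrors one entry of the "nodeconfig" list shown above
// (hypothetical struct; field names follow the JSON keys).
type NodeConfig struct {
    Name                string  `json:"name"`
    DeviceMemoryScaling float64 `json:"devicememoryscaling"`
}

// effectiveScaling returns the node-level value when an entry's name matches the
// node name reported by `kubectl get node`, otherwise it falls back to the global value.
func effectiveScaling(global float64, nodeConfigs []NodeConfig, nodeName string) float64 {
    for _, nc := range nodeConfigs {
        if nc.Name == nodeName {
            return nc.DeviceMemoryScaling
        }
    }
    return global
}

func main() {
    nodeConfigs := []NodeConfig{{Name: "node1", DeviceMemoryScaling: 3}}
    fmt.Println(effectiveScaling(1, nodeConfigs, "node1")) // 3: the node entry wins
    fmt.Println(effectiveScaling(1, nodeConfigs, "Node1")) // 1: a name mismatch silently falls back to the global value
}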

3.1. MutatingWebhook

Kubernetes provides admission webhooks, which are triggered by operations on Kubernetes resources. The most common use is to intercept Pod creation and mutate the Pod YAML, for example injecting an init container or extra files.

3.1.1. Webhook configuration

hami-webhook

k get mutatingwebhookconfigurations.admissionregistration.k8s.io hami-webhook -o yaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  annotations:
    meta.helm.sh/release-name: hami
    meta.helm.sh/release-namespace: kube-system
  creationTimestamp: "2024-12-10T03:50:37Z"
  generation: 5
  labels:
    app.kubernetes.io/managed-by: Helm
  name: hami-webhook
  resourceVersion: "2307810"
  uid: 2cdcebe4-f561-429f-9480-701e65980687
webhooks:
- admissionReviewVersions:
  - v1beta1
  clientConfig:
    caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVJ5Z0F3SUJBZ0lSQUxjd2FQMjUrMlphdGhTTlFMcG1qT0V3Q2dZSUtvWkl6ajBFQXdJd0R6RU4KTUFzR0ExVUVDaE1FYm1sc01UQWdGdzB5TkRFeU1EWXdOekV4TVRWYUdBOHlNVEkwTVRFeE1qQTNNVEV4TlZvdwpEekVOTUFzR0ExVUVDaE1FYm1sc01UQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJDUnlXUDdYCkRmT2N4NEVTMVRYaUs0dnFFU2wrcUFHYjI2YzNrOEdMWlZTL1lHaFpLZVVxaEgydVRhTFdWTW1hZVJFbkxqM0cKSStMVFRVTTR6SVhEUld5alZ6QlZNQTRHQTFVZER3RUIvd1FFQXdJQ0JEQVRCZ05WSFNVRUREQUtCZ2dyQmdFRgpCUWNEQVRBUEJnTlZIUk1CQWY4RUJUQURBUUgvTUIwR0ExVWREZ1FXQkJTcVV4bWpGa29YUlpRK0xXVzBNM1pJCnMzck1wakFLQmdncWhrak9QUVFEQWdOSUFEQkZBaUJSY2VRL2tJVkR2VTV3Vjl0K3NRWm93TmFhTWhIMTV5K2sKT3VrR0FlRGVtQUloQUxDZzFrM0JQZUJBNG8reWY5emxvVjM2VEk2RHUzaGdMT1B3MXhaZkFvcDMKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
    service:
      name: hami-scheduler
      namespace: kube-system
      path: /webhook
      port: 443
  failurePolicy: Ignore
  matchPolicy: Equivalent
  name: vgpu.hami.io
  namespaceSelector:
    matchExpressions:
    - key: hami.io/webhook
      operator: NotIn
      values:
      - ignore
  objectSelector:
    matchExpressions:
    - key: hami.io/webhook
      operator: NotIn
      values:
      - ignore
  reinvocationPolicy: Never
  rules:
  - apiGroups:
    - ""
    apiVersions:
    - v1
    operations:
    - CREATE
    resources:
    - pods
    scope: '*'
  sideEffects: None
  timeoutSeconds: 10

When a Pod is created, kube-apiserver calls https://hami-scheduler.kube-system:443/webhook over TLS, validating the server certificate against the configured caBundle. The webhook is skipped when the namespace (or object) carries the hami.io/webhook: ignore label.

3.1.2. Webhook server implementation

HAMi therefore needs to run a TLS HTTP server that exposes a /webhook endpoint.

cmd/scheduler/main.go:84

func start() {
...
router.POST("/webhook", routes.WebHookRoute())

WebHookRoute needs to implement the admission handler defined in sigs.k8s.io/controller-runtime@v0.16.3/pkg/webhook/admission/webhook.go:98

pkg/scheduler/webhook.go:52

 pod := &corev1.Pod{}
err := h.decoder.Decode(req, pod)
if err != nil {
klog.Errorf("Failed to decode request: %v", err)
return admission.Errored(http.StatusBadRequest, err)
}
if len(pod.Spec.Containers) == 0 {
klog.Warningf(template+" - Denying admission as pod has no containers", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has no containers")
}
klog.Infof(template, req.Namespace, req.Name, req.UID)
hasResource := false
for idx, ctr := range pod.Spec.Containers {
c := &pod.Spec.Containers[idx]
if ctr.SecurityContext != nil {
if ctr.SecurityContext.Privileged != nil && *ctr.SecurityContext.Privileged {
klog.Warningf(template+" - Denying admission as container %s is privileged", req.Namespace, req.Name, req.UID, c.Name)
continue
}
}
for _, val := range device.GetDevices() {
found, err := val.MutateAdmission(c, pod)
if err != nil {
klog.Errorf("validating pod failed:%s", err.Error())
return admission.Errored(http.StatusInternalServerError, err)
}
hasResource = hasResource || found
}
}

if !hasResource {
klog.Infof(template+" - Allowing admission for pod: no resource found", req.Namespace, req.Name, req.UID)
//return admission.Allowed("no resource found")
} else if len(config.SchedulerName) > 0 {
pod.Spec.SchedulerName = config.SchedulerName
if pod.Spec.NodeName != "" {
klog.Infof(template+" - Pod already has node assigned", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has node assigned")
}
}
marshaledPod, err := json.Marshal(pod)
if err != nil {
klog.Errorf(template+" - Failed to marshal pod, error: %v", req.Namespace, req.Name, req.UID, err)
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)

It mainly inspects the resources of the Pod's containers to decide whether the Pod should go through the extender scheduler.

pkg/device/nvidia/device.go:246

func (dev *NvidiaGPUDevices) MutateAdmission(ctr *corev1.Container, p *corev1.Pod) (bool, error) {
/*gpu related */
priority, ok := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourcePriority)]
if ok {
ctr.Env = append(ctr.Env, corev1.EnvVar{
Name: util.TaskPriority,
Value: fmt.Sprint(priority.Value()),
})
}

_, resourceNameOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceCountName)]
if resourceNameOK {
return resourceNameOK, nil
}

_, resourceCoresOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceCoreName)]
_, resourceMemOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceMemoryName)]
_, resourceMemPercentageOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceMemoryPercentageName)]

if resourceCoresOK || resourceMemOK || resourceMemPercentageOK {
if dev.config.DefaultGPUNum > 0 {
ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceCountName)] = *resource.NewQuantity(int64(dev.config.DefaultGPUNum), resource.BinarySI)
resourceNameOK = true
}
}

if !resourceNameOK && dev.config.OverwriteEnv {
ctr.Env = append(ctr.Env, corev1.EnvVar{
Name: "NVIDIA_VISIBLE_DEVICES",
Value: "none",
})
}
return resourceNameOK, nil
}

It checks whether the Pod's resource limits contain any of the resource names configured in device-config.yaml; if so, the Pod goes through the HAMi scheduling flow.

device-config, using NVIDIA GPUs as an example:

nvidia:
  resourceCountName: nvidia.com/gpu
  resourceMemoryName: nvidia.com/gpumem
  resourceMemoryPercentageName: nvidia.com/gpumem-percentage
  resourceCoreName: nvidia.com/gpucores
  resourcePriorityName: nvidia.com/priority
  overwriteEnv: false
  defaultMemory: 0
  defaultCores: 0
  defaultGPUNum: 1
  deviceSplitCount: 10
  deviceMemoryScaling: 3
  deviceCoreScaling: 3

Once it is decided that the Pod goes through the HAMi flow, the webhook patches the Pod's schedulerName to the HAMi scheduler's name.
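
For illustration, here is a minimal sketch (not taken from the HAMi repo) of a Pod built with the corev1 types whose limits would satisfy MutateAdmission, because they contain the resource names from the device-config above:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // A Pod requesting one vGPU with 2000 MiB of device memory and 30% of the cores.
    // The resource names match the nvidia section of device-config.yaml shown above.
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "vgpu-demo"},
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{{
                Name:  "cuda",
                Image: "nvidia/cuda:12.4.0-base-ubuntu22.04",
                Resources: corev1.ResourceRequirements{
                    Limits: corev1.ResourceList{
                        "nvidia.com/gpu":      resource.MustParse("1"),
                        "nvidia.com/gpumem":   resource.MustParse("2000"),
                        "nvidia.com/gpucores": resource.MustParse("30"),
                    },
                },
            }},
        },
    }
    // After the webhook mutates the Pod, Spec.SchedulerName would be set to the HAMi scheduler's name.
    fmt.Println(pod.Spec.Containers[0].Resources.Limits)
}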

3.2. Extending the Kubernetes scheduler

Via KubeSchedulerConfiguration, an extender can hook into the scheduler by implementing extension points.

3.2.1. KubeSchedulerConfiguration

k get cm hami-scheduler-newversion -o yaml
apiVersion: v1
data:
  config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1beta2
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: false
    profiles:
    - schedulerName: hami-scheduler
    extenders:
    - urlPrefix: "https://127.0.0.1:443"
      filterVerb: filter
      bindVerb: bind
      nodeCacheCapable: true
      weight: 1
      httpTimeout: 30s
      enableHTTPS: true
      tlsConfig:
        insecure: true
      managedResources:
      - name: nvidia.com/gpu
        ignoredByScheduler: true
      - name: nvidia.com/gpumem
        ignoredByScheduler: true
      - name: nvidia.com/gpucores
        ignoredByScheduler: true
      - name: nvidia.com/gpumem-percentage
        ignoredByScheduler: true
      - name: nvidia.com/priority
        ignoredByScheduler: true
      - name: cambricon.com/vmlu
        ignoredByScheduler: true
      - name: hygon.com/dcunum
        ignoredByScheduler: true
      - name: hygon.com/dcumem
        ignoredByScheduler: true
      - name: hygon.com/dcucores
        ignoredByScheduler: true
      - name: iluvatar.ai/vgpu
        ignoredByScheduler: true
kind: ConfigMap
metadata:
  annotations:
    meta.helm.sh/release-name: hami
    meta.helm.sh/release-namespace: kube-system
  creationTimestamp: "2024-12-10T03:50:36Z"
  labels:
    app.kubernetes.io/component: hami-scheduler
    app.kubernetes.io/instance: hami
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: hami
    app.kubernetes.io/version: 2.4.1
    helm.sh/chart: hami-2.4.1
  name: hami-scheduler-newversion
  namespace: kube-system
  resourceVersion: "2316275"
  uid: 3a61a72c-0bab-432f-b4d7-5c1ae46ee14d

The extender plugs into the scheduler through extension points; here, filter and bind are extended.

  • filter: find the most suitable node
  • bind: create a Binding resource for the Pod

During scheduling, the extender endpoints are called in extension-point order: first

https://127.0.0.1:443/filter and then https://127.0.0.1:443/bind
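
For reference, the request and response bodies on these endpoints use the extender types from k8s.io/kube-scheduler/extender/v1. A minimal sketch of what goes over the wire (not HAMi code):

package main

import (
    "encoding/json"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

func main() {
    // Because nodeCacheCapable is true, kube-scheduler sends node names rather than full Node objects.
    nodeNames := []string{"node1", "node2"}
    args := extenderv1.ExtenderArgs{
        Pod:       &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "vgpu-demo", Namespace: "default"}},
        NodeNames: &nodeNames,
    }
    body, _ := json.MarshalIndent(args, "", "  ")
    fmt.Println(string(body)) // what kube-scheduler POSTs to https://127.0.0.1:443/filter

    // The extender answers /filter with an ExtenderFilterResult; /bind receives ExtenderBindingArgs
    // and answers with an ExtenderBindingResult.
    result := extenderv1.ExtenderFilterResult{NodeNames: &[]string{"node1"}}
    out, _ := json.Marshal(result)
    fmt.Println(string(out))
}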

3.2.2. Starting the extender HTTP server

cmd/scheduler/main.go:70

func start() {
device.InitDevices()
sher = scheduler.NewScheduler()
sher.Start()
defer sher.Stop()

// start monitor metrics
go sher.RegisterFromNodeAnnotations()
go initMetrics(config.MetricsBindAddress)

// start http server
router := httprouter.New()
router.POST("/filter", routes.PredicateRoute(sher))
router.POST("/bind", routes.Bind(sher))

3.2.3. filter implementation

pkg/scheduler/routes/route.go:41

func PredicateRoute(s *scheduler.Scheduler) httprouter.Handle {
klog.Infoln("Into Predicate Route outer func")
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
klog.Infoln("Into Predicate Route inner func")
checkBody(w, r)

var buf bytes.Buffer
body := io.TeeReader(r.Body, &buf)

var extenderArgs extenderv1.ExtenderArgs
var extenderFilterResult *extenderv1.ExtenderFilterResult

if err := json.NewDecoder(body).Decode(&extenderArgs); err != nil {
klog.Errorln("decode error", err.Error())
extenderFilterResult = &extenderv1.ExtenderFilterResult{
Error: err.Error(),
}
} else {
extenderFilterResult, err = s.Filter(extenderArgs)
if err != nil {
klog.Errorf("pod %v filter error, %v", extenderArgs.Pod.Name, err)
extenderFilterResult = &extenderv1.ExtenderFilterResult{
Error: err.Error(),
}
}
}

if resultBody, err := json.Marshal(extenderFilterResult); err != nil {
klog.Errorf("Failed to marshal extenderFilterResult: %+v, %+v",
err, extenderFilterResult)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
} else {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resultBody)
}
}
}

pkg/scheduler/scheduler.go:430

func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
nums := k8sutil.Resourcereqs(args.Pod)
total := 0
for _, n := range nums {
for _, k := range n {
total += int(k.Nums)
}
}
if total == 0 {
klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
return &extenderv1.ExtenderFilterResult{
NodeNames: args.NodeNames,
FailedNodes: nil,
Error: "",
}, nil
}
annos := args.Pod.Annotations
s.delPod(args.Pod)
nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
if len(failedNodes) != 0 {
klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
}
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
if err != nil {
err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
if len((*nodeScores).NodeList) == 0 {
klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
return &extenderv1.ExtenderFilterResult{
FailedNodes: failedNodes,
}, nil
}
klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
sort.Sort(nodeScores)
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
annotations := make(map[string]string)
annotations[util.AssignedNodeAnnotations] = m.NodeID
annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

for _, val := range device.GetDevices() {
val.PatchAnnotations(&annotations, m.Devices)
}

//InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
//maps.Copy(annotations, InRequestDevices)
//maps.Copy(annotations, supportDevices)
s.addPod(args.Pod, m.NodeID, m.Devices)
err = util.PatchPodAnnotations(args.Pod, annotations)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
s.delPod(args.Pod)
return nil, err
}
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil
}

The core logic has two steps: gather each node's resource usage, then compute a score from the allocated versus total resources and pick the node with the highest score.

3.2.3.1. Gathering node resource information

pkg/scheduler/scheduler.go:241

func (s *Scheduler) getNodesUsage(nodes *[]string, task *corev1.Pod) (*map[string]*NodeUsage, map[string]string, error) {
overallnodeMap := make(map[string]*NodeUsage)
cachenodeMap := make(map[string]*NodeUsage)
failedNodes := make(map[string]string)
//for _, nodeID := range *nodes {
allNodes, err := s.ListNodes()
if err != nil {
return &overallnodeMap, failedNodes, err
}

for _, node := range allNodes {
nodeInfo := &NodeUsage{}
userGPUPolicy := config.GPUSchedulerPolicy
if task != nil && task.Annotations != nil {
if value, ok := task.Annotations[policy.GPUSchedulerPolicyAnnotationKey]; ok {
userGPUPolicy = value
}
}
nodeInfo.Node = node.Node
nodeInfo.Devices = policy.DeviceUsageList{
Policy: userGPUPolicy,
DeviceLists: make([]*policy.DeviceListsScore, 0),
}
for _, d := range node.Devices {
nodeInfo.Devices.DeviceLists = append(nodeInfo.Devices.DeviceLists, &policy.DeviceListsScore{
Score: 0,
Device: &util.DeviceUsage{
ID: d.ID,
Index: d.Index,
Used: 0,
Count: d.Count,
Usedmem: 0,
Totalmem: d.Devmem,
Totalcore: d.Devcore,
Usedcores: 0,
MigUsage: util.MigInUse{
Index: 0,
UsageList: make(util.MIGS, 0),
},
MigTemplate: d.MIGTemplate,
Mode: d.Mode,
Type: d.Type,
Numa: d.Numa,
Health: d.Health,
},
})
}
overallnodeMap[node.ID] = nodeInfo
}

podsInfo := s.ListPodsInfo()
for _, p := range podsInfo {
node, ok := overallnodeMap[p.NodeID]
if !ok {
continue
}
for _, podsingleds := range p.Devices {
for _, ctrdevs := range podsingleds {
for _, udevice := range ctrdevs {
for _, d := range node.Devices.DeviceLists {
deviceID := udevice.UUID
if strings.Contains(deviceID, "[") {
deviceID = strings.Split(deviceID, "[")[0]
}
if d.Device.ID == deviceID {
d.Device.Used++
d.Device.Usedmem += udevice.Usedmem
d.Device.Usedcores += udevice.Usedcores
if strings.Contains(udevice.UUID, "[") {
tmpIdx, Instance := util.ExtractMigTemplatesFromUUID(udevice.UUID)
if len(d.Device.MigUsage.UsageList) == 0 {
util.PlatternMIG(&d.Device.MigUsage, d.Device.MigTemplate, tmpIdx)
}
d.Device.MigUsage.UsageList[Instance].InUse = true
klog.V(3).Infoln("add mig usage", d.Device.MigUsage, "template=", d.Device.MigTemplate, "uuid=", d.Device.ID)
}
}
}
}
}
}
klog.V(5).Infof("usage: pod %v assigned %v %v", p.Name, p.NodeID, p.Devices)
}
s.overviewstatus = overallnodeMap
for _, nodeID := range *nodes {
node, err := s.GetNode(nodeID)
if err != nil {
// The identified node does not have a gpu device, so the log here has no practical meaning,increase log priority.
klog.V(5).InfoS("node unregistered", "node", nodeID, "error", err)
failedNodes[nodeID] = "node unregistered"
continue
}
cachenodeMap[node.ID] = overallnodeMap[node.ID]
}
s.cachedstatus = cachenodeMap
return &cachenodeMap, failedNodes, nil
}

This collects each node's total and already-allocated resources. It starts by fetching the node information:

pkg/scheduler/nodes.go:120

func (m *nodeManager) ListNodes() (map[string]*util.NodeInfo, error) {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.nodes, nil
}

A cache of node information is used here; entries are added by addNode.

3.2.3.1.1. Node cache

pkg/scheduler/nodes.go:46

func (m *nodeManager) addNode(nodeID string, nodeInfo *util.NodeInfo) {
if nodeInfo == nil || len(nodeInfo.Devices) == 0 {
return
}
m.mutex.Lock()
defer m.mutex.Unlock()
_, ok := m.nodes[nodeID]
if ok {
if len(nodeInfo.Devices) > 0 {
tmp := make([]util.DeviceInfo, 0, len(nodeInfo.Devices))
devices := device.GetDevices()
deviceType := ""
for _, val := range devices {
if strings.Contains(nodeInfo.Devices[0].Type, val.CommonWord()) {
deviceType = val.CommonWord()
}
}
for _, val := range m.nodes[nodeID].Devices {
if !strings.Contains(val.Type, deviceType) {
tmp = append(tmp, val)
}
}
m.nodes[nodeID].Devices = tmp
m.nodes[nodeID].Devices = append(m.nodes[nodeID].Devices, nodeInfo.Devices...)
}
} else {
m.nodes[nodeID] = nodeInfo
}
}

The key part here is device.GetDevices(), which returns the registered device handlers.

pkg/device/devices.go:81

func GetDevices() map[string]Devices {
return devices
}

devices is also a cache; we will come back to it later. First, let's see when the node cache gets populated.

pkg/scheduler/scheduler.go:155

func (s *Scheduler) RegisterFromNodeAnnotations() {
klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")
ticker := time.NewTicker(time.Second * 15)
for {
select {
case <-s.nodeNotify:
case <-ticker.C:
case <-s.stopCh:
return
}
labelSelector := labels.Everything()
if len(config.NodeLabelSelector) > 0 {
labelSelector = (labels.Set)(config.NodeLabelSelector).AsSelector()
}
rawNodes, err := s.nodeLister.List(labelSelector)
if err != nil {
klog.Errorln("nodes list failed", err.Error())
continue
}
var nodeNames []string
for _, val := range rawNodes {
nodeNames = append(nodeNames, val.Name)
for devhandsk, devInstance := range device.GetDevices() {
health, needUpdate := devInstance.CheckHealth(devhandsk, val)
klog.V(5).InfoS("device check health", "node", val.Name, "deviceVendor", devhandsk, "health", health, "needUpdate", needUpdate)
if !health {
err := devInstance.NodeCleanUp(val.Name)
// If the device is not healthy, the device is removed from the node.
// At the same time, this node needs to be removed from the cache.
if err != nil {
klog.Errorln("node cleanup failed", err.Error())
}
info, ok := s.nodes[val.Name]
if ok {
klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, info.ID, err, s.nodes[val.Name].Devices)
s.rmNodeDevice(val.Name, info, devhandsk)
continue
}
}
if !needUpdate {
continue
}
_, ok := util.HandshakeAnnos[devhandsk]
if ok {
tmppat := make(map[string]string)
tmppat[util.HandshakeAnnos[devhandsk]] = "Requesting_" + time.Now().Format("2006.01.02 15:04:05")
klog.V(4).InfoS("New timestamp", util.HandshakeAnnos[devhandsk], tmppat[util.HandshakeAnnos[devhandsk]], "nodeName", val.Name)
n, err := util.GetNode(val.Name)
if err != nil {
klog.Errorln("get node failed", err.Error())
continue
}
util.PatchNodeAnnotations(n, tmppat)
}

nodeInfo := &util.NodeInfo{}
nodeInfo.ID = val.Name
nodeInfo.Node = val
nodedevices, err := devInstance.GetNodeDevices(*val)
if err != nil {
continue
}
nodeInfo.Devices = make([]util.DeviceInfo, 0)
for _, deviceinfo := range nodedevices {
nodeInfo.Devices = append(nodeInfo.Devices, *deviceinfo)
}
s.addNode(val.Name, nodeInfo)
if s.nodes[val.Name] != nil && len(nodeInfo.Devices) > 0 {
klog.Infof("node %v device %s come node info=%s,%v total=%v", val.Name, devhandsk, nodeInfo.ID, nodeInfo.Devices, s.nodes[val.Name].Devices)
}
}
}
_, _, err = s.getNodesUsage(&nodeNames, nil)
if err != nil {
klog.Errorln("get node usage failed", err.Error())
}
}
}

This starts a 15-second ticker that fetches node information and keeps the node cache up to date.

The core is the loop for devhandsk, devInstance := range device.GetDevices(), which iterates over all registered device handlers (different handlers are registered for different vendors) and fetches each card's resource information via devInstance.GetNodeDevices.

Through the registered device handler (NVIDIA in this environment), the vendor-specific GetNodeDevices implementation is invoked; the device handlers themselves are described in more detail later.

pkg/device/nvidia/device.go:209

func (dev *NvidiaGPUDevices) GetNodeDevices(n corev1.Node) ([]*util.DeviceInfo, error) {
devEncoded, ok := n.Annotations[RegisterAnnos]
if !ok {
return []*util.DeviceInfo{}, errors.New("annos not found " + RegisterAnnos)
}
nodedevices, err := util.DecodeNodeDevices(devEncoded)
if err != nil {
klog.ErrorS(err, "failed to decode node devices", "node", n.Name, "device annotation", devEncoded)
return []*util.DeviceInfo{}, err
}
if len(nodedevices) == 0 {
klog.InfoS("no nvidia gpu device found", "node", n.Name, "device annotation", devEncoded)
return []*util.DeviceInfo{}, errors.New("no gpu found on node")
}
for _, val := range nodedevices {
if val.Mode == "mig" {
val.MIGTemplate = make([]util.Geometry, 0)
for _, migTemplates := range dev.config.MigGeometriesList {
found := false
for _, migDevices := range migTemplates.Models {
if strings.Contains(val.Type, migDevices) {
found = true
break
}
}
if found {
val.MIGTemplate = append(val.MIGTemplate, migTemplates.Geometries...)
break
}
}
}
}
devDecoded := util.EncodeNodeDevices(nodedevices)
klog.V(5).InfoS("nodes device information", "node", n.Name, "nodedevices", devDecoded)
return nodedevices, nil
}

So the basic logic is: the scheduler periodically reads the node annotations and maintains them in the node cache for use during scheduling.

apiVersion: v1
kind: Node
metadata:
  annotations:
    ...
    hami.io/node-nvidia-register: 'GPU-7aebc545-cbd3-18a0-afce-76cae449702a,10,24576,300,NVIDIA-NVIDIA
      GeForce RTX 3090,0,true:'
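
The annotation value is a plain-text encoding of the device list. Here is a rough sketch of how it can be decoded (the real logic lives in util.DecodeNodeDevices; the field order below is inferred from the example above: UUID, split count, memory in MiB, core percentage, type, NUMA, health):

package main

import (
    "fmt"
    "strings"
)

// DeviceInfoLite is a hypothetical, trimmed-down stand-in for util.DeviceInfo,
// used only to illustrate the annotation layout shown above.
type DeviceInfoLite struct {
    ID      string
    Count   string // DeviceSplitCount
    Devmem  string // device memory in MiB (after deviceMemoryScaling)
    Devcore string // core percentage (100 * deviceCoreScaling)
    Type    string
    Numa    string
    Health  string
}

func main() {
    anno := "GPU-7aebc545-cbd3-18a0-afce-76cae449702a,10,24576,300,NVIDIA-NVIDIA GeForce RTX 3090,0,true:"
    var devices []DeviceInfoLite
    // Devices are separated by ':'; fields within a device are separated by ','.
    for _, item := range strings.Split(strings.TrimSuffix(anno, ":"), ":") {
        f := strings.Split(item, ",")
        if len(f) < 7 {
            continue
        }
        devices = append(devices, DeviceInfoLite{
            ID: f[0], Count: f[1], Devmem: f[2], Devcore: f[3],
            Type: f[4], Numa: f[5], Health: f[6],
        })
    }
    fmt.Printf("%+v\n", devices)
}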

This again goes through device, which we will look at shortly. First, let's see who calls RegisterFromNodeAnnotations.

cmd/scheduler/main.go:70

func start() {
device.InitDevices()
sher = scheduler.NewScheduler()
sher.Start()
defer sher.Stop()

// start monitor metrics
go sher.RegisterFromNodeAnnotations()
go initMetrics(config.MetricsBindAddress)

It is called when the scheduler starts, so that part of the logic is clear. Now back to the device handlers.

3.2.3.1.2. device

Devices are initialized in pkg/device/devices.go:85:

func InitDevicesWithConfig(config *Config) {
devices = make(map[string]Devices)
DevicesToHandle = []string{}
devices[nvidia.NvidiaGPUDevice] = nvidia.InitNvidiaDevice(config.NvidiaConfig)
devices[cambricon.CambriconMLUDevice] = cambricon.InitMLUDevice(config.CambriconConfig)
devices[hygon.HygonDCUDevice] = hygon.InitDCUDevice(config.HygonConfig)
devices[iluvatar.IluvatarGPUDevice] = iluvatar.InitIluvatarDevice(config.IluvatarConfig)
devices[mthreads.MthreadsGPUDevice] = mthreads.InitMthreadsDevice(config.MthreadsConfig)
devices[metax.MetaxGPUDevice] = metax.InitMetaxDevice(config.MetaxConfig)

DevicesToHandle = append(DevicesToHandle, nvidia.NvidiaGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, cambricon.CambriconMLUCommonWord)
DevicesToHandle = append(DevicesToHandle, hygon.HygonDCUCommonWord)
DevicesToHandle = append(DevicesToHandle, iluvatar.IluvatarGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, mthreads.MthreadsGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, metax.MetaxGPUCommonWord)
for _, dev := range ascend.InitDevices(config.VNPUs) {
devices[dev.CommonWord()] = dev
DevicesToHandle = append(DevicesToHandle, dev.CommonWord())
}
}

Since NVIDIA is used here, we only need to look at InitNvidiaDevice.

pkg/device/devices.go:42

type Devices interface {
CommonWord() string
MutateAdmission(ctr *corev1.Container, pod *corev1.Pod) (bool, error)
CheckHealth(devType string, n *corev1.Node) (bool, bool)
NodeCleanUp(nn string) error
GetNodeDevices(n corev1.Node) ([]*util.DeviceInfo, error)
CheckType(annos map[string]string, d util.DeviceUsage, n util.ContainerDeviceRequest) (bool, bool, bool)
// CheckUUID is check current device id whether in GPUUseUUID or GPUNoUseUUID set, return true is check success.
CheckUUID(annos map[string]string, d util.DeviceUsage) bool
LockNode(n *corev1.Node, p *corev1.Pod) error
ReleaseNodeLock(n *corev1.Node, p *corev1.Pod) error
GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest
PatchAnnotations(annoinput *map[string]string, pd util.PodDevices) map[string]string
CustomFilterRule(allocated *util.PodDevices, request util.ContainerDeviceRequest, toAllicate util.ContainerDevices, device *util.DeviceUsage) bool
ScoreNode(node *corev1.Node, podDevices util.PodSingleDevice, policy string) float32
AddResourceUsage(n *util.DeviceUsage, ctr *util.ContainerDevice) error
// This should not be associated with a specific device object
//ParseConfig(fs *flag.FlagSet)
}

This defines the interface that each device vendor implements; the implementations are initialized at scheduler startup so they can be called at runtime.

Once the per-device resource usage of every node is known, scoring begins.

3.2.3.2. Scoring nodes by resource usage

pkg/scheduler/scheduler.go:458

 nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
if err != nil {
err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}

pkg/scheduler/score.go:198

func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {
userNodePolicy := config.NodeSchedulerPolicy
if annos != nil {
if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {
userNodePolicy = value
}
}
res := policy.NodeScoreList{
Policy: userNodePolicy,
NodeList: make([]*policy.NodeScore, 0),
}

//func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {
// res := make(NodeScoreList, 0, len(*nodes))
for nodeID, node := range *nodes {
viewStatus(*node)
score := policy.NodeScore{NodeID: nodeID, Node: node.Node, Devices: make(util.PodDevices), Score: 0}
score.ComputeDefaultScore(node.Devices)

//This loop is for different container request
ctrfit := false
for ctrid, n := range nums {
sums := 0
for _, k := range n {
sums += int(k.Nums)
}

if sums == 0 {
for idx := range score.Devices {
for len(score.Devices[idx]) <= ctrid {
score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})
}
score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})
continue
}
}
klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)
fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
ctrfit = fit
if !fit {
klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)
break
}
}

if ctrfit {
res.NodeList = append(res.NodeList, &score)
score.OverrideScore(node.Devices, userNodePolicy)
}
}
return &res, nil
}

The logic iterates over the nodes to score them and, for each node, iterates over the Pod's containers to score the devices each container would use; it returns every node that can accommodate the resources requested in the limits.

3.2.3.3. Computing a node's score

pkg/scheduler/policy/node_policy.go:68

func (ns *NodeScore) ComputeDefaultScore(devices DeviceUsageList) {
used, usedCore, usedMem := int32(0), int32(0), int32(0)
for _, device := range devices.DeviceLists {
used += device.Device.Used
usedCore += device.Device.Usedcores
usedMem += device.Device.Usedmem
}
klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem)

total, totalCore, totalMem := int32(0), int32(0), int32(0)
for _, deviceLists := range devices.DeviceLists {
total += deviceLists.Device.Count
totalCore += deviceLists.Device.Totalcore
totalMem += deviceLists.Device.Totalmem
}
useScore := float32(used) / float32(total)
coreScore := float32(usedCore) / float32(totalCore)
memScore := float32(usedMem) / float32(totalMem)
ns.Score = float32(Weight) * (useScore + coreScore + memScore)
klog.V(2).Infof("node %s computer default score is %f", ns.NodeID, ns.Score)
}

The node scoring rule is straightforward: score = Weight * (used/total + usedCores/totalCores + usedMem/totalMem).
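
A small worked example of that formula with made-up numbers (the real value of Weight is defined in the policy package; 10 is assumed here purely for illustration):

package main

import "fmt"

// A worked example of ComputeDefaultScore: a node with 2 physical GPUs, each split into
// 10 vGPUs with 24576 MiB and 100 cores, of which 3 vGPUs, 5000 MiB and 60 cores are
// already allocated.
func main() {
    const weight = 10.0 // assumed value of Weight, for illustration only

    used, total := 3.0, 20.0
    usedCores, totalCores := 60.0, 200.0
    usedMem, totalMem := 5000.0, 49152.0

    score := weight * (used/total + usedCores/totalCores + usedMem/totalMem)
    fmt.Printf("node score = %.3f\n", score)
    // OverrideScore (called in calcScore) then adjusts the final value
    // according to the configured node scheduler policy.
}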

3.2.3.4. Scoring the devices for each container

pkg/scheduler/score.go:149

func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {
//devmap := make(map[string]util.ContainerDevices)
devs := util.ContainerDevices{}
total, totalCore, totalMem := int32(0), int32(0), int32(0)
free, freeCore, freeMem := int32(0), int32(0), int32(0)
sums := 0
// computer all device score for one node
for index := range node.Devices.DeviceLists {
node.Devices.DeviceLists[index].ComputeScore(requests)
}
//This loop is for requests for different devices
for _, k := range requests {
sums += int(k.Nums)
if int(k.Nums) > len(node.Devices.DeviceLists) {
klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists))
return false, 0
}
sort.Sort(node.Devices)
fit, tmpDevs := fitInCertainDevice(node, k, annos, pod, devinput)
if fit {
for idx, val := range tmpDevs[k.Type] {
for nidx, v := range node.Devices.DeviceLists {
//bc node.Devices has been sorted, so we should find out the correct device
if v.Device.ID != val.UUID {
continue
}
total += v.Device.Count
totalCore += v.Device.Totalcore
totalMem += v.Device.Totalmem
free += v.Device.Count - v.Device.Used
freeCore += v.Device.Totalcore - v.Device.Usedcores
freeMem += v.Device.Totalmem - v.Device.Usedmem
err := device.GetDevices()[k.Type].AddResourceUsage(node.Devices.DeviceLists[nidx].Device, &tmpDevs[k.Type][idx])
if err != nil {
klog.Errorf("AddResource failed:%s", err.Error())
return false, 0
}
klog.Infoln("After AddResourceUsage:", node.Devices.DeviceLists[nidx].Device)
}
}
devs = append(devs, tmpDevs[k.Type]...)
} else {
return false, 0
}
(*devinput)[k.Type] = append((*devinput)[k.Type], devs)
}
return true, 0
}

The main logic:

  • Score every device on the node for the request, then iterate over the limits of the different containers to find the devices that can accommodate each container's limits

pkg/scheduler/policy/gpu_policy.go:58

func (ds *DeviceListsScore) ComputeScore(requests util.ContainerDeviceRequests) {
request, core, mem := int32(0), int32(0), int32(0)
// Here we are required to use the same type device
for _, container := range requests {
request += container.Nums
core += container.Coresreq
if container.MemPercentagereq != 0 && container.MemPercentagereq != 101 {
mem += ds.Device.Totalmem * (container.MemPercentagereq / 100.0)
continue
}
mem += container.Memreq
}
klog.V(2).Infof("device %s user %d, userCore %d, userMem %d,", ds.Device.ID, ds.Device.Used, ds.Device.Usedcores, ds.Device.Usedmem)

usedScore := float32(request+ds.Device.Used) / float32(ds.Device.Count)
coreScore := float32(core+ds.Device.Usedcores) / float32(ds.Device.Totalcore)
memScore := float32(mem+ds.Device.Usedmem) / float32(ds.Device.Totalmem)
ds.Score = float32(Weight) * (usedScore + coreScore + memScore)
klog.V(2).Infof("device %s computer score is %f", ds.Device.ID, ds.Score)
}

The device scoring rule is similar to the node one.

pkg/scheduler/score.go:65

func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod, allocated *util.PodDevices) (bool, map[string]util.ContainerDevices) {
k := request
originReq := k.Nums
prevnuma := -1
klog.InfoS("Allocating device for container request", "pod", klog.KObj(pod), "card request", k)
var tmpDevs map[string]util.ContainerDevices
tmpDevs = make(map[string]util.ContainerDevices)
for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- {
klog.InfoS("scoring pod", "pod", klog.KObj(pod), "Memreq", k.Memreq, "MemPercentagereq", k.MemPercentagereq, "Coresreq", k.Coresreq, "Nums", k.Nums, "device index", i, "device", node.Devices.DeviceLists[i].Device.ID)
found, numa := checkType(annos, *node.Devices.DeviceLists[i].Device, k)
if !found {
klog.InfoS("card type mismatch,continuing...", "pod", klog.KObj(pod), (node.Devices.DeviceLists[i].Device).Type, k.Type)
continue
}
if numa && prevnuma != node.Devices.DeviceLists[i].Device.Numa {
klog.InfoS("Numa not fit, resotoreing", "pod", klog.KObj(pod), "k.nums", k.Nums, "numa", numa, "prevnuma", prevnuma, "device numa", node.Devices.DeviceLists[i].Device.Numa)
k.Nums = originReq
prevnuma = node.Devices.DeviceLists[i].Device.Numa
tmpDevs = make(map[string]util.ContainerDevices)
}
if !checkUUID(annos, *node.Devices.DeviceLists[i].Device, k) {
klog.InfoS("card uuid mismatch,", "pod", klog.KObj(pod), "current device info is:", *node.Devices.DeviceLists[i].Device)
continue
}

memreq := int32(0)
if node.Devices.DeviceLists[i].Device.Count <= node.Devices.DeviceLists[i].Device.Used {
continue
}
if k.Coresreq > 100 {
klog.ErrorS(nil, "core limit can't exceed 100", "pod", klog.KObj(pod))
k.Coresreq = 100
//return false, tmpDevs
}
if k.Memreq > 0 {
memreq = k.Memreq
}
if k.MemPercentagereq != 101 && k.Memreq == 0 {
//This incurs an issue
memreq = node.Devices.DeviceLists[i].Device.Totalmem * k.MemPercentagereq / 100
}
if node.Devices.DeviceLists[i].Device.Totalmem-node.Devices.DeviceLists[i].Device.Usedmem < memreq {
klog.V(5).InfoS("card Insufficient remaining memory", "pod", klog.KObj(pod), "device index", i, "device", node.Devices.DeviceLists[i].Device.ID, "device total memory", node.Devices.DeviceLists[i].Device.Totalmem, "device used memory", node.Devices.DeviceLists[i].Device.Usedmem, "request memory", memreq)
continue
}
if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq {
klog.V(5).InfoS("card Insufficient remaining cores", "pod", klog.KObj(pod), "device index", i, "device", node.Devices.DeviceLists[i].Device.ID, "device total core", node.Devices.DeviceLists[i].Device.Totalcore, "device used core", node.Devices.DeviceLists[i].Device.Usedcores, "request cores", k.Coresreq)
continue
}
// Coresreq=100 indicates it want this card exclusively
if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 {
klog.V(5).InfoS("the container wants exclusive access to an entire card, but the card is already in use", "pod", klog.KObj(pod), "device index", i, "device", node.Devices.DeviceLists[i].Device.ID, "used", node.Devices.DeviceLists[i].Device.Used)
continue
}
// You can't allocate core=0 job to an already full GPU
if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 {
klog.V(5).InfoS("can't allocate core=0 job to an already full GPU", "pod", klog.KObj(pod), "device index", i, "device", node.Devices.DeviceLists[i].Device.ID)
continue
}
if !device.GetDevices()[k.Type].CustomFilterRule(allocated, request, tmpDevs[k.Type], node.Devices.DeviceLists[i].Device) {
continue
}
if k.Nums > 0 {
klog.InfoS("first fitted", "pod", klog.KObj(pod), "device", node.Devices.DeviceLists[i].Device.ID)
k.Nums--
tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{
Idx: int(node.Devices.DeviceLists[i].Device.Index),
UUID: node.Devices.DeviceLists[i].Device.ID,
Type: k.Type,
Usedmem: memreq,
Usedcores: k.Coresreq,
})
}
if k.Nums == 0 {
klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs)
return true, tmpDevs
}
if node.Devices.DeviceLists[i].Device.Mode == "mig" {
i++
}
}
return false, tmpDevs
}

It iterates over the devices and checks whether the remaining capacity of each device can satisfy the container's request, returning all devices that fit; the memory check is sketched below.
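
As a worked example of the memory check (the numbers are made up; it mirrors the memreq branches of fitInCertainDevice above):

package main

import "fmt"

// How fitInCertainDevice derives the memory request (memreq): if gpumem is set it is
// used directly; otherwise gpumem-percentage is converted using the device's total memory.
func main() {
    totalMem := int32(73728) // a 24576 MiB card registered with deviceMemoryScaling = 3
    usedMem := int32(20000)

    // Case 1: nvidia.com/gpumem: 2000
    memreq := int32(2000)
    fmt.Println("fits:", totalMem-usedMem >= memreq) // true

    // Case 2: nvidia.com/gpumem-percentage: 50 (and no gpumem set)
    memPercentage := int32(50)
    memreq = totalMem * memPercentage / 100 // 36864 MiB
    fmt.Println("memreq:", memreq, "fits:", totalMem-usedMem >= memreq)
}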

pkg/scheduler/scheduler.go:458

 nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
if err != nil {
err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
if len((*nodeScores).NodeList) == 0 {
klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
return &extenderv1.ExtenderFilterResult{
FailedNodes: failedNodes,
}, nil
}
klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
sort.Sort(nodeScores)
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
annotations := make(map[string]string)
annotations[util.AssignedNodeAnnotations] = m.NodeID
annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

for _, val := range device.GetDevices() {
val.PatchAnnotations(&annotations, m.Devices)
}

//InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
//maps.Copy(annotations, InRequestDevices)
//maps.Copy(annotations, supportDevices)
s.addPod(args.Pod, m.NodeID, m.Devices)
err = util.PatchPodAnnotations(args.Pod, annotations)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
s.delPod(args.Pod)
return nil, err
}
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil

After iterating, the highest-scoring node is selected and the result is written back onto the Pod as annotations:

apiVersion: v1
kind: Pod
metadata:
  annotations:
    hami.io/vgpu-node: node1
    hami.io/vgpu-time: "1733988480"
    hami.io/vgpu-devices-allocated: GPU-7aebc545-cbd3-18a0-afce-76cae449702a,NVIDIA,20000,80:;
    hami.io/vgpu-devices-to-allocate: ;
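
The hami.io/vgpu-devices-allocated value is again a plain-text encoding. Below is a hypothetical decoder, with the layout inferred only from the example above (containers terminated by ';', devices within a container separated by ':', each device being UUID,Type,Usedmem,Usedcores):

package main

import (
    "fmt"
    "strings"
)

func main() {
    anno := "GPU-7aebc545-cbd3-18a0-afce-76cae449702a,NVIDIA,20000,80:;"
    for ctrIdx, ctr := range strings.Split(strings.TrimSuffix(anno, ";"), ";") {
        for _, dev := range strings.Split(strings.TrimSuffix(ctr, ":"), ":") {
            f := strings.Split(dev, ",")
            if len(f) < 4 {
                continue
            }
            fmt.Printf("container %d -> device %s type %s mem %s MiB cores %s%%\n",
                ctrIdx, f[0], f[1], f[2], f[3])
        }
    }
}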

3.2.4. bind implementation

The bind logic is simple: bind the Pod to the node.

pkg/scheduler/routes/route.go:82

func Bind(s *scheduler.Scheduler) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
var buf bytes.Buffer
body := io.TeeReader(r.Body, &buf)
var extenderBindingArgs extenderv1.ExtenderBindingArgs
var extenderBindingResult *extenderv1.ExtenderBindingResult

if err := json.NewDecoder(body).Decode(&extenderBindingArgs); err != nil {
klog.ErrorS(err, "Decode extender binding args")
extenderBindingResult = &extenderv1.ExtenderBindingResult{
Error: err.Error(),
}
} else {
extenderBindingResult, err = s.Bind(extenderBindingArgs)
}

if response, err := json.Marshal(extenderBindingResult); err != nil {
klog.ErrorS(err, "Marshal binding result", "result", extenderBindingResult)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
errMsg := fmt.Sprintf("{'error':'%s'}", err.Error())
w.Write([]byte(errMsg))
} else {
klog.V(5).InfoS("Return bind response", "result", extenderBindingResult)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(response)
}
}
}

The route dispatches to Scheduler.Bind, which does the actual work:


func (s *Scheduler) Bind(args extenderv1.ExtenderBindingArgs) (*extenderv1.ExtenderBindingResult, error) {
klog.InfoS("Bind", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
var err error
var res *extenderv1.ExtenderBindingResult
binding := &corev1.Binding{
ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},
Target: corev1.ObjectReference{Kind: "Node", Name: args.Node},
}
current, err := s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(), args.PodName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Get pod failed")
}

node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get node", "node", args.Node)
s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %v", args.Node))
res = &extenderv1.ExtenderBindingResult{
Error: err.Error(),
}
return res, nil
}

tmppatch := make(map[string]string)
for _, val := range device.GetDevices() {
err = val.LockNode(node, current)
if err != nil {
goto ReleaseNodeLocks
}
}

tmppatch[util.DeviceBindPhase] = "allocating"
tmppatch[util.BindTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

err = util.PatchPodAnnotations(current, tmppatch)
if err != nil {
klog.ErrorS(err, "patch pod annotation failed")
}
if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil {
klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
}
if err == nil {
s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil)
res = &extenderv1.ExtenderBindingResult{
Error: "",
}
klog.Infoln("After Binding Process")
return res, nil
}
ReleaseNodeLocks:
klog.InfoS("bind failed", "err", err.Error())
for _, val := range device.GetDevices() {
val.ReleaseNodeLock(node, current)
}
s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, err)
return &extenderv1.ExtenderBindingResult{
Error: err.Error(),
}, nil
}

3.3. Writing device information into node annotations

The scheduler learns about a node's devices mainly by reading node annotations. This involves the following steps:

  • Start the plugin, which ends up writing annotations like these:
apiVersion: v1
kind: Node
metadata:
  annotations:
    hami.io/node-handshake: Requesting_2024.12.24 03:31:30
    hami.io/node-handshake-dcu: Deleted_2024.12.06 07:43:49
    hami.io/node-nvidia-register: 'GPU-7aebc545-cbd3-18a0-afce-76cae449702a,10,73728,300,NVIDIA-NVIDIA
      GeForce RTX 3090,0,true:'

3.3.1. Starting the device-plugin service

github.com/urfave/cli/v2 is used as the command framework to start the service; note that -v is not the log level but the flag that prints the version.

cmd/device-plugin/nvidia/main.go:40

func main() {
var configFile string

c := cli.NewApp()
c.Name = "NVIDIA Device Plugin"
c.Usage = "NVIDIA device plugin for Kubernetes"
c.Version = info.GetVersionString()
c.Action = func(ctx *cli.Context) error {
return start(ctx, c.Flags)
}

3.3.2. Starting the plugins

Each vendor's devices require their own plugin implementation. This part defines the plugin lifecycle control (start, restart, exit, and so on); here we mainly focus on plugins, restartPlugins, err := startPlugins(c, flags, restarting).

cmd/device-plugin/nvidia/main.go:156

func start(c *cli.Context, flags []cli.Flag) error {
klog.Info("Starting FS watcher.")
util.NodeName = os.Getenv(util.NodeNameEnvName)
watcher, err := newFSWatcher(kubeletdevicepluginv1beta1.DevicePluginPath)
if err != nil {
return fmt.Errorf("failed to create FS watcher: %v", err)
}
defer watcher.Close()
//device.InitDevices()

/*Loading config files*/
klog.Infof("Start working on node %s", util.NodeName)
klog.Info("Starting OS watcher.")
sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

var restarting bool
var restartTimeout <-chan time.Time
var plugins []plugin.Interface
restart:
// If we are restarting, stop plugins from previous run.
if restarting {
err := stopPlugins(plugins)
if err != nil {
return fmt.Errorf("error stopping plugins from previous run: %v", err)
}
}

klog.Info("Starting Plugins.")
plugins, restartPlugins, err := startPlugins(c, flags, restarting)
if err != nil {
return fmt.Errorf("error starting plugins: %v", err)
}

if restartPlugins {
klog.Info("Failed to start one or more plugins. Retrying in 30s...")
restartTimeout = time.After(30 * time.Second)
}

restarting = true

// Start an infinite loop, waiting for several indicators to either log
// some messages, trigger a restart of the plugins, or exit the program.
for {
select {
// If the restart timeout has expired, then restart the plugins
case <-restartTimeout:
goto restart

// Detect a kubelet restart by watching for a newly created
// 'kubeletdevicepluginv1beta1.KubeletSocket' file. When this occurs, restart this loop,
// restarting all of the plugins in the process.
case event := <-watcher.Events:
if event.Name == kubeletdevicepluginv1beta1.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
klog.Infof("inotify: %s created, restarting.", kubeletdevicepluginv1beta1.KubeletSocket)
goto restart
}

// Watch for any other fs errors and log them.
case err := <-watcher.Errors:
klog.Errorf("inotify: %s", err)

// Watch for any signals from the OS. On SIGHUP, restart this loop,
// restarting all of the plugins in the process. On all other
// signals, exit the loop and exit the program.
case s := <-sigs:
switch s {
case syscall.SIGHUP:
klog.Info("Received SIGHUP, restarting.")
goto restart
default:
klog.Infof("Received signal \"%v\", shutting down.", s)
goto exit
}
}
}
exit:
err = stopPlugins(plugins)
if err != nil {
return fmt.Errorf("error stopping plugins: %v", err)
}
return nil
}

cmd/device-plugin/nvidia/main.go:239

Starting the plugins; the key call is p.Start().

func startPlugins(c *cli.Context, flags []cli.Flag, restarting bool) ([]plugin.Interface, bool, error) {
// Load the configuration file
klog.Info("Loading configuration.")
config, err := loadConfig(c, flags)
if err != nil {
return nil, false, fmt.Errorf("unable to load config: %v", err)
}
disableResourceRenamingInConfig(config)

/*Loading config files*/
//fmt.Println("NodeName=", config.NodeName)
devConfig, err := generateDeviceConfigFromNvidia(config, c, flags)
if err != nil {
klog.Errorf("failed to load config file %s", err.Error())
return nil, false, err
}

// Update the configuration file with default resources.
klog.Info("Updating config with default resource matching patterns.")
err = rm.AddDefaultResourcesToConfig(&devConfig)
if err != nil {
return nil, false, fmt.Errorf("unable to add default resources to config: %v", err)
}

// Print the config to the output.
configJSON, err := json.MarshalIndent(devConfig, "", " ")
if err != nil {
return nil, false, fmt.Errorf("failed to marshal config to JSON: %v", err)
}
klog.Infof("\nRunning with config:\n%v", string(configJSON))

// Get the set of plugins.
klog.Info("Retrieving plugins.")
pluginManager, err := NewPluginManager(&devConfig)
if err != nil {
return nil, false, fmt.Errorf("error creating plugin manager: %v", err)
}
plugins, err := pluginManager.GetPlugins()
if err != nil {
return nil, false, fmt.Errorf("error getting plugins: %v", err)
}

// Loop through all plugins, starting them if they have any devices
// to serve. If even one plugin fails to start properly, try
// starting them all again.
started := 0
for _, p := range plugins {
// Just continue if there are no devices to serve for plugin p.
if len(p.Devices()) == 0 {
continue
}

// Start the gRPC server for plugin p and connect it with the kubelet.
if err := p.Start(); err != nil {
klog.Error("Could not contact Kubelet. Did you enable the device plugin feature gate?")
klog.Error("You can check the prerequisites at: https://github.com/NVIDIA/k8s-device-plugin#prerequisites")
klog.Error("You can learn how to set the runtime at: https://github.com/NVIDIA/k8s-device-plugin#quick-start")
return plugins, true, nil
}
started++
}

if started == 0 {
klog.Info("No devices found. Waiting indefinitely.")
}

return plugins, false, nil
}

Each p (plugin) has to implement a few methods for lifecycle management:

pkg/device-plugin/nvidiadevice/nvinternal/plugin/api.go:37

type Interface interface {
Devices() rm.Devices
Start() error
Stop() error
}

Additionally, for kubelet to recognize extended resources such as nvidia.com/gpu: 1, the plugin must run a gRPC server on a socket under /var/lib/kubelet/device-plugins/ and implement the methods below. This is not closely related to scheduling, so it is not expanded here; see device-plugins.

k8s.io/kubelet@v0.28.3/pkg/apis/deviceplugin/v1beta1/api.pb.go:1419

type DevicePluginServer interface {
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
GetPreferredAllocation(context.Context, *PreferredAllocationRequest) (*PreferredAllocationResponse, error)
// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}

3.3.3. The NVIDIA plugin implementation

The main thing to look at is plugin.WatchAndRegister().

pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go:196

func (plugin *NvidiaDevicePlugin) Start() error {
plugin.initialize()

err := plugin.Serve()
if err != nil {
klog.Infof("Could not start device plugin for '%s': %s", plugin.rm.Resource(), err)
plugin.cleanup()
return err
}
klog.Infof("Starting to serve '%s' on %s", plugin.rm.Resource(), plugin.socket)

err = plugin.Register()
if err != nil {
klog.Infof("Could not register device plugin: %s", err)
plugin.Stop()
return err
}
klog.Infof("Registered device plugin for '%s' with Kubelet", plugin.rm.Resource())

if plugin.operatingMode == "mig" {
cmd := exec.Command("nvidia-mig-parted", "export")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
klog.Fatalf("nvidia-mig-parted failed with %s\n", err)
}
outStr := stdout.Bytes()
yaml.Unmarshal(outStr, &plugin.migCurrent)
os.WriteFile("/tmp/migconfig.yaml", outStr, os.ModePerm)
if len(plugin.migCurrent.MigConfigs["current"]) == 1 && len(plugin.migCurrent.MigConfigs["current"][0].Devices) == 0 {
idx := 0
plugin.migCurrent.MigConfigs["current"][0].Devices = make([]int32, 0)
for idx < GetDeviceNums() {
plugin.migCurrent.MigConfigs["current"][0].Devices = append(plugin.migCurrent.MigConfigs["current"][0].Devices, int32(idx))
idx++
}
}
klog.Infoln("Mig export", plugin.migCurrent)
}
go func() {
err := plugin.rm.CheckHealth(plugin.stop, plugin.health)
if err != nil {
klog.Infof("Failed to start health check: %v; continuing with health checks disabled", err)
}
}()

go func() {
plugin.WatchAndRegister()
}()

return nil
}

This is a timer loop that collects the node's device information every 30 seconds (retrying after 5 seconds on failure) and writes it into the node annotations.

func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
klog.Info("Starting WatchAndRegister")
errorSleepInterval := time.Second * 5
successSleepInterval := time.Second * 30
for {
err := plugin.RegistrInAnnotation()
if err != nil {
klog.Errorf("Failed to register annotation: %v", err)
klog.Infof("Retrying in %v seconds...", errorSleepInterval)
time.Sleep(errorSleepInterval)
} else {
klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
time.Sleep(successSleepInterval)
}
}
}
func (plugin *NvidiaDevicePlugin) RegistrInAnnotation() error {
devices := plugin.getAPIDevices()
klog.InfoS("start working on the devices", "devices", devices)
annos := make(map[string]string)
node, err := util.GetNode(util.NodeName)
if err != nil {
klog.Errorln("get node error", err.Error())
return err
}
encodeddevices := util.EncodeNodeDevices(*devices)
annos[nvidia.HandshakeAnnos] = "Reported " + time.Now().String()
annos[nvidia.RegisterAnnos] = encodeddevices
klog.Infof("patch node with the following annos %v", fmt.Sprintf("%v", annos))
err = util.PatchNodeAnnotations(node, annos)

if err != nil {
klog.Errorln("patch node error", err.Error())
}
return err
}

The actual data collection logic:

pkg/device-plugin/nvidiadevice/nvinternal/plugin/register.go:110

func (plugin *NvidiaDevicePlugin) getAPIDevices() *[]*util.DeviceInfo {
devs := plugin.Devices()
klog.V(5).InfoS("getAPIDevices", "devices", devs)
nvml.Init()
res := make([]*util.DeviceInfo, 0, len(devs))
for UUID := range devs {
ndev, ret := nvml.DeviceGetHandleByUUID(UUID)
if ret != nvml.SUCCESS {
klog.Errorln("nvml new device by index error uuid=", UUID, "err=", ret)
panic(0)
}
idx, ret := ndev.GetIndex()
if ret != nvml.SUCCESS {
klog.Errorln("nvml get index error ret=", ret)
panic(0)
}
memoryTotal := 0
memory, ret := ndev.GetMemoryInfo()
if ret == nvml.SUCCESS {
memoryTotal = int(memory.Total)
} else {
klog.Error("nvml get memory error ret=", ret)
panic(0)
}
Model, ret := ndev.GetName()
if ret != nvml.SUCCESS {
klog.Error("nvml get name error ret=", ret)
panic(0)
}

registeredmem := int32(memoryTotal / 1024 / 1024)
if plugin.schedulerConfig.DeviceMemoryScaling != 1 {
registeredmem = int32(float64(registeredmem) * plugin.schedulerConfig.DeviceMemoryScaling)
}
klog.Infoln("MemoryScaling=", plugin.schedulerConfig.DeviceMemoryScaling, "registeredmem=", registeredmem)
health := true
for _, val := range devs {
if strings.Compare(val.ID, UUID) == 0 {
// when NVIDIA-Tesla P4, the device info is : ID:GPU-e290caca-2f0c-9582-acab-67a142b61ffa,Health:Healthy,Topology:nil,
// it is more reasonable to think of healthy as case-insensitive
if strings.EqualFold(val.Health, "healthy") {
health = true
} else {
health = false
}
break
}
}
numa, err := plugin.getNumaInformation(idx)
if err != nil {
klog.ErrorS(err, "failed to get numa information", "idx", idx)
}
res = append(res, &util.DeviceInfo{
ID: UUID,
Index: uint(idx),
Count: int32(plugin.schedulerConfig.DeviceSplitCount),
Devmem: registeredmem,
Devcore: int32(plugin.schedulerConfig.DeviceCoreScaling * 100),
Type: fmt.Sprintf("%v-%v", "NVIDIA", Model),
Numa: numa,
Mode: plugin.operatingMode,
Health: health,
})
klog.Infof("nvml registered device id=%v, memory=%v, type=%v, numa=%v", idx, registeredmem, Model, numa)
}
return &res
}

The device information is obtained through the NVIDIA driver (NVML). Note the DeviceMemoryScaling setting here, the memory over-subscription factor: it is taken from the scheduler config passed via the --config-file command-line flag and from the config/config.json baked into the code, with config/config.json taking precedence over --config-file. A worked example of the scaling math is shown below.
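
package main

import "fmt"

// A worked example of the registeredmem calculation from getAPIDevices above,
// using the RTX 3090 that appears in the node annotation earlier in this article.
func main() {
    memoryTotalBytes := int64(25769803776) // ~24 GiB reported by NVML (illustrative value)
    deviceMemoryScaling := 3.0             // from device-config.yaml

    registeredmem := int32(memoryTotalBytes / 1024 / 1024) // 24576 MiB
    if deviceMemoryScaling != 1 {
        registeredmem = int32(float64(registeredmem) * deviceMemoryScaling)
    }
    fmt.Println(registeredmem) // 73728, the value seen in hami.io/node-nvidia-register
}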

At this point, everything the scheduler needs is in place, and Pods can be assigned to a suitable node.

4. References