HAMi GPU Pod Scheduling: A Source Code Walkthrough
- 1. Overview
- 2. Scheduling Flow
- 3. Pod Scheduling Flow
- 4. References
Author:
@elrond.wang
1. Overview
When using HAMi, Pods quite often end up Pending after being created; the following two problems are the most common:
- Pod UnexpectedAdmissionError
- Pod Pending
Given that, this post walks roughly through the relevant code, aiming to explain how the components interact during scheduling and how resources are calculated; other details are only touched on briefly.
2. Scheduling Flow
Before digging into the code, it is worth reading the official documentation first; the overall flow is described there fairly clearly.
In detail, the flow can be split into three phases:
- Preparation phase: as the diagram shows, there are several prerequisites, such as the mutating webhook and the device plugin. This phase is about getting those prerequisites ready, and it is only needed when the services first start.
- Pod scheduling phase: once preparation is done, the Pod enters the processing pipeline and gets scheduled.
- Pod startup phase: how the Pod interacts with the GPUs on the node, and so on.
This post focuses on the preparation phase, with the scheduling analysis as the main content.
3. Pod Scheduling Flow
- The user sends a Pod creation request to kube-apiserver.
- The admission webhook is triggered and rewrites the Pod's schedulerName.
- Based on schedulerName, kube-apiserver hands the Pod to the corresponding scheduler.
- The scheduler processes the Pod:
- Collect node device information: read from node annotations, which the hami-device-plugin DaemonSet writes periodically.
- Score the nodes using the device information and the Pod's resource limits, and pick the highest-scoring node.
- Bind the Pod to the selected node, completing scheduling; the Pod is then created on that node.
Troubleshooting a few common problems
Pod UnexpectedAdmissionError
The Pod's status shows UnexpectedAdmissionError after creation.
With the flow above in mind, this error means kube-apiserver failed to call the extender scheduler (which also serves the webhook). There are two likely causes; for anything else, check the kube-apiserver logs.
- Communication failure: the extender scheduler's HTTPS port is unreachable from kube-apiserver. Possible reasons:
- DNS resolution fails
- cross-node communication is broken
- the extender scheduler service itself is down
- TLS verification error: usually shown as webhook x509: certificate signed by unknown authority. The Helm chart includes a jobs.batch named hami-vgpu.admission-patch; if that job has not completed, this error appears.
Scheduling problems
The Pod stays in Pending; kubectl describe shows the concrete reason, most commonly one of the following:
card Insufficient remaining memory
calcScore:node not fit pod
The root cause is usually either a genuine shortage of resources or a misconfiguration. Misconfiguration here means deviceMemoryScaling does not take the value you expect: it can be set in two places, and the per-node configuration takes precedence over the global one. The common pitfall is that name must match the node name shown by kubectl get node, otherwise the per-node setting does not take effect.
- Global configuration
k get cm hami-scheduler-device
deviceMemoryScaling: 3
- Per-node configuration
k get cm hami-device-plugin
{
"nodeconfig": [
{
"name": "node1",
"devicememoryscaling": 3,
"devicesplitcount": 10,
"migstrategy":"none",
"filterdevices": {
"uuid": [],
"index": []
}
}
]
}
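To make deviceMemoryScaling concrete, a quick back-of-the-envelope example with assumed numbers: a card with 24576 MiB of physical memory and deviceMemoryScaling: 3 is treated by the scheduler as having 24576 * 3 = 73728 MiB of schedulable vGPU memory, so the combined nvidia.com/gpumem requests placed on that card may exceed its physical capacity (oversubscription).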
3.1. MutatingWebhook
Kubernetes provides admission webhooks, which are triggered by operations on Kubernetes resources. The most widespread use is intercepting Pod creation and mutating the Pod's YAML, for example injecting an init container or injecting files.
3.1.1. Webhook configuration
hami-webhook
k get mutatingwebhookconfigurations.admissionregistration.k8s.io hami-webhook -o yaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
annotations:
meta.helm.sh/release-name: hami
meta.helm.sh/release-namespace: kube-system
creationTimestamp: "2024-12-10T03:50:37Z"
generation: 5
labels:
app.kubernetes.io/managed-by: Helm
name: hami-webhook
resourceVersion: "2307810"
uid: 2cdcebe4-f561-429f-9480-701e65980687
webhooks:
- admissionReviewVersions:
- v1beta1
clientConfig:
caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVJ5Z0F3SUJBZ0lSQUxjd2FQMjUrMlphdGhTTlFMcG1qT0V3Q2dZSUtvWkl6ajBFQXdJd0R6RU4KTUFzR0ExVUVDaE1FYm1sc01UQWdGdzB5TkRFeU1EWXdOekV4TVRWYUdBOHlNVEkwTVRFeE1qQTNNVEV4TlZvdwpEekVOTUFzR0ExVUVDaE1FYm1sc01UQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJDUnlXUDdYCkRmT2N4NEVTMVRYaUs0dnFFU2wrcUFHYjI2YzNrOEdMWlZTL1lHaFpLZVVxaEgydVRhTFdWTW1hZVJFbkxqM0cKSStMVFRVTTR6SVhEUld5alZ6QlZNQTRHQTFVZER3RUIvd1FFQXdJQ0JEQVRCZ05WSFNVRUREQUtCZ2dyQmdFRgpCUWNEQVRBUEJnTlZIUk1CQWY4RUJUQURBUUgvTUIwR0ExVWREZ1FXQkJTcVV4bWpGa29YUlpRK0xXVzBNM1pJCnMzck1wakFLQmdncWhrak9QUVFEQWdOSUFEQkZBaUJSY2VRL2tJVkR2VTV3Vjl0K3NRWm93TmFhTWhIMTV5K2sKT3VrR0FlRGVtQUloQUxDZzFrM0JQZUJBNG8reWY5emxvVjM2VEk2RHUzaGdMT1B3MXhaZkFvcDMKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
service:
name: hami-scheduler
namespace: kube-system
path: /webhook
port: 443
failurePolicy: Ignore
matchPolicy: Equivalent
name: vgpu.hami.io
namespaceSelector:
matchExpressions:
- key: hami.io/webhook
operator: NotIn
values:
- ignore
objectSelector:
matchExpressions:
- key: hami.io/webhook
operator: NotIn
values:
- ignore
reinvocationPolicy: Never
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
resources:
- pods
scope: '*'
sideEffects: None
timeoutSeconds: 10
When a Pod is created, kube-apiserver calls https://hami-scheduler.kube-system:443/webhook, performing TLS verification against the CA configured in caBundle. The webhook is not triggered when the namespace (or the object itself) carries the label hami.io/webhook: ignore.
3.1.2. Webhook server implementation
A TLS HTTP server has to be implemented that exposes the /webhook endpoint.
cmd/scheduler/main.go:84
func start() {
...
router.POST("/webhook", routes.WebHookRoute())
WebHookRoute must implement the admission Handler interface defined at sigs.k8s.io/controller-runtime@v0.16.3/pkg/webhook/admission/webhook.go:98
pkg/scheduler/webhook.go:52
pod := &corev1.Pod{}
err := h.decoder.Decode(req, pod)
if err != nil {
klog.Errorf("Failed to decode request: %v", err)
return admission.Errored(http.StatusBadRequest, err)
}
if len(pod.Spec.Containers) == 0 {
klog.Warningf(template+" - Denying admission as pod has no containers", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has no containers")
}
klog.Infof(template, req.Namespace, req.Name, req.UID)
hasResource := false
for idx, ctr := range pod.Spec.Containers {
c := &pod.Spec.Containers[idx]
if ctr.SecurityContext != nil {
if ctr.SecurityContext.Privileged != nil && *ctr.SecurityContext.Privileged {
klog.Warningf(template+" - Denying admission as container %s is privileged", req.Namespace, req.Name, req.UID, c.Name)
continue
}
}
for _, val := range device.GetDevices() {
found, err := val.MutateAdmission(c, pod)
if err != nil {
klog.Errorf("validating pod failed:%s", err.Error())
return admission.Errored(http.StatusInternalServerError, err)
}
hasResource = hasResource || found
}
}
if !hasResource {
klog.Infof(template+" - Allowing admission for pod: no resource found", req.Namespace, req.Name, req.UID)
//return admission.Allowed("no resource found")
} else if len(config.SchedulerName) > 0 {
pod.Spec.SchedulerName = config.SchedulerName
if pod.Spec.NodeName != "" {
klog.Infof(template+" - Pod already has node assigned", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has node assigned")
}
}
marshaledPod, err := json.Marshal(pod)
if err != nil {
klog.Errorf(template+" - Failed to marshal pod, error: %v", req.Namespace, req.Name, req.UID, err)
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)
Whether a Pod should go through the extender scheduler is decided mainly from the containers' resource specifications.
pkg/device/nvidia/device.go:246
func (dev *NvidiaGPUDevices) MutateAdmission(ctr *corev1.Container, p *corev1.Pod) (bool, error) {
/*gpu related */
priority, ok := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourcePriority)]
if ok {
ctr.Env = append(ctr.Env, corev1.EnvVar{
Name: util.TaskPriority,
Value: fmt.Sprint(priority.Value()),
})
}
_, resourceNameOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceCountName)]
if resourceNameOK {
return resourceNameOK, nil
}
_, resourceCoresOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceCoreName)]
_, resourceMemOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceMemoryName)]
_, resourceMemPercentageOK := ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceMemoryPercentageName)]
if resourceCoresOK || resourceMemOK || resourceMemPercentageOK {
if dev.config.DefaultGPUNum > 0 {
ctr.Resources.Limits[corev1.ResourceName(dev.config.ResourceCountName)] = *resource.NewQuantity(int64(dev.config.DefaultGPUNum), resource.BinarySI)
resourceNameOK = true
}
}
if !resourceNameOK && dev.config.OverwriteEnv {
ctr.Env = append(ctr.Env, corev1.EnvVar{
Name: "NVIDIA_VISIBLE_DEVICES",
Value: "none",
})
}
return resourceNameOK, nil
}
It mainly checks whether the Pod's resource limits contain any of the resource names configured in device-config.yaml; if they do, the Pod takes the HAMi scheduling path.
device-config, using NVIDIA GPUs as the example:
nvidia:
resourceCountName: nvidia.com/gpu
resourceMemoryName: nvidia.com/gpumem
resourceMemoryPercentageName: nvidia.com/gpumem-percentage
resourceCoreName: nvidia.com/gpucores
resourcePriorityName: nvidia.com/priority
overwriteEnv: false
defaultMemory: 0
defaultCores: 0
defaultGPUNum: 1
deviceSplitCount: 10
deviceMemoryScaling: 3
deviceCoreScaling: 3
Once it is determined that the Pod should be scheduled by HAMi, the webhook patches the Pod's schedulerName to the name of the HAMi scheduler.
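For illustration (this Pod spec is an assumption, not taken from the HAMi repository), a minimal sketch of a Pod that would take this path: its limits use the resource names from the device-config above, so MutateAdmission returns true and the webhook rewrites schedulerName:
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Hypothetical Pod: one vGPU, 8192 MiB of vGPU memory, 30% of the SM cores.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "vgpu-demo", Namespace: "default"},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name:  "cuda",
				Image: "nvidia/cuda:12.4.0-base-ubuntu22.04",
				Resources: corev1.ResourceRequirements{
					Limits: corev1.ResourceList{
						"nvidia.com/gpu":      resource.MustParse("1"),    // resourceCountName
						"nvidia.com/gpumem":   resource.MustParse("8192"), // resourceMemoryName (MiB)
						"nvidia.com/gpucores": resource.MustParse("30"),   // resourceCoreName
					},
				},
			}},
		},
	}
	// After the webhook's patch the Pod would carry the extender's scheduler name.
	pod.Spec.SchedulerName = "hami-scheduler"
	fmt.Println(pod.Spec.SchedulerName, pod.Spec.Containers[0].Resources.Limits)
}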
3.2. Extending the k8s scheduler
Through KubeSchedulerConfiguration, the default scheduler can be extended by an extender that implements extension points.
3.2.1. KubeSchedulerConfiguration
k get cm hami-scheduler-newversion -o yaml
apiVersion: v1
data:
config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: false
profiles:
- schedulerName: hami-scheduler
extenders:
- urlPrefix: "https://127.0.0.1:443"
filterVerb: filter
bindVerb: bind
nodeCacheCapable: true
weight: 1
httpTimeout: 30s
enableHTTPS: true
tlsConfig:
insecure: true
managedResources:
- name: nvidia.com/gpu
ignoredByScheduler: true
- name: nvidia.com/gpumem
ignoredByScheduler: true
- name: nvidia.com/gpucores
ignoredByScheduler: true
- name: nvidia.com/gpumem-percentage
ignoredByScheduler: true
- name: nvidia.com/priority
ignoredByScheduler: true
- name: cambricon.com/vmlu
ignoredByScheduler: true
- name: hygon.com/dcunum
ignoredByScheduler: true
- name: hygon.com/dcumem
ignoredByScheduler: true
- name: hygon.com/dcucores
ignoredByScheduler: true
- name: iluvatar.ai/vgpu
ignoredByScheduler: true
kind: ConfigMap
metadata:
annotations:
meta.helm.sh/release-name: hami
meta.helm.sh/release-namespace: kube-system
creationTimestamp: "2024-12-10T03:50:36Z"
labels:
app.kubernetes.io/component: hami-scheduler
app.kubernetes.io/instance: hami
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: hami
app.kubernetes.io/version: 2.4.1
helm.sh/chart: hami-2.4.1
name: hami-scheduler-newversion
namespace: kube-system
resourceVersion: "2316275"
uid: 3a61a72c-0bab-432f-b4d7-5c1ae46ee14d
The scheduler extender hooks in at extension points; here filter and bind are extended:
- filter: find the most suitable node
- bind: create a Binding resource for the Pod
During scheduling, the extender implementations are called in extension-point order: first https://127.0.0.1:443/filter, then https://127.0.0.1:443/bind.
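For reference, a rough sketch of the payloads exchanged at these two endpoints, built from the upstream k8s.io/kube-scheduler extender types; the concrete values are made up for illustration:
package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

func main() {
	// POSTed to /filter: the Pod plus the candidate node names
	// (with nodeCacheCapable: true only names are sent, not full Node objects).
	args := extenderv1.ExtenderArgs{
		Pod:       &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "vgpu-demo", Namespace: "default"}},
		NodeNames: &[]string{"node1", "node2"},
	}
	// Returned by the extender: the surviving node(s) plus the reason each failed node was rejected.
	result := extenderv1.ExtenderFilterResult{
		NodeNames:   &[]string{"node1"},
		FailedNodes: extenderv1.FailedNodesMap{"node2": "node unregistered"},
	}
	// POSTed to /bind: the node chosen for the Pod.
	bind := extenderv1.ExtenderBindingArgs{
		PodName:      "vgpu-demo",
		PodNamespace: "default",
		Node:         "node1",
	}
	for _, v := range []interface{}{args, result, bind} {
		b, _ := json.Marshal(v)
		fmt.Println(string(b))
	}
}
Because tlsConfig.insecure is true in the ConfigMap above, the kube-scheduler skips certificate verification when it calls these endpoints.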
3.2.2. Extender scheduler HTTP server startup
cmd/scheduler/main.go:70
func start() {
device.InitDevices()
sher = scheduler.NewScheduler()
sher.Start()
defer sher.Stop()
// start monitor metrics
go sher.RegisterFromNodeAnnotations()
go initMetrics(config.MetricsBindAddress)
// start http server
router := httprouter.New()
router.POST("/filter", routes.PredicateRoute(sher))
router.POST("/bind", routes.Bind(sher))
3.2.3. filter implementation
pkg/scheduler/routes/route.go:41
func PredicateRoute(s *scheduler.Scheduler) httprouter.Handle {
klog.Infoln("Into Predicate Route outer func")
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
klog.Infoln("Into Predicate Route inner func")
checkBody(w, r)
var buf bytes.Buffer
body := io.TeeReader(r.Body, &buf)
var extenderArgs extenderv1.ExtenderArgs
var extenderFilterResult *extenderv1.ExtenderFilterResult
if err := json.NewDecoder(body).Decode(&extenderArgs); err != nil {
klog.Errorln("decode error", err.Error())
extenderFilterResult = &extenderv1.ExtenderFilterResult{
Error: err.Error(),
}
} else {
extenderFilterResult, err = s.Filter(extenderArgs)
if err != nil {
klog.Errorf("pod %v filter error, %v", extenderArgs.Pod.Name, err)
extenderFilterResult = &extenderv1.ExtenderFilterResult{
Error: err.Error(),
}
}
}
if resultBody, err := json.Marshal(extenderFilterResult); err != nil {
klog.Errorf("Failed to marshal extenderFilterResult: %+v, %+v",
err, extenderFilterResult)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
} else {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resultBody)
}
}
}
pkg/scheduler/scheduler.go:430
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
nums := k8sutil.Resourcereqs(args.Pod)
total := 0
for _, n := range nums {
for _, k := range n {
total += int(k.Nums)
}
}
if total == 0 {
klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
return &extenderv1.ExtenderFilterResult{
NodeNames: args.NodeNames,
FailedNodes: nil,
Error: "",
}, nil
}
annos := args.Pod.Annotations
s.delPod(args.Pod)
nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
if len(failedNodes) != 0 {
klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
}
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
if err != nil {
err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
if len((*nodeScores).NodeList) == 0 {
klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
return &extenderv1.ExtenderFilterResult{
FailedNodes: failedNodes,
}, nil
}
klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
sort.Sort(nodeScores)
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
annotations := make(map[string]string)
annotations[util.AssignedNodeAnnotations] = m.NodeID
annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)
for _, val := range device.GetDevices() {
val.PatchAnnotations(&annotations, m.Devices)
}
//InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
//maps.Copy(annotations, InRequestDevices)
//maps.Copy(annotations, supportDevices)
s.addPod(args.Pod, m.NodeID, m.Devices)
err = util.PatchPodAnnotations(args.Pod, annotations)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
s.delPod(args.Pod)
return nil, err
}
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil
}
The core logic here has two steps: gather the nodes' resource usage, then compute a score from each node's allocated versus total resources and pick the highest-scoring node.
3.2.3.1. Gathering node resource information
pkg/scheduler/scheduler.go:241
func (s *Scheduler) getNodesUsage(nodes *[]string, task *corev1.Pod) (*map[string]*NodeUsage, map[string]string, error) {
overallnodeMap := make(map[string]*NodeUsage)
cachenodeMap := make(map[string]*NodeUsage)
failedNodes := make(map[string]string)
//for _, nodeID := range *nodes {
allNodes, err := s.ListNodes()
if err != nil {
return &overallnodeMap, failedNodes, err
}
for _, node := range allNodes {
nodeInfo := &NodeUsage{}
userGPUPolicy := config.GPUSchedulerPolicy
if task != nil && task.Annotations != nil {
if value, ok := task.Annotations[policy.GPUSchedulerPolicyAnnotationKey]; ok {
userGPUPolicy = value
}
}
nodeInfo.Node = node.Node
nodeInfo.Devices = policy.DeviceUsageList{
Policy: userGPUPolicy,
DeviceLists: make([]*policy.DeviceListsScore, 0),
}
for _, d := range node.Devices {
nodeInfo.Devices.DeviceLists = append(nodeInfo.Devices.DeviceLists, &policy.DeviceListsScore{
Score: 0,
Device: &util.DeviceUsage{
ID: d.ID,
Index: d.Index,
Used: 0,
Count: d.Count,
Usedmem: 0,
Totalmem: d.Devmem,
Totalcore: d.Devcore,
Usedcores: 0,
MigUsage: util.MigInUse{
Index: 0,
UsageList: make(util.MIGS, 0),
},
MigTemplate: d.MIGTemplate,
Mode: d.Mode,
Type: d.Type,
Numa: d.Numa,
Health: d.Health,
},
})
}
overallnodeMap[node.ID] = nodeInfo
}
podsInfo := s.ListPodsInfo()
for _, p := range podsInfo {
node, ok := overallnodeMap[p.NodeID]
if !ok {
continue
}
for _, podsingleds := range p.Devices {
for _, ctrdevs := range podsingleds {
for _, udevice := range ctrdevs {
for _, d := range node.Devices.DeviceLists {
deviceID := udevice.UUID
if strings.Contains(deviceID, "[") {
deviceID = strings.Split(deviceID, "[")[0]
}
if d.Device.ID == deviceID {
d.Device.Used++
d.Device.Usedmem += udevice.Usedmem
d.Device.Usedcores += udevice.Usedcores
if strings.Contains(udevice.UUID, "[") {
tmpIdx, Instance := util.ExtractMigTemplatesFromUUID(udevice.UUID)
if len(d.Device.MigUsage.UsageList) == 0 {
util.PlatternMIG(&d.Device.MigUsage, d.Device.MigTemplate, tmpIdx)
}
d.Device.MigUsage.UsageList[Instance].InUse = true
klog.V(3).Infoln("add mig usage", d.Device.MigUsage, "template=", d.Device.MigTemplate, "uuid=", d.Device.ID)
}
}
}
}
}
}
klog.V(5).Infof("usage: pod %v assigned %v %v", p.Name, p.NodeID, p.Devices)
}
s.overviewstatus = overallnodeMap
for _, nodeID := range *nodes {
node, err := s.GetNode(nodeID)
if err != nil {
// The identified node does not have a gpu device, so the log here has no practical meaning,increase log priority.
klog.V(5).InfoS("node unregistered", "node", nodeID, "error", err)
failedNodes[nodeID] = "node unregistered"
continue
}
cachenodeMap[node.ID] = overallnodeMap[node.ID]
}
s.cachedstatus = cachenodeMap
return &cachenodeMap, failedNodes, nil
}
This gathers each node's total and already-allocated resources, starting with the node information itself.
pkg/scheduler/nodes.go:120
func (m *nodeManager) ListNodes() (map[string]*util.NodeInfo, error) {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.nodes, nil
}
A node-information cache is used here; entries are added to it by addNode.
3.2.3.1.1. Node cache
pkg/scheduler/nodes.go:46
func (m *nodeManager) addNode(nodeID string, nodeInfo *util.NodeInfo) {
if nodeInfo == nil || len(nodeInfo.Devices) == 0 {
return
}
m.mutex.Lock()
defer m.mutex.Unlock()
_, ok := m.nodes[nodeID]
if ok {
if len(nodeInfo.Devices) > 0 {
tmp := make([]util.DeviceInfo, 0, len(nodeInfo.Devices))
devices := device.GetDevices()
deviceType := ""
for _, val := range devices {
if strings.Contains(nodeInfo.Devices[0].Type, val.CommonWord()) {
deviceType = val.CommonWord()
}
}
for _, val := range m.nodes[nodeID].Devices {
if !strings.Contains(val.Type, deviceType) {
tmp = append(tmp, val)
}
}
m.nodes[nodeID].Devices = tmp
m.nodes[nodeID].Devices = append(m.nodes[nodeID].Devices, nodeInfo.Devices...)
}
} else {
m.nodes[nodeID] = nodeInfo
}
}
The key piece here is device.GetDevices(), which returns the registered device (vendor) handlers.
pkg/device/devices.go:81
func GetDevices() map[string]Devices {
return devices
}
devices is itself a cache, which we will analyze later; first, let's see when the node cache is populated.
pkg/scheduler/scheduler.go:155
func (s *Scheduler) RegisterFromNodeAnnotations() {
klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")
ticker := time.NewTicker(time.Second * 15)
for {
select {
case <-s.nodeNotify:
case <-ticker.C:
case <-s.stopCh:
return
}
labelSelector := labels.Everything()
if len(config.NodeLabelSelector) > 0 {
labelSelector = (labels.Set)(config.NodeLabelSelector).AsSelector()
}
rawNodes, err := s.nodeLister.List(labelSelector)
if err != nil {
klog.Errorln("nodes list failed", err.Error())
continue
}
var nodeNames []string
for _, val := range rawNodes {
nodeNames = append(nodeNames, val.Name)
for devhandsk, devInstance := range device.GetDevices() {
health, needUpdate := devInstance.CheckHealth(devhandsk, val)
klog.V(5).InfoS("device check health", "node", val.Name, "deviceVendor", devhandsk, "health", health, "needUpdate", needUpdate)
if !health {
err := devInstance.NodeCleanUp(val.Name)
// If the device is not healthy, the device is removed from the node.
// At the same time, this node needs to be removed from the cache.
if err != nil {
klog.Errorln("node cleanup failed", err.Error())
}
info, ok := s.nodes[val.Name]
if ok {
klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, info.ID, err, s.nodes[val.Name].Devices)
s.rmNodeDevice(val.Name, info, devhandsk)
continue
}
}
if !needUpdate {
continue
}
_, ok := util.HandshakeAnnos[devhandsk]
if ok {
tmppat := make(map[string]string)
tmppat[util.HandshakeAnnos[devhandsk]] = "Requesting_" + time.Now().Format("2006.01.02 15:04:05")
klog.V(4).InfoS("New timestamp", util.HandshakeAnnos[devhandsk], tmppat[util.HandshakeAnnos[devhandsk]], "nodeName", val.Name)
n, err := util.GetNode(val.Name)
if err != nil {
klog.Errorln("get node failed", err.Error())
continue
}
util.PatchNodeAnnotations(n, tmppat)
}
nodeInfo := &util.NodeInfo{}
nodeInfo.ID = val.Name
nodeInfo.Node = val
nodedevices, err := devInstance.GetNodeDevices(*val)
if err != nil {
continue
}
nodeInfo.Devices = make([]util.DeviceInfo, 0)
for _, deviceinfo := range nodedevices {
nodeInfo.Devices = append(nodeInfo.Devices, *deviceinfo)
}
s.addNode(val.Name, nodeInfo)
if s.nodes[val.Name] != nil && len(nodeInfo.Devices) > 0 {
klog.Infof("node %v device %s come node info=%s,%v total=%v", val.Name, devhandsk, nodeInfo.ID, nodeInfo.Devices, s.nodes[val.Name].Devices)
}
}
}
_, _, err = s.getNodesUsage(&nodeNames, nil)
if err != nil {
klog.Errorln("get node usage failed", err.Error())
}
}
}
A 15-second ticker (also triggered by node events) fetches node information and maintains the node cache.
The core of the loop is for devhandsk, devInstance := range device.GetDevices(): it iterates over all registered devices, each vendor having registered its own handler, and obtains the node's GPU resource information via devInstance.GetNodeDevices.
Through the registered device (NVIDIA in this environment), this dispatches to that vendor's GetNodeDevices implementation; device itself is covered in detail later.
pkg/device/nvidia/device.go:209
func (dev *NvidiaGPUDevices) GetNodeDevices(n corev1.Node) ([]*util.DeviceInfo, error) {
devEncoded, ok := n.Annotations[RegisterAnnos]
if !ok {
return []*util.DeviceInfo{}, errors.New("annos not found " + RegisterAnnos)
}
nodedevices, err := util.DecodeNodeDevices(devEncoded)
if err != nil {
klog.ErrorS(err, "failed to decode node devices", "node", n.Name, "device annotation", devEncoded)
return []*util.DeviceInfo{}, err
}
if len(nodedevices) == 0 {
klog.InfoS("no nvidia gpu device found", "node", n.Name, "device annotation", devEncoded)
return []*util.DeviceInfo{}, errors.New("no gpu found on node")
}
for _, val := range nodedevices {
if val.Mode == "mig" {
val.MIGTemplate = make([]util.Geometry, 0)
for _, migTemplates := range dev.config.MigGeometriesList {
found := false
for _, migDevices := range migTemplates.Models {
if strings.Contains(val.Type, migDevices) {
found = true
break
}
}
if found {
val.MIGTemplate = append(val.MIGTemplate, migTemplates.Geometries...)
break
}
}
}
}
devDecoded := util.EncodeNodeDevices(nodedevices)
klog.V(5).InfoS("nodes device information", "node", n.Name, "nodedevices", devDecoded)
return nodedevices, nil
}
So far the basic logic is: the scheduler periodically reads the nodes' annotations and keeps them in the node cache for use at scheduling time.
apiVersion: v1
kind: Node
metadata:
annotations:
...
hami.io/node-nvidia-register: 'GPU-7aebc545-cbd3-18a0-afce-76cae449702a,10,24576,300,NVIDIA-NVIDIA
GeForce RTX 3090,0,true:
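The register annotation packs one comma-separated record per device, with ':' as the record separator. Judging from the example above, the field order appears to be UUID, split count, device memory (MiB), device cores, type, NUMA node and health; the sketch below decodes it under that assumption (it is an illustrative re-implementation, not HAMi's util.DecodeNodeDevices):
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// deviceInfo mirrors the fields we care about from hami.io/node-nvidia-register.
// The field order is an assumption inferred from the example annotation above.
type deviceInfo struct {
	ID      string
	Count   int32
	Devmem  int32
	Devcore int32
	Type    string
	Numa    int
	Health  bool
}

func decodeRegisterAnnotation(val string) []deviceInfo {
	var out []deviceInfo
	for _, rec := range strings.Split(val, ":") {
		fields := strings.Split(strings.TrimSpace(rec), ",")
		if len(fields) < 7 {
			continue // trailing empty record or malformed entry
		}
		count, _ := strconv.Atoi(fields[1])
		mem, _ := strconv.Atoi(fields[2])
		core, _ := strconv.Atoi(fields[3])
		numa, _ := strconv.Atoi(fields[5])
		health, _ := strconv.ParseBool(fields[6])
		out = append(out, deviceInfo{
			ID: fields[0], Count: int32(count), Devmem: int32(mem),
			Devcore: int32(core), Type: fields[4], Numa: numa, Health: health,
		})
	}
	return out
}

func main() {
	anno := "GPU-7aebc545-cbd3-18a0-afce-76cae449702a,10,24576,300,NVIDIA-NVIDIA GeForce RTX 3090,0,true:"
	fmt.Printf("%+v\n", decodeRegisterAnnotation(anno))
}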
This again relies on device, which we will get to shortly; first, who calls RegisterFromNodeAnnotations?
cmd/scheduler/main.go:70
func start() {
device.InitDevices()
sher = scheduler.NewScheduler()
sher.Start()
defer sher.Stop()
// start monitor metrics
go sher.RegisterFromNodeAnnotations()
go initMetrics(config.MetricsBindAddress)
It is invoked when the scheduler starts, so this part of the flow is clear; now back to device.
3.2.3.1.2. device
The device registry is initialized in pkg/device/devices.go:85
func InitDevicesWithConfig(config *Config) {
devices = make(map[string]Devices)
DevicesToHandle = []string{}
devices[nvidia.NvidiaGPUDevice] = nvidia.InitNvidiaDevice(config.NvidiaConfig)
devices[cambricon.CambriconMLUDevice] = cambricon.InitMLUDevice(config.CambriconConfig)
devices[hygon.HygonDCUDevice] = hygon.InitDCUDevice(config.HygonConfig)
devices[iluvatar.IluvatarGPUDevice] = iluvatar.InitIluvatarDevice(config.IluvatarConfig)
devices[mthreads.MthreadsGPUDevice] = mthreads.InitMthreadsDevice(config.MthreadsConfig)
devices[metax.MetaxGPUDevice] = metax.InitMetaxDevice(config.MetaxConfig)
DevicesToHandle = append(DevicesToHandle, nvidia.NvidiaGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, cambricon.CambriconMLUCommonWord)
DevicesToHandle = append(DevicesToHandle, hygon.HygonDCUCommonWord)
DevicesToHandle = append(DevicesToHandle, iluvatar.IluvatarGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, mthreads.MthreadsGPUCommonWord)
DevicesToHandle = append(DevicesToHandle, metax.MetaxGPUCommonWord)
for _, dev := range ascend.InitDevices(config.VNPUs) {
devices[dev.CommonWord()] = dev
DevicesToHandle = append(DevicesToHandle, dev.CommonWord())
}
}
Since this environment uses NVIDIA, InitNvidiaDevice is the one that matters here.
pkg/device/devices.go:42
type Devices interface {
CommonWord() string
MutateAdmission(ctr *corev1.Container, pod *corev1.Pod) (bool, error)
CheckHealth(devType string, n *corev1.Node) (bool, bool)
NodeCleanUp(nn string) error
GetNodeDevices(n corev1.Node) ([]*util.DeviceInfo, error)
CheckType(annos map[string]string, d util.DeviceUsage, n util.ContainerDeviceRequest) (bool, bool, bool)
// CheckUUID is check current device id whether in GPUUseUUID or GPUNoUseUUID set, return true is check success.
CheckUUID(annos map[string]string, d util.DeviceUsage) bool
LockNode(n *corev1.Node, p *corev1.Pod) error
ReleaseNodeLock(n *corev1.Node, p *corev1.Pod) error
GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest
PatchAnnotations(annoinput *map[string]string, pd util.PodDevices) map[string]string
CustomFilterRule(allocated *util.PodDevices, request util.ContainerDeviceRequest, toAllicate util.ContainerDevices, device *util.DeviceUsage) bool
ScoreNode(node *corev1.Node, podDevices util.PodSingleDevice, policy string) float32
AddResourceUsage(n *util.DeviceUsage, ctr *util.ContainerDevice) error
// This should not be associated with a specific device object
//ParseConfig(fs *flag.FlagSet)
}
This defines the interface that each device vendor implements in its own way; the implementations are initialized when the scheduler starts and are then called at runtime.
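The surrounding pattern is essentially a registry keyed by vendor name, filled at startup and ranged over at scheduling time. A self-contained sketch of that idea, using hypothetical trimmed-down types rather than HAMi's actual Devices interface:
package main

import "fmt"

// Devices is a trimmed-down stand-in for HAMi's device interface:
// only the two methods used in this walkthrough are kept.
type Devices interface {
	CommonWord() string
	GetNodeDevices(annotation string) ([]string, error)
}

// registry plays the role of the package-level devices map filled by InitDevicesWithConfig.
var registry = map[string]Devices{}

// fakeGPU is a hypothetical vendor implementation.
type fakeGPU struct{}

func (fakeGPU) CommonWord() string { return "FakeGPU" }
func (fakeGPU) GetNodeDevices(annotation string) ([]string, error) {
	return []string{annotation}, nil // a real vendor would decode its register annotation here
}

func main() {
	// Registration happens once at startup...
	registry["FakeGPU"] = fakeGPU{}
	// ...and scheduling-time code simply ranges over the registry,
	// just like for devhandsk, devInstance := range device.GetDevices().
	for name, dev := range registry {
		devs, _ := dev.GetNodeDevices("GPU-xxx,10,24576,300")
		fmt.Println(name, devs)
	}
}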
Once the per-device resource usage of every node is available, scoring starts.
3.2.3.2. Scoring nodes based on resource information
pkg/scheduler/scheduler.go:458
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
if err != nil {
err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
pkg/scheduler/score.go:198
func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {
userNodePolicy := config.NodeSchedulerPolicy
if annos != nil {
if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {
userNodePolicy = value
}
}
res := policy.NodeScoreList{
Policy: userNodePolicy,
NodeList: make([]*policy.NodeScore, 0),
}
//func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {
// res := make(NodeScoreList, 0, len(*nodes))
for nodeID, node := range *nodes {
viewStatus(*node)
score := policy.NodeScore{NodeID: nodeID, Node: node.Node, Devices: make(util.PodDevices), Score: 0}
score.ComputeDefaultScore(node.Devices)
//This loop is for different container request
ctrfit := false
for ctrid, n := range nums {
sums := 0
for _, k := range n {
sums += int(k.Nums)
}
if sums == 0 {
for idx := range score.Devices {
for len(score.Devices[idx]) <= ctrid {
score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})
}
score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})
continue
}
}
klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)
fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
ctrfit = fit
if !fit {
klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)
break
}
}
if ctrfit {
res.NodeList = append(res.NodeList, &score)
score.OverrideScore(node.Devices, userNodePolicy)
}
}
return &res, nil
}
The logic here iterates over the nodes to score them and, for each node, iterates over the Pod's containers to compute the per-container device fit; every node able to satisfy the requested limits is returned.
3.2.3.3. Computing a node's score
pkg/scheduler/policy/node_policy.go:68
func (ns *NodeScore) ComputeDefaultScore(devices DeviceUsageList) {
used, usedCore, usedMem := int32(0), int32(0), int32(0)
for _, device := range devices.DeviceLists {
used += device.Device.Used
usedCore += device.Device.Usedcores
usedMem += device.Device.Usedmem
}
klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem)
total, totalCore, totalMem := int32(0), int32(0), int32(0)
for _, deviceLists := range devices.DeviceLists {
total += deviceLists.Device.Count
totalCore += deviceLists.Device.Totalcore
totalMem += deviceLists.Device.Totalmem
}
useScore := float32(used) / float32(total)
coreScore := float32(usedCore) / float32(totalCore)
memScore := float32(usedMem) / float32(totalMem)
ns.Score = float32(Weight) * (useScore + coreScore + memScore)
klog.V(2).Infof("node %s computer default score is %f", ns.NodeID, ns.Score)
}
The node scoring rule is simple: the scheduler sums usage across the node's devices and computes Weight * (used/total + usedCores/totalCores + usedMem/totalMem).
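A worked example with made-up numbers: a node has 2 GPUs, each with Count 10, 100 cores and 24576 MiB, and one already-scheduled Pod holds 1 vGPU with 30 cores and 8192 MiB. Then used/total = 1/20 = 0.05, usedCores/totalCores = 30/200 = 0.15 and usedMem/totalMem = 8192/49152 ≈ 0.17, so the default score is roughly 0.37 * Weight. A more heavily used node therefore gets a higher default score, and Filter picks the highest-scoring node after sorting, which packs Pods onto already-used GPUs by default; OverrideScore then adjusts the score according to the selected node scheduling policy.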
3.2.3.4. Computing the per-container device score
pkg/scheduler/score.go:149
func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {
//devmap := make(map[string]util.ContainerDevices)
devs := util.ContainerDevices{}
total, totalCore, totalMem := int32(0), int32(0), int32(0)
free, freeCore, freeMem := int32(0), int32(0), int32(0)
sums := 0
// computer all device score for one node
for index := range node.Devices.DeviceLists {
node.Devices.DeviceLists[index].ComputeScore(requests)
}
//This loop is for requests for different devices
for _, k := range requests {
sums += int(k.Nums)
if int(k.Nums) > len(node.Devices.DeviceLists) {
klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists))
return false, 0
}
sort.Sort(node.Devices)
fit, tmpDevs := fitInCertainDevice(node, k, annos, pod, devinput)
if fit {
for idx, val := range tmpDevs[k.Type] {
for nidx, v := range node.Devices.DeviceLists {
//bc node.Devices has been sorted, so we should find out the correct device
if v.Device.ID != val.UUID {
continue
}
total += v.Device.Count
totalCore += v.Device.Totalcore
totalMem += v.Device.Totalmem
free += v.Device.Count - v.Device.Used
freeCore += v.Device.Totalcore - v.Device.Usedcores
freeMem += v.Device.Totalmem - v.Device.Usedmem
err := device.GetDevices()[k.Type].AddResourceUsage(node.Devices.DeviceLists[nidx].Device, &tmpDevs[k.Type][idx])
if err != nil {
klog.Errorf("AddResource failed:%s", err.Error())
return false, 0
}
klog.Infoln("After AddResourceUsage:", node.Devices.DeviceLists[nidx].Device)
}
}
devs = append(devs, tmpDevs[k.Type]...)
} else {
return false, 0
}
(*devinput)[k.Type] = append((*devinput)[k.Type], devs)
}
return true, 0
}