E2B 的核心调度逻辑主要集中在 packages/api/internal/orchestrator/placement (放置策略) 和 packages/api/internal/orchestrator/nodemanager (节点管理) 这两个包中。

它们共同协作,负责在高并发环境下,将用户的 Sandbox(沙箱)请求高效、均衡地分配到合适的物理/虚拟节点上。

以下是这两个模块的详细设计与实现分析,附带关键源代码。


1. Placement (放置策略)

E2B 的放置策略采用了 “Best of K” (Power of Two Choices) 算法的变体。这是一种经典的负载均衡策略,相比于遍历所有节点寻找全局最优(耗时且容易造成竞争)或完全随机(可能导致负载不均),它在性能和均衡性之间取得了很好的平衡。

核心设计

  1. 接口抽象 (Algorithm):定义了通用的选择节点接口,允许未来替换不同的策略。

  2. 评分机制 (Score):基于 CPU 预留、实际使用率和超卖比率 (Over-commit Ratio) 计算节点得分。

  3. 采样 (sample):不遍历所有节点,而是随机采样 K 个节点进行评估。

  4. 重试与回退 (PlaceSandbox):处理节点资源耗尽、网络超时等情况,支持重试机制。

关键实现代码

1. 算法接口 (placement.go)

// Algorithm 定义了 Sandbox 放置策略的接口
type Algorithm interface {
	chooseNode(
        ctx context.Context, 
        nodes []*nodemanager.Node, 
        nodesExcluded map[string]struct{}, 
        requested nodemanager.SandboxResources, 
        buildMachineInfo machineinfo.MachineInfo,
    ) (*nodemanager.Node, error)
}

2. “Best of K” 评分逻辑 (placement_best_of_K.go)

这是算法的核心。它计算节点的负载分数,分数越低越好。公式考虑了已分配的 CPU (reserved)、当前实际使用率 (usageAvg) 以及配置的超卖系数 (R)。

// Score 计算节点的放置分数
func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 {
	metrics := node.Metrics()
	reserved := metrics.CpuAllocated

	// 1 CPU 使用量 = 100% CPU percent
	usageAvg := float64(metrics.CpuPercent) / 100

	// 避免除以零
	cpuCount := float64(metrics.CpuCount)
	if cpuCount == 0 {
		return math.MaxFloat64
	}

    // 计算总容量,包含超卖系数 R
	totalCapacity := config.R * cpuCount

	cpuRequested := float64(resources.CPUs)

    // 分数 = (请求量 + 已分配量 + Alpha权重 * 当前实际使用率) / 总容量
	return (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity
}

本策略存在高并发时Metrics有延迟的问题,见 E2B Placement Best of K优化

3. 采样与筛选 (sample)

在 chooseNode 内部,系统首先通过 sample 函数随机选出 K 个候选节点(默认 K=3)。这个过程会过滤掉不健康的节点、CPU 架构不兼容的节点或者正在启动过多实例的节点。

func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo) []*nodemanager.Node {
	// ... 随机打乱索引 ...

	candidates := make([]*nodemanager.Node, 0, config.K)
    // ... 循环选取 ...
    
    // 过滤逻辑:
    // 1. 排除已标记的节点
    if _, ok := excludedNodes[n.ID]; ok { continue }
    // 2. 节点必须是 Ready 状态
    if n.Status() != api.NodeStatusReady { continue }
    // 3. CPU 架构兼容性检查
    if !isNodeCPUCompatible(n, buildMachineInfo) { continue }
    // 4. 资源是否足够 (CanFit)
    if config.CanFit && !b.CanFit(n, resources, config) { continue }
    // 5. 防止节点瞬时过载:如果正在启动的实例太多,则跳过
    if config.TooManyStarting && n.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { continue }

    // ... 加入候选列表 ...
	return candidates
}

4. 执行放置 (PlaceSandbox in placement.go)

这是 Orchestrator 调用的主入口。它包含了一个重试循环,如果选中的节点创建失败(例如资源瞬间耗尽),它会将该节点加入黑名单 (nodesExcluded) 并重试。

func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo) (*nodemanager.Node, error) {
	// ... 初始化 ...
	attempt := 0
	for attempt < maxRetries {
        // 如果有首选节点(例如 Resume 操作),优先尝试
		if node != nil {
			telemetry.ReportEvent(ctx, "Placing sandbox on the preferred node", telemetry.WithNodeID(node.ID))
		} else {
            // 否则使用算法选择最佳节点
			node, err = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, nodemanager.SandboxResources{...}, buildMachineInfo)
			if err != nil { return nil, err }
		}

        // 标记开始放置,增加 InProgress 计数,防止其他协程同时选中该节点导致过载
		node.PlacementMetrics.StartPlacing(sbxRequest.GetSandbox().GetSandboxId(), ...)

        // 发起 gRPC 请求创建 Sandbox
		err = node.SandboxCreate(ctx, sbxRequest)
		
		if err == nil {
			node.PlacementMetrics.Success(sbxRequest.GetSandbox().GetSandboxId())
			return node, nil
		}

        // 处理失败情况
        // ... 如果是资源耗尽 (ResourceExhausted),记录并重试 ...
        nodesExcluded[failedNode.ID] = struct{}{}
        failedNode.PlacementMetrics.Fail(sbxRequest.GetSandbox().GetSandboxId())
        attempt++
	}
	return nil, errSandboxCreateFailed
}

2. Node Manager (节点管理)

nodemanager 包负责维护系统中每个节点的抽象 (Node struct)。它充当了 API server 和底层运行 Sandbox 的实际机器(通过 gRPC 通信)之间的代理。

核心设计

  1. 节点抽象 (Node):封装了 gRPC 客户端、节点状态、元数据和缓存。

  2. 状态映射:将底层的 Nomad/Orchestrator 状态映射为 API 层面的状态 (api.NodeStatus)。

  3. 本地指标追踪 (PlacementMetrics):为了让放置算法做出快速决策,Node 结构体在内存中维护了当前正在创建的 Sandbox 数量 (sandboxesInProgress),这比等待远程节点上报 Metrics 更快。

关键实现代码

1. Node 结构体 (node.go)

type Node struct {
	ID            string
	ClusterID     uuid.UUID
	IPAddress     string
	// ...

	client *clusters.GRPCClient // gRPC 客户端,用于与节点上的 Agent 通信
	status api.NodeStatus       // 节点健康状态

	metrics   Metrics           // 节点上报的指标(CPU, 内存等)
	
    // 放置指标:用于放置算法的快速决策
	PlacementMetrics PlacementMetrics 

	buildCache *ttlcache.Cache[string, any] // 构建缓存,避免重复下载构建
}

2. 初始化节点 (New / NewClusterNode)

E2B 支持本地节点(开发环境)和集群节点。初始化时会建立连接并获取初始的机器信息。

func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance) (*Node, error) {
    // ... 状态映射 ...
    // ... 初始化元数据 ...

	n := &Node{
		ID:               i.NodeID,
		ClusterID:        clusterID,
        // ...
		PlacementMetrics: PlacementMetrics{
			sandboxesInProgress: smap.New[SandboxResources](), // 线程安全的 Map
			createSuccess:       atomic.Uint64{},
			createFails:         atomic.Uint64{},
		},
		client: client,
        // ...
	}

    // 获取节点的详细信息(CPU 架构等)
	nodeClient, ctx := n.GetClient(ctx)
	nodeInfo, err := nodeClient.Info.ServiceInfo(ctx, &emptypb.Empty{})
    // ... 更新 metrics 和 machineInfo ...

	return n, nil
}

3. 获取节点上的 Sandboxes (sandboxes.go)

Node 还提供了方法来列出其上运行的所有 Sandbox,并转换配置格式以供 API 使用。

func (n *Node) GetSandboxes(ctx context.Context) ([]sandbox.Sandbox, error) {
	client, childCtx := n.GetClient(childCtx)
    // 调用 gRPC 接口 List
	res, err := client.Sandbox.List(childCtx, &empty.Empty{})
    
    // ... 解析响应并转换为内部 sandbox.Sandbox 结构 ...
	return sandboxesInfo, nil
}

3. Orchestrator 的集成

在 orchestrator.go 中,这两个模块被组合在一起。Orchestrator 持有一个节点列表 (nodes *smap.Map[*nodemanager.Node]) 和一个放置算法实例 (placementAlgorithm)。

当创建 Sandbox 时(create_instance.go):

  1. 获取节点池clusterNodes := o.GetClusterNodes(nodeClusterID)

  2. 处理亲和性 (Affinity):如果是 Resume 操作且指定了 nodeID,会尝试优先使用该节点。

  3. 调用放置:调用 placement.PlaceSandbox,传入算法、节点池和请求。

  4. 后期处理:成功后更新 Metrics,将 Sandbox 添加到全局存储 (sandboxStore)。

这种设计使得 E2B 能够水平扩展,通过添加更多节点到集群中,并通过调整 BestOfKConfig(如 K 值或 Alpha权重)来动态优化调度性能。