E2B 的核心调度逻辑主要集中在 packages/api/internal/orchestrator/placement (放置策略) 和 packages/api/internal/orchestrator/nodemanager (节点管理) 这两个包中。
它们共同协作,负责在高并发环境下,将用户的 Sandbox(沙箱)请求高效、均衡地分配到合适的物理/虚拟节点上。
以下是这两个模块的详细设计与实现分析,附带关键源代码。
1. Placement (放置策略)
E2B 的放置策略采用了 “Best of K” (Power of Two Choices) 算法的变体。这是一种经典的负载均衡策略,相比于遍历所有节点寻找全局最优(耗时且容易造成竞争)或完全随机(可能导致负载不均),它在性能和均衡性之间取得了很好的平衡。
核心设计
接口抽象 (
Algorithm):定义了通用的选择节点接口,允许未来替换不同的策略。评分机制 (
Score):基于 CPU 预留、实际使用率和超卖比率 (Over-commit Ratio) 计算节点得分。采样 (
sample):不遍历所有节点,而是随机采样 K 个节点进行评估。重试与回退 (
PlaceSandbox):处理节点资源耗尽、网络超时等情况,支持重试机制。
关键实现代码
1. 算法接口 (placement.go)
// Algorithm 定义了 Sandbox 放置策略的接口
type Algorithm interface {
chooseNode(
ctx context.Context,
nodes []*nodemanager.Node,
nodesExcluded map[string]struct{},
requested nodemanager.SandboxResources,
buildMachineInfo machineinfo.MachineInfo,
) (*nodemanager.Node, error)
}
2. “Best of K” 评分逻辑 (placement_best_of_K.go)
这是算法的核心。它计算节点的负载分数,分数越低越好。公式考虑了已分配的 CPU (reserved)、当前实际使用率 (usageAvg) 以及配置的超卖系数 (R)。
// Score 计算节点的放置分数
func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 {
metrics := node.Metrics()
reserved := metrics.CpuAllocated
// 1 CPU 使用量 = 100% CPU percent
usageAvg := float64(metrics.CpuPercent) / 100
// 避免除以零
cpuCount := float64(metrics.CpuCount)
if cpuCount == 0 {
return math.MaxFloat64
}
// 计算总容量,包含超卖系数 R
totalCapacity := config.R * cpuCount
cpuRequested := float64(resources.CPUs)
// 分数 = (请求量 + 已分配量 + Alpha权重 * 当前实际使用率) / 总容量
return (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity
}
本策略存在高并发时Metrics有延迟的问题,见 E2B Placement Best of K优化
3. 采样与筛选 (sample)
在 chooseNode 内部,系统首先通过 sample 函数随机选出 K 个候选节点(默认 K=3)。这个过程会过滤掉不健康的节点、CPU 架构不兼容的节点或者正在启动过多实例的节点。
func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo) []*nodemanager.Node {
// ... 随机打乱索引 ...
candidates := make([]*nodemanager.Node, 0, config.K)
// ... 循环选取 ...
// 过滤逻辑:
// 1. 排除已标记的节点
if _, ok := excludedNodes[n.ID]; ok { continue }
// 2. 节点必须是 Ready 状态
if n.Status() != api.NodeStatusReady { continue }
// 3. CPU 架构兼容性检查
if !isNodeCPUCompatible(n, buildMachineInfo) { continue }
// 4. 资源是否足够 (CanFit)
if config.CanFit && !b.CanFit(n, resources, config) { continue }
// 5. 防止节点瞬时过载:如果正在启动的实例太多,则跳过
if config.TooManyStarting && n.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { continue }
// ... 加入候选列表 ...
return candidates
}
4. 执行放置 (PlaceSandbox in placement.go)
这是 Orchestrator 调用的主入口。它包含了一个重试循环,如果选中的节点创建失败(例如资源瞬间耗尽),它会将该节点加入黑名单 (nodesExcluded) 并重试。
func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo) (*nodemanager.Node, error) {
// ... 初始化 ...
attempt := 0
for attempt < maxRetries {
// 如果有首选节点(例如 Resume 操作),优先尝试
if node != nil {
telemetry.ReportEvent(ctx, "Placing sandbox on the preferred node", telemetry.WithNodeID(node.ID))
} else {
// 否则使用算法选择最佳节点
node, err = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, nodemanager.SandboxResources{...}, buildMachineInfo)
if err != nil { return nil, err }
}
// 标记开始放置,增加 InProgress 计数,防止其他协程同时选中该节点导致过载
node.PlacementMetrics.StartPlacing(sbxRequest.GetSandbox().GetSandboxId(), ...)
// 发起 gRPC 请求创建 Sandbox
err = node.SandboxCreate(ctx, sbxRequest)
if err == nil {
node.PlacementMetrics.Success(sbxRequest.GetSandbox().GetSandboxId())
return node, nil
}
// 处理失败情况
// ... 如果是资源耗尽 (ResourceExhausted),记录并重试 ...
nodesExcluded[failedNode.ID] = struct{}{}
failedNode.PlacementMetrics.Fail(sbxRequest.GetSandbox().GetSandboxId())
attempt++
}
return nil, errSandboxCreateFailed
}
2. Node Manager (节点管理)
nodemanager 包负责维护系统中每个节点的抽象 (Node struct)。它充当了 API server 和底层运行 Sandbox 的实际机器(通过 gRPC 通信)之间的代理。
核心设计
节点抽象 (
Node):封装了 gRPC 客户端、节点状态、元数据和缓存。状态映射:将底层的 Nomad/Orchestrator 状态映射为 API 层面的状态 (
api.NodeStatus)。本地指标追踪 (
PlacementMetrics):为了让放置算法做出快速决策,Node结构体在内存中维护了当前正在创建的 Sandbox 数量 (sandboxesInProgress),这比等待远程节点上报 Metrics 更快。
关键实现代码
1. Node 结构体 (node.go)
type Node struct {
ID string
ClusterID uuid.UUID
IPAddress string
// ...
client *clusters.GRPCClient // gRPC 客户端,用于与节点上的 Agent 通信
status api.NodeStatus // 节点健康状态
metrics Metrics // 节点上报的指标(CPU, 内存等)
// 放置指标:用于放置算法的快速决策
PlacementMetrics PlacementMetrics
buildCache *ttlcache.Cache[string, any] // 构建缓存,避免重复下载构建
}
2. 初始化节点 (New / NewClusterNode)
E2B 支持本地节点(开发环境)和集群节点。初始化时会建立连接并获取初始的机器信息。
func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance) (*Node, error) {
// ... 状态映射 ...
// ... 初始化元数据 ...
n := &Node{
ID: i.NodeID,
ClusterID: clusterID,
// ...
PlacementMetrics: PlacementMetrics{
sandboxesInProgress: smap.New[SandboxResources](), // 线程安全的 Map
createSuccess: atomic.Uint64{},
createFails: atomic.Uint64{},
},
client: client,
// ...
}
// 获取节点的详细信息(CPU 架构等)
nodeClient, ctx := n.GetClient(ctx)
nodeInfo, err := nodeClient.Info.ServiceInfo(ctx, &emptypb.Empty{})
// ... 更新 metrics 和 machineInfo ...
return n, nil
}
3. 获取节点上的 Sandboxes (sandboxes.go)
Node 还提供了方法来列出其上运行的所有 Sandbox,并转换配置格式以供 API 使用。
func (n *Node) GetSandboxes(ctx context.Context) ([]sandbox.Sandbox, error) {
client, childCtx := n.GetClient(childCtx)
// 调用 gRPC 接口 List
res, err := client.Sandbox.List(childCtx, &empty.Empty{})
// ... 解析响应并转换为内部 sandbox.Sandbox 结构 ...
return sandboxesInfo, nil
}
3. Orchestrator 的集成
在 orchestrator.go 中,这两个模块被组合在一起。Orchestrator 持有一个节点列表 (nodes *smap.Map[*nodemanager.Node]) 和一个放置算法实例 (placementAlgorithm)。
当创建 Sandbox 时(create_instance.go):
获取节点池:
clusterNodes := o.GetClusterNodes(nodeClusterID)。处理亲和性 (Affinity):如果是
Resume操作且指定了nodeID,会尝试优先使用该节点。调用放置:调用
placement.PlaceSandbox,传入算法、节点池和请求。后期处理:成功后更新 Metrics,将 Sandbox 添加到全局存储 (
sandboxStore)。
这种设计使得 E2B 能够水平扩展,通过添加更多节点到集群中,并通过调整 BestOfKConfig(如 K 值或 Alpha权重)来动态优化调度性能。