这是一个完整的重构方案,遵循接口隔离原则,实现了基于配置的算法切换,并统一了动态配置更新机制。
1. 修改配置模型:增加算法选择参数
首先,在配置中增加 PLACEMENT_ALGORITHM 环境变量支持,默认为 best-of-k。
文件: packages/api/internal/cfg/model.go
package cfg
import (
"github.com/caarlos0/env/v11"
)
// ... 原有常量 ...
type Config struct {
// ... 原有字段 ...
// 新增字段:placement 算法选择
PlacementAlgorithm string `env:"PLACEMENT_ALGORITHM" envDefault:"best-of-k"`
}
// ... Parse 函数 ...
2. 更新接口定义:下放配置更新职责
修改 Algorithm 接口,增加 UpdateConfig 方法。这使得 Orchestrator 不需要关心具体的算法配置细节,只需要传递 Feature Flags 客户端即可。
文件: packages/api/internal/orchestrator/placement/placement.go
package placement
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo"
)
// ... 原有变量 ...
// Algorithm defines the interface for sandbox placement strategies.
type Algorithm interface {
chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo) (*nodemanager.Node, error)
// UpdateConfig allows the algorithm to update its internal configuration from feature flags.
UpdateConfig(ctx context.Context, featureFlags *featureflags.Client)
}
// ... PlaceSandbox 函数保持不变 ...
3. 实现 Random 算法
新建一个简单的随机选择算法,它也实现了 Algorithm 接口。虽然它目前可能不需要复杂的配置,但通过实现 UpdateConfig 保持了接口一致性。
新建文件: packages/api/internal/orchestrator/placement/random.go
package placement
import (
"context"
"fmt"
"math/rand"
"sync"
"github.com/e2b-dev/infra/packages/api/internal/api"
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo"
)
type Random struct {
mu sync.RWMutex
}
var _ Algorithm = &Random{}
func NewRandom() Algorithm {
return &Random{}
}
// UpdateConfig for Random algorithm (currently a no-op, but ready for future flags)
func (r *Random) UpdateConfig(ctx context.Context, featureFlags *featureflags.Client) {
// 示例:如果未来想加一个 "RandomWeighted" 开关,可以在这里读取
// r.mu.Lock()
// defer r.mu.Unlock()
}
func (r *Random) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, _ nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo) (*nodemanager.Node, error) {
var candidates []*nodemanager.Node
for _, n := range nodes {
// 1. 基本过滤:排除黑名单节点
if _, ok := excludedNodes[n.ID]; ok {
continue
}
// 2. 状态检查:必须是 Ready
if n.Status() != api.NodeStatusReady {
continue
}
// 3. 架构兼容性检查
if !isNodeCPUCompatible(n, buildMachineInfo) {
continue
}
candidates = append(candidates, n)
}
if len(candidates) == 0 {
return nil, fmt.Errorf("no node available")
}
// 纯随机选择
return candidates[rand.Intn(len(candidates))], nil
}
4. 改造 BestOfK 算法
将配置读取逻辑从 orchestrator.go 移动到 BestOfK 内部的 UpdateConfig 方法中。
文件: packages/api/internal/orchestrator/placement/placement_best_of_K.go
package placement
import (
"context"
// ... 原有 imports
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
)
// ... Config 结构体保持不变 ...
// BestOfK implements the fit-score-place algorithm
type BestOfK struct {
config BestOfKConfig
mu sync.RWMutex
}
// NewBestOfK 保持不变
func NewBestOfK(config BestOfKConfig) Algorithm {
return &BestOfK{
config: config,
}
}
// 新增:实现接口要求的配置更新逻辑
func (b *BestOfK) UpdateConfig(ctx context.Context, client *featureflags.Client) {
// 从 Feature Flags 读取配置
k := client.IntFlag(ctx, featureflags.BestOfKSampleSize)
maxOvercommitPercent := client.IntFlag(ctx, featureflags.BestOfKMaxOvercommit)
alphaPercent := client.IntFlag(ctx, featureflags.BestOfKAlpha)
canFit := client.BoolFlag(ctx, featureflags.BestOfKCanFitFlag)
tooManyStarting := client.BoolFlag(ctx, featureflags.BestOfKTooManyStartingFlag)
alpha := float64(alphaPercent) / 100.0
maxOvercommit := float64(maxOvercommitPercent) / 100.0
newConfig := BestOfKConfig{
R: maxOvercommit,
K: k,
Alpha: alpha,
CanFit: canFit,
TooManyStarting: tooManyStarting,
}
b.UpdateInternalConfig(newConfig)
}
// 将原来的 UpdateConfig 改名为 UpdateInternalConfig (或者保留原名,只改内部调用)
func (b *BestOfK) UpdateInternalConfig(config BestOfKConfig) {
b.mu.Lock()
defer b.mu.Unlock()
b.config = config
}
// ... chooseNode, Score, sample 等其他方法保持不变 ...
5. 集成到 Orchestrator
最后,在 Orchestrator 初始化时根据配置选择算法,并启动统一的配置更新循环。
文件: packages/api/internal/orchestrator/orchestrator.go
type Orchestrator struct {
// ... 其他字段
// 修改字段类型为接口
placementAlgorithm placement.Algorithm
// ... 其他字段
}
func New(
ctx context.Context,
config cfg.Config,
// ... 其他参数
) (*Orchestrator, error) {
// ... 前置初始化代码 ...
// 1. 根据配置初始化算法
var placementAlgo placement.Algorithm
switch config.PlacementAlgorithm {
case "random":
placementAlgo = placement.NewRandom()
logger.L().Info(ctx, "Using Random placement algorithm")
case "best-of-k":
fallthrough
default:
// 使用默认配置初始化,随后 UpdateConfig 会拉取最新 Flag
placementAlgo = placement.NewBestOfK(placement.DefaultBestOfKConfig())
logger.L().Info(ctx, "Using BestOfK placement algorithm")
}
o := Orchestrator{
// ... 其他字段初始化
placementAlgorithm: placementAlgo,
featureFlagsClient: featureFlags,
// ...
}
// ... 中间代码 ...
// 2. 立即同步一次配置
o.placementAlgorithm.UpdateConfig(ctx, featureFlags)
// 3. 启动定时更新 (使用新的通用方法)
go o.updatePlacementConfig(ctx)
return &o, nil
}
// updatePlacementConfig 定期调用算法接口的 UpdateConfig
func (o *Orchestrator) updatePlacementConfig(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 多态调用:无论是什么算法,都通过此接口更新配置
o.placementAlgorithm.UpdateConfig(ctx, o.featureFlagsClient)
}
}
}
// 删除原有的 getBestOfKConfig 函数,逻辑已移动到 BestOfK 类中