跳到主要内容

Serverless 计算概念与 RL 应用详解

从 Serverless 核心概念(FaaS、事件驱动、Cold Start、自动伸缩)到其在分布式强化学习训练中的应用,通过大量代码模拟展示传统服务器 vs Serverless 的成本与效率差异。


一、Serverless 概念

Serverless(无服务器计算)并不是没有服务器,而是指用户不需要管理服务器。云平台全权负责底层基础设施的分配、扩缩容和维护,用户只需要提交代码。

最典型的 Serverless 形态是 FaaS(Function as a Service),代表产品包括 AWS Lambda、Azure Functions、Google Cloud Functions 和阿里云函数计算。

用一段伪代码感受 Serverless 的使用方式:

# ============ 传统服务器方式 ============
# 用户需要:买服务器 → 装系统 → 配环境 → 部署 → 维护

# 1. 购买并配置服务器
server = CloudProvider.create_vm(cpu=8, memory="32GB", os="Ubuntu")
server.install("python3.10", "nginx", "gunicorn")

# 2. 编写并部署完整的 Web 服务
# app.py
from flask import Flask
app = Flask(__name__)

@app.route("/process")
def process_data():
result = heavy_computation()
return result

# 3. 手动管理进程
# $ gunicorn app:app --workers 4 --bind 0.0.0.0:8080
# 服务器 7×24 运行,不管有没有请求都在计费
# 流量暴涨时需要手动扩容


# ============ Serverless 方式 ============
# 用户只需要写一个函数,其他全部交给平台

# handler.py —— 这就是用户需要写的全部代码
def handler(event, context):
"""一个 Serverless 函数,平台自动管理一切"""
data = event["body"]
result = heavy_computation(data)
return {
"statusCode": 200,
"body": result
}

# 部署一行命令搞定:
# $ aws lambda deploy --function process_data --handler handler.handler
# 有请求 → 平台自动启动容器执行
# 没请求 → 不运行,不计费
# 100个并发请求 → 平台自动启动100个实例

二、传统模式 vs Serverless 对比

维度传统服务器 / IaaS / VMServerless / FaaS
用户职责买服务器、装系统、配环境、部署代码、运维只上传一个函数
运行方式7x24 常驻运行事件触发,按需执行
扩缩容手动扩容,流量小时资源闲置自动并行启动多个实例
计费模式按小时/月计费,不管用不用都付钱按实际执行时间计费(100ms 粒度),不执行不花钱
类比租一整套房子,不管住不住都要交房租住酒店,住一晚付一晚,不住不付

用代码模拟两种计费模型的差异:

# ============ 计费模型对比 ============

def traditional_server_cost(hours: int, hourly_rate: float,
actual_usage_ratio: float) -> dict:
"""
传统服务器计费:固定费率 × 时间,与实际使用无关

Args:
hours: 租用时长(小时)
hourly_rate: 每小时费率(美元)
actual_usage_ratio: 实际使用率(0-1),例如只有20%时间在处理请求
"""
total_cost = hours * hourly_rate # 总费用
useful_cost = total_cost * actual_usage_ratio # 真正有用的部分
wasted_cost = total_cost * (1 - actual_usage_ratio) # 浪费的部分
return {
"total": total_cost,
"useful": useful_cost,
"wasted": wasted_cost,
"waste_ratio": f"{(1 - actual_usage_ratio) * 100:.0f}%"
}

def serverless_cost(invocations: int, avg_duration_ms: int,
memory_mb: int) -> dict:
"""
Serverless 计费:只为实际执行付费
以 AWS Lambda 定价为例:
- 每次请求:$0.0000002
- 每 GB-秒:$0.0000166
"""
request_cost = invocations * 0.0000002
# 执行时间向上取整到最近的 1ms
duration_s = avg_duration_ms / 1000
memory_gb = memory_mb / 1024
compute_cost = invocations * duration_s * memory_gb * 0.0000166
total = request_cost + compute_cost
return {
"total": total,
"request_cost": request_cost,
"compute_cost": compute_cost,
"wasted": 0, # 不执行不花钱,没有浪费
"waste_ratio": "0%"
}

# 场景:一个 API 服务,每天处理 10000 个请求,每个请求 200ms,用 256MB 内存
# 但一天中只有 4 小时有流量(usage_ratio = 4/24 ≈ 17%)

print("=== 传统服务器(月费用)===")
print(traditional_server_cost(
hours=24*30, hourly_rate=0.50, actual_usage_ratio=0.17
))
# {'total': 360.0, 'useful': 61.2, 'wasted': 298.8, 'waste_ratio': '83%'}

print("=== Serverless(月费用)===")
print(serverless_cost(
invocations=10000*30, avg_duration_ms=200, memory_mb=256
))
# {'total': 0.0307, 'wasted': 0, 'waste_ratio': '0%'}
# 差距可达万倍!(当然高并发场景下差距会缩小)

三、Serverless 的核心机制

3.1 事件驱动(Event-Driven)

函数不是持续运行的,而是被事件触发

触发器类型示例特点
HTTP用户访问一个 API不可预测,突发性强
Timer每天凌晨2点执行完全可预测
Queue消息队列中来了新消息取决于上游速率
EventIoT 设备上报数据高频、自动化
Storage数据库/文件发生变化被动响应

不同触发器的代码示例:

# ============ 不同触发器类型示例 ============

# 1. HTTP 触发 —— 最常见,用户请求触发
def http_handler(event, context):
"""当用户访问 API 端点时触发"""
user_id = event["queryStringParameters"]["user_id"]
user_data = db.get_user(user_id)
return {"statusCode": 200, "body": json.dumps(user_data)}

# 2. Timer 触发 —— 定时任务,完全可预测
def timer_handler(event, context):
"""每天凌晨 2:00 触发,清理过期数据"""
expired = db.query("SELECT * FROM logs WHERE age > 30 days")
db.delete(expired)
return {"cleaned": len(expired)}

# 3. Queue 触发 —— 消息队列驱动
def queue_handler(event, context):
"""当 SQS/Kafka 中有新消息时触发"""
for record in event["Records"]:
message = json.loads(record["body"])
process_order(message["order_id"])

# 4. Storage 触发 —— 文件变化驱动
def storage_handler(event, context):
"""当 S3 中上传了新文件时触发"""
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
image = s3.download(bucket, key)
thumbnail = resize(image, size=(128, 128))
s3.upload(thumbnail, bucket, f"thumbnails/{key}")

3.2 Cold Start vs Warm Start

这是 Serverless 最核心的性能问题

Cold Start(冷启动):
请求到来 → 分配容器 → 加载运行时 → 加载依赖库 → 加载用户代码 → 执行
├──────────── 额外延迟:数百ms 到几秒 ────────────┤

Warm Start(热启动):
请求到来 → 容器已在内存中 → 直接执行
├── 几乎无额外延迟 ──┤

用代码模拟 cold start 和 warm start 的行为:

# ============ Cold Start vs Warm Start 模拟 ============

import time

class ServerlessRuntime:
"""模拟 Serverless 平台的容器管理"""

def __init__(self, keep_alive_seconds=600):
self.containers = {} # app_id -> {"last_used": timestamp, "state": "warm"}
self.keep_alive = keep_alive_seconds # 默认 10 分钟 keep-alive

def invoke(self, app_id: str, function_code, payload: dict) -> dict:
current_time = time.time()

if app_id in self.containers:
container = self.containers[app_id]
idle_time = current_time - container["last_used"]

if idle_time < self.keep_alive:
# ---- Warm Start ----
# 容器还在内存中,直接执行
start = time.time()
result = function_code(payload)
execution_time = time.time() - start

container["last_used"] = current_time
return {
"start_type": "warm",
"cold_start_overhead": 0,
"execution_time": execution_time,
"result": result
}
else:
# Keep-alive 过期,容器已被回收
del self.containers[app_id]

# ---- Cold Start ----
# 需要重新分配容器、加载运行时、加载代码
cold_start_time = self._cold_start(app_id)

start = time.time()
result = function_code(payload)
execution_time = time.time() - start

# 将容器保留在内存中
self.containers[app_id] = {
"last_used": current_time,
"state": "warm"
}

return {
"start_type": "cold",
"cold_start_overhead": cold_start_time,
"execution_time": execution_time,
"total_latency": cold_start_time + execution_time,
"result": result
}

def _cold_start(self, app_id: str) -> float:
"""
模拟冷启动过程,总延迟约 500ms - 数秒
"""
allocate_container = 0.100 # 分配容器: ~100ms
load_runtime = 0.200 # 加载 Python 运行时: ~200ms
load_dependencies = 0.150 # 加载依赖库: ~150ms
load_user_code = 0.050 # 加载用户代码: ~50ms
total = allocate_container + load_runtime + load_dependencies + load_user_code
time.sleep(total) # 模拟延迟
return total # 总冷启动延迟 ~500ms


# 使用示例
runtime = ServerlessRuntime(keep_alive_seconds=600) # 10分钟 keep-alive

def my_function(payload):
return {"message": f"processed {payload['data']}"}

# 第一次调用:Cold Start(~500ms 额外延迟)
result1 = runtime.invoke("app_001", my_function, {"data": "hello"})
# {'start_type': 'cold', 'cold_start_overhead': 0.5, 'execution_time': 0.001}

# 立即第二次调用:Warm Start(无额外延迟)
result2 = runtime.invoke("app_001", my_function, {"data": "world"})
# {'start_type': 'warm', 'cold_start_overhead': 0, 'execution_time': 0.001}

# 15分钟后调用:又是 Cold Start(keep-alive 过期)
time.sleep(900)
result3 = runtime.invoke("app_001", my_function, {"data": "again"})
# {'start_type': 'cold', 'cold_start_overhead': 0.5, 'execution_time': 0.001}

云平台的两难

  • 一直保持容器在内存 → 内存成本巨大(81% 的应用很少被调用)
  • 执行完就卸载容器 → 下次请求必须 cold start,用户体验差
  • 折中方案:keep-alive,执行完保留容器一段时间(AWS 10分钟,Azure 20分钟)

3.3 自动伸缩(Auto-Scaling)

# ============ 自动伸缩模拟 ============

class AutoScaler:
"""模拟 Serverless 平台的自动伸缩机制"""

def __init__(self, max_concurrent=1000):
self.active_instances = 0
self.max_concurrent = max_concurrent

def handle_requests(self, concurrent_requests: int):
"""
传统服务器 vs Serverless 的伸缩对比
"""
fixed_servers = 3 # 传统方式:固定 3 台服务器

# ---- 传统方式 ----
if concurrent_requests <= fixed_servers:
traditional_utilized = concurrent_requests
traditional_idle = fixed_servers - concurrent_requests # 闲置的服务器
traditional_dropped = 0
else:
traditional_utilized = fixed_servers
traditional_idle = 0
traditional_dropped = concurrent_requests - fixed_servers # 处理不了的请求

# ---- Serverless 方式 ----
serverless_instances = min(concurrent_requests, self.max_concurrent)
serverless_idle = 0 # 永远没有闲置实例
serverless_dropped = max(0, concurrent_requests - self.max_concurrent)

return {
"traditional": {
"servers": fixed_servers,
"utilized": traditional_utilized,
"idle": traditional_idle,
"dropped": traditional_dropped
},
"serverless": {
"instances": serverless_instances,
"idle": serverless_idle,
"dropped": serverless_dropped
}
}

scaler = AutoScaler()

# 场景1:低谷期,只有 1 个请求
print(scaler.handle_requests(1))
# traditional: 3 servers, 1 utilized, 2 idle(2台白白浪费)
# serverless: 1 instance, 0 idle(精准匹配)

# 场景2:高峰期,100 个并发请求
print(scaler.handle_requests(100))
# traditional: 3 servers, 3 utilized, 0 idle, 97 dropped!(97个请求丢失)
# serverless: 100 instances, 0 idle, 0 dropped(自动扩到100个)

四、Serverless 的架构层次

┌───────────────────────────────────┐
│ 用户的函数代码 │ ← 用户只关心这一层
├───────────────────────────────────┤
│ 运行时(Python, Node.js 等) │
├───────────────────────────────────┤
│ 容器(Docker / 轻量级沙箱) │ ← 隔离 + 快速启停
├───────────────────────────────────┤
│ 容器编排 / 调度器 │ ← 决定在哪台机器上运行
├───────────────────────────────────┤
│ 物理服务器集群 │
└───────────────────────────────────┘
全部由云平台管理 ↑

以 Azure Functions(论文中的 OpenWhisk 实现)为例:

用户请求 → REST Interface → Controller(Load Balancer)

通过 Kafka 分发
┌──────┼──────┐
Invoker Invoker Invoker
│ │ │
Container Container Container
(函数执行) (函数执行) (函数执行)

用代码模拟这个调度过程:

# ============ 模拟 Serverless 平台调度架构 ============

import random
from collections import defaultdict

class Invoker:
"""一台物理机上的 Invoker,管理该机器上的容器"""

def __init__(self, invoker_id: str, max_containers: int = 10):
self.id = invoker_id
self.containers = {} # app_id -> container state
self.max_containers = max_containers
self.current_load = 0

def can_accept(self) -> bool:
return self.current_load < self.max_containers

def run_function(self, app_id: str, payload: dict) -> dict:
self.current_load += 1

if app_id in self.containers:
start_type = "warm"
overhead = 0
else:
start_type = "cold"
overhead = 0.5 # 500ms cold start
self.containers[app_id] = {"state": "running"}

# 模拟函数执行
result = {"processed": payload, "invoker": self.id}
self.current_load -= 1
return {
"start_type": start_type,
"overhead": overhead,
"result": result
}


class Controller:
"""
中心控制器 + 负载均衡器
负责:接收请求 → 选择 Invoker → 分发任务
"""

def __init__(self, invokers: list):
self.invokers = invokers
self.request_log = []

def handle_request(self, app_id: str, payload: dict) -> dict:
"""接收用户请求,选择最合适的 Invoker"""

# 策略1:优先选择已有该 app 容器的 Invoker(避免 cold start)
for invoker in self.invokers:
if app_id in invoker.containers and invoker.can_accept():
return invoker.run_function(app_id, payload)

# 策略2:选择负载最低的 Invoker
available = [inv for inv in self.invokers if inv.can_accept()]
if available:
chosen = min(available, key=lambda inv: inv.current_load)
return chosen.run_function(app_id, payload)

# 所有 Invoker 都满了
return {"error": "No available capacity"}


# 搭建一个 3 节点的 Serverless 平台
platform = Controller([
Invoker("invoker-1", max_containers=10),
Invoker("invoker-2", max_containers=10),
Invoker("invoker-3", max_containers=10),
])

# 第一次调用 app_A:cold start,被分配到某个 invoker
r1 = platform.handle_request("app_A", {"data": "hello"})
# {'start_type': 'cold', 'overhead': 0.5, 'invoker': 'invoker-1'}

# 第二次调用 app_A:warm start,会被路由到同一个 invoker
r2 = platform.handle_request("app_A", {"data": "world"})
# {'start_type': 'warm', 'overhead': 0, 'invoker': 'invoker-1'}

五、为什么 Serverless 特别适合 RL 训练

5.1 RL 训练的资源使用特点

在 Actor-Learner 架构中,一个训练 round 的时间线如下:

时间 →  ─────────────────────────────────────────────
Actor 1: ████采样████ │空闲等待│空闲等待│空闲等待│
Actor 2: ██采样██ │空闲等待│空闲等待│空闲等待│空闲│
Actor 3: ██████采样██████ │空闲等待│空闲等待│
Learner: │等待数据│等待数据│ ███更新模型███ │分发新策略│
  • Actor 采样完成后必须等待 Learner 更新(同步训练)
  • 不同 Actor 采样速度不同,快的等慢的
  • 等待期间服务器资源闲置但仍在计费

用代码模拟这个过程:

# ============ 传统服务器 RL 训练 vs Serverless RL 训练 ============

import time
import random

# ---------- 传统服务器方式 ----------

class TraditionalRLTrainer:
"""
传统方式:预留固定数量的服务器做 Actor
问题:Actor 空闲时仍在计费
"""

def __init__(self, num_actors: int, cost_per_second: float):
self.num_actors = num_actors
self.cost_per_second = cost_per_second # 每台服务器每秒费用

def train_one_round(self):
round_start = time.time()

# 所有 Actor 并行采样,但耗时不同
sampling_times = []
for i in range(self.num_actors):
# 每个 Actor 采样耗时 5-15 秒不等(环境随机性)
t = random.uniform(5, 15)
sampling_times.append(t)

# 同步屏障:必须等最慢的 Actor 完成
max_sampling_time = max(sampling_times)

# 每个 Actor 的空闲等待时间
idle_times = [max_sampling_time - t for t in sampling_times]
total_idle = sum(idle_times)

# Learner 更新(2秒)
learner_time = 2.0

# 整个 round 的时间 = 最慢的 actor + learner 更新
round_time = max_sampling_time + learner_time

# 费用 = 所有服务器 × 整个 round 时间(不管忙不忙都在计费)
total_cost = self.num_actors * round_time * self.cost_per_second
useful_cost = sum(sampling_times) * self.cost_per_second
wasted_cost = total_cost - useful_cost

return {
"round_time": round_time,
"total_cost": total_cost,
"wasted_cost": wasted_cost,
"waste_ratio": f"{wasted_cost/total_cost*100:.1f}%",
"total_actor_idle_seconds": total_idle
}


# ---------- Serverless 方式 ----------

class ServerlessRLTrainer:
"""
Serverless 方式:Actor 用函数实现,采样完即释放
优势:不为 idle 时间付费,且可以动态调整 Actor 数量
"""

def __init__(self, cost_per_second: float):
self.cost_per_second = cost_per_second

def train_one_round(self, num_actors: int):
# 每个 Actor 作为一个 serverless 函数
sampling_times = []
cold_start_overheads = []

for i in range(num_actors):
cold_start = random.uniform(0.3, 1.0) # cold start 开销
sampling = random.uniform(5, 15)
sampling_times.append(sampling)
cold_start_overheads.append(cold_start)

# 同步屏障:等最慢的(包含 cold start)
total_times = [c + s for c, s in zip(cold_start_overheads, sampling_times)]
max_total_time = max(total_times)

learner_time = 2.0
round_time = max_total_time + learner_time

# 关键区别:每个 Actor 只为自己的(cold_start + 采样时间)付费
# Actor 完成采样后立即释放,不为等待 Learner 的时间付费
actor_cost = sum(total_times) * self.cost_per_second
learner_cost = learner_time * self.cost_per_second
total_cost = actor_cost + learner_cost

# 没有浪费:每一秒的费用都对应实际的计算
return {
"round_time": round_time,
"total_cost": total_cost,
"wasted_cost": 0,
"waste_ratio": "0%",
"avg_cold_start": sum(cold_start_overheads) / len(cold_start_overheads)
}


# 对比实验
traditional = TraditionalRLTrainer(num_actors=10, cost_per_second=0.001)
serverless = ServerlessRLTrainer(cost_per_second=0.001)

print("=== 传统服务器 RL 训练(1 round)===")
print(traditional.train_one_round())
# 典型输出:{'round_time': 17.0, 'total_cost': 0.17, 'wasted_cost': 0.08, 'waste_ratio': '47%'}

print("\n=== Serverless RL 训练(1 round)===")
print(serverless.train_one_round(num_actors=10))
# 典型输出:{'round_time': 17.5, 'total_cost': 0.10, 'wasted_cost': 0, 'waste_ratio': '0%'}

5.2 Serverless 解决 RL 的三个痛点

RL 训练痛点传统服务器Serverless 解决方式
Actor 空闲浪费idle 期间持续计费采样完立即释放,不为 idle 付费
资源粒度粗服务器级(8核 32GB 打包售卖)函数级(按需分配 CPU/内存)
训练需求动态变化固定机器数,无法弹性调整不同 round 可启动不同数量的 actor

用代码展示动态 Actor 伸缩的优势:

# ============ 动态 Actor 伸缩 ============

class DynamicServerlessTrainer:
"""
Serverless 的核心优势:根据训练进度动态调整 Actor 数量
- 训练初期:探索多,需要大量 Actor
- 训练中期:策略趋于收敛,减少 Actor 省钱
- 关键轮次(boostable round):临时增加 Actor 加速突破
"""

def __init__(self):
self.cost_per_actor_per_round = 0.01 # 每个 actor 每轮的成本

def decide_num_actors(self, round_idx: int,
recent_reward_improvement: float) -> int:
"""
根据训练状态动态决定 Actor 数量
传统方式无法做到这一点(服务器数量是固定的)
"""
if round_idx < 10:
# 训练初期:大量探索
return 20
elif recent_reward_improvement > 5.0:
# reward 快速提升中(boostable round):加大采样
return 16
elif recent_reward_improvement < 0.5:
# reward 趋于平稳:减少 Actor 省钱
return 4
else:
return 8

def train(self, total_rounds: int = 50):
total_cost = 0
total_actors_used = 0

for r in range(total_rounds):
# 模拟 reward improvement
improvement = max(0, 10 - r * 0.2 + random.gauss(0, 2))

num_actors = self.decide_num_actors(r, improvement)
round_cost = num_actors * self.cost_per_actor_per_round

total_cost += round_cost
total_actors_used += num_actors

avg_actors = total_actors_used / total_rounds

# 对比:如果用传统服务器固定 20 台
fixed_cost = 20 * self.cost_per_actor_per_round * total_rounds

return {
"serverless_total_cost": total_cost,
"fixed_server_cost": fixed_cost,
"savings": f"{(1 - total_cost/fixed_cost)*100:.1f}%",
"avg_actors_per_round": avg_actors
}

trainer = DynamicServerlessTrainer()
print(trainer.train(total_rounds=50))
# 典型输出:{'serverless_total_cost': 3.8, 'fixed_server_cost': 10.0, 'savings': '62.0%'}

5.3 Serverless 用于 RL 的挑战

挑战说明后续论文的解决方案
Cold start 延迟启动新 actor 有额外延迟,影响训练速度Nitro: 只在关键轮次启动新 actor
无状态函数本身无状态,模型参数需要外部传递MinionsRL: 通过外部存储(S3/Redis)传递模型
执行时间限制大多数平台限制单次执行 5-15 分钟将采样任务拆分为多个短函数
网络通信函数之间通信需通过外部存储设计高效的参数同步协议
Staleness异步训练时 actor 用的策略可能已过时Stellaris: staleness-aware 梯度聚合

模拟 Staleness 问题:

# ============ Staleness 问题模拟 ============

class AsyncServerlessTrainer:
"""
异步 Serverless 训练中的 Staleness 问题
"""

def __init__(self):
self.policy_version = 0 # Learner 当前策略版本
self.policy_params = {} # 策略参数

def actor_sample(self, actor_id: int, policy_version_at_start: int):
"""
Actor 采样:使用的是启动时拿到的策略版本
采样期间 Learner 可能已经更新了多次
"""
sampling_time = random.uniform(5, 20) # 采样耗时

# 采样完成时,Learner 的策略版本可能已经变了
staleness = self.policy_version - policy_version_at_start

return {
"actor_id": actor_id,
"data_from_policy_version": policy_version_at_start,
"current_policy_version": self.policy_version,
"staleness": staleness,
"is_stale": staleness > 2, # staleness > 2 认为数据过时
"sampling_time": sampling_time
}

def learner_update(self, data_policy_version: int):
"""
Learner 用收到的数据更新策略
如果数据太旧(staleness 太大),更新方向可能有偏差
"""
staleness = self.policy_version - data_policy_version

if staleness == 0:
gradient_quality = 1.0 # 完美:数据来自当前策略
elif staleness <= 2:
gradient_quality = 0.7 # 可接受:轻微偏差
else:
gradient_quality = 0.3 # 危险:梯度方向可能错误

# 更新策略
self.policy_version += 1

return {
"updated_to_version": self.policy_version,
"data_staleness": staleness,
"gradient_quality": gradient_quality
}

六、Serverless 的应用光谱

适合 Serverless 的场景不适合 Serverless 的场景
事件驱动的 API 后端需要持续运行的服务
定时任务 / 批处理需要 GPU 常驻的推理服务
数据管道 ETL有状态的数据库
IoT 数据处理延迟极端敏感(< 10ms)
RL 训练中的 Actor(本系列论文)超长时间运算(> 15分钟)
突发流量处理需要大量本地存储

七、总结

Serverless = 把计算从"租房模式"变成"酒店模式":不用管基础设施,按实际使用付费,自动伸缩。它天然契合 RL 训练中 Actor 忙闲交替、资源需求动态变化的特点,这就是为什么后续一系列论文(MinionsRL → Nitro → Stellaris → MARLess → RLHFless)将 Serverless 引入分布式 RL 训练。