关键词
| 序号 | 关键词 | 英文 | 说明 |
|---|---|---|---|
| 1 | KV Cache | Key-Value Cache | 缓存注意力计算的中间结果 |
| 2 | 投机解码 | Speculative Decoding | 预测-验证双阶段解码 |
| 3 | Continuous Batching | 连续批处理 | 动态批处理策略 |
| 4 | CUDA Graph | CUDA Execution Graph | 减少 GPU 调度开销 |
| 5 | Prefill | Prefill Phase | 首次前向传播(处理输入) |
| 6 | Decode | Decode Phase | 自回归生成阶段 |
| 7 | PagedAttention | 分页注意力 | vLLM 核心优化技术 |
| 8 | 显存管理 | Memory Management | GPU 显存分配优化 |
| 9 | Flash Attention | 快速注意力 | IO 感知的注意力计算 |
| 10 | TensorRT | TensorRT | NVIDIA 深度学习推理引擎 |
概述
大语言模型的推理过程分为两个核心阶段:Prefill 阶段处理用户输入的 prompt,进行首次前向传播并填充 KV Cache;Decode 阶段自回归地生成输出 tokens,每次生成一个 token 都需要访问完整的 KV Cache。推理优化的目标是最大化吞吐量(throughput)和降低延迟(latency),这两个目标在批处理策略上存在内在矛盾,需要根据实际场景进行权衡。
推理优化的技术栈非常丰富,从算法层面的 KV Cache、投机解码,到系统层面的批处理策略、CUDA 优化,再到框架层面的 vLLM、TensorRT-LLM 等推理引擎。理解这些技术的原理和适用场景,对于构建高效、可靠的 LLM 服务系统至关重要。
本文将系统介绍推理优化的核心技术,包括 KV Cache 的管理机制、投机解码的预测-验证范式、批处理策略的演进(从 Static Batching 到 Continuous Batching)、CUDA 图优化原理,以及主流推理服务框架的对比分析。
KV Cache 优化机制
KV Cache 的工作原理
在 Transformer 的自注意力机制中,每个 token 需要 attending to 所有前面的 tokens,包括自己。当生成第 N 个 token 时,需要计算与前 N-1 个 token 的注意力。这导致了解码阶段的计算复杂度为 ,其中 N 是序列长度。
KV Cache 的核心思想是「空间换时间」:在 Prefill 阶段计算并缓存所有 Key 和 Value 矩阵,在后续的 Decode 阶段直接使用缓存的结果,只计算新 token 的 Query 向量即可。
class KVCache:
def __init__(self, max_batch_size, max_seq_len, n_heads, head_dim, dtype=torch.float16):
self.max_batch_size = max_batch_size
self.max_seq_len = max_seq_len
self.n_heads = n_heads
self.head_dim = head_dim
# 预分配 GPU 显存
self.k_cache = torch.zeros(
(max_batch_size, n_heads, max_seq_len, head_dim),
dtype=dtype,
device='cuda'
)
self.v_cache = torch.zeros(
(max_batch_size, n_heads, max_seq_len, head_dim),
dtype=dtype,
device='cuda'
)
# 当前有效长度
self.seq_lens = torch.zeros(max_batch_size, dtype=torch.long, device='cuda')
def update(self, batch_idx, new_k, new_v):
"""更新指定 batch 的 KV cache"""
start_pos = self.seq_lens[batch_idx].item()
seq_len = new_k.shape[1]
self.k_cache[batch_idx, :, start_pos:start_pos + seq_len] = new_k
self.v_cache[batch_idx, :, start_pos:start_pos + seq_len] = new_v
self.seq_lens[batch_idx] += seq_len
def get_kv(self, batch_idx):
"""获取指定 batch 的完整 KV cache"""
seq_len = self.seq_lens[batch_idx].item()
return (
self.k_cache[batch_idx, :, :seq_len],
self.v_cache[batch_idx, :, :seq_len]
)KV Cache 的显存瓶颈
KV Cache 的显存占用是推理优化的核心瓶颈。以 Llama-2-70B 为例:
参数规模分析:
- 模型参数量:700 亿
- KV Cache 维度:[batch_size, n_heads, seq_len, head_dim]
- 假设 batch_size=1, n_heads=8, seq_len=4096, head_dim=128
单 token 显存计算:
对于 FP16(2 bytes),单个 token 的 KV Cache 约为:2 × 8 × 128 × 2 = 4KB
序列总显存:
- 4096 tokens:约 16MB
- 32768 tokens(100K context):约 128MB
这还只是 70B 模型的单序列情况。当服务多个并发请求时,KV Cache 显存占用会线性增长,迅速成为系统瓶颈。
PagedAttention 与虚拟显存管理
vLLM 提出的 PagedAttention 是 KV Cache 管理的重要突破。传统方法需要预先分配连续显存块,但实际使用中序列长度是动态变化的,导致显存碎片化严重。
PagedAttention 的核心思想是借鉴操作系统的分页内存管理机制,将 KV Cache 划分为固定大小的「页」(通常为 16 tokens),通过页表管理逻辑地址到物理地址的映射:
class PagedAttention:
def __init__(self, block_size=16, num_blocks=1024):
self.block_size = block_size
self.num_blocks = num_blocks
# 物理块池
self.num_allocated_blocks = 0
# 块表:batch_idx -> [物理块ID列表]
self.block_tables = {}
def alloc(self, batch_idx, num_blocks):
"""为请求分配物理块"""
self.block_tables[batch_idx] = []
for _ in range(num_blocks):
block_id = self.num_allocated_blocks
self.num_allocated_blocks += 1
self.block_tables[batch_idx].append(block_id)
def update(self, batch_idx, new_k, new_v, block_offset):
"""更新新 token 的 KV 数据到指定块"""
block_id = self.block_tables[batch_idx][block_offset]
self._write_to_block(block_id, new_k, new_v)
def attention(self, query, batch_idx):
"""在分页模式下执行注意力计算"""
# 获取物理块列表
block_ids = self.block_tables[batch_idx]
# 收集所有物理块中的 KV 数据
k_blocks = [self.get_block(i) for i in block_ids]
v_blocks = [self.get_block(i, is_value=True) for i in block_ids]
# 调用 CUDA kernel 执行分页注意力
return paged_attention_kernel(
query, k_blocks, v_blocks, self.block_size
)这种设计的优势在于:
- 消除显存碎片:按需分配,避免预分配的浪费
- 支持灵活的上下文扩展:通过追加新块而非重新分配来扩展序列
- 共享前缀优化:多个请求共享相同前缀时,可以共享物理块
投机解码技术
投机解码的核心原理
投机解码(Speculative Decoding)由 Leviathan 等人在 2023 年提出,核心思想是利用「小模型多步预测 + 大模型验证」的范式来加速自回归解码。
传统自回归解码的瓶颈在于:每生成一个 token 都必须等待完整的大模型前向传播。投机解码通过引入一个轻量级的「草稿模型」(Draft Model)来预测多个候选 tokens,然后由大模型并行验证这些预测。如果预测正确,就「免费」获得多个 tokens 的生成;如果预测错误,大模型会进行修正并继续。
class SpeculativeDecoder:
def __init__(self, main_model, draft_model, gamma=4, temperature=1.0):
"""
gamma: 每个 iteration 草稿模型预测的 token 数
"""
self.main_model = main_model
self.draft_model = draft_model
self.gamma = gamma
self.temperature = temperature
def decode(self, input_ids, max_new_tokens):
"""投机解码主循环"""
generated = input_ids.clone()
while len(generated[0]) - len(input_ids[0]) < max_new_tokens:
# 1. 草稿模型生成 gamma 个候选 tokens
draft_tokens = self._draft(input_ids, generated, self.gamma)
# 2. 主模型并行验证所有候选
accepted, accepted_probs = self._verify(
input_ids, generated, draft_tokens
)
# 3. 接受正确的 tokens
generated = torch.cat([generated, accepted], dim=-1)
# 4. 如果所有预测都被拒绝,从主模型采样一个 token
if len(accepted) == 0:
next_token = self._sample_next(generated)
generated = torch.cat([generated, next_token], dim=-1)
return generated
def _draft(self, input_ids, generated, gamma):
"""草稿模型生成候选 tokens"""
# 获取当前上下文
context = generated
draft_tokens = []
for _ in range(gamma):
with torch.no_grad():
outputs = self.draft_model(context)
next_token = torch.argmax(
outputs.logits[:, -1, :] / self.temperature, dim=-1
)
draft_tokens.append(next_token)
context = torch.cat([context, next_token], dim=-1)
return torch.stack(draft_tokens, dim=1) # [batch, gamma]
def _verify(self, input_ids, generated, draft_tokens):
"""主模型验证候选 tokens"""
# 构造完整序列:原始 + 已有 + 候选
extended = torch.cat([generated, draft_tokens], dim=-1)
with torch.no_grad():
outputs = self.main_model(extended)
# 主模型的 logits
main_logits = outputs.logits[:, generated.shape[1]:] # [batch, gamma, vocab]
main_probs = F.softmax(main_logits / self.temperature, dim=-1)
# 草稿模型的 logits
draft_probs = self._get_draft_probs(generated, draft_tokens)
# 贪婪验证:主模型在对应位置的预测与草稿是否一致
accepted = []
accepted_probs = []
for i in range(draft_tokens.shape[1]):
draft_token = draft_tokens[0, i].item()
main_prob = main_probs[0, i, draft_token].item()
draft_prob = draft_probs[0, i, draft_token].item()
# 如果主模型给这个 token 高概率,接受它
# 否则,拒绝并终止
if main_prob >= draft_prob:
accepted.append(draft_token)
accepted_probs.append(main_prob)
else:
break
return torch.tensor([accepted]), torch.tensor([accepted_probs])自投机解码与树状解码
标准的投机解码需要两个模型(主模型 + 草稿模型),这对部署成本提出了挑战。自投机解码(Self-Speculative Decoding)通过同一个模型的不同计算路径来实现:主模型生成预测 token 的同时,计算一个简化的「轻量头」来生成草稿。
更高级的变体是树状投机解码(Tree Speculative Decoding):不再让草稿模型串行生成,而是生成一个 token 树,然后由主模型并行验证所有分支:
class TreeSpeculativeDecoder:
def __init__(self, main_model, draft_model, max_depth=4, branch_factor=2):
self.main_model = main_model
self.draft_model = draft_model
self.max_depth = max_depth
self.branch_factor = branch_factor
def decode(self, input_ids, max_new_tokens):
generated = input_ids.clone()
while len(generated[0]) - len(input_ids[0]) < max_new_tokens:
# 生成 token 树
tree = self._build_tree(generated)
# 并行验证所有节点
verified = self._verify_tree(tree, generated)
# 选择验证通过的路径继续
best_path = self._select_path(verified, generated)
generated = best_path
# 如果没有路径被验证,从主模型采样
if len(verified) == 0:
next_token = self._sample_next(generated)
generated = torch.cat([generated, next_token], dim=-1)
return generated
def _build_tree(self, context):
"""构建投机 token 树"""
# 简化实现:生成固定结构的树
tree = {
'nodes': [], # [(parent_idx, token, prob)]
'depth': 0
}
# 根节点
tree['nodes'].append((-1, context[0, -1].item(), 1.0))
current_level = [0]
for depth in range(self.max_depth):
next_level = []
for parent_idx in current_level:
# 采样分支
branches = self._sample_branches(
context, tree['nodes'][parent_idx][1],
self.branch_factor
)
for token, prob in branches:
node_idx = len(tree['nodes'])
tree['nodes'].append((parent_idx, token, prob))
next_level.append(node_idx)
current_level = next_level
tree['depth'] = self.max_depth
return treeBatching 策略优化
静态批处理的问题
传统的 Static Batching 策略要求批次中所有请求使用相同的输入长度,并且所有请求必须等待最慢的请求完成后才能返回结果。这种策略在请求长度差异大时效率极低:短请求需要等待长请求完成,造成计算资源浪费。
# 静态批处理的低效示例
def static_batching_inefficient(requests):
"""
传统批处理:必须等待所有请求完成
"""
# 对齐到批次最大长度(浪费计算)
max_len = max(req['input_len'] for req in requests)
# 处理批次
results = []
while requests:
batch = requests[:batch_size]
# 填充到 batch 最大长度
padded_batch = pad_sequence(batch, max_len)
# 前向传播
outputs = model(padded_batch)
# 等待所有请求完成(长请求拖慢短请求)
results.extend(outputs)
# 移除已处理的请求
requests = requests[batch_size:]
return resultsContinuous Batching 原理
Continuous Batching(又称 Iteration-level Scheduling 或 Dynamic Batching)是目前主流的批处理策略。其核心思想是将请求的处理粒度从「批次级别」降低到「迭代级别」:每完成一个 token 的生成,就立即释放已完成请求的资源,调度新的请求进入批次。
class ContinuousBatcher:
def __init__(self, model, max_batch_size, max_seq_len):
self.model = model
self.max_batch_size = max_batch_size
self.max_seq_len = max_seq_len
# 活跃请求
self.active_requests = {}
self.request_counter = 0
def add_request(self, input_ids, request_id=None):
"""添加新请求"""
if request_id is None:
request_id = self.request_counter
self.request_counter += 1
self.active_requests[request_id] = {
'input_ids': input_ids,
'output_ids': input_ids.clone(),
'finished': False,
'finish_reason': None
}
return request_id
def step(self):
"""执行一步推理(生成一个 token)"""
# 收集活跃请求
active_reqs = {
rid: req for rid, req in self.active_requests.items()
if not req['finished']
}
if not active_reqs:
return []
# 构造批次
batch_input_ids = [req['output_ids'] for req in active_reqs.values()]
# 前向传播
outputs = self.model.forward(batch_input_ids)
# 采样下一个 token
next_tokens = self._sample(outputs)
# 更新每个请求并检查是否完成
finished = []
for i, (rid, req) in enumerate(active_reqs.items()):
next_token = next_tokens[i:i+1]
req['output_ids'] = torch.cat([req['output_ids'], next_token], dim=-1)
# 检查终止条件
if self._check_done(req, next_token):
req['finished'] = True
req['finish_reason'] = self._get_finish_reason(next_token)
finished.append(rid)
return finished
def run(self):
"""持续运行直到所有请求完成"""
while self.active_requests:
# 执行推理步骤
finished = self.step()
# 释放完成的请求
for rid in finished:
result = self.active_requests.pop(rid)
yield resultPreemption 与 Eviction 策略
当 GPU 显存不足以容纳所有活跃请求时,需要进行请求抢占(Preemption)或 KV Cache 驱逐(Eviction)。
抢占策略:暂停低优先级或最长等待的请求,释放其显存资源。抢占的请求会被重新调度,可能导致延迟增加。
KV Cache 驱逐:选择性释放部分请求的 KV Cache。常见策略包括:
- 部分驱逐:保留最近生成的 tokens,释放早期内容
- 压缩:将 KV Cache 压缩到更少的位置
- 重组:在请求之间移动 KV Cache 以提高局部性
class MemoryAwareScheduler:
def __init__(self, model, max_memory_gb):
self.model = model
self.max_memory_gb = max_memory_gb
self.allocated_memory = 0
def can_accept(self, seq_len, batch_size):
"""检查是否能接受新请求"""
required = self._estimate_memory(seq_len, batch_size)
return self.allocated_memory + required <= self.max_memory_gb
def preempt_if_needed(self, requests):
"""如果显存不足,执行抢占"""
while not self.can_accept(self._avg_seq_len(requests), len(requests)):
# 选择要抢占的请求:优先选择最长等待或最低优先级的
victim = self._select_victim(requests)
# 保存状态以便恢复
self._save_checkpoint(victim)
# 释放显存
self._free_memory(victim)
requests.remove(victim)
return requests
def _select_victim(self, requests):
"""选择抢占目标"""
# 综合考虑:等待时间、优先级、已生成长度
scores = []
for req in requests:
wait_time = time.time() - req['arrival_time']
priority = req.get('priority', 1.0)
generated_len = len(req['output_ids']) - len(req['input_ids'])
# 分数 = 等待时间 / (优先级 * 已生成长度)
# 分数越高越应该被抢占
score = wait_time / (priority * max(generated_len, 1))
scores.append(score)
return requests[np.argmax(scores)]CUDA 图优化
CUDA Graph 的工作原理
CUDA Graph 是 NVIDIA 从 CUDA 10 开始引入的特性,旨在减少 GPU 操作的启动开销。在传统的 CUDA 编程中,每个 kernel(GPU 函数)都需要单独启动,存在 CPU 端的调度开销。当需要执行数千个小操作时,这种开销会累积成显著的性能损失。
CUDA Graph 的核心思想是「录制-回放」:将一系列 CUDA 操作录制为一个图(Graph),然后一次性提交执行。这避免了重复的 kernel 启动开销,并能启用更多的 GPU 优化。
import torch
import ctypes
# 检查 CUDA Graph 支持
torch.cuda.set_device(0)
assert torch.cuda.is_available()
# 检查 CUDA 版本
cuda_version = torch.version.cuda
print(f"CUDA Version: {cuda_version}")
# 创建 CUDA Graph
graph = torch.cuda.CUDAGraph()
# 创建占位张量(用于重放时输入)
static_input = torch.empty(1, dtype=torch.long, device='cuda')
static_output = torch.empty(1, dtype=torch.long, device='cuda')
# 录制推理过程
with torch.cuda.graph(graph):
# 这里放置要录制的操作
# 每次调用 model 时会记录操作到图中
logits = model(static_input)
static_output.copy_(logits)
# 重放(高速)
for _ in range(100):
static_input.copy_(input_tensor)
graph.replay()
result = static_output.clone()LLM 推理中的 CUDA Graph 优化
在 LLM 推理中,KV Cache 的更新、注意力计算、MLP 前向传播等操作天然适合用 CUDA Graph 优化。由于 Decode 阶段每个 token 的计算图结构相同(只是数据不同),使用 CUDA Graph 可以显著减少调度开销。
class CudaGraphOptimizedInference:
def __init__(self, model, max_batch_size, max_seq_len):
self.model = model
self.model.eval()
# 预热:触发 CUDA JIT 编译
self._warmup()
# 创建静态张量用于图录制
self.static_input_ids = torch.zeros(
max_batch_size, dtype=torch.long, device='cuda'
)
self.static_positions = torch.zeros(
max_batch_size, dtype=torch.long, device='cuda'
)
# 创建 KV Cache 相关的静态张量
self._create_static_kv_cache()
# 录制 CUDA Graph
self._record_graph()
def _warmup(self):
"""预热:触发 CUDA kernel 的 JIT 编译"""
dummy_input = torch.zeros(1, 10, dtype=torch.long, device='cuda')
for _ in range(10):
with torch.no_grad():
self.model(dummy_input)
torch.cuda.synchronize()
def _record_graph(self):
"""录制推理图"""
self.graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(self.graph):
# 这里定义推理的计算图
self.outputs = self.model(
input_ids=self.static_input_ids,
position_ids=self.static_positions,
use_cache=True
)
def step(self, input_ids, batch_size=1):
"""执行一步推理(使用录制的图)"""
# 复制输入到静态张量
self.static_input_ids[:batch_size].copy_(input_ids)
# 回放图
self.graph.replay()
# 提取输出
return self.outputs[0][:batch_size]Flash Attention 与算子融合
Flash Attention 是由 Tri Dao 等人提出的 IO 感知注意力实现,通过分块计算和融合 kernel 显著提升注意力计算的效率。相比标准注意力实现,Flash Attention 可以将计算速度提升 2-4 倍,同时将显存占用从 降低到 。
from flash_attn import flash_attn_func
# 标准注意力(显存 O(N^2))
def standard_attention(Q, K, V):
scores = torch.matmul(Q, K.transpose(-2, -1))
scores = F.softmax(scores, dim=-1)
output = torch.matmul(scores, V)
return output
# Flash Attention(显存 O(N))
def flash_attention(Q, K, V):
# Flash Attention 自动处理分块和显存优化
output = flash_attn_func(
Q, K, V,
causal=True # 自回归需要 causal mask
)
return output
# 使用 Flash Attention 2(最新版本,更快)
from flash_attn import flash_attn_with_kvcache
def decode_with_flash_attn(q, k_cache, v_cache, positions):
"""
解码阶段的注意力计算
q: [batch, 1, heads, dim] 当前 token 的 query
k_cache, v_cache: [batch, heads, max_seq, dim] 缓存的 KV
positions: [batch, 1] 当前 token 的位置
"""
# 从缓存中获取历史 KV
k = k_cache[:, :, :positions.max().item()]
v = v_cache[:, :, :positions.max().item()]
# Flash Attention
output = flash_attn_with_kvcache(
q, k, v,
cache_k=k_cache,
cache_v=v_cache,
causal=True
)
return output推理服务框架对比
vLLM 架构与特性
vLLM 是由 UC Berkeley LMSYS 开发的开源推理服务框架,其核心创新是 PagedAttention 技术。vLLM 已成为目前最流行的 LLM 推理框架之一。
核心特性:
- PagedAttention 实现 KV Cache 的虚拟显存管理
- 支持 CUDA Graph 优化
- 连续批处理(Continuous Batching)
- 支持 Tensor Parallelism 多卡并行
- 支持 GPTQ、AWQ 等量化方法
from vllm import LLM, SamplingParams
# 初始化 vLLM 引擎
llm = LLM(
model="meta-llama/Llama-2-70B",
tensor_parallel_size=4, # 4 卡并行
quantization="gptq", # GPTQ 量化
max_model_len=4096,
gpu_memory_utilization=0.9
)
# 定义采样参数
sampling_params = SamplingParams(
temperature=0.8,
top_p=0.95,
max_tokens=256
)
# 批量推理
prompts = [
"请介绍一下大语言模型的基本原理。",
"什么是 Transformer 架构?",
"解释一下注意力机制的数学原理。"
]
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
print(f"Prompt: {output.prompt}")
print(f"Generated: {output.outputs[0].text}")
print("---")TensorRT-LLM 深度优化
TensorRT-LLM 是 NVIDIA 官方提供的 LLM 推理优化框架,提供最深度的性能优化,但需要针对特定模型进行编译。
核心特性:
- FP8、FP16、INT8 多精度支持
- In-flight Batching
- Tensor Parallelism(支持 AllReduce 优化)
- 高度优化的 CUDA kernels
- 动态 batch 和调度
# TensorRT-LLM 构建引擎
trtllm-build \
--model_dir /path/to/hf/model \
--output_dir /path/to/engine \
--tp_size 8 \
--precision fp16 \
--max_batch_size 256 \
--max_input_len 4096 \
--max_output_len 1024
# TensorRT-LLM 服务
trtllm-serve /path/to/engine \
--http_port 8000 \
--tokenizer /path/to/tokenizer框架对比与选型建议
| 特性 | vLLM | TensorRT-LLM | TGI | llama.cpp |
|---|---|---|---|---|
| 性能 | 高 | 极高 | 高 | 中等 |
| 易用性 | 高 | 中等 | 高 | 高 |
| 量化支持 | GPTQ/AWQ | INT8/FP8 | bitsandbytes | GGUF |
| 部署难度 | 低 | 高 | 低 | 低 |
| 硬件要求 | NVIDIA | NVIDIA H100+ | NVIDIA | 任意 |
| 适用场景 | 通用部署 | 极致性能 | HuggingFace 模型 | 本地/边缘 |
Note
选型建议:对于大多数生产环境,vLLM 是最佳选择——性能优秀、易于部署、社区活跃。如果需要极致性能且有专业团队支持,TensorRT-LLM 是首选。对于需要在 CPU 或消费级 GPU 上运行的场景,llama.cpp + GGUF 是唯一可行的选择。
端到端优化实践
延迟-吞吐量权衡
推理优化需要在延迟(Latency)和吞吐量(Throughput)之间权衡:
低延迟场景(如交互式对话):优先优化 TTFT(Time to First Token)和单轮响应时间,适合小 batch、跳过 prefill 优化。
高吞吐量场景(如批量内容生成):优先优化 overall throughput,适合大 batch、充分预热。
class LatencyThroughputOptimizer:
def __init__(self, model, target_mode='balanced'):
self.model = model
self.target_mode = target_mode
def optimize_config(self):
if self.target_mode == 'low_latency':
return {
'batch_size': 1,
'prefill_chunk_size': 512,
'use_cuda_graph': True,
'kv_cache_dtype': 'fp16'
}
elif self.target_mode == 'high_throughput':
return {
'batch_size': 64,
'prefill_chunk_size': 2048,
'use_cuda_graph': True,
'kv_cache_dtype': 'fp8'
}
else: # balanced
return {
'batch_size': 16,
'prefill_chunk_size': 1024,
'use_cuda_graph': True,
'kv_cache_dtype': 'fp16'
}混合精度与量化策略
合理的精度配置可以在性能和质量之间取得平衡:
from vllm import LLM, ModelProxy
def create_optimized_llm(model_name, mode='production'):
if mode == 'development':
# 开发模式:快速、低精度
return LLM(
model=model_name,
dtype='float16',
max_model_len=2048,
gpu_memory_utilization=0.5
)
elif mode == 'production':
# 生产模式:平衡性能和精度
return LLM(
model=model_name,
dtype='auto', # 自动选择最优精度
max_model_len=8192,
gpu_memory_utilization=0.9,
quantization='fp8' if 'H100' in torch.cuda.get_device_name() else None,
tensor_parallel_size=torch.cuda.device_count()
)
else: # high_quality
# 高质量模式:最大精度
return LLM(
model=model_name,
dtype='bfloat16',
max_model_len=16384,
gpu_memory_utilization=0.85,
tensor_parallel_size=torch.cuda.device_count()
)相关资源
- LM_Eval评估框架 - 评估推理性能
- 模型压缩技术 - 压缩模型以适配推理资源
- 幻觉评估方法 - 评估生成质量
- 部署与推理 - 实际部署指南
本文档由 AI 辅助生成,内容经过严格审核