关键词

序号关键词英文说明
1KV CacheKey-Value Cache缓存注意力计算的中间结果
2投机解码Speculative Decoding预测-验证双阶段解码
3Continuous Batching连续批处理动态批处理策略
4CUDA GraphCUDA Execution Graph减少 GPU 调度开销
5PrefillPrefill Phase首次前向传播(处理输入)
6DecodeDecode Phase自回归生成阶段
7PagedAttention分页注意力vLLM 核心优化技术
8显存管理Memory ManagementGPU 显存分配优化
9Flash Attention快速注意力IO 感知的注意力计算
10TensorRTTensorRTNVIDIA 深度学习推理引擎

概述

大语言模型的推理过程分为两个核心阶段:Prefill 阶段处理用户输入的 prompt,进行首次前向传播并填充 KV Cache;Decode 阶段自回归地生成输出 tokens,每次生成一个 token 都需要访问完整的 KV Cache。推理优化的目标是最大化吞吐量(throughput)和降低延迟(latency),这两个目标在批处理策略上存在内在矛盾,需要根据实际场景进行权衡。

推理优化的技术栈非常丰富,从算法层面的 KV Cache、投机解码,到系统层面的批处理策略、CUDA 优化,再到框架层面的 vLLM、TensorRT-LLM 等推理引擎。理解这些技术的原理和适用场景,对于构建高效、可靠的 LLM 服务系统至关重要。

本文将系统介绍推理优化的核心技术,包括 KV Cache 的管理机制、投机解码的预测-验证范式、批处理策略的演进(从 Static Batching 到 Continuous Batching)、CUDA 图优化原理,以及主流推理服务框架的对比分析。


KV Cache 优化机制

KV Cache 的工作原理

在 Transformer 的自注意力机制中,每个 token 需要 attending to 所有前面的 tokens,包括自己。当生成第 N 个 token 时,需要计算与前 N-1 个 token 的注意力。这导致了解码阶段的计算复杂度为 ,其中 N 是序列长度。

KV Cache 的核心思想是「空间换时间」:在 Prefill 阶段计算并缓存所有 Key 和 Value 矩阵,在后续的 Decode 阶段直接使用缓存的结果,只计算新 token 的 Query 向量即可。

class KVCache:
    def __init__(self, max_batch_size, max_seq_len, n_heads, head_dim, dtype=torch.float16):
        self.max_batch_size = max_batch_size
        self.max_seq_len = max_seq_len
        self.n_heads = n_heads
        self.head_dim = head_dim
        
        # 预分配 GPU 显存
        self.k_cache = torch.zeros(
            (max_batch_size, n_heads, max_seq_len, head_dim),
            dtype=dtype,
            device='cuda'
        )
        self.v_cache = torch.zeros(
            (max_batch_size, n_heads, max_seq_len, head_dim),
            dtype=dtype,
            device='cuda'
        )
        
        # 当前有效长度
        self.seq_lens = torch.zeros(max_batch_size, dtype=torch.long, device='cuda')
    
    def update(self, batch_idx, new_k, new_v):
        """更新指定 batch 的 KV cache"""
        start_pos = self.seq_lens[batch_idx].item()
        seq_len = new_k.shape[1]
        
        self.k_cache[batch_idx, :, start_pos:start_pos + seq_len] = new_k
        self.v_cache[batch_idx, :, start_pos:start_pos + seq_len] = new_v
        self.seq_lens[batch_idx] += seq_len
    
    def get_kv(self, batch_idx):
        """获取指定 batch 的完整 KV cache"""
        seq_len = self.seq_lens[batch_idx].item()
        return (
            self.k_cache[batch_idx, :, :seq_len],
            self.v_cache[batch_idx, :, :seq_len]
        )

KV Cache 的显存瓶颈

KV Cache 的显存占用是推理优化的核心瓶颈。以 Llama-2-70B 为例:

参数规模分析

  • 模型参数量:700 亿
  • KV Cache 维度:[batch_size, n_heads, seq_len, head_dim]
  • 假设 batch_size=1, n_heads=8, seq_len=4096, head_dim=128

单 token 显存计算

对于 FP16(2 bytes),单个 token 的 KV Cache 约为:2 × 8 × 128 × 2 = 4KB

序列总显存

  • 4096 tokens:约 16MB
  • 32768 tokens(100K context):约 128MB

这还只是 70B 模型的单序列情况。当服务多个并发请求时,KV Cache 显存占用会线性增长,迅速成为系统瓶颈。

PagedAttention 与虚拟显存管理

vLLM 提出的 PagedAttention 是 KV Cache 管理的重要突破。传统方法需要预先分配连续显存块,但实际使用中序列长度是动态变化的,导致显存碎片化严重。

PagedAttention 的核心思想是借鉴操作系统的分页内存管理机制,将 KV Cache 划分为固定大小的「页」(通常为 16 tokens),通过页表管理逻辑地址到物理地址的映射:

class PagedAttention:
    def __init__(self, block_size=16, num_blocks=1024):
        self.block_size = block_size
        self.num_blocks = num_blocks
        
        # 物理块池
        self.num_allocated_blocks = 0
        
        # 块表:batch_idx -> [物理块ID列表]
        self.block_tables = {}
    
    def alloc(self, batch_idx, num_blocks):
        """为请求分配物理块"""
        self.block_tables[batch_idx] = []
        for _ in range(num_blocks):
            block_id = self.num_allocated_blocks
            self.num_allocated_blocks += 1
            self.block_tables[batch_idx].append(block_id)
    
    def update(self, batch_idx, new_k, new_v, block_offset):
        """更新新 token 的 KV 数据到指定块"""
        block_id = self.block_tables[batch_idx][block_offset]
        self._write_to_block(block_id, new_k, new_v)
    
    def attention(self, query, batch_idx):
        """在分页模式下执行注意力计算"""
        # 获取物理块列表
        block_ids = self.block_tables[batch_idx]
        
        # 收集所有物理块中的 KV 数据
        k_blocks = [self.get_block(i) for i in block_ids]
        v_blocks = [self.get_block(i, is_value=True) for i in block_ids]
        
        # 调用 CUDA kernel 执行分页注意力
        return paged_attention_kernel(
            query, k_blocks, v_blocks, self.block_size
        )

这种设计的优势在于:

  1. 消除显存碎片:按需分配,避免预分配的浪费
  2. 支持灵活的上下文扩展:通过追加新块而非重新分配来扩展序列
  3. 共享前缀优化:多个请求共享相同前缀时,可以共享物理块

投机解码技术

投机解码的核心原理

投机解码(Speculative Decoding)由 Leviathan 等人在 2023 年提出,核心思想是利用「小模型多步预测 + 大模型验证」的范式来加速自回归解码。

传统自回归解码的瓶颈在于:每生成一个 token 都必须等待完整的大模型前向传播。投机解码通过引入一个轻量级的「草稿模型」(Draft Model)来预测多个候选 tokens,然后由大模型并行验证这些预测。如果预测正确,就「免费」获得多个 tokens 的生成;如果预测错误,大模型会进行修正并继续。

class SpeculativeDecoder:
    def __init__(self, main_model, draft_model, gamma=4, temperature=1.0):
        """
        gamma: 每个 iteration 草稿模型预测的 token 数
        """
        self.main_model = main_model
        self.draft_model = draft_model
        self.gamma = gamma
        self.temperature = temperature
    
    def decode(self, input_ids, max_new_tokens):
        """投机解码主循环"""
        generated = input_ids.clone()
        
        while len(generated[0]) - len(input_ids[0]) < max_new_tokens:
            # 1. 草稿模型生成 gamma 个候选 tokens
            draft_tokens = self._draft(input_ids, generated, self.gamma)
            
            # 2. 主模型并行验证所有候选
            accepted, accepted_probs = self._verify(
                input_ids, generated, draft_tokens
            )
            
            # 3. 接受正确的 tokens
            generated = torch.cat([generated, accepted], dim=-1)
            
            # 4. 如果所有预测都被拒绝,从主模型采样一个 token
            if len(accepted) == 0:
                next_token = self._sample_next(generated)
                generated = torch.cat([generated, next_token], dim=-1)
        
        return generated
    
    def _draft(self, input_ids, generated, gamma):
        """草稿模型生成候选 tokens"""
        # 获取当前上下文
        context = generated
        
        draft_tokens = []
        for _ in range(gamma):
            with torch.no_grad():
                outputs = self.draft_model(context)
                next_token = torch.argmax(
                    outputs.logits[:, -1, :] / self.temperature, dim=-1
                )
            draft_tokens.append(next_token)
            context = torch.cat([context, next_token], dim=-1)
        
        return torch.stack(draft_tokens, dim=1)  # [batch, gamma]
    
    def _verify(self, input_ids, generated, draft_tokens):
        """主模型验证候选 tokens"""
        # 构造完整序列:原始 + 已有 + 候选
        extended = torch.cat([generated, draft_tokens], dim=-1)
        
        with torch.no_grad():
            outputs = self.main_model(extended)
        
        # 主模型的 logits
        main_logits = outputs.logits[:, generated.shape[1]:]  # [batch, gamma, vocab]
        main_probs = F.softmax(main_logits / self.temperature, dim=-1)
        
        # 草稿模型的 logits
        draft_probs = self._get_draft_probs(generated, draft_tokens)
        
        # 贪婪验证:主模型在对应位置的预测与草稿是否一致
        accepted = []
        accepted_probs = []
        
        for i in range(draft_tokens.shape[1]):
            draft_token = draft_tokens[0, i].item()
            main_prob = main_probs[0, i, draft_token].item()
            draft_prob = draft_probs[0, i, draft_token].item()
            
            # 如果主模型给这个 token 高概率,接受它
            # 否则,拒绝并终止
            if main_prob >= draft_prob:
                accepted.append(draft_token)
                accepted_probs.append(main_prob)
            else:
                break
        
        return torch.tensor([accepted]), torch.tensor([accepted_probs])

自投机解码与树状解码

标准的投机解码需要两个模型(主模型 + 草稿模型),这对部署成本提出了挑战。自投机解码(Self-Speculative Decoding)通过同一个模型的不同计算路径来实现:主模型生成预测 token 的同时,计算一个简化的「轻量头」来生成草稿。

更高级的变体是树状投机解码(Tree Speculative Decoding):不再让草稿模型串行生成,而是生成一个 token 树,然后由主模型并行验证所有分支:

class TreeSpeculativeDecoder:
    def __init__(self, main_model, draft_model, max_depth=4, branch_factor=2):
        self.main_model = main_model
        self.draft_model = draft_model
        self.max_depth = max_depth
        self.branch_factor = branch_factor
    
    def decode(self, input_ids, max_new_tokens):
        generated = input_ids.clone()
        
        while len(generated[0]) - len(input_ids[0]) < max_new_tokens:
            # 生成 token 树
            tree = self._build_tree(generated)
            
            # 并行验证所有节点
            verified = self._verify_tree(tree, generated)
            
            # 选择验证通过的路径继续
            best_path = self._select_path(verified, generated)
            generated = best_path
            
            # 如果没有路径被验证,从主模型采样
            if len(verified) == 0:
                next_token = self._sample_next(generated)
                generated = torch.cat([generated, next_token], dim=-1)
        
        return generated
    
    def _build_tree(self, context):
        """构建投机 token 树"""
        # 简化实现:生成固定结构的树
        tree = {
            'nodes': [],  # [(parent_idx, token, prob)]
            'depth': 0
        }
        
        # 根节点
        tree['nodes'].append((-1, context[0, -1].item(), 1.0))
        
        current_level = [0]
        for depth in range(self.max_depth):
            next_level = []
            for parent_idx in current_level:
                # 采样分支
                branches = self._sample_branches(
                    context, tree['nodes'][parent_idx][1], 
                    self.branch_factor
                )
                for token, prob in branches:
                    node_idx = len(tree['nodes'])
                    tree['nodes'].append((parent_idx, token, prob))
                    next_level.append(node_idx)
            current_level = next_level
        
        tree['depth'] = self.max_depth
        return tree

Batching 策略优化

静态批处理的问题

传统的 Static Batching 策略要求批次中所有请求使用相同的输入长度,并且所有请求必须等待最慢的请求完成后才能返回结果。这种策略在请求长度差异大时效率极低:短请求需要等待长请求完成,造成计算资源浪费。

# 静态批处理的低效示例
def static_batching_inefficient(requests):
    """
    传统批处理:必须等待所有请求完成
    """
    # 对齐到批次最大长度(浪费计算)
    max_len = max(req['input_len'] for req in requests)
    
    # 处理批次
    results = []
    while requests:
        batch = requests[:batch_size]
        
        # 填充到 batch 最大长度
        padded_batch = pad_sequence(batch, max_len)
        
        # 前向传播
        outputs = model(padded_batch)
        
        # 等待所有请求完成(长请求拖慢短请求)
        results.extend(outputs)
        
        # 移除已处理的请求
        requests = requests[batch_size:]
    
    return results

Continuous Batching 原理

Continuous Batching(又称 Iteration-level Scheduling 或 Dynamic Batching)是目前主流的批处理策略。其核心思想是将请求的处理粒度从「批次级别」降低到「迭代级别」:每完成一个 token 的生成,就立即释放已完成请求的资源,调度新的请求进入批次。

class ContinuousBatcher:
    def __init__(self, model, max_batch_size, max_seq_len):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_seq_len = max_seq_len
        
        # 活跃请求
        self.active_requests = {}
        self.request_counter = 0
    
    def add_request(self, input_ids, request_id=None):
        """添加新请求"""
        if request_id is None:
            request_id = self.request_counter
            self.request_counter += 1
        
        self.active_requests[request_id] = {
            'input_ids': input_ids,
            'output_ids': input_ids.clone(),
            'finished': False,
            'finish_reason': None
        }
        
        return request_id
    
    def step(self):
        """执行一步推理(生成一个 token)"""
        # 收集活跃请求
        active_reqs = {
            rid: req for rid, req in self.active_requests.items()
            if not req['finished']
        }
        
        if not active_reqs:
            return []
        
        # 构造批次
        batch_input_ids = [req['output_ids'] for req in active_reqs.values()]
        
        # 前向传播
        outputs = self.model.forward(batch_input_ids)
        
        # 采样下一个 token
        next_tokens = self._sample(outputs)
        
        # 更新每个请求并检查是否完成
        finished = []
        for i, (rid, req) in enumerate(active_reqs.items()):
            next_token = next_tokens[i:i+1]
            req['output_ids'] = torch.cat([req['output_ids'], next_token], dim=-1)
            
            # 检查终止条件
            if self._check_done(req, next_token):
                req['finished'] = True
                req['finish_reason'] = self._get_finish_reason(next_token)
                finished.append(rid)
        
        return finished
    
    def run(self):
        """持续运行直到所有请求完成"""
        while self.active_requests:
            # 执行推理步骤
            finished = self.step()
            
            # 释放完成的请求
            for rid in finished:
                result = self.active_requests.pop(rid)
                yield result

Preemption 与 Eviction 策略

当 GPU 显存不足以容纳所有活跃请求时,需要进行请求抢占(Preemption)或 KV Cache 驱逐(Eviction)。

抢占策略:暂停低优先级或最长等待的请求,释放其显存资源。抢占的请求会被重新调度,可能导致延迟增加。

KV Cache 驱逐:选择性释放部分请求的 KV Cache。常见策略包括:

  • 部分驱逐:保留最近生成的 tokens,释放早期内容
  • 压缩:将 KV Cache 压缩到更少的位置
  • 重组:在请求之间移动 KV Cache 以提高局部性
class MemoryAwareScheduler:
    def __init__(self, model, max_memory_gb):
        self.model = model
        self.max_memory_gb = max_memory_gb
        self.allocated_memory = 0
    
    def can_accept(self, seq_len, batch_size):
        """检查是否能接受新请求"""
        required = self._estimate_memory(seq_len, batch_size)
        return self.allocated_memory + required <= self.max_memory_gb
    
    def preempt_if_needed(self, requests):
        """如果显存不足,执行抢占"""
        while not self.can_accept(self._avg_seq_len(requests), len(requests)):
            # 选择要抢占的请求:优先选择最长等待或最低优先级的
            victim = self._select_victim(requests)
            
            # 保存状态以便恢复
            self._save_checkpoint(victim)
            
            # 释放显存
            self._free_memory(victim)
            
            requests.remove(victim)
        
        return requests
    
    def _select_victim(self, requests):
        """选择抢占目标"""
        # 综合考虑:等待时间、优先级、已生成长度
        scores = []
        for req in requests:
            wait_time = time.time() - req['arrival_time']
            priority = req.get('priority', 1.0)
            generated_len = len(req['output_ids']) - len(req['input_ids'])
            
            # 分数 = 等待时间 / (优先级 * 已生成长度)
            # 分数越高越应该被抢占
            score = wait_time / (priority * max(generated_len, 1))
            scores.append(score)
        
        return requests[np.argmax(scores)]

CUDA 图优化

CUDA Graph 的工作原理

CUDA Graph 是 NVIDIA 从 CUDA 10 开始引入的特性,旨在减少 GPU 操作的启动开销。在传统的 CUDA 编程中,每个 kernel(GPU 函数)都需要单独启动,存在 CPU 端的调度开销。当需要执行数千个小操作时,这种开销会累积成显著的性能损失。

CUDA Graph 的核心思想是「录制-回放」:将一系列 CUDA 操作录制为一个图(Graph),然后一次性提交执行。这避免了重复的 kernel 启动开销,并能启用更多的 GPU 优化。

import torch
import ctypes
 
# 检查 CUDA Graph 支持
torch.cuda.set_device(0)
assert torch.cuda.is_available()
 
# 检查 CUDA 版本
cuda_version = torch.version.cuda
print(f"CUDA Version: {cuda_version}")
 
# 创建 CUDA Graph
graph = torch.cuda.CUDAGraph()
 
# 创建占位张量(用于重放时输入)
static_input = torch.empty(1, dtype=torch.long, device='cuda')
static_output = torch.empty(1, dtype=torch.long, device='cuda')
 
# 录制推理过程
with torch.cuda.graph(graph):
    # 这里放置要录制的操作
    # 每次调用 model 时会记录操作到图中
    logits = model(static_input)
    static_output.copy_(logits)
 
# 重放(高速)
for _ in range(100):
    static_input.copy_(input_tensor)
    graph.replay()
    result = static_output.clone()

LLM 推理中的 CUDA Graph 优化

在 LLM 推理中,KV Cache 的更新、注意力计算、MLP 前向传播等操作天然适合用 CUDA Graph 优化。由于 Decode 阶段每个 token 的计算图结构相同(只是数据不同),使用 CUDA Graph 可以显著减少调度开销。

class CudaGraphOptimizedInference:
    def __init__(self, model, max_batch_size, max_seq_len):
        self.model = model
        self.model.eval()
        
        # 预热:触发 CUDA JIT 编译
        self._warmup()
        
        # 创建静态张量用于图录制
        self.static_input_ids = torch.zeros(
            max_batch_size, dtype=torch.long, device='cuda'
        )
        self.static_positions = torch.zeros(
            max_batch_size, dtype=torch.long, device='cuda'
        )
        
        # 创建 KV Cache 相关的静态张量
        self._create_static_kv_cache()
        
        # 录制 CUDA Graph
        self._record_graph()
    
    def _warmup(self):
        """预热:触发 CUDA kernel 的 JIT 编译"""
        dummy_input = torch.zeros(1, 10, dtype=torch.long, device='cuda')
        for _ in range(10):
            with torch.no_grad():
                self.model(dummy_input)
        torch.cuda.synchronize()
    
    def _record_graph(self):
        """录制推理图"""
        self.graph = torch.cuda.CUDAGraph()
        
        with torch.cuda.graph(self.graph):
            # 这里定义推理的计算图
            self.outputs = self.model(
                input_ids=self.static_input_ids,
                position_ids=self.static_positions,
                use_cache=True
            )
    
    def step(self, input_ids, batch_size=1):
        """执行一步推理(使用录制的图)"""
        # 复制输入到静态张量
        self.static_input_ids[:batch_size].copy_(input_ids)
        
        # 回放图
        self.graph.replay()
        
        # 提取输出
        return self.outputs[0][:batch_size]

Flash Attention 与算子融合

Flash Attention 是由 Tri Dao 等人提出的 IO 感知注意力实现,通过分块计算和融合 kernel 显著提升注意力计算的效率。相比标准注意力实现,Flash Attention 可以将计算速度提升 2-4 倍,同时将显存占用从 降低到

from flash_attn import flash_attn_func
 
# 标准注意力(显存 O(N^2))
def standard_attention(Q, K, V):
    scores = torch.matmul(Q, K.transpose(-2, -1))
    scores = F.softmax(scores, dim=-1)
    output = torch.matmul(scores, V)
    return output
 
# Flash Attention(显存 O(N))
def flash_attention(Q, K, V):
    # Flash Attention 自动处理分块和显存优化
    output = flash_attn_func(
        Q, K, V,
        causal=True  # 自回归需要 causal mask
    )
    return output
 
# 使用 Flash Attention 2(最新版本,更快)
from flash_attn import flash_attn_with_kvcache
 
def decode_with_flash_attn(q, k_cache, v_cache, positions):
    """
    解码阶段的注意力计算
    q: [batch, 1, heads, dim] 当前 token 的 query
    k_cache, v_cache: [batch, heads, max_seq, dim] 缓存的 KV
    positions: [batch, 1] 当前 token 的位置
    """
    # 从缓存中获取历史 KV
    k = k_cache[:, :, :positions.max().item()]
    v = v_cache[:, :, :positions.max().item()]
    
    # Flash Attention
    output = flash_attn_with_kvcache(
        q, k, v,
        cache_k=k_cache,
        cache_v=v_cache,
        causal=True
    )
    
    return output

推理服务框架对比

vLLM 架构与特性

vLLM 是由 UC Berkeley LMSYS 开发的开源推理服务框架,其核心创新是 PagedAttention 技术。vLLM 已成为目前最流行的 LLM 推理框架之一。

核心特性

  • PagedAttention 实现 KV Cache 的虚拟显存管理
  • 支持 CUDA Graph 优化
  • 连续批处理(Continuous Batching)
  • 支持 Tensor Parallelism 多卡并行
  • 支持 GPTQ、AWQ 等量化方法
from vllm import LLM, SamplingParams
 
# 初始化 vLLM 引擎
llm = LLM(
    model="meta-llama/Llama-2-70B",
    tensor_parallel_size=4,  # 4 卡并行
    quantization="gptq",     # GPTQ 量化
    max_model_len=4096,
    gpu_memory_utilization=0.9
)
 
# 定义采样参数
sampling_params = SamplingParams(
    temperature=0.8,
    top_p=0.95,
    max_tokens=256
)
 
# 批量推理
prompts = [
    "请介绍一下大语言模型的基本原理。",
    "什么是 Transformer 架构?",
    "解释一下注意力机制的数学原理。"
]
 
outputs = llm.generate(prompts, sampling_params)
 
for output in outputs:
    print(f"Prompt: {output.prompt}")
    print(f"Generated: {output.outputs[0].text}")
    print("---")

TensorRT-LLM 深度优化

TensorRT-LLM 是 NVIDIA 官方提供的 LLM 推理优化框架,提供最深度的性能优化,但需要针对特定模型进行编译。

核心特性

  • FP8、FP16、INT8 多精度支持
  • In-flight Batching
  • Tensor Parallelism(支持 AllReduce 优化)
  • 高度优化的 CUDA kernels
  • 动态 batch 和调度
# TensorRT-LLM 构建引擎
trtllm-build \
    --model_dir /path/to/hf/model \
    --output_dir /path/to/engine \
    --tp_size 8 \
    --precision fp16 \
    --max_batch_size 256 \
    --max_input_len 4096 \
    --max_output_len 1024
 
# TensorRT-LLM 服务
trtllm-serve /path/to/engine \
    --http_port 8000 \
    --tokenizer /path/to/tokenizer

框架对比与选型建议

特性vLLMTensorRT-LLMTGIllama.cpp
性能极高中等
易用性中等
量化支持GPTQ/AWQINT8/FP8bitsandbytesGGUF
部署难度
硬件要求NVIDIANVIDIA H100+NVIDIA任意
适用场景通用部署极致性能HuggingFace 模型本地/边缘

Note

选型建议:对于大多数生产环境,vLLM 是最佳选择——性能优秀、易于部署、社区活跃。如果需要极致性能且有专业团队支持,TensorRT-LLM 是首选。对于需要在 CPU 或消费级 GPU 上运行的场景,llama.cpp + GGUF 是唯一可行的选择。


端到端优化实践

延迟-吞吐量权衡

推理优化需要在延迟(Latency)和吞吐量(Throughput)之间权衡:

低延迟场景(如交互式对话):优先优化 TTFT(Time to First Token)和单轮响应时间,适合小 batch、跳过 prefill 优化。

高吞吐量场景(如批量内容生成):优先优化 overall throughput,适合大 batch、充分预热。

class LatencyThroughputOptimizer:
    def __init__(self, model, target_mode='balanced'):
        self.model = model
        self.target_mode = target_mode
    
    def optimize_config(self):
        if self.target_mode == 'low_latency':
            return {
                'batch_size': 1,
                'prefill_chunk_size': 512,
                'use_cuda_graph': True,
                'kv_cache_dtype': 'fp16'
            }
        elif self.target_mode == 'high_throughput':
            return {
                'batch_size': 64,
                'prefill_chunk_size': 2048,
                'use_cuda_graph': True,
                'kv_cache_dtype': 'fp8'
            }
        else:  # balanced
            return {
                'batch_size': 16,
                'prefill_chunk_size': 1024,
                'use_cuda_graph': True,
                'kv_cache_dtype': 'fp16'
            }

混合精度与量化策略

合理的精度配置可以在性能和质量之间取得平衡:

from vllm import LLM, ModelProxy
 
def create_optimized_llm(model_name, mode='production'):
    if mode == 'development':
        # 开发模式:快速、低精度
        return LLM(
            model=model_name,
            dtype='float16',
            max_model_len=2048,
            gpu_memory_utilization=0.5
        )
    elif mode == 'production':
        # 生产模式:平衡性能和精度
        return LLM(
            model=model_name,
            dtype='auto',  # 自动选择最优精度
            max_model_len=8192,
            gpu_memory_utilization=0.9,
            quantization='fp8' if 'H100' in torch.cuda.get_device_name() else None,
            tensor_parallel_size=torch.cuda.device_count()
        )
    else:  # high_quality
        # 高质量模式:最大精度
        return LLM(
            model=model_name,
            dtype='bfloat16',
            max_model_len=16384,
            gpu_memory_utilization=0.85,
            tensor_parallel_size=torch.cuda.device_count()
        )

相关资源


本文档由 AI 辅助生成,内容经过严格审核