关键词

差分进化DE/rand/1DE/best/1变异操作交叉操作
F缩放因子CR交叉率二项式交叉指数交叉选择操作
约束处理自适应参数边界约束罚函数成功历史
jDESaDESHADEL-SHADE多目标DE

摘要

差分进化(Differential Evolution, DE)是由Rainer Storn和Kenneth Price于1995年提出的高效全局优化算法,专门用于解决实数参数空间的连续优化问题。DE以其简洁的算法结构、较少的控制参数和卓越的优化性能著称,在多个基准测试和实际应用中获得优异表现。与传统遗传算法的随机变异不同,DE采用基于差分向量的变异策略,使其在局部搜索和全局探索之间实现了良好的平衡。


一、差分进化入门:为什么DE比GA更适合连续优化?

1.1 从一个实际问题说起

假设你在优化一个投资组合,要找到一组合适的权重分配来最大化收益同时最小化风险。这个问题的决策变量是连续变化的——每个资产的权重可以是从0到1之间的任意实数。

用梯度下降?对不起,这个问题的目标函数可能根本不可导,或者有无数个局部最优解。

用遗传算法(GA)?可以,但GA在连续空间的表现总是差那么点意思。GA习惯用二进制编码来处理变量,这就意味着精度和编码长度永远是一对矛盾。而且GA的变异操作往往是加个随机噪声,方向性很差。

这时候,差分进化就登场了。

1.2 DE的核心创新

DE是专门为连续实数优化量身定做的。它的核心思想说起来很简单:

用种群中个体之间的差异(差分向量)来引导搜索方向

这就好比你在一片山区找最低点,GA是蒙着眼睛随便走,而DE是你能感知到坡度的方向——虽然不是真正的梯度,但比随机乱走强多了。

为什么这招好用?因为差分向量本质上编码了搜索空间中的局部结构信息。如果两个个体在某个维度上差异很大,说明这个区域比较”陡峭”,差分向量会引导搜索向更平坦的方向走。反之,如果两个个体很接近,说明这里可能接近局部最优点,需要更大的扰动来跳出陷阱。

1.3 DE vs GA:到底有什么区别?

很多人学完GA再学DE,觉得这俩不是差不多吗?都是种群进化,都有变异交叉选择。但实际上,DE和GA的底层逻辑有本质区别:

特征遗传算法(GA)差分进化(DE)
编码方式二进制/实数编码纯实数向量
变异机制随机扰动(如高斯变异)差分向量驱动
搜索方向几乎没有方向性沿差分向量方向
交叉作用混合父代信息决定试验向量组成
参数数量变异率、交叉率等主要就F和CR两个
收敛速度较慢通常更快
连续优化一般非常好

最关键的区别在于:GA的变异是”盲目的”,而DE的变异是”有方向的”

GA的变异一般是 x_new = x + mutation_rate * random_noise,这个random_noise跟当前解的质量毫无关系。而DE的变异是 v = x_r1 + F * (x_r2 - x_r3),差分向量 (x_r2 - x_r3) 本身携带着种群分布的信息。

想象一下,如果你的种群已经聚集在全局最优点附近,那么个体之间的差异自然就小,差分向量也会变小,搜索步长自动收缩。这就是DE的自适应特性,GA是做不到这一点的。

1.4 DE适合解决什么问题?

DE最擅长的场景:

  1. 连续变量的全局优化:变量是实数,且定义域是连续的
  2. 目标函数不可导:没有梯度信息可用
  3. 多局部最优:需要避免陷入局部最优
  4. 中等维度问题:一般来说D在10-100维度表现较好,极高维(比如10000维)会有挑战

DE不太擅长的场景:

  1. 离散优化问题:组合优化问题,比如TSP、调度问题(虽然可以用DE硬解,但专门算法更好)
  2. 极高维问题:需要配合降维技术使用
  3. 实时性要求极高:DE需要多代迭代,不像梯度下降几步就到

二、DE的核心思想:用差分向量引导搜索方向

2.1 差分向量是什么?

先看DE的变异公式:

这里 就是差分向量

从几何上看,差分向量 表示从 指向 的方向和距离。这个向量包含了两个信息:

  1. 方向:种群中另一个个体相对于当前基向量的位置关系
  2. 距离:两个个体的差异程度

加上缩放因子F之后,这个差分向量就变成了一个”步进方向”。F越大,步进距离越大,搜索范围越广;F越小,搜索越精细。

2.2 为什么要随机选三个个体?

这是DE的一个精妙设计。选择 三个互不相同的随机个体,有几个好处:

多样性保证:如果固定用某些个体,搜索方向就固定了,容易陷入局部最优。随机选择保证每一代都有新的探索方向。

自适应步长:当种群收敛时,所有个体都挤在一起, 自然变小,搜索步长自动减小。反之,种群分散时步长变大。

避免近亲繁殖:要求 ,确保参与变异的都是不同个体,防止”近亲繁殖”导致早熟。

2.3 缩放因子F的直观理解

F是DE最重要的控制参数,它决定了差分向量的影响力。

把F想成一个”探索半径调节器”:

  • F很大(接近1或更大):大步探索,适合全局搜索,能跳过大坑,但可能跳过最优区域
  • F很小(0.2-0.3):小步精修,适合局部搜索,能精细调整,但容易陷入局部最优
  • F适中(0.5-0.7):平衡探索和开发

一个直观的比喻:F就像你走路时迈的步子大小。找路的时候大步流星,到门口了就得小碎步。

2.4 DE的”试错-学习”机制

DE的工作方式本质上是一种高效的”试错-学习”循环:

  1. 试错:通过变异和交叉产生新解(试验向量)
  2. 评估:计算新解的目标函数值
  3. 学习:如果新解更好,就用它替换原解;否则保留原解

关键是,这个”试错”不是瞎试。差分向量使得试验向量倾向于向种群中其他个体所在的方向移动,而种群本身是向好的方向演化的(因为差的个体会被淘汰)。所以,DE的试错有方向性,比完全随机搜索效率高得多。


三、DE的变异策略:rand/1/bin、best/1/bin、current-to-best等

3.1 变异策略的命名规则

DE的变异策略有个统一的命名格式:DE/x/y/z

  • x:基向量(base vector)——变异公式中的主心骨

    • rand:随机选择
    • best:当前最优个体
    • current-to-best:当前个体向最优个体移动的方向
    • target-to-best:目标个体向最优个体移动
  • y:差分向量的数量(1个还是2个)

    • 1:使用1个差分向量
    • 2:使用2个差分向量
  • z:交叉类型

    • binbinomial:二项式交叉
    • expexponential:指数交叉

所以 DE/rand/1/bin 的意思就是:用随机个体做基向量,用1个差分向量,做二项式交叉。

3.2 常见变异策略详解

DE/rand/1 —— 随机基向量策略

def mutate_rand_1(population, i, F):
    """
    随机选3个不同于i的个体做变异
    """
    NP = len(population)
    candidates = [j for j in range(NP) if j != i]
    r1, r2, r3 = random.sample(candidates, 3)
    
    mutant = population[r1] + F * (population[r2] - population[r3])
    return mutant

特点

  • 全局搜索能力强,不容易早熟
  • 收敛速度相对较慢
  • 最稳定、最常用的策略
  • 适合多峰问题(多个局部最优)

什么时候用

  • 问题有多达10个以上的局部最优
  • 不确定最优解的位置
  • 需要稳健的结果

DE/best/1 —— 最优基向量策略

def mutate_best_1(population, fitness, i, F):
    """
    用最优个体做基向量
    """
    NP = len(population)
    best_idx = np.argmin(fitness)
    
    candidates = [j for j in range(NP) if j != i and j != best_idx]
    r1, r2 = random.sample(candidates, 2)
    
    mutant = population[best_idx] + F * (population[r1] - population[r2])
    return mutant

特点

  • 收敛速度快,贪婪搜索
  • 容易早熟,陷入局部最优
  • 利用性很强,探索性较弱

什么时候用

  • 确认问题只有一个全局最优(或少数几个)
  • 问题维度较低
  • 需要快速得到结果

DE/best/2 和 DE/rand/2 —— 双差分向量策略

def mutate_rand_2(population, i, F):
    """
    使用两个差分向量,探索性更强
    """
    NP = len(population)
    candidates = [j for j in range(NP) if j != i]
    r1, r2, r3, r4, r5 = random.sample(candidates, 5)
    
    mutant = population[r1] + F * (population[r2] - population[r3]) + \
             F * (population[r4] - population[r5])
    return mutant

特点

  • 比单差分向量有更大的扰动
  • 适合高维复杂问题
  • 但也更容易不稳定

DE/current-to-best/1 —— 混合策略

def mutate_current_to_best(population, fitness, i, F):
    """
    当前个体向最优个体移动,加上随机扰动
    """
    NP = len(population)
    best_idx = np.argmin(fitness)
    
    candidates = [j for j in range(NP) if j != i]
    r1, r2 = random.sample(candidates, 2)
    
    mutant = population[i] + F * (population[best_idx] - population[i]) + \
             F * (population[r1] - population[r2])

特点

  • 平衡探索和利用
  • 利用了当前个体的信息
  • 比纯粹的best策略更稳健

3.3 策略选择指南

选策略是个经验活,下面给个参考:

问题类型推荐策略理由
新问题,不知道特点DE/rand/1/bin最稳健
单峰问题,快速收敛DE/best/1/bin收敛快
多峰问题,多个局部最优DE/rand/1/bin 或 DE/rand/2/bin探索性强
高维问题(D>50)DE/rand/1/bin简单有效
需要精细调优DE/current-to-best/1平衡性好

四、交叉操作:二项式交叉 vs 指数交叉

4.1 为什么要交叉?

DE的交叉操作是生成试验向量的关键步骤。变异产生了变异向量 ,但这个向量可能过于激进——它只是沿着一个方向走了很远。

交叉操作的作用是混合目标向量 和变异向量 ,生成一个更合理的试验向量 。这个过程有点像”遗传”,把父代的特征(目标向量)和变异者的特征(变异向量)结合起来。

4.2 二项式交叉(Binomial Crossover)

这是DE最常用的交叉方式,也叫”独立交叉”。

def binomial_crossover(target, mutant, CR, bounds):
    """
    每个维度独立决定是选父代还是变异向量
    CR:交叉概率
    """
    D = len(target)
    trial = np.zeros(D)
    
    # 强制保证至少有一个维度来自变异向量
    # 否则试验向量就等于目标向量了
    j_rand = random.randint(0, D-1)
    
    for j in range(D):
        if random.random() < CR or j == j_rand:
            trial[j] = mutant[j]
        else:
            trial[j] = target[j]
    
    # 边界处理
    trial = np.clip(trial, 
                    [b[0] for b in bounds], 
                    [b[1] for b in bounds])
    
    return trial

直观理解

  • CR=0.9意味着每个维度有90%的概率采用变异向量的值
  • CR=0.1意味着大多数维度保留父代的值
  • j_rand的强制要求确保试验向量确实和父代不同

搜索特性

  • 交叉后各维度之间没有关联
  • 探索性比较均匀
  • 适合大多数问题

4.3 指数交叉(Exponential Crossover)

也叫”连续交叉”,它不是独立处理每个维度,而是一次性复制一段连续维度。

def exponential_crossover(target, mutant, CR, bounds):
    """
    连续一段来自变异向量
    """
    D = len(target)
    trial = target.copy()
    
    # 随机选择起始位置
    n = random.randint(0, D-1)
    L = 0
    
    # 确定连续段长度,按CR概率继续扩展
    while random.random() < CR and L < D:
        n = (n + 1) % D
        L += 1
    
    # 复制连续段
    start = (n - L + 1) % D if L > 0 else n
    for offset in range(L + 1):
        idx = (start + offset) % D
        trial[idx] = mutant[idx]
    
    # 边界处理
    trial = np.clip(trial, 
                    [b[0] for b in bounds], 
                    [b[1] for b in bounds])
    
    return trial

直观理解

  • 从随机位置开始,连续几个维度从变异向量复制
  • 复制的长度由CR控制,CR越大,连续段越长
  • 这就像把一段染色体从变异向量复制过来

搜索特性

  • 保持了一些维度之间的关联性
  • 更偏向局部搜索
  • 适合低维问题

4.4 交叉策略选择

特征二项式交叉指数交叉
维度关系独立连续
探索性均匀偏向局部
CR敏感性中等较高
适合维度任意低维(D<20)
推荐策略DE/rand/1/binDE/rand/1/exp

经验建议

  • 新问题先用二项式交叉(DE/…/bin)
  • 二项式交叉的CR通常设在0.3-0.9之间效果较好
  • CR太高(>0.95)可能导致搜索不稳定
  • CR太低(<0.1)会让变异几乎不起作用

五、选择操作:贪婪选择与精英保留

5.1 一对一竞争选择

DE的选择机制非常简单粗暴——胜者为王

def select(target, trial, fitness_target, fitness_trial):
    """
    一对一贪婪选择
    fitness_trial更小意味着目标函数值更好(假设最小化问题)
    """
    if fitness_trial <= fitness_target:
        return trial, fitness_trial
    else:
        return target, fitness_target

数学表达

关键点

  • 一对一:每个目标向量只和它自己的试验向量比较,不会和其他个体的试验向量比较
  • 贪婪:只选择更好的解,不会保留”潜力股”
  • 无精英保留:没有额外的机制来保存历史最优(但算法会单独记录best_fitness)

5.2 为什么不用更大的选择压力?

GA中常见的选择方式有轮盘赌、锦标赛等,它们的选择压力是可调的。

DE为什么用这么简单的贪婪选择?

因为DE的变异已经足够有效了。变异+交叉产生的试验向量,已经是一种”有方向的探索”。不需要再用复杂的选择机制来增强选择压力。

如果再加上强选择压力(比如只保留前10%的个体),会让种群多样性迅速丧失,早熟收敛。

DE的设计哲学是:让变异做探索,让选择做筛选,简化其他环节

5.3 边界处理

变异和交叉可能产生超出边界的解,DE用一种简单粗暴的方式处理——截断

def clip_to_bounds(x, bounds):
    """
    边界截断处理
    """
    clipped = np.array(x)
    for j in range(len(x)):
        low, high = bounds[j]
        clipped[j] = max(low, min(high, x[j]))
    return clipped

为什么用截断而不是反射或周期复制

  1. 简单,实现容易
  2. 对于大多数优化问题,边界本身就是硬约束
  3. 截断后的解仍然在可行域内

六、控制参数:缩放因子F和交叉概率CR的经验设置

6.1 缩放因子F的调节

F控制差分向量的缩放程度,直接影响算法的”步子大小”。

F值步子大小特性适用场景
F < 0.3很小精细搜索,局部开发微调已找到的好解
F ≈ 0.5适中平衡探索和开发通用场景,默认值
F ∈ [0.6, 0.8]较大全局探索多峰问题,逃逸局部最优
F > 0.9很大激进探索稀疏搜索,但容易跳过头

实战经验

  • 初次尝试用F=0.5
  • 如果容易早熟,增大F到0.7-0.9
  • 如果收敛太慢,减小F到0.3-0.5
  • 切忌F=1.0,这会让差分向量的缩放失效

6.2 交叉概率CR的调节

CR控制试验向量中有多少比例来自变异向量。

CR值变异向量占比特性适用场景
CR < 0.3很少接近随机搜索高维复杂问题
CR ≈ 0.5一半左右平衡通用场景
CR > 0.8大多数贪婪搜索,快速收敛单峰问题,简单问题

实战经验

  • 高维问题(>30)建议用较低的CR(0.3-0.5)
  • 低维问题可以用较高的CR(0.7-0.9)
  • CR=1.0是不允许的(会导致没有交叉)
  • 如果发现收敛太快而找不到好解,降低CR

6.3 F和CR的协同调整

F和CR不是独立的,它们需要配合使用:

问题场景FCR策略理由
通用/默认0.5-0.70.8-0.9DE/rand/1/bin最稳健配置
多峰/复杂0.7-0.90.3-0.5DE/rand/2/bin增强探索
单峰/简单0.3-0.50.8-0.95DE/best/1/bin快速收敛
高维(D>50)0.5-0.60.4-0.6DE/rand/1/bin平衡配置

一个直观的理解

  • 高F + 低CR = 大步探索 + 谨慎继承 = 广撒网
  • 低F + 高CR = 小步精修 + 大胆继承 = 深度挖掘

6.4 种群规模NP的设置

NP是DE的第三个控制参数,虽然影响不如F和CR那么大,但也不能随便设。

经验公式

实战建议

  • 默认值:NP = 10 × D(但至少20)
  • 简单问题:NP = 5 × D
  • 复杂多峰问题:NP = 15-20 × D
  • 维度很高时,适当增大NP,但不要超过200

为什么不是越大越好

  • NP太大会增加计算量
  • 种群太大反而会稀释.selection pressure,导致收敛变慢
  • 关键是种群要能覆盖搜索空间

七、从零实现差分进化:Python完整代码

7.1 最简版本:30行代码理解DE

先来个最简单、最核心的实现,理解DE的工作流程:

import numpy as np
import random
 
def sphere(x):
    """球形函数,最小值在原点,值为0"""
    return np.sum(x**2)
 
def differential_evolution(func, bounds, NP=50, F=0.5, CR=0.9, max_iter=1000, tol=1e-8):
    """
    差分进化算法最简实现
    
    参数:
        func: 目标函数(最小化)
        bounds: 边界列表 [(low1, high1), ..., (lowD, highD)]
        NP: 种群规模
        F: 缩放因子
        CR: 交叉概率
        max_iter: 最大迭代次数
        tol: 早停阈值
    """
    D = len(bounds)
    
    # Step 1: 初始化种群
    population = np.array([
        [random.uniform(low, high) for low, high in bounds]
        for _ in range(NP)
    ])
    
    # 评估初始种群
    fitness = np.array([func(x) for x in population])
    
    # 记录最优
    best_idx = np.argmin(fitness)
    best_fitness = fitness[best_idx]
    best_solution = population[best_idx].copy()
    
    # Step 2: 进化主循环
    for gen in range(max_iter):
        for i in range(NP):
            # ===== 变异 =====
            # 随机选3个不同于i的个体
            candidates = [j for j in range(NP) if j != i]
            r1, r2, r3 = random.sample(candidates, 3)
            
            # 变异向量
            mutant = population[r1] + F * (population[r2] - population[r3])
            
            # ===== 交叉 =====
            trial = np.zeros(D)
            j_rand = random.randint(0, D - 1)  # 强制保留的维度
            
            for j in range(D):
                if random.random() < CR or j == j_rand:
                    trial[j] = mutant[j]
                else:
                    trial[j] = population[i][j]
            
            # 边界处理
            for j in range(D):
                low, high = bounds[j]
                trial[j] = max(low, min(high, trial[j]))
            
            # ===== 选择 =====
            fitness_trial = func(trial)
            
            if fitness_trial <= fitness[i]:
                population[i] = trial
                fitness[i] = fitness_trial
                
                # 更新全局最优
                if fitness_trial < best_fitness:
                    best_fitness = fitness_trial
                    best_solution = trial.copy()
        
        # 打印进度
        if gen % 100 == 0:
            print(f"Generation {gen}: Best fitness = {best_fitness:.8f}")
        
        # 早停
        if best_fitness < tol:
            print(f"Converged at generation {gen}")
            break
    
    return best_solution, best_fitness
 
 
# ===== 使用示例 =====
if __name__ == "__main__":
    # 30维球形函数
    bounds = [(-5.12, 5.12)] * 30
    
    solution, fitness = differential_evolution(
        sphere, 
        bounds,
        NP=100,
        F=0.5,
        CR=0.9,
        max_iter=2000
    )
    
    print(f"\nFinal result:")
    print(f"Fitness: {fitness:.10f}")
    print(f"Solution: {solution[:5]}... (showing first 5 dimensions)")

运行这个代码,你会看到适应度值逐渐下降,最终收敛到一个很小的值(接近0)。

7.2 面向对象版本:功能完整的DE类

下面是一个更完整的实现,带有可视化、参数记录等功能:

import numpy as np
import random
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import Callable, List, Tuple, Optional
 
 
@dataclass
class DEConfig:
    """DE配置参数"""
    NP: int = 50           # 种群规模
    F: float = 0.5         # 缩放因子
    CR: float = 0.9        # 交叉概率
    max_iter: int = 1000   # 最大迭代次数
    tol: float = 1e-8      # 收敛阈值
    seed: Optional[int] = None  # 随机种子
    
    
@dataclass
class DEStats:
    """统计信息"""
    best_fitness_history: List[float] = field(default_factory=list)
    mean_fitness_history: List[float] = field(default_factory=list)
    worst_fitness_history: List[float] = field(default_factory=list)
    function_evaluations: int = 0
    
 
class DifferentialEvolution:
    """
    差分进化算法完整实现
    """
    
    # 支持的变异策略
    STRATEGIES = {
        'rand1': lambda self, i: self._mutate_rand1(i),
        'best1': lambda self, i: self._mutate_best1(i),
        'rand2': lambda self, i: self._mutate_rand2(i),
        'best2': lambda self, i: self._mutate_best2(i),
        'current2best': lambda self, i: self._mutate_current2best(i),
    }
    
    def __init__(
        self,
        objective: Callable,
        bounds: List[Tuple[float, float]],
        config: Optional[DEConfig] = None,
        strategy: str = 'rand1',
        track_stats: bool = True
    ):
        """
        初始化DE算法
        
        参数:
            objective: 目标函数(最小化)
            bounds: 边界列表 [(low, high), ...]
            config: 配置参数
            strategy: 变异策略
            track_stats: 是否跟踪统计信息
        """
        self.objective = objective
        self.bounds = bounds
        self.config = config or DEConfig()
        self.D = len(bounds)
        self.strategy = strategy
        self.track_stats = track_stats
        
        # 设置随机种子
        if self.config.seed is not None:
            random.seed(self.config.seed)
            np.random.seed(self.config.seed)
        
        # 转换为numpy数组便于计算
        self.low = np.array([b[0] for b in bounds])
        self.high = np.array([b[1] for b in bounds])
        
        # 初始化种群和适应度
        self.population: Optional[np.ndarray] = None
        self.fitness: Optional[np.ndarray] = None
        self.best_fitness: float = float('inf')
        self.best_solution: Optional[np.ndarray] = None
        
        # 统计信息
        self.stats = DEStats()
        
    def initialize(self):
        """初始化种群"""
        self.population = np.random.uniform(
            self.low, self.high, 
            (self.config.NP, self.D)
        )
        
        self.fitness = np.array([
            self.objective(x) for x in self.population
        ])
        self.stats.function_evaluations += self.config.NP
        
        # 找最优
        best_idx = np.argmin(self.fitness)
        self.best_fitness = self.fitness[best_idx]
        self.best_solution = self.population[best_idx].copy()
        
        if self.track_stats:
            self._record_stats()
    
    def _mutate_rand1(self, i: int) -> np.ndarray:
        """DE/rand/1 变异"""
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2, r3 = random.sample(candidates, 3)
        
        return (self.population[r1] + self.config.F * 
                (self.population[r2] - self.population[r3]))
    
    def _mutate_best1(self, i: int) -> np.ndarray:
        """DE/best/1 变异"""
        best_idx = np.argmin(self.fitness)
        candidates = [j for j in range(self.config.NP) if j != i and j != best_idx]
        r1, r2 = random.sample(candidates, 2)
        
        return (self.population[best_idx] + self.config.F * 
                (self.population[r1] - self.population[r2]))
    
    def _mutate_rand2(self, i: int) -> np.ndarray:
        """DE/rand/2 变异"""
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2, r3, r4, r5 = random.sample(candidates, 5)
        
        return (self.population[r1] + self.config.F * 
                (self.population[r2] - self.population[r3]) +
                self.config.F * (self.population[r4] - self.population[r5]))
    
    def _mutate_best2(self, i: int) -> np.ndarray:
        """DE/best/2 变异"""
        best_idx = np.argmin(self.fitness)
        candidates = [j for j in range(self.config.NP) 
                      if j != i and j != best_idx]
        r1, r2, r3, r4 = random.sample(candidates, 4)
        
        return (self.population[best_idx] + self.config.F * 
                (self.population[r1] - self.population[r2]) +
                self.config.F * (self.population[r3] - self.population[r4]))
    
    def _mutate_current2best(self, i: int) -> np.ndarray:
        """DE/current-to-best/1 变异"""
        best_idx = np.argmin(self.fitness)
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2 = random.sample(candidates, 2)
        
        return (self.population[i] + self.config.F * 
                (self.population[best_idx] - self.population[i]) +
                self.config.F * (self.population[r1] - self.population[r2]))
    
    def mutate(self, i: int) -> np.ndarray:
        """执行变异操作"""
        mutate_func = self.STRATEGIES.get(
            self.strategy, self._mutate_rand1
        )
        return mutate_func(i)
    
    def crossover(self, target: np.ndarray, mutant: np.ndarray) -> np.ndarray:
        """二项式交叉"""
        trial = np.zeros(self.D)
        j_rand = random.randint(0, self.D - 1)
        
        for j in range(self.D):
            if random.random() < self.config.CR or j == j_rand:
                trial[j] = mutant[j]
            else:
                trial[j] = target[j]
        
        # 边界截断
        return np.clip(trial, self.low, self.high)
    
    def step(self):
        """执行一代进化"""
        new_population = np.zeros_like(self.population)
        new_fitness = np.zeros(self.config.NP)
        
        for i in range(self.config.NP):
            # 变异
            mutant = self.mutate(i)
            
            # 交叉
            trial = self.crossover(self.population[i], mutant)
            
            # 评估
            fitness_trial = self.objective(trial)
            self.stats.function_evaluations += 1
            
            # 选择
            if fitness_trial <= self.fitness[i]:
                new_population[i] = trial
                new_fitness[i] = fitness_trial
            else:
                new_population[i] = self.population[i]
                new_fitness[i] = self.fitness[i]
        
        self.population = new_population
        self.fitness = new_fitness
        
        # 更新最优
        best_idx = np.argmin(self.fitness)
        if self.fitness[best_idx] < self.best_fitness:
            self.best_fitness = self.fitness[best_idx]
            self.best_solution = self.population[best_idx].copy()
        
        if self.track_stats:
            self._record_stats()
    
    def _record_stats(self):
        """记录统计信息"""
        self.stats.best_fitness_history.append(self.best_fitness)
        self.stats.mean_fitness_history.append(np.mean(self.fitness))
        self.stats.worst_fitness_history.append(np.max(self.fitness))
    
    def run(self) -> Tuple[np.ndarray, float]:
        """运行优化"""
        self.initialize()
        
        for gen in range(self.config.max_iter):
            self.step()
            
            # 进度输出
            if (gen + 1) % 100 == 0:
                print(f"Gen {gen + 1}: Best = {self.best_fitness:.8f}, "
                      f"Mean = {np.mean(self.fitness):.8f}")
            
            # 早停
            if self.best_fitness < self.config.tol:
                print(f"Converged at generation {gen + 1}")
                break
        
        return self.best_solution.copy(), self.best_fitness
    
    def plot_convergence(self):
        """绘制收敛曲线"""
        plt.figure(figsize=(10, 6))
        
        gens = range(1, len(self.stats.best_fitness_history) + 1)
        
        plt.semilogy(gens, self.stats.best_fitness_history, 'b-', 
                     label='Best Fitness', linewidth=2)
        plt.semilogy(gens, self.stats.mean_fitness_history, 'g--', 
                     label='Mean Fitness', linewidth=1.5)
        
        plt.xlabel('Generation')
        plt.ylabel('Fitness (log scale)')
        plt.title(f'DE Convergence ({self.strategy})')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
 
 
# ===== 使用示例 =====
if __name__ == "__main__":
    # 定义测试函数
    def rosenbrock(x):
        """Rosenbrock函数,经典测试函数"""
        result = 0
        for i in range(len(x) - 1):
            result += 100 * (x[i]**2 - x[i+1])**2 + (x[i] - 1)**2
        return result
    
    def rastrigin(x):
        """Rastrigin函数,多峰函数"""
        return sum(x**2 - 10 * np.cos(2 * np.pi * x) + 10)
    
    # 测试 Rosenbrock 函数
    print("=" * 50)
    print("Testing DE on Rosenbrock function (30D)")
    print("=" * 50)
    
    bounds = [(-5, 10)] * 30
    
    de = DifferentialEvolution(
        objective=rosenbrock,
        bounds=bounds,
        config=DEConfig(
            NP=100,
            F=0.5,
            CR=0.9,
            max_iter=2000,
            seed=42
        ),
        strategy='rand1'
    )
    
    solution, fitness = de.run()
    print(f"\nFinal Best Fitness: {fitness:.6f}")
    print(f"Function Evaluations: {de.stats.function_evaluations}")
    
    # 绘制收敛曲线
    de.plot_convergence()

7.3 用scipy.optimize直接调用DE

Python科学计算库里已经内置了DE实现,可以直接用:

import numpy as np
from scipy.optimize import differential_evolution
 
 
def rosenbrock(x):
    """Rosenbrock banana function"""
    return sum(100 * (x[i]**2 - x[i+1])**2 + (x[i] - 1)**2 
               for i in range(len(x) - 1))
 
 
# 定义边界
bounds = [(-5, 10)] * 10  # 10维
 
# 运行DE优化
result = differential_evolution(
    rosenbrock,
    bounds,
    strategy='best1bin',      # 变异+交叉策略
    maxiter=1000,             # 最大迭代次数
    popsize=15,               # 种群规模 (popsize × D)
    tol=1e-7,                 # 收敛阈值
    mutation=(0.5, 1),        # 变异因子范围
    recombination=0.7,        # 交叉概率
    seed=42,                  # 随机种子
    polish=True,              # 用L-BFGS-B精修结果
    updating='deferred'       # 延迟更新
)
 
print(f"最优解: {result.x}")
print(f"最优值: {result.fun:.10f}")
print(f"迭代次数: {result.nit}")
print(f"函数评估次数: {result.nfev}")

scipy的实现做了很多工程优化,比自己写的版本更高效,推荐在实际项目中使用。


八、DE调参实战:F和CR的协同调整

8.1 参数敏感性问题

DE的参数敏感性比很多进化算法低,但F和CR的设置仍然会影响最终结果。

一个常见的问题是:参数设置依赖于问题本身。没有一套”万能参数”能在所有问题上都表现最好。

所以调参的第一步是了解你的问题

  1. 问题维度D是多少?
  2. 是单峰还是多峰?
  3. 目标函数的平滑程度如何?
  4. 计算目标函数的成本高不高?

8.2 经验参数配置表

问题类型DFCRNP策略特点
快速测试100.50.9100rand1bin通用
单峰问题300.40.950best1bin收敛快
多峰问题300.70.5100rand1bin探索强
高维问题1000.50.5200rand1bin平衡
高维多峰1000.80.3300rand2bin激进探索

8.3 自适应参数:让算法自己调参

手动调参太麻烦了,有没有办法让算法自动调整参数?

jDE(动态参数自适应DE) 就是解决这个问题的一个经典方案:

class JDE(DifferentialEvolution):
    """
    jDE - 动态参数自适应差分进化
    Brest et al., 2006
    
    核心思想:每个个体有自己的F和CR参数
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # 每个个体独立的F和CR参数
        self.F = np.random.uniform(0.1, 1.0, self.config.NP)
        self.CR = np.random.uniform(0.0, 1.0, self.config.NP)
        
        # F和CR的取值范围
        self.F_l, self.F_u = 0.1, 1.0
        self.CR_l, self.CR_u = 0.0, 1.0
        
        # 参数更新概率
        self.tau1 = 0.1  # F更新概率
        self.tau2 = 0.1  # CR更新概率
    
    def update_parameters(self, i):
        """更新个体i的F和CR参数"""
        # F的更新
        if random.random() < self.tau1:
            self.F[i] = self.F_l + random.random() * (self.F_u - self.F_l)
        
        # CR的更新
        if random.random() < self.tau2:
            self.CR[i] = self.CR_l + random.random() * (self.CR_u - self.CR_l)
    
    def mutate(self, i):
        """使用个体自适应的F"""
        self.update_parameters(i)
        
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2, r3 = random.sample(candidates, 3)
        
        return (self.population[r1] + self.F[i] * 
                (self.population[r2] - self.population[r3]))
    
    def crossover(self, target, mutant, CR_i):
        """使用个体自适应的CR"""
        trial = np.zeros(self.D)
        j_rand = random.randint(0, self.D - 1)
        
        for j in range(self.D):
            if random.random() < CR_i or j == j_rand:
                trial[j] = mutant[j]
            else:
                trial[j] = target[j]
        
        return np.clip(trial, self.low, self.high)
    
    def step(self):
        """jDE主循环"""
        new_population = np.zeros_like(self.population)
        new_fitness = np.zeros(self.config.NP)
        
        for i in range(self.config.NP):
            mutant = self.mutate(i)
            trial = self.crossover(self.population[i], mutant, self.CR[i])
            
            fitness_trial = self.objective(trial)
            
            if fitness_trial <= self.fitness[i]:
                new_population[i] = trial
                new_fitness[i] = fitness_trial
            else:
                new_population[i] = self.population[i]
                new_fitness[i] = self.fitness[i]
        
        self.population = new_population
        self.fitness = new_fitness
        
        best_idx = np.argmin(self.fitness)
        if self.fitness[best_idx] < self.best_fitness:
            self.best_fitness = self.fitness[best_idx]
            self.best_solution = self.population[best_idx].copy()
        
        if self.track_stats:
            self._record_stats()

jDE的工作原理

  • 每个个体有一对(F_i, CR_i)参数
  • 每一代开始时,有一定概率(tau1、tau2)随机更新这些参数
  • 如果更新后的参数产生了更好的个体,就保留;否则回退
  • 这实际上是在执行”元进化”:让进化过程自己调整探索策略

8.4 网格搜索找最优参数

如果你的问题比较固定,可以先用网格搜索找一套好参数:

import itertools
import numpy as np
 
def grid_search_de(objective, bounds, dimensions=[10, 30], 
                    F_values=[0.3, 0.5, 0.7, 0.9],
                    CR_values=[0.3, 0.5, 0.7, 0.9],
                    n_runs=5):
    """
    网格搜索找最优参数配置
    """
    results = []
    
    for D, F, CR in itertools.product(dimensions, F_values, CR_values):
        fitnesses = []
        
        for run in range(n_runs):
            de = DifferentialEvolution(
                objective=objective,
                bounds=[(-5, 10)] * D,
                config=DEConfig(NP=max(50, 10*D), F=F, CR=CR, 
                               max_iter=500, seed=run),
                strategy='rand1'
            )
            
            _, fitness = de.run()
            fitnesses.append(fitness)
        
        mean_fitness = np.mean(fitnesses)
        std_fitness = np.std(fitnesses)
        
        results.append({
            'D': D, 'F': F, 'CR': CR,
            'mean': mean_fitness,
            'std': std_fitness
        })
        
        print(f"D={D}, F={F}, CR={CR}: mean={mean_fitness:.6f}, std={std_fitness:.6f}")
    
    # 找出最优配置
    best = min(results, key=lambda x: x['mean'])
    print(f"\nBest config: D={best['D']}, F={best['F']}, CR={best['CR']}")
    
    return results

九、DE的应用场景:函数优化、神经网络训练、路径规划

9.1 函数优化:DE的老本行

DE最初就是为函数优化设计的,这也是它最擅长的领域。

9.1.1 经典测试函数

Sphere函数(单峰,简单):

def sphere(x):
    """
    球形函数
    全局最优: 0 at x = [0, 0, ..., 0]
    维度: 任意
    """
    return np.sum(x**2)

Rosenbrock函数(单峰,有”弯道”):

def rosenbrock(x):
    """
    Rosenbrock banana函数
    全局最优: 0 at x = [1, 1, ..., 1]
    特点: 有一条狭长的抛物线谷
    """
    total = 0
    for i in range(len(x) - 1):
        total += 100 * (x[i]**2 - x[i+1])**2 + (x[i] - 1)**2
    return total

Rastrigin函数(多峰,有很多局部最优):

def rastrigin(x):
    """
    Rastrigin函数
    全局最优: 0 at x = [0, 0, ..., 0]
    特点: 大量局部最优,呈正弦波动
    """
    A = 10
    return A * len(x) + np.sum(x**2 - A * np.cos(2 * np.pi * x))

9.1.2 用DE求解测试函数

def benchmark_de():
    """DE基准测试"""
    
    functions = {
        'Sphere': lambda x: np.sum(x**2),
        'Rosenbrock': lambda x: sum(100*(x[i]**2 - x[i+1])**2 + (x[i]-1)**2 
                                     for i in range(len(x)-1)),
        'Rastrigin': lambda x: 10*len(x) + sum(x**2 - 10*np.cos(2*np.pi*x))
    }
    
    results = {}
    
    for name, func in functions.items():
        print(f"\n优化 {name} 函数:")
        
        bounds = [(-5.12, 5.12)] * 30
        
        de = DifferentialEvolution(
            objective=func,
            bounds=bounds,
            config=DEConfig(NP=100, F=0.5, CR=0.9, max_iter=1000, seed=42)
        )
        
        solution, fitness = de.run()
        results[name] = fitness
        
        print(f"  最优值: {fitness:.8f}")
    
    return results
 
 
if __name__ == "__main__":
    results = benchmark_de()

9.2 神经网络超参数优化

DE可以用来优化神经网络的超参数,比如学习率、隐藏层大小、正则化系数等:

import numpy as np
from sklearn.neural_network import MLPClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
 
 
class NeuralNetworkOptimizerDE:
    """
    用DE优化神经网络超参数
    """
    
    def __init__(self, X_train, y_train, X_val, y_val):
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
    
    def get_dimensions(self):
        """确定超参数维度"""
        # 隐藏层1大小: 10-200
        # 隐藏层2大小: 10-200  
        # 学习率: 0.001-0.1
        # 正则化: 0.0001-0.01
        return 4
    
    def decode(self, x):
        """解码染色体"""
        hidden1_size = int(10 + x[0] * 190)    # [10, 200]
        hidden2_size = int(10 + x[1] * 190)    # [10, 200]
        learning_rate = 0.001 + x[2] * 0.099     # [0.001, 0.1]
        alpha = 0.0001 + x[3] * 0.0099         # [0.0001, 0.01]
        
        return hidden1_size, hidden2_size, learning_rate, alpha
    
    def fitness(self, x):
        """适应度函数"""
        h1, h2, lr, alpha = self.decode(x)
        
        try:
            mlp = MLPClassifier(
                hidden_layer_sizes=(h1, h2),
                learning_rate_init=lr,
                alpha=alpha,
                max_iter=200,
                random_state=42
            )
            
            mlp.fit(self.X_train, self.y_train)
            
            # 用验证集准确率作为适应度(最大化)
            accuracy = mlp.score(self.X_val, self.y_val)
            return -accuracy  # DE是最小化
            
        except:
            return 1.0  # 训练失败返回最差值
    
    def optimize(self, NP=50, max_iter=30):
        """运行优化"""
        D = self.get_dimensions()
        bounds = [(0, 1)] * D
        
        de = DifferentialEvolution(
            objective=self.fitness,
            bounds=bounds,
            config=DEConfig(NP=NP, F=0.5, CR=0.9, max_iter=max_iter)
        )
        
        solution, fitness = de.run()
        best_params = self.decode(solution)
        best_accuracy = -fitness
        
        return best_params, best_accuracy
 
 
# 使用示例
if __name__ == "__main__":
    # 生成示例数据
    X, y = make_classification(n_samples=1000, n_features=20, 
                                n_informative=15, random_state=42)
    
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # 优化
    optimizer = NeuralNetworkOptimizerDE(X_train, y_train, X_val, y_val)
    best_params, best_acc = optimizer.optimize()
    
    print(f"最优参数: hidden1={best_params[0]}, hidden2={best_params[1]}, "
          f"lr={best_params[2]:.4f}, alpha={best_params[3]:.4f}")
    print(f"验证集准确率: {best_acc:.4f}")

9.3 路径规划

DE可以用于机器人路径规划,通过优化路径点来最小化路径长度、避免障碍:

import numpy as np
 
 
class PathPlanningDE:
    """
    机器人路径规划
    优化路径点以最小化长度和碰撞
    """
    
    def __init__(self, start, goal, obstacles, n_waypoints=8):
        """
        参数:
            start: 起点坐标 [x, y]
            goal: 终点坐标 [x, y]
            obstacles: 障碍物列表,每个障碍物是(x, y, radius)
            n_waypoints: 中间路径点数量
        """
        self.start = np.array(start)
        self.goal = np.array(goal)
        self.obstacles = obstacles  # [(x, y, radius), ...]
        self.n_waypoints = n_waypoints
        self.dim = 2 * n_waypoints  # x和y坐标
    
    def decode(self, chromosome):
        """解码染色体为路径点"""
        waypoints = chromosome.reshape(self.n_waypoints, 2)
        # 路径点应该大致从start到goal
        # 用sigmoid-like映射确保点在起点和终点之间
        return waypoints
    
    def path_length(self, waypoints):
        """计算路径总长度"""
        full_path = np.vstack([self.start, waypoints, self.goal])
        length = 0
        
        for i in range(len(full_path) - 1):
            length += np.linalg.norm(full_path[i+1] - full_path[i])
        
        return length
    
    def collision_check(self, waypoints):
        """检查碰撞,返回惩罚值"""
        full_path = np.vstack([self.start, waypoints, self.goal])
        penalty = 0
        
        for obs_x, obs_y, obs_r in self.obstacles:
            obs_center = np.array([obs_x, obs_y])
            
            for i in range(len(full_path) - 1):
                # 计算线段到圆心的距离
                p1, p2 = full_path[i], full_path[i+1]
                
                # 最近点投影
                t = np.dot(obs_center - p1, p2 - p1) / (np.linalg.norm(p2 - p1)**2 + 1e-10)
                t = np.clip(t, 0, 1)
                closest = p1 + t * (p2 - p1)
                
                dist = np.linalg.norm(closest - obs_center)
                
                if dist < obs_r:
                    penalty += (obs_r - dist) ** 2
        
        return penalty
    
    def smoothness(self, waypoints):
        """计算路径平滑度(曲率惩罚)"""
        full_path = np.vstack([self.start, waypoints, self.goal])
        curvature = 0
        
        for i in range(1, len(full_path) - 1):
            v1 = full_path[i] - full_path[i-1]
            v2 = full_path[i+1] - full_path[i]
            
            # 夹角越大,曲率越大
            cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-10)
            angle = np.arccos(np.clip(cos_angle, -1, 1))
            curvature += angle
        
        return curvature
    
    def fitness(self, chromosome):
        """总适应度"""
        waypoints = self.decode(chromosome)
        
        length = self.path_length(waypoints)
        collision = self.collision_check(waypoints)
        smooth = self.smoothness(waypoints)
        
        # 加权组合
        return length + 1000 * collision + 10 * smooth
    
    def optimize(self, bounds=(-50, 50), NP=100, max_iter=500):
        """运行路径规划"""
        bounds_list = [(bounds, bounds)] * self.dim
        
        de = DifferentialEvolution(
            objective=self.fitness,
            bounds=bounds_list,
            config=DEConfig(NP=NP, F=0.7, CR=0.5, max_iter=max_iter)
        )
        
        solution, fitness = de.run()
        waypoints = self.decode(solution)
        full_path = np.vstack([self.start, waypoints, self.goal])
        
        return full_path, fitness
 
 
# 使用示例
if __name__ == "__main__":
    # 定义起点终点和障碍物
    start = [0, 0]
    goal = [100, 100]
    obstacles = [
        (30, 40, 15),   # 障碍物1: 中心(30,40),半径15
        (60, 70, 12),   # 障碍物2
        (45, 30, 10),   # 障碍物3
    ]
    
    # 运行规划
    planner = PathPlanningDE(start, goal, obstacles, n_waypoints=8)
    path, cost = planner.optimize()
    
    print(f"最优路径长度: {planner.path_length(path[1:-1]):.2f}")
    print(f"碰撞惩罚: {planner.collision_check(path[1:-1]):.2f}")
    print(f"路径曲率: {planner.smoothness(path[1:-1]):.2f}")
    print(f"总代价: {cost:.2f}")

十、DE与其他进化算法的对比:什么时候用DE更合适?

10.1 算法选择决策树

面对一个优化问题,该选哪种算法?下面是经验性的决策指南:

这是一个优化问题吗?
├── 是离散/组合优化(TSP、调度)?
│   └── 用遗传算法(GA)、蚁群算法(ACO)、模拟退火(SA)
├── 是连续优化?
│   ├── 有梯度信息?
│   │   └── 用梯度下降、L-BFGS-B、Adam
│   ├── 没有梯度,但维度低(<10)?
│   │   └── 用Nelder-Mead、Powell
│   └── 没有梯度,维度中等(10-100)?
│       ├── 问题单峰?
│       │   └── 用DE/best/1/bin 或 拟牛顿法
│       └── 问题多峰?
│           ├── 需要全局最优?
│           │   └── 用DE/rand/1/bin、贝叶斯优化
│           └── 快速得到一个足够好的解?
│               └── 用PSO、DE

10.2 DE vs PSO(粒子群优化)

特征DEPSO
搜索机制差分向量引导跟随最优粒子
参数数量F, CR (2个)w, c1, c2 (3个)
鲁棒性较高一般
收敛速度中等较快
局部搜索能力较强一般
实现难度简单更简单
搜索历史无记忆有记忆(pbest, gbest)

选择建议

  • 如果问题可能有多个局部最优,用DE/rand策略
  • 如果问题可能只有一个全局最优,用PSO或DE/best策略
  • 如果你不确定,先试试DE

10.3 DE vs CMA-ES(协方差矩阵自适应进化策略)

特征DECMA-ES
适用维度10-1002-50(原始版),可达1000+(IPOP-CMA)
参数敏感度较低中等
计算开销高(需要更新协方差矩阵)
理论支撑经验较强理论(自然梯度)
复杂问题一般很好(尤其是病态曲率)

选择建议

  • 维度高或问题病态曲率严重,用CMA-ES
  • 普通连续优化问题,用DE
  • 预算有限(函数评估很贵),用DE
  • 追求极致精度,用CMA-ES(但需要更多调参)

10.4 DE vs 贝叶斯优化

特征DE贝叶斯优化
样本效率较低(O(n))很高(O(log(n)))
维度容忍度10-100<20
参数调优需要手动调参自动调参
计算开销高(需要拟合代理模型)
全局搜索非常好(基于不确定性)

选择建议

  • 函数评估很贵(每次评估耗时很长),用贝叶斯优化
  • 函数评估很快,评估次数可以很多,用DE
  • 高维问题(>20),用DE
  • 需要并行评估函数,用DE

十一、自适应差分进化:JADE、SaDE等改进版本

11.1 JADE:带可选存档的自适应DE

JADE(Zhang & Sanderson, 2009)是DE的一个重要改进,它的核心创新是:

  1. DE/current-to-pbest/1变异:利用当前个体信息和顶级个体信息
  2. 可选存档:保存被淘汰的个体信息,增加多样性
  3. 参数自适应:F和CR根据历史成功经验自动调整
class JADE(DifferentialEvolution):
    """
    JADE - 带有可选存档的自适应差分进化
    Zhang & Sanderson, 2009
    
    关键创新:
    1. DE/current-to-pbest/1 变异策略
    2. F用柯西分布采样
    3. CR用正态分布采样
    4. 外部存档增加多样性
    """
    
    def __init__(self, *args, H=100, p=0.05, **kwargs):
        super().__init__(*args, **kwargs)
        
        # 历史记忆大小
        self.H = H
        # 顶级个体比例
        self.p = p
        # 外部存档
        self.archive = []
        
        # 参数历史
        self.F_means = []
        self.CR_means = []
        self.success_F = []
        self.success_CR = []
    
    def sample_F(self):
        """用柯西分布采样F"""
        if len(self.F_means) == 0:
            mu_F = 0.5
        else:
            mu_F = self.F_means[-1]
        
        # 柯西分布采样,然后截断
        F = mu_F + 0.1 * np.random.randn()
        return min(F, 1.0)  # 上限1.0
    
    def sample_CR(self):
        """用正态分布采样CR"""
        if len(self.CR_means) == 0:
            mu_CR = 0.5
        else:
            mu_CR = self.CR_means[-1]
        
        CR = mu_CR + 0.1 * np.random.randn()
        return np.clip(CR, 0, 1)
    
    def get_pbest_candidates(self):
        """获取顶级个体候选"""
        n_best = max(1, int(self.p * self.config.NP))
        sorted_idx = np.argsort(self.fitness)[:n_best]
        return [self.population[i] for i in sorted_idx]
    
    def mutate(self, i):
        """DE/current-to-pbest/1 with archive"""
        F = self.sample_F()
        CR = self.sample_CR()
        
        # 获取pbest
        pbest_candidates = self.get_pbest_candidates()
        x_pbest = pbest_candidates[np.random.randint(len(pbest_candidates))]
        
        # 从种群+存档中选两个不同个体
        candidates = list(range(self.config.NP)) + list(range(len(self.archive)))
        candidates = [c for c in candidates if c != i]
        
        r1, r2 = random.sample(candidates, 2)
        r1_vec = self.population[r1] if r1 < self.config.NP else self.archive[r1 - self.config.NP]
        r2_vec = self.population[r2] if r2 < self.config.NP else self.archive[r2 - self.config.NP]
        
        # JADE变异公式
        mutant = (self.population[i] + F * (x_pbest - self.population[i]) + 
                  F * (r1_vec - r2_vec))
        
        return mutant, F, CR
    
    def update_archive(self, target_idx):
        """更新存档:保留被替代的个体"""
        self.archive.append(self.population[target_idx].copy())
        
        # 限制存档大小
        if len(self.archive) > self.config.NP:
            self.archive.pop(np.random.randint(len(self.archive)))
    
    def update_parameters(self):
        """更新F和CR的均值"""
        if len(self.success_F) > 0:
            # F: 加权均值(Lehmer mean)
            F_new = sum(f**2 for f in self.success_F) / (sum(self.success_F) + 1e-10)
            # CR: 算术均值
            CR_new = np.mean(self.success_CR)
            
            self.F_means.append(F_new)
            self.CR_means.append(CR_new)
            
            # 保持历史长度
            if len(self.F_means) > self.H:
                self.F_means.pop(0)
                self.CR_means.pop(0)
        
        # 重置成功列表
        self.success_F = []
        self.success_CR = []
    
    def step(self):
        """JADE主循环"""
        new_population = []
        new_fitness = []
        
        for i in range(self.config.NP):
            mutant, F, CR = self.mutate(i)
            trial = self.crossover(self.population[i], mutant, CR)
            fitness_trial = self.objective(trial)
            
            if fitness_trial <= self.fitness[i]:
                new_population.append(trial)
                new_fitness.append(fitness_trial)
                
                # 记录成功的F和CR
                if fitness_trial < self.fitness[i]:
                    self.success_F.append(F)
                    self.success_CR.append(CR)
                
                self.update_archive(i)
            else:
                new_population.append(self.population[i])
                new_fitness.append(self.fitness[i])
        
        self.population = np.array(new_population)
        self.fitness = np.array(new_fitness)
        
        self.update_parameters()
        
        best_idx = np.argmin(self.fitness)
        if self.fitness[best_idx] < self.best_fitness:
            self.best_fitness = self.fitness[best_idx]
            self.best_solution = self.population[best_idx].copy()
        
        if self.track_stats:
            self._record_stats()

11.2 SHADE:成功历史自适应DE

SHADE(Tanabe & Fukunaga, 2013)是JADE的简化版本,用一个更直接的历史机制:

class SHADE(DifferentialEvolution):
    """
    SHADE - Success-History based Adaptive DE
    Tanabe & Fukunaga, 2013
    
    简化版JADE,用环形缓冲区存储历史
    """
    
    def __init__(self, *args, H=6, **kwargs):
        super().__init__(*args, **kwargs)
        self.H = H
        
        # 历史记忆
        self.F_means = [0.5] * H
        self.CR_means = [0.5] * H
        self.archive = []
        self.k = 0  # 当前索引
    
    def update_history(self, successful_F, successful_CR):
        """更新历史记忆"""
        if len(successful_F) == 0:
            return
        
        # 计算成功参数的均值
        F_mean = sum(f**2 for f in successful_F) / (sum(successful_F) + 1e-10)
        CR_mean = np.mean(successful_CR)
        
        self.F_means[self.k] = F_mean
        self.CR_means[self.k] = CR_mean
        
        # 循环更新
        self.k = (self.k + 1) % self.H
    
    def sample_parameters(self):
        """从历史中采样参数"""
        r = np.random.randint(0, self.H)
        F = self.F_means[r]
        CR = self.CR_means[r]
        
        # 加扰动
        F = F + 0.05 * np.random.randn()
        CR = CR + 0.1 * np.random.randn()
        
        return np.clip(F, 0, 1), np.clip(CR, 0, 1)

11.3 L-SHADE:线性种群缩减版SHADE

L-SHADE在SHADE基础上增加了种群自动缩减机制:

  • 开始时用较大种群,增强探索
  • 逐渐减小种群规模,增强利用
  • 这符合”先广后精”的优化策略

十二、约束优化中的DE:如何处理带约束的优化问题

12.1 约束优化问题的一般形式

很多实际优化问题都有约束条件:

处理约束的方法有很多,这里介绍几种常用的。

12.2 罚函数方法

最简单粗暴的方法——把违反约束的解加上一个大惩罚:

class ConstrainedDE(DifferentialEvolution):
    """
    带约束的差分进化
    使用罚函数处理约束
    """
    
    def __init__(self, objective, constraints, *args, 
                 penalty_coef=1000, **kwargs):
        """
        参数:
            objective: 目标函数
            constraints: 约束字典 {'ineq': [g1, g2], 'eq': [h1]}
            penalty_coef: 惩罚系数
        """
        super().__init__(objective, *args, **kwargs)
        self.constraints = constraints
        self.penalty_coef = penalty_coef
    
    def constraint_violation(self, x):
        """
        计算约束违反程度
        返回总违反量
        """
        total = 0
        
        # 不等式约束 g(x) <= 0
        for g in self.constraints.get('ineq', []):
            violation = max(0, g(x))
            total += violation
        
        # 等式约束 h(x) = 0
        for h in self.constraints.get('eq', []):
            violation = abs(h(x))
            total += violation
        
        return total
    
    def penalized_fitness(self, x):
        """计算带惩罚的适应度"""
        obj = self.objective(x)
        violation = self.constraint_violation(x)
        
        return obj + self.penalty_coef * violation
 
 
# 使用示例
def example_constrained_optimization():
    """
    约束优化示例
    min x1^2 + x2^2
    s.t. x1 + x2 >= 10
         x1 - x2 <= 5
    """
    
    def objective(x):
        return x[0]**2 + x[1]**2
    
    def g1(x):
        """x1 + x2 >= 10 -> -x1 - x2 + 10 <= 0"""
        return -x[0] - x[1] + 10
    
    def g2(x):
        """x1 - x2 <= 5 -> x1 - x2 - 5 <= 0"""
        return x[0] - x[1] - 5
    
    constraints = {'ineq': [g1, g2]}
    bounds = [(-10, 10), (-10, 10)]
    
    de = ConstrainedDE(
        objective=objective,
        constraints=constraints,
        bounds=bounds,
        config=DEConfig(NP=100, max_iter=500)
    )
    
    de.initialize()
    for _ in range(500):
        de.step()
    
    print(f"最优解: {de.best_solution}")
    print(f"约束违反量: {de.constraint_violation(de.best_solution)}")
    
    return de.best_solution, de.best_fitness

罚函数的问题

  • 惩罚系数不好设:太小约束不生效,太大搜索会变慢
  • 可能导致”不可行区域陷阱”:算法被困在约束边界附近

12.3 ε约束处理方法

ε约束方法在迭代初期容忍一定程度的约束违反,后期逐渐收紧:

class EpsilonConstraintDE(ConstrainedDE):
    """
    ε约束处理方法
    迭代初期容忍不可行解,后期收紧
    """
    
    def __init__(self, *args, epsilon_0=0, rho=0.9, **kwargs):
        super().__init__(*args, **kwargs)
        self.epsilon_0 = epsilon_0
        self.rho = rho
        self.epsilon = epsilon_0
        self.generation = 0
    
    def update_epsilon(self):
        """更新ε值"""
        self.epsilon = self.epsilon_0 * (self.rho ** self.generation)
        self.generation += 1
    
    def compare(self, x1, x2):
        """基于ε约束的比较"""
        v1 = self.constraint_violation(x1)
        v2 = self.constraint_violation(x2)
        
        f1 = self.objective(x1)
        f2 = self.objective(x2)
        
        # 都可行
        if v1 <= self.epsilon and v2 <= self.epsilon:
            return x1 if f1 <= f2 else x2
        
        # 一个可行一个不可行
        if v1 <= self.epsilon and v2 > self.epsilon:
            return x1
        if v2 <= self.epsilon and v1 > self.epsilon:
            return x2
        
        # 都不可行
        return x1 if v1 <= v2 else x2

12.4 随机排序法

随机排序法(Stochastic Ranking)用概率来平衡目标优化和约束满足:

class StochasticRankingDE(DifferentialEvolution):
    """
    随机排序法处理约束
    Runarsson & Yao, 2000
    """
    
    def __init__(self, *args, P_f=0.45, **kwargs):
        super().__init__(*args, **kwargs)
        self.P_f = P_f  # 基于目标函数比较的概率
    
    def compare(self, x1, x2):
        """
        随机排序比较
        以P_f概率比较目标函数,否则比较约束违反程度
        """
        v1 = self.constraint_violation(x1)
        v2 = self.constraint_violation(x2)
        
        f1 = self.objective(x1)
        f2 = self.objective(x2)
        
        # 都可行
        if v1 == 0 and v2 == 0:
            return x1 if f1 <= f2 else x2
        
        # 随机选择比较方式
        if random.random() < self.P_f:
            # 比较目标函数
            return x1 if f1 <= f2 else x2
        else:
            # 比较约束违反
            return x1 if v1 <= v2 else x2

十三、动手实验:用DE求解Rosenbrock函数和实际优化问题

13.1 实验一:Rosenbrock函数优化

Rosenbrock函数是优化领域的经典测试函数,它的全局最优在 (1, 1, ..., 1),函数值为0。

import numpy as np
import matplotlib.pyplot as plt
 
 
def experiment_rosenbrock():
    """
    实验1: 用DE求解Rosenbrock函数
    """
    
    def rosenbrock(x):
        """Rosenbrock banana函数"""
        total = 0
        for i in range(len(x) - 1):
            total += 100 * (x[i]**2 - x[i+1])**2 + (x[i] - 1)**2
        return total
    
    # 测试不同维度
    dimensions = [2, 10, 30, 50]
    
    results = {}
    
    for D in dimensions:
        print(f"\n测试维度 D = {D}:")
        
        bounds = [(-5, 10)] * D
        
        de = DifferentialEvolution(
            objective=rosenbrock,
            bounds=bounds,
            config=DEConfig(
                NP=max(50, 10*D),
                F=0.5,
                CR=0.9,
                max_iter=1000,
                seed=42
            ),
            strategy='rand1',
            track_stats=True
        )
        
        solution, fitness = de.run()
        results[D] = {
            'fitness': fitness,
            'solution': solution,
            'history': de.stats.best_fitness_history.copy()
        }
        
        print(f"  最优适应度: {fitness:.8f}")
        print(f"  函数评估次数: {de.stats.function_evaluations}")
        print(f"  解与最优点的距离: {np.linalg.norm(solution - 1):.4f}")
    
    # 绘制收敛曲线
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    for D, data in results.items():
        plt.semilogy(data['history'], label=f'D={D}')
    plt.xlabel('Generation')
    plt.ylabel('Best Fitness (log scale)')
    plt.title('Rosenbrock: Convergence Curves')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    fitnesses = [data['fitness'] for data in results.values()]
    plt.bar([str(D) for D in results.keys()], fitnesses)
    plt.xlabel('Dimension')
    plt.ylabel('Final Fitness')
    plt.title('Rosenbrock: Final Results by Dimension')
    plt.yscale('log')
    
    plt.tight_layout()
    plt.savefig('rosenbrock_experiment.png', dpi=150)
    plt.show()
    
    return results
 
 
# 运行实验
if __name__ == "__main__":
    results = experiment_rosenbrock()

13.2 实验二:多峰函数与参数敏感性

def experiment_multimodal():
    """
    实验2: 多峰函数与参数敏感性
    """
    
    def rastrigin(x):
        """Rastrigin函数:大量局部最优"""
        A = 10
        return A * len(x) + np.sum(x**2 - A * np.cos(2 * np.pi * x))
    
    def ackley(x):
        """Ackley函数:广泛用于测试"""
        a, b, c = 20, 0.2, 2 * np.pi
        n = len(x)
        sum1 = np.sum(x**2)
        sum2 = np.sum(np.cos(c * x))
        return -a * np.exp(-b * np.sqrt(sum1/n)) - np.exp(sum2/n) + a + np.exp(1)
    
    # 参数配置
    configs = [
        {'F': 0.3, 'CR': 0.9, 'name': 'Low F, High CR'},   # 精细搜索
        {'F': 0.7, 'CR': 0.5, 'name': 'High F, Low CR'},  # 广泛探索
        {'F': 0.5, 'CR': 0.5, 'name': 'Balanced'},        # 平衡
    ]
    
    functions = [
        ('Rastrigin', rastrigin),
        ('Ackley', ackley),
    ]
    
    D = 30
    bounds = [(-5.12, 5.12)] * D
    
    results = {}
    
    for func_name, func in functions:
        results[func_name] = {}
        
        for config in configs:
            print(f"\n{func_name} - {config['name']}:")
            
            de = DifferentialEvolution(
                objective=func,
                bounds=bounds,
                config=DEConfig(
                    NP=100,
                    F=config['F'],
                    CR=config['CR'],
                    max_iter=1000,
                    seed=42
                ),
                track_stats=True
            )
            
            solution, fitness = de.run()
            results[func_name][config['name']] = {
                'fitness': fitness,
                'history': de.stats.best_fitness_history.copy()
            }
            
            print(f"  F={config['F']}, CR={config['CR']}: fitness={fitness:.4f}")
    
    # 绘制对比图
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    for idx, (func_name, configs_results) in enumerate(results.items()):
        # 收敛曲线
        ax1 = axes[idx, 0]
        for config_name, data in configs_results.items():
            ax1.semilogy(data['history'], label=config_name)
        ax1.set_title(f'{func_name}: Convergence')
        ax1.set_xlabel('Generation')
        ax1.set_ylabel('Best Fitness (log)')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 最终结果柱状图
        ax2 = axes[idx, 1]
        names = list(configs_results.keys())
        values = [configs_results[n]['fitness'] for n in names]
        ax2.bar(names, values)
        ax2.set_title(f'{func_name}: Final Fitness')
        ax2.set_ylabel('Fitness')
        ax2.set_yscale('log')
    
    plt.tight_layout()
    plt.savefig('multimodal_experiment.png', dpi=150)
    plt.show()
    
    return results
 
 
if __name__ == "__main__":
    results = experiment_multimodal()

13.3 实验三:工程应用——函数拟合

def experiment_curve_fitting():
    """
    实验3: 用DE做曲线拟合
    """
    
    # 假设真实函数是 y = a*x^2 + b*x + c + noise
    np.random.seed(42)
    true_params = {'a': 2.5, 'b': -1.3, 'c': 0.8}
    
    x_data = np.linspace(-5, 5, 50)
    y_true = (true_params['a'] * x_data**2 + 
              true_params['b'] * x_data + 
              true_params['c'])
    y_noise = y_true + np.random.normal(0, 0.5, len(x_data))
    
    def fitting_objective(params):
        """拟合误差(最小化)"""
        a, b, c = params
        y_pred = a * x_data**2 + b * x_data + c
        mse = np.mean((y_noise - y_pred)**2)
        return mse
    
    # 参数搜索范围
    bounds = [
        (-10, 10),  # a
        (-10, 10),  # b
        (-10, 10),  # c
    ]
    
    # 运行DE
    de = DifferentialEvolution(
        objective=fitting_objective,
        bounds=bounds,
        config=DEConfig(NP=100, F=0.5, CR=0.9, max_iter=500),
        track_stats=True
    )
    
    solution, fitness = de.run()
    
    print("=" * 50)
    print("曲线拟合结果")
    print("=" * 50)
    print(f"\n真实参数: a={true_params['a']:.2f}, b={true_params['b']:.2f}, c={true_params['c']:.2f}")
    print(f"拟合参数: a={solution[0]:.4f}, b={solution[1]:.4f}, c={solution[2]:.4f}")
    print(f"MSE: {fitness:.6f}")
    
    # 绘图
    plt.figure(figsize=(10, 6))
    plt.scatter(x_data, y_noise, alpha=0.6, label='Data (with noise)')
    plt.plot(x_data, y_true, 'g-', linewidth=2, label='True function')
    plt.plot(x_data, (solution[0]*x_data**2 + solution[1]*x_data + solution[2]), 
             'r--', linewidth=2, label='DE fit')
    plt.legend()
    plt.title('Curve Fitting with DE')
    plt.xlabel('x')
    plt.ylabel('y')
    plt.grid(True, alpha=0.3)
    plt.savefig('curve_fitting.png', dpi=150)
    plt.show()
 
 
if __name__ == "__main__":
    experiment_curve_fitting()

十四、实用框架与完整工程模板

14.1 一站式DE工具类

"""
差分进化算法工程应用模板
开箱即用,带可视化、调参、结果保存
"""
 
import numpy as np
import random
import json
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Callable, List, Tuple, Optional
from pathlib import Path
 
 
@dataclass
class DEConfig:
    """DE配置"""
    NP: int = 50
    F: float = 0.5
    CR: float = 0.9
    max_iter: int = 1000
    tol: float = 1e-8
    seed: Optional[int] = None
    strategy: str = 'rand1'
 
 
class DifferentialEvolution:
    """完整的DE实现"""
    
    def __init__(self, objective: Callable,
                 bounds: List[Tuple[float, float]],
                 config: Optional[DEConfig] = None,
                 verbose: bool = True):
        self.objective = objective
        self.bounds = bounds
        self.config = config or DEConfig()
        self.D = len(bounds)
        self.verbose = verbose
        
        # 初始化
        if self.config.seed:
            random.seed(self.config.seed)
            np.random.seed(self.config.seed)
        
        self.low = np.array([b[0] for b in bounds])
        self.high = np.array([b[1] for b in bounds])
        
        self.population = None
        self.fitness = None
        self.best_fitness = float('inf')
        self.best_solution = None
        self.history = []
        
    def initialize(self):
        self.population = np.random.uniform(
            self.low, self.high, (self.config.NP, self.D))
        self.fitness = np.array([self.objective(x) for x in self.population])
        
        best_idx = np.argmin(self.fitness)
        self.best_fitness = self.fitness[best_idx]
        self.best_solution = self.population[best_idx].copy()
        self.history.append(self.best_fitness)
    
    def mutate(self, i: int) -> np.ndarray:
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2, r3 = random.sample(candidates, 3)
        return self.population[r1] + self.config.F * (
            self.population[r2] - self.population[r3])
    
    def crossover(self, target: np.ndarray, mutant: np.ndarray) -> np.ndarray:
        trial = np.zeros(self.D)
        j_rand = random.randint(0, self.D - 1)
        for j in range(self.D):
            if random.random() < self.config.CR or j == j_rand:
                trial[j] = mutant[j]
            else:
                trial[j] = target[j]
        return np.clip(trial, self.low, self.high)
    
    def step(self):
        for i in range(self.config.NP):
            mutant = self.mutate(i)
            trial = self.crossover(self.population[i], mutant)
            fitness_trial = self.objective(trial)
            
            if fitness_trial <= self.fitness[i]:
                self.population[i] = trial
                self.fitness[i] = fitness_trial
                
                if fitness_trial < self.best_fitness:
                    self.best_fitness = fitness_trial
                    self.best_solution = trial.copy()
        
        self.history.append(self.best_fitness)
    
    def run(self) -> Tuple[np.ndarray, float]:
        self.initialize()
        
        for gen in range(self.config.max_iter):
            self.step()
            
            if self.verbose and (gen + 1) % 100 == 0:
                print(f"Gen {gen+1}: Best = {self.best_fitness:.8f}")
            
            if self.best_fitness < self.config.tol:
                if self.verbose:
                    print(f"Converged at generation {gen+1}")
                break
        
        return self.best_solution.copy(), self.best_fitness
    
    def save_results(self, filepath: str):
        """保存结果到文件"""
        data = {
            'best_solution': self.best_solution.tolist(),
            'best_fitness': float(self.best_fitness),
            'history': self.history,
            'config': {
                'NP': self.config.NP,
                'F': self.config.F,
                'CR': self.config.CR,
                'max_iter': self.config.max_iter,
                'strategy': self.config.strategy
            }
        }
        
        with open(filepath, 'w') as f:
            json.dump(data, f, indent=2)
    
    def plot(self, title: str = "DE Convergence", save_path: Optional[str] = None):
        """绘制收敛曲线"""
        plt.figure(figsize=(10, 6))
        plt.semilogy(self.history)
        plt.xlabel('Generation')
        plt.ylabel('Best Fitness (log)')
        plt.title(title)
        plt.grid(True, alpha=0.3)
        
        if save_path:
            plt.savefig(save_path, dpi=150)
        plt.show()
 
 
# ===== 快速使用 =====
if __name__ == "__main__":
    # 定义优化问题
    def schwefel(x):
        """Schwefel函数"""
        return 418.9829 * len(x) - np.sum(x * np.sin(np.sqrt(np.abs(x))))
    
    # 运行优化
    de = DifferentialEvolution(
        objective=schwefel,
        bounds=[(-500, 500)] * 30,
        config=DEConfig(NP=100, F=0.5, CR=0.9, max_iter=1000, seed=42),
        verbose=True
    )
    
    solution, fitness = de.run()
    
    print(f"\n结果:")
    print(f"  最优适应度: {fitness:.4f}")
    print(f"  解的前5维: {solution[:5]}")
    
    # 保存和绘图
    de.save_results('de_result.json')
    de.plot("Schwefel Function Optimization")

十五、参考文献

  1. Storn, R., & Price, K. (1997). Differential Evolution - A Simple and Efficient Heuristic for Global Optimization over Continuous Spaces. Journal of Global Optimization, 11(4), 341-359.

  2. Brest, J., et al. (2006). Self-Adapting Control Parameters in Differential Evolution: A Comparative Study on Numerical Benchmark Problems. IEEE Transactions on Evolutionary Computation, 10(6), 646-657.

  3. Das, S., & Suganthan, P. N. (2011). Differential Evolution: A Survey of the State-of-the-Art. IEEE Transactions on Evolutionary Computation, 15(1), 4-31.

  4. Tanabe, R., & Fukunaga, A. (2013). Success-History Based Parameter Adaptation for Differential Evolution. Proceedings of IEEE CEC, 2013.

  5. Price, K. V., et al. (2005). Differential Evolution: A Practical Approach to Global Optimization. Springer.

  6. Qin, A. K., et al. (2009). Differential Evolution Algorithm With Strategy Adaptation for Global Numerical Optimization. IEEE Transactions on Evolutionary Computation, 13(2), 398-417.

  7. Zhang, J., & Sanderson, A. C. (2009). JADE: Adaptive Differential Evolution With Optional External Archive. IEEE Transactions on Evolutionary Computation, 13(5), 945-958.

  8. Kukkonen, S., & Lampinen, J. (2005). GDE3: The Third Evolution Step of Generalized Differential Evolution. Proceedings of IEEE CEC, 2005.


相关文档