关键词

差分进化、DE/rand/1、DE/best/1、变异操作、交叉操作、
缩放因子F、交叉率CR、二项式交叉、指数交叉、选择操作、
约束处理、自适应参数、边界约束、罚函数、成功历史、
jDE、SaDE、SHADE、L-SHADE、多目标DE

摘要

差分进化(Differential Evolution, DE)是由Rainer Storn和Kenneth Price于1995年提出的高效全局优化算法,专门用于解决实数参数空间的连续优化问题。DE以其简洁的算法结构、较少的控制参数和卓越的优化性能著称,在多个基准测试和实际应用中获得优异表现。与传统遗传算法的随机变异不同,DE采用基于差向量的变异策略,使其在局部搜索和全局探索之间实现了良好的平衡。


一、差分进化的基本原理

1.1 算法起源与动机

1995年,Storn和Price在解决切比雪夫多项式拟合问题时提出了差分进化算法。该算法的核心创新在于利用种群中个体间的差分向量来引导搜索方向,这一设计灵感来源于自然界中生物群体协同进化的特性。

DE的核心思想:以随机个体间的差分向量扰动基向量来生成变异向量:

    v_i = x_{r1} + F · (x_{r2} − x_{r3})

其中 r1、r2、r3 为三个互不相同且不等于 i 的随机索引,F 为缩放因子。
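一个最小数值示意(假设使用 NumPy,个体取值仅为演示):

```python
import numpy as np

# 示意性取值:三个随机个体与缩放因子 F
x_r1 = np.array([1.0, 2.0])
x_r2 = np.array([3.0, 0.0])
x_r3 = np.array([1.0, 1.0])
F = 0.5

# 差分向量 (x_r2 - x_r3) 经 F 缩放后扰动基向量 x_r1
v = x_r1 + F * (x_r2 - x_r3)
print(v)  # [2.  1.5]
```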

1.2 算法框架概览

┌─────────────────────────────────────────────────────┐
│                   差分进化流程                        │
├─────────────────────────────────────────────────────┤
│  1. 初始化种群 X⁰ = {x¹, x², ..., xᴺᴾ}              │
│  2. FOR g = 1 TO G_max:                             │
│      FOR i = 1 TO NP:                              │
│          │ 2.1 变异: 生成变异向量 vⁱ                   │
│          │ 2.2 交叉: 生成试验向量 uⁱ                   │
│          │ 2.3 选择: 基于适应度决定存活者                │
│      END FOR                                       │
│  3. 输出最优解                                      │
└─────────────────────────────────────────────────────┘

1.3 与遗传算法的关键区别

| 特征 | 遗传算法 | 差分进化 |
|------|----------|----------|
| 变异机制 | 随机扰动 | 差分向量驱动 |
| 搜索方向 | 无方向性 | 由种群差分信息自适应引导 |
| 参数敏感性 | 中等 | 较低 |
| 收敛速度 | 较慢 | 较快 |
| 全局搜索 | 中等(依赖参数) | 较强 |

二、核心操作详解

2.1 初始化

DE在D维搜索空间内随机初始化种群:

import numpy as np

def initialize_population(NP, D, bounds):
    """
    NP: 种群规模
    D: 维度
    bounds: [(low1, high1), ..., (lowD, highD)]
    """
    population = []
    for _ in range(NP):
        individual = np.array([np.random.uniform(low, high)
                               for low, high in bounds])
        population.append(individual)
    return np.array(population)

边界处理:进化过程中需确保所有个体满足边界约束,常用做法是将越界分量裁剪回 [low_j, high_j],或按边界反射回可行域。
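两种常见边界修复方式的最小示意(`repair_clip`、`repair_reflect` 为示意命名):

```python
import numpy as np

def repair_clip(x, lows, highs):
    """裁剪法:越界分量直接截断到边界"""
    return np.clip(x, lows, highs)

def repair_reflect(x, lows, highs):
    """反射法:越界分量按边界镜面反射回可行域"""
    x = np.where(x < lows, 2 * lows - x, x)
    x = np.where(x > highs, 2 * highs - x, x)
    return np.clip(x, lows, highs)  # 防止反射后仍轻微越界

lows = np.array([0.0, 0.0])
highs = np.array([1.0, 1.0])
x = np.array([-0.2, 1.3])
print(repair_clip(x, lows, highs))     # [0. 1.]
print(repair_reflect(x, lows, highs))  # [0.2 0.7]
```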

2.2 变异操作

2.2.1 基本变异策略

DE/rand/1(最常用,最稳定):

import random  # 本节代码片段均假定已导入 random 与 numpy(np)

def de_rand_1(population, F=0.5, i=None):
    """
    随机选择三个不同于i的个体
    """
    NP = len(population)
    candidates = [j for j in range(NP) if j != i]
    r1, r2, r3 = random.sample(candidates, 3)
    
    mutant = population[r1] + F * (population[r2] - population[r3])
    return mutant

DE/best/1(收敛更快但可能早熟):

def de_best_1(population, fitness, F=0.5, i=None):
    """
    使用当前最优个体作为基准
    """
    NP = len(population)
    best_idx = np.argmin(fitness)
    
    candidates = [j for j in range(NP) if j != i and j != best_idx]
    r1, r2 = random.sample(candidates, 2)
    
    mutant = population[best_idx] + F * (population[r1] - population[r2])
    return mutant

DE/rand/2(更多样化):

def de_rand_2(population, F=0.5, i=None):
    """
    使用两个差分向量
    """
    NP = len(population)
    candidates = [j for j in range(NP) if j != i]
    r1, r2, r3, r4, r5 = random.sample(candidates, 5)
    
    mutant = population[r1] + F * (population[r2] - population[r3]) + \
             F * (population[r4] - population[r5])
    return mutant

2.2.2 变异策略命名规范

DE的变异策略遵循统一的命名规则:DE/x/y/z

  • x:基向量选择方式(rand / best / rand-to-best / current-to-rand 等)
  • y:差分向量数量(1, 2, …)
  • z:交叉类型(bin 表示二项式,exp 表示指数)

完整策略列表

| 策略名称 | 公式 | 特点 |
|----------|------|------|
| DE/rand/1 | v = x_{r1} + F(x_{r2} − x_{r3}) | 全局搜索强 |
| DE/best/1 | v = x_best + F(x_{r1} − x_{r2}) | 收敛快 |
| DE/rand-to-best/1 | v = x_i + F(x_best − x_i) + F(x_{r1} − x_{r2}) | 平衡型 |
| DE/best/2 | v = x_best + F(x_{r1} − x_{r2}) + F(x_{r3} − x_{r4}) | 激进型 |
| DE/rand/2 | v = x_{r1} + F(x_{r2} − x_{r3}) + F(x_{r4} − x_{r5}) | 高变异 |
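表中的 DE/rand-to-best/1 在后文没有给出实现,下面是一个简化示意(函数名为示意命名,公式形式取常见写法之一):

```python
import random
import numpy as np

def de_rand_to_best_1(population, fitness, F=0.5, i=0):
    """DE/rand-to-best/1 的简化示意实现:
    在当前个体与最优个体之间折中,再叠加一个随机差分向量"""
    NP = len(population)
    best_idx = int(np.argmin(fitness))
    candidates = [j for j in range(NP) if j not in (i, best_idx)]
    r1, r2 = random.sample(candidates, 2)

    return (population[i]
            + F * (population[best_idx] - population[i])
            + F * (population[r1] - population[r2]))

pop = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 2.0], [3.0, 3.0]])
fit = np.array([3.0, 1.0, 2.0, 4.0])
print(de_rand_to_best_1(pop, fit, F=0.5, i=0).shape)  # (2,)
```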

2.3 交叉操作

2.3.1 二项式交叉(Binomial Crossover)

def binomial_crossover(target, mutant, CR, bounds):
    """
    二项式交叉:每个维度独立决定来源
    """
    D = len(target)
    trial = np.zeros(D)
    
    # 强制包含至少一个来自变异向量的维度
    j_rand = random.randint(0, D-1)
    
    for j in range(D):
        if random.random() < CR or j == j_rand:
            trial[j] = mutant[j]
        else:
            trial[j] = target[j]
    
    # 边界处理
    trial = np.clip(trial, 
                    [b[0] for b in bounds], 
                    [b[1] for b in bounds])
    
    return trial

2.3.2 指数交叉(Exponential Crossover)

def exponential_crossover(target, mutant, CR, bounds):
    """
    指数交叉:从随机起点开始,复制一段连续的变异向量分量
    (至少复制一个分量,段长服从截断几何分布)
    """
    D = len(target)
    trial = target.copy()
    
    # 随机选择起始点
    n = random.randint(0, D - 1)
    L = 0
    
    # 从起点开始连续复制,直到随机终止或复制满 D 个分量
    while True:
        trial[n] = mutant[n]
        n = (n + 1) % D
        L += 1
        if random.random() >= CR or L >= D:
            break
    
    # 边界处理
    trial = np.clip(trial, 
                    [b[0] for b in bounds], 
                    [b[1] for b in bounds])
    
    return trial

二项式 vs 指数交叉

| 特征 | 二项式交叉 | 指数交叉 |
|------|------------|----------|
| 搜索特性 | 各维独立、均匀探索 | 继承连续片段、利于相邻变量耦合 |
| 继承基因数与CR的关系 | 线性 | 非线性(几何分布段长) |
| 适用场景 | 高维、变量独立问题 | 低维或相邻变量相关问题 |
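两种交叉从变异向量继承的分量数可以解析比较:二项式交叉的期望约为 CR·(D−1)+1(j_rand 必继承),指数交叉段长的期望为 (1−CR^D)/(1−CR)。最小验证脚本:

```python
def expected_binomial(D, CR):
    """二项式交叉继承变异分量数的期望:j_rand 必继承,其余 D-1 维独立"""
    return 1 + CR * (D - 1)

def expected_exponential(D, CR):
    """指数交叉段长的期望(截断几何分布):(1 - CR^D) / (1 - CR)"""
    return (1 - CR ** D) / (1 - CR)

D, CR = 10, 0.9
print(expected_binomial(D, CR))     # 9.1
print(expected_exponential(D, CR))  # ≈ 6.51
```

可见在相同 CR 下,两种交叉继承的分量数并不相同,因此经验参数不可直接互换。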

2.4 选择操作

def select(target, trial, fitness_target, fitness_trial):
    """
    一对一选择:贪婪策略
    """
    if fitness_trial <= fitness_target:  # 最小化问题
        return trial, fitness_trial
    else:
        return target, fitness_target

数学表达(最小化问题):

    x_i^{g+1} = u_i^g,   若 f(u_i^g) ≤ f(x_i^g)
    x_i^{g+1} = x_i^g,   否则

三、完整算法实现

3.1 标准差分进化

import random
import numpy as np

class DifferentialEvolution:
    def __init__(self, objective, bounds, NP=None, F=0.5, CR=0.9, 
                 max_iter=1000, tol=1e-8):
        self.objective = objective
        self.bounds = bounds
        self.D = len(bounds)
        
        # 默认种群规模
        self.NP = NP if NP else max(10 * self.D, 50)
        
        self.F = F      # 缩放因子
        self.CR = CR    # 交叉概率
        self.max_iter = max_iter
        self.tol = tol
        
        self.best_fitness = float('inf')
        self.best_solution = None
        self.history = []
    
    def initialize(self):
        self.population = np.array([
            [np.random.uniform(low, high) 
             for low, high in self.bounds]
            for _ in range(self.NP)
        ])
        self.fitness = np.array([
            self.objective(x) for x in self.population
        ])
        
        best_idx = np.argmin(self.fitness)
        self.best_fitness = self.fitness[best_idx]
        self.best_solution = self.population[best_idx].copy()
    
    def mutate(self, i):
        """DE/rand/1/bin"""
        while True:
            r = random.sample(range(self.NP), 3)
            if i not in r:
                break
        
        r1, r2, r3 = r
        mutant = self.population[r1] + self.F * (
            self.population[r2] - self.population[r3]
        )
        return mutant
    
    def crossover(self, target, mutant):
        """二项式交叉"""
        trial = np.zeros(self.D)
        j_rand = random.randint(0, self.D - 1)
        
        for j in range(self.D):
            if random.random() < self.CR or j == j_rand:
                trial[j] = mutant[j]
            else:
                trial[j] = target[j]
        
        # 边界处理
        for j in range(self.D):
            low, high = self.bounds[j]
            trial[j] = max(low, min(high, trial[j]))
        
        return trial
    
    def step(self):
        """执行一代进化"""
        new_population = np.zeros_like(self.population)
        new_fitness = np.zeros(self.NP)
        
        for i in range(self.NP):
            # 变异
            mutant = self.mutate(i)
            
            # 交叉
            trial = self.crossover(self.population[i], mutant)
            
            # 选择
            fitness_trial = self.objective(trial)
            
            if fitness_trial <= self.fitness[i]:
                new_population[i] = trial
                new_fitness[i] = fitness_trial
            else:
                new_population[i] = self.population[i]
                new_fitness[i] = self.fitness[i]
        
        self.population = new_population
        self.fitness = new_fitness
        
        # 更新最优解
        best_idx = np.argmin(self.fitness)
        if self.fitness[best_idx] < self.best_fitness:
            self.best_fitness = self.fitness[best_idx]
            self.best_solution = self.population[best_idx].copy()
    
    def run(self):
        self.initialize()
        
        for gen in range(self.max_iter):
            self.step()
            self.history.append(self.best_fitness)
            
            if gen % 100 == 0:
                print(f"Gen {gen}: Best = {self.best_fitness:.8f}")
            
            # 早停条件
            if self.best_fitness < self.tol:
                break
        
        return self.best_solution, self.best_fitness
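上述类的行为可以用一个自洽的最小脚本交叉验证(DE/rand/1/bin;目标函数取球函数,参数均为示意取值,非正文类的正式接口):

```python
import random
import numpy as np

def sphere(x):
    """球函数:全局最优值为 0"""
    return float(np.sum(x ** 2))

random.seed(0)
np.random.seed(0)

D, NP, F, CR = 5, 30, 0.5, 0.9
low, high = -5.0, 5.0

pop = np.random.uniform(low, high, (NP, D))
fit = np.array([sphere(x) for x in pop])

for _ in range(300):
    for i in range(NP):
        # 变异:DE/rand/1
        r1, r2, r3 = random.sample([j for j in range(NP) if j != i], 3)
        mutant = pop[r1] + F * (pop[r2] - pop[r3])
        # 交叉:二项式,j_rand 必继承
        j_rand = random.randrange(D)
        trial = np.where(
            (np.random.rand(D) < CR) | (np.arange(D) == j_rand),
            mutant, pop[i])
        trial = np.clip(trial, low, high)
        # 选择:一对一贪婪
        f_trial = sphere(trial)
        if f_trial <= fit[i]:
            pop[i], fit[i] = trial, f_trial

print(fit.min())  # 远小于初始值,通常已低于 1e-6
```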

3.2 高级变体:jDE(自适应DE)

class JDE(DifferentialEvolution):
    """
    动态参数自适应差分进化
    Brest et al., 2006
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # 每个个体独立控制参数(jDE初始值:F=0.5, CR=0.9)
        self.F = np.ones(self.NP) * 0.5
        self.CR = np.ones(self.NP) * 0.9
        
        # 自适应参数范围
        self.F_l, self.F_u = 0.1, 1.0
        self.CR_l, self.CR_u = 0.0, 1.0
        
        # 自适应概率
        self.tau1 = self.tau2 = 0.1
    
    def update_parameters(self, i):
        """动态更新F和CR"""
        # 更新F
        if random.random() < self.tau1:
            self.F[i] = self.F_l + random.random() * (self.F_u - self.F_l)
        
        # 更新CR
        if random.random() < self.tau2:
            self.CR[i] = self.CR_l + random.random() * (self.CR_u - self.CR_l)
    
    def mutate(self, i):
        """使用个体自适应参数"""
        self.update_parameters(i)
        
        while True:
            r = random.sample(range(self.NP), 3)
            if i not in r:
                break
        
        r1, r2, r3 = r
        mutant = self.population[r1] + self.F[i] * (
            self.population[r2] - self.population[r3]
        )
        return mutant
    
    def crossover(self, target, mutant, CR_i):
        """使用个体自适应交叉率"""
        trial = np.zeros(self.D)
        j_rand = random.randint(0, self.D - 1)
        
        for j in range(self.D):
            if random.random() < CR_i or j == j_rand:
                trial[j] = mutant[j]
            else:
                trial[j] = target[j]
        
        for j in range(self.D):
            low, high = self.bounds[j]
            trial[j] = max(low, min(high, trial[j]))
        
        return trial
    
    def step(self):
        new_population = np.zeros_like(self.population)
        new_fitness = np.zeros(self.NP)
        
        for i in range(self.NP):
            mutant = self.mutate(i)
            trial = self.crossover(self.population[i], mutant, self.CR[i])
            
            fitness_trial = self.objective(trial)
            
            if fitness_trial <= self.fitness[i]:
                new_population[i] = trial
                new_fitness[i] = fitness_trial
            else:
                new_population[i] = self.population[i]
                new_fitness[i] = self.fitness[i]
        
        self.population = new_population
        self.fitness = new_fitness
        
        best_idx = np.argmin(self.fitness)
        if self.fitness[best_idx] < self.best_fitness:
            self.best_fitness = self.fitness[best_idx]
            self.best_solution = self.population[best_idx].copy()

四、参数控制与调节

4.1 缩放因子F的影响

缩放因子F控制差分向量的缩放程度,直接影响算法的探索能力:

| F值 | 特性 | 适用场景 |
|-----|------|----------|
| F < 0.3 | 局部搜索能力强 | 微调精细解 |
| F ≈ 0.5 | 平衡型 | 通用场景 |
| F > 0.8 | 全局探索能力强 | 多模态问题 |

Warning

F过大(如F > 1.0)可能导致搜索步长过大,跨越最优区域;F过小(如F < 0.2)可能导致搜索停滞,早熟收敛。

4.2 交叉概率CR的影响

CR控制试验向量来自变异向量的比例:

| CR值 | 特性 | 适用场景 |
|------|------|----------|
| CR < 0.3 | 接近逐维随机搜索 | 高维复杂问题 |
| CR ≈ 0.5 | 平衡型 | 通用场景 |
| CR > 0.8 | 贪婪搜索 | 单峰问题 |

4.3 种群规模NP

经验公式:NP ≈ 5D ~ 10D(Storn与Price建议NP = 10D;维度较低时也常取不小于50的常数)。

4.4 参数自适应策略

4.4.1 SHADE(Success-History based Adaptive DE)

class SHADE(DifferentialEvolution):
    """
    基于成功历史的自适应DE
    Tanabe & Fukunaga, 2013
    """
    def __init__(self, *args, H=6, **kwargs):
        super().__init__(*args, **kwargs)
        self.H = H
        
        # 历史记忆
        self.F_means = [0.5] * H
        self.CR_means = [0.5] * H
        self.archive = []  # 存档非优解
        self.k = 0
    
    def update_history(self, successful_F, successful_CR):
        """更新历史记忆(此处用未加权的Lehmer均值与算术均值简化,
        原论文按适应度改进量加权)"""
        if len(successful_F) == 0:
            return
        
        # Lehmer均值偏向较大的F,有助于保持探索
        F_mean = sum(f**2 for f in successful_F) / sum(successful_F)
        CR_mean = np.mean(successful_CR)
        
        self.F_means[self.k] = F_mean
        self.CR_means[self.k] = CR_mean
        
        # 循环更新索引
        self.k = (self.k + 1) % self.H
    
    def sample_parameters(self, i):
        """从历史记忆中采样参数:F服从柯西分布,CR服从正态分布"""
        r = random.randint(0, self.H - 1)
        
        # 柯西分布采样F(逆变换法):非正值重采样,大于1则截断
        while True:
            F = self.F_means[r] + 0.1 * np.tan(np.pi * (random.random() - 0.5))
            if F > 0:
                break
        F = min(F, 1.0)
        
        CR = float(np.clip(random.gauss(self.CR_means[r], 0.1), 0, 1))
        
        return F, CR
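SHADE中用于F的Lehmer均值(L2 = Σx²/Σx)相比算术均值偏向较大的成功F值,可以单独验证(取值仅为演示):

```python
def lehmer_mean(values):
    """Lehmer均值 L2 = Σx² / Σx,偏向较大的取值"""
    return sum(v ** 2 for v in values) / sum(values)

fs = [0.3, 0.5, 0.9]
arith = sum(fs) / len(fs)
print(round(arith, 4))            # 0.5667
print(round(lehmer_mean(fs), 4))  # 0.6765,高于算术均值
```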

五、约束优化中的DE

5.1 罚函数方法

class ConstrainedDE(DifferentialEvolution):
    def __init__(self, objective, constraints, *args, penalty=1000, **kwargs):
        super().__init__(objective, *args, **kwargs)
        self.constraints = constraints
        self.penalty = penalty
    
    def evaluate_constraints(self, x):
        """
        返回约束违反程度
        g(x) <= 0: 不等式约束
        h(x) = 0: 等式约束
        """
        g_violations = [max(0, g(x)) for g in self.constraints['ineq']]
        h_violations = [abs(h(x)) for h in self.constraints['eq']]
        
        return g_violations, h_violations
    
    def penalized_fitness(self, x):
        """计算带惩罚的适应度"""
        obj = self.objective(x)
        g_viol, h_viol = self.evaluate_constraints(x)
        
        penalty = self.penalty * (sum(g_viol) + sum(h_viol))
        return obj + penalty

5.2 ε-约束处理方法

class EpsilonConstraintDE(ConstrainedDE):
    def __init__(self, *args, epsilon_0=0, rho=0.1, **kwargs):
        super().__init__(*args, **kwargs)
        self.epsilon_0 = epsilon_0
        self.rho = rho
        self.epsilon = epsilon_0
    
    def update_epsilon(self, generation):
        """更新ε参数"""
        self.epsilon = self.epsilon_0 * (self.rho ** generation)
    
    def comparison(self, x1, x2):
        """基于ε约束的比较"""
        g1, _ = self.evaluate_constraints(x1)
        g2, _ = self.evaluate_constraints(x2)
        
        total_violation1 = sum(g1)
        total_violation2 = sum(g2)
        
        # 优先选择可行解
        if total_violation1 <= self.epsilon and total_violation2 > self.epsilon:
            return x1
        if total_violation2 <= self.epsilon and total_violation1 > self.epsilon:
            return x2
        
        # 都可行或都不可行时比较目标函数值
        if self.objective(x1) <= self.objective(x2):
            return x1
        return x2

5.3 随机排名方法

class StochasticRankingDE(ConstrainedDE):
    """
    随机排名法处理约束(复用ConstrainedDE的约束评估)
    Runarsson & Yao, 2000
    """
    def __init__(self, *args, P_f=0.45, **kwargs):
        super().__init__(*args, **kwargs)
        self.P_f = P_f  # 按目标函数比较的概率
    
    def compare(self, x1, x2):
        g1, _ = self.evaluate_constraints(x1)
        g2, _ = self.evaluate_constraints(x2)
        
        v1, v2 = sum(g1), sum(g2)
        
        # 都可行
        if v1 == 0 and v2 == 0:
            return x1 if self.objective(x1) <= self.objective(x2) else x2
        
        # 基于约束违反度的比较
        if random.random() < self.P_f:
            return x1 if v1 < v2 else x2
        else:
            return x1 if self.objective(x1) <= self.objective(x2) else x2

六、DE的改进变体

6.1 多目标差分进化

class MODE(DifferentialEvolution):
    """
    多目标差分进化
    """
    def __init__(self, objectives, *args, **kwargs):
        self.objectives = objectives
        self.n_objectives = len(objectives)
        super().__init__(lambda x: 0, *args, **kwargs)  # 占位
    
    def evaluate(self, x):
        return np.array([f(x) for f in self.objectives])
    
    def dominates(self, x1, x2):
        f1 = self.evaluate(x1)
        f2 = self.evaluate(x2)
        return all(f1 <= f2) and any(f1 < f2)
    
    def fast_non_dominated_sort(self, population):
        """快速非支配排序"""
        domination_count = [0] * len(population)
        dominated_set = [set() for _ in range(len(population))]
        fronts = [[]]
        
        for i, p in enumerate(population):
            for j, q in enumerate(population):
                if i != j and self.dominates(p, q):
                    dominated_set[i].add(j)
                elif i != j and self.dominates(q, p):
                    domination_count[i] += 1
            
            if domination_count[i] == 0:
                fronts[0].append(i)
        
        current_front = 0
        while current_front < len(fronts):
            next_front = []
            for i in fronts[current_front]:
                for j in dominated_set[i]:
                    domination_count[j] -= 1
                    if domination_count[j] == 0:
                        next_front.append(j)
            if next_front:
                fronts.append(next_front)
            current_front += 1
        
        return fronts
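非支配分层的结果可以用一个自洽的小例子验证(朴素 O(n²) 实现,点集为示意数据):

```python
def dominates(f1, f2):
    """f1支配f2:各目标不劣且至少一个目标严格更优(最小化)"""
    return (all(a <= b for a, b in zip(f1, f2))
            and any(a < b for a, b in zip(f1, f2)))

def non_dominated_fronts(points):
    """朴素非支配分层:逐层剥离未被支配的点"""
    remaining = list(range(len(points)))
    fronts = []
    while remaining:
        front = [i for i in remaining
                 if not any(dominates(points[j], points[i])
                            for j in remaining if j != i)]
        fronts.append(front)
        remaining = [i for i in remaining if i not in front]
    return fronts

pts = [(1, 2), (2, 1), (2, 2), (3, 3)]
print(non_dominated_fronts(pts))  # [[0, 1], [2], [3]]
```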

6.2 协方差引导的DE

class CMAguidedDE(DifferentialEvolution):
    """
    协方差矩阵引导的差分进化
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cov_matrix = np.eye(self.D)
    
    def mutate_cma_guided(self, i):
        """使用协方差矩阵引导变异"""
        while True:
            r = random.sample(range(self.NP), 3)
            if i not in r:
                break
        
        r1, r2, r3 = r
        
        # 计算加权差分方向
        L = np.linalg.cholesky(self.cov_matrix)
        scaled_diff = L @ (self.population[r2] - self.population[r3])
        
        mutant = self.population[r1] + self.F * scaled_diff
        return mutant
    
    def update_covariance(self):
        """基于成功个体更新协方差"""
        # 获取最优个体
        best_idx = np.argmin(self.fitness)
        
        # 更新协方差
        pass  # 简化实现

七、算法性能对比

7.1 CEC基准测试表现

在CEC(IEEE Congress on Evolutionary Computation)基准测试中,DE通常在以下函数类型上表现优异:

| 函数类型 | DE表现 | 备注 |
|----------|--------|------|
| Unimodal | ★★★★☆ | 收敛速度快 |
| Multimodal | ★★★☆☆ | 依赖策略选择 |
| Hybrid Composition | ★★★★☆ | F参数关键 |
| Multi-Swarm | ★★★☆☆ | 适合多峰搜索 |

7.2 参数推荐配置

| 问题类型 | F | CR | NP | 策略 |
|----------|---|----|----|------|
| 通用 | 0.5 | 0.9 | 10×D | DE/rand/1/bin |
| 多模态 | 0.6-0.9 | 0.3-0.7 | 15×D | DE/rand/2/bin |
| 高维 | 0.5 | 0.5-0.7 | 5×D | DE/rand/1/exp |
| 约束 | 0.5 | 0.8 | 20×D | DE/best/1/bin |

八、DE的高级变体与前沿研究

8.1 自适应差分进化族

8.1.1 SaDE(自适应差分进化)

class SaDE(DifferentialEvolution):
    """
    自适应差分进化
    根据历史表现自适应选择变异策略和参数
    """
    def __init__(self, *args, LP=25, **kwargs):
        super().__init__(*args, **kwargs)
        self.LP = LP  # 学习周期
        self.strategy_pool = ['DE/rand/1/bin', 'DE/rand/2/bin', 
                              'DE/best/2/bin']
        self.strategy_success = {s: [] for s in self.strategy_pool}
        self.cr_success = []
        self.f_success = []
    
    def select_strategy(self):
        """根据历史成功率选择策略"""
        if len(self.strategy_success['DE/rand/1/bin']) < self.LP:
            return random.choice(self.strategy_pool)
        
        # 计算每个策略的成功率
        probs = {}
        for strategy in self.strategy_pool:
            success = self.strategy_success[strategy]
            if len(success) >= self.LP:
                recent = success[-self.LP:]
                probs[strategy] = sum(recent) / len(recent)
            else:
                probs[strategy] = 0.1
        
        # 轮盘赌选择
        total = sum(probs.values())
        r = random.random() * total
        cumulative = 0
        
        for strategy, prob in probs.items():
            cumulative += prob
            if r <= cumulative:
                return strategy
        
        return random.choice(self.strategy_pool)
    
    def adaptive_crossover(self):
        """自适应交叉概率"""
        CR_m = 0.5
        CR_mu = 0.5
        F_m = 0.5
        
        if len(self.cr_success) >= self.LP:
            recent_cr = self.cr_success[-self.LP:]
            CR_mu = sum(recent_cr) / len(recent_cr)
        
        # 生成个体CR
        CR_i = random.gauss(CR_mu, 0.1)
        CR_i = np.clip(CR_i, 0, 1)
        
        return CR_i, F_m
    
    def mutate_with_strategy(self, i, strategy):
        """根据策略执行变异"""
        while True:
            r = random.sample(range(self.NP), 3)
            if i not in r:
                break
        
        r1, r2, r3 = r
        
        if strategy == 'DE/rand/1/bin':
            mutant = self.population[r1] + self.F * (
                self.population[r2] - self.population[r3]
            )
        elif strategy == 'DE/rand/2/bin':
            while True:
                r = random.sample(range(self.NP), 5)
                if i not in r:
                    break
            mutant = self.population[r[0]] + self.F * (
                self.population[r[1]] - self.population[r[2]]
            ) + self.F * (self.population[r[3]] - self.population[r[4]])
        elif strategy == 'DE/best/2/bin':
            best_idx = np.argmin(self.fitness)
            while True:
                r = random.sample(range(self.NP), 4)
                if i not in r and best_idx not in r:
                    break
            mutant = self.population[best_idx] + self.F * (
                self.population[r[0]] - self.population[r[1]]
            ) + self.F * (self.population[r[2]] - self.population[r[3]])
        else:
            mutant = self.population[r1] + self.F * (
                self.population[r2] - self.population[r3]
            )
        
        return mutant

8.1.2 JADE(带可选外部存档的自适应DE)

class JADE(DifferentialEvolution):
    """
    JADE - 带可选外部存档的自适应DE
    Zhang & Sanderson, 2009
    """
    def __init__(self, *args, H=100, p=0.05, **kwargs):
        super().__init__(*args, **kwargs)
        self.H = H  # 历史记忆大小
        self.p = p  # 顶级个体比例
        self.archive = []  # 存档
        
        # 参数历史
        self.F_means = []
        self.CR_means = []
        self.success_F = []
        self.success_CR = []
    
    def update_parameters(self):
        """更新F和CR"""
        if len(self.success_F) > 0:
            # 计算新的均值
            F_new = sum(f**2 for f in self.success_F) / sum(self.success_F)
            CR_new = np.mean(self.success_CR)
            
            self.F_means.append(F_new)
            self.CR_means.append(CR_new)
            
            # 保持历史长度
            if len(self.F_means) > self.H:
                self.F_means.pop(0)
                self.CR_means.pop(0)
        
        # 重置成功列表
        self.success_F = []
        self.success_CR = []
    
    def sample_F(self):
        """采样F(柯西分布):非正值重采样,大于1则截断"""
        if len(self.F_means) == 0:
            mu_F = 0.5
        else:
            mu_F = self.F_means[-1]
        
        # 柯西分布采样(逆变换法)
        while True:
            F = mu_F + 0.1 * np.tan(np.pi * (random.random() - 0.5))
            if F > 0:
                break
        return min(F, 1.0)  # 上限为1
    
    def sample_CR(self):
        """采样CR(正态分布)"""
        if len(self.CR_means) == 0:
            mu_CR = 0.5
        else:
            mu_CR = self.CR_means[-1]
        
        CR = random.gauss(mu_CR, 0.1)
        return np.clip(CR, 0, 1)
    
    def get_best_individuals(self):
        """获取适应度最好的前 p×NP 个个体"""
        n_best = max(1, int(self.p * self.NP))
        sorted_indices = np.argsort(self.fitness)[:n_best]
        return [self.population[i] for i in sorted_indices]
    
    def mutate(self, i):
        """JADE变异:DE/current-to-pbest/1(可选外部存档)"""
        F = self.sample_F()
        CR = self.sample_CR()
        
        # 从顶级个体中随机选取pbest
        x_best = random.choice(self.get_best_individuals())
        
        # r1 取自当前种群,r2 取自种群与存档的并集
        r1 = random.choice([j for j in range(self.NP) if j != i])
        while True:
            r2 = random.randrange(self.NP + len(self.archive))
            if r2 != i and r2 != r1:
                break
        x_r2 = self.population[r2] if r2 < self.NP else self.archive[r2 - self.NP]
        
        mutant = self.population[i] + F * (x_best - self.population[i]) + \
                 F * (self.population[r1] - x_r2)
        
        return mutant, F, CR
    
    def update_archive(self, trial, fitness_trial, target_idx):
        """更新存档"""
        if fitness_trial <= self.fitness[target_idx]:
            # 保留被替代的个体
            self.archive.append(self.population[target_idx].copy())
            
            # 限制存档大小
            if len(self.archive) > self.NP:
                self.archive.pop(random.randint(0, len(self.archive) - 1))
    
    def step(self):
        """JADE主循环"""
        new_population = []
        new_fitness = []
        
        for i in range(self.NP):
            mutant, F, CR = self.mutate(i)
            trial = self.crossover(self.population[i], mutant, CR)
            fitness_trial = self.objective(trial)
            
            if fitness_trial <= self.fitness[i]:
                new_population.append(trial)
                new_fitness.append(fitness_trial)
                
                # 记录成功
                if fitness_trial < self.fitness[i]:
                    self.success_F.append(F)
                    self.success_CR.append(CR)
                
                self.update_archive(trial, fitness_trial, i)
            else:
                new_population.append(self.population[i])
                new_fitness.append(self.fitness[i])
        
        self.population = np.array(new_population)
        self.fitness = np.array(new_fitness)
        
        self.update_parameters()
        
        best_idx = np.argmin(self.fitness)
        if self.fitness[best_idx] < self.best_fitness:
            self.best_fitness = self.fitness[best_idx]
            self.best_solution = self.population[best_idx].copy()
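JADE中F的柯西采样(非正值重采样、超1截断)可以单独验证其取值范围。最小示意脚本,μ与γ取正文假定的0.5与0.1:

```python
import math
import random

def sample_F_cauchy(mu=0.5, gamma=0.1):
    """柯西分布采样F:逆变换法;非正值重采样,超过1则截断为1"""
    while True:
        F = mu + gamma * math.tan(math.pi * (random.random() - 0.5))
        if F > 0:
            return min(F, 1.0)

random.seed(42)
samples = [sample_F_cauchy() for _ in range(1000)]
print(min(samples) > 0, max(samples) <= 1.0)  # True True
```

柯西分布的重尾使F偶尔取到较大值,有助于跳出局部最优,这是JADE不用正态分布采样F的原因。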

8.2 多目标差分进化

8.2.1 GDE3(多目标DE)

class GDE3(DifferentialEvolution):
    """
    GDE3 - 多目标差分进化
    Kukkonen & Lampinen, 2005
    """
    def __init__(self, objectives, *args, **kwargs):
        super().__init__(lambda x: 0, *args, **kwargs)
        self.objectives = objectives
        self.n_objectives = len(objectives)
        self.archive = []  # Pareto存档
    
    def evaluate(self, x):
        """评估多个目标"""
        return np.array([f(x) for f in self.objectives])
    
    def dominates(self, x1, x2):
        """判断支配关系"""
        f1 = self.evaluate(x1)
        f2 = self.evaluate(x2)
        return all(f1 <= f2) and any(f1 < f2)
    
    def crowding_distance(self, solutions):
        """计算拥挤距离"""
        n = len(solutions)
        if n <= 2:
            return [float('inf')] * n
        
        distances = [0.0] * n
        objectives = [self.evaluate(s) for s in solutions]
        
        for m in range(self.n_objectives):
            # 按当前目标排序
            sorted_idx = sorted(range(n), 
                             key=lambda i: objectives[i][m])
            
            # 边界点距离无限
            distances[sorted_idx[0]] = float('inf')
            distances[sorted_idx[-1]] = float('inf')
            
            # 计算其他点的距离
            f_range = (objectives[sorted_idx[-1]][m] - 
                      objectives[sorted_idx[0]][m])
            
            if f_range > 0:
                for i in range(1, n - 1):
                    distances[sorted_idx[i]] += (
                        objectives[sorted_idx[i+1]][m] - 
                        objectives[sorted_idx[i-1]][m]
                    ) / f_range
        
        return distances
    
    def step(self):
        """GDE3主循环"""
        new_population = []
        
        for i in range(self.NP):
            # 变异
            mutant = self.mutate(i)
            
            # 交叉
            trial = self.crossover(self.population[i], mutant)
            fitness_trial = self.evaluate(trial)
            fitness_i = self.evaluate(self.population[i])
            
            # 三方选择
            dominates_trial = self.dominates(trial, self.population[i])
            dominates_target = self.dominates(self.population[i], trial)
            
            if dominates_trial and not dominates_target:
                # trial支配target
                new_population.append(trial)
            elif dominates_target and not dominates_trial:
                # target支配trial
                new_population.append(self.population[i])
            else:
                # 互不支配,添加到存档并选择拥挤距离小的
                self.archive.append(trial)
                self.archive.append(self.population[i])
                
                # 更新存档(去除被支配的)
                self.update_archive()
                
                # 从存档中选择(简化:保留拥挤距离大、分布更稀疏的解)
                if len(self.archive) > self.NP:
                    distances = self.crowding_distance(self.archive)
                    best_idx = distances.index(max(distances))
                    new_population.append(self.archive[best_idx])
                    self.archive.pop(best_idx)
                else:
                    new_population.append(random.choice(self.archive))
        
        self.population = np.array(new_population[:self.NP])
        
        # 多目标下无单一最优解,此处仅按第一个目标记录一个代表解
        best_idx = np.argmin([self.objectives[0](x) for x in self.population])
        self.best_solution = self.population[best_idx]
    
    def update_archive(self):
        """更新Pareto存档"""
        pareto_front = []
        
        for i, p in enumerate(self.archive):
            is_dominated = False
            dominated = []
            
            for j, q in enumerate(self.archive):
                if i != j:
                    if self.dominates(q, p):
                        is_dominated = True
                    elif self.dominates(p, q):
                        dominated.append(j)
            
            if not is_dominated:
                pareto_front.append((i, p))
        
        # 只保留Pareto前沿
        self.archive = [p for _, p in pareto_front]

8.3 约束差分进化

8.3.1 CDE(约束DE)

class CDE(DifferentialEvolution):
    """
    约束差分进化
    使用多种约束处理技术
    """
    def __init__(self, objective, constraints, *args, 
                 method='adaptive_penalty', **kwargs):
        super().__init__(objective, *args, **kwargs)
        self.constraints = constraints
        self.method = method
        self.penalty_history = []
    
    def constraint_violation(self, x):
        """计算约束违反度"""
        total_violation = 0
        
        # 不等式约束
        for g in self.constraints.get('ineq', []):
            violation = max(0, g(x))
            total_violation += violation
        
        # 等式约束
        for h in self.constraints.get('eq', []):
            violation = abs(h(x))
            total_violation += violation
        
        return total_violation
    
    def adaptive_penalty(self, x, iteration):
        """自适应惩罚"""
        base_penalty = 100
        
        # 约束违反度
        violation = self.constraint_violation(x)
        
        if violation == 0:
            return self.objective(x)
        
        # 自适应惩罚系数
        if len(self.penalty_history) > 0:
            best_feasible = min(
                [self.objective(xi) for xi in self.penalty_history 
                 if self.constraint_violation(xi) == 0],
                default=1e6
            )
            
            worst_infeasible = max(
                [self.objective(xi) for xi in self.penalty_history 
                 if self.constraint_violation(xi) > 0],
                default=1e6
            )
            
            # 动态调整惩罚系数
            if worst_infeasible > best_feasible:
                penalty_coef = base_penalty * (
                    1 + (worst_infeasible - best_feasible) / 
                    (best_feasible + 1e-10)
                )
            else:
                penalty_coef = base_penalty
        else:
            penalty_coef = base_penalty
        
        return self.objective(x) + penalty_coef * violation
    
    def step(self):
        """约束DE主循环"""
        new_population = []
        
        for i in range(self.NP):
            mutant = self.mutate(i)
            trial = self.crossover(self.population[i], mutant)
            
            # 边界处理
            for j in range(self.D):
                low, high = self.bounds[j]
                trial[j] = max(low, min(high, trial[j]))
            
            # 基于约束处理方法选择
            if self.method == 'adaptive_penalty':
                fitness_trial = self.adaptive_penalty(trial, len(self.penalty_history))
                fitness_i = self.adaptive_penalty(self.population[i], 
                                                 len(self.penalty_history))
            else:
                fitness_trial = self.objective(trial)
                fitness_i = self.objective(self.population[i])
            
            # 选择
            if fitness_trial <= fitness_i:
                new_population.append(trial)
                self.penalty_history.append(trial)
            else:
                new_population.append(self.population[i])
                self.penalty_history.append(self.population[i])
            
            # 保持历史长度
            if len(self.penalty_history) > 1000:
                self.penalty_history.pop(0)
        
        self.population = np.array(new_population)
        
        # 找到最优可行解
        for i in range(self.NP):
            if self.constraint_violation(self.population[i]) == 0:
                obj = self.objective(self.population[i])
                if obj < self.best_fitness:  # 初值为inf, 首个可行解自动被接受
                    self.best_fitness = obj
                    self.best_solution = self.population[i].copy()

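除自适应罚函数外,另一种常用且无需调惩罚系数的约束处理方式是Deb可行性规则,可直接替换上面选择步骤中的适应度比较。下面是一个最小示意(函数名为说明性命名,非原文实现):

```python
def deb_better(f_trial, v_trial, f_target, v_target):
    """Deb可行性规则: 判断试验向量是否应替换目标向量。
    (1)两者都可行: 比目标函数值;
    (2)仅一个可行: 可行者胜;
    (3)都不可行: 比约束违反量。"""
    if v_trial == 0 and v_target == 0:   # 都可行
        return f_trial <= f_target
    if v_trial == 0 or v_target == 0:    # 仅一个可行
        return v_trial == 0
    return v_trial <= v_target           # 都不可行
```

使用时只需在选择阶段以 `deb_better(f_u, v_u, f_x, v_x)` 代替 `f_u <= f_x`,即可在不引入罚系数的前提下把种群逐步推向可行域。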
9.4 分布式与并行DE

9.4.1 并行DE框架

from multiprocessing import Pool, cpu_count
import numpy as np
 
class ParallelDE(DifferentialEvolution):
    """
    并行差分进化
    使用多进程加速适应度评估
    """
    def __init__(self, objective, bounds, *args, n_workers=None, **kwargs):
        super().__init__(objective, bounds, *args, **kwargs)
        self.n_workers = n_workers or max(1, cpu_count() - 1)
    
    def evaluate_batch(self, positions):
        """批量评估适应度"""
        with Pool(self.n_workers) as pool:
            fitness = pool.map(self.objective, positions)
        return np.array(fitness)
    
    def step(self):
        """并行主循环"""
        # 串行生成全部试验向量(仅适应度评估并行)
        trials = []
        for i in range(self.NP):
            mutant = self.mutate(i)
            trial = self.crossover(self.population[i], mutant)
            
            # 边界处理
            for j in range(self.D):
                low, high = self.bounds[j]
                trial[j] = max(low, min(high, trial[j]))
            
            trials.append(trial)
        
        # 并行评估
        trial_fitness = self.evaluate_batch(trials)
        
        # 选择
        for i in range(self.NP):
            if trial_fitness[i] <= self.fitness[i]:
                self.population[i] = trials[i]
                self.fitness[i] = trial_fitness[i]
        
        best_idx = np.argmin(self.fitness)
        if self.fitness[best_idx] < self.best_fitness:
            self.best_fitness = self.fitness[best_idx]
            self.best_solution = self.population[best_idx].copy()

9.4.2 分布式DE

class DistributedDE:
    """
    分布式差分进化
    多个子种群协同进化
    """
    def __init__(self, objective, bounds, n_subpops=4, subpop_size=20):
        self.n_subpops = n_subpops
        self.subpop_size = subpop_size
        
        # 创建子种群
        self.subpopulations = []
        for _ in range(n_subpops):
            de = DifferentialEvolution(
                objective, bounds, NP=subpop_size
            )
            de.initialize()
            self.subpopulations.append(de)
        
        self.migration_interval = 10
        self.migration_rate = 0.1
    
    def migrate(self):
        """迁移操作"""
        for i in range(self.n_subpops):
            next_pop = (i + 1) % self.n_subpops
            
            # 选择最优个体迁移
            n_migrate = max(1, int(self.subpop_size * self.migration_rate))
            
            # 获取当前种群的最优个体
            sorted_idx = np.argsort(self.subpopulations[i].fitness)[:n_migrate]
            migrants = self.subpopulations[i].population[sorted_idx]
            
            # 替换目标种群的最差个体
            worst_idx = np.argsort(self.subpopulations[next_pop].fitness)[-n_migrate:]
            self.subpopulations[next_pop].population[worst_idx] = migrants
            self.subpopulations[next_pop].fitness[worst_idx] = \
                self.subpopulations[i].fitness[sorted_idx]
    
    def run(self, max_iter=1000):
        """分布式进化主循环"""
        for iteration in range(max_iter):
            # 各子种群独立进化
            for de in self.subpopulations:
                de.step()
            
            # 周期性迁移(跳过第0代, 先让各子种群独立进化一段)
            if iteration > 0 and iteration % self.migration_interval == 0:
                self.migrate()
        
        # 收集所有子种群的最优解
        best_overall = None
        best_fitness = float('inf')
        
        for de in self.subpopulations:
            if de.best_fitness < best_fitness:
                best_fitness = de.best_fitness
                best_overall = de.best_solution
        
        return best_overall, best_fitness
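migrate()中的环形迁移逻辑可以脱离DE类单独验证。下面是一个只依赖numpy的独立示意(先对各子种群取快照,避免链式迁移相互覆盖):

```python
import numpy as np

def ring_migrate(fitness_list, pop_list, n_migrate=1):
    """环形拓扑: 第i个子种群的最优个体替换第i+1个子种群的最差个体"""
    n = len(pop_list)
    # 迁移前取快照, 保证所有迁出个体来自同一时刻的种群状态
    snapshots = [(p.copy(), f.copy()) for p, f in zip(pop_list, fitness_list)]
    for i in range(n):
        nxt = (i + 1) % n
        src_pop, src_fit = snapshots[i]
        best = np.argsort(src_fit)[:n_migrate]
        worst = np.argsort(fitness_list[nxt])[-n_migrate:]
        pop_list[nxt][worst] = src_pop[best]
        fitness_list[nxt][worst] = src_fit[best]
    return pop_list, fitness_list
```

迁移率和迁移间隔共同控制子种群间的信息交换强度:交换过频会让各子种群趋同,失去分布式搜索的多样性优势。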

十、性能基准与测试

10.1 CEC基准函数测试

10.1.1 CEC测试函数接口

class CECBenchmark:
    """CEC基准测试函数集合"""
    
    FUNCTIONS = {
        1: 'Shifted Sphere',
        2: 'Shifted Schwefel 1.2',
        3: 'Shifted Rotated High Conditioned Elliptic',
        4: 'Schwefel 1.2 with Noise',
        5: 'Schwefel 2.6',
        6: 'Shifted Rosenbrock',
        7: 'Shifted Rotated Ackley',
        8: 'Shifted Rastrigin',
        9: 'Shifted Rotated Rastrigin',
        10: 'Shifted Griewank',
    }
    
    @staticmethod
    def sphere_shift(x, shift):
        """平移Sphere函数"""
        return np.sum((x - shift)**2)
    
    @staticmethod
    def schwefel_12(x, shift):
        """平移Schwefel 1.2"""
        running_sum = 0
        total = 0
        for i in range(len(x)):
            running_sum += x[i] - shift[i]
            total += running_sum**2
        return total
    
    @staticmethod
    def rosenbrock_shift(x, shift):
        """平移Rosenbrock: 先平移 z = x - shift, 再计算标准Rosenbrock"""
        z = np.asarray(x) - np.asarray(shift)
        total = 0
        for i in range(len(z) - 1):
            total += 100 * (z[i+1] - z[i]**2)**2 + (z[i] - 1)**2
        return total
    
    @staticmethod
    def rastrigin_shift(x, shift):
        """平移Rastrigin"""
        return sum((x[i] - shift[i])**2 - 
                  10 * np.cos(2 * np.pi * (x[i] - shift[i])) + 10
                  for i in range(len(x)))
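上面几个平移函数其实都可以由"基础函数 + 平移包装"统一生成,这样新增基准函数时不必重写平移逻辑。一个独立示意(以标准Rastrigin为例):

```python
import numpy as np

def rastrigin(x):
    """标准Rastrigin, 全局最优 f(0) = 0"""
    x = np.asarray(x, dtype=float)
    return float(np.sum(x**2 - 10 * np.cos(2 * np.pi * x) + 10))

def make_shifted(base_func, shift):
    """把任意基础函数 f(x) 包装成平移版本 f(x - shift),
    全局最优点从原点移动到 shift 处, 最优值不变"""
    shift = np.asarray(shift, dtype=float)
    return lambda x: base_func(np.asarray(x) - shift)

shifted = make_shifted(rastrigin, [3.0, -2.0, 1.5])
```

同样的包装思路可以叠加旋转矩阵(`f(M @ (x - shift))`)得到rotated版本的基准函数。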

10.1.2 基准测试框架

class BenchmarkRunner:
    """DE基准测试运行器"""
    
    def __init__(self, de_class, dimensions=[10, 30, 50], n_runs=30):
        self.de_class = de_class
        self.dimensions = dimensions
        self.n_runs = n_runs
        self.results = {}
    
    def run_function(self, func, dim, bounds=(-100, 100)):
        """运行单个函数测试(每次独立运行随机生成平移向量)"""
        fitnesses = []
        
        for _ in range(self.n_runs):
            # 基准函数签名为 func(x, shift), 先绑定平移向量再交给DE
            shift = np.random.uniform(0.8 * bounds[0], 0.8 * bounds[1], dim)
            objective = lambda x, s=shift: func(x, s)
            
            de = self.de_class(
                objective, [(bounds[0], bounds[1])] * dim,
                NP=max(50, 10 * dim),
                max_iter=1000
            )
            de.initialize()
            
            for _ in range(1000):
                de.step()
            
            fitnesses.append(de.best_fitness)
        
        return {
            'mean': np.mean(fitnesses),
            'std': np.std(fitnesses),
            'min': np.min(fitnesses),
            'max': np.max(fitnesses),
            'median': np.median(fitnesses),
        }
    
    def run_all(self, output_file='benchmark_results.txt'):
        """运行完整基准测试"""
        functions = [
            CECBenchmark.sphere_shift,
            CECBenchmark.schwefel_12,
            CECBenchmark.rosenbrock_shift,
            CECBenchmark.rastrigin_shift,
        ]
        
        for dim in self.dimensions:
            for func in functions:
                key = f"{func.__name__}_D{dim}"
                self.results[key] = self.run_function(func, dim)
        
        # 保存结果
        with open(output_file, 'w') as f:
            for key, stats in self.results.items():
                f.write(f"{key}:\n")
                f.write(f"  Mean: {stats['mean']:.6e}\n")
                f.write(f"  Std: {stats['std']:.6e}\n")
                f.write(f"  Min: {stats['min']:.6e}\n")
                f.write(f"  Max: {stats['max']:.6e}\n\n")
        
        return self.results
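上表中的均值/标准差还不足以断言两个算法孰优:CEC竞赛的惯例是对多次独立运行结果做非参数显著性检验(常用Wilcoxon秩和检验)。一个独立示意(假设scipy可用,数据为构造的演示样本):

```python
import numpy as np
from scipy.stats import mannwhitneyu

def compare_algorithms(fitness_a, fitness_b, alpha=0.05):
    """Wilcoxon秩和检验(Mann-Whitney U):
    判断两组独立运行的最终适应度差异是否统计显著"""
    stat, p_value = mannwhitneyu(fitness_a, fitness_b,
                                 alternative='two-sided')
    return {
        'p_value': float(p_value),
        'significant': p_value < alpha,
        'better': 'A' if np.median(fitness_a) < np.median(fitness_b) else 'B',
    }

rng = np.random.default_rng(0)
result = compare_algorithms(rng.normal(1e-6, 1e-7, 30),
                            rng.normal(1e-2, 1e-3, 30))
```

30次独立运行是CEC基准测试的常见设定,样本量足够小样本秩检验使用。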

十一、工程应用案例

11.1 电力系统经济调度

class EconomicDispatchDE:
    """
    电力系统经济调度优化
    最小化发电成本同时满足各种约束
    """
    
    def __init__(self, generators, load_demand):
        self.generators = generators  # [(Pmin, Pmax, a, b, c), ...]
        self.load_demand = load_demand
    
    def objective(self, P):
        """发电成本函数"""
        total_cost = 0
        for i, (Pmin, Pmax, a, b, c) in enumerate(self.generators):
            total_cost += a * P[i]**2 + b * P[i] + c
        return total_cost
    
    def constraints(self, P):
        """约束违反量(返回0表示可行)"""
        # 功率平衡: 总出力必须等于负荷需求(留出微小数值容差)
        violation = max(0.0, abs(sum(P) - self.load_demand) - 1e-6)
        
        # 发电机出力上下限
        for i, (Pmin, Pmax, _, _, _) in enumerate(self.generators):
            if P[i] < Pmin:
                violation += Pmin - P[i]
            if P[i] > Pmax:
                violation += P[i] - Pmax
        
        return violation
    
    def optimize(self):
        """运行优化"""
        n_gens = len(self.generators)
        bounds = [(g[0], g[1]) for g in self.generators]
        
        de = ConstrainedDE(
            objective=self.objective,
            constraints={'ineq': [self.constraints]},
            bounds=bounds,
            NP=50,
            max_iter=500
        )
        
        de.initialize()
        for _ in range(500):
            de.step()
        
        return de.best_solution, de.best_fitness
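当所有机组出力都落在上下限内部时,经济调度存在解析解(等微增率准则:各机组边际成本 2aᵢPᵢ + bᵢ 等于同一个λ),可以用来校验DE给出的结果。下面是一个独立小例子(机组系数为假设的演示数据):

```python
def equal_lambda_dispatch(generators, demand):
    """等微增率解析解(忽略出力上下限):
    由 2*a_i*P_i + b_i = λ 与 ΣP_i = demand 解出λ和各机组出力"""
    # generators: [(a, b), ...] 即二次成本 a*P^2 + b*P + c 的系数
    inv = [1.0 / (2.0 * a) for a, b in generators]
    lam = (demand + sum(b / (2.0 * a) for a, b in generators)) / sum(inv)
    return [(lam - b) / (2.0 * a) for a, b in generators], lam

P, lam = equal_lambda_dispatch([(0.01, 2.0), (0.02, 1.5)], demand=300.0)
```

若解析解中有机组越限,则需把该机组固定在限值上并对其余机组重算,这正是DE等启发式方法处理更复杂(阀点效应、禁运区)成本模型时的优势所在。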

11.2 机器人路径规划

class RobotPathPlanningDE:
    """
    机器人路径规划
    使用DE优化平滑路径
    """
    
    def __init__(self, obstacles, start, goal, n_waypoints=10):
        self.obstacles = obstacles
        self.start = np.array(start)
        self.goal = np.array(goal)
        self.n_waypoints = n_waypoints
        self.dim = 2 * n_waypoints  # x和y坐标
    
    def path_length(self, waypoints):
        """计算路径长度"""
        full_path = np.vstack([self.start, waypoints, self.goal])
        length = 0
        
        for i in range(len(full_path) - 1):
            length += np.linalg.norm(full_path[i+1] - full_path[i])
        
        return length
    
    def collision_penalty(self, waypoints):
        """碰撞惩罚"""
        full_path = np.vstack([self.start, waypoints, self.goal])
        penalty = 0
        
        for obs in self.obstacles:  # 障碍物表示为 (cx, cy, radius)
            for point in full_path:
                dist = np.linalg.norm(point - obs[:2])
                if dist < obs[2]:
                    penalty += obs[2] - dist
        
        return penalty
    
    def smoothness(self, waypoints):
        """路径平滑度(曲率)"""
        full_path = np.vstack([self.start, waypoints, self.goal])
        curvature = 0
        
        for i in range(1, len(full_path) - 1):
            v1 = full_path[i] - full_path[i-1]
            v2 = full_path[i+1] - full_path[i]
            
            cos_angle = np.dot(v1, v2) / (
                np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-10)
            # 裁剪到[-1, 1], 防止浮点误差使arccos返回nan
            angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))
            curvature += angle
        
        return curvature
    
    def objective(self, x):
        """综合目标函数"""
        waypoints = x.reshape(self.n_waypoints, 2)
        
        length = self.path_length(waypoints)
        collision = self.collision_penalty(waypoints)
        smooth = self.smoothness(waypoints)
        
        return length + 10 * collision + 0.1 * smooth
    
    def optimize(self):
        """运行路径规划"""
        bounds = [(-50, 50)] * self.dim
        
        de = DifferentialEvolution(
            objective=self.objective,
            bounds=bounds,
            NP=100,
            max_iter=500
        )
        
        de.initialize()
        for _ in range(500):
            de.step()
        
        best_waypoints = de.best_solution.reshape(self.n_waypoints, 2)
        full_path = np.vstack([self.start, best_waypoints, self.goal])
        
        return full_path, de.best_fitness
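smoothness中的arccos对浮点误差很敏感:两段近似共线的路径段,点积除以模长乘积可能略超过1,直接arccos会得到nan并污染整个目标值。下面用一个独立的转角函数演示带裁剪的安全写法:

```python
import numpy as np

def turn_angle(v1, v2):
    """两段路径向量之间的转角, 对浮点误差安全"""
    cos_a = np.dot(v1, v2) / (
        np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-10)
    # np.clip保证 |cos| <= 1, 共线向量不会产生nan
    return np.arccos(np.clip(cos_a, -1.0, 1.0))

straight = turn_angle(np.array([1.0, 0.0]), np.array([1.0, 0.0]))
right_turn = turn_angle(np.array([1.0, 0.0]), np.array([0.0, 1.0]))
```

直行路段转角接近0,直角转弯为π/2;目标函数中累加的正是这些转角,因此平滑的路径惩罚更小。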

11.3 神经网络训练

class DETrainingOptimizer:
    """
    使用DE优化神经网络权重
    """
    
    def __init__(self, model_builder, X_train, y_train, X_val, y_val):
        self.model_builder = model_builder
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
    
    def get_weight_dim(self):
        """获取权重维度"""
        model = self.model_builder()
        return sum(w.size for w in model.get_weights())
    
    def decode_weights(self, flat_weights):
        """解码权重向量"""
        model = self.model_builder()
        weights = model.get_weights()
        
        idx = 0
        decoded = []
        for w in weights:
            size = w.size
            decoded.append(flat_weights[idx:idx+size].reshape(w.shape))
            idx += size
        
        model.set_weights(decoded)
        return model
    
    def fitness(self, flat_weights):
        """适应度函数"""
        model = self.decode_weights(flat_weights)
        
        # 训练损失
        train_loss = model.evaluate(self.X_train, self.y_train, verbose=0)
        
        # 验证损失(作为正则化)
        val_loss = model.evaluate(self.X_val, self.y_val, verbose=0)
        
        # 权衡训练和泛化
        return train_loss + 0.1 * val_loss
    
    def optimize(self, n_particles=50, max_iter=100):
        """训练神经网络"""
        dim = self.get_weight_dim()
        bounds = [(-1, 1)] * dim
        
        de = DifferentialEvolution(
            objective=self.fitness,
            bounds=bounds,
            NP=n_particles,
            max_iter=max_iter
        )
        
        de.initialize()
        
        for iteration in range(max_iter):
            de.step()
            
            if iteration % 10 == 0:
                print(f"Iter {iteration}: Best fitness = {de.best_fitness:.4f}")
        
        return self.decode_weights(de.best_solution), de.best_fitness
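decode_weights的核心只是按各层形状把一维向量依次切片还原,这一逻辑不依赖任何深度学习框架,可以单独验证。下面是一个不依赖Keras的独立示意:

```python
import numpy as np

def flatten_weights(weights):
    """把各层权重数组拼接成一维向量"""
    return np.concatenate([w.ravel() for w in weights])

def unflatten_weights(flat, shapes):
    """按形状列表把一维向量切回各层权重"""
    out, idx = [], 0
    for shape in shapes:
        size = int(np.prod(shape))
        out.append(flat[idx:idx + size].reshape(shape))
        idx += size
    return out

shapes = [(2, 3), (3,), (3, 1)]                      # 假设的小网络各层形状
orig = [np.arange(int(np.prod(s)), dtype=float).reshape(s) for s in shapes]
restored = unflatten_weights(flatten_weights(orig), shapes)
```

往返转换必须是无损的恒等映射,否则DE在扁平向量空间中的搜索与网络实际权重会错位。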

十二、实用框架与工具

12.1 scipy.optimize扩展

from scipy.optimize import differential_evolution, OptimizeResult
import numpy as np
 
class EnhancedDE:
    """
    增强型差分进化
    扩展scipy实现
    """
    
    def __init__(self, objective, bounds, **kwargs):
        self.objective = objective
        self.bounds = bounds
        self.kwargs = kwargs
        
        self.maxiter = kwargs.get('maxiter', 1000)
        self.tol = kwargs.get('tol', 0.01)
        self.popsize = kwargs.get('popsize', 15)
        self.mutation = kwargs.get('mutation', (0.5, 1))
        self.recombination = kwargs.get('recombination', 0.7)
        self.strategy = kwargs.get('strategy', 'best1bin')
    
    def optimize(self):
        """执行优化"""
        result = differential_evolution(
            func=self.objective,
            bounds=self.bounds,
            maxiter=self.maxiter,
            tol=self.tol,
            popsize=self.popsize,
            mutation=self.mutation,
            recombination=self.recombination,
            strategy=self.strategy,
            polish=True,
            updating='deferred'
        )
        
        return result
    
    def optimize_with_callback(self, callback):
        """带回调的优化"""
        result = differential_evolution(
            func=self.objective,
            bounds=self.bounds,
            maxiter=self.maxiter,
            tol=self.tol,
            popsize=self.popsize,
            callback=callback,
            polish=True
        )
        
        return result

12.2 完整工程模板

"""
差分进化算法工程应用模板
开箱即用
"""
 
import numpy as np
import random
from dataclasses import dataclass
from typing import Callable, List, Tuple, Optional
 
 
@dataclass
class DEConfig:
    """DE配置"""
    NP: int = 50
    F: float = 0.5
    CR: float = 0.9
    max_iter: int = 1000
    tol: float = 1e-8
    strategy: str = 'rand1bin'
 
 
class DifferentialEvolutionTemplate:
    """DE模板"""
    
    MUTATION_STRATEGIES = {
        'rand1bin': lambda self, i: self._mutate_rand1(i),
        'best1bin': lambda self, i: self._mutate_best1(i),
        'rand2bin': lambda self, i: self._mutate_rand2(i),
        'best2bin': lambda self, i: self._mutate_best2(i),
        'current2best': lambda self, i: self._mutate_current2best(i),
    }
    
    def __init__(self, objective: Callable,
                 bounds: List[Tuple[float, float]],
                 config: Optional[DEConfig] = None):
        self.objective = objective
        self.bounds = bounds
        self.config = config or DEConfig()
        self.D = len(bounds)
        
        self.bounds_array = np.array(bounds)
        self.low = self.bounds_array[:, 0]
        self.high = self.bounds_array[:, 1]
        
        self.best_fitness = float('inf')
        self.best_solution = None
        self.history = []
    
    def initialize(self):
        """初始化种群"""
        self.population = np.random.uniform(
            self.low, self.high, 
            (self.config.NP, self.D)
        )
        self.fitness = np.array([
            self.objective(x) for x in self.population
        ])
        
        best_idx = np.argmin(self.fitness)
        self.best_fitness = self.fitness[best_idx]
        self.best_solution = self.population[best_idx].copy()
    
    def _mutate_rand1(self, i):
        """DE/rand/1"""
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2, r3 = random.sample(candidates, 3)
        
        return self.population[r1] + self.config.F * (
            self.population[r2] - self.population[r3]
        )
    
    def _mutate_best1(self, i):
        """DE/best/1"""
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2 = random.sample(candidates, 2)
        
        best_idx = np.argmin(self.fitness)
        return self.population[best_idx] + self.config.F * (
            self.population[r1] - self.population[r2]
        )
    
    def _mutate_rand2(self, i):
        """DE/rand/2"""
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2, r3, r4, r5 = random.sample(candidates, 5)
        
        return self.population[r1] + self.config.F * (
            self.population[r2] - self.population[r3]
        ) + self.config.F * (
            self.population[r4] - self.population[r5]
        )
    
    def _mutate_best2(self, i):
        """DE/best/2"""
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2, r3, r4 = random.sample(candidates, 4)
        
        best_idx = np.argmin(self.fitness)
        return self.population[best_idx] + self.config.F * (
            self.population[r1] - self.population[r2]
        ) + self.config.F * (
            self.population[r3] - self.population[r4]
        )
    
    def _mutate_current2best(self, i):
        """DE/current-to-best"""
        best_idx = np.argmin(self.fitness)
        candidates = [j for j in range(self.config.NP) if j != i]
        r1, r2 = random.sample(candidates, 2)
        
        return self.population[i] + self.config.F * (
            self.population[best_idx] - self.population[i]
        ) + self.config.F * (
            self.population[r1] - self.population[r2]
        )
    
    def crossover(self, target, mutant):
        """二项式交叉"""
        trial = np.zeros(self.D)
        j_rand = random.randint(0, self.D - 1)
        
        for j in range(self.D):
            if random.random() < self.config.CR or j == j_rand:
                trial[j] = mutant[j]
            else:
                trial[j] = target[j]
        
        return np.clip(trial, self.low, self.high)
    
    def step(self):
        """一步迭代"""
        # 默认值也从字典取, 保证签名与字典中lambda一致为(self, i)
        # (self._mutate_rand1是已绑定方法, 只接受i, 不能直接作默认值)
        mutate_func = self.MUTATION_STRATEGIES.get(
            self.config.strategy, self.MUTATION_STRATEGIES['rand1bin']
        )
        
        for i in range(self.config.NP):
            mutant = mutate_func(self, i)
            trial = self.crossover(self.population[i], mutant)
            fitness_trial = self.objective(trial)
            
            if fitness_trial <= self.fitness[i]:
                self.population[i] = trial
                self.fitness[i] = fitness_trial
                
                if fitness_trial < self.best_fitness:
                    self.best_fitness = fitness_trial
                    self.best_solution = trial.copy()
        
        self.history.append(self.best_fitness)
    
    def run(self) -> Tuple[np.ndarray, float]:
        """运行优化"""
        self.initialize()
        
        for _ in range(self.config.max_iter):
            self.step()
            
            # 以绝对适应度为停止条件, 隐含假设已知问题最优值为0
            if self.best_fitness < self.config.tol:
                break
        
        return self.best_solution.copy(), self.best_fitness
 
 
# 使用示例
if __name__ == "__main__":
    # 定义问题
    def sphere(x):
        return np.sum(x**2)
    
    bounds = [(-10, 10)] * 30
    
    # 创建优化器
    de = DifferentialEvolutionTemplate(sphere, bounds)
    
    # 运行
    solution, fitness = de.run()
    
    print(f"最优适应度: {fitness:.6e}")
    print(f"收敛迭代数: {len(de.history)}")
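上面的模板只实现了二项式交叉;关键词中提到的指数交叉可以按下面的独立函数补充(从随机起点开始连续拷贝变异向量分量,拷贝段在环形意义下连续):

```python
import random
import numpy as np

def exponential_crossover(target, mutant, CR):
    """指数交叉: 从随机起点j开始连续拷贝mutant的分量,
    直到随机数 >= CR 或已拷满一圈为止"""
    D = len(target)
    trial = target.copy()
    j = random.randrange(D)
    L = 0
    while True:
        trial[j] = mutant[j]   # 至少拷贝一个分量, 保证trial != target
        j = (j + 1) % D
        L += 1
        if L >= D or random.random() >= CR:
            break
    return trial
```

与二项式交叉逐维独立抽样不同,指数交叉继承的是连续分量块,因此更适合相邻维度存在耦合的问题。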

十三、参数调优指南

13.1 自适应参数策略

class AdaptiveParameterTuner:
    """
    DE参数自适应调整器
    """
    
    @staticmethod
    def default_params():
        """通用经验默认参数(Storn & Price的经典推荐值)"""
        return {
            'NP': 50,
            'F': 0.5,
            'CR': 0.9,
        }
    
    @staticmethod
    def for_multimodal():
        """多峰问题参数"""
        return {
            'NP': 100,
            'F': 0.8,  # 高变异
            'CR': 0.5,  # 低交叉
            'strategy': 'rand2bin',
        }
    
    @staticmethod
    def for_unimodal():
        """单峰问题参数"""
        return {
            'NP': 30,
            'F': 0.5,
            'CR': 0.9,
            'strategy': 'best1bin',
        }
    
    @staticmethod
    def for_high_dim(dim):
        """高维问题参数"""
        return {
            'NP': max(100, 10 * dim),
            'F': 0.5,
            'CR': 0.5,
            'strategy': 'rand1bin',
        }
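关键词中提到的L-SHADE还会在运行过程中线性缩减种群规模:前期大种群保证探索,后期小种群集中开发。其缩减公式可以独立写成(NP_init与NP_min的取值为L-SHADE论文中的常见设定,此处作为假设默认值):

```python
def lshade_popsize(nfe, max_nfe, NP_init=100, NP_min=4):
    """L-SHADE线性种群缩减:
    NP(nfe) = round(((NP_min - NP_init) / max_nfe) * nfe + NP_init)
    nfe为已用函数评估次数, max_nfe为评估次数预算"""
    return round((NP_min - NP_init) / max_nfe * nfe + NP_init)

sizes = [lshade_popsize(nfe, 10000) for nfe in (0, 5000, 10000)]
```

每代按该公式计算目标规模后,删去当前种群中适应度最差的个体直至达标。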

13.2 常见问题解决

| 问题 | 症状 | 解决方案 |
| --- | --- | --- |
| 早熟收敛 | 适应度停滞 | 增加NP, 增加F |
| 收敛慢 | 迭代多但精度低 | 减小F, 使用best策略 |
| 振荡 | 适应度波动大 | 减小CR, 增加NP |
| 局部最优 | 陷入次优点 | 增大F, 使用rand策略 |

十四、参考文献

  1. Storn, R., & Price, K. (1997). Differential Evolution - A Simple and Efficient Heuristic for Global Optimization over Continuous Spaces. Journal of Global Optimization, 11(4), 341-359.
  2. Brest, J., et al. (2006). Self-Adapting Control Parameters in Differential Evolution: A Comparative Study on Numerical Benchmark Problems. IEEE Transactions on Evolutionary Computation, 10(6), 646-657.
  3. Das, S., & Suganthan, P. N. (2011). Differential Evolution: A Survey of the State-of-the-Art. IEEE Transactions on Evolutionary Computation, 15(1), 4-31.
  4. Tanabe, R., & Fukunaga, A. (2013). Success-History Based Parameter Adaptation for Differential Evolution. Proceedings of IEEE CEC, 2013.
  5. Price, K. V., et al. (2005). Differential Evolution: A Practical Approach to Global Optimization. Springer.
  6. Qin, A. K., et al. (2009). Differential Evolution Algorithm With Strategy Adaptation for Global Numerical Optimization. IEEE Transactions on Evolutionary Computation, 13(2), 398-417.
  7. Zhang, J., & Sanderson, A. C. (2009). JADE: Adaptive Differential Evolution With Optional External Archive. IEEE Transactions on Evolutionary Computation, 13(5), 945-958.
  8. Kukkonen, S., & Lampinen, J. (2005). GDE3: The Third Evolution Step of Generalized Differential Evolution. Proceedings of IEEE CEC, 2005.
