进化算法实战调参与工程指南

很多人学完遗传算法的基本原理,兴冲冲地跑起来,结果发现:这玩意儿怎么调了半天还是跑不出好结果?要么收敛太慢,要么早就陷入局部最优了。

别急,这很正常。进化算法的参数设置和工程优化本身就是一门手艺,需要大量的实战经验。这篇文章就来讲讲这些”只可意会不可言传”的调参技巧和工程实践经验。

遗传算法参数调优:经验法则

遗传算法的核心参数有三个:种群大小(Population Size)交叉率(Crossover Rate)变异率(Mutation Rate)。这三个参数怎么设,直接影响算法的性能和收敛速度。

种群大小

种群大小的选择取决于问题的复杂度。

经验公式

还有个更科学的说法:种群大小应该至少是染色体长度的10倍。如果你的染色体有100个基因,那种群至少要1000个个体。

为什么不能太小?因为种群太小会导致:

  • 近亲繁殖,多样性急剧下降
  • 搜索空间覆盖不足,容易陷入局部最优
  • 统计上不具代表性,选择压力失真

为什么不能太大?因为种群太大会导致:

  • 每一代评估成本暴增
  • 收敛速度变慢
  • 浪费计算资源在已经收敛的区域

实战建议:先用小种群快速测试代码逻辑(比如20-50个个体),确认没问题后逐步放大到100-200。

交叉率

交叉率控制有多少个体通过交叉产生后代。

经验值:0.6 ~ 0.9

大部分情况下,0.8是一个不错的默认值

为什么不能太高?因为交叉会破坏好的基因组合。如果交叉率太高,好的解决方案可能被”拆散”,导致无法收敛。

为什么不能太低?如果交叉率太低,算法会退化成类似随机搜索+变异的状态,进化效率很低。

自适应交叉率:有些研究者会让交叉率随进化代数动态调整。早期可以用高交叉率探索,后期降低交叉率保留好的基因。

变异率

变异率是最敏感的参数,也是最难调的。

经验值:0.01 ~ 0.05(每个基因的变异概率)

更精确的公式

其中 是染色体的长度。这个公式的意思是:变异率应该让每个个体平均发生一次变异。

变异率太高会导致:

  • 搜索变成随机游走
  • 已经找到的好解被破坏
  • 收敛极慢

变异率太低会导致:

  • 早期多样性不足
  • 后期无法突破局部最优

实战技巧:变异率应该和种群的适应度方差挂钩。当种群多样性低时,提高变异率;当种群还在快速改善时,降低变异率。

参数组合的实战经验

很多研究表明,变异率的影响最大,交叉率次之,种群大小相对不那么敏感

一个常见的参数组合是:

  • 种群大小:100
  • 交叉率:0.8
  • 变异率:0.02

但这只是一个起点。对于具体问题,你需要微调。

温度测试法:先用默认参数跑几代,观察:

  • 如果适应度提升很快但很快就停滞 → 变异率太低或种群太小
  • 如果适应度提升很慢 → 变异率太高或交叉率太低
  • 如果适应度波动很大 → 变异率太高

自适应参数控制

让参数自己进化,听起来很美好,但怎么实现?

里程碑式的自适应策略

1. Rechenberg的1/5法则

这是进化策略(ES)中的经典规则:

如果在某一代中,成功的变异比例 > 1/5,则增加变异率;如果 < 1/5,则减少变异率。

这个法则的直觉是:如果变异太”保守”(大部分都失败),应该加大探索力度;如果变异太”激进”(大部分都成功但可能不稳定),应该收敛一些。

2. Self-Adaptive Mutation Rate

每个个体携带一个”变异强度”参数,这个参数也会遗传和变异:

其中 是学习率, 是标准正态分布。

这样,变异率会在进化过程中自动调整,好的个体会传递它们成功的变异策略。

3. Population-Level Adaptation

在整个种群层面维护一个参数池。比如维护20%的个体使用高变异率,80%使用低变异率。每隔几代检查一下哪组表现更好,然后调整比例。

自适应遗传算法实现

import random
import numpy as np
from typing import List, Callable
from dataclasses import dataclass
 
@dataclass
class AdaptiveGAConfig:
    """自适应GA配置"""
    population_size: int = 100
    initial_mutation_rate: float = 0.05
    initial_crossover_rate: float = 0.8
    min_mutation_rate: float = 0.001
    max_mutation_rate: float = 0.2
    adaptation_interval: int = 10  # 每多少代调整一次
    adaptation_strength: float = 1.2  # 调整强度
 
class AdaptiveGA:
    """自适应遗传算法"""
    
    def __init__(self, config: AdaptiveGAConfig):
        self.config = config
        self.mutation_rate = config.initial_mutation_rate
        self.crossover_rate = config.initial_crossover_rate
        
        # 追踪历史用于自适应
        self.history = {
            'mutation_success_rate': [],
            'avg_fitness_history': [],
            'fitness_variance_history': []
        }
    
    def adapt_parameters(self, generation: int, population, fitnesses: List[float]):
        """自适应调整参数"""
        if generation % self.config.adaptation_interval != 0:
            return
        
        # 计算统计量
        avg_fitness = np.mean(fitnesses)
        fitness_variance = np.var(fitnesses)
        
        # 计算变异成功率(这代比上代好的比例)
        if len(self.history['avg_fitness_history']) > 1:
            prev_fitness = self.history['avg_fitness_history'][-1]
            improvement_rate = sum(1 for f in fitnesses if f > prev_fitness) / len(fitnesses)
        else:
            improvement_rate = 0.5
        
        self.history['avg_fitness_history'].append(avg_fitness)
        self.history['fitness_variance_history'].append(fitness_variance)
        
        # Rechenberg 1/5法则
        target_rate = 0.2
        if improvement_rate > target_rate:
            # 变异太成功,降低变异率
            self.mutation_rate /= self.config.adaptation_strength
        else:
            # 变异不够成功,提高变异率
            self.mutation_rate *= self.config.adaptation_strength
        
        # 限制范围
        self.mutation_rate = max(
            self.config.min_mutation_rate,
            min(self.config.max_mutation_rate, self.mutation_rate)
        )
        
        # 多样性自适应交叉率
        if fitness_variance < 0.01 * avg_fitness:
            # 方差太小,说明可能陷入局部最优,提高交叉率探索
            self.crossover_rate = min(0.95, self.crossover_rate * 1.1)
        else:
            # 正常情况下使用默认交叉率
            self.crossover_rate = max(0.6, self.crossover_rate * 0.99)
        
        print(f"Gen {generation}: mut_rate={self.mutation_rate:.4f}, "
              f"cross_rate={self.crossover_rate:.4f}, "
              f"improvement={improvement_rate:.2%}")
    
    def mutate(self, chromosome: np.ndarray) -> np.ndarray:
        """变异操作"""
        mutated = chromosome.copy()
        
        for i in range(len(mutated)):
            if random.random() < self.mutation_rate:
                # 高斯变异
                mutated[i] += random.gauss(0, 0.1)
                
                # 可以添加边界约束
                mutated[i] = np.clip(mutated[i], -5, 5)
        
        return mutated
    
    def crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> tuple:
        """交叉操作"""
        if random.random() > self.crossover_rate:
            return parent1.copy(), parent2.copy()
        
        # 两点交叉
        size = len(parent1)
        pt1 = random.randint(1, size - 1)
        pt2 = random.randint(pt1, size - 1)
        
        child1 = np.concatenate([parent1[:pt1], parent2[pt1:pt2], parent1[pt2:]])
        child2 = np.concatenate([parent2[:pt1], parent1[pt1:pt2], parent2[pt2:]])
        
        return child1, child2
    
    def select(self, population: List[np.ndarray], fitnesses: List[float], 
               num_parents: int) -> List[np.ndarray]:
        """锦标赛选择"""
        parents = []
        
        for _ in range(num_parents):
            # 随机选k个个体
            k = 3
            indices = random.sample(range(len(population)), k)
            candidates = [(i, fitnesses[i]) for i in indices]
            
            # 选择最优的
            winner_idx = max(candidates, key=lambda x: x[1])[0]
            parents.append(population[winner_idx].copy())
        
        return parents
    
    def evolve(self, fitness_func: Callable, n_dimensions: int, 
               generations: int = 100) -> tuple:
        """进化主循环"""
        # 初始化种群
        population = [np.random.uniform(-5, 5, n_dimensions) 
                       for _ in range(self.config.population_size)]
        
        best_ever = None
        best_fitness_ever = float('-inf')
        
        for gen in range(generations):
            # 评估
            fitnesses = [fitness_func(ind) for ind in population]
            
            # 记录最优
            best_idx = np.argmax(fitnesses)
            if fitnesses[best_idx] > best_fitness_ever:
                best_fitness_ever = fitnesses[best_idx]
                best_ever = population[best_idx].copy()
            
            if gen % 20 == 0:
                print(f"Gen {gen}: best_fitness={best_fitness_ever:.4f}")
            
            # 自适应参数
            self.adapt_parameters(gen, population, fitnesses)
            
            # 选择
            parents = self.select(population, fitnesses, 
                                  self.config.population_size // 2)
            
            # 交叉和变异产生后代
            offspring = []
            for i in range(0, len(parents) - 1, 2):
                child1, child2 = self.crossover(parents[i], parents[i + 1])
                offspring.extend([
                    self.mutate(child1),
                    self.mutate(child2)
                ])
            
            # 合并父子代
            population = parents + offspring[:len(population) - len(parents)]
        
        return best_ever, best_fitness_ever
 
 
# 测试函数:Rastrigin函数(多模态)
def rastrigin(x: np.ndarray) -> float:
    A = 10
    n = len(x)
    return A * n + np.sum(x**2 - A * np.cos(2 * np.pi * x))
 
if __name__ == "__main__":
    config = AdaptiveGAConfig(
        population_size=150,
        initial_mutation_rate=0.05,
        adaptation_interval=5
    )
    
    ga = AdaptiveGA(config)
    best, fitness = ga.evolve(rastrigin, n_dimensions=10, generations=200)
    
    print(f"\nFinal best fitness: {fitness:.4f}")
    print(f"Best solution: {best}")

约束处理技术

进化算法处理约束是个技术活。传统GA假设解空间是连续的、无约束的,但实际问题往往有一堆约束。

主流约束处理方法

1. 惩罚函数法(Penalty Function)

最简单粗暴的方法。把违反约束的解”惩罚”一下,降低它的适应度:

其中 是惩罚项, 是惩罚系数。

惩罚项可以是:

def penalty_function(solution, constraints, penalty_weight=100):
    """惩罚函数法"""
    penalty = 0
    
    for constraint in constraints:
        violation = constraint.violation(solution)
        if violation > 0:
            penalty += penalty_weight * violation ** 2
    
    return penalty
 
def fitness_with_penalty(original_fitness, penalty):
    return original_fitness - penalty

问题是惩罚系数 很难调。太大了,所有解都被惩罚,但解空间太小;太小了,约束力不够。

自适应惩罚系数可以缓解这个问题:

class AdaptivePenalty:
    """自适应惩罚"""
    
    def __init__(self):
        self.penalty_weight = 1.0
        self.best_feasible = None
        self.avg_violation = 0
    
    def update(self, population, fitnesses, constraints):
        """根据当前代的情况更新惩罚系数"""
        violations = []
        feasible_fitness = []
        
        for sol, fit in zip(population, fitnesses):
            total_violation = sum(c.violation(sol) for c in constraints)
            violations.append(total_violation)
            
            if total_violation == 0:
                feasible_fitness.append(fit)
        
        self.avg_violation = np.mean(violations) if violations else 0
        
        # 如果有可行解,用它的平均适应度来调整惩罚
        if feasible_fitness:
            if self.best_feasible is None:
                self.best_feasible = min(feasible_fitness)
            else:
                self.best_feasible = min(self.best_feasible, min(feasible_fitness))
        
        # 动态调整惩罚强度
        if self.avg_violation > 0:
            # 约束违反严重时增加惩罚
            self.penalty_weight *= 1.1
        else:
            # 约束满足时降低惩罚
            self.penalty_weight *= 0.99
        
        self.penalty_weight = np.clip(self.penalty_weight, 0.1, 1000)

2. 修复法(Repair Method)

不惩罚,而是把不可行解”修复”成可行解:

def repair_solution(solution, constraints):
    """修复不可行解"""
    repaired = solution.copy()
    
    for constraint in constraints:
        if constraint.is_violated(repaired):
            repaired = constraint.repair(repaired)
    
    return repaired

修复法的关键在于设计有效的修复策略。比如对于背包问题,可以直接把超重的物品移除;对于调度问题,可以重新分配资源。

3. 可行方向法(Feasible Direction)

在约束边界附近,搜索方向要往约束内部走。这个方法更复杂,但效果通常更好。

4. 多目标约束处理

最优雅的方法:把约束当作另一个优化目标。解同时被”原始目标”和”约束违反度”评价。在选择阶段,优先选可行解;如果都是不可行解,选约束违反最小的。

class ConstrainedFitness:
    """约束处理:优先可行解"""
    
    def __init__(self, constraints):
        self.constraints = constraints
    
    def evaluate(self, solution, original_fitness):
        violation = sum(c.violation(solution) for c in self.constraints)
        
        if violation == 0:
            # 可行解
            return original_fitness, 0
        else:
            # 不可行解,用违反度作为"适应度"(越小越好)
            return -violation, violation

多目标进化算法:NSGA-II

实际工程中,很多问题有多个互相冲突的目标。比如投资组合:你想同时最大化收益、最小化风险。鱼和熊掌不可兼得,只能找一组帕累托最优解。

Pareto最优性

一个解 支配另一个解 (记作 ),当且仅当:

  1. 在所有目标上都不比
  2. 至少在一个目标上比

Pareto前沿:所有不被其他解支配的解组成的集合。

多目标优化的目标就是找到Pareto前沿。

NSGA-II的核心机制

NSGA-II(Non-dominated Sorting Genetic Algorithm II)是多目标进化算法的经典之作,由Deb等人在2002年提出。

它的核心是非支配排序

def non_dominated_sort(population, objectives):
    """
    非支配排序
    
    返回每个个体的帕累托等级
    """
    n = len(population)
    domination_count = [0] * n  # 支配该个体的数量
    dominated_set = [[] for _ in range(n)]  # 该个体支配的个体集合
    fronts = [[]]  # 前沿列表
    
    for i in range(n):
        for j in range(i + 1, n):
            if dominates(objectives, population[i], population[j]):
                dominated_set[i].append(j)
                domination_count[j] += 1
            elif dominates(objectives, population[j], population[i]):
                dominated_set[j].append(i)
                domination_count[i] += 1
        
        if domination_count[i] == 0:
            fronts[0].append(i)
    
    # 分层
    current_front = 0
    while fronts[current_front]:
        next_front = []
        for i in fronts[current_front]:
            for j in dominated_set[i]:
                domination_count[j] -= 1
                if domination_count[j] == 0:
                    next_front.append(j)
        current_front += 1
        fronts.append(next_front)
    
    fronts.pop()  # 移除最后一个空前沿
    
    # 分配等级
    ranks = [0] * n
    for rank, front in enumerate(fronts):
        for i in front:
            ranks[i] = rank
    
    return ranks, fronts
 
 
def dominates(objectives, sol1, sol2):
    """判断sol1是否支配sol2"""
    all_better = True
    any_better = False
    
    for obj_func in objectives:
        v1, v2 = obj_func(sol1), obj_func(sol2)
        
        if v1 > v2:
            any_better = True
        elif v1 < v2:
            all_better = False
    
    return all_better and any_better

拥挤距离

非支配排序解决了解的分层问题,但同层内的解怎么排序?NSGA-II引入了拥挤距离的概念。

拥挤距离衡量一个解周围其他解的密度:

def crowding_distance(population, objectives, fronts):
    """
    计算拥挤距离
    """
    n = len(population)
    distances = [0.0] * n
    
    for obj_func in objectives:
        # 按该目标排序
        sorted_indices = sorted(range(n), 
                               key=lambda i: obj_func(population[i]))
        
        # 边界解的拥挤距离设为无穷大
        distances[sorted_indices[0]] = float('inf')
        distances[sorted_indices[-1]] = float('inf')
        
        # 计算中间解的拥挤距离
        obj_range = (obj_func(population[sorted_indices[-1]]) - 
                    obj_func(population[sorted_indices[0]]))
        
        if obj_range == 0:
            continue
        
        for i in range(1, len(sorted_indices) - 1):
            distances[sorted_indices[i]] += (
                obj_func(population[sorted_indices[i + 1]]) -
                obj_func(population[sorted_indices[i - 1]])
            ) / obj_range
    
    return distances

完整NSGA-II实现

class NSGAII:
    """NSGA-II多目标遗传算法"""
    
    def __init__(self, n_objectives, n_variables,
                 population_size=100, crossover_rate=0.9,
                 mutation_rate=0.1):
        self.n_objectives = n_objectives
        self.n_variables = n_variables
        self.pop_size = population_size
        self.crossover_rate = crossover_rate
        self.mutation_rate = mutation_rate
    
    def create_individual(self):
        """创建随机个体"""
        return np.random.uniform(-5, 5, self.n_variables)
    
    def mutate(self, individual):
        """多项式变异"""
        mutated = individual.copy()
        
        eta = 20  # 变异指数
        for i in range(len(individual)):
            if random.random() < self.mutation_rate:
                delta_q = random.gauss(0, 1)
                shift = delta_q * (2 ** -eta)
                mutated[i] = np.clip(individual[i] + shift, -5, 5)
        
        return mutated
    
    def crossover(self, parent1, parent2):
        """模拟二进制交叉(SBX)"""
        if random.random() > self.crossover_rate:
            return parent1.copy(), parent2.copy()
        
        eta = 15  # 分布指数
        child1, child2 = parent1.copy(), parent2.copy()
        
        for i in range(len(parent1)):
            if random.random() < 0.5:
                if abs(parent1[i] - parent2[i]) > 1e-10:
                    beta = 1 + 2 * min(parent1[i], parent2[i]) / 10
                    beta_q = (2 * beta) ** (1 / (eta + 1)) if random.random() < 0.5 else \
                            (2 / (2 - beta)) ** (1 / (eta + 1))
                    
                    child1[i] = 0.5 * ((1 + beta_q) * parent1[i] + 
                                      (1 - beta_q) * parent2[i])
                    child2[i] = 0.5 * ((1 - beta_q) * parent1[i] + 
                                      (1 + beta_q) * parent2[i])
        
        return child1, child2
    
    def select_parents(self, population, fitnesses, ranks, crowding):
        """基于等级和拥挤距离的选择"""
        # 计算综合分数:等级越高(越小)越好,拥挤距离越大越好
        scores = [-r + c for r, c in zip(ranks, crowding)]
        
        # 锦标赛选择
        candidates = random.sample(range(len(population)), 5)
        best = max(candidates, key=lambda i: scores[i])
        return population[best].copy()
    
    def evolve(self, objectives, n_generations=100):
        """进化"""
        # 初始化
        population = [self.create_individual() 
                      for _ in range(self.pop_size)]
        
        for gen in range(n_generations):
            # 评估
            fitness_values = [[obj(p) for obj in objectives] 
                            for p in population]
            
            # 非支配排序
            ranks, fronts = non_dominated_sort(population, objectives)
            
            # 拥挤距离
            crowding = crowding_distance(population, objectives, fronts)
            
            # 选择、交叉、变异产生后代
            offspring = []
            while len(offspring) < self.pop_size:
                p1 = self.select_parents(population, fitness_values, 
                                        ranks, crowding)
                p2 = self.select_parents(population, fitness_values, 
                                        ranks, crowding)
                
                c1, c2 = self.crossover(p1, p2)
                offspring.extend([
                    self.mutate(c1),
                    self.mutate(c2)
                ])
            
            offspring = offspring[:self.pop_size]
            
            # 合并父子代
            combined = population + offspring
            
            # 评估后代
            combined_fitness = [[obj(p) for obj in objectives] 
                              for p in combined]
            
            # 非支配排序
            ranks, fronts = non_dominated_sort(combined, objectives)
            crowding = crowding_distance(combined, objectives, fronts)
            
            # 选择下一代(基于等级和拥挤距离)
            sorted_indices = sorted(range(len(combined)),
                                   key=lambda i: (ranks[i], -crowding[i]))
            
            population = [combined[i] for i in sorted_indices[:self.pop_size]]
            
            if gen % 20 == 0:
                n_feasible = sum(1 for f in fitness_values if all(fi >= 0 for fi in f))
                print(f"Gen {gen}: {len(fronts[0])} solutions on PF")
        
        return fronts[0], population
 
 
# 测试:ZDT1问题(两目标)
def zdt1_objectives(x):
    f1 = x[0]
    g = 1 + 9 * np.sum(x[1:]) / (len(x) - 1)
    h = 1 - np.sqrt(f1 / g)
    f2 = g * h
    return f1, f2
 
if __name__ == "__main__":
    nsga2 = NSGAII(n_objectives=2, n_variables=30, 
                   population_size=100)
    
    pareto_front, population = nsga2.evolve(
        [lambda x: zdt1_objectives(x)[0],
         lambda x: zdt1_objectives(x)[1]],
        n_generations=100
    )
    
    # 绘制Pareto前沿(需要matplotlib)
    # import matplotlib.pyplot as plt
    # pf = [(x[0], zdt1_objectives(x)[1]) for x in pareto_front]
    # plt.scatter(*zip(*pf))
    # plt.xlabel('f1')
    # plt.ylabel('f2')
    # plt.title('Pareto Front')
    # plt.show()

进化算法与其他优化方法的混合:Memetic Algorithm

Memetic Algorithm(文化基因算法)这个名字听起来很高大上,其实就是进化算法 + 局部搜索

思路很简单:进化算法负责全局探索,局部搜索负责局部开采。两者结合,取长补短。

class MemeticAlgorithm:
    """文化基因算法"""
    
    def __init__(self, local_search_func):
        self.local_search = local_search_func
        self.local_search_rate = 0.3  # 多少个体执行局部搜索
    
    def local_search_step(self, individual, fitness_func):
        """执行局部搜索(爬山法)"""
        improved = individual.copy()
        current_fitness = fitness_func(improved)
        
        step_size = 0.1
        max_iterations = 50
        
        for _ in range(max_iterations):
            # 尝试小步移动
            candidate = improved.copy()
            idx = random.randint(0, len(candidate) - 1)
            candidate[idx] += random.uniform(-step_size, step_size)
            candidate[idx] = np.clip(candidate[idx], -5, 5)
            
            new_fitness = fitness_func(candidate)
            
            if new_fitness > current_fitness:
                improved = candidate
                current_fitness = new_fitness
            else:
                step_size *= 0.9  # 减小步长
        
        return improved
    
    def evolve(self, fitness_func, n_variables, generations=100):
        """带局部搜索的进化"""
        # 初始化种群...
        population = [...]
        
        for gen in range(generations):
            # 评估
            fitnesses = [fitness_func(ind) for ind in population]
            
            # 进化(选择、交叉、变异)
            offspring = self.evolve_step(population, fitnesses)
            
            # 局部搜索(只对部分个体)
            for i in range(len(offspring)):
                if random.random() < self.local_search_rate:
                    offspring[i] = self.local_search_step(
                        offspring[i], fitness_func
                    )
            
            # 合并选择...
            population = offspring
        
        return population

Memetic Algorithm在很多问题上表现比纯GA好,尤其当目标函数比较光滑、局部搜索能有效工作时。

并行化进化计算:岛屿模型

岛屿模型(Island Model)是分布式GA的经典架构。它的核心思想是把一个大种群分成多个小种群(岛屿),各自独立进化,偶尔交换个体(迁移)。

class IslandModelGA:
    """岛屿模型并行遗传算法"""
    
    def __init__(self, n_islands=4, island_size=50,
                 migration_interval=5, migration_rate=0.1):
        self.n_islands = n_islands
        self.island_size = island_size
        self.migration_interval = migration_interval
        self.migration_rate = migration_rate
        
        # 每代迁移多少个体
        self.migrants_per_island = max(1, int(island_size * migration_rate))
    
    def evolve_island(self, island_id, fitness_func, generations):
        """单个岛屿的进化"""
        # 初始化岛屿种群
        population = self.init_population(self.island_size)
        
        for gen in range(generations):
            # 评估
            fitnesses = [fitness_func(ind) for ind in population]
            
            # 岛屿内进化
            population = self.island_selection(population, fitnesses)
            offspring = self.crossover_mutation(population)
            
            # 迁移检查
            if gen % self.migration_interval == 0:
                migrants = self.select_migrants(population, fitnesses)
                # 返回迁移体给主进程
                return migrants, island_id, gen, max(fitnesses)
            
            population = offspring
        
        return None, island_id, generations, max(fitnesses)
    
    def parallel_evolve(self, fitness_func, total_generations):
        """并行进化(使用多进程)"""
        from concurrent.futures import ProcessPoolExecutor
        
        islands = list(range(self.n_islands))
        
        with ProcessPoolExecutor(max_workers=self.n_islands) as executor:
            # 提交每个岛屿的进化任务
            futures = {
                executor.submit(self.evolve_island, i, fitness_func, 
                               total_generations): i 
                for i in islands
            }
            
            migration_buffer = {}  # 存储迁移体
            
            while futures:
                done, _ = concurrent.futures.wait(
                    futures, return_when=concurrent.futures.FIRST_COMPLETED
                )
                
                for future in done:
                    migrants, island_id, gen, best_fitness = future.result()
                    
                    if migrants:
                        # 保存迁移体
                        migration_buffer[island_id] = migrants
                        
                        # 如果有移民到达,执行迁移
                        for other_island in range(self.n_islands):
                            if other_island != island_id and other_island in migration_buffer:
                                immigrants = migration_buffer[other_island]
                                # 替换最差个体
                                # ... 合并逻辑
                        
                        del migration_buffer[island_id]
                    
                    print(f"Island {island_id}: Gen {gen}, Best: {best_fitness:.4f}")
                    
                    # 重新提交
                    futures[executor.submit(
                        self.evolve_island, island_id, fitness_func,
                        total_generations
                    )] = island_id

岛屿模型的参数设置也有讲究:

  • 岛屿数量:4-10个比较合适
  • 迁移间隔:5-10代
  • 迁移率:5%-10%(每次迁移种群的一小部分)
  • 拓扑结构:环状、全连接、随机连接

迁移太快会导致种群趋同,失去多样性;迁移太慢则无法发挥岛屿模型的优势。

用DEAP库实现完整的GA pipeline

DEAP(Distributed Evolutionary Algorithms in Python)是Python中最强大的进化算法库。它封装了大量常用算子,同时保持了很高的灵活性。

from deap import base, creator, tools, algorithms
import random
import numpy as np
 
# 定义问题
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))  # 最小化问题
creator.create("Individual", list, fitness=creator.FitnessMin)
 
# 注册工具箱
toolbox = base.Toolbox()
 
# 属性生成器
toolbox.register("attr_float", random.uniform, -5, 5)
 
# 个体初始化
toolbox.register("individual", tools.initRepeat, 
                 creator.Individual, toolbox.attr_float, n=10)
 
# 种群初始化
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
 
# 评估函数
def evaluate(individual):
    # Rastrigin函数
    A = 10
    n = len(individual)
    return (A * n + np.sum(individual**2 - A * np.cos(2 * np.pi * individual)),),
 
toolbox.register("evaluate", evaluate)
 
# 遗传算子
toolbox.register("mate", tools.cxSimulatedBinaryBounded, 
                 low=-5, up=5, eta=20.0)
 
toolbox.register("mutate", tools.mutPolynomialBounded,
                 low=-5, up=5, eta=20.0, indpb=0.05)
 
toolbox.register("select", tools.selTournament, tournsize=3)
 
 
def eaSimpleWithElitism(population, toolbox, cxpb=0.8, mutpb=0.2, 
                        ngen=100, elitism_rate=0.1):
    """带精英保留的进化算法"""
    
    # 统计工具
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("min", np.min)
    stats.register("avg", np.mean)
    stats.register("max", np.max)
    
    logbook = tools.Logbook()
    
    # 评估初始种群
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit
    
    record = stats.compile(population)
    logbook.record(gen=0, **record)
    print(f"Gen 0: min={record['min']:.4f}, avg={record['avg']:.4f}")
    
    # 精英数量
    n_elites = max(1, int(len(population) * elitism_rate))
    
    for gen in range(1, ngen + 1):
        # 选择
        offspring = toolbox.select(population, len(population))
        
        # 克隆(避免引用问题)
        offspring = list(map(toolbox.clone, offspring))
        
        # 交叉
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < cxpb:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values
        
        # 变异
        for mutant in offspring:
            if random.random() < mutpb:
                toolbox.mutate(mutant)
                del mutant.fitness.values
        
        # 评估无效个体
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = list(map(toolbox.evaluate, invalid_ind))
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit
        
        # 精英保留:从上一代保留最优个体
        elites = tools.selBest(population, n_elites)
        
        # 合并
        population = offspring + elites
        
        # 记录
        record = stats.compile(population)
        logbook.record(gen=gen, **record)
        
        if gen % 20 == 0:
            print(f"Gen {gen}: min={record['min']:.4f}, avg={record['avg']:.4f}")
    
    return population, logbook
 
 
if __name__ == "__main__":
    random.seed(42)
    np.random.seed(42)
    
    # 创建种群
    population = toolbox.population(n=100)
    
    # 进化
    final_pop, logbook = eaSimpleWithElitism(
        population, toolbox,
        cxpb=0.8, mutpb=0.1,
        ngen=200,
        elitism_rate=0.05
    )
    
    # 最佳解
    best = tools.selBest(final_pop, 1)[0]
    print(f"\nBest solution: {best}")
    print(f"Best fitness: {best.fitness.values[0]:.4f}")

DEAP的优势是代码简洁、算子丰富、统计方便。但它的学习曲线有点陡,建议先通读官方文档的examples部分。

性能提升技巧

精英保留

每代把最好的几个个体直接复制到下一代,避免”退化”。这是最简单有效的提升技巧。

稳态GA(Steady-State GA)

不是每代完全替换种群,而是每次只替换一两个最差的个体。这样好的基因能快速扩散,但可能降低多样性。

def steady_state_ga(fitness_func, n_variables, generations=1000):
    """稳态GA"""
    population_size = 100
    population = init_population(population_size, n_variables)
    
    for gen in range(generations):
        # 评估
        fitnesses = [fitness_func(ind) for ind in population]
        
        # 随机选两个父母
        parents = random.sample(range(population_size), 2)
        
        # 交叉
        child1, child2 = crossover(population[parents[0]], 
                                   population[parents[1]])
        
        # 变异
        child1 = mutate(child1)
        child2 = mutate(child2)
        
        # 评估
        child_fitness = [fitness_func(child1), fitness_func(child2)]
        
        # 替换最差个体
        worst_indices = np.argsort(fitnesses)[:2]
        for i, worst in enumerate(worst_indices):
            if child_fitness[i] > fitnesses[worst]:  # 最大化
                population[worst] = [child1, child2][i]
        
        if gen % 100 == 0:
            print(f"Gen {gen}: best={max(fitnesses):.4f}")
    
    return max(population, key=fitness_func)

小生境技术(Niching)

防止多个相似解同时存在,保持种群多样性。

共享函数法

个体的适应度除以它与其他个体的相似度之和:

def niching_selection(population, fitnesses, niche_radius=0.5, 
                     niche_size=10):
    """小生境选择"""
    selected = []
    
    while len(selected) < niche_size:
        # 按原始适应度排序
        candidates = sorted(range(len(population)), 
                           key=lambda i: fitnesses[i], reverse=True)
        
        # 选择最优的
        for idx in candidates:
            if idx not in selected:
                # 检查是否在已有选择的小生境内
                in_niche = False
                for sel_idx in selected:
                    dist = np.linalg.norm(
                        np.array(population[idx]) - 
                        np.array(population[sel_idx])
                    )
                    if dist < niche_radius:
                        in_niche = True
                        break
                
                if not in_niche:
                    selected.append(idx)
                    break
    
    return [population[i] for i in selected]

实战案例

案例1:遗传算法优化神经网络超参数

from deap import base, creator, tools
import numpy as np
 
# 目标:优化神经网络的层数、每层神经元数、学习率
 
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", dict, fitness=creator.FitnessMax)
 
toolbox = base.Toolbox()
 
# 定义搜索空间
toolbox.register("n_layers", random.randint, 1, 4)
toolbox.register("neurons_per_layer", random.randint, 16, 256)
toolbox.register("learning_rate", random.uniform, 0.0001, 0.1)
toolbox.register("activation", random.choice, ['relu', 'tanh', 'sigmoid'])
 
def create_individual():
    n_layers = random.randint(1, 4)
    layers = [random.randint(16, 256) for _ in range(n_layers)]
    return creator.Individual({
        'architecture': layers,
        'learning_rate': random.uniform(0.0001, 0.1),
        'activation': random.choice(['relu', 'tanh'])
    })
 
def evaluate_network(individual):
    """评估一个网络配置"""
    import time
    
    # 这里简化处理,实际应该真正训练网络
    # 用网络复杂度作为适应度的近似
    
    layers = individual['architecture']
    n_params = sum([layers[i] * layers[i+1] for i in range(len(layers)-1)])
    n_params += layers[0] * 784 + 10 * layers[-1]  # 输入输出
    
    # 越简单的网络越好(这里假设简单网络更好)
    # 实际应该用验证集准确率
    return min(1.0, 1000 / (n_params + 1)),
 
toolbox.register("individual", create_individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evaluate_network)
toolbox.register("mate", tools.cxTwoPoint)  # 适用于dict
toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
toolbox.register("select", tools.selTournament, tournsize=3)
 
# 进化
population = toolbox.population(n=20)
for gen in range(30):
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit
    
    offspring = toolbox.select(population, len(population))
    offspring = list(map(toolbox.clone, offspring))
    
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < 0.5:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values
    
    for mutant in offspring:
        if random.random() < 0.2:
            toolbox.mutate(mutant)
            del mutant.fitness.values
    
    population = offspring[:len(population)]
 
best = max(population, key=lambda x: x.fitness.values)
print(f"Best architecture: {best}")

案例2:投资组合优化

def portfolio_optimization(returns, risk_free_rate=0.02, pop_size=100, 
                          generations=200):
    """
    用遗传算法优化投资组合
    
    目标:最大化夏普比率
    约束:权重和为1,每个权重在0-1之间
    """
    n_assets = len(returns)
    
    def fitness(weights):
        weights = np.array(weights)
        weights = weights / weights.sum()  # 归一化
        
        portfolio_return = np.dot(weights, returns.mean())
        portfolio_std = np.sqrt(np.dot(weights, 
                        np.dot(returns.cov(), weights)))
        
        sharpe = (portfolio_return - risk_free_rate) / (portfolio_std + 1e-10)
        
        # 惩罚极端权重
        penalty = np.sum(np.maximum(0, weights - 0.5)) * 10
        
        return sharpe - penalty,
    
    # GA设置
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    creator.create("Individual", list, fitness=creator.FitnessMax)
    
    toolbox = base.Toolbox()
    toolbox.register("weight", random.uniform, 0, 1)
    toolbox.register("individual", tools.initRepeat, 
                     creator.Individual, toolbox.weight, n=n_assets)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", fitness)
    toolbox.register("mate", tools.cxSimulatedBinaryBounded, 
                    low=0, up=1, eta=20)
    toolbox.register("mutate", tools.mutPolynomialBounded,
                    low=0, up=1, eta=20, indpb=0.1)
    toolbox.register("select", tools.selTournament, tournsize=3)
    
    # 进化
    population = toolbox.population(n=pop_size)
    
    for gen in range(generations):
        fitnesses = list(map(toolbox.evaluate, population))
        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit
        
        offspring = toolbox.select(population, len(population))
        offspring = list(map(toolbox.clone, offspring))
        
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < 0.7:
                toolbox.mate(child1, child2)
        
        for mutant in offspring:
            if random.random() < 0.2:
                toolbox.mutate(mutant)
        
        population = toolbox.select(population + offspring, len(population))
        
        if gen % 50 == 0:
            best = max(population, key=lambda x: x.fitness.values)
            weights = np.array(best)
            weights = weights / weights.sum()
            print(f"Gen {gen}: Sharpe={best.fitness.values[0]:.4f}")
            print(f"  Weights: {weights.round(3)}")
    
    # 返回最优解
    best = max(population, key=lambda x: x.fitness.values)
    weights = np.array(best)
    return weights / weights.sum()
 
 
if __name__ == "__main__":
    # 生成模拟数据
    np.random.seed(42)
    n_days = 252
    n_assets = 10
    
    # 模拟日收益率
    returns = np.random.randn(n_days, n_assets) * 0.02 + 0.0005
    
    optimal_weights = portfolio_optimization(returns)
    print(f"\nOptimal weights: {optimal_weights.round(3)}")

总结

进化算法的工程实践比理论复杂得多。参数设置、约束处理、多目标优化、并行化——每一个都是坑。

但正因为复杂,才有调优的空间。掌握了这些技巧,你可以把一个”效果一般”的GA调成”state-of-the-art”。

核心经验总结

  1. 参数设置:先从经验值开始,然后根据问题特点微调
  2. 精英保留:几乎总是有益的
  3. 多样性:监控种群多样性,必要时采取措施
  4. 约束处理:惩罚函数最简单,修复法最有效,多目标法最优雅
  5. 并行化:岛屿模型是入门的好选择
  6. 混合策略:考虑加入局部搜索(Memetic Algorithm)

希望这篇指南对你有帮助。祝调参愉快!