Keywords

particle swarm optimization, particle update, inertia weight, learning factors, global best,
local best, velocity update, position update, convergence, topology,
gbest model, lbest model, discrete PSO, binary PSO, PSO variants,
adaptive PSO, cooperative PSO, multi-objective PSO, quantum PSO, chaotic PSO

Abstract

Particle Swarm Optimization (PSO) is a swarm-intelligence optimization algorithm proposed by James Kennedy and Russell Eberhart in 1995. It is inspired by social-dynamics models of foraging bird flocks: by simulating the information sharing and cooperation among individuals in a group, it performs an efficient cooperative search of the solution space. PSO is known for its simplicity, ease of implementation, and fast convergence. It has become one of the most widely applied algorithms in evolutionary computation, with successful applications in function optimization, neural network training, scheduling, control system design, and other areas.


1. Origins and Theoretical Foundations of PSO

1.1 Origins in Social Biology

PSO grew out of observing and modeling collective animal behavior. While studying artificial life and swarm intelligence, Kennedy and Eberhart noticed that bird flocks and fish schools exhibit a kind of "collective intelligence" while foraging: although each individual has only limited cognitive ability, the group realizes complex global behavior through simple local interactions.

Key observations

  • A bird in flight adjusts its trajectory by drawing both on its own experience (the cognitive component) and on the experience of nearby individuals (the social component)
  • The group's motion shows a "dynamic balance": a tendency to explore new regions coexists with a tendency to gather around promising regions already discovered

1.2 Social Cognition Theory

The mathematical model of PSO incorporates several core concepts from social psychology:

  1. Social influence: an individual is influenced by the behavior of its neighbors
  2. Social learning: an individual acquires information from the group's experience
  3. Cognitive consistency: an individual tends to stay consistent with its own experience

1.3 Comparison with Other Evolutionary Algorithms

| Feature             | PSO                          | Genetic Algorithm       | Differential Evolution |
|---------------------|------------------------------|-------------------------|------------------------|
| Information sharing | inter-particle communication | crossover/recombination | difference vectors     |
| Number of parameters| few                          | moderate                | moderate               |
| Memory mechanism    | pbest/gbest                  | population memory       | selection pressure     |
| Convergence speed   | fast                         | slower                  | moderate               |

2. The Basic Mathematical Model

2.1 Velocity and Position Updates

In PSO each particle represents a candidate solution in the search space, and its state is described by a position and a velocity:

Velocity update formula

    v_i(t+1) = w * v_i(t) + c1 * r1 * (pbest_i - x_i(t)) + c2 * r2 * (gbest - x_i(t))

Position update formula

    x_i(t+1) = x_i(t) + v_i(t+1)

where:

  • x_i(t): position of particle i at time t
  • v_i(t): velocity of particle i at time t
  • w: inertia weight
  • c1: cognitive learning factor (cognitive parameter)
  • c2: social learning factor (social parameter)
  • r1, r2: random numbers drawn uniformly from [0, 1]
  • pbest_i: the best position found so far by particle i
  • gbest: the position of the globally best particle

2.2 Velocity Clamping

To keep excessive velocities from carrying particles past promising regions, the velocity is usually clamped:

    v_i = clip(v_i, -v_max, v_max)

where v_max is typically set to 10%-20% of the search-space range.

2.3 Position Clamping

Particle positions must also be confined to the search space:

    x_i = clip(x_i, x_min, x_max)

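The update rules of this section can be made concrete with a minimal numpy sketch of one full step for a single particle. All numeric values below are illustrative; the parameters match the standard setting used in the implementations that follow.

```python
import numpy as np

# One PSO update step (Sections 2.1-2.3) for a single 2-dimensional particle.
w, c1, c2 = 0.729, 1.49445, 1.49445   # standard parameter setting
x_min, x_max = -10.0, 10.0
v_max = 0.2 * (x_max - x_min)          # 20% of the search range

rng = np.random.default_rng(0)
x = np.array([2.0, -1.0])              # current position x_i(t)
v = np.array([0.1, 0.3])               # current velocity v_i(t)
pbest = np.array([1.5, -0.5])          # particle's own best position
gbest = np.array([0.0, 0.0])           # swarm's best position

r1, r2 = rng.random(2), rng.random(2)  # U(0, 1) random vectors

# Velocity update: inertia + cognitive term + social term, then clamping
v = w * v + c1 * r1 * (pbest - x) + c2 * r2 * (gbest - x)
v = np.clip(v, -v_max, v_max)

# Position update, confined to the search space
x = np.clip(x + v, x_min, x_max)
```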

3. A Complete Implementation

3.1 The Standard PSO Algorithm

import numpy as np
import random
 
class ParticleSwarmOptimizer:
    def __init__(self, objective, dim, bounds, n_particles=30,
                 w=0.729, c1=1.49445, c2=1.49445,
                 max_iter=1000, tol=1e-8):
        """
        参数初始化
        
        Args:
            objective: 目标函数(最小化)
            dim: 问题维度
            bounds: 边界 [(x_min, x_max), ...]
            n_particles: 粒子数量
            w: 惯性权重
            c1: 认知学习因子
            c2: 社会学习因子
            max_iter: 最大迭代次数
            tol: 收敛容忍度
        """
        self.objective = objective
        self.dim = dim
        self.bounds = bounds
        self.n_particles = n_particles
        self.w = w
        self.c1 = c1
        self.c2 = c2
        self.max_iter = max_iter
        self.tol = tol
        
        # Velocity limits: 20% of each dimension's range
        self.v_max = np.array([(high - low) * 0.2 
                              for low, high in bounds])
        
        # History records
        self.gbest_history = []
        self.pbest_history = []
    
    def initialize(self):
        """Initialize the swarm"""
        bounds_array = np.array(self.bounds)
        low_bounds = bounds_array[:, 0]
        high_bounds = bounds_array[:, 1]
        
        # Random initial positions
        self.positions = np.random.uniform(
            low_bounds, high_bounds, 
            (self.n_particles, self.dim)
        )
        
        # Random initial velocities
        self.velocities = np.random.uniform(
            -self.v_max, self.v_max,
            (self.n_particles, self.dim)
        )
        
        # Evaluate initial fitness
        self.fitness = np.array([
            self.objective(pos) for pos in self.positions
        ])
        
        # Initialize personal bests
        self.pbest_positions = self.positions.copy()
        self.pbest_fitness = self.fitness.copy()
        
        # Initialize the global best
        gbest_idx = np.argmin(self.fitness)
        self.gbest_position = self.positions[gbest_idx].copy()
        self.gbest_fitness = self.fitness[gbest_idx]
    
    def update_velocities(self):
        """Update the velocities of all particles"""
        r1 = np.random.random((self.n_particles, self.dim))
        r2 = np.random.random((self.n_particles, self.dim))
        
        # Cognitive term: move toward each particle's own best
        cognitive = self.c1 * r1 * (self.pbest_positions - self.positions)
        
        # Social term: move toward the global best
        social = self.c2 * r2 * (self.gbest_position - self.positions)
        
        # Velocity update
        self.velocities = self.w * self.velocities + cognitive + social
        
        # Velocity clamping (vectorized instead of an element-wise loop)
        self.velocities = np.clip(self.velocities, -self.v_max, self.v_max)
    
    def update_positions(self):
        """Update the positions of all particles"""
        self.positions += self.velocities
        
        # Position clamping
        bounds_array = np.array(self.bounds)
        low_bounds = bounds_array[:, 0]
        high_bounds = bounds_array[:, 1]
        
        self.positions = np.clip(self.positions, low_bounds, high_bounds)
    
    def evaluate(self):
        """Evaluate the current swarm"""
        for i in range(self.n_particles):
            fitness = self.objective(self.positions[i])
            self.fitness[i] = fitness
            
            # Update the personal best
            if fitness < self.pbest_fitness[i]:
                self.pbest_positions[i] = self.positions[i].copy()
                self.pbest_fitness[i] = fitness
                
                # Update the global best
                if fitness < self.gbest_fitness:
                    self.gbest_position = self.positions[i].copy()
                    self.gbest_fitness = fitness
    
    def run(self):
        """Run the PSO algorithm"""
        self.initialize()
        
        for iteration in range(self.max_iter):
            self.update_velocities()
            self.update_positions()
            self.evaluate()
            
            # Record history
            self.gbest_history.append(self.gbest_fitness)
            
            if iteration % 50 == 0:
                print(f"Iter {iteration}: gbest = {self.gbest_fitness:.8f}")
            
            # Early-stopping condition
            if self.gbest_fitness < self.tol:
                print(f"Converged at iteration {iteration}")
                break
        
        return self.gbest_position, self.gbest_fitness

3.2 A Step-wise Version Instrumented for Convergence Analysis

class ConvergentPSO:
    def __init__(self, objective, dim, bounds):
        self.objective = objective
        self.dim = dim
        self.bounds = bounds
        self.reset()
    
    def reset(self):
        """Reset the algorithm state"""
        bounds_array = np.array(self.bounds)
        self.positions = np.random.uniform(
            bounds_array[:, 0], bounds_array[:, 1], (30, self.dim)
        )
        self.velocities = np.zeros((30, self.dim))
        self.fitness = np.array([self.objective(p) for p in self.positions])
        self.pbest = self.positions.copy()
        self.pbest_fit = self.fitness.copy()
        
        gbest_idx = np.argmin(self.fitness)
        self.gbest = self.positions[gbest_idx].copy()
        self.gbest_fit = self.fitness[gbest_idx]
        
        self.history = [self.gbest_fit]
    
    def step(self, w=0.729, c1=1.49445, c2=1.49445):
        """Perform a single iteration"""
        r1 = np.random.random((30, self.dim))
        r2 = np.random.random((30, self.dim))
        
        # Velocity update
        self.velocities = (w * self.velocities + 
            c1 * r1 * (self.pbest - self.positions) +
            c2 * r2 * (self.gbest - self.positions)
        )
        
        # Position update
        self.positions += self.velocities
        bounds = np.array(self.bounds)
        self.positions = np.clip(self.positions, bounds[:, 0], bounds[:, 1])
        
        # Evaluation
        self.fitness = np.array([self.objective(p) for p in self.positions])
        
        # Update pbest
        improved = self.fitness < self.pbest_fit
        self.pbest[improved] = self.positions[improved]
        self.pbest_fit[improved] = self.fitness[improved]
        
        # Update gbest
        gbest_idx = np.argmin(self.fitness)
        if self.fitness[gbest_idx] < self.gbest_fit:
            self.gbest = self.positions[gbest_idx].copy()
            self.gbest_fit = self.fitness[gbest_idx]
        
        self.history.append(self.gbest_fit)

4. Global-best and Local-best PSO

4.1 The gbest Model (Global Version)

The gbest model uses the globally best particle as the source of social information:

class GBestPSO(ParticleSwarmOptimizer):
    """Global-best (gbest) PSO"""
    
    def update_velocities(self):
        r1 = np.random.random((self.n_particles, self.dim))
        r2 = np.random.random((self.n_particles, self.dim))
        
        cognitive = self.c1 * r1 * (self.pbest_positions - self.positions)
        social = self.c2 * r2 * (self.gbest_position - self.positions)
        
        self.velocities = self.w * self.velocities + cognitive + social
        
        # Velocity clamping
        self.velocities = np.clip(self.velocities, -self.v_max, self.v_max)

Characteristics

  • Fast convergence
  • Well suited to unimodal problems
  • Prone to getting trapped in local optima

4.2 The lbest Model (Local Version)

The lbest model uses a ring topology in which each particle exchanges information only with its neighbors:

class LBestPSO(ParticleSwarmOptimizer):
    """Local-best (lbest) PSO"""
    
    def __init__(self, *args, k=3, **kwargs):
        super().__init__(*args, **kwargs)
        self.k = k  # number of neighbors on each side
        self.initialize_topology()
    
    def initialize_topology(self):
        """Build the ring topology"""
        self.neighbors = {}
        for i in range(self.n_particles):
            self.neighbors[i] = [(i + j) % self.n_particles
                                 for j in range(-self.k, self.k + 1)]
    
    def find_lbest(self):
        """Return the local best position of every particle"""
        lbest = np.empty((self.n_particles, self.dim))
        for i in range(self.n_particles):
            neighbor_fitness = self.pbest_fitness[self.neighbors[i]]
            best_neighbor = self.neighbors[i][np.argmin(neighbor_fitness)]
            lbest[i] = self.pbest_positions[best_neighbor]
        return lbest
    
    def update_velocities(self):
        lbest = self.find_lbest()
        
        r1 = np.random.random((self.n_particles, self.dim))
        r2 = np.random.random((self.n_particles, self.dim))
        
        cognitive = self.c1 * r1 * (self.pbest_positions - self.positions)
        
        # Use each particle's lbest instead of gbest as the social attractor
        social = self.c2 * r2 * (lbest - self.positions)
        
        self.velocities = self.w * self.velocities + cognitive + social
        self.velocities = np.clip(self.velocities, -self.v_max, self.v_max)

Topology types

Ring topology (lbest, k=1):
    ○——○——○——○——○——○——○
    ↑__________________|

Grid topology (8 neighbors):
    ○ ○ ○
    ○ ● ○
    ○ ○ ○

Von Neumann topology (4-neighbor cross):
      ○
    ○ ● ○
      ○

4.3 Topology Comparison

| Topology    | Convergence speed | Global search | Stability |
|-------------|-------------------|---------------|-----------|
| gbest       | fastest           | weakest       | poor      |
| Ring        | slower            | strongest     | good      |
| Grid        | medium            | medium        | medium    |
| Von Neumann | medium            | stronger      | good      |

Note

Although local-best PSO converges more slowly, it shows stronger global search ability on complex multimodal problems and can effectively avoid premature convergence.


5. Inertia Weight and Learning Factors

5.1 Linearly Decreasing Inertia Weight

The linear decreasing strategy proposed by Shi and Eberhart:

class LinearDecreasingWPSO(ParticleSwarmOptimizer):
    def __init__(self, *args, w_max=0.9, w_min=0.4, **kwargs):
        super().__init__(*args, **kwargs)
        self.w_max = w_max
        self.w_min = w_min
    
    def get_inertia_weight(self, iteration):
        """Linear decrease"""
        return self.w_max - (self.w_max - self.w_min) * iteration / self.max_iter

Mathematical expression

    w(t) = w_max - (w_max - w_min) * t / T_max

5.2 Nonlinearly Decreasing Inertia Weight

class NonLinearWPSO(ParticleSwarmOptimizer):
    def get_inertia_weight(self, iteration):
        # Exponential decay from 0.9
        return 0.9 * 0.99 ** iteration

5.3 Adaptive Inertia Weight

class AdaptiveWPSO(ParticleSwarmOptimizer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.w = 0.9  # initial value
    
    def update_adaptive_w(self, iteration):
        """Adapt w based on the recent rate of fitness improvement"""
        if len(self.gbest_history) >= 10:
            # gbest fitness is minimized, so improvement = old - new >= 0
            recent_improvement = self.gbest_history[-10] - self.gbest_history[-1]
            if recent_improvement < 1e-6:
                self.w = min(self.w * 1.1, 0.9)  # stagnating: more exploration
            elif recent_improvement > 0.1:
                self.w = max(self.w * 0.9, 0.4)  # improving: more exploitation

5.4 Influence of the Learning Factors

| Parameter setting   | Characteristic                    | Suitable scenario     |
|---------------------|-----------------------------------|-----------------------|
| c1 > c2             | emphasizes individual experience  | exploration phase     |
| c1 < c2             | emphasizes group experience       | exploitation phase    |
| c1 = c2             | balanced                          | general use           |
| Clerc constriction  | standard configuration            | theoretically optimal |

Clerc's recommended standard parameters

    w = 0.729, c1 = c2 = 1.49445

Tip

Adaptively adjusting the learning factors helps PSO balance exploration and exploitation. A common strategy is to use a larger c1 early in the run to encourage exploration and a larger c2 later to accelerate convergence.
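This strategy is often implemented as time-varying acceleration coefficients. A minimal sketch, assuming linear schedules; the 2.5 → 0.5 endpoint values are illustrative defaults, not prescribed by this article:

```python
def tvac_coefficients(iteration, max_iter,
                      c1_init=2.5, c1_final=0.5,
                      c2_init=0.5, c2_final=2.5):
    """Linearly interpolate c1 (decreasing) and c2 (increasing) over the run."""
    t = iteration / max_iter
    c1 = c1_init + (c1_final - c1_init) * t  # cognitive: large early
    c2 = c2_init + (c2_final - c2_init) * t  # social: large late
    return c1, c2
```

The returned pair would replace the fixed `self.c1`/`self.c2` inside `update_velocities` at each iteration.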


6. Discrete and Binary PSO

6.1 Binary PSO

The discrete version proposed by Kennedy and Eberhart in 1997:

class BinaryPSO:
    def __init__(self, objective, dim, n_particles=30):
        self.objective = objective
        self.dim = dim
        self.n_particles = n_particles
    
    def sigmoid(self, v):
        """Sigmoid transfer function"""
        return 1 / (1 + np.exp(-np.clip(v, -500, 500)))
    
    def initialize(self):
        self.positions = np.random.randint(0, 2, (self.n_particles, self.dim))
        self.velocities = np.random.uniform(-6, 6, (self.n_particles, self.dim))
        self.fitness = np.array([self.objective(p) for p in self.positions])
        self.pbest = self.positions.copy()
        self.pbest_fit = self.fitness.copy()
        
        gbest_idx = np.argmin(self.fitness)
        self.gbest = self.positions[gbest_idx].copy()
        self.gbest_fit = self.fitness[gbest_idx]
    
    def update_velocities(self, c1=2, c2=2):
        r1 = np.random.random((self.n_particles, self.dim))
        r2 = np.random.random((self.n_particles, self.dim))
        
        self.velocities += (c1 * r1 * (self.pbest - self.positions) +
                           c2 * r2 * (self.gbest - self.positions))
        
        # Velocity clamping
        self.velocities = np.clip(self.velocities, -4, 4)
    
    def update_positions(self):
        """Position update (binarization via the sigmoid)"""
        probability = self.sigmoid(self.velocities)
        self.positions = (np.random.random((self.n_particles, self.dim)) < probability).astype(int)
    
    def step(self):
        self.update_velocities()
        self.update_positions()
        self.fitness = np.array([self.objective(p) for p in self.positions])
        
        # Update pbest and gbest
        improved = self.fitness < self.pbest_fit
        self.pbest[improved] = self.positions[improved]
        self.pbest_fit[improved] = self.fitness[improved]
        
        gbest_idx = np.argmin(self.fitness)
        if self.fitness[gbest_idx] < self.gbest_fit:
            self.gbest = self.positions[gbest_idx].copy()
            self.gbest_fit = self.fitness[gbest_idx]

6.2 Discrete PSO (Integer Encoding)

class DiscretePSO:
    """PSO for discrete (integer-coded) variables"""
    
    def __init__(self, objective, dim, n_categories, n_particles=30):
        self.objective = objective
        self.dim = dim
        self.n_categories = n_categories  # number of discrete values per dimension
        self.n_particles = n_particles
    
    def initialize(self):
        self.positions = np.random.randint(0, self.n_categories, 
                                          (self.n_particles, self.dim))
        self.velocities = np.random.random((self.n_particles, self.dim))
        # ... initialize the remaining attributes ...
    
    def update_positions(self):
        """Probability-based position update"""
        for i in range(self.n_particles):
            for d in range(self.dim):
                if self.velocities[i, d] > 0.5:
                    self.positions[i, d] = self.pbest[i, d]
                if self.velocities[i, d] > 0.75:
                    self.positions[i, d] = self.gbest[d]

6.3 PSO for Permutation Problems

class PermutationPSO:
    """PSO for permutation problems such as the traveling salesman problem"""
    
    def __init__(self, objective, n_cities, n_particles=30):
        self.objective = objective
        self.n_cities = n_cities
        self.n_particles = n_particles
    
    def initialize(self):
        """Initialize with random permutations"""
        self.positions = np.array([
            np.random.permutation(self.n_cities) 
            for _ in range(self.n_particles)
        ])
        self.pbest = self.positions.copy()
        self.gbest = self.positions[0].copy()
    
    def crossover_position(self, p1, p2):
        """Cross two permutations to produce a new permutation"""
        n = len(p1)
        # Keep a randomly chosen segment of p1
        start, end = sorted(random.sample(range(n), 2))
        
        # Use -1 as the "unfilled" marker: 0 is a valid city label
        child = np.full(n, -1, dtype=int)
        child[start:end] = p1[start:end]
        
        # Fill the remaining slots with p2's cities in their original order
        remaining = [city for city in p2 if city not in child[start:end]]
        idx = 0
        for i in range(n):
            if child[i] == -1:
                child[i] = remaining[idx]
                idx += 1
        
        return child

7. PSO Variants and Improvements

7.1 Cooperative PSO

class CooperativePSO(ParticleSwarmOptimizer):
    """
    Cooperative PSO: decomposes a high-dimensional problem
    into several lower-dimensional subproblems
    """
    def __init__(self, *args, sub_dim=5, **kwargs):
        super().__init__(*args, **kwargs)
        self.sub_dim = sub_dim
        self.n_sub_swarms = self.dim // sub_dim
    
    def evaluate_cooperative(self, full_position):
        """Evaluate a complete solution"""
        return self.objective(full_position)
    
    def step(self):
        """Update the sub-swarms in stages"""
        # Update each sub-swarm
        for sub_idx in range(self.n_sub_swarms):
            sub_start = sub_idx * self.sub_dim
            sub_end = sub_start + self.sub_dim
            
            # Update only the current block of dimensions
            # ...

7.2 Chaotic PSO

import numpy as np
 
class ChaoticPSO(ParticleSwarmOptimizer):
    """
    Uses chaotic sequences in place of pseudo-random numbers
    """
    def __init__(self, *args, chaos_type='logistic', **kwargs):
        super().__init__(*args, **kwargs)
        self.chaos_type = chaos_type
        self.chaos_param = 4.0  # the logistic map is fully chaotic at r = 4
    
    def logistic_map(self, x):
        """Logistic chaotic map"""
        return self.chaos_param * x * (1 - x)
    
    def generate_chaos(self, n):
        """Generate a chaotic sequence"""
        x = np.random.random()
        sequence = []
        for _ in range(n):
            x = self.logistic_map(x)
            sequence.append(x)
        return np.array(sequence)
    
    def step(self):
        """Update using chaotic sequences"""
        # Generate chaotic sequences
        chaos_r1 = self.generate_chaos(self.n_particles * self.dim)
        chaos_r2 = self.generate_chaos(self.n_particles * self.dim)
        
        r1 = chaos_r1.reshape(self.n_particles, self.dim)
        r2 = chaos_r2.reshape(self.n_particles, self.dim)
        
        # Velocity update
        self.velocities = (self.w * self.velocities +
            self.c1 * r1 * (self.pbest_positions - self.positions) +
            self.c2 * r2 * (self.gbest_position - self.positions)
        )
        
        # ...

7.3 Quantum PSO

class QuantumPSO(ParticleSwarmOptimizer):
    """
    Quantum-behaved particle swarm optimization,
    inspired by principles of quantum mechanics
    """
    def __init__(self, *args, beta=0.5, **kwargs):
        super().__init__(*args, **kwargs)
        self.beta = beta  # contraction-expansion coefficient
    
    def update_quantum_position(self, i):
        """Quantum position update"""
        phi1 = np.random.random(self.dim)
        phi2 = np.random.random(self.dim)
        
        # Local attractor between pbest and gbest
        p = (phi1 * self.pbest_positions[i] + phi2 * self.gbest_position) / (phi1 + phi2)
        
        u = np.random.random(self.dim)
        # The new position falls on either side of the attractor with equal probability
        sign = np.where(np.random.random(self.dim) < 0.5, 1.0, -1.0)
        
        new_position = p + sign * self.beta * np.abs(self.positions[i] - p) * np.log(1 / u)
        
        return new_position
    
    def step(self):
        """One QPSO iteration"""
        for i in range(self.n_particles):
            # Quantum position update
            self.positions[i] = self.update_quantum_position(i)
        
        # Evaluate and update pbest and gbest
        self.evaluate()

7.4 Multi-objective PSO

class MultiObjectivePSO:
    """
    Multi-objective particle swarm optimization
    """
    def __init__(self, objectives, dim, bounds, n_particles=100):
        self.objectives = objectives
        self.dim = dim
        self.bounds = bounds
        self.n_particles = n_particles
    
    def dominates(self, x1, x2):
        """Check whether x1 Pareto-dominates x2"""
        f1 = np.array([f(x1) for f in self.objectives])
        f2 = np.array([f(x2) for f in self.objectives])
        return np.all(f1 <= f2) and np.any(f1 < f2)
    
    def update_leader_archive(self):
        """Update the archive of non-dominated solutions"""
        self.archive = []
        for particle in self.particles:
            is_dominated = False
            to_remove = []
            
            for i, archived in enumerate(self.archive):
                if self.dominates(particle, archived):
                    to_remove.append(i)
                elif self.dominates(archived, particle):
                    is_dominated = True
            
            if not is_dominated:
                # Drop the archived entries the new particle dominates
                for i in reversed(to_remove):
                    self.archive.pop(i)
                self.archive.append(particle)
    
    def select_leader(self):
        """Select a leader from the archive (e.g., by crowding distance)"""
        # crowding-distance computation...
        return random.choice(self.archive)

8. Convergence Analysis

8.1 Stability Analysis

The convergence of PSO can be studied by analyzing the stability of the velocity update rule. Dropping the stochastic factors and merging the two attractors into a single point p, the update simplifies to:

    v(t+1) = w * v(t) + φ * (p - x(t))
    x(t+1) = x(t) + v(t+1)

Convergence condition (Clerc & Kennedy, 2002): apply the constriction factor

    χ = 2 / |2 - φ - sqrt(φ² - 4φ)|,  with φ = c1 + c2 > 4

where φ is the combined weight of the cognitive and social influences.

8.2 Parameter Constraints

To guarantee convergence of the deterministic model, the parameters must satisfy:

    |w| < 1  and  0 < φ < 2(1 + w)

or, alternatively, adopt the constriction factor χ above: with φ = 4.1 one obtains χ ≈ 0.7298, which yields the standard setting w = 0.729, c1 = c2 = 1.49445.
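The constriction setting can be checked numerically. A small sketch that computes χ for φ = 4.1 and verifies that the resulting deterministic dynamics are stable, i.e., the companion matrix of the recurrence x(t+1) = (1 + w - φ')x(t) - w·x(t-1) + φ'·p has spectral radius below 1:

```python
import numpy as np

def constriction_factor(phi):
    """Clerc & Kennedy's constriction factor; requires phi = c1 + c2 > 4."""
    assert phi > 4, "constriction requires phi = c1 + c2 > 4"
    return 2.0 / abs(2.0 - phi - np.sqrt(phi**2 - 4.0 * phi))

chi = constriction_factor(4.1)   # about 0.7298

# Effective parameters: w = chi, phi' = chi * 4.1
w, phi_eff = chi, chi * 4.1
companion = np.array([[1.0 + w - phi_eff, -w],
                      [1.0,               0.0]])
spectral_radius = max(abs(np.linalg.eigvals(companion)))
```

For these values the eigenvalues form a complex pair, so the spectral radius equals sqrt(w) ≈ 0.854 < 1: the deterministic particle trajectory spirals into the attractor.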

8.3 Estimating the Convergence Rate

class ConvergenceAnalyzer:
    def analyze(self, history):
        """Analyze the convergence behavior of a PSO run"""
        history = np.array(history)
        
        # Generation at which the run converged
        converged_gen = self.find_convergence_generation(history)
        
        # Convergence rate
        convergence_rate = self.calculate_convergence_rate(history)
        
        # Exploration-exploitation balance
        exploration_score = self.estimate_exploration(history)
        
        return {
            'converged_at': converged_gen,
            'convergence_rate': convergence_rate,
            'exploration_score': exploration_score
        }

9. Application Examples

9.1 Function Optimization

def optimize_function():
    """Optimize a benchmark function with PSO"""
    # Sphere function
    sphere = lambda x: np.sum(x**2)
    
    optimizer = ParticleSwarmOptimizer(
        objective=sphere,
        dim=10,
        bounds=[(-10, 10)] * 10,
        n_particles=30,
        max_iter=1000
    )
    
    best_pos, best_fit = optimizer.run()
    print(f"Best position: {best_pos}")
    print(f"Best fitness: {best_fit}")
    
    return best_pos, best_fit
 
# Rosenbrock function
def rosenbrock(x):
    return sum(100 * (x[i+1] - x[i]**2)**2 + (x[i] - 1)**2 for i in range(len(x)-1))
 
# Ackley function
def ackley(x):
    return -20 * np.exp(-0.2 * np.sqrt(np.mean(x**2))) - \
           np.exp(np.mean(np.cos(2*np.pi*x))) + 20 + np.e

9.2 Neural Network Training

class PSOTrainableNN:
    def __init__(self, nn_architecture):
        self.nn = nn_architecture
        self.dim = sum(w.size for w in self.nn.get_weights())
    
    def fitness_function(self, weights_flat):
        """Use the flattened weight vector as the PSO position"""
        self.nn.set_weights(weights_flat)
        predictions = self.nn.predict(self.X_train)
        loss = self.nn.compute_loss(predictions, self.y_train)
        return loss
    
    def train(self, X_train, y_train, n_particles=30, max_iter=500):
        self.X_train, self.y_train = X_train, y_train
        
        optimizer = ParticleSwarmOptimizer(
            objective=self.fitness_function,
            dim=self.dim,
            bounds=[(-1, 1)] * self.dim,
            n_particles=n_particles,
            max_iter=max_iter
        )
        
        best_weights, best_loss = optimizer.run()
        self.nn.set_weights(best_weights)
        
        return best_loss

9.3 Clustering Optimization

class PSOKMeans:
    def __init__(self, n_clusters, max_iter=100):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
    
    def fitness(self, centers, X):
        """Clustering quality: sum of within-cluster squared distances"""
        labels = self.assign_labels(centers, X)
        distance = 0
        for k in range(self.n_clusters):
            cluster_points = X[labels == k]
            if len(cluster_points) > 0:
                distance += np.sum((cluster_points - centers[k])**2)
        return distance
    
    def pso_optimize(self, X):
        dim = X.shape[1] * self.n_clusters
        
        optimizer = ParticleSwarmOptimizer(
            objective=lambda c: self.fitness(c.reshape(self.n_clusters, -1), X),
            dim=dim,
            bounds=[(X.min(), X.max())] * dim,
            n_particles=30,
            max_iter=self.max_iter
        )
        
        best_centers = optimizer.run()[0]
        return best_centers.reshape(self.n_clusters, -1)

10. A Deeper Theoretical Look at PSO

10.1 The Dynamical-Systems View

10.1.1 PSO as a Nonlinear Dynamical System

The PSO update rules can be modeled as a high-dimensional nonlinear discrete-time dynamical system:

    v(t+1) = w * v(t) + c1 r1 (pbest - x(t)) + c2 r2 (gbest - x(t))
    x(t+1) = x(t) + v(t+1)

Fixed-point analysis: at a fixed point, setting x(t+1) = x(t) = x* and v(t+1) = v(t) = v* gives

    v* = 0,  x* = (c1 r1 * pbest + c2 r2 * gbest) / (c1 r1 + c2 r2)

which requires the velocities to converge to zero; when pbest = gbest, the fixed point is x* = pbest = gbest.

10.1.2 Phase-Trajectory Analysis

class PhaseTrajectoryAnalyzer:
    """Analyze the phase trajectories of PSO particles"""
    
    def __init__(self, optimizer):
        self.optimizer = optimizer
        self.position_history = []
        self.velocity_history = []
    
    def track_trajectory(self, particle_idx, steps=100):
        """Track the phase trajectory of a single particle"""
        self.optimizer.reset()
        
        for _ in range(steps):
            self.position_history.append(
                self.optimizer.positions[particle_idx].copy()
            )
            self.velocity_history.append(
                self.optimizer.velocities[particle_idx].copy()
            )
            self.optimizer.step()
        
        return np.array(self.position_history), np.array(self.velocity_history)
    
    def plot_phase_diagram(self):
        """Plot the phase diagrams"""
        import matplotlib.pyplot as plt
        
        pos = np.array(self.position_history)
        vel = np.array(self.velocity_history)
        
        # Use the first two dimensions
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        
        # Position trajectory (x1 vs x2)
        axes[0, 0].plot(pos[:, 0], pos[:, 1])
        axes[0, 0].set_xlabel('x1')
        axes[0, 0].set_ylabel('x2')
        axes[0, 0].set_title('Position Trajectory')
        
        # Velocity trajectory (v1 vs v2)
        axes[0, 1].plot(vel[:, 0], vel[:, 1])
        axes[0, 1].set_xlabel('v1')
        axes[0, 1].set_ylabel('v2')
        axes[0, 1].set_title('Velocity Trajectory')
        
        # Decay of the velocity norm
        vel_norms = np.linalg.norm(vel, axis=1)
        axes[1, 0].plot(vel_norms)
        axes[1, 0].set_xlabel('Iteration')
        axes[1, 0].set_ylabel('||v||')
        axes[1, 0].set_title('Velocity Magnitude Decay')
        
        # Phase plot (velocity vs position)
        axes[1, 1].scatter(pos[:, 0], vel[:, 0], c=range(len(pos)))
        axes[1, 1].set_xlabel('Position x1')
        axes[1, 1].set_ylabel('Velocity v1')
        axes[1, 1].set_title('Phase Plot')
        
        plt.tight_layout()
        plt.show()

10.1.3 Basin-of-Attraction Analysis

A basin of attraction is the set of initial conditions that eventually converge to a given attractor:

class BasinOfAttractionAnalyzer:
    """
    Analyze the basins of attraction of PSO
    """
    def __init__(self, optimizer, objective):
        self.optimizer = optimizer
        self.objective = objective
    
    def sample_attraction_basin(self, n_samples=1000):
        """Sample the basins of attraction"""
        basins = {}
        
        for _ in range(n_samples):
            # Random initialization
            self.optimizer.reset()
            initial_pos = self.optimizer.positions[0].copy()
            
            # Run to convergence
            for _ in range(self.optimizer.max_iter):
                self.optimizer.step()
            
            # Record the converged position
            final_pos = tuple(self.optimizer.gbest_position.round(4))
            if final_pos not in basins:
                basins[final_pos] = []
            basins[final_pos].append(initial_pos)
        
        return basins
    
    def visualize_basins(self, basins):
        """Visualize the basins of attraction"""
        import matplotlib.pyplot as plt
        
        colors = plt.cm.rainbow(np.linspace(0, 1, len(basins)))
        
        for (final_pos, initial_points), color in zip(basins.items(), colors):
            initials = np.array(initial_points)
            plt.scatter(initials[:, 0], initials[:, 1], 
                       c=[color], alpha=0.5, s=10)
            plt.scatter(final_pos[0], final_pos[1], c=[color], 
                       marker='*', s=200, edgecolors='black')
        
        plt.xlabel('x1')
        plt.ylabel('x2')
        plt.title('Attraction Basins')
        plt.show()

10.2 Probabilistic Analysis

10.2.1 Convergence-Probability Analysis

class ConvergenceProbabilityAnalyzer:
    """Estimate the probability that PSO converges globally"""
    
    def __init__(self, objective, dim, bounds, n_trials=100):
        self.objective = objective
        self.dim = dim
        self.bounds = bounds
        self.n_trials = n_trials
    
    def estimate_convergence_probability(self, tolerance=1e-6):
        """Estimate the probability of finding the global optimum"""
        successes = 0
        
        for _ in range(self.n_trials):
            pso = ParticleSwarmOptimizer(
                self.objective, self.dim, self.bounds,
                max_iter=500
            )
            _, fitness = pso.run()
            
            if fitness < tolerance:
                successes += 1
        
        return successes / self.n_trials
    
    def confidence_interval(self, p, n, confidence=0.95):
        """Normal-approximation confidence interval for the success rate"""
        from scipy import stats
        
        z = stats.norm.ppf((1 + confidence) / 2)
        se = np.sqrt(p * (1 - p) / n)
        
        return (p - z * se, p + z * se)

10.2.2 Expected Convergence-Time Analysis

class ExpectedConvergenceTime:
    """Estimate the expected convergence time"""
    
    def analyze_convergence_time(self, optimizer, true_optimum):
        """Collect convergence-time statistics over repeated runs"""
        convergence_times = []
        
        for _ in range(30):
            optimizer.reset()
            converged = False
            
            for t in range(optimizer.max_iter):
                optimizer.step()
                
                distance = np.linalg.norm(
                    optimizer.gbest_position - true_optimum
                )
                
                if distance < 1e-4:
                    convergence_times.append(t)
                    converged = True
                    break
            
            if not converged:
                convergence_times.append(optimizer.max_iter)
        
        return {
            'mean': np.mean(convergence_times),
            'std': np.std(convergence_times),
            'median': np.median(convergence_times),
            'min': np.min(convergence_times),
            'max': np.max(convergence_times),
        }

10.3 Mathematical Analysis of Neighborhood Topologies

10.3.1 Topology and Information Flow

Different topologies determine how information flows between particles:

class TopologyAnalyzer:
    """Analyze the influence of different topologies"""
    
    TOPOLOGIES = {
        'gbest': 'global_best',
        'lbest_ring': 'local_best_ring',
        'lbest_star': 'local_best_star',
        'pyramid': 'pyramid',
        'voronoi': 'voronoi',
    }
    
    def analyze_information_flow(self, topology):
        """Analyze information flow within a topology"""
        n_particles = 30
        adjacency_matrix = self.build_adjacency(topology, n_particles)
        
        # Graph Laplacian: L = D - A
        degree = np.diag(adjacency_matrix.sum(axis=1))
        laplacian = degree - adjacency_matrix
        eigenvalues = np.linalg.eigvalsh(laplacian)
        
        # Algebraic connectivity (second-smallest Laplacian eigenvalue)
        algebraic_connectivity = np.sort(eigenvalues)[1]
        
        # Diameter
        diameter = self.calculate_diameter(adjacency_matrix)
        
        # Average path length
        avg_path_length = self.calculate_avg_path(adjacency_matrix)
        
        return {
            'algebraic_connectivity': algebraic_connectivity,
            'diameter': diameter,
            'avg_path_length': avg_path_length,
        }
    
    def build_adjacency(self, topology, n):
        """Build the adjacency matrix"""
        adj = np.zeros((n, n))
        
        if topology == 'lbest_ring':
            for i in range(n):
                adj[i, (i-1) % n] = 1
                adj[i, (i+1) % n] = 1
        elif topology == 'gbest':
            adj[:] = 1  # fully connected
            np.fill_diagonal(adj, 0)  # no self-loops
        # ... other topologies
        
        return adj

10.3.2 Small-World Topology

class SmallWorldPSO(ParticleSwarmOptimizer):
    """
    Small-world-topology PSO,
    combining the strengths of local and global search
    """
    def __init__(self, *args, rewire_prob=0.1, **kwargs):
        super().__init__(*args, **kwargs)
        self.rewire_prob = rewire_prob
        self.build_ring_topology()
    
    def build_ring_topology(self):
        """Build the underlying ring topology"""
        self.neighbors = {}
        for i in range(self.n_particles):
            self.neighbors[i] = [(i-1) % self.n_particles,
                                 (i+1) % self.n_particles]
    
    def rewire_edges(self):
        """Randomly rewire edges"""
        for i in range(self.n_particles):
            for j_idx, j in enumerate(self.neighbors[i]):
                if random.random() < self.rewire_prob:
                    # Pick a random new neighbor
                    new_neighbor = random.choice([
                        k for k in range(self.n_particles)
                        if k != i and k not in self.neighbors[i]
                    ])
                    self.neighbors[i][j_idx] = new_neighbor
    
    def get_best_neighbor(self, i):
        """Return the best pbest position among particle i's neighbors"""
        neighbor_indices = self.neighbors[i]
        neighbor_fitness = self.pbest_fitness[neighbor_indices]
        best_idx = np.argmin(neighbor_fitness)
        return self.pbest_positions[neighbor_indices[best_idx]]

11. Advanced Variants and Research Frontiers

11.1 Dissipative PSO (DPSO)

11.1.1 Theoretical Basis

Dissipative PSO counters premature convergence by introducing a flow of negative entropy into the swarm:

class DissipativePSO(ParticleSwarmOptimizer):
    """
    耗散粒子群优化
    通过随机扰动引入熵
    """
    def __init__(self, *args, entropy_threshold=1e-6, 
                 perturbation_strength=0.1, **kwargs):
        super().__init__(*args, **kwargs)
        self.entropy_threshold = entropy_threshold
        self.perturbation_strength = perturbation_strength
    
    def calculate_entropy(self):
        """计算粒子群熵"""
        # 使用速度分布的熵
        velocities = self.velocities.flatten()
        hist, _ = np.histogram(velocities, bins=50, density=True)
        hist = hist[hist > 0]  # 去除零
        return -np.sum(hist * np.log(hist + 1e-10))
    
    def inject_entropy(self):
        """注入熵以跳出局部最优"""
        entropy = self.calculate_entropy()
        
        if entropy < self.entropy_threshold:
            # 随机选择粒子添加扰动
            n_perturbed = max(1, self.n_particles // 10)
            perturbed_indices = random.sample(
                range(self.n_particles), n_perturbed
            )
            
            for idx in perturbed_indices:
                # 添加随机扰动
                perturbation = np.random.uniform(
                    -self.perturbation_strength,
                    self.perturbation_strength,
                    self.dim
                )
                self.positions[idx] += perturbation
                
                # 重新评估
                self.fitness[idx] = self.objective(self.positions[idx])
                
                # 更新pbest
                if self.fitness[idx] < self.pbest_fitness[idx]:
                    self.pbest_positions[idx] = self.positions[idx].copy()
                    self.pbest_fitness[idx] = self.fitness[idx]
    
    def step(self):
        self.update_velocities()
        self.update_positions()
        self.evaluate()
        self.inject_entropy()  # 关键差异
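calculate_entropy用速度直方图近似群体熵:速度分布越集中,熵越低,从而触发扰动。其核心逻辑可以单独演示(数据与函数名均为示意):

```python
import numpy as np

def velocity_entropy(velocities, bins=50):
    """用速度分布直方图近似计算熵(与DissipativePSO相同的思路)"""
    hist, _ = np.histogram(velocities.flatten(), bins=bins, density=True)
    hist = hist[hist > 0]  # 去除空桶
    return float(-np.sum(hist * np.log(hist)))

rng = np.random.default_rng(0)
spread = rng.uniform(-5, 5, size=(30, 10))           # 分散:探索阶段
collapsed = rng.uniform(-0.01, 0.01, size=(30, 10))  # 收敛:速度几乎归零

# 分散的速度分布熵明显高于收敛后的分布
print(velocity_entropy(spread) > velocity_entropy(collapsed))  # True
```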

11.1.2 耗散机制的效果分析

class EntropyAnalysis:
    """分析熵注入的效果"""
    
    def compare_with_standard(self, objective, dim=10, n_runs=20):
        """对比标准PSO和耗散PSO"""
        standard_results = []
        dissipative_results = []
        
        for _ in range(n_runs):
            # 标准PSO
            pso_std = ParticleSwarmOptimizer(
                objective, dim, [(-10, 10)] * dim
            )
            _, fitness_std = pso_std.run()
            standard_results.append(fitness_std)
            
            # 耗散PSO
            pso_diss = DissipativePSO(
                objective, dim, [(-10, 10)] * dim
            )
            _, fitness_diss = pso_diss.run()
            dissipative_results.append(fitness_diss)
        
        return {
            'standard': {
                'mean': np.mean(standard_results),
                'std': np.std(standard_results),
            },
            'dissipative': {
                'mean': np.mean(dissipative_results),
                'std': np.std(dissipative_results),
            },
            'improvement': np.mean(standard_results) - np.mean(dissipative_results)
        }

11.2 协同进化PSO

11.2.1 协同进化框架

class CoevolutionPSO:
    """
    协同进化PSO
    将复杂问题分解为多个子问题协同优化
    """
    def __init__(self, objective, dim, bounds, n_subpopulations=4):
        self.objective = objective
        self.dim = dim
        self.bounds = bounds
        self.n_subpopulations = n_subpopulations
        
        # 子种群规模
        self.sub_dim = dim // n_subpopulations
        self.n_particles_per_sub = 20
        
        # 创建子种群
        self.sub_swarms = []
        for i in range(n_subpopulations):
            sub_bounds = bounds[i*self.sub_dim:(i+1)*self.sub_dim]
            self.sub_swarms.append(
                ParticleSwarmOptimizer(
                    lambda x, si=i: self._evaluate_subsolution(x, si),
                    self.sub_dim, sub_bounds,
                    n_particles=self.n_particles_per_sub
                )
            )
    
    def _evaluate_subsolution(self, sub_pos, sub_idx):
        """评估子解(需要组合其他子解)"""
        # 从其他子种群获取当前最优
        combined = np.zeros(self.dim)
        
        for i, swarm in enumerate(self.sub_swarms):
            if i == sub_idx:
                combined[i*self.sub_dim:(i+1)*self.sub_dim] = sub_pos
            else:
                combined[i*self.sub_dim:(i+1)*self.sub_dim] = \
                    swarm.gbest_position
        
        return self.objective(combined)
    
    def run(self, max_iter=1000):
        """协同进化主循环"""
        for swarm in self.sub_swarms:
            swarm.initialize()
        
        best_fitness = float('inf')
        best_solution = None
        
        for iteration in range(max_iter):
            for swarm in self.sub_swarms:
                swarm.update_velocities()
                swarm.update_positions()
                swarm.evaluate()
            
            # 重组并评估全局适应度
            global_best = self._reconstruct_solution()
            fitness = self.objective(global_best)
            
            if fitness < best_fitness:
                best_fitness = fitness
                best_solution = global_best.copy()
        
        return best_solution, best_fitness
    
    def _reconstruct_solution(self):
        """重组全局解"""
        solution = np.zeros(self.dim)
        for i, swarm in enumerate(self.sub_swarms):
            solution[i*self.sub_dim:(i+1)*self.sub_dim] = \
                swarm.gbest_position
        return solution
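CoevolutionPSO的要点是"上下文向量":评估某个子种群的候选子解时,其余维度由其他子种群的当前最优补齐。下面以4维球函数、2个子种群为例做一个最小示意(上下文取值为假设):

```python
import numpy as np

def sphere(x):
    return float(np.sum(x**2))

dim, sub_dim = 4, 2
# 其他子种群当前的最优子解(假设值)
context = [np.array([1.0, 1.0]), np.array([2.0, 2.0])]

def evaluate_sub(sub_pos, sub_idx):
    """把候选子解嵌入上下文向量后再评估完整目标"""
    combined = np.zeros(dim)
    for i in range(2):
        part = sub_pos if i == sub_idx else context[i]
        combined[i*sub_dim:(i+1)*sub_dim] = part
    return sphere(combined)

# 子种群0的候选[0,0]与上下文[2,2]组合:0+0+4+4 = 8
print(evaluate_sub(np.zeros(2), 0))  # 8.0
```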

11.3 稀疏PSO

11.3.1 稀疏编码优化

class SparsePSO:
    """
    稀疏PSO
    专门用于稀疏优化问题
    """
    def __init__(self, objective, dim, sparsity_target=0.1):
        self.objective = objective
        self.dim = dim
        self.sparsity_target = sparsity_target
    
    def sparse_fitness(self, x, alpha=0.01):
        """
        稀疏惩罚适应度
        L1正则化
        """
        reconstruction_error = self.objective(x)
        sparsity_penalty = alpha * np.sum(np.abs(x))
        return reconstruction_error + sparsity_penalty
    
    def threshold_solution(self, x, threshold=1e-3):
        """阈值化使解更稀疏"""
        x_sparse = x.copy()
        x_sparse[np.abs(x_sparse) < threshold] = 0
        return x_sparse
    
    def optimize(self, n_particles=30, max_iter=500):
        """运行稀疏PSO"""
        bounds = [(-10, 10)] * self.dim
        
        optimizer = ParticleSwarmOptimizer(
            lambda x: self.sparse_fitness(x),
            self.dim, bounds, n_particles=n_particles
        )
        
        best_pos, _ = optimizer.run()
        best_sparse = self.threshold_solution(best_pos)
        
        # 评估原始目标函数
        original_fitness = self.objective(best_sparse)
        actual_sparsity = 1 - np.count_nonzero(best_sparse) / self.dim
        
        return {
            'solution': best_sparse,
            'sparsity': actual_sparsity,
            'fitness': original_fitness
        }
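L1惩罚引导解趋向稀疏,阈值化再把接近零的分量精确置零,二者配合的效果可以直接验证(alpha与阈值均为示意取值):

```python
import numpy as np

def l1_penalized(x, base_error, alpha=0.01):
    """重构误差 + L1正则项"""
    return base_error + alpha * np.sum(np.abs(x))

def threshold(x, eps=1e-3):
    """把绝对值小于阈值的分量置零,得到稀疏解"""
    x = x.copy()
    x[np.abs(x) < eps] = 0
    return x

x = np.array([0.5, 1e-5, -2.0, 1e-4, 0.0])
x_sparse = threshold(x)
sparsity = 1 - np.count_nonzero(x_sparse) / len(x_sparse)
print(x_sparse, sparsity)  # 只保留0.5和-2.0,稀疏度0.6
```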

11.4 批量适应度PSO

11.4.1 批量评估策略

class BatchFitnessPSO(ParticleSwarmOptimizer):
    """
    批量适应度评估PSO
    利用向量化加速适应度计算
    """
    def __init__(self, objective_vectorized, *args, **kwargs):
        super().__init__(lambda x: objective_vectorized(x[None, :])[0], 
                        *args, **kwargs)
        self.objective_vectorized = objective_vectorized
    
    def evaluate_batch(self):
        """批量评估所有粒子"""
        # 向量化评估
        fitness_values = self.objective_vectorized(self.positions)
        self.fitness = fitness_values
        
        # 更新pbest
        improved = self.fitness < self.pbest_fitness
        self.pbest_positions[improved] = self.positions[improved]
        self.pbest_fitness[improved] = self.fitness[improved]
        
        # 更新gbest
        gbest_idx = np.argmin(self.fitness)
        if self.fitness[gbest_idx] < self.gbest_fitness:
            self.gbest_position = self.positions[gbest_idx].copy()
            self.gbest_fitness = self.fitness[gbest_idx]
 
 
class VectorizedFitnessExample:
    """向量化的适应度函数示例"""
    
    @staticmethod
    def rastrigin_batch(X):
        """
        批量Rastrigin函数
        X shape: (n_particles, dim)
        """
        A = 10
        n = X.shape[1]
        return A * n + np.sum(X**2 - A * np.cos(2 * np.pi * X), axis=1)
    
    @staticmethod
    def rosenbrock_batch(X):
        """
        批量Rosenbrock函数
        """
        x = X[:, :-1]
        y = X[:, 1:]
        return np.sum(100 * (y - x**2)**2 + (1 - x)**2, axis=1)
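向量化实现应与逐粒子实现逐一相符,可以用随机数据做一次等价性检查(示意):

```python
import numpy as np

def rastrigin(x):
    """单个解的Rastrigin函数"""
    A = 10
    return A * len(x) + np.sum(x**2 - A * np.cos(2 * np.pi * x))

def rastrigin_batch(X):
    """批量版本,X形状为(n_particles, dim)"""
    A = 10
    n = X.shape[1]
    return A * n + np.sum(X**2 - A * np.cos(2 * np.pi * X), axis=1)

rng = np.random.default_rng(42)
X = rng.uniform(-5.12, 5.12, size=(8, 3))

batch = rastrigin_batch(X)
loop = np.array([rastrigin(x) for x in X])
print(np.allclose(batch, loop))  # True:两种实现逐粒子一致
```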

十二、性能基准与测试套件

12.1 标准测试函数集

12.1.1 单峰函数

class BenchmarkFunctions:
    """标准基准测试函数"""
    
    @staticmethod
    def sphere(x):
        """Sphere函数 - 最简单的单峰函数"""
        return np.sum(x**2)
    
    @staticmethod
    def rosenbrock(x):
        """Rosenbrock函数 - 经典优化问题"""
        return sum(100*(x[i+1] - x[i]**2)**2 + (1-x[i])**2 
                   for i in range(len(x)-1))
    
    @staticmethod
    def schwefel(x):
        """Schwefel函数"""
        n = len(x)
        return 418.9829 * n - np.sum(x * np.sin(np.sqrt(np.abs(x))))
    
    @staticmethod
    def step(x):
        """Step函数"""
        return np.sum(np.floor(x + 0.5)**2)
    
    @staticmethod
    def quartic(x):
        """四次函数(含噪声)"""
        n = len(x)
        return sum((i + 1) * x[i]**4 for i in range(n)) + random.random()

12.1.2 多峰函数

    @staticmethod
    def rastrigin(x):
        """Rastrigin函数 - 多峰,有大量局部最优"""
        A = 10
        n = len(x)
        return A * n + np.sum(x**2 - A * np.cos(2 * np.pi * x))
    
    @staticmethod
    def griewank(x):
        """Griewank函数"""
        part1 = np.sum(x**2) / 4000
        part2 = np.prod(np.cos(x / np.sqrt(np.arange(1, len(x)+1))))
        return part1 - part2 + 1
    
    @staticmethod
    def ackley(x):
        """Ackley函数"""
        a = 20
        b = 0.2
        c = 2 * np.pi
        
        sum1 = np.sum(x**2)
        sum2 = np.sum(np.cos(c * x))
        
        term1 = -a * np.exp(-b * np.sqrt(sum1 / len(x)))
        term2 = -np.exp(sum2 / len(x))
        
        return term1 + term2 + a + np.e
    
    @staticmethod
    def perm(x, beta=0.5):
        """Perm函数"""
        n = len(x)
        result = 0
        for k in range(1, n + 1):
            temp = 0
            for j in range(1, n + 1):
                temp += (j + beta) * (x[j-1]**k - 1/j**k)
            result += temp**2
        return result

12.1.3 组合函数

    @staticmethod
    def zakharov(x):
        """Zakharov函数"""
        sum1 = np.sum(x**2)
        sum2 = sum(0.5 * (i + 1) * x[i] for i in range(len(x)))
        return sum1 + sum2**2 + sum2**4
    
    @staticmethod
    def dixon_price(x):
        """Dixon-Price函数"""
        n = len(x)
        term1 = (x[0] - 1)**2
        term2 = sum((i + 1) * (2*x[i]**2 - x[i-1])**2 for i in range(1, n))
        return term1 + term2
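这些基准函数在全局最优点处的取值是已知的,可以用来验证实现是否正确(以Sphere、Rastrigin、Griewank、Ackley在原点处为例,最优值均为0):

```python
import numpy as np

x0 = np.zeros(10)

sphere = np.sum(x0**2)
rastrigin = 10 * len(x0) + np.sum(x0**2 - 10 * np.cos(2 * np.pi * x0))
griewank = np.sum(x0**2) / 4000 - np.prod(
    np.cos(x0 / np.sqrt(np.arange(1, len(x0) + 1)))) + 1
ackley = (-20 * np.exp(-0.2 * np.sqrt(np.sum(x0**2) / len(x0)))
          - np.exp(np.sum(np.cos(2 * np.pi * x0)) / len(x0)) + 20 + np.e)

# 原点是这四个函数的全局最优点
print(sphere, rastrigin, griewank, ackley)
```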

12.2 基准测试框架

class BenchmarkSuite:
    """PSO基准测试套件"""
    
    FUNCTIONS = {
        'sphere': BenchmarkFunctions.sphere,
        'rosenbrock': BenchmarkFunctions.rosenbrock,
        'rastrigin': BenchmarkFunctions.rastrigin,
        'griewank': BenchmarkFunctions.griewank,
        'ackley': BenchmarkFunctions.ackley,
        'schwefel': BenchmarkFunctions.schwefel,
    }
    
    def __init__(self, dimensions=[10, 30, 50]):
        self.dimensions = dimensions
    
    def run_benchmark(self, optimizer_class, n_runs=30):
        """运行完整基准测试"""
        results = {}
        
        for func_name, func in self.FUNCTIONS.items():
            results[func_name] = {}
            
            for dim in self.dimensions:
                runs = []
                
                for _ in range(n_runs):
                    optimizer = optimizer_class(
                        func, dim, [(-100, 100)] * dim,
                        max_iter=1000
                    )
                    _, fitness = optimizer.run()
                    runs.append(fitness)
                
                results[func_name][dim] = {
                    'mean': np.mean(runs),
                    'std': np.std(runs),
                    'min': np.min(runs),
                    'max': np.max(runs),
                    'median': np.median(runs),
                }
        
        return results
    
    def generate_report(self, results, output_path='benchmark_report.md'):
        """生成基准测试报告"""
        report = "# PSO Benchmark Report\n\n"
        
        for func_name in self.FUNCTIONS:
            report += f"## {func_name}\n\n"
            report += "| Dim | Mean | Std | Min | Max |\n"
            report += "|-----|------|-----|-----|-----|\n"
            
            for dim, stats in results[func_name].items():
                report += f"| {dim} | {stats['mean']:.6e} | "
                report += f"{stats['std']:.6e} | {stats['min']:.6e} | "
                report += f"{stats['max']:.6e} |\n"
            
            report += "\n"
        
        with open(output_path, 'w') as f:
            f.write(report)
        
        return report

12.3 统计分析工具

class StatisticalAnalysis:
    """统计显著性分析"""
    
    @staticmethod
    def wilcoxon_signed_rank_test(algorithm1_results, algorithm2_results):
        """
        Wilcoxon符号秩检验
        配对样本非参数检验
        """
        from scipy import stats
        
        # 差值
        d = np.array(algorithm1_results) - np.array(algorithm2_results)
        
        # 排除零差
        d_nonzero = d[d != 0]
        
        if len(d_nonzero) == 0:
            return {'p_value': 1.0, 'significant': False}
        
        # 计算统计量
        stat, p_value = stats.wilcoxon(algorithm1_results, algorithm2_results)
        
        return {
            'statistic': stat,
            'p_value': p_value,
            'significant': p_value < 0.05,
            'effect_size': np.mean(d) / np.std(d),
        }
    
    @staticmethod
    def friedman_test(all_algorithms_results):
        """
        Friedman检验
        多算法比较
        """
        from scipy import stats
        
        n_problems = len(all_algorithms_results[0])
        k_algorithms = len(all_algorithms_results)
        
        # 计算每个问题的排名
        rankings = []
        for problem_idx in range(n_problems):
            fitnesses = [results[problem_idx] for results in all_algorithms_results]
            ranks = stats.rankdata(fitnesses)
            rankings.append(ranks)
        
        rankings = np.array(rankings)
        
        # Friedman统计量
        mean_ranks = np.mean(rankings, axis=0)
        
        chi2 = (12 * n_problems / (k_algorithms * (k_algorithms + 1))) * \
               np.sum(mean_ranks**2) - 3 * n_problems * (k_algorithms + 1)
        
        # 自由度为k-1
        p_value = 1 - stats.chi2.cdf(chi2, k_algorithms - 1)
        
        return {
            'chi2': chi2,
            'p_value': p_value,
            'mean_ranks': mean_ranks,
        }
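Friedman统计量的计算可以用一个小例子手工核对(3个算法、4个问题,数据为示意;排名用双重argsort实现以避免依赖scipy,假设无并列):

```python
import numpy as np

# 行=问题,列=算法,值=适应度(越小越好;示意数据,无并列)
results = np.array([
    [1.0, 2.0, 3.0],
    [1.5, 2.5, 0.5],
    [0.2, 0.4, 0.6],
    [3.0, 1.0, 2.0],
])
N, k = results.shape

# 每个问题内排名(1为最优;无并列时双重argsort即为排名)
ranks = results.argsort(axis=1).argsort(axis=1) + 1
mean_ranks = ranks.mean(axis=0)

# Friedman统计量:chi2 = 12N/(k(k+1)) * sum(R̄_j²) - 3N(k+1)
chi2 = 12 * N / (k * (k + 1)) * np.sum(mean_ranks**2) - 3 * N * (k + 1)
print(mean_ranks, chi2)  # 平均排名[1.75, 2.0, 2.25],chi2=0.5
```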

十三、工程实践指南

13.1 参数调优策略

13.1.1 网格搜索

class ParameterGridSearch:
    """PSO参数网格搜索"""
    
    PARAM_GRID = {
        'w': [0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
        'c1': [1.0, 1.5, 2.0, 2.5],
        'c2': [1.0, 1.5, 2.0, 2.5],
        'n_particles': [20, 30, 50, 100],
    }
    
    def __init__(self, objective, dim, bounds):
        self.objective = objective
        self.dim = dim
        self.bounds = bounds
    
    def grid_search(self, n_eval_per_config=10):
        """网格搜索最优参数"""
        from itertools import product
        
        best_fitness = float('inf')
        best_params = None
        all_results = []
        
        param_combinations = product(
            self.PARAM_GRID['w'],
            self.PARAM_GRID['c1'],
            self.PARAM_GRID['c2'],
            self.PARAM_GRID['n_particles']
        )
        
        for w, c1, c2, n_particles in param_combinations:
            fitnesses = []
            
            for _ in range(n_eval_per_config):
                pso = ParticleSwarmOptimizer(
                    self.objective, self.dim, self.bounds,
                    w=w, c1=c1, c2=c2,
                    n_particles=n_particles
                )
                _, fitness = pso.run()
                fitnesses.append(fitness)
            
            mean_fitness = np.mean(fitnesses)
            all_results.append({
                'w': w, 'c1': c1, 'c2': c2, 
                'n_particles': n_particles,
                'mean_fitness': mean_fitness
            })
            
            if mean_fitness < best_fitness:
                best_fitness = mean_fitness
                best_params = {'w': w, 'c1': c1, 'c2': c2, 
                              'n_particles': n_particles}
        
        return best_params, all_results
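网格搜索的代价随参数维度按乘积增长,可以先估算总评估次数再决定是否可行(网格取值与上面的PARAM_GRID一致):

```python
from itertools import product

param_grid = {
    'w': [0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
    'c1': [1.0, 1.5, 2.0, 2.5],
    'c2': [1.0, 1.5, 2.0, 2.5],
    'n_particles': [20, 30, 50, 100],
}

n_configs = len(list(product(*param_grid.values())))
n_eval_per_config = 10
total_runs = n_configs * n_eval_per_config
print(n_configs, total_runs)  # 384种组合,共3840次完整PSO运行
```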

13.1.2 自适应参数调整框架

class AdaptiveParameterController:
    """
    自适应参数控制器
    根据搜索进度动态调整参数
    """
    
    def __init__(self, initial_params):
        self.params = initial_params.copy()
        self.history = {
            'fitness': [],
            'diversity': [],
            'params': []
        }
    
    def calculate_diversity(self, positions):
        """计算种群多样性"""
        centroid = np.mean(positions, axis=0)
        distances = np.linalg.norm(positions - centroid, axis=1)
        return np.mean(distances)
    
    def adjust_parameters(self, generation, fitness, positions):
        """根据当前状态调整参数"""
        # 记录历史
        self.history['fitness'].append(fitness)
        self.history['diversity'].append(
            self.calculate_diversity(positions)
        )
        
        
        # 基于多样性的调整
        current_diversity = self.history['diversity'][-1]
        initial_diversity = self.history['diversity'][0]
        diversity_ratio = current_diversity / (initial_diversity + 1e-10)
        
        # 基于收敛的调整
        if len(self.history['fitness']) > 10:
            recent_improvement = self.history['fitness'][-10] - \
                               self.history['fitness'][-1]
        else:
            recent_improvement = 0
        
        # 自适应规则
        if diversity_ratio < 0.1:
            # 多样性过低,增加探索
            self.params['w'] = min(0.95, self.params['w'] * 1.1)
            self.params['c1'] = min(2.5, self.params['c1'] * 1.05)
            self.params['c2'] = max(0.5, self.params['c2'] * 0.95)
        elif recent_improvement < 1e-8:
            # 停滞,增加扰动
            self.params['w'] = min(0.9, self.params['w'] + 0.05)
        else:
            # 正常搜索
            self.params['w'] *= 0.999
        
        # 限制范围
        self.params['w'] = np.clip(self.params['w'], 0.1, 0.95)
        self.params['c1'] = np.clip(self.params['c1'], 0.5, 2.5)
        self.params['c2'] = np.clip(self.params['c2'], 0.5, 2.5)
        
        self.history['params'].append(self.params.copy())
        
        return self.params
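calculate_diversity以到质心的平均欧氏距离衡量种群分散程度,核心两行可以单独验证(数据为示意):

```python
import numpy as np

def diversity(positions):
    """种群到质心的平均欧氏距离"""
    centroid = np.mean(positions, axis=0)
    return float(np.mean(np.linalg.norm(positions - centroid, axis=1)))

# 四个粒子对称分布在单位圆上:质心为原点,平均距离为1
positions = np.array([[1.0, 0.0], [-1.0, 0.0], [0.0, 1.0], [0.0, -1.0]])
print(diversity(positions))  # 1.0

# 全部粒子重合时多样性为0(早熟收敛的典型特征)
print(diversity(np.ones((5, 2))))  # 0.0
```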

13.2 早熟收敛检测

13.2.1 多样性度量

class PrematureConvergenceDetector:
    """早熟收敛检测器"""
    
    def __init__(self, threshold=1e-6, window_size=20):
        self.threshold = threshold
        self.window_size = window_size
        self.fitness_history = []
    
    def check_diversity(self, positions):
        """检查种群多样性"""
        # 方法1:位置标准差
        position_std = np.std(positions, axis=0)
        mean_std = np.mean(position_std)
        
        # 方法2:到质心的平均距离
        centroid = np.mean(positions, axis=0)
        distances = np.linalg.norm(positions - centroid, axis=1)
        mean_distance = np.mean(distances)
        
        # 方法3:唯一位置比例
        unique_ratio = len(np.unique(positions.round(4), axis=0)) / len(positions)
        
        return {
            'position_std': mean_std,
            'mean_distance': mean_distance,
            'unique_ratio': unique_ratio,
            'is_converged': mean_std < self.threshold
        }
    
    def check_stagnation(self, current_fitness):
        """检查适应度停滞"""
        self.fitness_history.append(current_fitness)
        
        if len(self.fitness_history) < self.window_size:
            return {'stagnant': False}
        
        recent = self.fitness_history[-self.window_size:]
        
        # 检查是否所有值都相同
        if len(set(recent)) == 1:
            return {'stagnant': True, 'reason': 'identical_values'}
        
        # 检查变化是否足够小
        max_change = max(recent) - min(recent)
        if max_change < self.threshold:
            return {'stagnant': True, 'reason': 'minimal_change'}
        
        return {'stagnant': False}
    
    def detect_and_respond(self, positions, fitness):
        """检测并响应早熟"""
        diversity_info = self.check_diversity(positions)
        stagnation_info = self.check_stagnation(fitness)
        
        if diversity_info['is_converged'] or stagnation_info['stagnant']:
            return {
                'premature': True,
                'diversity': diversity_info,
                'stagnation': stagnation_info,
                'action': self.suggest_action()
            }
        
        return {'premature': False}
    
    def suggest_action(self):
        """建议的应对动作"""
        actions = []
        
        # 增加随机扰动
        actions.append('inject_random_particles')
        
        # 重新初始化部分粒子
        actions.append('reinitialize_subpopulation')
        
        # 增大惯性权重
        actions.append('increase_inertia')
        
        # 切换拓扑结构
        actions.append('change_topology')
        
        return actions
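check_stagnation的滑动窗口逻辑也可以脱离优化器单独测试(窗口大小取5,仅为示意):

```python
def is_stagnant(history, window=5, tol=1e-6):
    """最近window代的适应度变化小于tol即判定停滞"""
    if len(history) < window:
        return False
    recent = history[-window:]
    return max(recent) - min(recent) < tol

improving = [10.0, 8.0, 5.0, 3.0, 1.0, 0.5]
flat = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0]

print(is_stagnant(improving))  # False:仍在持续改进
print(is_stagnant(flat))       # True:窗口内几乎没有变化
```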

13.3 混合优化策略

13.3.1 PSO + 局部搜索

class HybridPSOLocalSearch(ParticleSwarmOptimizer):
    """
    PSO与局部搜索混合
    PSO负责全局探索,局部搜索负责精细调优
    """
    def __init__(self, *args, local_search_freq=50,
                 local_search_iters=20, **kwargs):
        super().__init__(*args, **kwargs)
        self.local_search_freq = local_search_freq
        self.local_search_iters = local_search_iters
    
    def local_search(self, position):
        """使用L-BFGS-B进行局部搜索"""
        from scipy.optimize import minimize
        
        result = minimize(
            self.objective,
            position,
            method='L-BFGS-B',
            options={'maxiter': self.local_search_iters}
        )
        
        return result.x, result.fun
    
    def step(self, iteration):
        # 标准的PSO更新
        self.update_velocities()
        self.update_positions()
        self.evaluate()
        
        # 周期性局部搜索
        if iteration % self.local_search_freq == 0:
            self.apply_local_search()
    
    def apply_local_search(self):
        """对全局最优应用局部搜索"""
        new_pos, new_fitness = self.local_search(self.gbest_position)
        
        if new_fitness < self.gbest_fitness:
            self.gbest_position = new_pos
            self.gbest_fitness = new_fitness
            
            # 更新一个粒子的位置
            worst_idx = np.argmax(self.fitness)
            self.positions[worst_idx] = new_pos
            self.fitness[worst_idx] = new_fitness
            self.pbest_positions[worst_idx] = new_pos
            self.pbest_fitness[worst_idx] = new_fitness

13.3.2 PSO + 差分进化

class HybridPSODE(ParticleSwarmOptimizer):
    """
    PSO与差分进化混合
    利用DE的变异策略增强PSO的探索能力
    """
    def __init__(self, *args, de_probability=0.3, **kwargs):
        super().__init__(*args, **kwargs)
        self.de_probability = de_probability
    
    def de_mutation(self, i):
        """差分进化变异"""
        NP = self.n_particles
        
        # 选择三个不同于i的个体
        candidates = [j for j in range(NP) if j != i]
        r1, r2, r3 = random.sample(candidates, 3)
        
        F = 0.5  # 缩放因子
        mutant = self.positions[r1] + F * (
            self.positions[r2] - self.positions[r3]
        )
        
        # 边界处理
        mutant = np.clip(mutant,
                        [b[0] for b in self.bounds],
                        [b[1] for b in self.bounds])
        
        return mutant
    
    def hybrid_step(self):
        """混合更新策略:先整体做一次PSO更新,再按概率对个体做DE变异"""
        # 标准PSO更新(对整个种群只执行一次,
        # 不能放在逐粒子循环内,否则每代会更新n_particles次)
        self.update_velocities()
        self.update_positions()
        self.evaluate()
        
        for i in range(self.n_particles):
            if random.random() < self.de_probability:
                # 使用DE变异生成候选解
                mutant = self.de_mutation(i)
                fitness_mutant = self.objective(mutant)
                
                # 贪心选择:变异体更优才替换
                if fitness_mutant < self.fitness[i]:
                    self.positions[i] = mutant
                    self.fitness[i] = fitness_mutant
                    
                    if fitness_mutant < self.pbest_fitness[i]:
                        self.pbest_positions[i] = mutant.copy()
                        self.pbest_fitness[i] = fitness_mutant
                        
                        if fitness_mutant < self.gbest_fitness:
                            self.gbest_position = mutant.copy()
                            self.gbest_fitness = fitness_mutant
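DE/rand/1变异算子本身很简单,可以单独演示(F取0.5,边界裁剪到[-10, 10],均为示意取值):

```python
import numpy as np

def de_rand_1(positions, i, F=0.5, lo=-10.0, hi=10.0, rng=None):
    """DE/rand/1变异:v = x_r1 + F*(x_r2 - x_r3),r1,r2,r3互异且不等于i"""
    if rng is None:
        rng = np.random.default_rng()
    candidates = [j for j in range(len(positions)) if j != i]
    r1, r2, r3 = rng.choice(candidates, size=3, replace=False)
    mutant = positions[r1] + F * (positions[r2] - positions[r3])
    return np.clip(mutant, lo, hi)  # 边界裁剪

rng = np.random.default_rng(7)
positions = rng.uniform(-10, 10, size=(10, 4))
mutant = de_rand_1(positions, 0, rng=rng)
print(mutant.shape)  # (4,)
```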

十四、深度学习中的应用

14.1 超参数优化

class PSOHyperparameterOptimizer:
    """
    使用PSO优化神经网络超参数
    """
    
    def __init__(self, train_data, val_data, search_space):
        self.train_data = train_data
        self.val_data = val_data
        self.search_space = search_space
    
    def decode_particle(self, particle):
        """将粒子编码转换为超参数"""
        params = {}
        
        idx = 0
        for param_name, (low, high) in self.search_space.items():
            params[param_name] = low + particle[idx] * (high - low)
            idx += 1
        
        return params
    
    def evaluate_config(self, params):
        """评估超参数配置"""
        # 构建模型
        model = self.build_model(params)
        
        # 训练
        history = model.fit(
            self.train_data[0], self.train_data[1],
            validation_data=self.val_data,
            epochs=int(params.get('epochs', 10)),
            verbose=0
        )
        
        # 返回验证集损失
        return min(history.history['val_loss'])
    
    def build_model(self, params):
        """根据参数构建模型"""
        import tensorflow as tf
        from tensorflow import keras
        from tensorflow.keras import layers
        
        model = keras.Sequential([
            layers.Dense(int(params['hidden_units']),
                        activation=params.get('activation', 'relu'),
                        input_shape=self.train_data[0].shape[1:]),
            layers.Dropout(params.get('dropout', 0.5)),
            layers.Dense(int(params['hidden_units2']),
                        activation=params.get('activation', 'relu')),
            layers.Dropout(params.get('dropout', 0.5)),
            layers.Dense(10, activation='softmax')
        ])
        
        model.compile(
            optimizer=keras.optimizers.Adam(
                learning_rate=params.get('lr', 0.001)
            ),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def optimize(self, n_particles=30, max_iter=50):
        """运行超参数优化"""
        dim = len(self.search_space)
        bounds = [(0, 1)] * dim
        
        optimizer = ParticleSwarmOptimizer(
            objective=lambda x: self.evaluate_config(
                self.decode_particle(x)
            ),
            dim=dim,
            bounds=bounds,
            n_particles=n_particles
        )
        
        best_pos, best_fitness = optimizer.run()
        
        return {
            'best_params': self.decode_particle(best_pos),
            'best_val_loss': best_fitness
        }
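decode_particle的线性映射把[0, 1]上的粒子坐标变换到各超参数的实际取值范围,示意如下(搜索空间与参数名均为假设):

```python
# 假设的超参数搜索空间:参数名 -> (下界, 上界)
search_space = {
    'lr': (1e-4, 1e-1),
    'hidden_units': (32, 512),
    'dropout': (0.0, 0.5),
}

def decode(particle, space):
    """线性映射:x ∈ [0,1] -> low + x*(high - low)"""
    return {name: low + x * (high - low)
            for x, (name, (low, high)) in zip(particle, space.items())}

params = decode([0.0, 0.5, 1.0], search_space)
print(params)  # {'lr': 0.0001, 'hidden_units': 272.0, 'dropout': 0.5}
```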

14.2 神经网络结构搜索

class PSONeuralArchitectureSearch:
    """
    基于PSO的神经网络架构搜索
    """
    
    def __init__(self, input_dim, output_dim):
        self.input_dim = input_dim
        self.output_dim = output_dim
    
    def encode_architecture(self, particle):
        """编码粒子为网络架构"""
        n_layers = int(3 + particle[0] * 3)  # 3-6层
        architecture = []
        
        idx = 1
        for _ in range(n_layers):
            units = int(32 + particle[idx] * 256)
            activation = ['relu', 'tanh', 'sigmoid'][min(2, int(particle[idx+1] * 3))]
            use_bn = particle[idx+2] > 0.5
            use_dropout = particle[idx+3] > 0.5
            dropout_rate = particle[idx+4]
            
            layer_config = {
                'units': units,
                'activation': activation,
                'batch_norm': use_bn,
                'dropout': use_dropout,
                'dropout_rate': dropout_rate
            }
            architecture.append(layer_config)
            idx += 5
        
        return architecture
    
    def build_network(self, architecture):
        """根据架构构建网络"""
        import tensorflow as tf
        from tensorflow.keras import layers, Model
        
        inputs = tf.keras.Input(shape=(self.input_dim,))
        x = inputs
        
        for layer_config in architecture:
            x = layers.Dense(
                layer_config['units'],
                activation=layer_config['activation']
            )(x)
            
            if layer_config['batch_norm']:
                x = layers.BatchNormalization()(x)
            
            if layer_config['dropout']:
                x = layers.Dropout(layer_config['dropout_rate'])(x)
        
        outputs = layers.Dense(self.output_dim, activation='softmax')(x)
        
        return Model(inputs=inputs, outputs=outputs)
    
    def fitness_function(self, particle):
        """评估架构的适应度"""
        architecture = self.encode_architecture(particle)
        model = self.build_network(architecture)
        
        # 简化的评估(实际应用中需要完整的训练和验证)
        model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        # 训练少量epoch
        # 这里使用随机数据作为示例
        x_train = np.random.randn(1000, self.input_dim)
        y_train = np.random.randint(0, self.output_dim, 1000)
        
        model.fit(x_train, y_train, epochs=3, verbose=0)
        
        # 返回负的准确率(因为我们最小化)
        _, accuracy = model.evaluate(x_train, y_train, verbose=0)
        
        return -accuracy  # 最小化问题

十五、约束优化问题

15.1 约束处理技术

15.1.1 罚函数法

class ConstrainedPSO(ParticleSwarmOptimizer):
    """
    带约束的PSO
    使用罚函数法处理约束
    """
    
    def __init__(self, objective, constraints, *args,
                 penalty_coef=1000, **kwargs):
        super().__init__(objective, *args, **kwargs)
        self.constraints = constraints
        self.penalty_coef = penalty_coef
    
    def evaluate_constraints(self, x):
        """评估约束违反程度"""
        violations = {
            'inequality': [],
            'equality': []
        }
        
        for constraint in self.constraints.get('ineq', []):
            value = constraint(x)
            violations['inequality'].append(max(0, value))
        
        for constraint in self.constraints.get('eq', []):
            value = abs(constraint(x))
            violations['equality'].append(value)
        
        return violations
    
    def constrained_objective(self, x):
        """带惩罚的目标函数"""
        obj = self.objective(x)
        violations = self.evaluate_constraints(x)
        
        penalty = self.penalty_coef * (
            sum(violations['inequality']) +
            sum(violations['equality'])
        )
        
        return obj + penalty
    
    def is_feasible(self, x):
        """检查解是否可行"""
        violations = self.evaluate_constraints(x)
        total_violation = (
            sum(violations['inequality']) +
            sum(violations['equality'])
        )
        return total_violation < 1e-6
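罚函数法把约束违反量折算进目标值,下面用带单个不等式约束的小例子示意(约束x0 + x1 ≥ 1,即g(x) = 1 - x0 - x1 ≤ 0;罚系数为假设取值):

```python
import numpy as np

def objective(x):
    return float(np.sum(x**2))

def penalized(x, penalty_coef=1000):
    """违反 g(x) = 1 - x0 - x1 <= 0 时按违反量加罚"""
    g = 1 - x[0] - x[1]
    return objective(x) + penalty_coef * max(0.0, g)

feasible = np.array([0.5, 0.5])    # g = 0,可行
infeasible = np.array([0.1, 0.1])  # g = 0.8,违反约束

print(penalized(feasible))    # 0.5:无惩罚
print(penalized(infeasible))  # 约800.02:被大幅惩罚
```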

15.1.2 可行性规则

class FeasibilityRulePSO(ParticleSwarmOptimizer):
    """
    使用可行性规则处理约束
    优先选择可行解
    """
    
    def __init__(self, objective, constraints, *args, **kwargs):
        super().__init__(objective, *args, **kwargs)
        self.constraints = constraints
    
    def dominates(self, x1, x2):
        """判断x1是否支配x2(约束优化版)"""
        v1 = self.constraint_violation(x1)
        v2 = self.constraint_violation(x2)
        f1 = self.objective(x1)
        f2 = self.objective(x2)
        
        # x1可行且x2不可行
        if v1 == 0 and v2 > 0:
            return True
        
        # 都不可行,违反度小的支配违反度大的
        if v1 > 0 and v2 > 0:
            return v1 < v2
        
        # 都可行,目标值小的支配目标值大的
        if v1 == 0 and v2 == 0:
            return f1 < f2
        
        return False
    
    def constraint_violation(self, x):
        """计算总约束违反度"""
        violations = []
        
        for constraint in self.constraints.get('ineq', []):
            violations.append(max(0, constraint(x)))
        
        for constraint in self.constraints.get('eq', []):
            violations.append(abs(constraint(x)))
        
        return sum(violations)
    
    def selection_with_constraints(self):
        """基于可行性规则的选择"""
        # 更新pbest
        for i in range(self.n_particles):
            if self.dominates(self.positions[i], self.pbest_positions[i]):
                self.pbest_positions[i] = self.positions[i].copy()
                self.pbest_fitness[i] = self.fitness[i]
        
        # 更新gbest:优先在可行的pbest中选择
        feasible_indices = [
            i for i in range(self.n_particles)
            if self.constraint_violation(self.pbest_positions[i]) == 0
        ]
        
        if feasible_indices:
            # 选择目标值最优的可行解
            feasible_fitness = self.pbest_fitness[feasible_indices]
            best_idx = feasible_indices[np.argmin(feasible_fitness)]
            self.gbest_position = self.pbest_positions[best_idx].copy()
            self.gbest_fitness = self.pbest_fitness[best_idx]
        else:
            # 没有可行解,选择约束违反度最小的pbest
            violations = [self.constraint_violation(self.pbest_positions[i])
                         for i in range(self.n_particles)]
            best_idx = np.argmin(violations)
            self.gbest_position = self.pbest_positions[best_idx].copy()
            self.gbest_fitness = self.pbest_fitness[best_idx]
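可行性规则本身可以脱离PSO单独验证。下面是一个假设性的最小示意(函数`feasibility_better`与玩具问题均为示例命名,不属于上述类),其三条比较规则与`dominates`一一对应:

```python
# 可行性规则的独立示意(假设性的简化实现):
# 给定目标值f与约束违反度v,按"可行优于不可行 → 违反度小者优 → 目标值小者优"比较
def feasibility_better(f1, v1, f2, v2):
    """若解1按可行性规则优于解2则返回True"""
    if v1 == 0 and v2 > 0:      # 可行解优于不可行解
        return True
    if v1 > 0 and v2 > 0:       # 都不可行:违反度小者优
        return v1 < v2
    if v1 == 0 and v2 == 0:     # 都可行:目标值小者优
        return f1 < f2
    return False


# 玩具示例:最小化 f(x)=x^2,约束 x>=1(违反度为 max(0, 1-x))
def f(x):
    return x * x

def violation(x):
    return max(0.0, 1.0 - x)

x1, x2 = 1.2, 0.5   # x1可行,x2不可行
assert feasibility_better(f(x1), violation(x1), f(x2), violation(x2))
```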

15.2 约束优化示例

class EngineeringConstraintExample:
    """工程约束优化示例"""
    
    @staticmethod
    def pressure_vessel_design(x):
        """
        压力容器设计问题
        最小化材料成本
        约束:壁厚、容量等
        """
        # 解编码:[Ts, Th, R, L]
        Ts, Th, R, L = x
        
        # 目标:成本最小化
        cost = 0.6224*Ts*R*L + 1.7781*Th*R**2 + 3.1661*Ts**2*L + 19.84*Ts**2*R
        
        # 约束
        g1 = -Ts + 0.0193*R  # 壁厚约束
        g2 = -Th + 0.00954*R
        g3 = -np.pi*R**2*L - (4/3)*np.pi*R**3 + 1296000  # 体积约束
        g4 = L + 2*R - 240  # 长度约束
        
        # 罚函数:约束均为 g <= 0 形式,违反量为 max(0, g)
        penalty = 0
        for g in [g1, g2, g3, g4]:
            if g > 0:
                penalty += g * 1000
        
        return cost + penalty
    
    @staticmethod
    def spring_design(x):
        """
        弹簧设计问题
        最小化重量
        """
        # 解编码:[d, D, N]
        d, D, N = x
        
        # 目标
        weight = (N + 2) * D * d**2
        
        # 约束(标准拉压弹簧设计公式,均整理为 g <= 0 形式)
        g1 = 1 - D**3 * N / (71785 * d**4)          # 最小挠度约束
        g2 = ((4*D**2 - d*D) / (12566 * (D*d**3 - d**4))
              + 1 / (5108 * d**2) - 1)              # 剪切应力约束
        g3 = 1 - 140.45 * d / (D**2 * N)            # 振动频率约束
        g4 = (D + d) / 1.5 - 1                      # 外径约束
        
        penalty = 0
        for g in [g1, g2, g3, g4]:
            if g > 0:
                penalty += g * 1000
        
        return weight + penalty
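两个例子使用的都是同一种静态罚函数写法,可以抽象为一个通用包装器。以下是一个最小示意(假设所有不等式约束均已整理为 g(x) ≤ 0 形式;`penalized`与罚系数`rho`为示例命名):

```python
import numpy as np

# 静态罚函数包装器的最小示意:
# 将约束问题转为无约束问题,违反量 max(0, g(x)) 乘以罚系数rho后累加到目标值上
def penalized(objective, ineq_constraints, rho=1000.0):
    """返回带罚项的目标函数,ineq_constraints中每个g要求 g(x) <= 0"""
    def wrapped(x):
        penalty = sum(max(0.0, g(x)) for g in ineq_constraints)
        return objective(x) + rho * penalty
    return wrapped


# 玩具示例:min x^2  s.t. x >= 2(即 g(x) = 2 - x <= 0)
f = penalized(lambda x: x[0] ** 2, [lambda x: 2.0 - x[0]])
print(f(np.array([3.0])))   # 可行点:无罚项,即目标值本身
print(f(np.array([1.0])))   # 不可行点:目标值加上 rho * 违反量
```

静态罚系数需要人工调参:rho过小时最优解可能停在不可行域,过大时适应度景观会变得病态。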

十六、代码模板与最佳实践

16.1 通用PSO模板

"""
PSO通用优化模板
开箱即用,适用于大多数连续优化问题
"""
 
import numpy as np
from typing import Callable, List, Tuple, Optional
from dataclasses import dataclass
 
 
@dataclass
class PSOConfig:
    """PSO配置参数"""
    n_particles: int = 30
    max_iter: int = 1000
    w: float = 0.729
    c1: float = 1.49445
    c2: float = 1.49445
    v_max_ratio: float = 0.2
    tol: float = 1e-8
    early_stop_patience: int = 50
 
 
class GenericPSO:
    """通用PSO优化器"""
    
    def __init__(self, objective: Callable,
                 bounds: List[Tuple[float, float]],
                 config: Optional[PSOConfig] = None):
        self.objective = objective
        self.bounds = bounds
        self.config = config or PSOConfig()
        
        self.dim = len(bounds)
        self._setup()
    
    def _setup(self):
        """初始化"""
        bounds_array = np.array(self.bounds)
        self.low_bounds = bounds_array[:, 0]
        self.high_bounds = bounds_array[:, 1]
        
        self.v_max = (self.high_bounds - self.low_bounds) * \
                     self.config.v_max_ratio
        
        # 初始化粒子
        self.positions = np.random.uniform(
            self.low_bounds, self.high_bounds,
            (self.config.n_particles, self.dim)
        )
        
        self.velocities = np.random.uniform(
            -self.v_max, self.v_max,
            (self.config.n_particles, self.dim)
        )
        
        self.fitness = np.array([
            self.objective(pos) for pos in self.positions
        ])
        
        self.pbest = self.positions.copy()
        self.pbest_fit = self.fitness.copy()
        
        best_idx = np.argmin(self.fitness)
        self.gbest = self.positions[best_idx].copy()
        self.gbest_fit = self.fitness[best_idx]
        
        self.history = [self.gbest_fit]
        self.no_improvement_count = 0
    
    def step(self):
        """执行一步迭代"""
        r1 = np.random.random((self.config.n_particles, self.dim))
        r2 = np.random.random((self.config.n_particles, self.dim))
        
        # 更新速度
        cognitive = self.config.c1 * r1 * (self.pbest - self.positions)
        social = self.config.c2 * r2 * (self.gbest - self.positions)
        
        self.velocities = self.config.w * self.velocities + cognitive + social
        
        # 限制速度
        self.velocities = np.clip(self.velocities, -self.v_max, self.v_max)
        
        # 更新位置
        self.positions += self.velocities
        self.positions = np.clip(self.positions, self.low_bounds, self.high_bounds)
        
        # 评估
        self.fitness = np.array([self.objective(pos) for pos in self.positions])
        
        # 更新pbest
        improved = self.fitness < self.pbest_fit
        self.pbest[improved] = self.positions[improved]
        self.pbest_fit[improved] = self.fitness[improved]
        
        # 更新gbest
        if self.fitness.min() < self.gbest_fit:
            self.gbest_fit = self.fitness.min()
            self.gbest = self.positions[np.argmin(self.fitness)].copy()
            self.no_improvement_count = 0
        else:
            self.no_improvement_count += 1
        
        self.history.append(self.gbest_fit)
    
    def run(self) -> Tuple[np.ndarray, float]:
        """运行优化"""
        for _ in range(self.config.max_iter):
            self.step()
            
            # 注意:此终止条件假定已知最优值接近0(适用于基准测试函数)
            if self.gbest_fit < self.config.tol:
                break
            
            if self.no_improvement_count > self.config.early_stop_patience:
                break
        
        return self.gbest.copy(), self.gbest_fit
 
 
# 使用示例
if __name__ == "__main__":
    # 定义优化问题
    def sphere(x):
        return np.sum(x**2)
    
    # 设置边界
    bounds = [(-10, 10)] * 10
    
    # 创建优化器
    pso = GenericPSO(sphere, bounds)
    
    # 运行优化
    best_pos, best_fitness = pso.run()
    
    print(f"Best position: {best_pos}")
    print(f"Best fitness: {best_fitness}")

16.2 面向对象PSO框架

"""
面向对象的PSO框架
支持扩展和定制
"""
 
from abc import ABC, abstractmethod
from typing import List, Tuple, Callable
import numpy as np
 
 
class TopologyStrategy(ABC):
    """拓扑策略接口"""
    
    @abstractmethod
    def get_leader(self, fitness: np.ndarray, 
                   pbest_positions: np.ndarray,
                   positions: np.ndarray) -> np.ndarray:
        pass
 
 
class GlobalBestTopology(TopologyStrategy):
    """全局最优拓扑"""
    
    def get_leader(self, fitness: np.ndarray, 
                   pbest_positions: np.ndarray,
                   positions: np.ndarray) -> np.ndarray:
        best_idx = np.argmin(fitness)
        return pbest_positions[best_idx]
 
 
class RingTopology(TopologyStrategy):
    """环形拓扑"""
    
    def __init__(self, k: int = 2):
        self.k = k
    
    def get_leader(self, fitness: np.ndarray,
                   pbest_positions: np.ndarray,
                   positions: np.ndarray) -> np.ndarray:
        n = len(fitness)
        leaders = np.zeros_like(positions)
        
        for i in range(n):
            neighbors = [(i + j) % n for j in range(-self.k, self.k + 1)]
            neighbor_fitness = fitness[neighbors]
            leaders[i] = pbest_positions[neighbors[np.argmin(neighbor_fitness)]]
        
        return leaders
 
 
class VelocityUpdateStrategy(ABC):
    """速度更新策略接口"""
    
    @abstractmethod
    def update(self, velocities: np.ndarray,
               positions: np.ndarray,
               pbest: np.ndarray,
               leader: np.ndarray,
               params: dict) -> np.ndarray:
        pass
 
 
class StandardVelocityUpdate(VelocityUpdateStrategy):
    """标准速度更新"""
    
    def update(self, velocities: np.ndarray,
               positions: np.ndarray,
               pbest: np.ndarray,
               leader: np.ndarray,
               params: dict) -> np.ndarray:
        w = params.get('w', 0.729)
        c1 = params.get('c1', 1.49445)
        c2 = params.get('c2', 1.49445)
        
        r1 = np.random.random(positions.shape)
        r2 = np.random.random(positions.shape)
        
        return (w * velocities +
                c1 * r1 * (pbest - positions) +
                c2 * r2 * (leader - positions))
 
 
class CompletePSOFramework:
    """
    完整的PSO框架
    可配置的组件化设计
    """
    
    def __init__(self,
                 objective: Callable,
                 bounds: List[Tuple[float, float]],
                 topology: TopologyStrategy = None,
                 velocity_update: VelocityUpdateStrategy = None):
        self.objective = objective
        self.bounds = bounds
        self.dim = len(bounds)
        
        self.topology = topology or GlobalBestTopology()
        self.velocity_update = velocity_update or StandardVelocityUpdate()
        
        self.bounds_array = np.array(bounds)
        self.low_bounds = self.bounds_array[:, 0]
        self.high_bounds = self.bounds_array[:, 1]
        
        self.params = {'w': 0.729, 'c1': 1.49445, 'c2': 1.49445}
    
    def initialize(self, n_particles: int = 30):
        """初始化种群"""
        self.n_particles = n_particles
        
        self.positions = np.random.uniform(
            self.low_bounds, self.high_bounds,
            (n_particles, self.dim)
        )
        
        self.v_max = (self.high_bounds - self.low_bounds) * 0.2
        self.velocities = np.random.uniform(
            -self.v_max, self.v_max,
            (n_particles, self.dim)
        )
        
        self.fitness = np.array([self.objective(p) for p in self.positions])
        self.pbest = self.positions.copy()
        self.pbest_fit = self.fitness.copy()
    
    def step(self):
        """执行一步"""
        # 获取领导者
        leader = self.topology.get_leader(
            self.pbest_fit, self.pbest, self.positions
        )
        
        # 更新速度
        self.velocities = self.velocity_update.update(
            self.velocities, self.positions, self.pbest, leader, self.params
        )
        self.velocities = np.clip(self.velocities, -self.v_max, self.v_max)
        
        # 更新位置
        self.positions += self.velocities
        self.positions = np.clip(self.positions, self.low_bounds, self.high_bounds)
        
        # 评估
        self.fitness = np.array([self.objective(p) for p in self.positions])
        
        # 更新pbest
        improved = self.fitness < self.pbest_fit
        self.pbest[improved] = self.positions[improved]
        self.pbest_fit[improved] = self.fitness[improved]
    
    def run(self, max_iter: int = 1000):
        """运行优化"""
        for _ in range(max_iter):
            self.step()
        
        best_idx = np.argmin(self.pbest_fit)
        return self.pbest[best_idx], self.pbest_fit[best_idx]
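环形拓扑的邻域选择逻辑可以独立验证。下面是一个与`RingTopology.get_leader`中下标计算一致的假设性示意(`ring_lbest_indices`为示例命名),它也直观展示了lbest信息沿环传播较慢、从而有助于维持多样性:

```python
import numpy as np

# 环形拓扑邻域选择的独立验证:
# 对每个粒子,在其环形邻域(左右各k个,含自身)中取适应度最小者的下标
def ring_lbest_indices(fitness, k=1):
    """返回每个粒子的lbest粒子下标列表"""
    n = len(fitness)
    result = []
    for i in range(n):
        neighbors = [(i + j) % n for j in range(-k, k + 1)]
        result.append(neighbors[int(np.argmin(fitness[neighbors]))])
    return result


fitness = np.array([5.0, 1.0, 4.0, 3.0, 2.0])
print(ring_lbest_indices(fitness, k=1))  # 全局最优粒子1只影响其邻域,而非全部粒子
```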

十七、常见问题与解决方案

17.1 收敛问题

| 问题 | 症状 | 解决方案 |
|------|------|----------|
| 早熟收敛 | 适应度早期停滞 | 增大惯性权重,改用lbest拓扑 |
| 收敛过慢 | 大量迭代后仍不收敛 | 减小惯性权重,增大学习因子 |
| 振荡 | 粒子速度过大 | 减小速度上限与学习因子c1、c2 |
| 发散 | 适应度越来越大 | 检查边界处理,增加阻尼 |
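针对"早熟收敛"与"收敛过慢"的两难,一个常见折中是Shi与Eberhart提出的线性递减惯性权重:前期较大的w鼓励探索,后期较小的w促进收敛。以下为最小示意(经验取值为w从约0.9降至约0.4;`linear_inertia`为示例命名):

```python
# 线性递减惯性权重的最小示意:第t次迭代的w从w_start线性降至w_end
def linear_inertia(t, max_iter, w_start=0.9, w_end=0.4):
    """返回第t次迭代(0 <= t <= max_iter)使用的惯性权重"""
    return w_start - (w_start - w_end) * t / max_iter


for t in (0, 50, 100):
    print(t, linear_inertia(t, 100))
```

在迭代循环中,每一步先用当前的w更新速度即可;也可以换成非线性(如指数)递减策略。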

17.2 参数调优指南

17.2.1 问题导向的参数选择

class ProblemBasedParameterSelector:
    """基于问题特征选择参数"""
    
    @staticmethod
    def select_for_unimodal(objective_gradients=None):
        """单峰问题"""
        return {
            'w': 0.729,
            'c1': 1.49445,
            'c2': 1.49445,
            'topology': 'gbest'
        }
    
    @staticmethod
    def select_for_multimodal():
        """多峰问题"""
        return {
            'w': 0.5,  # 较低惯性以增加探索
            'c1': 1.5,
            'c2': 1.5,
            'topology': 'ring'
        }
    
    @staticmethod
    def select_for_high_dim(dim):
        """高维问题"""
        return {
            'w': 0.6,
            'c1': 1.5,
            'c2': 1.5,
            'n_particles': min(100, 10 * dim),
            'topology': 'von_neumann'
        }
    
    @staticmethod
    def select_for_noisy():
        """噪声环境"""
        return {
            'w': 0.4,  # 低惯性
            'c1': 2.0,  # 强调认知
            'c2': 0.5,
            'n_particles': 100,
            'topology': 'ring'
        }

17.3 调试清单

class PSODebugger:
    """PSO调试工具"""
    
    def __init__(self, optimizer):
        self.optimizer = optimizer
        self.debug_info = {
            'velocity_magnitude': [],
            'position_spread': [],
            'fitness_variance': [],
            'diversity': []
        }
    
    def collect_debug_info(self):
        """收集调试信息"""
        # 速度幅度
        vel_mag = np.mean(np.linalg.norm(self.optimizer.velocities, axis=1))
        self.debug_info['velocity_magnitude'].append(vel_mag)
        
        # 位置分散度
        spread = np.mean(np.std(self.optimizer.positions, axis=0))
        self.debug_info['position_spread'].append(spread)
        
        # 适应度方差
        fit_var = np.var(self.optimizer.fitness)
        self.debug_info['fitness_variance'].append(fit_var)
        
        # 多样性
        centroid = np.mean(self.optimizer.positions, axis=0)
        diversity = np.mean(np.linalg.norm(
            self.optimizer.positions - centroid, axis=1
        ))
        self.debug_info['diversity'].append(diversity)
    
    def diagnose_issues(self):
        """诊断问题"""
        issues = []
        
        vm = self.debug_info['velocity_magnitude']
        if vm and vm[-1] < 0.01:
            issues.append("Velocity too low - consider increasing w")
        if vm and vm[-1] > 10:
            issues.append("Velocity too high - consider decreasing w or v_max")
        
        div = self.debug_info['diversity']
        if div and div[-1] < 0.1:
            issues.append("Low diversity - risk of premature convergence")
        
        return issues
    
    def generate_report(self):
        """生成诊断报告"""
        return {
            'final_velocity_magnitude': self.debug_info['velocity_magnitude'][-1],
            'final_position_spread': self.debug_info['position_spread'][-1],
            'final_fitness_variance': self.debug_info['fitness_variance'][-1],
            'final_diversity': self.debug_info['diversity'][-1],
            'issues': self.diagnose_issues()
        }
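调试清单中的多样性指标也可以单独计算,用于快速判断是否存在早熟收敛风险。下面是与`PSODebugger`中定义一致的独立示意(`swarm_diversity`为示例命名):

```python
import numpy as np

# 群体多样性 = 各粒子到群体质心的平均欧氏距离
def swarm_diversity(positions):
    """positions形状为 (n_particles, dim),返回标量多样性指标"""
    centroid = positions.mean(axis=0)
    return float(np.mean(np.linalg.norm(positions - centroid, axis=1)))


tight = np.array([[0.0, 0.0], [0.1, 0.0], [0.0, 0.1]])   # 聚集的群体
spread = np.array([[0.0, 0.0], [5.0, 0.0], [0.0, 5.0]])  # 分散的群体
print(swarm_diversity(tight) < swarm_diversity(spread))  # 聚集群体的多样性更低
```

注意该指标依赖搜索空间尺度,实际使用时宜先按边界范围归一化,再与阈值比较。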

十八、参考文献

  1. Kennedy, J., & Eberhart, R. (1995). Particle Swarm Optimization. Proceedings of IEEE International Conference on Neural Networks, 4, 1942-1948.
  2. Shi, Y., & Eberhart, R. C. (1998). A Modified Particle Swarm Optimizer. Proceedings of IEEE International Conference on Evolutionary Computation, 69-73.
  3. Clerc, M., & Kennedy, J. (2002). The Particle Swarm - Explosion, Stability, and Convergence in a Multidimensional Complex Space. IEEE Transactions on Evolutionary Computation, 6(1), 58-73.
  4. Kennedy, J., & Eberhart, R. C. (1997). A Discrete Binary Version of the Particle Swarm Algorithm. Proceedings of IEEE International Conference on Systems, Man, and Cybernetics, 4104-4108.
  5. Poli, R., Kennedy, J., & Blackwell, T. (2007). Particle Swarm Optimization: An Overview. Swarm Intelligence, 1(1), 33-57.
  6. Eberhart, R. C., & Shi, Y. (2001). Particle Swarm Optimization: Developments, Applications and Resources. Proceedings of the 2001 Congress on Evolutionary Computation, 1, 81-86.
  7. Van den Bergh, F., & Engelbrecht, A. P. (2004). A Cooperative Approach to Particle Swarm Optimization. IEEE Transactions on Evolutionary Computation, 8(3), 225-239.
  8. Mendes, R., Kennedy, J., & Neves, J. (2004). The Fully Informed Particle Swarm: Simpler, Maybe Better. IEEE Transactions on Evolutionary Computation, 8(3), 204-210.
  9. Zhan, Z. H., et al. (2009). Adaptive Particle Swarm Optimization. IEEE Transactions on Systems, Man, and Cybernetics, 39(6), 1362-1381.
  10. Liu, Q., et al. (2015). Order of Magnitude Improved Particle Swarm Optimizer. Information Sciences, 320, 73-94.

相关文档