组合优化应用实战

关键词速览

关键词解释
整数线性规划目标函数和约束都是线性的,变量是整数
ILP整数线性规划的缩写
调度优化安排任务/资源的时间和顺序
路径规划寻找最优路径的问题
A*算法带启发式评估的最优搜索
匈牙利算法二分图最大匹配
最大流网络中的最大流量
费用流最小成本的流量配送

摘要

组合优化是运筹学的核心领域,研究在离散空间中找到最优解的问题。从物流配送到员工排班,从电路设计到游戏AI,组合优化无处不在。本文从整数线性规划的基础出发,讲解调度优化、路径规划、图匹配、网络流等经典问题的建模和求解方法,最后通过丰富的代码实战展示如何用Python工具(PuLP、NetworkX)解决实际组合优化问题。


0. 组合优化:从现实问题到数学模型

0.1 什么是组合优化?

想象一下你要组织一场大型会议:

  • 有n个报告厅可以同时使用
  • 有m个报告需要安排
  • 每个报告时长不同,报告人间有时间冲突
  • 有的报告人只能参加特定时间段

你需要在这些约束下安排一个”最优”的会议日程。这就是组合优化的典型场景。

组合优化研究的是:在有限个离散选择中,找到满足所有约束的最优方案

0.2 为什么组合优化重要?

实际价值

  • 物流行业:配送路线优化可节省数十亿美元
  • 制造业:车间调度直接影响产能
  • 航空:航班调度、机组排班优化利润
  • IT:云计算资源分配、数据库查询优化

理论价值

  • NP难问题集中地
  • 算法设计的试验田
  • 数学与工程的桥梁

0.3 组合优化的分类

组合优化
├── 路径与网络问题
│   ├── 最短路径
│   ├── TSP/VRP
│   └── 网络流
├── 资源配置问题
│   ├── 背包
│   ├── 装箱
│   └── 调度
├── 匹配与分配
│   ├── 二分图匹配
│   ├── 指派问题
│   └── 稳定匹配
└── 布局与安排
    ├── 设施选址
    ├── 图着色
    └── 排程问题

1. 整数线性规划:组合优化的瑞士军刀

1.1 ILP是什么?

整数线性规划(Integer Linear Programming)是组合优化的基石。简单说,就是在满足一组线性约束的前提下,最小化或最大化一个线性目标函数,其中变量要求是整数。

标准形式

min c^T x
s.t. Ax ≤ b
     x ≥ 0
     x ∈ Z^n  (整数约束)

为什么需要整数?

  • 你不能安排0.7个人
  • 你不能建造0.5架飞机
  • 你不能打开0或1以外的开关

1.2 ILP vs LP:松弛与分支

线性规划(LP):去掉整数约束,可以用单纯形法或内点法快速求解。

整数线性规划(ILP):加入了整数约束,是NP难的。

关系:ILP的最优解 ≥ LP松弛的最优解(对于最小化问题)。这个下界对于分支定界很重要。

求解方法

  1. 分支定界(Branch and Bound):递归划分解空间
  2. 割平面(Cutting Planes):添加有效不等式
  3. 分支切割(Branch and Cut):结合以上两者
  4. 启发式:快速得到可行解

1.3 PuLP:Python中的ILP求解器

PuLP是一个Python建模接口,支持多种求解器(CBC、GLPK、Gurobi、CPLEX)。

# PuLP基本用法
from pulp import *
 
# 创建问题
prob = LpProblem("MyProblem", LpMinimize)
 
# 决策变量
x = LpVariable("x", lowBound=0, cat='Integer')
y = LpVariable("y", lowBound=0, cat='Binary')
 
# 目标函数
prob += 2*x + 3*y
 
# 约束
prob += x + y <= 10
prob += 2*x - y >= 3
 
# 求解
status = prob.solve()
 
# 输出结果
print(f"状态: {LpStatus[status]}")
print(f"最优值: {value(prob.objective)}")
print(f"x = {value(x)}, y = {value(y)}")

2. 调度优化:让一切有条不紊

2.1 调度问题的分类

调度问题(Scheduling)研究如何安排任务在资源上的执行顺序。

按资源类型分类

  • 单机调度:所有任务在一个处理器上
  • 并行机调度:m个相同机器同时处理任务
  • 车间调度:不同机器有不同处理顺序

按目标分类

  • 最大完工时间(Cmax):最后一个任务何时完成
  • 总完工时间:所有任务完工时间之和
  • 延迟惩罚:超过截止时间的惩罚

2.2 单机调度:EDD规则

问题:n个任务,每个任务有处理时间pᵢ和截止时间dᵢ。最小化最大延迟。

EDD(Earliest Due Date):按截止时间排序,先做最紧急的任务。

定理:EDD算法给出最优解!

2.3 Python实现单机调度

import random
from typing import List, Tuple
import matplotlib.pyplot as plt
 
class SingleMachineScheduling:
    """单机调度问题"""
    
    def __init__(self, processing_times: List[int], 
                 due_dates: List[int]):
        self.p = processing_times
        self.d = due_dates
        self.n = len(processing_times)
    
    def edd_schedule(self) -> Tuple[List[int], int]:
        """
        EDD算法(最早截止日期优先)
        返回:(调度顺序, 最大延迟)
        """
        # 按截止时间排序
        order = sorted(range(self.n), key=lambda i: self.d[i])
        
        # 计算调度结果
        completion = 0
        max_lateness = 0
        
        for i in order:
            completion += self.p[i]
            lateness = completion - self.d[i]
            max_lateness = max(max_lateness, lateness)
        
        return order, max_lateness
    
    def schedule_details(self, order: List[int]) -> dict:
        """详细调度信息"""
        completion = 0
        details = []
        
        for i in order:
            start = completion
            completion += self.p[i]
            details.append({
                'task': i,
                'start': start,
                'completion': completion,
                'due': self.d[i],
                'lateness': completion - self.d[i]
            })
        
        return details
    
    def visualize(self, order: List[int], save_path: str = 'schedule.png'):
        """可视化调度"""
        details = self.schedule_details(order)
        
        fig, ax = plt.subplots(figsize=(12, 6))
        
        y_pos = 0
        for detail in details:
            # 绘制任务条
            ax.barh(y_pos, detail['completion'] - detail['start'], 
                   left=detail['start'], height=0.5,
                   color='steelblue', edgecolor='black')
            
            # 标注
            mid = (detail['start'] + detail['completion']) / 2
            ax.text(mid, y_pos, f'T{detail["task"]}', 
                   ha='center', va='center', color='white', fontsize=10)
            
            # 截止时间线
            ax.axvline(x=detail['due'], ymin=y_pos/len(details), 
                      ymax=(y_pos+0.5)/len(details),
                      color='red', linestyle='--', linewidth=1)
            
            y_pos += 1
        
        ax.set_xlabel('时间')
        ax.set_ylabel('任务')
        ax.set_yticks(range(len(order)))
        ax.set_yticklabels([f'T{i}' for i in order])
        ax.set_title('单机制调度 (EDD)')
        ax.grid(True, axis='x', alpha=0.3)
        
        plt.tight_layout()
        plt.savefig(save_path, dpi=150)
        print(f"调度图已保存到 {save_path}")
 
 
def generate_scheduling_instance(n: int, seed: int = 42) -> Tuple[List[int], List[int]]:
    """生成随机调度实例"""
    random.seed(seed)
    processing_times = [random.randint(1, 10) for _ in range(n)]
    
    # 截止时间随机生成
    due_dates = []
    cumulative = 0
    for p in processing_times:
        cumulative += p
        due_dates.append(cumulative + random.randint(-5, 10))
    
    return processing_times, due_dates
 
 
if __name__ == "__main__":
    print("=" * 60)
    print("单机制调度问题 - EDD算法")
    print("=" * 60)
    
    # 测试
    n = 8
    p, d = generate_scheduling_instance(n)
    
    print("\n任务数据:")
    print(f"{'任务':<6}{'处理时间':<10}{'截止时间':<10}")
    for i in range(n):
        print(f"T{i:<5}{p[i]:<10}{d[i]:<10}")
    
    scheduler = SingleMachineScheduling(p, d)
    order, max_lateness = scheduler.edd_schedule()
    
    print(f"\n最优调度顺序: {' -> '.join([f'T{i}' for i in order])}")
    print(f"最大延迟: {max_lateness}")
    
    # 详细结果
    details = scheduler.schedule_details(order)
    print("\n详细调度:")
    print(f"{'任务':<6}{'开始':<8}{'完成':<8}{'截止':<8}{'延迟':<8}")
    for detail in details:
        print(f"T{detail['task']:<5}{detail['start']:<8}{detail['completion']:<8}"
              f"{detail['due']:<8}{detail['lateness']:<8}")
    
    # 可视化
    scheduler.visualize(order)

2.4 并行机调度:P||Cmax

问题:m台相同机器,n个任务,每个任务只能在一台机器上处理,最小化最大完工时间。

LPT规则(Longest Processing Time first)

  1. 按处理时间降序排列任务
  2. 依次分配到当前负载最小的机器

近似比:LPT给出(4/3 - 1/(3m))-近似。

class ParallelMachineScheduling:
    """并行机调度问题"""
    
    def __init__(self, processing_times: List[int], num_machines: int):
        self.p = processing_times
        self.m = num_machines
        self.n = len(processing_times)
    
    def lpt_schedule(self) -> Tuple[int, List[List[int]]]:
        """
        LPT算法(最长处理时间优先)
        返回:(最大完工时间, 每台机器的任务分配)
        """
        # 排序
        sorted_tasks = sorted(range(self.n), key=lambda i: self.p[i], reverse=True)
        
        # 机器负载
        machine_loads = [0] * self.m
        machine_schedules = [[] for _ in range(self.m)]
        
        for task in sorted_tasks:
            # 选择负载最小的机器
            min_machine = min(range(self.m), key=lambda i: machine_loads[i])
            machine_schedules[min_machine].append(task)
            machine_loads[min_machine] += self.p[task]
        
        makespan = max(machine_loads)
        return makespan, machine_schedules
    
    def visualize(self, schedule: List[List[int]], save_path: str = 'parallel_schedule.png'):
        """可视化并行调度"""
        fig, ax = plt.subplots(figsize=(14, 6))
        
        colors = plt.cm.Set3(range(len(schedule)))
        
        for machine_idx, tasks in enumerate(schedule):
            current_time = 0
            for task in tasks:
                ax.barh(machine_idx, self.p[task], left=current_time, 
                       height=0.6, color=colors[task % len(colors)],
                       edgecolor='black')
                
                mid = current_time + self.p[task] / 2
                ax.text(mid, machine_idx, f'T{task}', 
                       ha='center', va='center', fontsize=9)
                
                current_time += self.p[task]
        
        ax.set_xlabel('时间')
        ax.set_ylabel('机器')
        ax.set_yticks(range(self.m))
        ax.set_yticklabels([f'M{i}' for i in range(self.m)])
        ax.set_title(f'并行机调度 (m={self.m}, Cmax={sum(self.p)})')
        ax.grid(True, axis='x', alpha=0.3)
        
        plt.tight_layout()
        plt.savefig(save_path, dpi=150)
 
 
if __name__ == "__main__":
    # 并行机调度测试
    n = 15
    m = 3
    tasks = [random.randint(1, 20) for _ in range(n)]
    
    print(f"{n}个任务, {m}台机器")
    print(f"总处理时间: {sum(tasks)}")
    print(f"理论下界: {max(tasks)} (单机) 或 {sum(tasks)/m:.1f} (并行)")
    
    scheduler = ParallelMachineScheduling(tasks, m)
    makespan, schedule = scheduler.lpt_schedule()
    
    print(f"LPT结果:")
    print(f"  最大完工时间: {makespan}")
    print(f"  各机器负载: {[sum(tasks[t] for t in m_tasks) for m_tasks in schedule]}")
    
    scheduler.visualize(schedule)

2.5 车间调度:Job Shop Problem

问题:有m台机器,每个作业需要按特定顺序在若干台机器上处理。最优化makespan。

这是经典的NP难问题,通常用启发式或元启发式求解。

class JobShopScheduling:
    """Job Shop调度问题(简化版:遗传算法)"""
    
    def __init__(self, jobs: List[List[Tuple[int, int]]]):
        """
        jobs: 每个作业的(机器, 处理时间)序列
        例如: [[(0, 3), (1, 2)], [(1, 4), (0, 2)]]
        """
        self.jobs = jobs
        self.n = len(jobs)
        self.m = max(max(m for m, _ in job) for job in jobs) + 1
    
    def evaluate(self, schedule: List[int]) -> int:
        """
        评估一个调度方案(作业顺序)
        schedule: 每个时间步执行的作业索引
        """
        # 记录每台机器和每个作业的完成时间
        machine_available = [0] * self.m
        job_next_op = [0] * self.n  # 每个作业下一步操作的索引
        job_available = [0] * self.n
        
        current_time = 0
        time_point = 0
        
        for job in schedule:
            if job_next_op[job] >= len(self.jobs[job]):
                continue
            
            machine, duration = self.jobs[job][job_next_op[job]]
            start = max(current_time, machine_available[machine], job_available[job])
            finish = start + duration
            
            machine_available[machine] = finish
            job_available[job] = finish
            job_next_op[job] += 1
            current_time = finish
        
        return max(machine_available)
    
    def random_schedule(self) -> List[int]:
        """生成随机调度"""
        return random.sample(range(self.n), self.n)
    
    def priority_schedule(self) -> List[int]:
        """用优先级规则生成调度"""
        # 每个作业的剩余操作数
        remaining = [len(job) for job in self.jobs]
        
        schedule = []
        available = set(range(self.n))
        
        while len(schedule) < self.n:
            # 选择剩余操作数最少的作业
            if available:
                job = min(available, key=lambda j: remaining[j])
                schedule.append(job)
                remaining[job] -= 1
                if remaining[job] == 0:
                    available.remove(job)
            else:
                break
        
        return schedule
 
 
if __name__ == "__main__":
    # Job Shop实例
    jobs = [
        [(0, 3), (1, 2), (2, 4)],  # 作业0: M0->3, M1->2, M2->4
        [(1, 2), (0, 3), (2, 1)],  # 作业1
        [(2, 4), (0, 2), (1, 3)],  # 作业2
    ]
    
    print("Job Shop调度问题:")
    for i, job in enumerate(jobs):
        print(f"  作业{i}: " + " -> ".join([f"M{m}({t})" for m, t in job]))
    
    scheduler = JobShopScheduling(jobs)
    
    # 测试随机调度
    random_sched = scheduler.random_schedule()
    makespan = scheduler.evaluate(random_sched)
    print(f"\n随机调度: {random_sched}, Makespan={makespan}")
    
    # 优先级调度
    priority_sched = scheduler.priority_schedule()
    makespan = scheduler.evaluate(priority_sched)
    print(f"优先级调度: {priority_sched}, Makespan={makespan}")

3. 路径规划:A*算法及其变体

3.1 路径规划问题概述

路径规划是机器人和游戏AI中的核心问题:在图中找到从起点到终点的最优路径

经典算法

  • Dijkstra:O((V+E)log V),无权图BFS
  • A*:利用启发式加速
  • D*:动态环境下的路径规划
  • RRT:基于采样的运动规划

3.2 A*算法原理

A*算法结合了Dijkstra的完备性和贪心最佳优先搜索的速度。

评估函数:f(n) = g(n) + h(n)

  • g(n):从起点到n的实际成本
  • h(n):从n到目标的启发式估计
  • f(n):经过n的总成本估计

关键性质

  • 如果h(n)是可采纳的(从不高估),A*能找到最优解
  • 如果h(n)是一致的(满足三角不等式),A*效率更高

3.3 Python实现A*算法

import heapq
import math
from typing import List, Tuple, Optional, Dict, Callable
import random
 
class AStar:
    """A*路径规划算法"""
    
    def __init__(self, grid: List[List[int]] = None, 
                 width: int = 0, height: int = 0):
        """
        网格世界中的A*算法
        grid: 0=可通过, 1=障碍物
        """
        if grid:
            self.grid = grid
            self.height = len(grid)
            self.width = len(grid[0]) if self.height > 0 else 0
        else:
            self.width = width
            self.height = height
            self.grid = [[0] * width for _ in range(height)]
    
    def heuristic(self, a: Tuple[int, int], 
                  b: Tuple[int, int]) -> float:
        """欧几里得距离启发式"""
        return math.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)
    
    def get_neighbors(self, pos: Tuple[int, int]) -> List[Tuple[int, int]]:
        """获取相邻格子(8方向)"""
        x, y = pos
        neighbors = []
        
        # 8个方向
        directions = [(-1, -1), (-1, 0), (-1, 1),
                     (0, -1),           (0, 1),
                     (1, -1),  (1, 0),  (1, 1)]
        
        for dx, dy in directions:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.height and 0 <= ny < self.width:
                if self.grid[nx][ny] == 0:  # 可通过
                    # 对角线移动代价更高
                    cost = 1.414 if dx != 0 and dy != 0 else 1
                    neighbors.append((nx, ny, cost))
        
        return neighbors
    
    def solve(self, start: Tuple[int, int], 
              goal: Tuple[int, int]) -> Tuple[Optional[List[Tuple[int, int]]], float]:
        """
        A*算法求解
        返回: (路径, 成本) 或 (None, inf)
        """
        # 优先队列: (f, g, counter, position, path)
        counter = 0
        open_set = [(self.heuristic(start, goal), 0, counter, start, [start])]
        heapq.heapify(open_set)
        
        # 记录已访问节点的最佳g值
        g_best: Dict[Tuple[int, int], float] = {start: 0}
        
        # 记录探索节点数
        nodes_explored = 0
        
        while open_set:
            f, g, _, current, path = heapq.heappop(open_set)
            nodes_explored += 1
            
            # 到达目标
            if current == goal:
                return path, g
            
            # 跳过已处理的节点
            if g > g_best.get(current, float('inf')):
                continue
            
            # 探索邻居
            for nx, ny, cost in self.get_neighbors(current):
                new_g = g + cost
                
                # 如果找到更好的路径
                if new_g < g_best.get((nx, ny), float('inf')):
                    g_best[(nx, ny)] = new_g
                    f = new_g + self.heuristic((nx, ny), goal)
                    counter += 1
                    heapq.heappush(open_set, 
                                  (f, new_g, counter, (nx, ny), path + [(nx, ny)]))
        
        return None, float('inf')
    
    def solve_with_callback(self, start: Tuple[int, int], 
                           goal: Tuple[int, int],
                           callback: Callable = None) -> Tuple[Optional[List[Tuple[int, int]]], float]:
        """带回调的A*(用于可视化)"""
        counter = 0
        open_set = [(self.heuristic(start, goal), 0, counter, start, [start])]
        heapq.heapify(open_set)
        
        g_best: Dict[Tuple[int, int], float] = {start: 0}
        closed_set = set()
        explored_order = []
        
        while open_set:
            f, g, _, current, path = heapq.heappop(open_set)
            
            if current in closed_set:
                continue
            
            closed_set.add(current)
            explored_order.append(current)
            
            if callback:
                callback(current, g_best.copy(), closed_set.copy(), explored_order)
            
            if current == goal:
                return path, g
            
            for nx, ny, cost in self.get_neighbors(current):
                if (nx, ny) in closed_set:
                    continue
                
                new_g = g + cost
                if new_g < g_best.get((nx, ny), float('inf')):
                    g_best[(nx, ny)] = new_g
                    f = new_g + self.heuristic((nx, ny), goal)
                    counter += 1
                    heapq.heappush(open_set, 
                                  (f, new_g, counter, (nx, ny), path + [(nx, ny)]))
        
        return None, float('inf')
 
 
class EightPuzzle:
    """八数码问题(8-Puzzle)"""
    
    def __init__(self, state: List[int] = None):
        self.size = 3
        if state:
            self.state = state
        else:
            # 默认已解决状态
            self.state = list(range(1, 9)) + [0]
    
    def find_zero(self) -> int:
        return self.state.index(0)
    
    def neighbors(self) -> List[List[int]]:
        """生成所有可能的移动"""
        zero_pos = self.find_zero()
        x, y = zero_pos // self.size, zero_pos % self.size
        
        moves = []
        directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        
        for dx, dy in directions:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                new_state = self.state.copy()
                new_zero_pos = nx * self.size + ny
                new_state[zero_pos], new_state[new_zero_pos] = \
                    new_state[new_zero_pos], new_state[zero_pos]
                moves.append(new_state)
        
        return moves
    
    def manhattan_distance(self, goal: List[int]) -> int:
        """曼哈顿距离启发式"""
        distance = 0
        for i, val in enumerate(self.state):
            if val != 0:
                goal_pos = goal.index(val)
                x1, y1 = i // self.size, i % self.size
                x2, y2 = goal_pos // self.size, goal_pos % self.size
                distance += abs(x1 - x2) + abs(y1 - y2)
        return distance
    
    def solve_a_star(self, goal: List[int] = None) -> Tuple[Optional[List[List[int]]], int]:
        """A*求解八数码"""
        if goal is None:
            goal = list(range(1, 9)) + [0]
        
        start_state = tuple(self.state)
        goal_state = tuple(goal)
        
        if start_state == goal_state:
            return [list(start_state)], 0
        
        # 优先队列: (f, g, counter, state, path)
        counter = 0
        h = self.manhattan_distance(goal)
        open_set = [(h, 0, counter, start_state, [list(start_state)])]
        heapq.heapify(open_set)
        
        g_best = {start_state: 0}
        
        while open_set:
            f, g, _, current, path = heapq.heappop(open_set)
            
            if current == goal_state:
                return path, g
            
            if g > g_best.get(current, float('inf')):
                continue
            
            # 扩展邻居
            current_list = list(current)
            zero_idx = current_list.index(0)
            x, y = zero_idx // 3, zero_idx % 3
            
            for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
                nx, ny = x + dx, y + dy
                if 0 <= nx < 3 and 0 <= ny < 3:
                    new_idx = nx * 3 + ny
                    new_state = list(current)
                    new_state[zero_idx], new_state[new_idx] = \
                        new_state[new_idx], new_state[zero_idx]
                    new_state_tuple = tuple(new_state)
                    
                    new_g = g + 1
                    if new_g < g_best.get(new_state_tuple, float('inf')):
                        g_best[new_state_tuple] = new_g
                        # 计算新的启发式值
                        val = new_state[new_idx]
                        goal_pos = goal.index(val) if val in goal else 0
                        new_h = abs(nx - goal_pos // 3) + abs(ny - goal_pos % 3)
                        # 使用简化的曼哈顿距离
                        new_h = sum(
                            abs((i // 3) - (goal.index(new_state[i]) // 3)) + 
                                abs((i % 3) - (goal.index(new_state[i]) % 3))
                            for i in range(9) if new_state[i] != 0
                        )
                        counter += 1
                        heapq.heappush(open_set, 
                                      (new_g + new_h, new_g, counter, 
                                       new_state_tuple, path + [new_state]))
        
        return None, -1
    
    def print_state(self, state: List[int] = None):
        """打印状态"""
        if state is None:
            state = self.state
        for i in range(3):
            row = []
            for j in range(3):
                val = state[i * 3 + j]
                row.append(str(val) if val != 0 else '.')
            print(' '.join(row))
        print()
 
 
def demo_a_star():
    """A*算法演示"""
    print("=" * 60)
    print("A*算法路径规划演示")
    print("=" * 60)
    
    # 创建带障碍的网格
    grid = [
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 1, 1, 1, 1, 1, 0, 1, 1, 0],
        [0, 0, 0, 0, 0, 1, 0, 1, 0, 0],
        [0, 1, 1, 1, 0, 1, 0, 1, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
        [0, 1, 0, 1, 1, 1, 1, 1, 1, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 1, 1, 1, 1, 1, 1, 0, 1, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    ]
    
    astar = AStar(grid)
    start = (0, 0)
    goal = (9, 9)
    
    print(f"起点: {start}, 目标: {goal}")
    
    path, cost = astar.solve(start, goal)
    
    if path:
        print(f"找到路径! 成本: {cost:.2f}, 步数: {len(path)-1}")
        print(f"路径: {' -> '.join(map(str, path))}")
        
        # 可视化
        import matplotlib.pyplot as plt
        
        fig, ax = plt.subplots(figsize=(10, 10))
        
        # 绘制网格
        for i in range(len(grid)):
            for j in range(len(grid[0])):
                if grid[i][j] == 1:
                    ax.add_patch(plt.Rectangle((j, len(grid)-1-i), 1, 1, 
                                              color='black'))
                else:
                    ax.add_patch(plt.Rectangle((j, len(grid)-1-i), 1, 1, 
                                              color='white', edgecolor='gray'))
        
        # 绘制路径
        if path:
            path_x = [p[1] + 0.5 for p in path]
            path_y = [len(grid) - 1 - p[0] + 0.5 for p in path]
            ax.plot(path_x, path_y, 'b-', linewidth=2)
            ax.scatter(path_x, path_y, c='blue', s=50, zorder=5)
        
        # 标记起点和终点
        ax.scatter(start[1] + 0.5, len(grid) - 1 - start[0] + 0.5, 
                  c='green', s=200, marker='s', zorder=6, label='起点')
        ax.scatter(goal[1] + 0.5, len(grid) - 1 - goal[0] + 0.5, 
                  c='red', s=200, marker='*', zorder=6, label='终点')
        
        ax.set_xlim(0, len(grid[0]))
        ax.set_ylim(0, len(grid))
        ax.set_aspect('equal')
        ax.legend()
        plt.title('A*路径规划')
        plt.savefig('a_star_path.png', dpi=150)
        print("路径图已保存到 a_star_path.png")
    else:
        print("未找到路径")
 
 
def demo_eight_puzzle():
    """八数码问题演示"""
    print("\n" + "=" * 60)
    print("八数码问题 A*求解")
    print("=" * 60)
    
    # 随机打乱状态
    random.seed(42)
    state = list(range(1, 9)) + [0]
    random.shuffle(state)
    
    # 确保有解(检查逆序数)
    def count_inversions(s):
        return sum(1 for i in range(len(s)) 
                  for j in range(i+1, len(s)) 
                  if s[i] != 0 and s[j] != 0 and s[i] > s[j])
    
    # 如果逆序数为奇数,再交换一次
    if count_inversions(state) % 2 == 1:
        state[0], state[1] = state[1], state[0]
    
    print("初始状态:")
    puzzle = EightPuzzle(state)
    puzzle.print_state()
    
    goal = list(range(1, 9)) + [0]
    print("目标状态:")
    puzzle.print_state(goal)
    
    path, moves = puzzle.solve_a_star(goal)
    
    if path:
        print(f"找到解! 移动次数: {moves}")
        print("解路径:")
        for i, step in enumerate(path[:6]):
            print(f"步骤 {i}:")
            for j in range(3):
                row = []
                for k in range(3):
                    val = step[j * 3 + k]
                    row.append(str(val) if val != 0 else '.')
                print(' '.join(row))
            print()
        if len(path) > 6:
            print(f"... 共{len(path)}步")
    else:
        print("无解")
 
 
if __name__ == "__main__":
    demo_a_star()
    demo_eight_puzzle()

4. 图匹配:找到最佳配对

4.1 匹配问题基础

匹配(Matching):在图中选择边,使得每个顶点最多与一条选中的边关联。

完美匹配:每个顶点都被匹配。

二分图匹配:顶点可以分为两组,同组内无边相连。

4.2 匈牙利算法:最优指派

匈牙利算法(Hungarian Algorithm)用于求解二分图的最大权完美匹配。

时间复杂度:O(n³)

import numpy as np
 
class HungarianAlgorithm:
    """匈牙利算法求解最优指派问题"""
    
    def __init__(self, cost_matrix: np.ndarray):
        """
        cost_matrix: 成本矩阵 (n x m, n <= m)
        """
        self.cost = cost_matrix.astype(float)
        self.n, self.m = cost_matrix.shape
    
    def solve(self) -> Tuple[float, List[int]]:
        """
        求解最小成本指派
        返回: (最小成本, 指派方案)
        """
        n, m = self.n, self.m
        
        # 扩展为方阵
        size = max(n, m)
        cost = np.full((size, size), float('inf'))
        cost[:n, :m] = self.cost
        
        # 初始化u, v (对偶变量)
        u = np.zeros(size + 1)
        v = np.zeros(size + 1)
        p = np.zeros(size + 1, dtype=int)  # 匹配
        way = np.zeros(size + 1, dtype=int)
        
        for i in range(1, size + 1):
            p[0] = i
            j0 = 0
            minv = np.full(size + 1, float('inf'))
            used = np.full(size + 1, False)
            
            while p[j0] != 0:
                used[j0] = True
                i0 = p[j0]
                delta = float('inf')
                j1 = 0
                
                for j in range(1, size + 1):
                    if not used[j]:
                        cur = cost[i0-1, j-1] - u[i0] - v[j]
                        if cur < minv[j]:
                            minv[j] = cur
                            way[j] = j0
                        if minv[j] < delta:
                            delta = minv[j]
                            j1 = j
                
                for j in range(size + 1):
                    if used[j]:
                        u[p[j]] += delta
                        v[j] -= delta
                    else:
                        minv[j] -= delta
                
                j0 = j1
            
            # 回溯更新匹配
            while j0 != 0:
                j1 = way[j0]
                p[j0] = p[j1]
                j0 = j1
        
        # 提取结果
        assignment = [-1] * n
        for j in range(1, size + 1):
            if p[j] != 0 and p[j] <= n and j <= m:
                assignment[p[j] - 1] = j - 1
        
        total_cost = -v[0]
        return total_cost, assignment
 
 
def demo_hungarian():
    """匈牙利算法演示"""
    print("=" * 60)
    print("匈牙利算法 - 最优指派问题")
    print("=" * 60)
    
    # 工人-任务成本矩阵
    cost_matrix = np.array([
        [9, 2, 7, 8],
        [6, 4, 3, 7],
        [5, 8, 1, 8],
        [7, 6, 9, 4]
    ])
    
    print("成本矩阵:")
    print(cost_matrix)
    
    hungarian = HungarianAlgorithm(cost_matrix)
    min_cost, assignment = hungarian.solve()
    
    print(f"\n最优指派:")
    for worker, task in enumerate(assignment):
        print(f"  工人{worker} -> 任务{task} (成本: {cost_matrix[worker, task]})")
    print(f"总成本: {min_cost:.0f}")
 
 
if __name__ == "__main__":
    demo_hungarian()

5. 网络流:最大流与最小割

5.1 网络流问题

最大流问题:在网络中找到从源点到汇点的最大流量。

Ford-Fulkerson方法:不断寻找增广路径直到无法增广。

Edmonds-Karp算法:用BFS找增广路径,O(VE²)。

5.2 Python实现最大流

import networkx as nx
from typing import Dict, Tuple
 
class MaxFlowSolver:
    """最大流求解器"""
    
    def __init__(self, graph: nx.DiGraph):
        self.G = graph
    
    def ford_fulkerson(self, source: int, sink: int) -> Tuple[float, Dict]:
        """Ford-Fulkerson算法"""
        def bfs_find_path():
            """用BFS找增广路径"""
            visited = {source}
            parent = {source: None}
            queue = [source]
            
            while queue:
                u = queue.pop(0)
                if u == sink:
                    # 重建路径
                    path = []
                    node = sink
                    while node is not None:
                        path.append(node)
                        node = parent[node]
                    return path[::-1]
                
                for v in self.G.successors(u):
                    if v not in visited and self.G[u][v]['capacity'] > 0:
                        visited.add(v)
                        parent[v] = u
                        queue.append(v)
            
            return None
        
        max_flow = 0
        flow_dict = {u: {v: 0 for v in self.G.nodes()} 
                     for u in self.G.nodes()}
        
        while True:
            path = bfs_find_path()
            if path is None:
                break
            
            # 找路径上的最小残余容量
            path_flow = float('inf')
            for i in range(len(path) - 1):
                u, v = path[i], path[i+1]
                path_flow = min(path_flow, self.G[u][v]['capacity'])
            
            # 更新残余网络
            for i in range(len(path) - 1):
                u, v = path[i], path[i+1]
                self.G[u][v]['capacity'] -= path_flow
                self.G[v][u]['capacity'] += path_flow
                flow_dict[u][v] += path_flow
            
            max_flow += path_flow
        
        return max_flow, flow_dict
    
    def min_cut(self, source: int, sink: int) -> Tuple[float, Tuple[set, set]]:
        """计算最小割"""
        # 使用NetworkX的内置实现
        flow_value, flow_dict = nx.maximum_flow(self.G, source, sink)
        
        # BFS找源侧可达顶点
        reachable = set([source])
        queue = [source]
        while queue:
            u = queue.pop(0)
            for v in self.G.successors(u):
                if v not in reachable and self.G[u][v]['capacity'] > 0:
                    reachable.add(v)
                    queue.append(v)
        
        # 不可达的是汇侧
        unreachable = set(self.G.nodes()) - reachable
        
        return flow_value, (reachable, unreachable)
 
 
def demo_maxflow():
    """最大流问题演示"""
    print("=" * 60)
    print("最大流问题求解")
    print("=" * 60)
    
    # 创建网络流图
    G = nx.DiGraph()
    
    # 添加边 (起点, 终点, 容量)
    edges = [
        ('s', 'a', 10),
        ('s', 'b', 5),
        ('a', 'b', 15),
        ('a', 'c', 10),
        ('b', 'd', 15),
        ('c', 'd', 5),
        ('c', 't', 10),
        ('d', 't', 10),
    ]
    
    for u, v, cap in edges:
        G.add_edge(u, v, capacity=cap)
    
    print("网络结构:")
    print("源点(s) -> a(10) -> c(10) -> 汇点(t)")
    print("    ↓(5)↘(15)        ↘(5)")
    print("    b -----> d ----->")
    print("          (15)      (10)")
    print()
    
    solver = MaxFlowSolver(G.copy())
    max_flow, flow_dict = solver.ford_fulkerson('s', 't')
    
    print(f"最大流值: {max_flow}")
    print("\n各边的流量:")
    for u, v, data in G.edges(data=True):
        if flow_dict[u][v] > 0:
            print(f"  {u} -> {v}: {flow_dict[u][v]} / {data['capacity']}")
    
    # 计算最小割
    cut_value, (source_side, sink_side) = solver.min_cut('s', 't')
    print(f"\n最小割值: {cut_value}")
    print(f"源侧: {source_side}")
    print(f"汇侧: {sink_side}")
    
    # 使用NetworkX验证
    print("\n--- 使用NetworkX验证 ---")
    flow_value, flow_dict = nx.maximum_flow(G, 's', 't')
    print(f"NetworkX最大流: {flow_value}")
 
 
def demo_mincost_flow():
    """最小费用流问题演示"""
    print("\n" + "=" * 60)
    print("最小费用流问题")
    print("=" * 60)
    
    # 创建带费用的网络
    G = nx.DiGraph()
    
    # 添加边 (起点, 终点, 容量, 费用)
    edges = [
        ('s', 'a', 5, 2),
        ('s', 'b', 3, 1),
        ('a', 'b', 2, 1),
        ('a', 'c', 4, 3),
        ('b', 'c', 3, 2),
        ('b', 't', 4, 4),
        ('c', 't', 6, 1),
    ]
    
    for u, v, cap, cost in edges:
        G.add_edge(u, v, capacity=cap, cost=cost)
    
    # 设置供给和需求
    demand = {'s': -6, 't': 6, 'a': 0, 'b': 0, 'c': 0}
    nx.set_node_attributes(G, demand, 'demand')
    
    # 求解最小费用流
    flow_cost, flow_dict = nx.min_cost_flow_cost(G)
    
    print(f"最小总费用: {flow_cost}")
    print("\n最优流量:")
    for u, v in flow_dict:
        if flow_dict[u][v] > 0:
            print(f"  {u} -> {v}: {flow_dict[u][v]} (费用: {flow_dict[u][v] * G[u][v]['cost']})")
 
 
if __name__ == "__main__":
    demo_maxflow()
    demo_mincost_flow()

6. 实战:物流配送路径优化

class VehicleRoutingProblem:
    """车辆路径问题(VRP)简化版"""
    
    def __init__(self, locations: List[Tuple[float, float]], 
                 depot: int = 0,
                 vehicle_count: int = 3,
                 capacity: float = 100):
        self.locations = locations
        self.n = len(locations)
        self.depot = depot
        self.vehicle_count = vehicle_count
        self.capacity = capacity
        
        # 计算距离矩阵
        self.dist = np.zeros((self.n, self.n))
        for i in range(self.n):
            for j in range(self.n):
                self.dist[i][j] = np.sqrt(
                    (locations[i][0] - locations[j][0])**2 +
                    (locations[i][1] - locations[j][1])**2
                )
    
    def solve_nearest_neighbor(self, demands: List[float]) -> Dict:
        """最近邻贪心算法"""
        remaining = set(range(self.n))
        remaining.remove(self.depot)
        demands_remaining = demands.copy()
        
        routes = []
        total_distance = 0
        
        for _ in range(self.vehicle_count):
            if not remaining:
                break
            
            route = [self.depot]
            current = self.depot
            current_load = 0
            route_distance = 0
            
            while remaining:
                # 找最近的能满足需求的客户
                best = None
                best_dist = float('inf')
                
                for customer in remaining:
                    if (demands_remaining[customer] + current_load <= self.capacity and
                        self.dist[current][customer] < best_dist):
                        best = customer
                        best_dist = self.dist[current][customer]
                
                if best is None:
                    break
                
                route.append(best)
                current_load += demands_remaining[best]
                route_distance += best_dist
                remaining.remove(best)
                current = best
            
            route.append(self.depot)
            route_distance += self.dist[current][self.depot]
            
            routes.append(route)
            total_distance += route_distance
        
        return {
            'routes': routes,
            'total_distance': total_distance,
            'routes_count': len(routes)
        }
    
    def solve_savings_heuristic(self, demands: List[float]) -> Dict:
        """节约算法( Clarke-Wright Savings)"""
        # 计算节约值
        savings = []
        for i in range(self.n):
            for j in range(self.n):
                if i != j and i != self.depot and j != self.depot:
                    s = self.dist[i][self.depot] + self.dist[self.depot][j] - self.dist[i][j]
                    savings.append((s, i, j))
        
        # 按节约值降序排序
        savings.sort(reverse=True)
        
        # 初始化:每个客户一条路线
        routes = {i: [self.depot, i, self.depot] for i in range(self.n) 
                 if i != self.depot}
        route_endpoints = {i: (i, i) for i in routes}
        
        # 合并路线
        for s, i, j in savings:
            if i not in route_endpoints or j not in route_endpoints:
                continue
            
            route_i_end = route_endpoints[i]
            route_j_start = route_endpoints[j]
            
            # 检查是否可以合并
            # i在一条路线的末端,j在一条路线的始端
            if route_i_end[1] != i or route_j_start[0] != j:
                continue
            
            # 检查容量约束
            route_i = routes[i]
            route_j = routes[j]
            combined_demand = sum(demands[r] for r in route_i + route_j 
                                 if r != self.depot)
            
            if combined_demand > self.capacity:
                continue
            
            # 合并路线
            new_route = route_i[:-1] + route_j[1:]
            del routes[i], routes[j]
            routes[new_route[1]] = new_route
            
            # 更新端点
            del route_endpoints[i], route_endpoints[j]
            route_endpoints[new_route[1]] = (new_route[0], new_route[-1])
        
        # 计算总距离
        total_distance = 0
        result_routes = list(routes.values())
        for route in result_routes:
            dist = sum(self.dist[route[i]][route[i+1]] for i in range(len(route)-1))
            total_distance += dist
        
        return {
            'routes': result_routes,
            'total_distance': total_distance,
            'routes_count': len(result_routes)
        }
 
 
def demo_vrp():
    """VRP演示"""
    print("=" * 60)
    print("车辆路径问题(VRP)")
    print("=" * 60)
    
    # 生成随机位置
    np.random.seed(42)
    n_customers = 20
    locations = [(0, 0)] + list(zip(
        np.random.uniform(0, 100, n_customers),
        np.random.uniform(0, 100, n_customers)
    ))
    
    demands = [0] + list(np.random.uniform(5, 30, n_customers))
    
    print(f"客户数: {n_customers}")
    print(f"车辆数: 3")
    print(f"容量: 100")
    
    vrp = VehicleRoutingProblem(locations, depot=0, 
                               vehicle_count=3, capacity=100)
    
    # 最近邻算法
    result_nn = vrp.solve_nearest_neighbor(demands)
    print(f"\n最近邻算法:")
    print(f"  总距离: {result_nn['total_distance']:.2f}")
    print(f"  路线数: {result_nn['routes_count']}")
    
    # 节约算法
    result_savings = vrp.solve_savings_heuristic(demands)
    print(f"\n节约算法:")
    print(f"  总距离: {result_savings['total_distance']:.2f}")
    print(f"  路线数: {result_savings['routes_count']}")
    
    # 可视化
    fig, ax = plt.subplots(figsize=(12, 10))
    
    # 绘制客户点
    ax.scatter(locations[0][0], locations[0][1], 
              c='red', s=200, marker='s', label='仓库', zorder=5)
    
    for i, (x, y) in enumerate(locations[1:], 1):
        ax.scatter(x, y, c='blue', s=50, zorder=5)
        ax.annotate(str(i), (x, y), textcoords="offset points",
                   xytext=(5, 5), fontsize=8)
    
    # 绘制路线
    colors = ['green', 'orange', 'purple', 'brown', 'pink']
    for i, route in enumerate(result_savings['routes'][:5]):
        route_x = [locations[r][0] for r in route]
        route_y = [locations[r][1] for r in route]
        ax.plot(route_x, route_y, c=colors[i], linewidth=2, 
               marker='o', markersize=6, alpha=0.7)
    
    ax.set_xlabel('X坐标')
    ax.set_ylabel('Y坐标')
    ax.set_title('VRP求解结果(节约算法)')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    plt.savefig('vrp_solution.png', dpi=150)
    print("\n可视化已保存到 vrp_solution.png")
 
 
if __name__ == "__main__":
    demo_vrp()

7. 实战:用PuLP建模资源分配

class ResourceAllocation:
    """资源分配问题的ILP建模"""
    
    def __init__(self, n_projects: int, n_resources: int):
        self.n_projects = n_projects
        self.n_resources = n_resources
    
    def solve(self, project_values: List[float], 
              project_costs: List[List[float]],
              resource_capacities: List[float],
              budget: float) -> Dict:
        """
        求解资源分配问题
        
        最大化: sum(value_i * x_i)
        s.t.   sum(cost_ij * x_i) <= capacity_j  for all j
              sum(sum(cost_ij * x_i)) <= budget
              x_i in {0, 1}
        """
        if not HAS_PULP:
            print("需要安装PuLP: pip install pulp")
            return {}
        
        prob = LpProblem("ResourceAllocation", LpMaximize)
        
        # 决策变量:是否选择项目i
        x = [LpVariable(f"x{i}", cat='Binary') for i in range(self.n_projects)]
        
        # 目标函数
        prob += lpSum([project_values[i] * x[i] for i in range(self.n_projects)])
        
        # 资源容量约束
        for j in range(self.n_resources):
            prob += lpSum([project_costs[i][j] * x[i] 
                          for i in range(self.n_projects)]) <= resource_capacities[j]
        
        # 预算约束
        total_costs = [sum(project_costs[i][j] for j in range(self.n_resources))
                      for i in range(self.n_projects)]
        prob += lpSum([total_costs[i] * x[i] for i in range(self.n_projects)]) <= budget
        
        # 求解
        prob.solve()
        
        # 提取结果
        selected = [i for i in range(self.n_projects) if value(x[i]) > 0.5]
        total_value = sum(project_values[i] for i in selected)
        total_cost = sum(total_costs[i] for i in selected)
        
        return {
            'selected_projects': selected,
            'total_value': total_value,
            'total_cost': total_cost,
            'status': LpStatus[prob.status]
        }
 
 
def demo_resource_allocation():
    """资源分配演示"""
    print("=" * 60)
    print("资源分配问题 - ILP求解")
    print("=" * 60)
    
    n_projects = 10
    n_resources = 3
    
    np.random.seed(42)
    project_values = np.random.uniform(50, 200, n_projects).tolist()
    project_costs = np.random.uniform(10, 50, (n_projects, n_resources)).tolist()
    resource_capacities = [100, 80, 120]
    budget = 200
    
    print(f"项目数: {n_projects}")
    print(f"资源类型: {n_resources}")
    print(f"预算上限: {budget}")
    print()
    
    solver = ResourceAllocation(n_projects, n_resources)
    result = solver.solve(project_values, project_costs, 
                          resource_capacities, budget)
    
    if result:
        print(f"求解状态: {result['status']}")
        print(f"选中项目: {result['selected_projects']}")
        print(f"总价值: {result['total_value']:.2f}")
        print(f"总成本: {result['total_cost']:.2f}")
        
        print("\n项目详情:")
        print(f"{'项目':<8}{'价值':<12}{'成本':<20}{'选中':<8}")
        for i in range(n_projects):
            cost_str = ', '.join([f"R{j}:{project_costs[i][j]:.1f}" 
                                  for j in range(n_resources)])
            selected = '✓' if i in result['selected_projects'] else ''
            print(f"{i:<8}{project_values[i]:<12.1f}{cost_str:<20}{selected:<8}")
 
 
if __name__ == "__main__":
    demo_resource_allocation()

8. 总结与延伸

8.1 组合优化工具链

工具用途特点
PuLPPython建模轻量,易用
Gurobi商业求解器快速,强大
CPLEXIBM求解器企业级
OR-ToolsGoogle优化多种问题
NetworkX图算法可视化,快速
scipy.optimize科学计算基础优化

8.2 实际问题选择指南

问题类型
├── 小规模 (< 30变量)
│   └── ILP求解器(分支定界)
│
├── 中规模调度
│   ├── 单机 → EDD/LPT规则
│   ├── 并行机 → LPT + 局部搜索
│   └── Job Shop → 元启发式
│
├── 路径规划
│   ├── 静态 → A*
│   ├── 动态 → D* Lite
│   └── 连续空间 → RRT/RRT*
│
├── 网络流
│   └── NetworkX + max_flow
│
└── 大规模组合优化
    └── 元启发式(SA, GA, ACO)

8.3 实用建议

  1. 先建模再求解:把问题形式化是解决问题的第一步
  2. 利用问题结构:特殊结构可能让问题变简单
  3. 混合方法:精确算法找下界,启发式找可行解
  4. 可视化调试:画出解来发现问题
  5. 多次运行:随机算法需要多次试验

参考来源

  1. Bertsimas, D., & Tsitsiklis, J. (1997). Introduction to Linear Optimization.
  2. Schrijver, A. (2003). Combinatorial Optimization.
  3. Lavalle, S. M. (2006). Planning Algorithms.
  4. Papadimitriou, C. H., & Steiglitz, K. (1998). Combinatorial Optimization.

相关文档