组合优化应用实战
关键词速览
| 关键词 | 解释 |
|---|---|
| 整数线性规划 | 目标函数和约束都是线性的,变量是整数 |
| ILP | 整数线性规划的缩写 |
| 调度优化 | 安排任务/资源的时间和顺序 |
| 路径规划 | 寻找最优路径的问题 |
| A*算法 | 带启发式评估的最优搜索 |
| 匈牙利算法 | 二分图最大匹配 |
| 最大流 | 网络中的最大流量 |
| 费用流 | 最小成本的流量配送 |
摘要
组合优化是运筹学的核心领域,研究在离散空间中找到最优解的问题。从物流配送到员工排班,从电路设计到游戏AI,组合优化无处不在。本文从整数线性规划的基础出发,讲解调度优化、路径规划、图匹配、网络流等经典问题的建模和求解方法,最后通过丰富的代码实战展示如何用Python工具(PuLP、NetworkX)解决实际组合优化问题。
0. 组合优化:从现实问题到数学模型
0.1 什么是组合优化?
想象一下你要组织一场大型会议:
- 有n个报告厅可以同时使用
- 有m个报告需要安排
- 每个报告时长不同,报告人间有时间冲突
- 有的报告人只能参加特定时间段
你需要在这些约束下安排一个”最优”的会议日程。这就是组合优化的典型场景。
组合优化研究的是:在有限个离散选择中,找到满足所有约束的最优方案。
0.2 为什么组合优化重要?
实际价值:
- 物流行业:配送路线优化可节省数十亿美元
- 制造业:车间调度直接影响产能
- 航空:航班调度、机组排班优化利润
- IT:云计算资源分配、数据库查询优化
理论价值:
- NP难问题集中地
- 算法设计的试验田
- 数学与工程的桥梁
0.3 组合优化的分类
组合优化
├── 路径与网络问题
│ ├── 最短路径
│ ├── TSP/VRP
│ └── 网络流
├── 资源配置问题
│ ├── 背包
│ ├── 装箱
│ └── 调度
├── 匹配与分配
│ ├── 二分图匹配
│ ├── 指派问题
│ └── 稳定匹配
└── 布局与安排
├── 设施选址
├── 图着色
└── 排程问题
1. 整数线性规划:组合优化的瑞士军刀
1.1 ILP是什么?
整数线性规划(Integer Linear Programming)是组合优化的基石。简单说,就是在满足一组线性约束的前提下,最小化或最大化一个线性目标函数,其中变量要求是整数。
标准形式:
min c^T x
s.t. Ax ≤ b
x ≥ 0
x ∈ Z^n (整数约束)
为什么需要整数?
- 你不能安排0.7个人
- 你不能建造0.5架飞机
- 你不能打开0或1以外的开关
1.2 ILP vs LP:松弛与分支
线性规划(LP):去掉整数约束,可以用单纯形法或内点法快速求解。
整数线性规划(ILP):加入了整数约束,是NP难的。
关系:ILP的最优解 ≥ LP松弛的最优解(对于最小化问题)。这个下界对于分支定界很重要。
求解方法:
- 分支定界(Branch and Bound):递归划分解空间
- 割平面(Cutting Planes):添加有效不等式
- 分支切割(Branch and Cut):结合以上两者
- 启发式:快速得到可行解
1.3 PuLP:Python中的ILP求解器
PuLP是一个Python建模接口,支持多种求解器(CBC、GLPK、Gurobi、CPLEX)。
# PuLP基本用法
from pulp import *
# 创建问题
prob = LpProblem("MyProblem", LpMinimize)
# 决策变量
x = LpVariable("x", lowBound=0, cat='Integer')
y = LpVariable("y", lowBound=0, cat='Binary')
# 目标函数
prob += 2*x + 3*y
# 约束
prob += x + y <= 10
prob += 2*x - y >= 3
# 求解
status = prob.solve()
# 输出结果
print(f"状态: {LpStatus[status]}")
print(f"最优值: {value(prob.objective)}")
print(f"x = {value(x)}, y = {value(y)}")2. 调度优化:让一切有条不紊
2.1 调度问题的分类
调度问题(Scheduling)研究如何安排任务在资源上的执行顺序。
按资源类型分类:
- 单机调度:所有任务在一个处理器上
- 并行机调度:m个相同机器同时处理任务
- 车间调度:不同机器有不同处理顺序
按目标分类:
- 最大完工时间(Cmax):最后一个任务何时完成
- 总完工时间:所有任务完工时间之和
- 延迟惩罚:超过截止时间的惩罚
2.2 单机调度:EDD规则
问题:n个任务,每个任务有处理时间pᵢ和截止时间dᵢ。最小化最大延迟。
EDD(Earliest Due Date):按截止时间排序,先做最紧急的任务。
定理:EDD算法给出最优解!
2.3 Python实现单机调度
import random
from typing import List, Tuple
import matplotlib.pyplot as plt
class SingleMachineScheduling:
"""单机调度问题"""
def __init__(self, processing_times: List[int],
due_dates: List[int]):
self.p = processing_times
self.d = due_dates
self.n = len(processing_times)
def edd_schedule(self) -> Tuple[List[int], int]:
"""
EDD算法(最早截止日期优先)
返回:(调度顺序, 最大延迟)
"""
# 按截止时间排序
order = sorted(range(self.n), key=lambda i: self.d[i])
# 计算调度结果
completion = 0
max_lateness = 0
for i in order:
completion += self.p[i]
lateness = completion - self.d[i]
max_lateness = max(max_lateness, lateness)
return order, max_lateness
def schedule_details(self, order: List[int]) -> dict:
"""详细调度信息"""
completion = 0
details = []
for i in order:
start = completion
completion += self.p[i]
details.append({
'task': i,
'start': start,
'completion': completion,
'due': self.d[i],
'lateness': completion - self.d[i]
})
return details
def visualize(self, order: List[int], save_path: str = 'schedule.png'):
"""可视化调度"""
details = self.schedule_details(order)
fig, ax = plt.subplots(figsize=(12, 6))
y_pos = 0
for detail in details:
# 绘制任务条
ax.barh(y_pos, detail['completion'] - detail['start'],
left=detail['start'], height=0.5,
color='steelblue', edgecolor='black')
# 标注
mid = (detail['start'] + detail['completion']) / 2
ax.text(mid, y_pos, f'T{detail["task"]}',
ha='center', va='center', color='white', fontsize=10)
# 截止时间线
ax.axvline(x=detail['due'], ymin=y_pos/len(details),
ymax=(y_pos+0.5)/len(details),
color='red', linestyle='--', linewidth=1)
y_pos += 1
ax.set_xlabel('时间')
ax.set_ylabel('任务')
ax.set_yticks(range(len(order)))
ax.set_yticklabels([f'T{i}' for i in order])
ax.set_title('单机制调度 (EDD)')
ax.grid(True, axis='x', alpha=0.3)
plt.tight_layout()
plt.savefig(save_path, dpi=150)
print(f"调度图已保存到 {save_path}")
def generate_scheduling_instance(n: int, seed: int = 42) -> Tuple[List[int], List[int]]:
"""生成随机调度实例"""
random.seed(seed)
processing_times = [random.randint(1, 10) for _ in range(n)]
# 截止时间随机生成
due_dates = []
cumulative = 0
for p in processing_times:
cumulative += p
due_dates.append(cumulative + random.randint(-5, 10))
return processing_times, due_dates
if __name__ == "__main__":
print("=" * 60)
print("单机制调度问题 - EDD算法")
print("=" * 60)
# 测试
n = 8
p, d = generate_scheduling_instance(n)
print("\n任务数据:")
print(f"{'任务':<6}{'处理时间':<10}{'截止时间':<10}")
for i in range(n):
print(f"T{i:<5}{p[i]:<10}{d[i]:<10}")
scheduler = SingleMachineScheduling(p, d)
order, max_lateness = scheduler.edd_schedule()
print(f"\n最优调度顺序: {' -> '.join([f'T{i}' for i in order])}")
print(f"最大延迟: {max_lateness}")
# 详细结果
details = scheduler.schedule_details(order)
print("\n详细调度:")
print(f"{'任务':<6}{'开始':<8}{'完成':<8}{'截止':<8}{'延迟':<8}")
for detail in details:
print(f"T{detail['task']:<5}{detail['start']:<8}{detail['completion']:<8}"
f"{detail['due']:<8}{detail['lateness']:<8}")
# 可视化
scheduler.visualize(order)2.4 并行机调度:P||Cmax
问题:m台相同机器,n个任务,每个任务只能在一台机器上处理,最小化最大完工时间。
LPT规则(Longest Processing Time first):
- 按处理时间降序排列任务
- 依次分配到当前负载最小的机器
近似比:LPT给出(4/3 - 1/(3m))-近似。
class ParallelMachineScheduling:
"""并行机调度问题"""
def __init__(self, processing_times: List[int], num_machines: int):
self.p = processing_times
self.m = num_machines
self.n = len(processing_times)
def lpt_schedule(self) -> Tuple[int, List[List[int]]]:
"""
LPT算法(最长处理时间优先)
返回:(最大完工时间, 每台机器的任务分配)
"""
# 排序
sorted_tasks = sorted(range(self.n), key=lambda i: self.p[i], reverse=True)
# 机器负载
machine_loads = [0] * self.m
machine_schedules = [[] for _ in range(self.m)]
for task in sorted_tasks:
# 选择负载最小的机器
min_machine = min(range(self.m), key=lambda i: machine_loads[i])
machine_schedules[min_machine].append(task)
machine_loads[min_machine] += self.p[task]
makespan = max(machine_loads)
return makespan, machine_schedules
def visualize(self, schedule: List[List[int]], save_path: str = 'parallel_schedule.png'):
"""可视化并行调度"""
fig, ax = plt.subplots(figsize=(14, 6))
colors = plt.cm.Set3(range(len(schedule)))
for machine_idx, tasks in enumerate(schedule):
current_time = 0
for task in tasks:
ax.barh(machine_idx, self.p[task], left=current_time,
height=0.6, color=colors[task % len(colors)],
edgecolor='black')
mid = current_time + self.p[task] / 2
ax.text(mid, machine_idx, f'T{task}',
ha='center', va='center', fontsize=9)
current_time += self.p[task]
ax.set_xlabel('时间')
ax.set_ylabel('机器')
ax.set_yticks(range(self.m))
ax.set_yticklabels([f'M{i}' for i in range(self.m)])
ax.set_title(f'并行机调度 (m={self.m}, Cmax={sum(self.p)})')
ax.grid(True, axis='x', alpha=0.3)
plt.tight_layout()
plt.savefig(save_path, dpi=150)
if __name__ == "__main__":
# 并行机调度测试
n = 15
m = 3
tasks = [random.randint(1, 20) for _ in range(n)]
print(f"{n}个任务, {m}台机器")
print(f"总处理时间: {sum(tasks)}")
print(f"理论下界: {max(tasks)} (单机) 或 {sum(tasks)/m:.1f} (并行)")
scheduler = ParallelMachineScheduling(tasks, m)
makespan, schedule = scheduler.lpt_schedule()
print(f"LPT结果:")
print(f" 最大完工时间: {makespan}")
print(f" 各机器负载: {[sum(tasks[t] for t in m_tasks) for m_tasks in schedule]}")
scheduler.visualize(schedule)2.5 车间调度:Job Shop Problem
问题:有m台机器,每个作业需要按特定顺序在若干台机器上处理。最优化makespan。
这是经典的NP难问题,通常用启发式或元启发式求解。
class JobShopScheduling:
"""Job Shop调度问题(简化版:遗传算法)"""
def __init__(self, jobs: List[List[Tuple[int, int]]]):
"""
jobs: 每个作业的(机器, 处理时间)序列
例如: [[(0, 3), (1, 2)], [(1, 4), (0, 2)]]
"""
self.jobs = jobs
self.n = len(jobs)
self.m = max(max(m for m, _ in job) for job in jobs) + 1
def evaluate(self, schedule: List[int]) -> int:
"""
评估一个调度方案(作业顺序)
schedule: 每个时间步执行的作业索引
"""
# 记录每台机器和每个作业的完成时间
machine_available = [0] * self.m
job_next_op = [0] * self.n # 每个作业下一步操作的索引
job_available = [0] * self.n
current_time = 0
time_point = 0
for job in schedule:
if job_next_op[job] >= len(self.jobs[job]):
continue
machine, duration = self.jobs[job][job_next_op[job]]
start = max(current_time, machine_available[machine], job_available[job])
finish = start + duration
machine_available[machine] = finish
job_available[job] = finish
job_next_op[job] += 1
current_time = finish
return max(machine_available)
def random_schedule(self) -> List[int]:
"""生成随机调度"""
return random.sample(range(self.n), self.n)
def priority_schedule(self) -> List[int]:
"""用优先级规则生成调度"""
# 每个作业的剩余操作数
remaining = [len(job) for job in self.jobs]
schedule = []
available = set(range(self.n))
while len(schedule) < self.n:
# 选择剩余操作数最少的作业
if available:
job = min(available, key=lambda j: remaining[j])
schedule.append(job)
remaining[job] -= 1
if remaining[job] == 0:
available.remove(job)
else:
break
return schedule
if __name__ == "__main__":
# Job Shop实例
jobs = [
[(0, 3), (1, 2), (2, 4)], # 作业0: M0->3, M1->2, M2->4
[(1, 2), (0, 3), (2, 1)], # 作业1
[(2, 4), (0, 2), (1, 3)], # 作业2
]
print("Job Shop调度问题:")
for i, job in enumerate(jobs):
print(f" 作业{i}: " + " -> ".join([f"M{m}({t})" for m, t in job]))
scheduler = JobShopScheduling(jobs)
# 测试随机调度
random_sched = scheduler.random_schedule()
makespan = scheduler.evaluate(random_sched)
print(f"\n随机调度: {random_sched}, Makespan={makespan}")
# 优先级调度
priority_sched = scheduler.priority_schedule()
makespan = scheduler.evaluate(priority_sched)
print(f"优先级调度: {priority_sched}, Makespan={makespan}")3. 路径规划:A*算法及其变体
3.1 路径规划问题概述
路径规划是机器人和游戏AI中的核心问题:在图中找到从起点到终点的最优路径。
经典算法:
- Dijkstra:O((V+E)log V),无权图BFS
- A*:利用启发式加速
- D*:动态环境下的路径规划
- RRT:基于采样的运动规划
3.2 A*算法原理
A*算法结合了Dijkstra的完备性和贪心最佳优先搜索的速度。
评估函数:f(n) = g(n) + h(n)
- g(n):从起点到n的实际成本
- h(n):从n到目标的启发式估计
- f(n):经过n的总成本估计
关键性质:
- 如果h(n)是可采纳的(从不高估),A*能找到最优解
- 如果h(n)是一致的(满足三角不等式),A*效率更高
3.3 Python实现A*算法
import heapq
import math
from typing import List, Tuple, Optional, Dict, Callable
import random
class AStar:
"""A*路径规划算法"""
def __init__(self, grid: List[List[int]] = None,
width: int = 0, height: int = 0):
"""
网格世界中的A*算法
grid: 0=可通过, 1=障碍物
"""
if grid:
self.grid = grid
self.height = len(grid)
self.width = len(grid[0]) if self.height > 0 else 0
else:
self.width = width
self.height = height
self.grid = [[0] * width for _ in range(height)]
def heuristic(self, a: Tuple[int, int],
b: Tuple[int, int]) -> float:
"""欧几里得距离启发式"""
return math.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)
def get_neighbors(self, pos: Tuple[int, int]) -> List[Tuple[int, int]]:
"""获取相邻格子(8方向)"""
x, y = pos
neighbors = []
# 8个方向
directions = [(-1, -1), (-1, 0), (-1, 1),
(0, -1), (0, 1),
(1, -1), (1, 0), (1, 1)]
for dx, dy in directions:
nx, ny = x + dx, y + dy
if 0 <= nx < self.height and 0 <= ny < self.width:
if self.grid[nx][ny] == 0: # 可通过
# 对角线移动代价更高
cost = 1.414 if dx != 0 and dy != 0 else 1
neighbors.append((nx, ny, cost))
return neighbors
def solve(self, start: Tuple[int, int],
goal: Tuple[int, int]) -> Tuple[Optional[List[Tuple[int, int]]], float]:
"""
A*算法求解
返回: (路径, 成本) 或 (None, inf)
"""
# 优先队列: (f, g, counter, position, path)
counter = 0
open_set = [(self.heuristic(start, goal), 0, counter, start, [start])]
heapq.heapify(open_set)
# 记录已访问节点的最佳g值
g_best: Dict[Tuple[int, int], float] = {start: 0}
# 记录探索节点数
nodes_explored = 0
while open_set:
f, g, _, current, path = heapq.heappop(open_set)
nodes_explored += 1
# 到达目标
if current == goal:
return path, g
# 跳过已处理的节点
if g > g_best.get(current, float('inf')):
continue
# 探索邻居
for nx, ny, cost in self.get_neighbors(current):
new_g = g + cost
# 如果找到更好的路径
if new_g < g_best.get((nx, ny), float('inf')):
g_best[(nx, ny)] = new_g
f = new_g + self.heuristic((nx, ny), goal)
counter += 1
heapq.heappush(open_set,
(f, new_g, counter, (nx, ny), path + [(nx, ny)]))
return None, float('inf')
def solve_with_callback(self, start: Tuple[int, int],
goal: Tuple[int, int],
callback: Callable = None) -> Tuple[Optional[List[Tuple[int, int]]], float]:
"""带回调的A*(用于可视化)"""
counter = 0
open_set = [(self.heuristic(start, goal), 0, counter, start, [start])]
heapq.heapify(open_set)
g_best: Dict[Tuple[int, int], float] = {start: 0}
closed_set = set()
explored_order = []
while open_set:
f, g, _, current, path = heapq.heappop(open_set)
if current in closed_set:
continue
closed_set.add(current)
explored_order.append(current)
if callback:
callback(current, g_best.copy(), closed_set.copy(), explored_order)
if current == goal:
return path, g
for nx, ny, cost in self.get_neighbors(current):
if (nx, ny) in closed_set:
continue
new_g = g + cost
if new_g < g_best.get((nx, ny), float('inf')):
g_best[(nx, ny)] = new_g
f = new_g + self.heuristic((nx, ny), goal)
counter += 1
heapq.heappush(open_set,
(f, new_g, counter, (nx, ny), path + [(nx, ny)]))
return None, float('inf')
class EightPuzzle:
"""八数码问题(8-Puzzle)"""
def __init__(self, state: List[int] = None):
self.size = 3
if state:
self.state = state
else:
# 默认已解决状态
self.state = list(range(1, 9)) + [0]
def find_zero(self) -> int:
return self.state.index(0)
def neighbors(self) -> List[List[int]]:
"""生成所有可能的移动"""
zero_pos = self.find_zero()
x, y = zero_pos // self.size, zero_pos % self.size
moves = []
directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
for dx, dy in directions:
nx, ny = x + dx, y + dy
if 0 <= nx < self.size and 0 <= ny < self.size:
new_state = self.state.copy()
new_zero_pos = nx * self.size + ny
new_state[zero_pos], new_state[new_zero_pos] = \
new_state[new_zero_pos], new_state[zero_pos]
moves.append(new_state)
return moves
def manhattan_distance(self, goal: List[int]) -> int:
"""曼哈顿距离启发式"""
distance = 0
for i, val in enumerate(self.state):
if val != 0:
goal_pos = goal.index(val)
x1, y1 = i // self.size, i % self.size
x2, y2 = goal_pos // self.size, goal_pos % self.size
distance += abs(x1 - x2) + abs(y1 - y2)
return distance
def solve_a_star(self, goal: List[int] = None) -> Tuple[Optional[List[List[int]]], int]:
"""A*求解八数码"""
if goal is None:
goal = list(range(1, 9)) + [0]
start_state = tuple(self.state)
goal_state = tuple(goal)
if start_state == goal_state:
return [list(start_state)], 0
# 优先队列: (f, g, counter, state, path)
counter = 0
h = self.manhattan_distance(goal)
open_set = [(h, 0, counter, start_state, [list(start_state)])]
heapq.heapify(open_set)
g_best = {start_state: 0}
while open_set:
f, g, _, current, path = heapq.heappop(open_set)
if current == goal_state:
return path, g
if g > g_best.get(current, float('inf')):
continue
# 扩展邻居
current_list = list(current)
zero_idx = current_list.index(0)
x, y = zero_idx // 3, zero_idx % 3
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nx, ny = x + dx, y + dy
if 0 <= nx < 3 and 0 <= ny < 3:
new_idx = nx * 3 + ny
new_state = list(current)
new_state[zero_idx], new_state[new_idx] = \
new_state[new_idx], new_state[zero_idx]
new_state_tuple = tuple(new_state)
new_g = g + 1
if new_g < g_best.get(new_state_tuple, float('inf')):
g_best[new_state_tuple] = new_g
# 计算新的启发式值
val = new_state[new_idx]
goal_pos = goal.index(val) if val in goal else 0
new_h = abs(nx - goal_pos // 3) + abs(ny - goal_pos % 3)
# 使用简化的曼哈顿距离
new_h = sum(
abs((i // 3) - (goal.index(new_state[i]) // 3)) +
abs((i % 3) - (goal.index(new_state[i]) % 3))
for i in range(9) if new_state[i] != 0
)
counter += 1
heapq.heappush(open_set,
(new_g + new_h, new_g, counter,
new_state_tuple, path + [new_state]))
return None, -1
def print_state(self, state: List[int] = None):
"""打印状态"""
if state is None:
state = self.state
for i in range(3):
row = []
for j in range(3):
val = state[i * 3 + j]
row.append(str(val) if val != 0 else '.')
print(' '.join(row))
print()
def demo_a_star():
"""A*算法演示"""
print("=" * 60)
print("A*算法路径规划演示")
print("=" * 60)
# 创建带障碍的网格
grid = [
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 1, 1, 1, 0, 1, 1, 0],
[0, 0, 0, 0, 0, 1, 0, 1, 0, 0],
[0, 1, 1, 1, 0, 1, 0, 1, 0, 0],
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
[0, 1, 0, 1, 1, 1, 1, 1, 1, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 1, 1, 1, 1, 0, 1, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
]
astar = AStar(grid)
start = (0, 0)
goal = (9, 9)
print(f"起点: {start}, 目标: {goal}")
path, cost = astar.solve(start, goal)
if path:
print(f"找到路径! 成本: {cost:.2f}, 步数: {len(path)-1}")
print(f"路径: {' -> '.join(map(str, path))}")
# 可视化
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(10, 10))
# 绘制网格
for i in range(len(grid)):
for j in range(len(grid[0])):
if grid[i][j] == 1:
ax.add_patch(plt.Rectangle((j, len(grid)-1-i), 1, 1,
color='black'))
else:
ax.add_patch(plt.Rectangle((j, len(grid)-1-i), 1, 1,
color='white', edgecolor='gray'))
# 绘制路径
if path:
path_x = [p[1] + 0.5 for p in path]
path_y = [len(grid) - 1 - p[0] + 0.5 for p in path]
ax.plot(path_x, path_y, 'b-', linewidth=2)
ax.scatter(path_x, path_y, c='blue', s=50, zorder=5)
# 标记起点和终点
ax.scatter(start[1] + 0.5, len(grid) - 1 - start[0] + 0.5,
c='green', s=200, marker='s', zorder=6, label='起点')
ax.scatter(goal[1] + 0.5, len(grid) - 1 - goal[0] + 0.5,
c='red', s=200, marker='*', zorder=6, label='终点')
ax.set_xlim(0, len(grid[0]))
ax.set_ylim(0, len(grid))
ax.set_aspect('equal')
ax.legend()
plt.title('A*路径规划')
plt.savefig('a_star_path.png', dpi=150)
print("路径图已保存到 a_star_path.png")
else:
print("未找到路径")
def demo_eight_puzzle():
"""八数码问题演示"""
print("\n" + "=" * 60)
print("八数码问题 A*求解")
print("=" * 60)
# 随机打乱状态
random.seed(42)
state = list(range(1, 9)) + [0]
random.shuffle(state)
# 确保有解(检查逆序数)
def count_inversions(s):
return sum(1 for i in range(len(s))
for j in range(i+1, len(s))
if s[i] != 0 and s[j] != 0 and s[i] > s[j])
# 如果逆序数为奇数,再交换一次
if count_inversions(state) % 2 == 1:
state[0], state[1] = state[1], state[0]
print("初始状态:")
puzzle = EightPuzzle(state)
puzzle.print_state()
goal = list(range(1, 9)) + [0]
print("目标状态:")
puzzle.print_state(goal)
path, moves = puzzle.solve_a_star(goal)
if path:
print(f"找到解! 移动次数: {moves}")
print("解路径:")
for i, step in enumerate(path[:6]):
print(f"步骤 {i}:")
for j in range(3):
row = []
for k in range(3):
val = step[j * 3 + k]
row.append(str(val) if val != 0 else '.')
print(' '.join(row))
print()
if len(path) > 6:
print(f"... 共{len(path)}步")
else:
print("无解")
if __name__ == "__main__":
demo_a_star()
demo_eight_puzzle()4. 图匹配:找到最佳配对
4.1 匹配问题基础
匹配(Matching):在图中选择边,使得每个顶点最多与一条选中的边关联。
完美匹配:每个顶点都被匹配。
二分图匹配:顶点可以分为两组,同组内无边相连。
4.2 匈牙利算法:最优指派
匈牙利算法(Hungarian Algorithm)用于求解二分图的最大权完美匹配。
时间复杂度:O(n³)
import numpy as np
class HungarianAlgorithm:
"""匈牙利算法求解最优指派问题"""
def __init__(self, cost_matrix: np.ndarray):
"""
cost_matrix: 成本矩阵 (n x m, n <= m)
"""
self.cost = cost_matrix.astype(float)
self.n, self.m = cost_matrix.shape
def solve(self) -> Tuple[float, List[int]]:
"""
求解最小成本指派
返回: (最小成本, 指派方案)
"""
n, m = self.n, self.m
# 扩展为方阵
size = max(n, m)
cost = np.full((size, size), float('inf'))
cost[:n, :m] = self.cost
# 初始化u, v (对偶变量)
u = np.zeros(size + 1)
v = np.zeros(size + 1)
p = np.zeros(size + 1, dtype=int) # 匹配
way = np.zeros(size + 1, dtype=int)
for i in range(1, size + 1):
p[0] = i
j0 = 0
minv = np.full(size + 1, float('inf'))
used = np.full(size + 1, False)
while p[j0] != 0:
used[j0] = True
i0 = p[j0]
delta = float('inf')
j1 = 0
for j in range(1, size + 1):
if not used[j]:
cur = cost[i0-1, j-1] - u[i0] - v[j]
if cur < minv[j]:
minv[j] = cur
way[j] = j0
if minv[j] < delta:
delta = minv[j]
j1 = j
for j in range(size + 1):
if used[j]:
u[p[j]] += delta
v[j] -= delta
else:
minv[j] -= delta
j0 = j1
# 回溯更新匹配
while j0 != 0:
j1 = way[j0]
p[j0] = p[j1]
j0 = j1
# 提取结果
assignment = [-1] * n
for j in range(1, size + 1):
if p[j] != 0 and p[j] <= n and j <= m:
assignment[p[j] - 1] = j - 1
total_cost = -v[0]
return total_cost, assignment
def demo_hungarian():
"""匈牙利算法演示"""
print("=" * 60)
print("匈牙利算法 - 最优指派问题")
print("=" * 60)
# 工人-任务成本矩阵
cost_matrix = np.array([
[9, 2, 7, 8],
[6, 4, 3, 7],
[5, 8, 1, 8],
[7, 6, 9, 4]
])
print("成本矩阵:")
print(cost_matrix)
hungarian = HungarianAlgorithm(cost_matrix)
min_cost, assignment = hungarian.solve()
print(f"\n最优指派:")
for worker, task in enumerate(assignment):
print(f" 工人{worker} -> 任务{task} (成本: {cost_matrix[worker, task]})")
print(f"总成本: {min_cost:.0f}")
if __name__ == "__main__":
demo_hungarian()5. 网络流:最大流与最小割
5.1 网络流问题
最大流问题:在网络中找到从源点到汇点的最大流量。
Ford-Fulkerson方法:不断寻找增广路径直到无法增广。
Edmonds-Karp算法:用BFS找增广路径,O(VE²)。
5.2 Python实现最大流
import networkx as nx
from typing import Dict, Tuple
class MaxFlowSolver:
"""最大流求解器"""
def __init__(self, graph: nx.DiGraph):
self.G = graph
def ford_fulkerson(self, source: int, sink: int) -> Tuple[float, Dict]:
"""Ford-Fulkerson算法"""
def bfs_find_path():
"""用BFS找增广路径"""
visited = {source}
parent = {source: None}
queue = [source]
while queue:
u = queue.pop(0)
if u == sink:
# 重建路径
path = []
node = sink
while node is not None:
path.append(node)
node = parent[node]
return path[::-1]
for v in self.G.successors(u):
if v not in visited and self.G[u][v]['capacity'] > 0:
visited.add(v)
parent[v] = u
queue.append(v)
return None
max_flow = 0
flow_dict = {u: {v: 0 for v in self.G.nodes()}
for u in self.G.nodes()}
while True:
path = bfs_find_path()
if path is None:
break
# 找路径上的最小残余容量
path_flow = float('inf')
for i in range(len(path) - 1):
u, v = path[i], path[i+1]
path_flow = min(path_flow, self.G[u][v]['capacity'])
# 更新残余网络
for i in range(len(path) - 1):
u, v = path[i], path[i+1]
self.G[u][v]['capacity'] -= path_flow
self.G[v][u]['capacity'] += path_flow
flow_dict[u][v] += path_flow
max_flow += path_flow
return max_flow, flow_dict
def min_cut(self, source: int, sink: int) -> Tuple[float, Tuple[set, set]]:
"""计算最小割"""
# 使用NetworkX的内置实现
flow_value, flow_dict = nx.maximum_flow(self.G, source, sink)
# BFS找源侧可达顶点
reachable = set([source])
queue = [source]
while queue:
u = queue.pop(0)
for v in self.G.successors(u):
if v not in reachable and self.G[u][v]['capacity'] > 0:
reachable.add(v)
queue.append(v)
# 不可达的是汇侧
unreachable = set(self.G.nodes()) - reachable
return flow_value, (reachable, unreachable)
def demo_maxflow():
"""最大流问题演示"""
print("=" * 60)
print("最大流问题求解")
print("=" * 60)
# 创建网络流图
G = nx.DiGraph()
# 添加边 (起点, 终点, 容量)
edges = [
('s', 'a', 10),
('s', 'b', 5),
('a', 'b', 15),
('a', 'c', 10),
('b', 'd', 15),
('c', 'd', 5),
('c', 't', 10),
('d', 't', 10),
]
for u, v, cap in edges:
G.add_edge(u, v, capacity=cap)
print("网络结构:")
print("源点(s) -> a(10) -> c(10) -> 汇点(t)")
print(" ↓(5)↘(15) ↘(5)")
print(" b -----> d ----->")
print(" (15) (10)")
print()
solver = MaxFlowSolver(G.copy())
max_flow, flow_dict = solver.ford_fulkerson('s', 't')
print(f"最大流值: {max_flow}")
print("\n各边的流量:")
for u, v, data in G.edges(data=True):
if flow_dict[u][v] > 0:
print(f" {u} -> {v}: {flow_dict[u][v]} / {data['capacity']}")
# 计算最小割
cut_value, (source_side, sink_side) = solver.min_cut('s', 't')
print(f"\n最小割值: {cut_value}")
print(f"源侧: {source_side}")
print(f"汇侧: {sink_side}")
# 使用NetworkX验证
print("\n--- 使用NetworkX验证 ---")
flow_value, flow_dict = nx.maximum_flow(G, 's', 't')
print(f"NetworkX最大流: {flow_value}")
def demo_mincost_flow():
"""最小费用流问题演示"""
print("\n" + "=" * 60)
print("最小费用流问题")
print("=" * 60)
# 创建带费用的网络
G = nx.DiGraph()
# 添加边 (起点, 终点, 容量, 费用)
edges = [
('s', 'a', 5, 2),
('s', 'b', 3, 1),
('a', 'b', 2, 1),
('a', 'c', 4, 3),
('b', 'c', 3, 2),
('b', 't', 4, 4),
('c', 't', 6, 1),
]
for u, v, cap, cost in edges:
G.add_edge(u, v, capacity=cap, cost=cost)
# 设置供给和需求
demand = {'s': -6, 't': 6, 'a': 0, 'b': 0, 'c': 0}
nx.set_node_attributes(G, demand, 'demand')
# 求解最小费用流
flow_cost, flow_dict = nx.min_cost_flow_cost(G)
print(f"最小总费用: {flow_cost}")
print("\n最优流量:")
for u, v in flow_dict:
if flow_dict[u][v] > 0:
print(f" {u} -> {v}: {flow_dict[u][v]} (费用: {flow_dict[u][v] * G[u][v]['cost']})")
if __name__ == "__main__":
demo_maxflow()
demo_mincost_flow()6. 实战:物流配送路径优化
class VehicleRoutingProblem:
"""车辆路径问题(VRP)简化版"""
def __init__(self, locations: List[Tuple[float, float]],
depot: int = 0,
vehicle_count: int = 3,
capacity: float = 100):
self.locations = locations
self.n = len(locations)
self.depot = depot
self.vehicle_count = vehicle_count
self.capacity = capacity
# 计算距离矩阵
self.dist = np.zeros((self.n, self.n))
for i in range(self.n):
for j in range(self.n):
self.dist[i][j] = np.sqrt(
(locations[i][0] - locations[j][0])**2 +
(locations[i][1] - locations[j][1])**2
)
def solve_nearest_neighbor(self, demands: List[float]) -> Dict:
"""最近邻贪心算法"""
remaining = set(range(self.n))
remaining.remove(self.depot)
demands_remaining = demands.copy()
routes = []
total_distance = 0
for _ in range(self.vehicle_count):
if not remaining:
break
route = [self.depot]
current = self.depot
current_load = 0
route_distance = 0
while remaining:
# 找最近的能满足需求的客户
best = None
best_dist = float('inf')
for customer in remaining:
if (demands_remaining[customer] + current_load <= self.capacity and
self.dist[current][customer] < best_dist):
best = customer
best_dist = self.dist[current][customer]
if best is None:
break
route.append(best)
current_load += demands_remaining[best]
route_distance += best_dist
remaining.remove(best)
current = best
route.append(self.depot)
route_distance += self.dist[current][self.depot]
routes.append(route)
total_distance += route_distance
return {
'routes': routes,
'total_distance': total_distance,
'routes_count': len(routes)
}
def solve_savings_heuristic(self, demands: List[float]) -> Dict:
"""节约算法( Clarke-Wright Savings)"""
# 计算节约值
savings = []
for i in range(self.n):
for j in range(self.n):
if i != j and i != self.depot and j != self.depot:
s = self.dist[i][self.depot] + self.dist[self.depot][j] - self.dist[i][j]
savings.append((s, i, j))
# 按节约值降序排序
savings.sort(reverse=True)
# 初始化:每个客户一条路线
routes = {i: [self.depot, i, self.depot] for i in range(self.n)
if i != self.depot}
route_endpoints = {i: (i, i) for i in routes}
# 合并路线
for s, i, j in savings:
if i not in route_endpoints or j not in route_endpoints:
continue
route_i_end = route_endpoints[i]
route_j_start = route_endpoints[j]
# 检查是否可以合并
# i在一条路线的末端,j在一条路线的始端
if route_i_end[1] != i or route_j_start[0] != j:
continue
# 检查容量约束
route_i = routes[i]
route_j = routes[j]
combined_demand = sum(demands[r] for r in route_i + route_j
if r != self.depot)
if combined_demand > self.capacity:
continue
# 合并路线
new_route = route_i[:-1] + route_j[1:]
del routes[i], routes[j]
routes[new_route[1]] = new_route
# 更新端点
del route_endpoints[i], route_endpoints[j]
route_endpoints[new_route[1]] = (new_route[0], new_route[-1])
# 计算总距离
total_distance = 0
result_routes = list(routes.values())
for route in result_routes:
dist = sum(self.dist[route[i]][route[i+1]] for i in range(len(route)-1))
total_distance += dist
return {
'routes': result_routes,
'total_distance': total_distance,
'routes_count': len(result_routes)
}
def demo_vrp():
"""VRP演示"""
print("=" * 60)
print("车辆路径问题(VRP)")
print("=" * 60)
# 生成随机位置
np.random.seed(42)
n_customers = 20
locations = [(0, 0)] + list(zip(
np.random.uniform(0, 100, n_customers),
np.random.uniform(0, 100, n_customers)
))
demands = [0] + list(np.random.uniform(5, 30, n_customers))
print(f"客户数: {n_customers}")
print(f"车辆数: 3")
print(f"容量: 100")
vrp = VehicleRoutingProblem(locations, depot=0,
vehicle_count=3, capacity=100)
# 最近邻算法
result_nn = vrp.solve_nearest_neighbor(demands)
print(f"\n最近邻算法:")
print(f" 总距离: {result_nn['total_distance']:.2f}")
print(f" 路线数: {result_nn['routes_count']}")
# 节约算法
result_savings = vrp.solve_savings_heuristic(demands)
print(f"\n节约算法:")
print(f" 总距离: {result_savings['total_distance']:.2f}")
print(f" 路线数: {result_savings['routes_count']}")
# 可视化
fig, ax = plt.subplots(figsize=(12, 10))
# 绘制客户点
ax.scatter(locations[0][0], locations[0][1],
c='red', s=200, marker='s', label='仓库', zorder=5)
for i, (x, y) in enumerate(locations[1:], 1):
ax.scatter(x, y, c='blue', s=50, zorder=5)
ax.annotate(str(i), (x, y), textcoords="offset points",
xytext=(5, 5), fontsize=8)
# 绘制路线
colors = ['green', 'orange', 'purple', 'brown', 'pink']
for i, route in enumerate(result_savings['routes'][:5]):
route_x = [locations[r][0] for r in route]
route_y = [locations[r][1] for r in route]
ax.plot(route_x, route_y, c=colors[i], linewidth=2,
marker='o', markersize=6, alpha=0.7)
ax.set_xlabel('X坐标')
ax.set_ylabel('Y坐标')
ax.set_title('VRP求解结果(节约算法)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.savefig('vrp_solution.png', dpi=150)
print("\n可视化已保存到 vrp_solution.png")
if __name__ == "__main__":
demo_vrp()7. 实战:用PuLP建模资源分配
class ResourceAllocation:
"""资源分配问题的ILP建模"""
def __init__(self, n_projects: int, n_resources: int):
self.n_projects = n_projects
self.n_resources = n_resources
def solve(self, project_values: List[float],
project_costs: List[List[float]],
resource_capacities: List[float],
budget: float) -> Dict:
"""
求解资源分配问题
最大化: sum(value_i * x_i)
s.t. sum(cost_ij * x_i) <= capacity_j for all j
sum(sum(cost_ij * x_i)) <= budget
x_i in {0, 1}
"""
if not HAS_PULP:
print("需要安装PuLP: pip install pulp")
return {}
prob = LpProblem("ResourceAllocation", LpMaximize)
# 决策变量:是否选择项目i
x = [LpVariable(f"x{i}", cat='Binary') for i in range(self.n_projects)]
# 目标函数
prob += lpSum([project_values[i] * x[i] for i in range(self.n_projects)])
# 资源容量约束
for j in range(self.n_resources):
prob += lpSum([project_costs[i][j] * x[i]
for i in range(self.n_projects)]) <= resource_capacities[j]
# 预算约束
total_costs = [sum(project_costs[i][j] for j in range(self.n_resources))
for i in range(self.n_projects)]
prob += lpSum([total_costs[i] * x[i] for i in range(self.n_projects)]) <= budget
# 求解
prob.solve()
# 提取结果
selected = [i for i in range(self.n_projects) if value(x[i]) > 0.5]
total_value = sum(project_values[i] for i in selected)
total_cost = sum(total_costs[i] for i in selected)
return {
'selected_projects': selected,
'total_value': total_value,
'total_cost': total_cost,
'status': LpStatus[prob.status]
}
def demo_resource_allocation():
"""资源分配演示"""
print("=" * 60)
print("资源分配问题 - ILP求解")
print("=" * 60)
n_projects = 10
n_resources = 3
np.random.seed(42)
project_values = np.random.uniform(50, 200, n_projects).tolist()
project_costs = np.random.uniform(10, 50, (n_projects, n_resources)).tolist()
resource_capacities = [100, 80, 120]
budget = 200
print(f"项目数: {n_projects}")
print(f"资源类型: {n_resources}")
print(f"预算上限: {budget}")
print()
solver = ResourceAllocation(n_projects, n_resources)
result = solver.solve(project_values, project_costs,
resource_capacities, budget)
if result:
print(f"求解状态: {result['status']}")
print(f"选中项目: {result['selected_projects']}")
print(f"总价值: {result['total_value']:.2f}")
print(f"总成本: {result['total_cost']:.2f}")
print("\n项目详情:")
print(f"{'项目':<8}{'价值':<12}{'成本':<20}{'选中':<8}")
for i in range(n_projects):
cost_str = ', '.join([f"R{j}:{project_costs[i][j]:.1f}"
for j in range(n_resources)])
selected = '✓' if i in result['selected_projects'] else ''
print(f"{i:<8}{project_values[i]:<12.1f}{cost_str:<20}{selected:<8}")
if __name__ == "__main__":
demo_resource_allocation()8. 总结与延伸
8.1 组合优化工具链
| 工具 | 用途 | 特点 |
|---|---|---|
| PuLP | Python建模 | 轻量,易用 |
| Gurobi | 商业求解器 | 快速,强大 |
| CPLEX | IBM求解器 | 企业级 |
| OR-Tools | Google优化 | 多种问题 |
| NetworkX | 图算法 | 可视化,快速 |
| scipy.optimize | 科学计算 | 基础优化 |
8.2 实际问题选择指南
问题类型
├── 小规模 (< 30变量)
│ └── ILP求解器(分支定界)
│
├── 中规模调度
│ ├── 单机 → EDD/LPT规则
│ ├── 并行机 → LPT + 局部搜索
│ └── Job Shop → 元启发式
│
├── 路径规划
│ ├── 静态 → A*
│ ├── 动态 → D* Lite
│ └── 连续空间 → RRT/RRT*
│
├── 网络流
│ └── NetworkX + max_flow
│
└── 大规模组合优化
└── 元启发式(SA, GA, ACO)
8.3 实用建议
- 先建模再求解:把问题形式化是解决问题的第一步
- 利用问题结构:特殊结构可能让问题变简单
- 混合方法:精确算法找下界,启发式找可行解
- 可视化调试:画出解来发现问题
- 多次运行:随机算法需要多次试验
参考来源
- Bertsimas, D., & Tsitsiklis, J. (1997). Introduction to Linear Optimization.
- Schrijver, A. (2003). Combinatorial Optimization.
- Lavalle, S. M. (2006). Planning Algorithms.
- Papadimitriou, C. H., & Steiglitz, K. (1998). Combinatorial Optimization.
相关文档
- P_NP问题与NPC详解 - 组合优化的复杂度基础
- 近似算法实战指南 - NP难问题的近似求解
- 随机算法与蒙特卡洛方法实战 - 随机化优化算法
- 计算复杂性理论深度 - 理论计算机科学基础