关键词
| 遗传编程 | 语法树 | Koza | 函数集 | 终止符集 |
|---|---|---|---|---|
| 子树交叉 | 子树变异 | 符号回归 | 自动函数发现 | 程序演进 |
| 表达树 | 树深度限制 | 饱和度 | 初始化方法 | 适应度评估 |
| 闭合性 | 充分性 | GP变体 | 行为遗传 | 多目标GP |
摘要
遗传编程(Genetic Programming, GP)是由John Koza于1989年在其标志性著作《Genetic Programming: On the Programming of Computers by Means of Natural Selection》中系统提出的一种自动程序生成技术。与传统遗传算法固定长度编码不同,GP采用可变大小和形状的树形结构表示计算机程序,实现了程序空间的自动搜索与优化。这一突破性范式使计算机能够”自动编程”,已在符号回归、电路设计、游戏策略、自动化控制等领域展现出卓越的问题求解能力。
一、遗传编程的理论基础
1.1 自动编程的愿景
传统编程要求人类程序员明确指定解决问题的每一个步骤。Koza提出的遗传编程旨在颠覆这一范式:给定问题的输入-输出示例或性能评估函数,让计算机通过进化过程自动生成解决问题的程序。
这一愿景的理论支撑来自两个核心原则:
闭合性原则(Closure):所有函数和终止符必须能够相互组合而不出错。例如,若函数集中包含返回布尔值的函数,则其他函数必须能够接受布尔输入。
充分性原则(Sufficiency):函数集和终止符集的组合必须能够表示任意复杂度的解空间。
1.2 与遗传算法的区别
| 特征 | 遗传算法(GA) | 遗传编程(GP) |
|---|---|---|
| 个体表示 | 固定长度串 | 可变大小树 |
| 基因含义 | 编码决定 | 自身即是程序 |
| 搜索空间 | 离散、线性 | 层次化、组合爆炸 |
| 应用领域 | 参数优化 | 程序合成 |
二、树形表示与语法结构
2.1 表达树的基本概念
GP中的个体是一棵有根有序树。每个内部节点代表一个函数(运算符),每个叶子节点代表一个终止符(变量或常量)。
数学表达:程序可形式化为从输入空间到输出空间的映射:
其中为输入变量空间,为输出空间。
2.2 表达树的构建示例
以数学表达式为例,其对应的表达树结构如下:
×
/ \
+ -
/ \ / \
x 3 y 2
中序遍历(标准数学表达式):
前序遍历(波兰表达式):
后序遍历(逆波兰表达式):
2.3 树深度与复杂度控制
GP树的深度直接影响生成的程序复杂度:
- 最大深度限制:防止”不可控生长”(bloat)现象
- 初始深度控制:使用特定初始化方法控制初始种群
深度限制策略:
class TreeNode:
def __init__(self, value, depth=0):
self.value = value
self.depth = depth
self.children = []
def is_valid(self, max_depth):
if self.depth > max_depth:
return False
return all(child.is_valid(max_depth) for child in self.children)
def count_nodes(self):
return 1 + sum(child.count_nodes() for child in self.children)
def get_leaves(self):
if not self.children:
return [self.value]
return [leaf for child in self.children for leaf in child.get_leaves()]三、函数集与终止符集
3.1 函数集的设计
函数集应包含解决问题所需的所有运算符。典型的函数类别包括:
3.1.1 算术函数
function_set = {
'+': lambda x, y: x + y,
'-': lambda x, y: x - y,
'*': lambda x, y: x * y,
'/': lambda x, y: x / y if abs(y) > 1e-8 else 1, # 保护性除法
'%': lambda x, y: x % y if y != 0 else 0,
'**': lambda x, y: x ** y if x > 0 else 0,
}3.1.2 布尔函数
对于布尔问题,函数集通常包括:
boolean_functions = {
'AND': lambda x, y: x and y,
'OR': lambda x, y: x or y,
'NOT': lambda x: not x,
'IF': lambda cond, x, y: x if cond else y,
}3.1.3 条件与循环函数
control_functions = {
'IF-LESS': lambda x, y, z, w: z if x < y else w,
'IF-EQUAL': lambda x, y, z, w: z if x == y else w,
'PROGN2': lambda x, y: y, # 顺序执行
'PROGN3': lambda x, y, z: z,
}3.2 终止符集的设计
终止符集提供程序的输入和常量:
terminal_set = {
'variables': ['x', 'y', 'z', 't'],
'constants': list(range(-10, 11)),
'ephemeral_random': lambda: random.uniform(-1, 1), # 临时随机常量
'funcalls': ['sin', 'cos', 'exp', 'log', 'sqrt'],
}Note
临时随机常量(Ephemeral Random Constants):在初始化时生成随机常量值,交叉和变异操作可能改变这些常量但不会产生新值。这是GP中处理数值常量的经典技术。
3.3 闭合性检查
def check_closure(function_set, terminal_set):
"""
验证函数集的闭合性
每个函数的所有参数位置必须能够接受任何函数的返回值
"""
for name, func in function_set.items():
arity = func.__code__.co_argcount
for i in range(arity):
# 检查第i个参数位置
compatible = any(
is_compatible(output_type, param_types[i])
for func2 in function_set.values()
for output_type in get_output_types(func2)
)
# 也必须接受终止符
compatible |= any(
is_compatible(t, param_types[i])
for t in terminal_set
)
if not compatible:
print(f"Closure violation: {name} at position {i}")
return True四、遗传操作算子
4.1 初始化方法
4.1.1 完整方法(Full Method)
所有叶子节点达到指定深度,确保树的完全性:
def full_initialization(depth, function_set, terminal_set):
if depth == 0:
return random.choice(terminal_set)
return TreeNode(
random.choice(function_set),
children=[full_initialization(depth-1, function_set, terminal_set)
for _ in range(arity(random.choice(function_set)))]
)4.1.2 生长方法(Grow Method)
允许叶子节点在任何深度出现,产生更自然的形状:
def grow_initialization(max_depth, function_set, terminal_set):
if max_depth == 0 or random.random() < ratio_terminals:
return random.choice(terminal_set)
func = random.choice(function_set)
return TreeNode(
func,
children=[grow_initialization(max_depth-1, function_set, terminal_set)
for _ in range(arity(func))]
)4.1.3 混合方法(Ramped Half-and-Half)
在初始化种群时混合使用完整法和生长法:
def ramped_initialization(pop_size, min_depth=2, max_depth=6,
function_set, terminal_set):
population = []
depth_range = range(min_depth, max_depth + 1)
for i in range(pop_size):
depth = list(depth_range)[i % len(depth_range)]
if i % 2 == 0:
tree = full_initialization(depth, function_set, terminal_set)
else:
tree = grow_initialization(depth, function_set, terminal_set)
population.append(tree)
return populationTip
Ramped Half-and-Half能产生多样化的树结构,是Koza推荐的标准初始化方法。
4.2 子树交叉(Crossover)
4.2.1 标准子树交叉
随机选择两个父代树中的交叉点,交换对应的子树:
import copy
def subtree_crossover(parent1, parent2, max_depth=None):
# 随机选择交叉点
crossover_point1 = random.choice(get_all_nodes(parent1))
crossover_point2 = random.choice(get_all_nodes(parent2))
# 深度检查
if max_depth:
subtree_depth = get_subtree_depth(crossover_point2)
parent_depth = get_node_depth(parent1, crossover_point1)
if parent_depth + subtree_depth > max_depth:
return copy.deepcopy(parent1), copy.deepcopy(parent2)
# 执行交叉
child1 = copy.deepcopy(parent1)
child2 = copy.deepcopy(parent2)
replace_subtree(child1, crossover_point1, copy.deepcopy(crossover_point2))
replace_subtree(child2, crossover_point2, copy.deepcopy(crossover_point1))
return child1, child2可视化示例:
Parent1: Parent2:
+ /
/ \ / \
× 3 sin +
/ \ | / \
x 2 y z w
Crossover point1: × Crossover point2: sin
Child1: Child2:
+ /
/ \ / \
sin 3 × +
| / \ \
y x 2 z w
4.2.2 点交叉(Point Crossover)
保留大部分树结构,仅交换一个节点:
def point_crossover(parent1, parent2):
node1 = random.choice(get_all_nodes(parent1))
node2 = random.choice(get_all_nodes(parent2))
child1 = copy.deepcopy(parent1)
child2 = copy.deepcopy(parent2)
replace_node(child1, node1, copy.deepcopy(node2))
replace_node(child2, node2, copy.deepcopy(node1))
return child1, child24.3 子树变异(Mutation)
4.3.1 替换变异
随机选择一棵子树,用新生成的随机树替换:
def subtree_mutation(tree, function_set, terminal_set, max_depth=6):
child = copy.deepcopy(tree)
mutation_point = random.choice(get_all_nodes(child))
# 生成新的随机子树
depth = random.randint(0, max_depth)
new_subtree = grow_initialization(depth, function_set, terminal_set)
replace_subtree(child, mutation_point, new_subtree)
return child4.3.2 收缩变异(Shrink Mutation)
将一棵子树替换为随机终止符:
def shrink_mutation(tree, terminal_set):
child = copy.deepcopy(tree)
mutation_point = random.choice(get_non_leaf_nodes(child))
replace_node(child, mutation_point, random.choice(terminal_set))
return child4.3.3 点变异(Point Mutation)
仅改变单个节点的值:
def point_mutation(tree, function_set, terminal_set, rate=0.01):
child = copy.deepcopy(tree)
for node in get_all_nodes(child):
if random.random() < rate:
if node.is_function():
node.value = random.choice(function_set)
else:
node.value = random.choice(terminal_set)
return child五、适应度评估
5.1 适应度案例(Fitness Cases)
GP的适应度通过在多个测试案例上的表现来衡量:
class FitnessEvaluator:
def __init__(self, fitness_cases):
"""
fitness_cases: [(input1, input2, ..., expected_output), ...]
"""
self.fitness_cases = fitness_cases
def evaluate(self, tree):
errors = []
for inputs, expected in self.fitness_cases:
try:
result = tree.execute(*inputs)
errors.append(abs(result - expected))
except:
errors.append(float('inf'))
return {
'raw_fitness': sum(errors),
'mean_absolute_error': sum(errors) / len(errors),
'hits': sum(1 for e in errors if e < 0.01),
}5.2 标准化适应度
对于符号回归问题,标准化均方误差(Normalized Mean Squared Error):
标准化版本:
Warning
过拟合是GP面临的重要问题。需要使用独立的验证集来评估泛化能力。
六、自动函数发现
6.1 ADF(Automatically Defined Functions)
Koza提出的ADF机制允许进化过程中自动定义和调用子函数:
class ADFTree:
def __init__(self, main_tree, adf_definitions):
self.main_tree = main_tree # 主程序
self.adf_definitions = adf_definitions # {name: Tree}
def execute(self, inputs):
# 准备ADF参数
adf_args = {f'ARG{i}': inputs[i] for i in range(len(inputs))}
# 计算ADF
for name, tree in self.adf_definitions.items():
adf_args[name] = tree.execute(inputs)
# 执行主程序
return self.main_tree.execute(adf_args)ADF树结构示例:
ADF1(x, y): Main Program:
+ IF-ZERO
/ \ / | \
× z > ADF1 ADF2
/ \ | | |
x y x y z
6.2 模块化GP架构
class ModularGP:
def __init__(self, n_adfs=3):
self.n_adfs = n_adfs
self.adf_functions = {}
def evolve_with_adfs(self, fitness_cases):
# 进化主程序和ADF定义
pass
def call_adf(self, adf_name, args):
return self.adf_functions[adf_name].execute(args)七、经典应用案例
7.1 符号回归(Symbolic Regression)
符号回归旨在发现生成数据的数学表达式:
def symbolic_regression_example():
"""
目标函数: f(x) = sin(x) + x^2
"""
# 生成训练数据
x_train = np.linspace(-10, 10, 100)
y_train = np.sin(x_train) + x_train**2
# 定义函数集和终止符集
function_set = ['+', '-', '*', '/', 'sin', 'cos', 'exp', 'log']
terminal_set = ['x', *np.random.uniform(-1, 1, 10)]
# 运行GP
gp = GeneticProgramming(function_set, terminal_set, fitness_cases)
best_solution = gp.evolve()
print(f"Discovered: {best_solution.to_expression()}")
# 输出类似: sin(x) + (x * x)
# 示例运行结果
symreg = SymbolicRegressionFitness(
X=np.linspace(-10, 10, 100),
y=np.sin(X) + X**2
)7.2 电路自动设计
Koza在《Genetic Programming III》中展示了GP自动设计电路的能力,包括:
- 电路拓扑发现
- 元件参数优化
- 信号处理滤波器设计
class CircuitDesignGP:
def __init__(self, circuit_constraints):
self.constraints = circuit_constraints
self.available_components = ['R', 'L', 'C', 'Diode', 'Transistor']
def evaluate_circuit(self, circuit_tree):
"""
评估电路性能指标
- 频率响应
- 功耗
- 噪声特性
"""
simulation_result = self.simulate(circuit_tree)
return {
'fitness': fitness_score(simulation_result),
'validity': self.check_constraints(circuit_tree),
}7.3 游戏策略演化
class GameStrategyGP:
def __init__(self, game_state_space):
self.game_states = game_state_space
def create_game_tree(self):
function_set = ['IF-STATE', 'AND', 'OR', 'NOT']
terminal_set = [f'S{i}' for i in range(len(self.game_states))]
return grow_initialization(5, function_set, terminal_set)
def evaluate_strategy(self, strategy_tree):
wins = 0
for game_state in self.game_states:
action = strategy_tree.execute(game_state)
if self.play_and_win(action, game_state):
wins += 1
return wins / len(self.game_states)八、防止”不可控生长”(Bloat)
8.1 Bloat现象
GP进化过程中,程序树可能无限增长而适应度不再提升,这被称为”代码膨胀”或”bloat”。
8.2 深度限制策略
class DepthLimitedGP:
def __init__(self, max_depth=17):
self.max_depth = max_depth
def is_valid_tree(self, tree):
return get_tree_depth(tree) <= self.max_depth
def crossover_with_limit(self, parent1, parent2):
child1, child2 = subtree_crossover(parent1, parent2)
if not self.is_valid_tree(child1):
return copy.deepcopy(parent1)
if not self.is_valid_tree(child2):
return copy.deepcopy(parent2)
return child1, child28.3 惩罚机制
def size_penalized_fitness(raw_fitness, tree, penalty_coef=0.001):
complexity = tree.count_nodes()
return raw_fitness + penalty_coef * complexity8.4 多目标优化
from deap import tools
def evolve_multi_objective(population, fitness_cases):
creator.create("FitnessMulti", base.Fitness,
weights=(1.0, -1.0)) # 最大化适应度,最小化大小
creator.create("Individual", list, fitness=creator.FitnessMulti)
toolbox = base.Toolbox()
# ... setup operators ...
NSGA2 algorithm for multi-objective optimization
population = algorithms.eaMuPlusLambda(
population, toolbox, mu=500, lambda_=500,
cxpb=0.9, mutpb=0.1, ngen=100
)
return population九、实用框架与工具
9.1 DEAP框架
from deap import base, creator, tools, algorithms
import random
def deap_gp_example():
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", gp.PrimitiveTree,
fitness=creator.FitnessMin)
pset = gp.PrimitiveSet("MAIN", arity=1)
pset.addPrimitive(operator.add, args=2, name="add")
pset.addPrimitive(operator.sub, args=2, name="sub")
pset.addPrimitive(operator.mul, args=2, name="mul")
pset.addPrimitive(math.sin, args=1)
pset.addPrimitive(math.cos, args=1)
pset.addEphemeralConstant("rand", lambda: random.uniform(-1, 1))
pset.addTerminal(1)
pset.addTerminal(2)
toolbox = base.Toolbox()
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=2, max_=6)
toolbox.register("individual", tools.initIterate,
creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
def evalSymbReg(individual):
func = toolbox.compile(expr=individual)
return sum(abs(func(x) - target(x)) for x in points),
toolbox.register("evaluate", evalSymbReg)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
algorithms.eaSimple(toolbox.population(), toolbox,
cxpb=0.9, mutpb=0.1, ngen=100)十、进阶主题与研究前沿
10.1 几何语义遗传编程(Geometric Semantic GP)
直接对函数的语义空间进行操作,而非语法空间:
def geometric_semantic_crossover(parent1, parent2, training_data):
"""
几何语义交叉保证子代在训练点上的性能
不劣于任一父代
"""
new_tree = create_mixing_tree(parent1, parent2)
return new_tree
def geometric_semantic_mutation(tree, mutation_step=0.5):
"""
语义变异
"""
target = random.choice(training_data)
new_tree = add_random_tree()
return weighted_sum(tree, new_tree, mutation_step)10.2 概率增量式遗传编程(Probabilistic Incremental Program Evolution)
class PIPE:
def __init__(self):
self.probability_matrix = {} # 存储概率分布
def update_probabilities(self, population, fitnesses):
for tree in population:
for node in tree:
self.probability_matrix[node] += fitness
def sample_tree(self):
root = self.sample_node()
self.expand_node(root)
return root十一、GP的深度理论分析
11.1 进化动力学的数学框架
11.1.1 模式定理(Schema Theorem)
Koza将模式定理应用于遗传编程,定义了包含通配符的树形模式(Schema):
模式定义:一个Schema是包含通配符”*“的树结构,表示一类相似的程序。例如,*(x, *)表示所有以任意函数为根、第二个子节点为任意值的树。
模式定理:短阶、低阶、在种群中具有高于平均适应度的模式,在进化过程中将以指数级增长。
class SchemaAnalyzer:
"""模式分析器"""
def __init__(self, primitive_set):
self.primitive_set = primitive_set
def extract_schemas(self, population):
"""从种群中提取模式"""
schemas = []
for tree in population:
schemas.extend(self._generate_schemas(tree, min_holes=1))
return schemas
def _generate_schemas(self, tree, min_holes=1):
"""生成包含指定数量空洞的模式"""
schemas = []
def traverse(node, schema_template, hole_positions):
if len(hole_positions) >= min_holes:
schemas.append({
'template': schema_template.copy(),
'holes': hole_positions.copy()
})
for i, child in enumerate(node.children):
new_template = schema_template + [(node.value, i)]
new_holes = hole_positions + [(node.value, i)]
traverse(child, new_template, new_holes)
traverse(tree, [], [])
return schemas11.1.2 收敛性分析框架
class GPConvergenceAnalyzer:
"""GP收敛性分析"""
def __init__(self, gp_system):
self.gp = gp_system
def markov_chain_analysis(self):
"""
使用马尔可夫链分析GP的收敛性
状态空间:种群中个体的适应度等级
"""
n_states = len(self.gp.population) + 1
P = np.zeros((n_states, n_states))
for i in range(n_states):
for j in range(n_states):
P[i, j] = self._transition_prob(i, j)
return P
def absorption_probability(self):
"""计算被最优解吸收的概率"""
pass
def expected_generations_to_convergence(self):
"""估计收敛的期望代数"""
pass11.2 适应度景观分析
class FitnessLandscapeAnalyzer:
"""适应度景观分析"""
def __init__(self, gp_system):
self.gp = gp_system
def sample_landscape(self, n_samples=1000):
"""采样适应度景观"""
samples = []
for _ in range(n_samples):
tree = self.gp.generate_random_tree(max_depth=6)
fitness = self.gp.evaluate_fitness(tree)
samples.append({
'tree': tree,
'fitness': fitness,
'size': tree.count_nodes(),
'depth': tree.depth()
})
return samples
def compute_neutrality(self, tree, neighborhood_size=100):
"""计算中性度"""
fitness = self.gp.evaluate_fitness(tree)
neighbors = self.gp.generate_neighbors(tree, neighborhood_size)
neutral_count = 0
epsilon = 1e-6
for neighbor in neighbors:
neighbor_fitness = self.gp.evaluate_fitness(neighbor)
if abs(neighbor_fitness - fitness) < epsilon:
neutral_count += 1
return neutral_count / len(neighbors) if neighbors else 0
def compute_ruggedness(self, samples):
"""计算景观的粗糙度"""
distances = []
fitnesses = []
best_fitness = min(s['fitness'] for s in samples)
for sample in samples:
dist = self._tree_distance(sample['tree'],
self._find_best_tree(samples))
distances.append(dist)
fitnesses.append(sample['fitness'])
correlation = np.corrcoef(distances, fitnesses)[0, 1]
return 1 - abs(correlation)十二、高级应用与技术
12.1 时间序列预测
class GPPredictor:
"""基于GP的时间序列预测"""
def __init__(self, lookback=5, forecast_horizon=1):
self.lookback = lookback
self.forecast_horizon = forecast_horizon
def create_terminal_set(self, time_series):
"""创建终端集"""
terminals = {}
for lag in range(1, self.lookback + 1):
terminals[f'x{-lag}'] = list(time_series[:-lag])
if len(time_series) > 2:
diff1 = np.diff(time_series)
terminals['diff1'] = list(diff1[:-1])
pct_change = np.diff(time_series) / time_series[:-1]
terminals['pct'] = list(pct_change)
return terminals
def create_training_cases(self, time_series):
"""创建训练案例"""
cases = []
X = self.create_terminal_set(time_series)
n = len(time_series)
for i in range(self.lookback, n - self.forecast_horizon):
inputs = {}
for key, values in X.items():
lag = int(key[1:]) if key.startswith('x') else 0
idx = i + lag if lag > 0 else i
if 0 <= idx < len(values):
inputs[key] = values[idx]
target = time_series[i + self.forecast_horizon]
cases.append((inputs, target))
return cases12.2 图像处理应用
class ImageFilterGP:
"""使用GP进化图像滤波器"""
def __init__(self, image_size=(64, 64)):
self.image_size = image_size
def create_image_terminals(self):
"""创建图像处理终端集"""
return {
'R': 'pixel_r',
'G': 'pixel_g',
'B': 'pixel_b',
'mean_3x3': 'mean_3x3',
'sobel_x': 'sobel_x',
'sobel_y': 'sobel_y'
}
def fitness_function(self, tree, test_images, ground_truth):
"""计算滤波器质量"""
total_mse = 0
for img, truth in zip(test_images, ground_truth):
output = self._apply_tree_to_image(tree, img)
mse = np.mean((output - truth) ** 2)
total_mse += mse
return total_mse / len(test_images)12.3 控制器设计
class GPController:
"""使用GP进化控制器"""
def __init__(self, system_model, time_horizon=10):
self.system = system_model
self.time_horizon = time_horizon
def create_control_terminals(self):
"""创建控制器终端集"""
return {
'e': 'error',
'de': 'error_derivative',
'ie': 'error_integral',
'e_k1': 'error_k_minus_1',
'u_k1': 'control_k_minus_1',
}
def fitness_function(self, tree):
"""评估控制器性能"""
total_cost = 0
for _ in range(10):
state = self.system.reset()
error_integral = 0
for t in range(self.time_horizon):
setpoint = self.system.get_setpoint(t)
error = setpoint - state
error_derivative = error - getattr(self, 'prev_error', 0)
self.prev_error = error
error_integral += error
context = {
'e': error,
'de': error_derivative,
'ie': error_integral,
}
try:
control = tree.evaluate(context)
control = np.clip(control, -10, 10)
except:
control = 0
state = self.system.step(control)
total_cost += error ** 2
return total_cost12.4 自动机器学习
class AutoFeatureEngineeringGP:
"""自动特征工程"""
def __init__(self, df, target_column):
self.df = df
self.target = target_column
self.numeric_columns = df.select_dtypes(
include=[np.number]
).columns.tolist()
self.numeric_columns.remove(target_column)
def create_function_set(self):
"""创建特征工程函数集"""
return {
'+': lambda x, y: x + y,
'-': lambda x, y: x - y,
'*': lambda x, y: x * y,
'/': lambda x, y: x / (y + 1e-10),
'sqrt': lambda x: np.sqrt(np.abs(x)),
'log': lambda x: np.log(np.abs(x) + 1),
'exp': lambda x: np.exp(np.clip(x, -10, 10)),
'abs': lambda x: np.abs(x),
'sin': lambda x: np.sin(x),
'cos': lambda x: np.cos(x),
}
def evolve_features(self, n_features=20, population_size=100):
"""进化特征集"""
best_features = []
for _ in range(n_features):
gp = GeneticProgramming(
function_set=list(self.create_function_set().keys()),
terminal_set=list(self.create_terminal_set().keys())
)
fitness_func = lambda tree: self.evaluate_feature(
tree, self.df, self.df[self.target].values
)
best_tree = gp.evolve(fitness_func)
best_features.append(best_tree)
return best_features十三、性能优化与实践
13.1 计算加速技术
class VectorizedGP:
"""向量化GP"""
def __init__(self, function_set, terminal_set):
self.function_set = function_set
self.terminal_set = terminal_set
def vectorized_evaluate(self, tree, X):
"""向量化评估"""
def eval_node(node):
if node.is_leaf():
value = node.value
if isinstance(value, str) and value in X.columns:
return X[value].values
else:
return np.full(len(X), float(value))
else:
children_values = [eval_node(child) for child in node.children]
func = self.function_set[node.value]
return func(*children_values)
return eval_node(tree)
def batch_fitness(self, population, X, y):
"""批量计算种群适应度"""
predictions = []
for tree in population:
pred = self.vectorized_evaluate(tree, X)
mse = np.mean((pred - y) ** 2)
predictions.append(mse)
return np.array(predictions)13.2 分布式GP
from multiprocessing import Pool, cpu_count
class ParallelGP:
"""并行GP"""
def __init__(self, n_workers=None):
self.n_workers = n_workers or max(1, cpu_count() - 1)
def parallel_evaluate(self, population, fitness_func, chunksize=None):
"""并行评估适应度"""
chunksize = chunksize or max(1, len(population) // self.n_workers)
with Pool(self.n_workers) as pool:
fitnesses = pool.map(fitness_func, population, chunksize=chunksize)
return np.array(fitnesses)十四、与其他技术的融合
14.1 GP与神经网络
class GPForNAS:
"""GP用于神经网络架构搜索"""
def __init__(self, search_space):
self.search_space = search_space
def create_nas_terminal_set(self):
"""NAS终端集"""
return {
'conv3x3': 'Conv(kernel=3)',
'conv5x5': 'Conv(kernel=5)',
'maxpool': 'MaxPool()',
'avgpool': 'AvgPool()',
'fc': 'Dense()',
'relu': 'ReLU()',
'dropout': 'Dropout()',
}
def decode_architecture(self, tree, input_shape):
"""将GP树解码为神经网络架构"""
from tensorflow import keras
from tensorflow.keras import layers
model = keras.Sequential()
model.add(layers.Input(shape=input_shape))
def build_from_tree(node):
if node.is_leaf():
layer_type = node.value
if layer_type.startswith('conv'):
kernel = int(layer_type[4])
model.add(layers.Conv2D(32, kernel))
elif layer_type == 'maxpool':
model.add(layers.MaxPooling2D())
else:
for child in node.children:
build_from_tree(child)
build_from_tree(tree)
return model14.2 GP与强化学习
class GPforRL:
"""GP与强化学习结合"""
def __init__(self, env):
self.env = env
def evaluate_policy(self, tree, n_episodes=10):
"""评估策略"""
total_reward = 0
for _ in range(n_episodes):
state = self.env.reset()
episode_reward = 0
done = False
while not done:
action = tree.evaluate(state)
action = np.clip(action, -1, 1)
next_state, reward, done, info = self.env.step(action)
episode_reward += reward
state = next_state
total_reward += episode_reward
return total_reward / n_episodes十五、案例研究:金融预测
class StockPredictor:
"""股价预测GP系统"""
def __init__(self, stock_data, lookback=20):
self.data = stock_data
self.lookback = lookback
self.train_size = int(len(stock_data) * 0.7)
self.train_data = stock_data[:self.train_size]
self.test_data = stock_data[self.train_size:]
def create_feature_engineering(self):
"""创建特征工程函数"""
return {
'+': lambda x, y: x + y,
'-': lambda x, y: x - y,
'*': lambda x, y: x * y,
'/': lambda x, y: x / (y + 1e-10),
'sma': lambda x, n: self._sma(x, n),
'ema': lambda x, n: self._ema(x, n),
'rsi': lambda x, n: self._rsi(x, n),
}
def _sma(self, x, n):
"""简单移动平均"""
result = np.zeros_like(x)
for i in range(n, len(x)):
result[i] = np.mean(x[i-n:i])
return result
def _ema(self, x, n):
"""指数移动平均"""
alpha = 2 / (n + 1)
result = np.zeros_like(x)
result[0] = x[0]
for i in range(1, len(x)):
result[i] = alpha * x[i] + (1 - alpha) * result[i-1]
return result
def _rsi(self, x, n=14):
"""相对强弱指数"""
deltas = np.diff(x, prepend=x[0])
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = self._sma(gains, n)
avg_loss = self._sma(losses, n)
rs = avg_gain / (avg_loss + 1e-10)
return 100 - (100 / (1 + rs))十六、研究前沿与未来方向
16.1 可解释AI与GP
遗传编程在可解释AI领域展现出独特优势:
class ExplainableGP:
"""可解释的遗传编程"""
def generate_explanation(self, tree):
"""生成人类可读的规则解释"""
rules = []
def traverse(node):
if node.value == 'IF_LESS':
condition = f"{node.children[0].value} < {node.children[1].value}"
rules.append(f"IF {condition} THEN {self._tree_to_text(node.children[2])}")
else:
for child in node.children:
traverse(child)
traverse(tree)
return rules
def feature_importance(self, tree, training_data):
"""计算特征重要性"""
importance = {}
for terminal in tree.get_terminals():
original_fitness = self._evaluate(tree)
modified_tree = tree.remove_subtree(terminal)
modified_fitness = self._evaluate(modified_tree)
importance[terminal] = original_fitness - modified_fitness
return sorted(importance.items(), key=lambda x: -x[1])16.2 迁移学习与GP
class TransferGP:
"""迁移学习GP"""
def extract_knowledge(self, source_tree):
"""从源任务提取知识"""
important_subtrees = self._identify_important_subtrees(source_tree)
patterns = [self._subtree_to_pattern(st) for st in important_subtrees]
return patterns
def inject_knowledge(self, target_tree, patterns):
"""向目标树注入知识"""
for pattern in patterns:
insertion_points = self._find_compatible_positions(target_tree, pattern)
if insertion_points:
point = random.choice(insertion_points)
target_tree.insert_subtree(point, pattern)
return target_tree16.3 深度学习与传统GP的融合
16.3.1 深度语义GP
class DeepSemanticGP:
"""
深度语义GP
结合深度学习表示学习和GP的进化能力
"""
def __init__(self, embedding_dim=128):
self.embedding_dim = embedding_dim
self.semantic_encoder = None # 用于编码树到语义空间
self.decoder = None # 用于从语义空间解码
def encode_tree(self, tree):
"""将树编码为语义向量"""
if self.semantic_encoder is None:
return self._tree_to_embedding(tree)
return self.semantic_encoder(tree)
def decode_semantic(self, embedding):
"""从语义向量解码为树"""
if self.decoder is None:
return self._embedding_to_tree(embedding)
return self.decoder(embedding)
def semantic_crossover(self, parent1, parent2):
"""
语义交叉:在语义空间中进行交叉操作
"""
# 编码到语义空间
sem1 = self.encode_tree(parent1)
sem2 = self.encode_tree(parent2)
# 在语义空间插值
alpha = random.random()
child_sem = alpha * sem1 + (1 - alpha) * sem2
# 解码回树结构
return self.decode_semantic(child_sem)
def semantic_mutation(self, tree, mutation_strength=0.1):
"""
语义变异:在语义空间添加噪声
"""
sem = self.encode_tree(tree)
# 添加高斯噪声
noise = np.random.randn(self.embedding_dim) * mutation_strength
mutated_sem = sem + noise
return self.decode_semantic(mutated_sem)16.3.2 GP强化神经架构搜索
class GPNeuralArchitectureSearch:
"""
基于GP的神经架构搜索
进化神经网络架构
"""
def __init__(self, input_shape, num_classes):
self.input_shape = input_shape
self.num_classes = num_classes
def create_architecture_space(self):
"""定义架构搜索空间"""
self.operations = {
'conv3x3': lambda x, filters: self._conv_block(x, filters, 3),
'conv5x5': lambda x, filters: self._conv_block(x, filters, 5),
'conv7x7': lambda x, filters: self._conv_block(x, filters, 7),
'depthwise_conv3x3': lambda x, filters: self._depthwise_conv(x, 3),
'maxpool3x3': lambda x: self._maxpool(x, 3),
'avgpool3x3': lambda x: self._avgpool(x, 3),
'identity': lambda x: x,
'se_block': lambda x: self._se_block(x),
'dropout': lambda x, rate: tf.keras.layers.Dropout(rate)(x),
}
self.filters_options = [16, 32, 64, 128, 256]
self.depth_options = [1, 2, 3, 4]
def _conv_block(self, x, filters, kernel_size):
"""卷积块"""
from tensorflow import keras
y = keras.layers.Conv2D(filters, kernel_size, padding='same')(x)
y = keras.layers.BatchNormalization()(y)
y = keras.layers.ReLU()(y)
return y
def _depthwise_conv(self, x, kernel_size):
"""深度可分离卷积"""
from tensorflow import keras
y = keras.layers.DepthwiseConv2D(kernel_size, padding='same')(x)
y = keras.layers.BatchNormalization()(y)
y = keras.layers.ReLU()(y)
return y
def _maxpool(self, x, pool_size):
"""最大池化"""
from tensorflow import keras
return keras.layers.MaxPooling2D(pool_size)(x)
def _avgpool(self, x, pool_size):
"""平均池化"""
from tensorflow import keras
return keras.layers.AveragePooling2D(pool_size)(x)
def _se_block(self, x, reduction=16):
"""Squeeze-and-Excitation块"""
from tensorflow import keras
channels = x.shape[-1]
y = keras.layers.GlobalAveragePooling2D()(x)
y = keras.layers.Dense(channels // reduction, activation='relu')(y)
y = keras.layers.Dense(channels, activation='sigmoid')(y)
y = keras.layers.Reshape([1, 1, channels])(y)
return keras.layers.Multiply()([x, y])
def decode_to_model(self, architecture_tree):
"""
将架构树解码为Keras模型
"""
from tensorflow import keras
inputs = keras.Input(shape=self.input_shape)
x = inputs
def build_from_tree(node, x):
if node.is_leaf():
return x
op = node.value
if op in self.operations:
if op == 'dropout':
x = self.operations[op](x, node.params.get('rate', 0.2))
else:
filters = node.params.get('filters', 64)
x = self.operations[op](x, filters)
for child in node.children:
x = build_from_tree(child, x)
return x
x = build_from_tree(architecture_tree, x)
# 添加分类头
x = keras.layers.GlobalAveragePooling2D()(x)
outputs = keras.layers.Dense(self.num_classes, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
def fitness_evaluation(self, architecture_tree, train_data, val_data, epochs=10):
"""
评估架构性能
"""
model = self.decode_to_model(architecture_tree)
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
history = model.fit(
train_data[0], train_data[1],
validation_data=val_data,
epochs=epochs,
verbose=0
)
val_acc = history.history['val_accuracy'][-1]
num_params = model.count_params()
# 权衡准确率和参数量
return val_acc - 1e-7 * num_params十七、GP与其他进化算法的混合
17.1 GP与遗传算法的混合
class HybridGPGA:
"""
GP与GA混合算法
GA用于优化数值参数,GP用于演化结构
"""
def __init__(self):
self.gp_population = []
self.ga_population = []
def initialize(self, pop_size, tree_depth, param_dim):
"""初始化混合种群"""
# GP种群:树结构
for _ in range(pop_size):
tree = self._random_tree(tree_depth)
self.gp_population.append(tree)
# GA种群:数值参数
for _ in range(pop_size):
params = np.random.randn(param_dim)
self.ga_population.append(params)
def evaluate_hybrid(self, tree, params, fitness_cases):
"""
评估混合解
"""
total_error = 0
for inputs, expected in fitness_cases:
# 使用树计算结构
result = tree.evaluate(inputs)
# 应用GA优化的参数进行后处理
processed = self._apply_params(result, params)
total_error += abs(processed - expected) ** 2
return 1.0 / (1 + total_error)
def _apply_params(self, result, params):
"""应用GA参数"""
return result * (1 + params[0]) + params[1]
def evolve_generation(self, fitness_cases, crossover_rate=0.8, mutation_rate=0.1):
"""进化一代"""
# 评估
fitnesses = []
for tree, params in zip(self.gp_population, self.ga_population):
fitness = self.evaluate_hybrid(tree, params, fitness_cases)
fitnesses.append(fitness)
# 选择
selected_indices = self._tournament_select(fitnesses, k=3, n=len(fitnesses)//2)
# GP交叉/变异
new_gp_pop = []
for _ in range(len(self.gp_population)):
if random.random() < crossover_rate:
parent1 = random.choice(selected_indices)
parent2 = random.choice(selected_indices)
child = self._gp_crossover(
self.gp_population[parent1],
self.gp_population[parent2]
)
else:
child = random.choice(selected_indices)
if random.random() < mutation_rate:
child = self._gp_mutate(child)
new_gp_pop.append(child)
# GA交叉/变异
new_ga_pop = []
for _ in range(len(self.ga_population)):
if random.random() < crossover_rate:
parent1 = random.choice(selected_indices)
parent2 = random.choice(selected_indices)
child = self._ga_crossover(
self.ga_population[parent1],
self.ga_population[parent2]
)
else:
child = random.choice(selected_indices)
if random.random() < mutation_rate:
child = self._ga_mutate(child)
new_ga_pop.append(child)
self.gp_population = new_gp_pop
self.ga_population = new_ga_pop
def _gp_crossover(self, parent1, parent2):
"""GP树交叉"""
# 使用子树交叉
return subtree_crossover(parent1, parent2)
def _gp_mutate(self, tree):
"""GP树变异"""
# 使用子树变异
return subtree_mutation(tree)
def _ga_crossover(self, parent1, parent2):
"""GA交叉"""
alpha = random.random()
return alpha * parent1 + (1 - alpha) * parent2
def _ga_mutate(self, individual):
"""GA变异"""
noise = np.random.randn(len(individual)) * 0.1
return individual + noise
def _tournament_select(self, fitnesses, k=3, n=None):
"""锦标赛选择"""
n = n or len(fitnesses)
selected = []
for _ in range(n):
tournament = random.sample(range(len(fitnesses)), k)
winner = max(tournament, key=lambda i: fitnesses[i])
selected.append(winner)
return selected17.2 GP与粒子群优化的混合
class HybridGPPSO:
"""
GP与PSO混合算法
PSO用于优化树中数值常量的位置
"""
def __init__(self, function_set, terminal_set):
self.function_set = function_set
self.terminal_set = terminal_set
def extract_constants(self, tree):
"""提取树中的数值常量"""
constants = []
positions = []
def traverse(node, path):
if node.is_leaf():
if isinstance(node.value, (int, float)):
constants.append(node.value)
positions.append(path)
else:
for i, child in enumerate(node.children):
traverse(child, path + [i])
traverse(tree, [])
return constants, positions
def insert_constants(self, tree, positions, new_constants):
"""将新常量插入树"""
# 重建树
return self._rebuild_tree(tree, positions, new_constants)
def optimize_constants(self, tree, fitness_cases, n_particles=20, max_iter=50):
"""
使用PSO优化树中的数值常量
"""
constants, positions = self.extract_constants(tree)
if not constants:
return tree
dim = len(constants)
# PSO优化常量
pso = ParticleSwarmOptimizer(
objective=lambda c: self._evaluate_constants(tree, c, fitness_cases, positions),
dim=dim,
bounds=[(min(constants), max(constants))] * dim,
n_particles=n_particles
)
best_constants, _ = pso.run()
# 更新树中的常量
return self.insert_constants(tree, positions, best_constants)
def _evaluate_constants(self, tree, constants, fitness_cases, positions):
"""评估常量配置"""
modified_tree = self.insert_constants(tree, positions, constants)
total_error = 0
for inputs, expected in fitness_cases:
result = modified_tree.evaluate(inputs)
total_error += abs(result - expected) ** 2
return total_error17.3 GP与差分进化的混合
class HybridGPDE:
"""
GP与DE混合算法
DE用于演化树结构的关键参数
"""
def __init__(self, function_set, terminal_set):
self.function_set = function_set
self.terminal_set = terminal_set
def identify_key_parameters(self, tree):
"""
识别树中的关键可调参数
例如:循环次数、条件阈值等
"""
key_params = []
def traverse(node, context):
if node.value in ['LOOP', 'REPEAT', 'IF']:
# 这些节点有可调参数
key_params.append({
'node': node,
'path': context.copy(),
'type': node.value
})
for i, child in enumerate(node.children):
traverse(child, context + [i])
traverse(tree, [])
return key_params
def encode_parameters(self, tree):
"""将关键参数编码为向量"""
key_params = self.identify_key_parameters(tree)
vector = []
for param in key_params:
if param['type'] == 'LOOP':
vector.append(param['node'].children[0].value if len(param['node'].children) > 0 else 5)
elif param['type'] == 'IF':
threshold = 0.0 # 默认阈值
vector.append(threshold)
return np.array(vector)
def decode_parameters(self, tree, param_vector):
"""将参数向量解码回树"""
key_params = self.identify_key_parameters(tree)
for i, param in enumerate(key_params):
if i < len(param_vector):
param['node'].param_value = param_vector[i]
return tree
def hybrid_evolve(self, fitness_cases, n_generations=100):
"""混合进化主循环"""
# 初始化种群
trees = [self._random_tree(max_depth=6) for _ in range(50)]
for gen in range(n_generations):
for i, tree in enumerate(trees):
# 提取参数向量
params = self.encode_parameters(tree)
# 使用DE优化参数
de = DifferentialEvolution(
objective=lambda p: self._evaluate_with_params(tree, p, fitness_cases),
bounds=[(0, 20)] * len(params),
NP=20,
max_iter=20
)
de.initialize()
for _ in range(20):
de.step()
# 更新树的参数
trees[i] = self.decode_parameters(tree, de.best_solution)
# 对树进行标准GP操作
trees = self._gp_selection(trees, fitness_cases)
trees = self._gp_crossover(trees)
trees = self._gp_mutation(trees)
return max(trees, key=lambda t: self._evaluate_tree(t, fitness_cases))
def _evaluate_with_params(self, tree, params, fitness_cases):
"""评估带有参数的树"""
modified_tree = self.decode_parameters(tree, params)
total_error = 0
for inputs, expected in fitness_cases:
result = modified_tree.evaluate(inputs)
total_error += abs(result - expected) ** 2
return total_error十八、高级应用案例
18.1 自动定理证明
class GPTheoremProver:
"""
使用GP进行自动定理证明
进化逻辑表达式
"""
def __init__(self):
self.logic_functions = {
'AND': lambda x, y: x and y,
'OR': lambda x, y: x or y,
'NOT': lambda x: not x,
'IMPLIES': lambda x, y: (not x) or y,
'IFF': lambda x, y: x == y,
}
def create_logical_terminals(self, propositions):
"""创建逻辑终端"""
terminals = {}
for prop in propositions:
terminals[prop] = prop
return terminals
def evaluate_formula(self, tree, truth_assignment):
"""评估公式在给定真值赋值下的结果"""
def eval_node(node):
if node.is_leaf():
return truth_assignment[node.value]
else:
func = self.logic_functions[node.value]
args = [eval_node(child) for child in node.children]
return func(*args)
return eval_node(tree)
def check_tautology(self, tree, all_assignments):
"""检查公式是否为重言式"""
results = []
for assignment in all_assignments:
result = self.evaluate_formula(tree, assignment)
results.append(result)
return all(results)
def check_contradiction(self, tree, all_assignments):
"""检查公式是否为矛盾式"""
results = []
for assignment in all_assignments:
result = self.evaluate_formula(tree, assignment)
results.append(result)
return not any(results)
def find_equivalent_formulas(self, target_tree, n_propositions, search_space):
"""找到与目标公式等价的公式"""
equivalents = []
for candidate in search_space:
if self.check_logical_equivalence(target_tree, candidate, n_propositions):
equivalents.append(candidate)
return equivalents
def check_logical_equivalence(self, tree1, tree2, n_propositions):
"""检查两个公式是否逻辑等价"""
all_assignments = self._generate_all_assignments(n_propositions)
for assignment in all_assignments:
result1 = self.evaluate_formula(tree1, assignment)
result2 = self.evaluate_formula(tree2, assignment)
if result1 != result2:
return False
return True
def _generate_all_assignments(self, n):
"""生成所有可能的真值赋值"""
assignments = []
for i in range(2 ** n):
assignment = {}
for j in range(n):
assignment[f'p{j}'] = bool((i >> j) & 1)
assignments.append(assignment)
return assignments18.2 游戏AI设计
class GPGameAI:
"""
使用GP设计游戏AI
进化游戏策略树
"""
def __init__(self, game_env):
self.game = game_env
self.condition_functions = {
'HP_LESS': lambda state, threshold: state['hp'] < threshold,
'HP_GREATER': lambda state, threshold: state['hp'] > threshold,
'ENEMY_NEAR': lambda state: state['enemy_distance'] < 5,
'ENEMY_FAR': lambda state: state['enemy_distance'] > 10,
'HAS_ITEM': lambda state, item: item in state['inventory'],
'CAN_ATTACK': lambda state: state['can_attack'],
}
self.action_functions = {
'ATTACK': lambda state: ('action', 'attack'),
'DEFEND': lambda state: ('action', 'defend'),
'HEAL': lambda state: ('action', 'heal'),
'MOVE_TO': lambda state, x, y: ('action', 'move', x, y),
'USE_ITEM': lambda state, item: ('action', 'use_item', item),
'WAIT': lambda state: ('action', 'wait'),
}
def create_game_terminals(self):
"""创建游戏终端集"""
terminals = {
'hp': 'state_hp',
'mp': 'state_mp',
'enemy_hp': 'enemy_hp',
'enemy_distance': 'enemy_distance',
'x': 'position_x',
'y': 'position_y',
}
# 添加数值常量
for val in [0, 1, 5, 10, 20, 50, 100]:
terminals[str(val)] = val
return terminals
def evaluate_strategy(self, strategy_tree, n_episodes=20):
"""评估策略的性能"""
total_score = 0
total_wins = 0
for _ in range(n_episodes):
score, won = self._play_episode(strategy_tree)
total_score += score
total_wins += won
return {
'avg_score': total_score / n_episodes,
'win_rate': total_wins / n_episodes,
'fitness': (total_score / n_episodes) + 100 * (total_wins / n_episodes)
}
def _play_episode(self, strategy_tree):
"""播放一局游戏"""
state = self.game.reset()
total_score = 0
max_steps = 1000
for step in range(max_steps):
# 从策略树获取动作
action = self._get_action(strategy_tree, state)
# 执行动作
next_state, reward, done, info = self.game.step(action)
total_score += reward
state = next_state
if done:
break
won = info.get('won', False)
return total_score, won
def _get_action(self, strategy_tree, state):
"""从策略树获取动作"""
def eval_node(node):
if node.is_leaf():
if node.value in state:
return state[node.value]
elif node.value in self.condition_functions:
# 返回条件函数
return node.value
else:
return node.value
else:
value = node.value
if value in self.condition_functions:
# 评估条件
args = [eval_node(child) for child in node.children]
func = self.condition_functions[value]
if len(args) == 1:
return func(state, args[0])
else:
return func(state)
elif value in self.action_functions:
# 评估动作
args = [eval_node(child) for child in node.children]
func = self.action_functions[value]
if len(args) == 0:
return func(state)
else:
return func(state, *args)
else:
# 其他函数
args = [eval_node(child) for child in node.children]
return self._apply_function(value, *args)
result = eval_node(strategy_tree)
# 解析动作
if isinstance(result, tuple) and result[0] == 'action':
return result[1:]
return ('wait',)18.3 音乐生成
class GPMusicGenerator:
"""
使用GP生成音乐
进化音乐模式
"""
def __init__(self, key='C', scale='major'):
self.key = key
self.scale = scale
self.scale_intervals = self._get_scale_intervals(scale)
self.key_offset = self._note_to_offset(key)
def _get_scale_intervals(self, scale):
"""获取音阶的音程"""
scales = {
'major': [0, 2, 4, 5, 7, 9, 11],
'minor': [0, 2, 3, 5, 7, 8, 10],
'pentatonic': [0, 2, 4, 7, 9],
'blues': [0, 3, 5, 6, 7, 10],
}
return scales.get(scale, scales['major'])
def _note_to_offset(self, note):
"""将音符转换为偏移量"""
notes = {'C': 0, 'D': 2, 'E': 4, 'F': 5, 'G': 7, 'A': 9, 'B': 11}
return notes.get(note[0].upper(), 0)
def create_music_functions(self):
"""创建音乐生成函数"""
return {
'note': lambda pitch, octave: self._create_note(pitch, octave),
'chord': lambda *notes: self._create_chord(notes),
'sequence': lambda *elements: self._create_sequence(elements),
'arpeggio': lambda notes, direction: self._create_arpeggio(notes, direction),
'rest': lambda duration: self._create_rest(duration),
'scale_up': lambda start, length: self._scale_movement(start, length, 1),
'scale_down': lambda start, length: self._scale_movement(start, length, -1),
'sequence_repeat': lambda seq, times: self._repeat_sequence(seq, times),
}
def _create_note(self, pitch, octave):
"""创建音符"""
pitch_offset = {'C': 0, 'D': 1, 'E': 2, 'F': 3, 'G': 4, 'A': 5, 'B': 6}
midi = 12 * (octave + 1) + pitch_offset.get(pitch, 0)
return {'type': 'note', 'midi': midi, 'duration': 1}
def _create_chord(self, notes):
"""创建和弦"""
return {'type': 'chord', 'notes': notes}
def _create_sequence(self, elements):
"""创建序列"""
return {'type': 'sequence', 'elements': elements}
def _create_arpeggio(self, notes, direction):
"""创建琶音"""
return {'type': 'arpeggio', 'notes': notes, 'direction': direction}
def _create_rest(self, duration):
"""创建休止"""
return {'type': 'rest', 'duration': duration}
def _scale_movement(self, start, length, direction):
"""创建音阶进行"""
sequence = []
for i in range(length):
midi = start + i * direction
sequence.append({'type': 'note', 'midi': midi, 'duration': 1})
return {'type': 'sequence', 'elements': sequence}
def _repeat_sequence(self, seq, times):
"""重复序列"""
return {'type': 'sequence', 'elements': seq['elements'] * times}
def fitness_function(self, tree, reference_styles=None):
"""
评估生成音乐的质量
"""
music = self._evaluate_tree(tree)
# 评估多个维度
melody_score = self._evaluate_melody(music)
harmony_score = self._evaluate_harmony(music)
rhythm_score = self._evaluate_rhythm(music)
if reference_styles:
style_score = self._evaluate_style(music, reference_styles)
else:
style_score = 0
return melody_score + harmony_score + rhythm_score + style_score
def _evaluate_melody(self, music):
"""评估旋律质量"""
notes = self._extract_notes(music)
if len(notes) < 2:
return 0
# 评估音域
pitches = [n['midi'] for n in notes]
range_score = min(1.0, (max(pitches) - min(pitches)) / 12)
# 评估级进与跳进的平衡
intervals = [abs(pitches[i+1] - pitches[i]) for i in range(len(pitches)-1)]
stepwise = sum(1 for i in intervals if i <= 2)
leap = sum(1 for i in intervals if i > 4)
interval_score = stepwise / (stepwise + leap + 1)
return 0.5 * range_score + 0.5 * interval_score
def _evaluate_harmony(self, music):
"""评估和声质量"""
# 检查和弦进行
chords = self._extract_chords(music)
if not chords:
return 0
# 简单的和声评估:检查和弦是否协调
harmony_score = 0
for chord in chords:
if self._is_consonant(chord):
harmony_score += 1
return harmony_score / max(1, len(chords))
def _is_consonant(self, chord):
"""检查和弦是否协调"""
# 简化版本:检查音程是否为协和音程
consonant_intervals = {0, 3, 4, 5, 7, 8, 9, 12}
return True # 简化实现
def _evaluate_rhythm(self, music):
"""评估节奏质量"""
notes = self._extract_notes(music)
# 评估节奏的规律性
durations = [n.get('duration', 1) for n in notes]
# 检查是否有重复的节奏型
patterns = []
for i in range(len(durations) - 3):
pattern = tuple(durations[i:i+4])
patterns.append(pattern)
unique_patterns = len(set(patterns))
pattern_score = unique_patterns / max(1, len(patterns))
return pattern_score十九、实用工具与框架
19.1 DEAP框架详解
"""
DEAP框架完整示例
DEAP是最流行的Python遗传编程框架
"""
from deap import base, creator, tools, algorithms, gp
import operator
import random
import numpy as np
def deap_gp_complete_example():
"""
DEAP GP完整示例:符号回归
"""
# 定义原始集
pset = gp.PrimitiveSetTyped("MAIN", [float], float)
# 重命名参数
pset.renameArguments(ARG0='x')
# 添加算术运算
def protectedDiv(left, right):
try:
return left / right
except ZeroDivisionError:
return 1
pset.addPrimitive(operator.add, [float, float], float, name="add")
pset.addPrimitive(operator.sub, [float, float], float, name="sub")
pset.addPrimitive(operator.mul, [float, float], float, name="mul")
pset.addPrimitive(protectedDiv, [float, float], float, name="div")
pset.addPrimitive(operator.neg, [float], float, name="neg")
pset.addPrimitive(math.cos, [float], float, name="cos")
pset.addPrimitive(math.sin, [float], float, name="sin")
# 添加常量和Ephemeral随机常量
pset.addEphemeralConstant("rand100", lambda: random.uniform(-100, 100), float)
pset.addTerminal(1.0, float)
pset.addTerminal(2.0, float)
# 创建适应度类(最小化)
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
# 创建工具箱
toolbox = base.Toolbox()
# 表达式生成
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=2, max_=6)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
# 遗传操作
toolbox.register("evaluate", evalSymbReg, pset=pset)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
# 交叉(确保语法有效)
toolbox.register("mate", gp.cxOnePoint)
# 统计
stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
stats_size = tools.Statistics(len)
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
mstats.register("avg", np.mean)
mstats.register("std", np.std)
mstats.register("min", np.min)
mstats.register("max", np.max)
# 算法参数
pop = toolbox.population(n=300)
hof = tools.HallOfFame(1)
# 运行算法
final_pop, log = algorithms.eaSimple(
pop, toolbox,
cxpb=0.9, mutpb=0.1,
ngen=40,
stats=mstats,
halloffame=hof,
verbose=True
)
return final_pop, log, hof
def evalSymbReg(individual, pset):
"""
评估符号回归个体
目标函数: x^4 + x^3 + x^2 + x
"""
# 编译表达式为函数
func = gp.compile(individual, pset)
# 测试数据
x = [random.uniform(-1, 1) for _ in range(100)]
# 计算均方误差
sqerrors = []
for xi in x:
try:
y = (xi ** 4 + xi ** 3 + xi ** 2 + xi) # 目标函数
sqerrors.append((func(xi) - y) ** 2)
except Exception:
sqerrors.append(1000000) # 惩罚无效表达式
return (math.fsum(sqerrors) / len(x),)
def deap_classification_example():
"""
DEAP GP分类示例
"""
# 创建PrimitiveSet
pset = gp.PrimitiveSetTyped("MAIN", [bool, bool], bool)
pset.renameArguments(ARG0='x', ARG1='y')
# 布尔函数
pset.addPrimitive(operator.and_, [bool, bool], bool)
pset.addPrimitive(operator.or_, [bool, bool], bool)
pset.addPrimitive(operator.not_, [bool], bool)
# Ephemeral常量
pset.addEphemeralConstant("randBool", lambda: random.choice([True, False]))
# 适应度
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
toolbox = base.Toolbox()
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=2, max_=4)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register("evaluate", evalParity, pset=pset)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("mutate", gp.mutUniform, expr=gp.genFull(pset=pset))
# 多目标版本
creator.create("FitnessMulti", base.Fitness, weights=(1.0, -0.5))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMulti)
def evalParity(individual, pset):
"""
奇偶校验问题
5输入奇偶函数
"""
func = gp.compile(individual, pset)
# 生成所有可能的输入
n_samples = 32
correct = 0
for i in range(n_samples):
inputs = [(i >> j) & 1 == 1 for j in range(5)]
# 计算奇偶
parity = sum(inputs) % 2 == 1
try:
if func(*inputs) == parity:
correct += 1
except Exception:
pass
# 适应度 = 正确率
accuracy = correct / n_samples
# 复杂度惩罚
complexity = len(individual) / 100.0
return (accuracy, -complexity)19.2 PyTorch-GP集成
"""
PyTorch与GP集成示例
使用GP生成神经网络模块
"""
import torch
import torch.nn as nn
from deap import gp
class PyTorchGPBridge:
"""
PyTorch与GP的桥梁
"""
def __init__(self):
self.layer_templates = {
'linear': lambda in_feat, out_feat: nn.Linear(in_feat, out_feat),
'conv2d': lambda in_ch, out_ch, kernel: nn.Conv2d(in_ch, out_ch, kernel),
'relu': lambda: nn.ReLU(),
'sigmoid': lambda: nn.Sigmoid(),
'tanh': lambda: nn.Tanh(),
'dropout': lambda p: nn.Dropout(p),
'batchnorm1d': lambda: nn.BatchNorm1d,
'batchnorm2d': lambda: nn.BatchNorm2d,
}
def gp_tree_to_module(self, tree, input_dim):
"""
将GP树转换为PyTorch模块
"""
class DynamicModule(nn.Module):
def __init__(self):
super().__init__()
self.layers = []
self._build_from_tree(tree)
def _build_from_tree(self, node, input_channels=None):
if node.is_leaf():
return
value = node.value
if value in self.layer_templates:
layer = self._create_layer(value, node, input_channels)
if layer is not None:
self.layers.append(layer)
for child in node.children:
self._build_from_tree(child, input_channels)
def _create_layer(self, layer_type, node, input_channels):
if layer_type == 'linear':
out_feat = node.params.get('out_features', 64)
return nn.Linear(input_channels or 784, out_feat)
elif layer_type == 'relu':
return nn.ReLU()
elif layer_type == 'dropout':
p = node.params.get('p', 0.5)
return nn.Dropout(p)
return None
def forward(self, x):
for layer in self.layers:
x = layer(x)
return x
return DynamicModule()
def evaluate_genome(self, tree, train_loader, test_loader, device='cuda'):
"""
评估GP生成的模型
"""
model = self.gp_tree_to_module(tree, input_dim=784)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 训练
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
if batch_idx >= 50: # 快速评估
break
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data.view(data.size(0), -1))
loss = criterion(output, target)
loss.backward()
optimizer.step()
# 评估
model.eval()
correct = 0
total = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data.view(data.size(0), -1))
_, predicted = torch.max(output.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
accuracy = correct / total
return accuracy二十、性能基准与最佳实践
20.1 基准测试函数集
class GPBenchmarkFunctions:
"""
GP标准基准测试函数
"""
@staticmethod
def pagie_polynomial(x, y):
"""
Pagie多项式
f(x,y) = 1/(1+x^-4) + 1/(1+y^-4)
"""
return 1 / (1 + x**(-4)) + 1 / (1 + y**(-4))
@staticmethod
def poly_metric(x, y):
"""
多项式度量
f(x,y) = x^4 + y^4
"""
return x**4 + y**4
@staticmethod
def rational_metric(x, y):
"""
有理函数
f(x,y) = (x^2 + 2xy + y^2) / (x^2 - 2xy + y^2)
"""
denominator = x**2 - 2*x*y + y**2
if abs(denominator) < 1e-10:
return 1e10
return (x**2 + 2*x*y + y**2) / denominator
@staticmethod
def transcendental(x, y):
"""
超越函数
f(x,y) = sin(x) + cos(y)
"""
return np.sin(x) + np.cos(y)
@staticmethod
def vladislavleva(x1, x2, x3, x4):
"""
Vladislavleva-4函数
5维函数,强烈多峰
"""
return (x1 - 3) * (4 - 3*x1**2) + \
(x2 - x3**2) * (1 - x2**2) + \
(x4 - 1) * (1 - x4**2)20.2 参数推荐配置
| 问题类型 | 种群大小 | 深度限制 | 交叉率 | 变异率 | 锦标赛规模 |
|---|---|---|---|---|---|
| 符号回归 | 500-1000 | 6-8 | 0.8-0.9 | 0.1-0.2 | 3-7 |
| 分类 | 500-1000 | 5-7 | 0.8-0.9 | 0.1-0.2 | 3-5 |
| 自动编程 | 1000-5000 | 8-12 | 0.7-0.8 | 0.2-0.3 | 5-10 |
| 控制器设计 | 500-1000 | 6-10 | 0.8-0.9 | 0.1-0.2 | 3-5 |
20.3 常见问题解决方案
| 问题 | 症状 | 解决方案 |
|---|---|---|
| 过拟合 | 训练误差低,测试误差高 | 增加验证集,使用更小的树,使用复杂度惩罚 |
| 欠拟合 | 训练和测试误差都高 | 增加种群多样性,增加树深度,改进函数集 |
| 代码膨胀 | 树大小失控增长 | 使用深度限制,使用大小惩罚,使用收缩变异 |
| 早熟收敛 | 适应度停滞 | 增加变异率,使用多种群,增加多样性选择 |
| 语法无效 | 交叉后产生无效树 | 使用保证语法的交叉算子,修复机制 |
二十一、参考文献
- Koza, J. R. (1992). Genetic Programming: On the Programming of Computers by Means of Natural Selection. MIT Press.
- Koza, J. R. (1994). Genetic Programming II: Automatic Discovery of Reusable Programs. MIT Press.
- Koza, J. R., et al. (1999). Genetic Programming III: Darwinian Invention and Problem Solving. Morgan Kaufmann.
- Poli, R., Langdon, W. B., & McPhee, N. F. (2008). A Field Guide to Genetic Programming. Lulu.com.
- Banzhaf, W., et al. (1998). Genetic Programming: An Introduction. Morgan Kaufmann.
- Langdon, W. B., & Poli, R. (2002). Foundations of Genetic Programming. Springer.
- Vanneschi, L., & Castelli, M. (2019). “A Survey of Semantic Genetic Programming.” Frontiers in Robotics and AI, 6, 37.
- Moraglio, A., et al. (2012). “Geometric Semantic Genetic Programming.” PPSN XII, 21-31.
- Forrest, S., & Mitchell, M. (1993). “What Makes a Problem Hard for Genetic Programming?” Proceedings of ICGA, 1993.
- Luke, S., & Spector, L. (1998). “A Revised Comparison of Crossover and Mutation in Genetic Programming.” Proceedings of GP, 1998.
- Cramer, N. L. (1985). “A Representation for the Adaptive Generation of Simple Sequential Programs.” Proceedings of ICGA, 1985.
- Montana, D. J. (1995). “Strongly Typed Genetic Programming.” Evolutionary Computation, 3(2), 199-230.
- Ryan, C., & O’Neill, M. (1998). “Grammatical Evolution: Evolving Programs in an Arbitrary Language.” Proceedings of GP, 1998.
- Spector, L., & Robinson, A. (2002). “Genetic Programming and Autoconstructive Evolution with the Push Programming Language.” Genetic Programming and Evolvable Machines, 3(1), 7-40.
- Koza, J. R. (2010). “Human-Competitive Results Produced by Genetic Programming.” Genetic Programming and Evolvable Machines, 11(3-4), 251-284.