关键词

遗传编程语法树Koza函数集终止符集
子树交叉子树变异符号回归自动函数发现程序演进
表达树树深度限制饱和度初始化方法适应度评估
闭合性充分性GP变体行为遗传多目标GP

摘要

遗传编程(Genetic Programming, GP)是由John Koza于1989年在其标志性著作《Genetic Programming: On the Programming of Computers by Means of Natural Selection》中系统提出的一种自动程序生成技术。与传统遗传算法固定长度编码不同,GP采用可变大小和形状的树形结构表示计算机程序,实现了程序空间的自动搜索与优化。这一突破性范式使计算机能够”自动编程”,已在符号回归、电路设计、游戏策略、自动化控制等领域展现出卓越的问题求解能力。


一、遗传编程的理论基础

1.1 自动编程的愿景

传统编程要求人类程序员明确指定解决问题的每一个步骤。Koza提出的遗传编程旨在颠覆这一范式:给定问题的输入-输出示例或性能评估函数,让计算机通过进化过程自动生成解决问题的程序。

这一愿景的理论支撑来自两个核心原则:

闭合性原则(Closure):所有函数和终止符必须能够相互组合而不出错。例如,若函数集中包含返回布尔值的函数,则其他函数必须能够接受布尔输入。

充分性原则(Sufficiency):函数集和终止符集的组合必须能够表示任意复杂度的解空间。

1.2 与遗传算法的区别

特征遗传算法(GA)遗传编程(GP)
个体表示固定长度串可变大小树
基因含义编码决定自身即是程序
搜索空间离散、线性层次化、组合爆炸
应用领域参数优化程序合成

二、树形表示与语法结构

2.1 表达树的基本概念

GP中的个体是一棵有根有序树。每个内部节点代表一个函数(运算符),每个叶子节点代表一个终止符(变量或常量)。

数学表达:程序可形式化为从输入空间到输出空间的映射:

其中为输入变量空间,为输出空间。

2.2 表达树的构建示例

以数学表达式为例,其对应的表达树结构如下:

           ×
          / \
         +   -
        / \ / \
       x  3 y  2

中序遍历(标准数学表达式):

前序遍历(波兰表达式):

后序遍历(逆波兰表达式):

2.3 树深度与复杂度控制

GP树的深度直接影响生成的程序复杂度:

  • 最大深度限制:防止”不可控生长”(bloat)现象
  • 初始深度控制:使用特定初始化方法控制初始种群

深度限制策略

class TreeNode:
    def __init__(self, value, depth=0):
        self.value = value
        self.depth = depth
        self.children = []
    
    def is_valid(self, max_depth):
        if self.depth > max_depth:
            return False
        return all(child.is_valid(max_depth) for child in self.children)
    
    def count_nodes(self):
        return 1 + sum(child.count_nodes() for child in self.children)
    
    def get_leaves(self):
        if not self.children:
            return [self.value]
        return [leaf for child in self.children for leaf in child.get_leaves()]

三、函数集与终止符集

3.1 函数集的设计

函数集应包含解决问题所需的所有运算符。典型的函数类别包括:

3.1.1 算术函数

function_set = {
    '+': lambda x, y: x + y,
    '-': lambda x, y: x - y,
    '*': lambda x, y: x * y,
    '/': lambda x, y: x / y if abs(y) > 1e-8 else 1,  # 保护性除法
    '%': lambda x, y: x % y if y != 0 else 0,
    '**': lambda x, y: x ** y if x > 0 else 0,
}

3.1.2 布尔函数

对于布尔问题,函数集通常包括:

boolean_functions = {
    'AND': lambda x, y: x and y,
    'OR': lambda x, y: x or y,
    'NOT': lambda x: not x,
    'IF': lambda cond, x, y: x if cond else y,
}

3.1.3 条件与循环函数

control_functions = {
    'IF-LESS': lambda x, y, z, w: z if x < y else w,
    'IF-EQUAL': lambda x, y, z, w: z if x == y else w,
    'PROGN2': lambda x, y: y,  # 顺序执行
    'PROGN3': lambda x, y, z: z,
}

3.2 终止符集的设计

终止符集提供程序的输入和常量:

terminal_set = {
    'variables': ['x', 'y', 'z', 't'],
    'constants': list(range(-10, 11)),
    'ephemeral_random': lambda: random.uniform(-1, 1),  # 临时随机常量
    'funcalls': ['sin', 'cos', 'exp', 'log', 'sqrt'],
}

Note

临时随机常量(Ephemeral Random Constants):在初始化时生成随机常量值,交叉和变异操作可能改变这些常量但不会产生新值。这是GP中处理数值常量的经典技术。

3.3 闭合性检查

def check_closure(function_set, terminal_set):
    """
    验证函数集的闭合性
    每个函数的所有参数位置必须能够接受任何函数的返回值
    """
    for name, func in function_set.items():
        arity = func.__code__.co_argcount
        for i in range(arity):
            # 检查第i个参数位置
            compatible = any(
                is_compatible(output_type, param_types[i])
                for func2 in function_set.values()
                for output_type in get_output_types(func2)
            )
            # 也必须接受终止符
            compatible |= any(
                is_compatible(t, param_types[i])
                for t in terminal_set
            )
            if not compatible:
                print(f"Closure violation: {name} at position {i}")
    return True

四、遗传操作算子

4.1 初始化方法

4.1.1 完整方法(Full Method)

所有叶子节点达到指定深度,确保树的完全性:

def full_initialization(depth, function_set, terminal_set):
    if depth == 0:
        return random.choice(terminal_set)
    return TreeNode(
        random.choice(function_set),
        children=[full_initialization(depth-1, function_set, terminal_set) 
                  for _ in range(arity(random.choice(function_set)))]
    )

4.1.2 生长方法(Grow Method)

允许叶子节点在任何深度出现,产生更自然的形状:

def grow_initialization(max_depth, function_set, terminal_set):
    if max_depth == 0 or random.random() < ratio_terminals:
        return random.choice(terminal_set)
    func = random.choice(function_set)
    return TreeNode(
        func,
        children=[grow_initialization(max_depth-1, function_set, terminal_set)
                  for _ in range(arity(func))]
    )

4.1.3 混合方法(Ramped Half-and-Half)

在初始化种群时混合使用完整法和生长法:

def ramped_initialization(pop_size, min_depth=2, max_depth=6, 
                          function_set, terminal_set):
    population = []
    depth_range = range(min_depth, max_depth + 1)
    
    for i in range(pop_size):
        depth = list(depth_range)[i % len(depth_range)]
        if i % 2 == 0:
            tree = full_initialization(depth, function_set, terminal_set)
        else:
            tree = grow_initialization(depth, function_set, terminal_set)
        population.append(tree)
    
    return population

Tip

Ramped Half-and-Half能产生多样化的树结构,是Koza推荐的标准初始化方法。

4.2 子树交叉(Crossover)

4.2.1 标准子树交叉

随机选择两个父代树中的交叉点,交换对应的子树:

import copy
 
def subtree_crossover(parent1, parent2, max_depth=None):
    # 随机选择交叉点
    crossover_point1 = random.choice(get_all_nodes(parent1))
    crossover_point2 = random.choice(get_all_nodes(parent2))
    
    # 深度检查
    if max_depth:
        subtree_depth = get_subtree_depth(crossover_point2)
        parent_depth = get_node_depth(parent1, crossover_point1)
        if parent_depth + subtree_depth > max_depth:
            return copy.deepcopy(parent1), copy.deepcopy(parent2)
    
    # 执行交叉
    child1 = copy.deepcopy(parent1)
    child2 = copy.deepcopy(parent2)
    
    replace_subtree(child1, crossover_point1, copy.deepcopy(crossover_point2))
    replace_subtree(child2, crossover_point2, copy.deepcopy(crossover_point1))
    
    return child1, child2

可视化示例

Parent1:                        Parent2:
      +                               /
     / \                             / \
    ×   3                         sin   +
   / \                             |   / \
  x   2                           y   z   w

Crossover point1: ×              Crossover point2: sin

Child1:                          Child2:
      +                               /
     / \                             / \
   sin   3                          ×   +
   |                               / \   \
  y                              x   2  z   w

4.2.2 点交叉(Point Crossover)

保留大部分树结构,仅交换一个节点:

def point_crossover(parent1, parent2):
    node1 = random.choice(get_all_nodes(parent1))
    node2 = random.choice(get_all_nodes(parent2))
    
    child1 = copy.deepcopy(parent1)
    child2 = copy.deepcopy(parent2)
    
    replace_node(child1, node1, copy.deepcopy(node2))
    replace_node(child2, node2, copy.deepcopy(node1))
    
    return child1, child2

4.3 子树变异(Mutation)

4.3.1 替换变异

随机选择一棵子树,用新生成的随机树替换:

def subtree_mutation(tree, function_set, terminal_set, max_depth=6):
    child = copy.deepcopy(tree)
    mutation_point = random.choice(get_all_nodes(child))
    
    # 生成新的随机子树
    depth = random.randint(0, max_depth)
    new_subtree = grow_initialization(depth, function_set, terminal_set)
    
    replace_subtree(child, mutation_point, new_subtree)
    return child

4.3.2 收缩变异(Shrink Mutation)

将一棵子树替换为随机终止符:

def shrink_mutation(tree, terminal_set):
    child = copy.deepcopy(tree)
    mutation_point = random.choice(get_non_leaf_nodes(child))
    
    replace_node(child, mutation_point, random.choice(terminal_set))
    return child

4.3.3 点变异(Point Mutation)

仅改变单个节点的值:

def point_mutation(tree, function_set, terminal_set, rate=0.01):
    child = copy.deepcopy(tree)
    
    for node in get_all_nodes(child):
        if random.random() < rate:
            if node.is_function():
                node.value = random.choice(function_set)
            else:
                node.value = random.choice(terminal_set)
    
    return child

五、适应度评估

5.1 适应度案例(Fitness Cases)

GP的适应度通过在多个测试案例上的表现来衡量:

class FitnessEvaluator:
    def __init__(self, fitness_cases):
        """
        fitness_cases: [(input1, input2, ..., expected_output), ...]
        """
        self.fitness_cases = fitness_cases
    
    def evaluate(self, tree):
        errors = []
        for inputs, expected in self.fitness_cases:
            try:
                result = tree.execute(*inputs)
                errors.append(abs(result - expected))
            except:
                errors.append(float('inf'))
        
        return {
            'raw_fitness': sum(errors),
            'mean_absolute_error': sum(errors) / len(errors),
            'hits': sum(1 for e in errors if e < 0.01),
        }

5.2 标准化适应度

对于符号回归问题,标准化均方误差(Normalized Mean Squared Error):

标准化版本

Warning

过拟合是GP面临的重要问题。需要使用独立的验证集来评估泛化能力。


六、自动函数发现

6.1 ADF(Automatically Defined Functions)

Koza提出的ADF机制允许进化过程中自动定义和调用子函数:

class ADFTree:
    def __init__(self, main_tree, adf_definitions):
        self.main_tree = main_tree  # 主程序
        self.adf_definitions = adf_definitions  # {name: Tree}
    
    def execute(self, inputs):
        # 准备ADF参数
        adf_args = {f'ARG{i}': inputs[i] for i in range(len(inputs))}
        
        # 计算ADF
        for name, tree in self.adf_definitions.items():
            adf_args[name] = tree.execute(inputs)
        
        # 执行主程序
        return self.main_tree.execute(adf_args)

ADF树结构示例

ADF1(x, y):                       Main Program:
      +                              IF-ZERO
     / \                            /   |   \
    ×   z                          >   ADF1  ADF2
   / \                              |   |     |
  x   y                           x   y     z

6.2 模块化GP架构

class ModularGP:
    def __init__(self, n_adfs=3):
        self.n_adfs = n_adfs
        self.adf_functions = {}
    
    def evolve_with_adfs(self, fitness_cases):
        # 进化主程序和ADF定义
        pass
    
    def call_adf(self, adf_name, args):
        return self.adf_functions[adf_name].execute(args)

七、经典应用案例

7.1 符号回归(Symbolic Regression)

符号回归旨在发现生成数据的数学表达式:

def symbolic_regression_example():
    """
    目标函数: f(x) = sin(x) + x^2
    """
    # 生成训练数据
    x_train = np.linspace(-10, 10, 100)
    y_train = np.sin(x_train) + x_train**2
    
    # 定义函数集和终止符集
    function_set = ['+', '-', '*', '/', 'sin', 'cos', 'exp', 'log']
    terminal_set = ['x', *np.random.uniform(-1, 1, 10)]
    
    # 运行GP
    gp = GeneticProgramming(function_set, terminal_set, fitness_cases)
    best_solution = gp.evolve()
    
    print(f"Discovered: {best_solution.to_expression()}")
    # 输出类似: sin(x) + (x * x)
 
# 示例运行结果
symreg = SymbolicRegressionFitness(
    X=np.linspace(-10, 10, 100),
    y=np.sin(X) + X**2
)

7.2 电路自动设计

Koza在《Genetic Programming III》中展示了GP自动设计电路的能力,包括:

  • 电路拓扑发现
  • 元件参数优化
  • 信号处理滤波器设计
class CircuitDesignGP:
    def __init__(self, circuit_constraints):
        self.constraints = circuit_constraints
        self.available_components = ['R', 'L', 'C', 'Diode', 'Transistor']
    
    def evaluate_circuit(self, circuit_tree):
        """
        评估电路性能指标
        - 频率响应
        - 功耗
        - 噪声特性
        """
        simulation_result = self.simulate(circuit_tree)
        return {
            'fitness': fitness_score(simulation_result),
            'validity': self.check_constraints(circuit_tree),
        }

7.3 游戏策略演化

class GameStrategyGP:
    def __init__(self, game_state_space):
        self.game_states = game_state_space
    
    def create_game_tree(self):
        function_set = ['IF-STATE', 'AND', 'OR', 'NOT']
        terminal_set = [f'S{i}' for i in range(len(self.game_states))]
        return grow_initialization(5, function_set, terminal_set)
    
    def evaluate_strategy(self, strategy_tree):
        wins = 0
        for game_state in self.game_states:
            action = strategy_tree.execute(game_state)
            if self.play_and_win(action, game_state):
                wins += 1
        return wins / len(self.game_states)

八、防止”不可控生长”(Bloat)

8.1 Bloat现象

GP进化过程中,程序树可能无限增长而适应度不再提升,这被称为”代码膨胀”或”bloat”。

8.2 深度限制策略

class DepthLimitedGP:
    def __init__(self, max_depth=17):
        self.max_depth = max_depth
    
    def is_valid_tree(self, tree):
        return get_tree_depth(tree) <= self.max_depth
    
    def crossover_with_limit(self, parent1, parent2):
        child1, child2 = subtree_crossover(parent1, parent2)
        if not self.is_valid_tree(child1):
            return copy.deepcopy(parent1)
        if not self.is_valid_tree(child2):
            return copy.deepcopy(parent2)
        return child1, child2

8.3 惩罚机制

def size_penalized_fitness(raw_fitness, tree, penalty_coef=0.001):
    complexity = tree.count_nodes()
    return raw_fitness + penalty_coef * complexity

8.4 多目标优化

from deap import tools
 
def evolve_multi_objective(population, fitness_cases):
    creator.create("FitnessMulti", base.Fitness, 
                    weights=(1.0, -1.0))  # 最大化适应度,最小化大小
    creator.create("Individual", list, fitness=creator.FitnessMulti)
    
    toolbox = base.Toolbox()
    # ... setup operators ...
    
    NSGA2 algorithm for multi-objective optimization
    population = algorithms.eaMuPlusLambda(
        population, toolbox, mu=500, lambda_=500, 
        cxpb=0.9, mutpb=0.1, ngen=100
    )
    
    return population

九、实用框架与工具

9.1 DEAP框架

from deap import base, creator, tools, algorithms
import random
 
def deap_gp_example():
    creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
    creator.create("Individual", gp.PrimitiveTree, 
                   fitness=creator.FitnessMin)
    
    pset = gp.PrimitiveSet("MAIN", arity=1)
    pset.addPrimitive(operator.add, args=2, name="add")
    pset.addPrimitive(operator.sub, args=2, name="sub")
    pset.addPrimitive(operator.mul, args=2, name="mul")
    pset.addPrimitive(math.sin, args=1)
    pset.addPrimitive(math.cos, args=1)
    
    pset.addEphemeralConstant("rand", lambda: random.uniform(-1, 1))
    pset.addTerminal(1)
    pset.addTerminal(2)
    
    toolbox = base.Toolbox()
    toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=2, max_=6)
    toolbox.register("individual", tools.initIterate, 
                     creator.Individual, toolbox.expr)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("compile", gp.compile, pset=pset)
    
    def evalSymbReg(individual):
        func = toolbox.compile(expr=individual)
        return sum(abs(func(x) - target(x)) for x in points),
    
    toolbox.register("evaluate", evalSymbReg)
    toolbox.register("select", tools.selTournament, tournsize=3)
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
    toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
    
    algorithms.eaSimple(toolbox.population(), toolbox, 
                        cxpb=0.9, mutpb=0.1, ngen=100)

十、进阶主题与研究前沿

10.1 几何语义遗传编程(Geometric Semantic GP)

直接对函数的语义空间进行操作,而非语法空间:

def geometric_semantic_crossover(parent1, parent2, training_data):
    """
    几何语义交叉保证子代在训练点上的性能
    不劣于任一父代
    """
    new_tree = create_mixing_tree(parent1, parent2)
    return new_tree
 
def geometric_semantic_mutation(tree, mutation_step=0.5):
    """
    语义变异
    """
    target = random.choice(training_data)
    new_tree = add_random_tree()
    return weighted_sum(tree, new_tree, mutation_step)

10.2 概率增量式遗传编程(Probabilistic Incremental Program Evolution)

class PIPE:
    def __init__(self):
        self.probability_matrix = {}  # 存储概率分布
    
    def update_probabilities(self, population, fitnesses):
        for tree in population:
            for node in tree:
                self.probability_matrix[node] += fitness
    
    def sample_tree(self):
        root = self.sample_node()
        self.expand_node(root)
        return root

十一、GP的深度理论分析

11.1 进化动力学的数学框架

11.1.1 模式定理(Schema Theorem)

Koza将模式定理应用于遗传编程,定义了包含通配符的树形模式(Schema):

模式定义:一个Schema是包含通配符”*“的树结构,表示一类相似的程序。例如,*(x, *)表示所有以任意函数为根、第二个子节点为任意值的树。

模式定理:短阶、低阶、在种群中具有高于平均适应度的模式,在进化过程中将以指数级增长。

class SchemaAnalyzer:
    """模式分析器"""
    
    def __init__(self, primitive_set):
        self.primitive_set = primitive_set
    
    def extract_schemas(self, population):
        """从种群中提取模式"""
        schemas = []
        
        for tree in population:
            schemas.extend(self._generate_schemas(tree, min_holes=1))
        
        return schemas
    
    def _generate_schemas(self, tree, min_holes=1):
        """生成包含指定数量空洞的模式"""
        schemas = []
        
        def traverse(node, schema_template, hole_positions):
            if len(hole_positions) >= min_holes:
                schemas.append({
                    'template': schema_template.copy(),
                    'holes': hole_positions.copy()
                })
            
            for i, child in enumerate(node.children):
                new_template = schema_template + [(node.value, i)]
                new_holes = hole_positions + [(node.value, i)]
                traverse(child, new_template, new_holes)
        
        traverse(tree, [], [])
        return schemas

11.1.2 收敛性分析框架

class GPConvergenceAnalyzer:
    """GP收敛性分析"""
    
    def __init__(self, gp_system):
        self.gp = gp_system
    
    def markov_chain_analysis(self):
        """
        使用马尔可夫链分析GP的收敛性
        状态空间:种群中个体的适应度等级
        """
        n_states = len(self.gp.population) + 1
        P = np.zeros((n_states, n_states))
        
        for i in range(n_states):
            for j in range(n_states):
                P[i, j] = self._transition_prob(i, j)
        
        return P
    
    def absorption_probability(self):
        """计算被最优解吸收的概率"""
        pass
    
    def expected_generations_to_convergence(self):
        """估计收敛的期望代数"""
        pass

11.2 适应度景观分析

class FitnessLandscapeAnalyzer:
    """适应度景观分析"""
    
    def __init__(self, gp_system):
        self.gp = gp_system
    
    def sample_landscape(self, n_samples=1000):
        """采样适应度景观"""
        samples = []
        
        for _ in range(n_samples):
            tree = self.gp.generate_random_tree(max_depth=6)
            fitness = self.gp.evaluate_fitness(tree)
            
            samples.append({
                'tree': tree,
                'fitness': fitness,
                'size': tree.count_nodes(),
                'depth': tree.depth()
            })
        
        return samples
    
    def compute_neutrality(self, tree, neighborhood_size=100):
        """计算中性度"""
        fitness = self.gp.evaluate_fitness(tree)
        neighbors = self.gp.generate_neighbors(tree, neighborhood_size)
        
        neutral_count = 0
        epsilon = 1e-6
        
        for neighbor in neighbors:
            neighbor_fitness = self.gp.evaluate_fitness(neighbor)
            if abs(neighbor_fitness - fitness) < epsilon:
                neutral_count += 1
        
        return neutral_count / len(neighbors) if neighbors else 0
    
    def compute_ruggedness(self, samples):
        """计算景观的粗糙度"""
        distances = []
        fitnesses = []
        
        best_fitness = min(s['fitness'] for s in samples)
        
        for sample in samples:
            dist = self._tree_distance(sample['tree'], 
                                      self._find_best_tree(samples))
            distances.append(dist)
            fitnesses.append(sample['fitness'])
        
        correlation = np.corrcoef(distances, fitnesses)[0, 1]
        return 1 - abs(correlation)

十二、高级应用与技术

12.1 时间序列预测

class GPPredictor:
    """基于GP的时间序列预测"""
    
    def __init__(self, lookback=5, forecast_horizon=1):
        self.lookback = lookback
        self.forecast_horizon = forecast_horizon
    
    def create_terminal_set(self, time_series):
        """创建终端集"""
        terminals = {}
        
        for lag in range(1, self.lookback + 1):
            terminals[f'x{-lag}'] = list(time_series[:-lag])
        
        if len(time_series) > 2:
            diff1 = np.diff(time_series)
            terminals['diff1'] = list(diff1[:-1])
            
            pct_change = np.diff(time_series) / time_series[:-1]
            terminals['pct'] = list(pct_change)
        
        return terminals
    
    def create_training_cases(self, time_series):
        """创建训练案例"""
        cases = []
        X = self.create_terminal_set(time_series)
        n = len(time_series)
        
        for i in range(self.lookback, n - self.forecast_horizon):
            inputs = {}
            for key, values in X.items():
                lag = int(key[1:]) if key.startswith('x') else 0
                idx = i + lag if lag > 0 else i
                if 0 <= idx < len(values):
                    inputs[key] = values[idx]
            
            target = time_series[i + self.forecast_horizon]
            cases.append((inputs, target))
        
        return cases

12.2 图像处理应用

class ImageFilterGP:
    """使用GP进化图像滤波器"""
    
    def __init__(self, image_size=(64, 64)):
        self.image_size = image_size
    
    def create_image_terminals(self):
        """创建图像处理终端集"""
        return {
            'R': 'pixel_r',
            'G': 'pixel_g',
            'B': 'pixel_b',
            'mean_3x3': 'mean_3x3',
            'sobel_x': 'sobel_x',
            'sobel_y': 'sobel_y'
        }
    
    def fitness_function(self, tree, test_images, ground_truth):
        """计算滤波器质量"""
        total_mse = 0
        
        for img, truth in zip(test_images, ground_truth):
            output = self._apply_tree_to_image(tree, img)
            mse = np.mean((output - truth) ** 2)
            total_mse += mse
        
        return total_mse / len(test_images)

12.3 控制器设计

class GPController:
    """使用GP进化控制器"""
    
    def __init__(self, system_model, time_horizon=10):
        self.system = system_model
        self.time_horizon = time_horizon
    
    def create_control_terminals(self):
        """创建控制器终端集"""
        return {
            'e': 'error',
            'de': 'error_derivative',
            'ie': 'error_integral',
            'e_k1': 'error_k_minus_1',
            'u_k1': 'control_k_minus_1',
        }
    
    def fitness_function(self, tree):
        """评估控制器性能"""
        total_cost = 0
        
        for _ in range(10):
            state = self.system.reset()
            error_integral = 0
            
            for t in range(self.time_horizon):
                setpoint = self.system.get_setpoint(t)
                error = setpoint - state
                error_derivative = error - getattr(self, 'prev_error', 0)
                self.prev_error = error
                error_integral += error
                
                context = {
                    'e': error,
                    'de': error_derivative,
                    'ie': error_integral,
                }
                
                try:
                    control = tree.evaluate(context)
                    control = np.clip(control, -10, 10)
                except:
                    control = 0
                
                state = self.system.step(control)
                total_cost += error ** 2
        
        return total_cost

12.4 自动机器学习

class AutoFeatureEngineeringGP:
    """自动特征工程"""
    
    def __init__(self, df, target_column):
        self.df = df
        self.target = target_column
        self.numeric_columns = df.select_dtypes(
            include=[np.number]
        ).columns.tolist()
        self.numeric_columns.remove(target_column)
    
    def create_function_set(self):
        """创建特征工程函数集"""
        return {
            '+': lambda x, y: x + y,
            '-': lambda x, y: x - y,
            '*': lambda x, y: x * y,
            '/': lambda x, y: x / (y + 1e-10),
            'sqrt': lambda x: np.sqrt(np.abs(x)),
            'log': lambda x: np.log(np.abs(x) + 1),
            'exp': lambda x: np.exp(np.clip(x, -10, 10)),
            'abs': lambda x: np.abs(x),
            'sin': lambda x: np.sin(x),
            'cos': lambda x: np.cos(x),
        }
    
    def evolve_features(self, n_features=20, population_size=100):
        """进化特征集"""
        best_features = []
        
        for _ in range(n_features):
            gp = GeneticProgramming(
                function_set=list(self.create_function_set().keys()),
                terminal_set=list(self.create_terminal_set().keys())
            )
            
            fitness_func = lambda tree: self.evaluate_feature(
                tree, self.df, self.df[self.target].values
            )
            
            best_tree = gp.evolve(fitness_func)
            best_features.append(best_tree)
        
        return best_features

十三、性能优化与实践

13.1 计算加速技术

class VectorizedGP:
    """向量化GP"""
    
    def __init__(self, function_set, terminal_set):
        self.function_set = function_set
        self.terminal_set = terminal_set
    
    def vectorized_evaluate(self, tree, X):
        """向量化评估"""
        def eval_node(node):
            if node.is_leaf():
                value = node.value
                if isinstance(value, str) and value in X.columns:
                    return X[value].values
                else:
                    return np.full(len(X), float(value))
            else:
                children_values = [eval_node(child) for child in node.children]
                func = self.function_set[node.value]
                return func(*children_values)
        
        return eval_node(tree)
    
    def batch_fitness(self, population, X, y):
        """批量计算种群适应度"""
        predictions = []
        
        for tree in population:
            pred = self.vectorized_evaluate(tree, X)
            mse = np.mean((pred - y) ** 2)
            predictions.append(mse)
        
        return np.array(predictions)

13.2 分布式GP

from multiprocessing import Pool, cpu_count
 
class ParallelGP:
    """并行GP"""
    
    def __init__(self, n_workers=None):
        self.n_workers = n_workers or max(1, cpu_count() - 1)
    
    def parallel_evaluate(self, population, fitness_func, chunksize=None):
        """并行评估适应度"""
        chunksize = chunksize or max(1, len(population) // self.n_workers)
        
        with Pool(self.n_workers) as pool:
            fitnesses = pool.map(fitness_func, population, chunksize=chunksize)
        
        return np.array(fitnesses)

十四、与其他技术的融合

14.1 GP与神经网络

class GPForNAS:
    """GP用于神经网络架构搜索"""
    
    def __init__(self, search_space):
        self.search_space = search_space
    
    def create_nas_terminal_set(self):
        """NAS终端集"""
        return {
            'conv3x3': 'Conv(kernel=3)',
            'conv5x5': 'Conv(kernel=5)',
            'maxpool': 'MaxPool()',
            'avgpool': 'AvgPool()',
            'fc': 'Dense()',
            'relu': 'ReLU()',
            'dropout': 'Dropout()',
        }
    
    def decode_architecture(self, tree, input_shape):
        """将GP树解码为神经网络架构"""
        from tensorflow import keras
        from tensorflow.keras import layers
        
        model = keras.Sequential()
        model.add(layers.Input(shape=input_shape))
        
        def build_from_tree(node):
            if node.is_leaf():
                layer_type = node.value
                if layer_type.startswith('conv'):
                    kernel = int(layer_type[4])
                    model.add(layers.Conv2D(32, kernel))
                elif layer_type == 'maxpool':
                    model.add(layers.MaxPooling2D())
            else:
                for child in node.children:
                    build_from_tree(child)
        
        build_from_tree(tree)
        return model

14.2 GP与强化学习

class GPforRL:
    """GP与强化学习结合"""
    
    def __init__(self, env):
        self.env = env
    
    def evaluate_policy(self, tree, n_episodes=10):
        """评估策略"""
        total_reward = 0
        
        for _ in range(n_episodes):
            state = self.env.reset()
            episode_reward = 0
            done = False
            
            while not done:
                action = tree.evaluate(state)
                action = np.clip(action, -1, 1)
                
                next_state, reward, done, info = self.env.step(action)
                episode_reward += reward
                state = next_state
            
            total_reward += episode_reward
        
        return total_reward / n_episodes

十五、案例研究:金融预测

class StockPredictor:
    """股价预测GP系统"""
    
    def __init__(self, stock_data, lookback=20):
        self.data = stock_data
        self.lookback = lookback
        self.train_size = int(len(stock_data) * 0.7)
        self.train_data = stock_data[:self.train_size]
        self.test_data = stock_data[self.train_size:]
    
    def create_feature_engineering(self):
        """创建特征工程函数"""
        return {
            '+': lambda x, y: x + y,
            '-': lambda x, y: x - y,
            '*': lambda x, y: x * y,
            '/': lambda x, y: x / (y + 1e-10),
            'sma': lambda x, n: self._sma(x, n),
            'ema': lambda x, n: self._ema(x, n),
            'rsi': lambda x, n: self._rsi(x, n),
        }
    
    def _sma(self, x, n):
        """简单移动平均"""
        result = np.zeros_like(x)
        for i in range(n, len(x)):
            result[i] = np.mean(x[i-n:i])
        return result
    
    def _ema(self, x, n):
        """指数移动平均"""
        alpha = 2 / (n + 1)
        result = np.zeros_like(x)
        result[0] = x[0]
        for i in range(1, len(x)):
            result[i] = alpha * x[i] + (1 - alpha) * result[i-1]
        return result
    
    def _rsi(self, x, n=14):
        """相对强弱指数"""
        deltas = np.diff(x, prepend=x[0])
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)
        
        avg_gain = self._sma(gains, n)
        avg_loss = self._sma(losses, n)
        
        rs = avg_gain / (avg_loss + 1e-10)
        return 100 - (100 / (1 + rs))

十六、研究前沿与未来方向

16.1 可解释AI与GP

遗传编程在可解释AI领域展现出独特优势:

class ExplainableGP:
    """可解释的遗传编程"""
 
    def generate_explanation(self, tree):
        """生成人类可读的规则解释"""
        rules = []
 
        def traverse(node):
            if node.value == 'IF_LESS':
                condition = f"{node.children[0].value} < {node.children[1].value}"
                rules.append(f"IF {condition} THEN {self._tree_to_text(node.children[2])}")
            else:
                for child in node.children:
                    traverse(child)
 
        traverse(tree)
        return rules
 
    def feature_importance(self, tree, training_data):
        """计算特征重要性"""
        importance = {}
 
        for terminal in tree.get_terminals():
            original_fitness = self._evaluate(tree)
            modified_tree = tree.remove_subtree(terminal)
            modified_fitness = self._evaluate(modified_tree)
            importance[terminal] = original_fitness - modified_fitness
 
        return sorted(importance.items(), key=lambda x: -x[1])

16.2 迁移学习与GP

class TransferGP:
    """迁移学习GP"""
 
    def extract_knowledge(self, source_tree):
        """从源任务提取知识"""
        important_subtrees = self._identify_important_subtrees(source_tree)
        patterns = [self._subtree_to_pattern(st) for st in important_subtrees]
        return patterns
 
    def inject_knowledge(self, target_tree, patterns):
        """向目标树注入知识"""
        for pattern in patterns:
            insertion_points = self._find_compatible_positions(target_tree, pattern)
            if insertion_points:
                point = random.choice(insertion_points)
                target_tree.insert_subtree(point, pattern)
        return target_tree

16.3 深度学习与传统GP的融合

16.3.1 深度语义GP

class DeepSemanticGP:
    """
    深度语义GP
    结合深度学习表示学习和GP的进化能力
    """
 
    def __init__(self, embedding_dim=128):
        self.embedding_dim = embedding_dim
        self.semantic_encoder = None  # 用于编码树到语义空间
        self.decoder = None  # 用于从语义空间解码
 
    def encode_tree(self, tree):
        """将树编码为语义向量"""
        if self.semantic_encoder is None:
            return self._tree_to_embedding(tree)
 
        return self.semantic_encoder(tree)
 
    def decode_semantic(self, embedding):
        """从语义向量解码为树"""
        if self.decoder is None:
            return self._embedding_to_tree(embedding)
 
        return self.decoder(embedding)
 
    def semantic_crossover(self, parent1, parent2):
        """
        语义交叉:在语义空间中进行交叉操作
        """
        # 编码到语义空间
        sem1 = self.encode_tree(parent1)
        sem2 = self.encode_tree(parent2)
 
        # 在语义空间插值
        alpha = random.random()
        child_sem = alpha * sem1 + (1 - alpha) * sem2
 
        # 解码回树结构
        return self.decode_semantic(child_sem)
 
    def semantic_mutation(self, tree, mutation_strength=0.1):
        """
        语义变异:在语义空间添加噪声
        """
        sem = self.encode_tree(tree)
 
        # 添加高斯噪声
        noise = np.random.randn(self.embedding_dim) * mutation_strength
        mutated_sem = sem + noise
 
        return self.decode_semantic(mutated_sem)

16.3.2 GP强化神经架构搜索

class GPNeuralArchitectureSearch:
    """
    基于GP的神经架构搜索
    进化神经网络架构
    """
 
    def __init__(self, input_shape, num_classes):
        self.input_shape = input_shape
        self.num_classes = num_classes
 
    def create_architecture_space(self):
        """定义架构搜索空间"""
        self.operations = {
            'conv3x3': lambda x, filters: self._conv_block(x, filters, 3),
            'conv5x5': lambda x, filters: self._conv_block(x, filters, 5),
            'conv7x7': lambda x, filters: self._conv_block(x, filters, 7),
            'depthwise_conv3x3': lambda x, filters: self._depthwise_conv(x, 3),
            'maxpool3x3': lambda x: self._maxpool(x, 3),
            'avgpool3x3': lambda x: self._avgpool(x, 3),
            'identity': lambda x: x,
            'se_block': lambda x: self._se_block(x),
            'dropout': lambda x, rate: tf.keras.layers.Dropout(rate)(x),
        }
 
        self.filters_options = [16, 32, 64, 128, 256]
        self.depth_options = [1, 2, 3, 4]
 
    def _conv_block(self, x, filters, kernel_size):
        """卷积块"""
        from tensorflow import keras
        y = keras.layers.Conv2D(filters, kernel_size, padding='same')(x)
        y = keras.layers.BatchNormalization()(y)
        y = keras.layers.ReLU()(y)
        return y
 
    def _depthwise_conv(self, x, kernel_size):
        """深度可分离卷积"""
        from tensorflow import keras
        y = keras.layers.DepthwiseConv2D(kernel_size, padding='same')(x)
        y = keras.layers.BatchNormalization()(y)
        y = keras.layers.ReLU()(y)
        return y
 
    def _maxpool(self, x, pool_size):
        """最大池化"""
        from tensorflow import keras
        return keras.layers.MaxPooling2D(pool_size)(x)
 
    def _avgpool(self, x, pool_size):
        """平均池化"""
        from tensorflow import keras
        return keras.layers.AveragePooling2D(pool_size)(x)
 
    def _se_block(self, x, reduction=16):
        """Squeeze-and-Excitation块"""
        from tensorflow import keras
        channels = x.shape[-1]
        y = keras.layers.GlobalAveragePooling2D()(x)
        y = keras.layers.Dense(channels // reduction, activation='relu')(y)
        y = keras.layers.Dense(channels, activation='sigmoid')(y)
        y = keras.layers.Reshape([1, 1, channels])(y)
        return keras.layers.Multiply()([x, y])
 
    def decode_to_model(self, architecture_tree):
        """
        将架构树解码为Keras模型
        """
        from tensorflow import keras
 
        inputs = keras.Input(shape=self.input_shape)
        x = inputs
 
        def build_from_tree(node, x):
            if node.is_leaf():
                return x
 
            op = node.value
            if op in self.operations:
                if op == 'dropout':
                    x = self.operations[op](x, node.params.get('rate', 0.2))
                else:
                    filters = node.params.get('filters', 64)
                    x = self.operations[op](x, filters)
 
            for child in node.children:
                x = build_from_tree(child, x)
 
            return x
 
        x = build_from_tree(architecture_tree, x)
 
        # 添加分类头
        x = keras.layers.GlobalAveragePooling2D()(x)
        outputs = keras.layers.Dense(self.num_classes, activation='softmax')(x)
 
        model = keras.Model(inputs=inputs, outputs=outputs)
        return model
 
    def fitness_evaluation(self, architecture_tree, train_data, val_data, epochs=10):
        """
        评估架构性能
        """
        model = self.decode_to_model(architecture_tree)
        model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
 
        history = model.fit(
            train_data[0], train_data[1],
            validation_data=val_data,
            epochs=epochs,
            verbose=0
        )
 
        val_acc = history.history['val_accuracy'][-1]
        num_params = model.count_params()
 
        # 权衡准确率和参数量
        return val_acc - 1e-7 * num_params

十七、GP与其他进化算法的混合

17.1 GP与遗传算法的混合

class HybridGPGA:
    """
    GP与GA混合算法
    GA用于优化数值参数,GP用于演化结构
    """
 
    def __init__(self):
        self.gp_population = []
        self.ga_population = []
 
    def initialize(self, pop_size, tree_depth, param_dim):
        """初始化混合种群"""
        # GP种群:树结构
        for _ in range(pop_size):
            tree = self._random_tree(tree_depth)
            self.gp_population.append(tree)
 
        # GA种群:数值参数
        for _ in range(pop_size):
            params = np.random.randn(param_dim)
            self.ga_population.append(params)
 
    def evaluate_hybrid(self, tree, params, fitness_cases):
        """
        评估混合解
        """
        total_error = 0
 
        for inputs, expected in fitness_cases:
            # 使用树计算结构
            result = tree.evaluate(inputs)
 
            # 应用GA优化的参数进行后处理
            processed = self._apply_params(result, params)
 
            total_error += abs(processed - expected) ** 2
 
        return 1.0 / (1 + total_error)
 
    def _apply_params(self, result, params):
        """应用GA参数"""
        return result * (1 + params[0]) + params[1]
 
    def evolve_generation(self, fitness_cases, crossover_rate=0.8, mutation_rate=0.1):
        """进化一代"""
        # 评估
        fitnesses = []
        for tree, params in zip(self.gp_population, self.ga_population):
            fitness = self.evaluate_hybrid(tree, params, fitness_cases)
            fitnesses.append(fitness)
 
        # 选择
        selected_indices = self._tournament_select(fitnesses, k=3, n=len(fitnesses)//2)
 
        # GP交叉/变异
        new_gp_pop = []
        for _ in range(len(self.gp_population)):
            if random.random() < crossover_rate:
                parent1 = random.choice(selected_indices)
                parent2 = random.choice(selected_indices)
                child = self._gp_crossover(
                    self.gp_population[parent1],
                    self.gp_population[parent2]
                )
            else:
                child = random.choice(selected_indices)
 
            if random.random() < mutation_rate:
                child = self._gp_mutate(child)
 
            new_gp_pop.append(child)
 
        # GA交叉/变异
        new_ga_pop = []
        for _ in range(len(self.ga_population)):
            if random.random() < crossover_rate:
                parent1 = random.choice(selected_indices)
                parent2 = random.choice(selected_indices)
                child = self._ga_crossover(
                    self.ga_population[parent1],
                    self.ga_population[parent2]
                )
            else:
                child = random.choice(selected_indices)
 
            if random.random() < mutation_rate:
                child = self._ga_mutate(child)
 
            new_ga_pop.append(child)
 
        self.gp_population = new_gp_pop
        self.ga_population = new_ga_pop
 
    def _gp_crossover(self, parent1, parent2):
        """GP树交叉"""
        # 使用子树交叉
        return subtree_crossover(parent1, parent2)
 
    def _gp_mutate(self, tree):
        """GP树变异"""
        # 使用子树变异
        return subtree_mutation(tree)
 
    def _ga_crossover(self, parent1, parent2):
        """GA交叉"""
        alpha = random.random()
        return alpha * parent1 + (1 - alpha) * parent2
 
    def _ga_mutate(self, individual):
        """GA变异"""
        noise = np.random.randn(len(individual)) * 0.1
        return individual + noise
 
    def _tournament_select(self, fitnesses, k=3, n=None):
        """锦标赛选择"""
        n = n or len(fitnesses)
        selected = []
 
        for _ in range(n):
            tournament = random.sample(range(len(fitnesses)), k)
            winner = max(tournament, key=lambda i: fitnesses[i])
            selected.append(winner)
 
        return selected

17.2 GP与粒子群优化的混合

class HybridGPPSO:
    """
    GP与PSO混合算法
    PSO用于优化树中数值常量的位置
    """
 
    def __init__(self, function_set, terminal_set):
        self.function_set = function_set
        self.terminal_set = terminal_set
 
    def extract_constants(self, tree):
        """提取树中的数值常量"""
        constants = []
        positions = []
 
        def traverse(node, path):
            if node.is_leaf():
                if isinstance(node.value, (int, float)):
                    constants.append(node.value)
                    positions.append(path)
            else:
                for i, child in enumerate(node.children):
                    traverse(child, path + [i])
 
        traverse(tree, [])
        return constants, positions
 
    def insert_constants(self, tree, positions, new_constants):
        """将新常量插入树"""
        # 重建树
        return self._rebuild_tree(tree, positions, new_constants)
 
    def optimize_constants(self, tree, fitness_cases, n_particles=20, max_iter=50):
        """
        使用PSO优化树中的数值常量
        """
        constants, positions = self.extract_constants(tree)
 
        if not constants:
            return tree
 
        dim = len(constants)
 
        # PSO优化常量
        pso = ParticleSwarmOptimizer(
            objective=lambda c: self._evaluate_constants(tree, c, fitness_cases, positions),
            dim=dim,
            bounds=[(min(constants), max(constants))] * dim,
            n_particles=n_particles
        )
 
        best_constants, _ = pso.run()
 
        # 更新树中的常量
        return self.insert_constants(tree, positions, best_constants)
 
    def _evaluate_constants(self, tree, constants, fitness_cases, positions):
        """评估常量配置"""
        modified_tree = self.insert_constants(tree, positions, constants)
 
        total_error = 0
        for inputs, expected in fitness_cases:
            result = modified_tree.evaluate(inputs)
            total_error += abs(result - expected) ** 2
 
        return total_error

17.3 GP与差分进化的混合

class HybridGPDE:
    """
    GP与DE混合算法
    DE用于演化树结构的关键参数
    """
 
    def __init__(self, function_set, terminal_set):
        self.function_set = function_set
        self.terminal_set = terminal_set
 
    def identify_key_parameters(self, tree):
        """
        识别树中的关键可调参数
        例如:循环次数、条件阈值等
        """
        key_params = []
 
        def traverse(node, context):
            if node.value in ['LOOP', 'REPEAT', 'IF']:
                # 这些节点有可调参数
                key_params.append({
                    'node': node,
                    'path': context.copy(),
                    'type': node.value
                })
 
            for i, child in enumerate(node.children):
                traverse(child, context + [i])
 
        traverse(tree, [])
        return key_params
 
    def encode_parameters(self, tree):
        """将关键参数编码为向量"""
        key_params = self.identify_key_parameters(tree)
        vector = []
 
        for param in key_params:
            if param['type'] == 'LOOP':
                vector.append(param['node'].children[0].value if len(param['node'].children) > 0 else 5)
            elif param['type'] == 'IF':
                threshold = 0.0  # 默认阈值
                vector.append(threshold)
 
        return np.array(vector)
 
    def decode_parameters(self, tree, param_vector):
        """将参数向量解码回树"""
        key_params = self.identify_key_parameters(tree)
 
        for i, param in enumerate(key_params):
            if i < len(param_vector):
                param['node'].param_value = param_vector[i]
 
        return tree
 
    def hybrid_evolve(self, fitness_cases, n_generations=100):
        """混合进化主循环"""
        # 初始化种群
        trees = [self._random_tree(max_depth=6) for _ in range(50)]
 
        for gen in range(n_generations):
            for i, tree in enumerate(trees):
                # 提取参数向量
                params = self.encode_parameters(tree)
 
                # 使用DE优化参数
                de = DifferentialEvolution(
                    objective=lambda p: self._evaluate_with_params(tree, p, fitness_cases),
                    bounds=[(0, 20)] * len(params),
                    NP=20,
                    max_iter=20
                )
                de.initialize()
 
                for _ in range(20):
                    de.step()
 
                # 更新树的参数
                trees[i] = self.decode_parameters(tree, de.best_solution)
 
            # 对树进行标准GP操作
            trees = self._gp_selection(trees, fitness_cases)
            trees = self._gp_crossover(trees)
            trees = self._gp_mutation(trees)
 
        return max(trees, key=lambda t: self._evaluate_tree(t, fitness_cases))
 
    def _evaluate_with_params(self, tree, params, fitness_cases):
        """评估带有参数的树"""
        modified_tree = self.decode_parameters(tree, params)
 
        total_error = 0
        for inputs, expected in fitness_cases:
            result = modified_tree.evaluate(inputs)
            total_error += abs(result - expected) ** 2
 
        return total_error

十八、高级应用案例

18.1 自动定理证明

class GPTheoremProver:
    """
    使用GP进行自动定理证明
    进化逻辑表达式
    """
 
    def __init__(self):
        self.logic_functions = {
            'AND': lambda x, y: x and y,
            'OR': lambda x, y: x or y,
            'NOT': lambda x: not x,
            'IMPLIES': lambda x, y: (not x) or y,
            'IFF': lambda x, y: x == y,
        }
 
    def create_logical_terminals(self, propositions):
        """创建逻辑终端"""
        terminals = {}
        for prop in propositions:
            terminals[prop] = prop
        return terminals
 
    def evaluate_formula(self, tree, truth_assignment):
        """评估公式在给定真值赋值下的结果"""
        def eval_node(node):
            if node.is_leaf():
                return truth_assignment[node.value]
            else:
                func = self.logic_functions[node.value]
                args = [eval_node(child) for child in node.children]
                return func(*args)
 
        return eval_node(tree)
 
    def check_tautology(self, tree, all_assignments):
        """检查公式是否为重言式"""
        results = []
        for assignment in all_assignments:
            result = self.evaluate_formula(tree, assignment)
            results.append(result)
 
        return all(results)
 
    def check_contradiction(self, tree, all_assignments):
        """检查公式是否为矛盾式"""
        results = []
        for assignment in all_assignments:
            result = self.evaluate_formula(tree, assignment)
            results.append(result)
 
        return not any(results)
 
    def find_equivalent_formulas(self, target_tree, n_propositions, search_space):
        """找到与目标公式等价的公式"""
        equivalents = []
 
        for candidate in search_space:
            if self.check_logical_equivalence(target_tree, candidate, n_propositions):
                equivalents.append(candidate)
 
        return equivalents
 
    def check_logical_equivalence(self, tree1, tree2, n_propositions):
        """检查两个公式是否逻辑等价"""
        all_assignments = self._generate_all_assignments(n_propositions)
 
        for assignment in all_assignments:
            result1 = self.evaluate_formula(tree1, assignment)
            result2 = self.evaluate_formula(tree2, assignment)
 
            if result1 != result2:
                return False
 
        return True
 
    def _generate_all_assignments(self, n):
        """生成所有可能的真值赋值"""
        assignments = []
        for i in range(2 ** n):
            assignment = {}
            for j in range(n):
                assignment[f'p{j}'] = bool((i >> j) & 1)
            assignments.append(assignment)
        return assignments

18.2 游戏AI设计

class GPGameAI:
    """
    使用GP设计游戏AI
    进化游戏策略树
    """
 
    def __init__(self, game_env):
        self.game = game_env
 
        self.condition_functions = {
            'HP_LESS': lambda state, threshold: state['hp'] < threshold,
            'HP_GREATER': lambda state, threshold: state['hp'] > threshold,
            'ENEMY_NEAR': lambda state: state['enemy_distance'] < 5,
            'ENEMY_FAR': lambda state: state['enemy_distance'] > 10,
            'HAS_ITEM': lambda state, item: item in state['inventory'],
            'CAN_ATTACK': lambda state: state['can_attack'],
        }
 
        self.action_functions = {
            'ATTACK': lambda state: ('action', 'attack'),
            'DEFEND': lambda state: ('action', 'defend'),
            'HEAL': lambda state: ('action', 'heal'),
            'MOVE_TO': lambda state, x, y: ('action', 'move', x, y),
            'USE_ITEM': lambda state, item: ('action', 'use_item', item),
            'WAIT': lambda state: ('action', 'wait'),
        }
 
    def create_game_terminals(self):
        """创建游戏终端集"""
        terminals = {
            'hp': 'state_hp',
            'mp': 'state_mp',
            'enemy_hp': 'enemy_hp',
            'enemy_distance': 'enemy_distance',
            'x': 'position_x',
            'y': 'position_y',
        }
 
        # 添加数值常量
        for val in [0, 1, 5, 10, 20, 50, 100]:
            terminals[str(val)] = val
 
        return terminals
 
    def evaluate_strategy(self, strategy_tree, n_episodes=20):
        """评估策略的性能"""
        total_score = 0
        total_wins = 0
 
        for _ in range(n_episodes):
            score, won = self._play_episode(strategy_tree)
            total_score += score
            total_wins += won
 
        return {
            'avg_score': total_score / n_episodes,
            'win_rate': total_wins / n_episodes,
            'fitness': (total_score / n_episodes) + 100 * (total_wins / n_episodes)
        }
 
    def _play_episode(self, strategy_tree):
        """播放一局游戏"""
        state = self.game.reset()
        total_score = 0
        max_steps = 1000
 
        for step in range(max_steps):
            # 从策略树获取动作
            action = self._get_action(strategy_tree, state)
 
            # 执行动作
            next_state, reward, done, info = self.game.step(action)
            total_score += reward
            state = next_state
 
            if done:
                break
 
        won = info.get('won', False)
        return total_score, won
 
    def _get_action(self, strategy_tree, state):
        """从策略树获取动作"""
        def eval_node(node):
            if node.is_leaf():
                if node.value in state:
                    return state[node.value]
                elif node.value in self.condition_functions:
                    # 返回条件函数
                    return node.value
                else:
                    return node.value
            else:
                value = node.value
 
                if value in self.condition_functions:
                    # 评估条件
                    args = [eval_node(child) for child in node.children]
                    func = self.condition_functions[value]
 
                    if len(args) == 1:
                        return func(state, args[0])
                    else:
                        return func(state)
 
                elif value in self.action_functions:
                    # 评估动作
                    args = [eval_node(child) for child in node.children]
                    func = self.action_functions[value]
 
                    if len(args) == 0:
                        return func(state)
                    else:
                        return func(state, *args)
 
                else:
                    # 其他函数
                    args = [eval_node(child) for child in node.children]
                    return self._apply_function(value, *args)
 
        result = eval_node(strategy_tree)
 
        # 解析动作
        if isinstance(result, tuple) and result[0] == 'action':
            return result[1:]
 
        return ('wait',)

18.3 音乐生成

class GPMusicGenerator:
    """
    使用GP生成音乐
    进化音乐模式
    """
 
    def __init__(self, key='C', scale='major'):
        self.key = key
        self.scale = scale
        self.scale_intervals = self._get_scale_intervals(scale)
        self.key_offset = self._note_to_offset(key)
 
    def _get_scale_intervals(self, scale):
        """获取音阶的音程"""
        scales = {
            'major': [0, 2, 4, 5, 7, 9, 11],
            'minor': [0, 2, 3, 5, 7, 8, 10],
            'pentatonic': [0, 2, 4, 7, 9],
            'blues': [0, 3, 5, 6, 7, 10],
        }
        return scales.get(scale, scales['major'])
 
    def _note_to_offset(self, note):
        """将音符转换为偏移量"""
        notes = {'C': 0, 'D': 2, 'E': 4, 'F': 5, 'G': 7, 'A': 9, 'B': 11}
        return notes.get(note[0].upper(), 0)
 
    def create_music_functions(self):
        """创建音乐生成函数"""
        return {
            'note': lambda pitch, octave: self._create_note(pitch, octave),
            'chord': lambda *notes: self._create_chord(notes),
            'sequence': lambda *elements: self._create_sequence(elements),
            'arpeggio': lambda notes, direction: self._create_arpeggio(notes, direction),
            'rest': lambda duration: self._create_rest(duration),
            'scale_up': lambda start, length: self._scale_movement(start, length, 1),
            'scale_down': lambda start, length: self._scale_movement(start, length, -1),
            'sequence_repeat': lambda seq, times: self._repeat_sequence(seq, times),
        }
 
    def _create_note(self, pitch, octave):
        """创建音符"""
        pitch_offset = {'C': 0, 'D': 1, 'E': 2, 'F': 3, 'G': 4, 'A': 5, 'B': 6}
        midi = 12 * (octave + 1) + pitch_offset.get(pitch, 0)
        return {'type': 'note', 'midi': midi, 'duration': 1}
 
    def _create_chord(self, notes):
        """创建和弦"""
        return {'type': 'chord', 'notes': notes}
 
    def _create_sequence(self, elements):
        """创建序列"""
        return {'type': 'sequence', 'elements': elements}
 
    def _create_arpeggio(self, notes, direction):
        """创建琶音"""
        return {'type': 'arpeggio', 'notes': notes, 'direction': direction}
 
    def _create_rest(self, duration):
        """创建休止"""
        return {'type': 'rest', 'duration': duration}
 
    def _scale_movement(self, start, length, direction):
        """创建音阶进行"""
        sequence = []
        for i in range(length):
            midi = start + i * direction
            sequence.append({'type': 'note', 'midi': midi, 'duration': 1})
        return {'type': 'sequence', 'elements': sequence}
 
    def _repeat_sequence(self, seq, times):
        """重复序列"""
        return {'type': 'sequence', 'elements': seq['elements'] * times}
 
    def fitness_function(self, tree, reference_styles=None):
        """
        评估生成音乐的质量
        """
        music = self._evaluate_tree(tree)
 
        # 评估多个维度
        melody_score = self._evaluate_melody(music)
        harmony_score = self._evaluate_harmony(music)
        rhythm_score = self._evaluate_rhythm(music)
 
        if reference_styles:
            style_score = self._evaluate_style(music, reference_styles)
        else:
            style_score = 0
 
        return melody_score + harmony_score + rhythm_score + style_score
 
    def _evaluate_melody(self, music):
        """评估旋律质量"""
        notes = self._extract_notes(music)
 
        if len(notes) < 2:
            return 0
 
        # 评估音域
        pitches = [n['midi'] for n in notes]
        range_score = min(1.0, (max(pitches) - min(pitches)) / 12)
 
        # 评估级进与跳进的平衡
        intervals = [abs(pitches[i+1] - pitches[i]) for i in range(len(pitches)-1)]
        stepwise = sum(1 for i in intervals if i <= 2)
        leap = sum(1 for i in intervals if i > 4)
        interval_score = stepwise / (stepwise + leap + 1)
 
        return 0.5 * range_score + 0.5 * interval_score
 
    def _evaluate_harmony(self, music):
        """评估和声质量"""
        # 检查和弦进行
        chords = self._extract_chords(music)
 
        if not chords:
            return 0
 
        # 简单的和声评估:检查和弦是否协调
        harmony_score = 0
        for chord in chords:
            if self._is_consonant(chord):
                harmony_score += 1
 
        return harmony_score / max(1, len(chords))
 
    def _is_consonant(self, chord):
        """检查和弦是否协调"""
        # 简化版本:检查音程是否为协和音程
        consonant_intervals = {0, 3, 4, 5, 7, 8, 9, 12}
        return True  # 简化实现
 
    def _evaluate_rhythm(self, music):
        """评估节奏质量"""
        notes = self._extract_notes(music)
 
        # 评估节奏的规律性
        durations = [n.get('duration', 1) for n in notes]
 
        # 检查是否有重复的节奏型
        patterns = []
        for i in range(len(durations) - 3):
            pattern = tuple(durations[i:i+4])
            patterns.append(pattern)
 
        unique_patterns = len(set(patterns))
        pattern_score = unique_patterns / max(1, len(patterns))
 
        return pattern_score

十九、实用工具与框架

19.1 DEAP框架详解

"""
DEAP框架完整示例
DEAP是最流行的Python遗传编程框架
"""
 
from deap import base, creator, tools, algorithms, gp
import operator
import random
import numpy as np
 
def deap_gp_complete_example():
    """
    DEAP GP完整示例:符号回归
    """
 
    # 定义原始集
    pset = gp.PrimitiveSetTyped("MAIN", [float], float)
 
    # 重命名参数
    pset.renameArguments(ARG0='x')
 
    # 添加算术运算
    def protectedDiv(left, right):
        try:
            return left / right
        except ZeroDivisionError:
            return 1
 
    pset.addPrimitive(operator.add, [float, float], float, name="add")
    pset.addPrimitive(operator.sub, [float, float], float, name="sub")
    pset.addPrimitive(operator.mul, [float, float], float, name="mul")
    pset.addPrimitive(protectedDiv, [float, float], float, name="div")
    pset.addPrimitive(operator.neg, [float], float, name="neg")
    pset.addPrimitive(math.cos, [float], float, name="cos")
    pset.addPrimitive(math.sin, [float], float, name="sin")
 
    # 添加常量和Ephemeral随机常量
    pset.addEphemeralConstant("rand100", lambda: random.uniform(-100, 100), float)
    pset.addTerminal(1.0, float)
    pset.addTerminal(2.0, float)
 
    # 创建适应度类(最小化)
    creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
    creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
 
    # 创建工具箱
    toolbox = base.Toolbox()
 
    # 表达式生成
    toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=2, max_=6)
    toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("compile", gp.compile, pset=pset)
 
    # 遗传操作
    toolbox.register("evaluate", evalSymbReg, pset=pset)
    toolbox.register("select", tools.selTournament, tournsize=3)
    toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
    toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
 
    # 交叉(确保语法有效)
    toolbox.register("mate", gp.cxOnePoint)
 
    # 统计
    stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
    stats_size = tools.Statistics(len)
    mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
    mstats.register("avg", np.mean)
    mstats.register("std", np.std)
    mstats.register("min", np.min)
    mstats.register("max", np.max)
 
    # 算法参数
    pop = toolbox.population(n=300)
    hof = tools.HallOfFame(1)
 
    # 运行算法
    final_pop, log = algorithms.eaSimple(
        pop, toolbox,
        cxpb=0.9, mutpb=0.1,
        ngen=40,
        stats=mstats,
        halloffame=hof,
        verbose=True
    )
 
    return final_pop, log, hof
 
 
def evalSymbReg(individual, pset):
    """
    评估符号回归个体
    目标函数: x^4 + x^3 + x^2 + x
    """
    # 编译表达式为函数
    func = gp.compile(individual, pset)
 
    # 测试数据
    x = [random.uniform(-1, 1) for _ in range(100)]
 
    # 计算均方误差
    sqerrors = []
    for xi in x:
        try:
            y = (xi ** 4 + xi ** 3 + xi ** 2 + xi)  # 目标函数
            sqerrors.append((func(xi) - y) ** 2)
        except Exception:
            sqerrors.append(1000000)  # 惩罚无效表达式
 
    return (math.fsum(sqerrors) / len(x),)
 
 
def deap_classification_example():
    """
    DEAP GP分类示例
    """
 
    # 创建PrimitiveSet
    pset = gp.PrimitiveSetTyped("MAIN", [bool, bool], bool)
    pset.renameArguments(ARG0='x', ARG1='y')
 
    # 布尔函数
    pset.addPrimitive(operator.and_, [bool, bool], bool)
    pset.addPrimitive(operator.or_, [bool, bool], bool)
    pset.addPrimitive(operator.not_, [bool], bool)
 
    # Ephemeral常量
    pset.addEphemeralConstant("randBool", lambda: random.choice([True, False]))
 
    # 适应度
    creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
    creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
 
    toolbox = base.Toolbox()
    toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=2, max_=4)
    toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
    toolbox.register("compile", gp.compile, pset=pset)
    toolbox.register("evaluate", evalParity, pset=pset)
    toolbox.register("select", tools.selTournament, tournsize=3)
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("mutate", gp.mutUniform, expr=gp.genFull(pset=pset))
 
    # 多目标版本
    creator.create("FitnessMulti", base.Fitness, weights=(1.0, -0.5))
    creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMulti)
 
 
def evalParity(individual, pset):
    """
    奇偶校验问题
    5输入奇偶函数
    """
    func = gp.compile(individual, pset)
 
    # 生成所有可能的输入
    n_samples = 32
    correct = 0
 
    for i in range(n_samples):
        inputs = [(i >> j) & 1 == 1 for j in range(5)]
 
        # 计算奇偶
        parity = sum(inputs) % 2 == 1
 
        try:
            if func(*inputs) == parity:
                correct += 1
        except Exception:
            pass
 
    # 适应度 = 正确率
    accuracy = correct / n_samples
 
    # 复杂度惩罚
    complexity = len(individual) / 100.0
 
    return (accuracy, -complexity)

19.2 PyTorch-GP集成

"""
PyTorch与GP集成示例
使用GP生成神经网络模块
"""
 
import torch
import torch.nn as nn
from deap import gp
 
class PyTorchGPBridge:
    """
    PyTorch与GP的桥梁
    """
 
    def __init__(self):
        self.layer_templates = {
            'linear': lambda in_feat, out_feat: nn.Linear(in_feat, out_feat),
            'conv2d': lambda in_ch, out_ch, kernel: nn.Conv2d(in_ch, out_ch, kernel),
            'relu': lambda: nn.ReLU(),
            'sigmoid': lambda: nn.Sigmoid(),
            'tanh': lambda: nn.Tanh(),
            'dropout': lambda p: nn.Dropout(p),
            'batchnorm1d': lambda: nn.BatchNorm1d,
            'batchnorm2d': lambda: nn.BatchNorm2d,
        }
 
    def gp_tree_to_module(self, tree, input_dim):
        """
        将GP树转换为PyTorch模块
        """
        class DynamicModule(nn.Module):
            def __init__(self):
                super().__init__()
                self.layers = []
                self._build_from_tree(tree)
 
            def _build_from_tree(self, node, input_channels=None):
                if node.is_leaf():
                    return
 
                value = node.value
 
                if value in self.layer_templates:
                    layer = self._create_layer(value, node, input_channels)
                    if layer is not None:
                        self.layers.append(layer)
 
                for child in node.children:
                    self._build_from_tree(child, input_channels)
 
            def _create_layer(self, layer_type, node, input_channels):
                if layer_type == 'linear':
                    out_feat = node.params.get('out_features', 64)
                    return nn.Linear(input_channels or 784, out_feat)
                elif layer_type == 'relu':
                    return nn.ReLU()
                elif layer_type == 'dropout':
                    p = node.params.get('p', 0.5)
                    return nn.Dropout(p)
                return None
 
            def forward(self, x):
                for layer in self.layers:
                    x = layer(x)
                return x
 
        return DynamicModule()
 
    def evaluate_genome(self, tree, train_loader, test_loader, device='cuda'):
        """
        评估GP生成的模型
        """
        model = self.gp_tree_to_module(tree, input_dim=784)
        model = model.to(device)
 
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
 
        # 训练
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            if batch_idx >= 50:  # 快速评估
                break
 
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data.view(data.size(0), -1))
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
 
        # 评估
        model.eval()
        correct = 0
        total = 0
 
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data.view(data.size(0), -1))
                _, predicted = torch.max(output.data, 1)
                total += target.size(0)
                correct += (predicted == target).sum().item()
 
        accuracy = correct / total
        return accuracy

二十、性能基准与最佳实践

20.1 基准测试函数集

class GPBenchmarkFunctions:
    """
    GP标准基准测试函数
    """
 
    @staticmethod
    def pagie_polynomial(x, y):
        """
        Pagie多项式
        f(x,y) = 1/(1+x^-4) + 1/(1+y^-4)
        """
        return 1 / (1 + x**(-4)) + 1 / (1 + y**(-4))
 
    @staticmethod
    def poly_metric(x, y):
        """
        多项式度量
        f(x,y) = x^4 + y^4
        """
        return x**4 + y**4
 
    @staticmethod
    def rational_metric(x, y):
        """
        有理函数
        f(x,y) = (x^2 + 2xy + y^2) / (x^2 - 2xy + y^2)
        """
        denominator = x**2 - 2*x*y + y**2
        if abs(denominator) < 1e-10:
            return 1e10
        return (x**2 + 2*x*y + y**2) / denominator
 
    @staticmethod
    def transcendental(x, y):
        """
        超越函数
        f(x,y) = sin(x) + cos(y)
        """
        return np.sin(x) + np.cos(y)
 
    @staticmethod
    def vladislavleva(x1, x2, x3, x4):
        """
        Vladislavleva-4函数
        5维函数,强烈多峰
        """
        return (x1 - 3) * (4 - 3*x1**2) + \
               (x2 - x3**2) * (1 - x2**2) + \
               (x4 - 1) * (1 - x4**2)

20.2 参数推荐配置

问题类型种群大小深度限制交叉率变异率锦标赛规模
符号回归500-10006-80.8-0.90.1-0.23-7
分类500-10005-70.8-0.90.1-0.23-5
自动编程1000-50008-120.7-0.80.2-0.35-10
控制器设计500-10006-100.8-0.90.1-0.23-5

20.3 常见问题解决方案

问题症状解决方案
过拟合训练误差低,测试误差高增加验证集,使用更小的树,使用复杂度惩罚
欠拟合训练和测试误差都高增加种群多样性,增加树深度,改进函数集
代码膨胀树大小失控增长使用深度限制,使用大小惩罚,使用收缩变异
早熟收敛适应度停滞增加变异率,使用多种群,增加多样性选择
语法无效交叉后产生无效树使用保证语法的交叉算子,修复机制

二十一、参考文献

  1. Koza, J. R. (1992). Genetic Programming: On the Programming of Computers by Means of Natural Selection. MIT Press.
  2. Koza, J. R. (1994). Genetic Programming II: Automatic Discovery of Reusable Programs. MIT Press.
  3. Koza, J. R., et al. (1999). Genetic Programming III: Darwinian Invention and Problem Solving. Morgan Kaufmann.
  4. Poli, R., Langdon, W. B., & McPhee, N. F. (2008). A Field Guide to Genetic Programming. Lulu.com.
  5. Banzhaf, W., et al. (1998). Genetic Programming: An Introduction. Morgan Kaufmann.
  6. Langdon, W. B., & Poli, R. (2002). Foundations of Genetic Programming. Springer.
  7. Vanneschi, L., & Castelli, M. (2019). “A Survey of Semantic Genetic Programming.” Frontiers in Robotics and AI, 6, 37.
  8. Moraglio, A., et al. (2012). “Geometric Semantic Genetic Programming.” PPSN XII, 21-31.
  9. Forrest, S., & Mitchell, M. (1993). “What Makes a Problem Hard for Genetic Programming?” Proceedings of ICGA, 1993.
  10. Luke, S., & Spector, L. (1998). “A Revised Comparison of Crossover and Mutation in Genetic Programming.” Proceedings of GP, 1998.
  11. Cramer, N. L. (1985). “A Representation for the Adaptive Generation of Simple Sequential Programs.” Proceedings of ICGA, 1985.
  12. Montana, D. J. (1995). “Strongly Typed Genetic Programming.” Evolutionary Computation, 3(2), 199-230.
  13. Ryan, C., & O’Neill, M. (1998). “Grammatical Evolution: Evolving Programs in an Arbitrary Language.” Proceedings of GP, 1998.
  14. Spector, L., & Robinson, A. (2002). “Genetic Programming and Autoconstructive Evolution with the Push Programming Language.” Genetic Programming and Evolvable Machines, 3(1), 7-40.
  15. Koza, J. R. (2010). “Human-Competitive Results Produced by Genetic Programming.” Genetic Programming and Evolvable Machines, 11(3-4), 251-284.

相关文档