关键词

关键词说明
Plugin Architecture插件架构设计
Python SDKPython 开发工具包
Event System事件系统核心
Hook Points钩子注入点
Plugin Manifest插件清单配置
Sandboxed Execution沙箱安全执行
Lifecycle Hooks生命周期钩子
Dependency Injection依赖注入模式
Semantic Versioning语义化版本控制
PyPI DistributionPython 包分发

概述

OpenClaw 的插件系统是实现其高度可扩展性的核心机制。通过标准化的插件接口,开发者可以为 OpenClaw 添加新的工具、能力、平台适配器、记忆处理器等组件,而无需修改核心代码。插件系统采用「发现-加载-注册」的三阶段架构,支持热加载、依赖管理和版本控制。

设计原则

OpenClaw 插件系统遵循「最小惊讶原则」:插件应该能以最少的配置工作,同时允许深度定制。核心 API 保持稳定,扩展点清晰文档化。


插件架构设计

核心抽象

# openclaw/plugins/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Type, Callable
from enum import Enum
import asyncio
 
class PluginType(Enum):
    """插件类型枚举"""
    TOOL = "tool"                    # 工具类插件
    PLATFORM = "platform"            # 平台适配器
    MEMORY = "memory"                # 记忆处理器
    MIDDLEWARE = "middleware"       # 中间件
    TRANSFORMER = "transformer"     # 数据转换器
    ANALYTICS = "analytics"          # 分析统计
 
class PluginState(Enum):
    """插件状态"""
    DISCOVERED = "discovered"
    LOADING = "loading"
    LOADED = "loaded"
    ACTIVE = "active"
    DISABLED = "disabled"
    ERROR = "error"
 
@dataclass
class PluginMetadata:
    """插件元数据"""
    name: str
    version: str
    author: str
    description: str
    homepage: str = ""
    license: str = "MIT"
    plugin_type: PluginType
    dependencies: List[str] = field(default_factory=list)
    compatible_versions: str = ">=1.0.0"
    config_schema: Dict[str, Any] = field(default_factory=dict)
    permissions: List[str] = field(default_factory=list)
    
    def validate_version(self, current: str) -> bool:
        """验证版本兼容性"""
        # 使用 semver 规范验证
        from packaging import version
        try:
            constraints = self.compatible_versions
            return version.parse(current) in version.SpecifierSet(constraints)
        except:
            return True
 
class PluginContext:
    """插件运行时上下文"""
    
    def __init__(self, plugin: "Plugin", config: Dict[str, Any]):
        self.plugin = plugin
        self.config = config
        self.state = PluginState.DISCOVERED
        self.logger = self._setup_logger()
        self._services: Dict[str, Any] = {}
        
    def register_service(self, name: str, service: Any):
        """注册服务供其他插件使用"""
        self._services[name] = service
        
    def get_service(self, name: str) -> Optional[Any]:
        """获取已注册服务"""
        return self._services.get(name)
    
    def emit_event(self, event_name: str, **kwargs):
        """触发事件"""
        return self.plugin.manager.emit_event(event_name, self, **kwargs)
    
    def _setup_logger(self):
        import logging
        logger = logging.getLogger(f"plugin.{self.plugin.metadata.name}")
        logger.setLevel(logging.DEBUG)
        return logger
 
class Plugin(ABC):
    """插件基类"""
    
    def __init__(self):
        self.metadata: Optional[PluginMetadata] = None
        self.context: Optional[PluginContext] = None
        self.manager: Optional["PluginManager"] = None
    
    @abstractmethod
    async def on_load(self) -> bool:
        """插件加载时调用,返回是否加载成功"""
        pass
    
    @abstractmethod
    async def on_unload(self):
        """插件卸载时调用,用于清理资源"""
        pass
    
    async def on_enable(self):
        """插件启用时调用"""
        pass
    
    async def on_disable(self):
        """插件禁用时调用"""
        pass
    
    def get_tools(self) -> List["Tool"]:
        """返回插件提供的工具列表"""
        return []
    
    def get_event_handlers(self) -> Dict[str, Callable]:
        """返回事件处理器映射"""
        return {}

插件管理器

# openclaw/plugins/manager.py
import importlib
import importlib.util
import sys
import json
from pathlib import Path
from typing import Dict, List, Optional, Type, Set
from concurrent.futures import ThreadPoolExecutor
 
class PluginManager:
    """插件管理器"""
    
    def __init__(self, plugin_dir: Path):
        self.plugin_dir = plugin_dir
        self.plugins: Dict[str, Plugin] = {}
        self.load_order: List[str] = []
        self.event_handlers: Dict[str, List[tuple]] = {}
        self.hooks: Dict[str, List[Callable]] = {}
        self.dependencies: Dict[str, Set[str]] = {}
        self.executor = ThreadPoolExecutor(max_workers=4)
        
    async def discover_plugins(self) -> List[PluginMetadata]:
        """发现所有可用插件"""
        discovered = []
        
        for plugin_path in self.plugin_dir.iterdir():
            if not plugin_path.is_dir():
                continue
                
            manifest_path = plugin_path / "plugin.json"
            if manifest_path.exists():
                with open(manifest_path) as f:
                    data = json.load(f)
                    metadata = PluginMetadata(**data)
                    discovered.append(metadata)
                    
        return discovered
    
    async def load_plugin(
        self, 
        metadata: PluginMetadata,
        config: Dict[str, Any] = None
    ) -> Optional[Plugin]:
        """加载单个插件"""
        
        plugin_path = self.plugin_dir / metadata.name
        spec = importlib.util.spec_from_file_location(
            metadata.name,
            plugin_path / "__init__.py"
        )
        
        if not spec or not spec.loader:
            return None
            
        module = importlib.util.module_from_spec(spec)
        sys.modules[metadata.name] = module
        spec.loader.exec_module(module)
        
        # 获取插件类
        plugin_class = getattr(module, "Plugin", None)
        if not plugin_class:
            return None
            
        # 实例化并初始化
        plugin = plugin_class()
        plugin.metadata = metadata
        plugin.manager = self
        
        # 解析依赖
        self._resolve_dependencies(metadata)
        
        # 加载依赖
        for dep in metadata.dependencies:
            if dep not in self.plugins:
                dep_meta = self._find_metadata(dep)
                if dep_meta:
                    await self.load_plugin(dep_meta)
        
        # 创建上下文
        plugin.context = PluginContext(
            plugin, 
            config or {}
        )
        
        # 执行加载钩子
        if await plugin.on_load():
            self.plugins[metadata.name] = plugin
            self.load_order.append(metadata.name)
            
            # 注册事件处理器
            self._register_event_handlers(plugin)
            
            return plugin
        
        return None
    
    def _resolve_dependencies(self, metadata: PluginMetadata):
        """解析插件依赖"""
        self.dependencies[metadata.name] = set(metadata.dependencies)
    
    def _register_event_handlers(self, plugin: Plugin):
        """注册插件事件处理器"""
        handlers = plugin.get_event_handlers()
        for event_name, handler in handlers.items():
            if event_name not in self.event_handlers:
                self.event_handlers[event_name] = []
            self.event_handlers[event_name].append(
                (plugin.metadata.name, handler)
            )
    
    async def emit_event(self, event_name: str, context: PluginContext, **kwargs):
        """触发事件并调用所有处理器"""
        
        if event_name not in self.event_handlers:
            return
            
        for plugin_name, handler in self.event_handlers[event_name]:
            plugin = self.plugins.get(plugin_name)
            if plugin and context.config.get(f"{plugin_name}_enabled", True):
                try:
                    if asyncio.iscoroutinefunction(handler):
                        await handler(context, **kwargs)
                    else:
                        handler(context, **kwargs)
                except Exception as e:
                    context.logger.error(f"Event handler error: {e}")
    
    def register_hook(self, hook_name: str, callback: Callable):
        """注册钩子函数"""
        if hook_name not in self.hooks:
            self.hooks[hook_name] = []
        self.hooks[hook_name].append(callback)
    
    async def call_hooks(self, hook_name: str, *args, **kwargs) -> List[Any]:
        """调用所有钩子函数"""
        results = []
        for callback in self.hooks.get(hook_name, []):
            try:
                if asyncio.iscoroutinefunction(callback):
                    result = await callback(*args, **kwargs)
                else:
                    result = callback(*args, **kwargs)
                results.append(result)
            except Exception as e:
                logger.error(f"Hook error in {hook_name}: {e}")
        return results

Python 插件模板

插件目录结构

plugins/
└── my_custom_tool/
    ├── __init__.py          # 插件入口
    ├── plugin.json          # 插件清单
    ├── config.yaml          # 默认配置
    ├── tools/               # 工具子目录
    │   ├── __init__.py
    │   └── custom_tool.py
    ├── handlers/            # 事件处理
    │   └── event_handler.py
    └── requirements.txt     # 依赖

plugin.json 清单文件

{
  "name": "my_custom_tool",
  "version": "1.2.0",
  "author": "Developer Name",
  "description": "A custom tool plugin for OpenClaw",
  "homepage": "https://github.com/user/openclaw-my-tool",
  "license": "MIT",
  "plugin_type": "tool",
  "dependencies": [],
  "compatible_versions": ">=1.5.0,<3.0.0",
  "config_schema": {
    "type": "object",
    "properties": {
      "api_key": {
        "type": "string",
        "description": "API key for the service",
        "secret": true
      },
      "default_timeout": {
        "type": "integer",
        "default": 30
      },
      "enable_cache": {
        "type": "boolean",
        "default": true
      }
    },
    "required": ["api_key"]
  },
  "permissions": [
    "network:external",
    "storage:read",
    "storage:write"
  ],
  "hooks": [
    "before_agent_run",
    "after_agent_run",
    "on_message_received"
  ]
}

插件主文件模板

# plugins/my_custom_tool/__init__.py
"""
My Custom Tool Plugin for OpenClaw
 
This plugin provides [description of what the plugin does].
"""
 
from openclaw.plugins.base import (
    Plugin, PluginType, PluginMetadata
)
from typing import List, Dict, Any, Callable, Optional
 
__version__ = "1.2.0"
 
# 导出插件类和工具类
from .tools.custom_tool import CustomTool
 
class Plugin(Plugin):
    """My Custom Tool Plugin"""
    
    METADATA = PluginMetadata(
        name="my_custom_tool",
        version=__version__,
        author="Developer Name",
        description="Provides custom tool capabilities",
        homepage="https://github.com/user/openclaw-my-tool",
        plugin_type=PluginType.TOOL,
        dependencies=[],  # 依赖其他插件时填写
        config_schema={
            "type": "object",
            "properties": {
                "api_key": {"type": "string", "secret": True},
                "default_timeout": {"type": "integer", "default": 30}
            },
            "required": ["api_key"]
        }
    )
    
    def __init__(self):
        super().__init__()
        self._tools: List[CustomTool] = []
        self._initialized = False
    
    async def on_load(self) -> bool:
        """插件加载时的初始化逻辑"""
        
        # 验证配置
        api_key = self.context.config.get("api_key")
        if not api_key:
            self.context.logger.error("Missing required config: api_key")
            return False
        
        # 初始化工具实例
        self._tools = [
            CustomTool(
                name="custom_action",
                api_key=api_key,
                timeout=self.context.config.get("default_timeout", 30)
            )
        ]
        
        # 注册事件处理器
        self.context.register_service(
            "custom_client",
            self._create_client()
        )
        
        self._initialized = True
        self.context.logger.info(
            f"Plugin loaded with {len(self._tools)} tools"
        )
        return True
    
    async def on_unload(self):
        """插件卸载时的清理逻辑"""
        
        # 清理资源
        if self._tools:
            for tool in self._tools:
                await tool.cleanup()
        
        self._initialized = False
        self.context.logger.info("Plugin unloaded")
    
    async def on_enable(self):
        """插件启用时的逻辑"""
        self.context.logger.info("Plugin enabled")
    
    async def on_disable(self):
        """插件禁用时的逻辑"""
        self.context.logger.info("Plugin disabled")
    
    def get_tools(self) -> List:
        """返回插件提供的所有工具"""
        return self._tools
    
    def get_event_handlers(self) -> Dict[str, Callable]:
        """返回事件处理器映射"""
        return {
            "on_message_received": self._handle_message,
            "before_agent_run": self._pre_agent_run,
            "after_agent_run": self._post_agent_run
        }
    
    # 事件处理方法
    async def _handle_message(self, context, message):
        """处理收到的消息"""
        pass
    
    async def _pre_agent_run(self, context, agent_input):
        """Agent 运行前钩子"""
        pass
    
    async def _post_agent_run(self, context, agent_input, agent_output):
        """Agent 运行后钩子"""
        pass
    
    def _create_client(self):
        """创建客户端实例"""
        # 实现客户端创建逻辑
        pass

工具定义与实现

# plugins/my_custom_tool/tools/custom_tool.py
from openclaw.tools.base import Tool, ToolResult
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
import httpx
 
class CustomToolInput(BaseModel):
    """工具输入参数模型"""
    query: str = Field(description="查询内容")
    max_results: int = Field(default=10, description="最大返回结果数")
    filters: Optional[Dict[str, Any]] = Field(default=None, description="过滤条件")
 
class CustomTool(Tool):
    """自定义工具实现"""
    
    name = "custom_action"
    description = "执行自定义操作的工具,支持查询和处理"
    
    input_model = CustomToolInput
    output_model = ToolResult
    
    def __init__(
        self,
        name: str,
        api_key: str,
        timeout: int = 30,
        cache_enabled: bool = True
    ):
        super().__init__(name)
        self.api_key = api_key
        self.timeout = timeout
        self.cache_enabled = cache_enabled
        self._client: Optional[httpx.AsyncClient] = None
        self._cache: Dict[str, Any] = {}
    
    async def _ensure_client(self):
        """确保 HTTP 客户端已初始化"""
        if not self._client:
            self._client = httpx.AsyncClient(
                base_url="https://api.example.com",
                headers={"Authorization": f"Bearer {self.api_key}"},
                timeout=self.timeout
            )
    
    async def execute(self, input_data: CustomToolInput) -> ToolResult:
        """执行工具逻辑"""
        
        await self._ensure_client()
        
        # 检查缓存
        cache_key = f"{input_data.query}:{input_data.max_results}"
        if self.cache_enabled and cache_key in self._cache:
            return ToolResult(
                success=True,
                data=self._cache[cache_key],
                from_cache=True
            )
        
        try:
            # 调用外部 API
            response = await self._client.post(
                "/search",
                json={
                    "query": input_data.query,
                    "limit": input_data.max_results,
                    "filters": input_data.filters
                }
            )
            response.raise_for_status()
            data = response.json()
            
            # 更新缓存
            if self.cache_enabled:
                self._cache[cache_key] = data
            
            return ToolResult(
                success=True,
                data=data,
                metadata={
                    "api_version": "v1",
                    "result_count": len(data.get("results", []))
                }
            )
            
        except httpx.HTTPStatusError as e:
            return ToolResult(
                success=False,
                error=f"HTTP Error: {e.response.status_code}",
                error_code="HTTP_ERROR"
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=str(e),
                error_code="UNKNOWN_ERROR"
            )
    
    async def cleanup(self):
        """清理资源"""
        if self._client:
            await self._client.aclose()
            self._client = None
        self._cache.clear()

事件处理系统

内置事件类型

事件名触发时机传递参数
on_startup系统启动完成-
on_shutdown系统关闭前-
on_message_received收到新消息message
on_message_sent消息发送后message
before_agent_runAgent 执行前input
after_agent_runAgent 执行后input, output
on_error发生错误时error, context
on_tool_call工具调用时tool, input
on_user_authenticated用户认证后user

事件处理器示例

# plugins/my_custom_tool/handlers/event_handler.py
 
class MyEventHandler:
    """自定义事件处理器"""
    
    def __init__(self, plugin_context):
        self.context = plugin_context
    
    async def handle_message_received(self, message):
        """处理收到的消息,可用于预处理、过滤等"""
        
        # 消息预处理
        if self._should_block(message):
            return {"blocked": True, "reason": "spam"}
        
        # 消息增强
        if self._should_enhance(message):
            enhanced = await self._enhance_message(message)
            return {"enhanced": enhanced}
        
        return None  # 继续正常流程
    
    async def handle_before_agent_run(self, agent_input):
        """Agent 运行前的预处理"""
        
        # 添加上下文
        agent_input.context["custom_field"] = self._get_context_value()
        
        # 验证输入
        if not self._validate_input(agent_input):
            raise ValueError("Invalid input")
        
        return agent_input
    
    async def handle_after_agent_run(self, agent_input, agent_output):
        """Agent 运行后的后处理"""
        
        # 记录分析数据
        await self._log_metrics(agent_input, agent_output)
        
        # 修改输出
        if self._should_modify_output(agent_output):
            agent_output = self._modify_output(agent_output)
        
        return agent_output
    
    async def handle_error(self, error, context):
        """全局错误处理"""
        
        self.context.logger.error(f"Error occurred: {error}")
        
        # 发送告警通知
        await self._send_alert(error, context)
        
        # 返回友好错误消息
        return {
            "user_message": "发生了一些问题,请稍后再试。",
            "error_id": context.get("error_id")
        }
    
    def _should_block(self, message) -> bool:
        """判断是否应阻止消息"""
        return False
    
    def _should_enhance(self, message) -> bool:
        """判断是否应增强消息"""
        return False
    
    async def _enhance_message(self, message):
        """增强消息内容"""
        pass
    
    def _validate_input(self, agent_input) -> bool:
        """验证输入"""
        return True
    
    async def _log_metrics(self, input_data, output_data):
        """记录指标"""
        pass
    
    def _should_modify_output(self, output) -> bool:
        """判断是否应修改输出"""
        return False
    
    def _modify_output(self, output):
        """修改输出"""
        return output
    
    async def _send_alert(self, error, context):
        """发送告警"""
        pass
    
    def _get_context_value(self):
        """获取上下文值"""
        return None

沙箱安全执行

# openclaw/plugins/sandbox.py
import sys
import resource
import tempfile
import shutil
from pathlib import Path
from typing import Dict, Any
 
class PluginSandbox:
    """插件沙箱环境"""
    
    def __init__(self, plugin_name: str):
        self.plugin_name = plugin_name
        self.work_dir = Path(tempfile.mkdtemp(prefix=f"openclaw_{plugin_name}_"))
        self._setup_limits()
    
    def _setup_limits(self):
        """设置资源限制"""
        
        # 内存限制:500MB
        memory_limit = 500 * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_AS, (memory_limit, memory_limit))
        
        # CPU 时间限制:60秒
        resource.setrlimit(resource.RLIMIT_CPU, (60, 60))
        
        # 文件大小限制:100MB
        resource.setrlimit(
            resource.RLIMIT_FSIZE, 
            (100 * 1024 * 1024, 100 * 1024 * 1024)
        )
        
        # 进程数限制:10
        resource.setrlimit(resource.RLIMIT_NPROC, (10, 10))
    
    def __enter__(self):
        """进入沙箱上下文"""
        self.old_cwd = Path.cwd()
        sys.path.insert(0, str(self.work_dir))
        self.old_environ = dict(__import__("os").environ)
        
        # 隔离环境变量
        import os
        os.environ["SANDBOX_MODE"] = "true"
        os.environ["PLUGIN_NAME"] = self.plugin_name
        
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出沙箱上下文"""
        sys.path.remove(str(self.work_dir))
        self.old_cwd.chdir()
        
        # 恢复环境变量
        import os
        os.environ.clear()
        os.environ.update(self.old_environ)
        
        # 清理临时目录
        shutil.rmtree(self.work_dir, ignore_errors=True)
        
        return False  # 不抑制异常

插件打包与发布

pyproject.toml 配置

# plugins/my_custom_tool/pyproject.toml
[build-system]
requires = ["hatchling", "hatch-requirements-txt"]
build-backend = "hatchling.build"
 
[project]
name = "openclaw-my-custom-tool"
version = "1.2.0"
description = "Custom tool plugin for OpenClaw"
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
authors = [
    { name = "Developer Name", email = "dev@example.com" }
]
keywords = ["openclaw", "plugin", "ai", "tool"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
]
 
dependencies = [
    "httpx>=0.25.0",
]
 
[project.optional-dependencies]
dev = [
    "pytest>=7.4.0",
    "pytest-asyncio>=0.21.0",
    "pytest-cov>=4.1.0",
    "ruff>=0.1.0",
]
 
[project.urls]
Homepage = "https://github.com/user/openclaw-my-tool"
Repository = "https://github.com/user/openclaw-my-tool"
Issues = "https://github.com/user/openclaw-my-tool/issues"
 
[tool.hatch.build.targets.wheel]
packages = ["."]

发布到 PyPI

# 安装发布工具
pip install build twine
 
# 构建 wheel
python -m build
 
# 发布到 Test PyPI (测试)
python -m twine upload --repository testpypi dist/*
 
# 发布到正式 PyPI
python -m twine upload dist/*

OpenClaw 插件市场

# ~/.openclaw/plugins.yaml
plugin_marketplace:
  enabled: true
  sources:
    - name: "official"
      url: "https://plugins.openclaw.dev/registry.json"
    - name: "community"
      url: "https://community.openclaw.dev/plugins.json"
  
  installed:
    - name: "my_custom_tool"
      version: "1.2.0"
      source: "community"
# 安装插件命令
openclaw plugin install my_custom_tool
openclaw plugin install my_custom_tool==1.2.0
openclaw plugin install --source github user/repo
 
# 更新插件
openclaw plugin update my_custom_tool
openclaw plugin update --all
 
# 卸载插件
openclaw plugin uninstall my_custom_tool
 
# 列出已安装插件
openclaw plugin list

开发调试技巧

使用 openclaw plugin dev my_custom_tool 启动开发模式,支持热重载和实时日志。


插件测试框架

# plugins/my_custom_tool/tests/test_tool.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
 
@pytest.fixture
def mock_context():
    """模拟插件上下文"""
    context = MagicMock()
    context.config = {"api_key": "test_key", "default_timeout": 30}
    context.logger = MagicMock()
    context.register_service = MagicMock()
    return context
 
@pytest.fixture
async def tool(mock_context):
    """创建测试工具实例"""
    with patch("httpx.AsyncClient"):
        from my_custom_tool.tools.custom_tool import CustomTool
        
        tool = CustomTool(
            name="test_action",
            api_key="test_key",
            timeout=30
        )
        tool.context = mock_context
        return tool
 
@pytest.mark.asyncio
async def test_tool_execute_success(tool):
    """测试工具成功执行"""
    
    mock_response = MagicMock()
    mock_response.raise_for_status = MagicMock()
    mock_response.json.return_value = {
        "results": [{"id": 1, "name": "test"}]
    }
    
    with patch.object(tool, "_client") as mock_client:
        mock_client.post = AsyncMock(return_value=mock_response)
        
        from my_custom_tool.tools.custom_tool import CustomToolInput
        input_data = CustomToolInput(
            query="test query",
            max_results=5
        )
        
        result = await tool.execute(input_data)
        
        assert result.success is True
        assert result.data["results"][0]["id"] == 1
 
@pytest.mark.asyncio
async def test_tool_cache_hit(tool):
    """测试缓存命中"""
    
    tool._cache["test:5"] = {"cached": True}
    tool.cache_enabled = True
    
    from my_custom_tool.tools.custom_tool import CustomToolInput
    input_data = CustomToolInput(query="test", max_results=5)
    
    result = await tool.execute(input_data)
    
    assert result.from_cache is True

关键要点

  1. 标准化接口:所有插件继承 Plugin 基类,实现统一生命周期
  2. 清单驱动:通过 plugin.json 声明元数据和依赖
  3. 事件系统:支持丰富的钩子点,实现深度集成
  4. 沙箱隔离:资源限制保护系统安全
  5. 语义版本:遵循 SemVer 规范管理版本兼容性

相关文档