关键词
| 关键词 | 说明 |
|---|---|
| Plugin Architecture | 插件架构设计 |
| Python SDK | Python 开发工具包 |
| Event System | 事件系统核心 |
| Hook Points | 钩子注入点 |
| Plugin Manifest | 插件清单配置 |
| Sandboxed Execution | 沙箱安全执行 |
| Lifecycle Hooks | 生命周期钩子 |
| Dependency Injection | 依赖注入模式 |
| Semantic Versioning | 语义化版本控制 |
| PyPI Distribution | Python 包分发 |
概述
OpenClaw 的插件系统是实现其高度可扩展性的核心机制。通过标准化的插件接口,开发者可以为 OpenClaw 添加新的工具、能力、平台适配器、记忆处理器等组件,而无需修改核心代码。插件系统采用「发现-加载-注册」的三阶段架构,支持热加载、依赖管理和版本控制。
设计原则
OpenClaw 插件系统遵循「最小惊讶原则」:插件应该能以最少的配置工作,同时允许深度定制。核心 API 保持稳定,扩展点清晰文档化。
插件架构设计
核心抽象
# openclaw/plugins/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Type, Callable
from enum import Enum
import asyncio
class PluginType(Enum):
"""插件类型枚举"""
TOOL = "tool" # 工具类插件
PLATFORM = "platform" # 平台适配器
MEMORY = "memory" # 记忆处理器
MIDDLEWARE = "middleware" # 中间件
TRANSFORMER = "transformer" # 数据转换器
ANALYTICS = "analytics" # 分析统计
class PluginState(Enum):
"""插件状态"""
DISCOVERED = "discovered"
LOADING = "loading"
LOADED = "loaded"
ACTIVE = "active"
DISABLED = "disabled"
ERROR = "error"
@dataclass
class PluginMetadata:
"""插件元数据"""
name: str
version: str
author: str
description: str
homepage: str = ""
license: str = "MIT"
plugin_type: PluginType
dependencies: List[str] = field(default_factory=list)
compatible_versions: str = ">=1.0.0"
config_schema: Dict[str, Any] = field(default_factory=dict)
permissions: List[str] = field(default_factory=list)
def validate_version(self, current: str) -> bool:
"""验证版本兼容性"""
# 使用 semver 规范验证
from packaging import version
try:
constraints = self.compatible_versions
return version.parse(current) in version.SpecifierSet(constraints)
except:
return True
class PluginContext:
"""插件运行时上下文"""
def __init__(self, plugin: "Plugin", config: Dict[str, Any]):
self.plugin = plugin
self.config = config
self.state = PluginState.DISCOVERED
self.logger = self._setup_logger()
self._services: Dict[str, Any] = {}
def register_service(self, name: str, service: Any):
"""注册服务供其他插件使用"""
self._services[name] = service
def get_service(self, name: str) -> Optional[Any]:
"""获取已注册服务"""
return self._services.get(name)
def emit_event(self, event_name: str, **kwargs):
"""触发事件"""
return self.plugin.manager.emit_event(event_name, self, **kwargs)
def _setup_logger(self):
import logging
logger = logging.getLogger(f"plugin.{self.plugin.metadata.name}")
logger.setLevel(logging.DEBUG)
return logger
class Plugin(ABC):
"""插件基类"""
def __init__(self):
self.metadata: Optional[PluginMetadata] = None
self.context: Optional[PluginContext] = None
self.manager: Optional["PluginManager"] = None
@abstractmethod
async def on_load(self) -> bool:
"""插件加载时调用,返回是否加载成功"""
pass
@abstractmethod
async def on_unload(self):
"""插件卸载时调用,用于清理资源"""
pass
async def on_enable(self):
"""插件启用时调用"""
pass
async def on_disable(self):
"""插件禁用时调用"""
pass
def get_tools(self) -> List["Tool"]:
"""返回插件提供的工具列表"""
return []
def get_event_handlers(self) -> Dict[str, Callable]:
"""返回事件处理器映射"""
return {}插件管理器
# openclaw/plugins/manager.py
import importlib
import importlib.util
import sys
import json
from pathlib import Path
from typing import Dict, List, Optional, Type, Set
from concurrent.futures import ThreadPoolExecutor
class PluginManager:
"""插件管理器"""
def __init__(self, plugin_dir: Path):
self.plugin_dir = plugin_dir
self.plugins: Dict[str, Plugin] = {}
self.load_order: List[str] = []
self.event_handlers: Dict[str, List[tuple]] = {}
self.hooks: Dict[str, List[Callable]] = {}
self.dependencies: Dict[str, Set[str]] = {}
self.executor = ThreadPoolExecutor(max_workers=4)
async def discover_plugins(self) -> List[PluginMetadata]:
"""发现所有可用插件"""
discovered = []
for plugin_path in self.plugin_dir.iterdir():
if not plugin_path.is_dir():
continue
manifest_path = plugin_path / "plugin.json"
if manifest_path.exists():
with open(manifest_path) as f:
data = json.load(f)
metadata = PluginMetadata(**data)
discovered.append(metadata)
return discovered
async def load_plugin(
self,
metadata: PluginMetadata,
config: Dict[str, Any] = None
) -> Optional[Plugin]:
"""加载单个插件"""
plugin_path = self.plugin_dir / metadata.name
spec = importlib.util.spec_from_file_location(
metadata.name,
plugin_path / "__init__.py"
)
if not spec or not spec.loader:
return None
module = importlib.util.module_from_spec(spec)
sys.modules[metadata.name] = module
spec.loader.exec_module(module)
# 获取插件类
plugin_class = getattr(module, "Plugin", None)
if not plugin_class:
return None
# 实例化并初始化
plugin = plugin_class()
plugin.metadata = metadata
plugin.manager = self
# 解析依赖
self._resolve_dependencies(metadata)
# 加载依赖
for dep in metadata.dependencies:
if dep not in self.plugins:
dep_meta = self._find_metadata(dep)
if dep_meta:
await self.load_plugin(dep_meta)
# 创建上下文
plugin.context = PluginContext(
plugin,
config or {}
)
# 执行加载钩子
if await plugin.on_load():
self.plugins[metadata.name] = plugin
self.load_order.append(metadata.name)
# 注册事件处理器
self._register_event_handlers(plugin)
return plugin
return None
def _resolve_dependencies(self, metadata: PluginMetadata):
"""解析插件依赖"""
self.dependencies[metadata.name] = set(metadata.dependencies)
def _register_event_handlers(self, plugin: Plugin):
"""注册插件事件处理器"""
handlers = plugin.get_event_handlers()
for event_name, handler in handlers.items():
if event_name not in self.event_handlers:
self.event_handlers[event_name] = []
self.event_handlers[event_name].append(
(plugin.metadata.name, handler)
)
async def emit_event(self, event_name: str, context: PluginContext, **kwargs):
"""触发事件并调用所有处理器"""
if event_name not in self.event_handlers:
return
for plugin_name, handler in self.event_handlers[event_name]:
plugin = self.plugins.get(plugin_name)
if plugin and context.config.get(f"{plugin_name}_enabled", True):
try:
if asyncio.iscoroutinefunction(handler):
await handler(context, **kwargs)
else:
handler(context, **kwargs)
except Exception as e:
context.logger.error(f"Event handler error: {e}")
def register_hook(self, hook_name: str, callback: Callable):
"""注册钩子函数"""
if hook_name not in self.hooks:
self.hooks[hook_name] = []
self.hooks[hook_name].append(callback)
async def call_hooks(self, hook_name: str, *args, **kwargs) -> List[Any]:
"""调用所有钩子函数"""
results = []
for callback in self.hooks.get(hook_name, []):
try:
if asyncio.iscoroutinefunction(callback):
result = await callback(*args, **kwargs)
else:
result = callback(*args, **kwargs)
results.append(result)
except Exception as e:
logger.error(f"Hook error in {hook_name}: {e}")
return resultsPython 插件模板
插件目录结构
plugins/
└── my_custom_tool/
├── __init__.py # 插件入口
├── plugin.json # 插件清单
├── config.yaml # 默认配置
├── tools/ # 工具子目录
│ ├── __init__.py
│ └── custom_tool.py
├── handlers/ # 事件处理
│ └── event_handler.py
└── requirements.txt # 依赖
plugin.json 清单文件
{
"name": "my_custom_tool",
"version": "1.2.0",
"author": "Developer Name",
"description": "A custom tool plugin for OpenClaw",
"homepage": "https://github.com/user/openclaw-my-tool",
"license": "MIT",
"plugin_type": "tool",
"dependencies": [],
"compatible_versions": ">=1.5.0,<3.0.0",
"config_schema": {
"type": "object",
"properties": {
"api_key": {
"type": "string",
"description": "API key for the service",
"secret": true
},
"default_timeout": {
"type": "integer",
"default": 30
},
"enable_cache": {
"type": "boolean",
"default": true
}
},
"required": ["api_key"]
},
"permissions": [
"network:external",
"storage:read",
"storage:write"
],
"hooks": [
"before_agent_run",
"after_agent_run",
"on_message_received"
]
}插件主文件模板
# plugins/my_custom_tool/__init__.py
"""
My Custom Tool Plugin for OpenClaw
This plugin provides [description of what the plugin does].
"""
from openclaw.plugins.base import (
Plugin, PluginType, PluginMetadata
)
from typing import List, Dict, Any, Callable, Optional
__version__ = "1.2.0"
# 导出插件类和工具类
from .tools.custom_tool import CustomTool
class Plugin(Plugin):
"""My Custom Tool Plugin"""
METADATA = PluginMetadata(
name="my_custom_tool",
version=__version__,
author="Developer Name",
description="Provides custom tool capabilities",
homepage="https://github.com/user/openclaw-my-tool",
plugin_type=PluginType.TOOL,
dependencies=[], # 依赖其他插件时填写
config_schema={
"type": "object",
"properties": {
"api_key": {"type": "string", "secret": True},
"default_timeout": {"type": "integer", "default": 30}
},
"required": ["api_key"]
}
)
def __init__(self):
super().__init__()
self._tools: List[CustomTool] = []
self._initialized = False
async def on_load(self) -> bool:
"""插件加载时的初始化逻辑"""
# 验证配置
api_key = self.context.config.get("api_key")
if not api_key:
self.context.logger.error("Missing required config: api_key")
return False
# 初始化工具实例
self._tools = [
CustomTool(
name="custom_action",
api_key=api_key,
timeout=self.context.config.get("default_timeout", 30)
)
]
# 注册事件处理器
self.context.register_service(
"custom_client",
self._create_client()
)
self._initialized = True
self.context.logger.info(
f"Plugin loaded with {len(self._tools)} tools"
)
return True
async def on_unload(self):
"""插件卸载时的清理逻辑"""
# 清理资源
if self._tools:
for tool in self._tools:
await tool.cleanup()
self._initialized = False
self.context.logger.info("Plugin unloaded")
async def on_enable(self):
"""插件启用时的逻辑"""
self.context.logger.info("Plugin enabled")
async def on_disable(self):
"""插件禁用时的逻辑"""
self.context.logger.info("Plugin disabled")
def get_tools(self) -> List:
"""返回插件提供的所有工具"""
return self._tools
def get_event_handlers(self) -> Dict[str, Callable]:
"""返回事件处理器映射"""
return {
"on_message_received": self._handle_message,
"before_agent_run": self._pre_agent_run,
"after_agent_run": self._post_agent_run
}
# 事件处理方法
async def _handle_message(self, context, message):
"""处理收到的消息"""
pass
async def _pre_agent_run(self, context, agent_input):
"""Agent 运行前钩子"""
pass
async def _post_agent_run(self, context, agent_input, agent_output):
"""Agent 运行后钩子"""
pass
def _create_client(self):
"""创建客户端实例"""
# 实现客户端创建逻辑
pass工具定义与实现
# plugins/my_custom_tool/tools/custom_tool.py
from openclaw.tools.base import Tool, ToolResult
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
import httpx
class CustomToolInput(BaseModel):
"""工具输入参数模型"""
query: str = Field(description="查询内容")
max_results: int = Field(default=10, description="最大返回结果数")
filters: Optional[Dict[str, Any]] = Field(default=None, description="过滤条件")
class CustomTool(Tool):
"""自定义工具实现"""
name = "custom_action"
description = "执行自定义操作的工具,支持查询和处理"
input_model = CustomToolInput
output_model = ToolResult
def __init__(
self,
name: str,
api_key: str,
timeout: int = 30,
cache_enabled: bool = True
):
super().__init__(name)
self.api_key = api_key
self.timeout = timeout
self.cache_enabled = cache_enabled
self._client: Optional[httpx.AsyncClient] = None
self._cache: Dict[str, Any] = {}
async def _ensure_client(self):
"""确保 HTTP 客户端已初始化"""
if not self._client:
self._client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=self.timeout
)
async def execute(self, input_data: CustomToolInput) -> ToolResult:
"""执行工具逻辑"""
await self._ensure_client()
# 检查缓存
cache_key = f"{input_data.query}:{input_data.max_results}"
if self.cache_enabled and cache_key in self._cache:
return ToolResult(
success=True,
data=self._cache[cache_key],
from_cache=True
)
try:
# 调用外部 API
response = await self._client.post(
"/search",
json={
"query": input_data.query,
"limit": input_data.max_results,
"filters": input_data.filters
}
)
response.raise_for_status()
data = response.json()
# 更新缓存
if self.cache_enabled:
self._cache[cache_key] = data
return ToolResult(
success=True,
data=data,
metadata={
"api_version": "v1",
"result_count": len(data.get("results", []))
}
)
except httpx.HTTPStatusError as e:
return ToolResult(
success=False,
error=f"HTTP Error: {e.response.status_code}",
error_code="HTTP_ERROR"
)
except Exception as e:
return ToolResult(
success=False,
error=str(e),
error_code="UNKNOWN_ERROR"
)
async def cleanup(self):
"""清理资源"""
if self._client:
await self._client.aclose()
self._client = None
self._cache.clear()事件处理系统
内置事件类型
| 事件名 | 触发时机 | 传递参数 |
|---|---|---|
on_startup | 系统启动完成 | - |
on_shutdown | 系统关闭前 | - |
on_message_received | 收到新消息 | message |
on_message_sent | 消息发送后 | message |
before_agent_run | Agent 执行前 | input |
after_agent_run | Agent 执行后 | input, output |
on_error | 发生错误时 | error, context |
on_tool_call | 工具调用时 | tool, input |
on_user_authenticated | 用户认证后 | user |
事件处理器示例
# plugins/my_custom_tool/handlers/event_handler.py
class MyEventHandler:
"""自定义事件处理器"""
def __init__(self, plugin_context):
self.context = plugin_context
async def handle_message_received(self, message):
"""处理收到的消息,可用于预处理、过滤等"""
# 消息预处理
if self._should_block(message):
return {"blocked": True, "reason": "spam"}
# 消息增强
if self._should_enhance(message):
enhanced = await self._enhance_message(message)
return {"enhanced": enhanced}
return None # 继续正常流程
async def handle_before_agent_run(self, agent_input):
"""Agent 运行前的预处理"""
# 添加上下文
agent_input.context["custom_field"] = self._get_context_value()
# 验证输入
if not self._validate_input(agent_input):
raise ValueError("Invalid input")
return agent_input
async def handle_after_agent_run(self, agent_input, agent_output):
"""Agent 运行后的后处理"""
# 记录分析数据
await self._log_metrics(agent_input, agent_output)
# 修改输出
if self._should_modify_output(agent_output):
agent_output = self._modify_output(agent_output)
return agent_output
async def handle_error(self, error, context):
"""全局错误处理"""
self.context.logger.error(f"Error occurred: {error}")
# 发送告警通知
await self._send_alert(error, context)
# 返回友好错误消息
return {
"user_message": "发生了一些问题,请稍后再试。",
"error_id": context.get("error_id")
}
def _should_block(self, message) -> bool:
"""判断是否应阻止消息"""
return False
def _should_enhance(self, message) -> bool:
"""判断是否应增强消息"""
return False
async def _enhance_message(self, message):
"""增强消息内容"""
pass
def _validate_input(self, agent_input) -> bool:
"""验证输入"""
return True
async def _log_metrics(self, input_data, output_data):
"""记录指标"""
pass
def _should_modify_output(self, output) -> bool:
"""判断是否应修改输出"""
return False
def _modify_output(self, output):
"""修改输出"""
return output
async def _send_alert(self, error, context):
"""发送告警"""
pass
def _get_context_value(self):
"""获取上下文值"""
return None沙箱安全执行
# openclaw/plugins/sandbox.py
import sys
import resource
import tempfile
import shutil
from pathlib import Path
from typing import Dict, Any
class PluginSandbox:
"""插件沙箱环境"""
def __init__(self, plugin_name: str):
self.plugin_name = plugin_name
self.work_dir = Path(tempfile.mkdtemp(prefix=f"openclaw_{plugin_name}_"))
self._setup_limits()
def _setup_limits(self):
"""设置资源限制"""
# 内存限制:500MB
memory_limit = 500 * 1024 * 1024
resource.setrlimit(resource.RLIMIT_AS, (memory_limit, memory_limit))
# CPU 时间限制:60秒
resource.setrlimit(resource.RLIMIT_CPU, (60, 60))
# 文件大小限制:100MB
resource.setrlimit(
resource.RLIMIT_FSIZE,
(100 * 1024 * 1024, 100 * 1024 * 1024)
)
# 进程数限制:10
resource.setrlimit(resource.RLIMIT_NPROC, (10, 10))
def __enter__(self):
"""进入沙箱上下文"""
self.old_cwd = Path.cwd()
sys.path.insert(0, str(self.work_dir))
self.old_environ = dict(__import__("os").environ)
# 隔离环境变量
import os
os.environ["SANDBOX_MODE"] = "true"
os.environ["PLUGIN_NAME"] = self.plugin_name
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出沙箱上下文"""
sys.path.remove(str(self.work_dir))
self.old_cwd.chdir()
# 恢复环境变量
import os
os.environ.clear()
os.environ.update(self.old_environ)
# 清理临时目录
shutil.rmtree(self.work_dir, ignore_errors=True)
return False # 不抑制异常插件打包与发布
pyproject.toml 配置
# plugins/my_custom_tool/pyproject.toml
[build-system]
requires = ["hatchling", "hatch-requirements-txt"]
build-backend = "hatchling.build"
[project]
name = "openclaw-my-custom-tool"
version = "1.2.0"
description = "Custom tool plugin for OpenClaw"
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
authors = [
{ name = "Developer Name", email = "dev@example.com" }
]
keywords = ["openclaw", "plugin", "ai", "tool"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
]
dependencies = [
"httpx>=0.25.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.1.0",
"ruff>=0.1.0",
]
[project.urls]
Homepage = "https://github.com/user/openclaw-my-tool"
Repository = "https://github.com/user/openclaw-my-tool"
Issues = "https://github.com/user/openclaw-my-tool/issues"
[tool.hatch.build.targets.wheel]
packages = ["."]发布到 PyPI
# 安装发布工具
pip install build twine
# 构建 wheel
python -m build
# 发布到 Test PyPI (测试)
python -m twine upload --repository testpypi dist/*
# 发布到正式 PyPI
python -m twine upload dist/*OpenClaw 插件市场
# ~/.openclaw/plugins.yaml
plugin_marketplace:
enabled: true
sources:
- name: "official"
url: "https://plugins.openclaw.dev/registry.json"
- name: "community"
url: "https://community.openclaw.dev/plugins.json"
installed:
- name: "my_custom_tool"
version: "1.2.0"
source: "community"# 安装插件命令
openclaw plugin install my_custom_tool
openclaw plugin install my_custom_tool==1.2.0
openclaw plugin install --source github user/repo
# 更新插件
openclaw plugin update my_custom_tool
openclaw plugin update --all
# 卸载插件
openclaw plugin uninstall my_custom_tool
# 列出已安装插件
openclaw plugin list开发调试技巧
使用
openclaw plugin dev my_custom_tool启动开发模式,支持热重载和实时日志。
插件测试框架
# plugins/my_custom_tool/tests/test_tool.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
@pytest.fixture
def mock_context():
"""模拟插件上下文"""
context = MagicMock()
context.config = {"api_key": "test_key", "default_timeout": 30}
context.logger = MagicMock()
context.register_service = MagicMock()
return context
@pytest.fixture
async def tool(mock_context):
"""创建测试工具实例"""
with patch("httpx.AsyncClient"):
from my_custom_tool.tools.custom_tool import CustomTool
tool = CustomTool(
name="test_action",
api_key="test_key",
timeout=30
)
tool.context = mock_context
return tool
@pytest.mark.asyncio
async def test_tool_execute_success(tool):
"""测试工具成功执行"""
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {
"results": [{"id": 1, "name": "test"}]
}
with patch.object(tool, "_client") as mock_client:
mock_client.post = AsyncMock(return_value=mock_response)
from my_custom_tool.tools.custom_tool import CustomToolInput
input_data = CustomToolInput(
query="test query",
max_results=5
)
result = await tool.execute(input_data)
assert result.success is True
assert result.data["results"][0]["id"] == 1
@pytest.mark.asyncio
async def test_tool_cache_hit(tool):
"""测试缓存命中"""
tool._cache["test:5"] = {"cached": True}
tool.cache_enabled = True
from my_custom_tool.tools.custom_tool import CustomToolInput
input_data = CustomToolInput(query="test", max_results=5)
result = await tool.execute(input_data)
assert result.from_cache is True关键要点
- 标准化接口:所有插件继承
Plugin基类,实现统一生命周期- 清单驱动:通过
plugin.json声明元数据和依赖- 事件系统:支持丰富的钩子点,实现深度集成
- 沙箱隔离:资源限制保护系统安全
- 语义版本:遵循 SemVer 规范管理版本兼容性
相关文档
- OpenClaw多平台集成 - 创建平台适配器插件
- OpenClaw记忆系统 - 记忆处理器插件开发
- OpenClaw高级用法 - 工作流与自定义工具
- OpenClaw概览 - 系统整体架构