🚀 Python 进阶高级编程完整教程
从基础到精通,深入理解 Python 高级特性与工程实践
阅读指南:本节内容结构
每一节将围绕以下四方面展开,便于理解与落地:
| 模块 | 说明 |
|---|---|
| 概念 | 该技术/语法是什么,核心定义与前置知识 |
| 作用 | 能解决什么问题,在工程中的价值 |
| 应用场景 | 适合在什么业务、架构、阶段下使用 |
| 案例 | 可运行的示例与小型实战,便于模仿与改造 |
按「概念 → 作用 → 应用场景 → 案例」顺序阅读,可先建立整体认识,再在具体场景中对照代码加深理解。
目录
- 十八、元编程与反射机制
- 十九、内存管理与性能优化
- 二十、并发编程进阶
- 二十一、设计模式
- 二十二、数据结构与算法进阶
- 二十三、函数式编程
- 二十四、描述符协议
- 二十五、上下文管理器深入
- 二十六、测试驱动开发
- 二十七、代码质量与规范
- 二十八、C 扩展与性能提升
- 二十九、网络编程进阶
- 三十、数据库高级操作
- 三十一、分布式系统
- 三十二、Web 框架源码剖析
- 三十三、安全编程
- 三十四、项目架构与工程化
十八、元编程与反射机制
18.1 元类 (metaclass) 的原理与应用
概念介绍
元类是创建类的"类"。在 Python 中,一切皆对象,类本身也是对象,而元类就是用来创建这些类对象的模板。
核心概念:
- 类是实例的模板
- 元类是类的模板
type是 Python 中默认的元类
作用
- 在类创建阶段介入:可在类“成型”前统一修改属性、方法、继承关系,避免在每个子类中重复逻辑。
- 实现横切约束:如统一注册、自动校验、单例、ORM 映射等,与业务代码解耦。
- 提升可维护性:把“类级别”的规则集中到元类,使业务类更简洁、行为更一致。
应用场景
- ORM / 数据模型:根据类属性自动生成表结构、字段校验、与数据库的映射。
- 单例/多例控制:保证某类在进程内只有一份实例,或按键生成有限个实例。
- 插件/注册机制:类定义时自动注册到中央仓库,供按名称或类型查找。
- API 与配置:根据类或装饰器自动生成路由、权限、配置项,减少样板代码。
基础语法
# 方式1:使用 type 动态创建类
def init_method(self, name):
self.name = name
def say_hello(self):
return f"Hello, I'm {self.name}"
# type(类名, 父类元组, 属性字典)
Person = type('Person', (object,), {
'__init__': init_method,
'say_hello': say_hello
})
# 使用动态创建的类
p = Person("Alice")
print(p.say_hello()) # 输出: Hello, I'm Alice
自定义元类
class SingletonMeta(type):
"""单例模式元类"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
# 使用元类
class Database(metaclass=SingletonMeta):
def __init__(self):
print("初始化数据库连接")
self.connection = "MySQL Connection"
# 测试单例
db1 = Database() # 输出: 初始化数据库连接
db2 = Database() # 不会再次初始化
print(db1 is db2) # 输出: True
案例:ORM 字段验证
下面用元类在类创建时收集字段并做校验,演示「概念 → 作用 → 应用场景」的落地。
class Field:
"""字段基类"""
def __init__(self, field_type, required=False):
self.field_type = field_type
self.required = required
def validate(self, value):
if self.required and value is None:
raise ValueError(f"字段不能为空")
if value is not None and not isinstance(value, self.field_type):
raise TypeError(f"字段类型必须是 {self.field_type.__name__}")
return value
class ModelMeta(type):
"""ORM 模型元类"""
def __new__(mcs, name, bases, attrs):
# 收集所有字段
fields = {}
for key, value in attrs.items():
if isinstance(value, Field):
fields[key] = value
# 将字段信息存储到类属性中
attrs['_fields'] = fields
return super().__new__(mcs, name, bases, attrs)
class Model(metaclass=ModelMeta):
"""ORM 基类"""
def __init__(self, **kwargs):
for field_name, field in self._fields.items():
value = kwargs.get(field_name)
validated_value = field.validate(value)
setattr(self, field_name, validated_value)
def to_dict(self):
return {k: getattr(self, k, None) for k in self._fields}
# 使用 ORM
class User(Model):
name = Field(str, required=True)
age = Field(int)
email = Field(str, required=True)
# 创建用户
try:
user = User(name="张三", age=25, email="zhangsan@example.com")
print(user.to_dict()) # {'name': '张三', 'age': 25, 'email': 'zhangsan@example.com'}
# 测试验证
invalid_user = User(name="李四") # 缺少必填字段 email
except ValueError as e:
print(f"验证错误: {e}")
18.2 __new__ vs __init__
概念
| 特性 | __new__ |
__init__ |
|---|---|---|
| 调用时机 | 创建实例之前 | 实例创建之后 |
| 返回值 | 必须返回实例对象 | 不返回值(返回 None) |
| 第一参数 | cls (类) | self (实例) |
| 用途 | 控制实例创建过程 | 初始化实例属性 |
作用
__new__:决定“是否创建新对象、创建哪种对象、能否复用已有对象”,适合实现单例、缓存、不可变类型、对象池。__init__:在对象已创建后做属性赋值与初始状态设置,不负责“造”对象。
应用场景
- 单例 / 有限实例:在
__new__中判断是否已存在实例,有则直接返回。 - 不可变类型:在
__new__中先“造好”再交给__init__,或仅在__new__里完成所有逻辑。 - 对象池:在
__new__中从池中取/还对象,减少频繁分配与回收。 - 子类化内置类型:某些内置类型要求实例在
__new__中完成构造,再在__init__中做业务初始化。
基础示例
class Point:
def __new__(cls, x, y):
print(f"1. __new__ 被调用: cls={cls}")
instance = super().__new__(cls)
print(f"2. 实例已创建: {instance}")
return instance
def __init__(self, x, y):
print(f"3. __init__ 被调用: self={self}")
self.x = x
self.y = y
print(f"4. 初始化完成")
# 创建实例
p = Point(10, 20)
# 输出:
# 1. __new__ 被调用: cls=<class '__main__.Point'>
# 2. 实例已创建: <__main__.Point object at 0x...>
# 3. __init__ 被调用: self=<__main__.Point object at 0x...>
# 4. 初始化完成
案例:不可变对象
用 __new__ 在构造阶段完成“唯一创建”,再通过属性只读化实现不可变。
class ImmutablePoint:
"""不可变的点类"""
def __new__(cls, x, y):
instance = super().__new__(cls)
# 在 __new__ 中设置属性,因为对象还未完全创建
instance._x = x
instance._y = y
return instance
@property
def x(self):
return self._x
@property
def y(self):
return self._y
def __repr__(self):
return f"ImmutablePoint({self.x}, {self.y})"
# 测试
p = ImmutablePoint(5, 10)
print(p) # ImmutablePoint(5, 10)
# p.x = 20 # AttributeError: can't set attribute
案例:对象缓存池
在 __new__ 中做“同值复用”,减少重复对象,适合大量重复取用的不可变值。
class CachedString:
"""字符串缓存池"""
_cache = {}
def __new__(cls, value):
# 如果缓存中存在,直接返回
if value in cls._cache:
print(f"从缓存返回: {value}")
return cls._cache[value]
# 创建新实例并缓存
instance = super().__new__(cls)
cls._cache[value] = instance
print(f"创建新实例: {value}")
return instance
def __init__(self, value):
self.value = value
# 测试缓存
s1 = CachedString("hello") # 创建新实例: hello
s2 = CachedString("world") # 创建新实例: world
s3 = CachedString("hello") # 从缓存返回: hello
print(s1 is s3) # True - 同一个对象
print(s1 is s2) # False - 不同对象
18.3 动态创建类与属性
概念
在运行时用 type(类名, 父类, 属性字典) 或自定义元类生成新类,而不是在源码里写死 class。属性也可通过 setattr、__dict__、描述符等在运行时增删改。
作用
- 按配置/数据生成类:根据配置文件、接口约定、数据库 schema 等生成对应的类,减少手写重复代码。
- 实现工厂与路由:根据请求类型、URL、事件名等动态生成处理类或方法映射。
- 插件与扩展:在运行时加载模块后,用其提供的名称、基类、方法动态拼出“插件类”。
应用场景
- API 路由/Handler 工厂:按 path 或 method 动态生成 Handler 类,便于统一注册与调度。
- 表单/校验器生成:根据 JSON Schema 或接口文档自动生成带校验逻辑的类。
- 数据模型生成:从数据库表结构或 OpenAPI 生成 Model 类。
使用 type 动态创建类
# 动态创建方法
def get_area(self):
return self.width * self.height
def get_perimeter(self):
return 2 * (self.width + self.height)
# 动态创建类
Rectangle = type(
'Rectangle', # 类名
(object,), # 父类
{
'__init__': lambda self, w, h: setattr(self, 'width', w) or setattr(self, 'height', h),
'get_area': get_area,
'get_perimeter': get_perimeter
}
)
# 使用动态类
rect = Rectangle(10, 5)
print(f"面积: {rect.get_area()}") # 50
print(f"周长: {rect.get_perimeter()}") # 30
案例:API 路由注册器
根据 endpoint 和 methods 动态生成 Handler 类并注册,演示“动态类 + 路由”的典型用法。
class APIRouter:
"""API 路由动态生成器"""
def __init__(self):
self.routes = {}
def create_handler_class(self, endpoint, methods):
"""动态创建处理器类"""
def handle_get(self):
return f"GET {endpoint}"
def handle_post(self):
return f"POST {endpoint}"
def handle_put(self):
return f"PUT {endpoint}"
def handle_delete(self):
return f"DELETE {endpoint}"
# 方法映射
method_map = {
'GET': handle_get,
'POST': handle_post,
'PUT': handle_put,
'DELETE': handle_delete
}
# 动态创建类
class_attrs = {
method: method_map[method]
for method in methods if method in method_map
}
handler_class = type(
f'{endpoint.replace("/", "_")}Handler',
(object,),
class_attrs
)
return handler_class
def register(self, endpoint, methods):
"""注册路由"""
handler_class = self.create_handler_class(endpoint, methods)
self.routes[endpoint] = handler_class()
return handler_class
# 使用路由器
router = APIRouter()
# 注册路由
router.register('/users', ['GET', 'POST'])
router.register('/products', ['GET', 'PUT', 'DELETE'])
# 调用路由
print(router.routes['/users'].GET()) # GET /users
print(router.routes['/users'].POST()) # POST /users
print(router.routes['/products'].DELETE()) # DELETE /products
18.4 反射函数
概念
通过字符串名称在运行时访问与修改对象属性、方法,即 getattr / setattr / hasattr / delattr,以及 getattr(obj, name)(*args) 式的动态调用。
作用
- 解耦名称与实现:调用方只需知道“名字”,不必在编码期绑定到具体属性或方法。
- 通用配置/插件逻辑:根据配置里的字符串键调用对应方法,或遍历对象属性做批量处理。
- 可测试性与可扩展性:便于用配置驱动行为、写通用测试或探查接口。
应用场景
- 配置驱动:根据
config["handler"]从模块中getattr(module, name)取得可调用对象并执行。 - 插件/扩展:按插件名、方法名动态调用,实现“按名称执行”的插件系统。
- 数据映射与序列化:按字段名循环
getattr(obj, field)或setattr(obj, field, value),做 ORM、导入导出等。
四大反射函数
class Student:
def __init__(self, name, age):
self.name = name
self.age = age
self._score = 0
def study(self, subject):
return f"{self.name} 正在学习 {subject}"
def get_info(self):
return f"姓名: {self.name}, 年龄: {self.age}"
student = Student("小明", 18)
# 1. hasattr - 检查属性是否存在
print(hasattr(student, 'name')) # True
print(hasattr(student, 'gender')) # False
# 2. getattr - 获取属性值
name = getattr(student, 'name')
print(name) # 小明
# 获取不存在的属性,返回默认值
gender = getattr(student, 'gender', '未知')
print(gender) # 未知
# 3. setattr - 设置属性值
setattr(student, 'age', 19)
setattr(student, 'gender', '男') # 动态添加新属性
print(student.age) # 19
print(student.gender) # 男
# 4. delattr - 删除属性
delattr(student, 'gender')
print(hasattr(student, 'gender')) # False
案例:配置文件加载器
用 hasattr / getattr / setattr 将字典配置同步到对象,并做类型检查与默认值。
class Config:
"""配置类"""
def __init__(self):
self.debug = False
self.database = "sqlite"
self.port = 8000
class ConfigLoader:
"""动态配置加载器"""
def __init__(self, config_obj):
self.config = config_obj
def load_from_dict(self, config_dict):
"""从字典加载配置"""
for key, value in config_dict.items():
if hasattr(self.config, key):
# 验证类型
current_value = getattr(self.config, key)
if type(current_value) == type(value):
setattr(self.config, key, value)
print(f"✓ 更新配置: {key} = {value}")
else:
print(f"✗ 类型错误: {key} 应为 {type(current_value).__name__}")
else:
# 动态添加新配置
setattr(self.config, key, value)
print(f"✓ 新增配置: {key} = {value}")
def get_config(self, key, default=None):
"""安全获取配置"""
return getattr(self.config, key, default)
def remove_config(self, key):
"""删除配置项"""
if hasattr(self.config, key):
delattr(self.config, key)
print(f"✓ 删除配置: {key}")
return True
return False
# 使用配置加载器
config = Config()
loader = ConfigLoader(config)
# 从字典加载
new_config = {
'debug': True,
'port': 9000,
'host': '0.0.0.0', # 新配置
'database': 123 # 类型错误
}
loader.load_from_dict(new_config)
# 输出:
# ✓ 更新配置: debug = True
# ✓ 更新配置: port = 9000
# ✓ 新增配置: host = 0.0.0.0
# ✗ 类型错误: database 应为 str
print(f"\n当前配置:")
print(f"Debug: {config.debug}")
print(f"Port: {config.port}")
print(f"Host: {loader.get_config('host', 'localhost')}")
案例:插件系统
通过 getattr(plugin, method_name) 按名称调用插件方法,实现可配置、可扩展的插件执行。
class PluginManager:
"""插件管理器"""
def __init__(self):
self.plugins = {}
def register_plugin(self, name, plugin_class):
"""注册插件"""
self.plugins[name] = plugin_class()
def execute_plugin(self, name, method_name, *args, **kwargs):
"""动态执行插件方法"""
if name not in self.plugins:
return f"插件 {name} 不存在"
plugin = self.plugins[name]
# 检查方法是否存在
if not hasattr(plugin, method_name):
return f"插件 {name} 没有方法 {method_name}"
# 动态调用方法
method = getattr(plugin, method_name)
try:
result = method(*args, **kwargs)
return result
except Exception as e:
return f"执行错误: {e}"
def list_plugin_methods(self, name):
"""列出插件的所有方法"""
if name not in self.plugins:
return []
plugin = self.plugins[name]
methods = [
attr for attr in dir(plugin)
if callable(getattr(plugin, attr)) and not attr.startswith('_')
]
return methods
# 定义插件
class ImagePlugin:
def resize(self, width, height):
return f"调整图片大小到 {width}x{height}"
def rotate(self, angle):
return f"旋转图片 {angle} 度"
class TextPlugin:
def uppercase(self, text):
return text.upper()
def reverse(self, text):
return text[::-1]
# 使用插件管理器
manager = PluginManager()
manager.register_plugin('image', ImagePlugin)
manager.register_plugin('text', TextPlugin)
# 动态执行插件方法
print(manager.execute_plugin('image', 'resize', 800, 600))
print(manager.execute_plugin('text', 'uppercase', 'hello world'))
print(manager.execute_plugin('text', 'reverse', 'Python'))
# 列出插件方法
print("\n图片插件方法:", manager.list_plugin_methods('image'))
print("文本插件方法:", manager.list_plugin_methods('text'))
十九、内存管理与性能优化
19.1 垃圾回收机制 (GC)
概念
Python 的自动内存管理依赖两套机制协同工作:
- 引用计数:每个对象记录被引用次数,为 0 时立即回收,是主要、实时性最高的机制。
- 循环垃圾回收器 (cycle collector):专门处理互相引用形成的环,在引用计数无法回收时介入。
作用
- 自动回收无引用对象:开发者无需手动
free,降低内存泄漏与野指针风险。 - 解决循环引用:对父子、链表、图等结构中的环进行检测与回收,避免“逻辑已废弃但计数不为 0”的对象常驻内存。
- 可控的回收策略:通过
gc模块调整阈值、手动触发、排查不可回收对象,便于定位内存问题。
应用场景
- 长生命周期的服务:Web/API 服务、后台任务中,通过理解 GC 行为与阈值,避免峰值内存持续升高。
- 大量临时对象:解析、序列化、缓存构建等场景下,结合
gc.collect()或调整阈值控制回收节奏。 - 循环引用排查:对象“理论上应被回收却未回收”时,用
gc.garbage、gc.get_referrers等分析引用链。
查看引用计数
import sys
# 创建对象
a = [1, 2, 3]
print(f"引用计数: {sys.getrefcount(a)}") # 2 (a 本身 + getrefcount 参数)
# 增加引用
b = a
print(f"引用计数: {sys.getrefcount(a)}") # 3
# 删除引用
del b
print(f"引用计数: {sys.getrefcount(a)}") # 2
垃圾回收控制
import gc
# 查看垃圾回收器状态
print("GC 状态:", gc.isenabled()) # True
# 获取垃圾回收阈值
print("GC 阈值:", gc.get_threshold()) # (700, 10, 10)
# 手动触发垃圾回收
collected = gc.collect()
print(f"回收了 {collected} 个对象")
# 查看当前追踪的对象数量
print(f"追踪对象数: {len(gc.get_objects())}")
# 禁用自动垃圾回收(谨慎使用)
gc.disable()
print("GC 已禁用")
# 重新启用
gc.enable()
print("GC 已启用")
19.2 引用计数与循环引用
概念
- 引用计数:每个对象有一个计数,被引用则 +1,引用删除则 -1,为 0 时由 CPython 立刻回收。
- 循环引用:A 引用 B、B 又引用 A(或更长环),导致双方计数均 ≥1,仅靠引用计数无法回收,需依赖 GC 的环检测。
作用
- 理解“为什么有的对象没被
del却仍存在”:多半被其他对象或全局/闭包引用。 - 识别“逻辑上已不用但内存不降”的情况:多为循环引用,需配合
weakref或调整结构打破环。
应用场景
- 缓存、观察者、树/图结构:父子互指、监听器与被监听对象互指时,用
weakref将一方改为弱引用,避免循环。 - 排查内存不释放:用
gc.get_referrers(obj)、gc.get_referents(obj)画引用关系,找到意外持有引用的一方。
案例:循环引用问题
import gc
class Node:
def __init__(self, value):
self.value = value
self.next = None
def __del__(self):
print(f"Node {self.value} 被删除")
# 创建循环引用
def create_cycle():
node1 = Node(1)
node2 = Node(2)
# 创建循环引用
node1.next = node2
node2.next = node1
print("创建了循环引用")
# 函数结束,但对象不会立即被删除
create_cycle()
print("函数执行完毕")
# 手动触发垃圾回收
print("\n触发垃圾回收:")
collected = gc.collect()
print(f"回收了 {collected} 个对象")
检测循环引用
import gc
class TreeNode:
def __init__(self, value):
self.value = value
self.parent = None
self.children = []
# 创建树结构(有循环引用)
root = TreeNode(1)
child1 = TreeNode(2)
child2 = TreeNode(3)
root.children = [child1, child2]
child1.parent = root
child2.parent = root
# 查找循环引用
gc.collect() # 先清理
garbage = gc.garbage
print(f"垃圾对象: {len(garbage)}")
# 查看对象的引用关系
print("\n引用关系:")
for obj in gc.get_referents(root):
print(f" {type(obj)}: {obj if not isinstance(obj, list) else '[...]'}")
使用弱引用避免循环
import weakref
class TreeNode:
def __init__(self, value):
self.value = value
self._parent = None # 使用弱引用
self.children = []
@property
def parent(self):
return self._parent() if self._parent else None
@parent.setter
def parent(self, node):
self._parent = weakref.ref(node) if node else None
def __del__(self):
print(f"TreeNode {self.value} 被删除")
# 测试弱引用
root = TreeNode(1)
child = TreeNode(2)
root.children.append(child)
child.parent = root # 使用弱引用
print("删除 root:")
del root
print("root 已删除\n")
print("删除 child:")
del child
print("child 已删除")
19.3 __slots__ 优化内存
概念
在类中定义 __slots__ = ['attr1', 'attr2', ...] 后,实例不再使用 __dict__ 字典存属性,而是为列出的属性在 C 层分配固定槽位,从而减少内存与属性访问开销。
作用
- 显著降低实例内存:大量小对象时,少一个
__dict__能明显减少总内存与缓存占用。 - 略提速属性访问:按槽位偏移访问,比字典查找更稳定、可预测。
- 约束实例属性:只能拥有
__slots__中的属性,避免拼写或误用带来的隐式新属性。
应用场景
- 高密度数据结构:图节点、事件、日志条目、解析得到的 Token 等,实例数量大、属性固定时最适合。
- 与 C 扩展/序列化对接:固定字段更容易与 struct、二进制协议、数据库行一一对应。
- 不适用:需要运行时动态增删属性、或依赖
__dict__做序列化/反射的类。
案例:基础使用
import sys
# 普通类
class RegularPoint:
def __init__(self, x, y):
self.x = x
self.y = y
# 使用 __slots__
class SlottedPoint:
__slots__ = ['x', 'y']
def __init__(self, x, y):
self.x = x
self.y = y
# 内存对比
regular = RegularPoint(10, 20)
slotted = SlottedPoint(10, 20)
print(f"普通类实例大小: {sys.getsizeof(regular)} + {sys.getsizeof(regular.__dict__)} bytes")
print(f"Slots类实例大小: {sys.getsizeof(slotted)} bytes")
# 测试动态属性
try:
regular.z = 30 # 成功
print("普通类可以添加属性")
except AttributeError:
pass
try:
slotted.z = 30 # 失败
except AttributeError as e:
print(f"Slots类不能添加属性: {e}")
实战案例:大量对象优化
import sys
import time
class RegularStudent:
def __init__(self, name, age, grade):
self.name = name
self.age = age
self.grade = grade
class SlottedStudent:
__slots__ = ['name', 'age', 'grade']
def __init__(self, name, age, grade):
self.name = name
self.age = age
self.grade = grade
# 性能测试
def benchmark(class_type, count=100000):
start_time = time.time()
students = [class_type(f"Student_{i}", 18, 'A') for i in range(count)]
creation_time = time.time() - start_time
# 内存使用
memory = sys.getsizeof(students[0])
if hasattr(students[0], '__dict__'):
memory += sys.getsizeof(students[0].__dict__)
return creation_time, memory
# 运行测试
print("创建 100,000 个对象:")
print("-" * 50)
regular_time, regular_mem = benchmark(RegularStudent)
print(f"普通类:")
print(f" 创建时间: {regular_time:.3f} 秒")
print(f" 单个对象内存: {regular_mem} bytes")
slotted_time, slotted_mem = benchmark(SlottedStudent)
print(f"\nSlots类:")
print(f" 创建时间: {slotted_time:.3f} 秒")
print(f" 单个对象内存: {slotted_mem} bytes")
print(f"\n内存节省: {((regular_mem - slotted_mem) / regular_mem * 100):.1f}%")
print(f"速度提升: {((regular_time - slotted_time) / regular_time * 100):.1f}%")
19.4 性能分析工具
cProfile - 性能分析
import cProfile
import pstats
from io import StringIO
def fibonacci(n):
"""递归计算斐波那契数"""
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
def fibonacci_memo(n, memo={}):
"""带缓存的斐波那契"""
if n in memo:
return memo[n]
if n <= 1:
return n
memo[n] = fibonacci_memo(n - 1, memo) + fibonacci_memo(n - 2, memo)
return memo[n]
# 性能分析
def profile_function(func, *args):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args)
profiler.disable()
# 输出统计
s = StringIO()
stats = pstats.Stats(profiler, stream=s)
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats(10) # 显示前10行
print(s.getvalue())
return result
print("普通递归:")
profile_function(fibonacci, 20)
print("\n带缓存递归:")
profile_function(fibonacci_memo, 20)
timeit - 精确计时
import timeit
# 测试列表推导式 vs for循环
def test_list_comp():
return [x**2 for x in range(1000)]
def test_for_loop():
result = []
for x in range(1000):
result.append(x**2)
return result
# 使用 timeit
time_comp = timeit.timeit(test_list_comp, number=10000)
time_loop = timeit.timeit(test_for_loop, number=10000)
print(f"列表推导式: {time_comp:.4f} 秒")
print(f"For 循环: {time_loop:.4f} 秒")
print(f"性能提升: {((time_loop - time_comp) / time_loop * 100):.1f}%")
实战案例:性能监控装饰器
import time
import functools
from collections import defaultdict
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.stats = defaultdict(lambda: {'calls': 0, 'total_time': 0, 'min_time': float('inf'), 'max_time': 0})
def monitor(self, func):
"""监控装饰器"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
# 更新统计
stats = self.stats[func.__name__]
stats['calls'] += 1
stats['total_time'] += elapsed
stats['min_time'] = min(stats['min_time'], elapsed)
stats['max_time'] = max(stats['max_time'], elapsed)
return result
return wrapper
def report(self):
"""生成报告"""
print("\n" + "="*70)
print(f"{'函数':<20} {'调用次数':<10} {'总时间(s)':<12} {'平均(s)':<12} {'最小(s)':<10} {'最大(s)':<10}")
print("="*70)
for func_name, stats in sorted(self.stats.items()):
avg_time = stats['total_time'] / stats['calls']
print(f"{func_name:<20} {stats['calls']:<10} {stats['total_time']:<12.6f} "
f"{avg_time:<12.6f} {stats['min_time']:<10.6f} {stats['max_time']:<10.6f}")
print("="*70)
# 使用监控器
monitor = PerformanceMonitor()
@monitor.monitor
def process_data(size):
return sum(i**2 for i in range(size))
@monitor.monitor
def process_list(size):
result = []
for i in range(size):
result.append(i**2)
return sum(result)
# 运行测试
for _ in range(100):
process_data(10000)
process_list(10000)
# 显示报告
monitor.report()
19.5 内存泄漏检测
使用 tracemalloc
import tracemalloc
def memory_leak_example():
"""模拟内存泄漏"""
leaked_list = []
for i in range(100000):
leaked_list.append([i] * 100)
# leaked_list 没有被清理
return "完成"
# 启动内存追踪
tracemalloc.start()
# 记录初始快照
snapshot1 = tracemalloc.take_snapshot()
# 执行可能泄漏的代码
result = memory_leak_example()
# 记录执行后快照
snapshot2 = tracemalloc.take_snapshot()
# 对比快照
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("内存增长 Top 10:")
for stat in top_stats[:10]:
print(stat)
# 显示当前内存使用
current, peak = tracemalloc.get_traced_memory()
print(f"\n当前内存: {current / 1024 / 1024:.2f} MB")
print(f"峰值内存: {peak / 1024 / 1024:.2f} MB")
tracemalloc.stop()
案例:内存泄漏检测器
通过多次快照对比,定位哪些代码路径导致内存持续增长,便于在长跑服务中排查泄漏。
import tracemalloc
import gc
class MemoryLeakDetector:
"""内存泄漏检测器"""
def __init__(self):
self.snapshots = []
tracemalloc.start()
def take_snapshot(self, label=""):
"""拍摄快照"""
gc.collect() # 先回收垃圾
snapshot = tracemalloc.take_snapshot()
self.snapshots.append((label, snapshot))
print(f"✓ 快照已保存: {label}")
def compare_snapshots(self, index1=0, index2=-1, top=10):
"""比较两个快照"""
if len(self.snapshots) < 2:
print("至少需要两个快照进行比较")
return
label1, snapshot1 = self.snapshots[index1]
label2, snapshot2 = self.snapshots[index2]
print(f"\n{'='*70}")
print(f"对比: '{label1}' vs '{label2}'")
print(f"{'='*70}")
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print(f"\n内存增长 Top {top}:")
for i, stat in enumerate(top_stats[:top], 1):
print(f"{i}. {stat}")
def get_current_memory(self):
"""获取当前内存使用"""
current, peak = tracemalloc.get_traced_memory()
return {
'current_mb': current / 1024 / 1024,
'peak_mb': peak / 1024 / 1024
}
def stop(self):
"""停止追踪"""
tracemalloc.stop()
print("✓ 内存追踪已停止")
# 使用检测器
detector = MemoryLeakDetector()
# 初始快照
detector.take_snapshot("初始状态")
# 模拟操作1
data1 = [list(range(1000)) for _ in range(1000)]
detector.take_snapshot("创建 data1")
# 模拟操作2
data2 = {i: list(range(100)) for i in range(10000)}
detector.take_snapshot("创建 data2")
# 清理 data1
del data1
detector.take_snapshot("删除 data1")
# 比较快照
detector.compare_snapshots(0, 1) # 初始 vs data1
detector.compare_snapshots(1, 2) # data1 vs data2
detector.compare_snapshots(0, -1) # 初始 vs 最终
# 显示内存使用
memory = detector.get_current_memory()
print(f"\n当前内存使用: {memory['current_mb']:.2f} MB")
print(f"峰值内存使用: {memory['peak_mb']:.2f} MB")
detector.stop()
二十、并发编程进阶
20.1 多线程与 GIL
概念
GIL (Global Interpreter Lock) 是 CPython 的全局解释器锁:同一进程内,任意时刻仅有一个线程在执行 Python 字节码。因此多线程在“纯 Python 计算”上无法真正并行利用多核,但在等待 I/O、sleep、或调用会释放 GIL 的 C 扩展时,其他线程可以执行。
作用
- 保证解释器内部一致性:避免引用计数、内存分配等在多线程下竞争,简化 CPython 实现。
- 明确多线程适用边界:CPU 密集用多进程/其他语言;I/O 密集、混合负载中“等待型”部分仍适合多线程。
- 为选型提供依据:理解 GIL 后可在“多线程 / 多进程 / 协程 / 混用”之间做合理选择。
应用场景
- I/O 密集:大量请求、文件、数据库、HTTP 等阻塞等待时,多线程可明显提升吞吐。
- 混合型任务:主线程负责 UI 或调度,工作线程做 I/O、调用 C 库(如 numpy)等,仍能受益。
- 不适合:纯 Python 的算密型循环、大规模数值计算,应优先考虑多进程或 C/NumPy 等会释放 GIL 的扩展。
案例:基础线程使用
import threading
import time
def worker(name, delay):
"""工作线程"""
print(f"线程 {name} 开始")
time.sleep(delay)
print(f"线程 {name} 完成")
# 创建线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(f"Worker-{i}", 1))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("所有线程完成")
线程同步 - Lock
import threading
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.Lock()
def deposit(self, amount):
"""存款"""
with self.lock:
current = self.balance
time.sleep(0.001) # 模拟处理延迟
self.balance = current + amount
def withdraw(self, amount):
"""取款"""
with self.lock:
if self.balance >= amount:
current = self.balance
time.sleep(0.001)
self.balance = current - amount
return True
return False
# 测试线程安全
account = BankAccount(1000)
def do_transactions():
for _ in range(100):
account.deposit(10)
account.withdraw(10)
# 创建多个线程
threads = [threading.Thread(target=do_transactions) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终余额: {account.balance}") # 应该还是 1000
案例:线程池任务调度
使用 ThreadPoolExecutor 限制并发数、复用线程,适合批量 I/O(如爬虫、批量请求)。
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def fetch_url(url):
"""模拟网络请求"""
delay = random.uniform(0.5, 2)
time.sleep(delay)
return {
'url': url,
'status': 200,
'time': delay
}
def download_with_threadpool(urls, max_workers=5):
"""使用线程池下载"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交任务
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
# 获取结果
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results.append(result)
print(f"✓ 完成: {url} ({result['time']:.2f}s)")
except Exception as e:
print(f"✗ 失败: {url} - {e}")
return results
# 测试
urls = [f"https://example.com/page{i}" for i in range(20)]
start_time = time.time()
results = download_with_threadpool(urls, max_workers=5)
elapsed = time.time() - start_time
print(f"\n总耗时: {elapsed:.2f} 秒")
print(f"完成数: {len(results)}/{len(urls)}")
20.2 多进程编程
概念
通过 multiprocessing 启动独立进程,每个进程有独立解释器与内存空间,因此可绕过 GIL,真正利用多核执行 Python 字节码。进程间通过 Queue、Pipe、共享内存等通信。
作用
- 突破 GIL:CPU 密集型计算在多进程下可线性利用多核。
- 隔离失败:单进程崩溃不拖垮整体,适合批量任务、worker 池。
- 与多线程互补:算密用多进程,I/O 密集用多线程或协程。
应用场景
- 数值/科学计算:大矩阵、蒙特卡洛、图像处理等纯 Python 或 NumPy 算密循环。
- 批量离线任务:ETL、报表、批量重算,按分片或文件拆到多进程。
- 多 worker 服务:每 worker 一进程,由 supervisord、systemd 或自己 fork 管理。
案例:基础使用
import multiprocessing
import os
import time
def worker(name):
"""工作进程"""
print(f"进程 {name} (PID: {os.getpid()}) 开始")
time.sleep(2)
print(f"进程 {name} 完成")
return f"{name} 的结果"
if __name__ == '__main__':
# 创建进程池
with multiprocessing.Pool(processes=4) as pool:
# 并行执行
results = pool.map(worker, ['A', 'B', 'C', 'D', 'E'])
print(f"\n所有结果: {results}")
进程间通信 - Queue
from multiprocessing import Process, Queue
import time
def producer(queue, name):
"""生产者"""
for i in range(5):
item = f"{name}-{i}"
queue.put(item)
print(f"生产: {item}")
time.sleep(0.5)
queue.put(None) # 结束信号
def consumer(queue, name):
"""消费者"""
while True:
item = queue.get()
if item is None:
break
print(f" 消费 [{name}]: {item}")
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
# 创建进程
p1 = Process(target=producer, args=(queue, "Producer"))
c1 = Process(target=consumer, args=(queue, "Consumer"))
p1.start()
c1.start()
p1.join()
c1.join()
实战案例:CPU 密集型任务优化
import multiprocessing
import time
import math
def is_prime(n):
"""判断质数"""
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
def find_primes_single(start, end):
"""单线程查找质数"""
return [n for n in range(start, end) if is_prime(n)]
def find_primes_multi(start, end, num_processes=4):
"""多进程查找质数"""
chunk_size = (end - start) // num_processes
ranges = [(start + i * chunk_size, start + (i + 1) * chunk_size)
for i in range(num_processes)]
ranges[-1] = (ranges[-1][0], end) # 调整最后一个范围
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.starmap(find_primes_single, ranges)
# 合并结果
return [prime for sublist in results for prime in sublist]
if __name__ == '__main__':
start_num, end_num = 1, 100000
# 单进程
print("单进程:")
start_time = time.time()
primes_single = find_primes_single(start_num, end_num)
single_time = time.time() - start_time
print(f" 找到 {len(primes_single)} 个质数")
print(f" 耗时: {single_time:.2f} 秒")
# 多进程
print("\n多进程:")
start_time = time.time()
primes_multi = find_primes_multi(start_num, end_num, num_processes=4)
multi_time = time.time() - start_time
print(f" 找到 {len(primes_multi)} 个质数")
print(f" 耗时: {multi_time:.2f} 秒")
print(f"\n加速比: {single_time / multi_time:.2f}x")
20.3 协程与 asyncio
基础概念
协程(Coroutine)是一种轻量级的并发方案,特别适合 I/O 密集型任务。
基础使用
import asyncio
async def say_hello(name, delay):
"""异步问候"""
print(f"Hello {name}!")
await asyncio.sleep(delay) # 异步等待
print(f"Goodbye {name}!")
return f"{name} 完成"
async def main():
"""主协程"""
# 并发执行多个协程
results = await asyncio.gather(
say_hello("Alice", 1),
say_hello("Bob", 2),
say_hello("Charlie", 1.5)
)
print(f"结果: {results}")
# 运行
asyncio.run(main())
异步 HTTP 请求
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""异步获取 URL"""
try:
async with session.get(url) as response:
data = await response.text()
return {'url': url, 'status': response.status, 'length': len(data)}
except Exception as e:
return {'url': url, 'error': str(e)}
async def fetch_all(urls):
"""批量获取"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 测试
urls = [
'https://www.python.org',
'https://www.github.com',
'https://www.stackoverflow.com',
] * 3
start_time = time.time()
results = asyncio.run(fetch_all(urls))
elapsed = time.time() - start_time
print(f"完成 {len(results)} 个请求,耗时 {elapsed:.2f} 秒")
for result in results:
if 'error' in result:
print(f"✗ {result['url']}: {result['error']}")
else:
print(f"✓ {result['url']}: {result['status']} ({result['length']} bytes)")
实战案例:异步任务队列
import asyncio
import random
from datetime import datetime
class AsyncTaskQueue:
"""异步任务队列"""
def __init__(self, max_workers=3):
self.queue = asyncio.Queue()
self.max_workers = max_workers
self.results = []
async def worker(self, worker_id):
"""工作协程"""
while True:
task = await self.queue.get()
if task is None: # 结束信号
self.queue.task_done()
break
task_id, delay = task
print(f"[Worker-{worker_id}] 开始任务 {task_id}")
try:
await asyncio.sleep(delay) # 模拟工作
result = f"任务 {task_id} 完成 (耗时 {delay}s)"
self.results.append(result)
print(f"[Worker-{worker_id}] ✓ {result}")
except Exception as e:
print(f"[Worker-{worker_id}] ✗ 任务 {task_id} 失败: {e}")
finally:
self.queue.task_done()
async def add_task(self, task_id, delay):
"""添加任务"""
await self.queue.put((task_id, delay))
async def run(self):
"""运行队列"""
# 创建工作协程
workers = [
asyncio.create_task(self.worker(i))
for i in range(self.max_workers)
]
# 等待所有任务完成
await self.queue.join()
# 发送结束信号
for _ in range(self.max_workers):
await self.queue.put(None)
# 等待工作协程结束
await asyncio.gather(*workers)
return self.results
async def main():
queue = AsyncTaskQueue(max_workers=3)
# 添加任务
for i in range(10):
delay = random.uniform(0.5, 2)
await queue.add_task(i, delay)
# 运行
start_time = datetime.now()
results = await queue.run()
elapsed = (datetime.now() - start_time).total_seconds()
print(f"\n{'='*50}")
print(f"完成 {len(results)} 个任务,总耗时 {elapsed:.2f} 秒")
# 运行
asyncio.run(main())
20.4 并发模型对比
概念
同一类任务(如“算 n 次平方和”“等 n 秒”)可分别用顺序、多线程、多进程、协程实现,在不同负载下耗时与资源占用不同。对比测试用于量化差异,为选型提供依据。
作用与应用场景
- 选型依据:CPU 密集看多进程加速比,I/O 密集看线程/协程的吞吐与资源占用。
- 容量与成本:在目标 QPS、延时的前提下,对比线程数/进程数/连接数,便于做资源配置。
案例:性能对比测试
import time
import threading
import multiprocessing
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_task(n):
"""CPU 密集型任务"""
return sum(i * i for i in range(n))
def io_task(delay):
"""I/O 密集型任务"""
time.sleep(delay)
return f"完成 {delay}s"
async def async_io_task(delay):
"""异步 I/O 任务"""
await asyncio.sleep(delay)
return f"完成 {delay}s"
def benchmark_sequential(task, args_list):
"""顺序执行"""
start = time.time()
results = [task(arg) for arg in args_list]
return time.time() - start, results
def benchmark_threading(task, args_list, max_workers=4):
"""多线程"""
start = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(task, args_list))
return time.time() - start, results
def benchmark_multiprocessing(task, args_list, max_workers=4):
"""多进程"""
start = time.time()
with ProcessPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(task, args_list))
return time.time() - start, results
async def benchmark_asyncio(task, args_list):
"""协程"""
start = time.time()
results = await asyncio.gather(*[task(arg) for arg in args_list])
return time.time() - start, results
# CPU 密集型测试
print("="*60)
print("CPU 密集型任务测试 (计算平方和)")
print("="*60)
cpu_args = [1000000] * 8
seq_time, _ = benchmark_sequential(cpu_task, cpu_args)
print(f"顺序执行: {seq_time:.2f} 秒")
thread_time, _ = benchmark_threading(cpu_task, cpu_args)
print(f"多线程: {thread_time:.2f} 秒 (加速 {seq_time/thread_time:.2f}x)")
if __name__ == '__main__':
mp_time, _ = benchmark_multiprocessing(cpu_task, cpu_args)
print(f"多进程: {mp_time:.2f} 秒 (加速 {seq_time/mp_time:.2f}x)")
# I/O 密集型测试
print("\n" + "="*60)
print("I/O 密集型任务测试 (sleep)")
print("="*60)
io_args = [0.5] * 8
seq_time, _ = benchmark_sequential(io_task, io_args)
print(f"顺序执行: {seq_time:.2f} 秒")
thread_time, _ = benchmark_threading(io_task, io_args)
print(f"多线程: {thread_time:.2f} 秒 (加速 {seq_time/thread_time:.2f}x)")
async_time, _ = asyncio.run(benchmark_asyncio(async_io_task, io_args))
print(f"协程: {async_time:.2f} 秒 (加速 {seq_time/async_time:.2f}x)")
二十一、设计模式
概念
设计模式是对「在特定场景下如何组织类与对象、分工与协作」的抽象总结,分为创建型、结构型、行为型。在 Python 中常结合鸭子类型、上下文管理器、元类等语法实现,形式比传统 OOP 更灵活。
作用
- 统一语言:团队用“单例”“工厂”“策略”等名称沟通,减少重复解释。
- 复用方案:在扩展点、可替换逻辑、对象创建方式上直接套用成熟套路,少走弯路。
- 平衡灵活与约束:在“可扩展”与“不过度设计”之间提供折中模板。
应用场景
- 创建型:全局唯一配置/连接(单例)、根据类型或配置生成不同实现(工厂、建造者)。
- 结构型:为已有类加能力而不改接口(装饰、适配)、统一访问集合与树(组合、迭代)。
- 行为型:可互换的算法或策略、事件/请求的传递链(责任链、观察者)、模板步骤中插入扩展点(模板方法)。
案例说明
下面各小节按创建型、结构型、行为型给出 Python 示例,每个模式都包含「目的 + 代码 + 可替换用法」,便于在项目中直接套用或简化。
21.1 创建型模式
单例模式 (Singleton)
概念与目的:确保一个类在进程内只有一个实例,常用于配置、连接池、日志等全局唯一资源。
案例: 使用 __new__ 控制实例创建,保证多次调用得到同一对象。
class Singleton:
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not Singleton._initialized:
self.data = []
Singleton._initialized = True
# 测试
s1 = Singleton()
s2 = Singleton()
print(s1 is s2) # True
s1.data.append("测试")
print(s2.data) # ['测试']
使用元类实现单例
class SingletonMeta(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class Logger(metaclass=SingletonMeta):
def __init__(self):
self.logs = []
def log(self, message):
self.logs.append(message)
print(f"[LOG] {message}")
# 测试
logger1 = Logger()
logger2 = Logger()
logger1.log("第一条日志")
logger2.log("第二条日志")
print(logger1 is logger2) # True
print(logger1.logs) # ['第一条日志', '第二条日志']
工厂模式 (Factory)
目的: 创建对象时不暴露创建逻辑
from abc import ABC, abstractmethod
class Animal(ABC):
@abstractmethod
def speak(self):
pass
class Dog(Animal):
def speak(self):
return "汪汪汪!"
class Cat(Animal):
def speak(self):
return "喵喵喵!"
class Duck(Animal):
def speak(self):
return "嘎嘎嘎!"
class AnimalFactory:
"""动物工厂"""
_animals = {
'dog': Dog,
'cat': Cat,
'duck': Duck
}
@classmethod
def create_animal(cls, animal_type):
animal_class = cls._animals.get(animal_type.lower())
if animal_class:
return animal_class()
raise ValueError(f"未知的动物类型: {animal_type}")
# 使用工厂
factory = AnimalFactory()
dog = factory.create_animal('dog')
cat = factory.create_animal('cat')
print(dog.speak()) # 汪汪汪!
print(cat.speak()) # 喵喵喵!
建造者模式 (Builder)
目的: 分步骤构建复杂对象
class Computer:
def __init__(self):
self.cpu = None
self.memory = None
self.storage = None
self.gpu = None
def __str__(self):
return f"Computer(CPU={self.cpu}, Memory={self.memory}, Storage={self.storage}, GPU={self.gpu})"
class ComputerBuilder:
"""电脑建造者"""
def __init__(self):
self.computer = Computer()
def set_cpu(self, cpu):
self.computer.cpu = cpu
return self # 链式调用
def set_memory(self, memory):
self.computer.memory = memory
return self
def set_storage(self, storage):
self.computer.storage = storage
return self
def set_gpu(self, gpu):
self.computer.gpu = gpu
return self
def build(self):
return self.computer
# 使用建造者
builder = ComputerBuilder()
gaming_pc = (builder
.set_cpu("Intel i9")
.set_memory("32GB")
.set_storage("1TB SSD")
.set_gpu("RTX 4090")
.build())
print(gaming_pc)
# 创建另一台电脑
office_pc = (ComputerBuilder()
.set_cpu("Intel i5")
.set_memory("16GB")
.set_storage("512GB SSD")
.build())
print(office_pc)
21.2 结构型模式
适配器模式 (Adapter)
目的: 将一个类的接口转换成客户期望的另一个接口
class EuropeanSocket:
"""欧洲插座"""
def voltage(self):
return 230
def live(self):
return 1
def neutral(self):
return -1
class AmericanSocket:
"""美国插座"""
def voltage(self):
return 120
class AmericanSocketAdapter:
"""适配器"""
def __init__(self, european_socket):
self.socket = european_socket
def voltage(self):
# 转换电压
return self.socket.voltage() * 120 / 230
class ElectricDevice:
"""电器设备"""
def __init__(self, socket):
self.socket = socket
def charge(self):
voltage = self.socket.voltage()
print(f"充电中,电压: {voltage:.1f}V")
# 使用适配器
european = EuropeanSocket()
adapter = AmericanSocketAdapter(european)
device = ElectricDevice(adapter)
device.charge() # 充电中,电压: 120.0V
装饰器模式 (Decorator)
目的: 动态地给对象添加新功能
from abc import ABC, abstractmethod
class Coffee(ABC):
@abstractmethod
def cost(self):
pass
@abstractmethod
def description(self):
pass
class SimpleCoffee(Coffee):
def cost(self):
return 5
def description(self):
return "简单咖啡"
class CoffeeDecorator(Coffee):
def __init__(self, coffee):
self._coffee = coffee
def cost(self):
return self._coffee.cost()
def description(self):
return self._coffee.description()
class Milk(CoffeeDecorator):
def cost(self):
return self._coffee.cost() + 2
def description(self):
return self._coffee.description() + ", 加牛奶"
class Sugar(CoffeeDecorator):
def cost(self):
return self._coffee.cost() + 1
def description(self):
return self._coffee.description() + ", 加糖"
class WhippedCream(CoffeeDecorator):
def cost(self):
return self._coffee.cost() + 3
def description(self):
return self._coffee.description() + ", 加奶油"
# 使用装饰器
coffee = SimpleCoffee()
print(f"{coffee.description()}: ¥{coffee.cost()}")
coffee = Milk(coffee)
print(f"{coffee.description()}: ¥{coffee.cost()}")
coffee = Sugar(coffee)
print(f"{coffee.description()}: ¥{coffee.cost()}")
coffee = WhippedCream(coffee)
print(f"{coffee.description()}: ¥{coffee.cost()}")
# 输出:
# 简单咖啡: ¥5
# 简单咖啡, 加牛奶: ¥7
# 简单咖啡, 加牛奶, 加糖: ¥8
# 简单咖啡, 加牛奶, 加糖, 加奶油: ¥11
代理模式 (Proxy)
目的: 为对象提供代理以控制访问
from abc import ABC, abstractmethod
class Image(ABC):
@abstractmethod
def display(self):
pass
class RealImage(Image):
"""真实图片"""
def __init__(self, filename):
self.filename = filename
self.load_from_disk()
def load_from_disk(self):
print(f"加载图片: {self.filename}")
def display(self):
print(f"显示图片: {self.filename}")
class ImageProxy(Image):
"""图片代理 - 延迟加载"""
def __init__(self, filename):
self.filename = filename
self.real_image = None
def display(self):
if self.real_image is None:
self.real_image = RealImage(self.filename)
self.real_image.display()
# 使用代理
print("创建代理对象:")
image1 = ImageProxy("photo1.jpg")
image2 = ImageProxy("photo2.jpg")
print("\n第一次显示:")
image1.display() # 此时才加载
print("\n第二次显示:")
image1.display() # 直接显示,不再加载
21.3 行为型模式
策略模式 (Strategy)
目的: 定义一系列算法,让它们可以互相替换
from abc import ABC, abstractmethod
class PaymentStrategy(ABC):
@abstractmethod
def pay(self, amount):
pass
class CreditCardPayment(PaymentStrategy):
def __init__(self, card_number):
self.card_number = card_number
def pay(self, amount):
print(f"使用信用卡 {self.card_number} 支付 ¥{amount}")
class AlipayPayment(PaymentStrategy):
def __init__(self, account):
self.account = account
def pay(self, amount):
print(f"使用支付宝账户 {self.account} 支付 ¥{amount}")
class WeChatPayment(PaymentStrategy):
def __init__(self, account):
self.account = account
def pay(self, amount):
print(f"使用微信账户 {self.account} 支付 ¥{amount}")
class ShoppingCart:
def __init__(self):
self.items = []
self.payment_strategy = None
def add_item(self, item, price):
self.items.append((item, price))
def set_payment_strategy(self, strategy):
self.payment_strategy = strategy
def checkout(self):
total = sum(price for _, price in self.items)
if self.payment_strategy:
self.payment_strategy.pay(total)
else:
print("请选择支付方式")
# 使用策略模式
cart = ShoppingCart()
cart.add_item("Python书籍", 89)
cart.add_item("键盘", 299)
# 选择支付方式
cart.set_payment_strategy(AlipayPayment("user@example.com"))
cart.checkout()
# 更换支付方式
cart.set_payment_strategy(WeChatPayment("wx_12345"))
cart.checkout()
观察者模式 (Observer)
目的: 当对象状态改变时,通知所有依赖它的对象
from abc import ABC, abstractmethod
class Observer(ABC):
@abstractmethod
def update(self, subject):
pass
class Subject:
def __init__(self):
self._observers = []
self._state = None
def attach(self, observer):
self._observers.append(observer)
def detach(self, observer):
self._observers.remove(observer)
def notify(self):
for observer in self._observers:
observer.update(self)
@property
def state(self):
return self._state
@state.setter
def state(self, value):
self._state = value
self.notify()
class EmailObserver(Observer):
def update(self, subject):
print(f"[Email] 状态变更通知: {subject.state}")
class SMSObserver(Observer):
def update(self, subject):
print(f"[SMS] 状态变更通知: {subject.state}")
class LogObserver(Observer):
def update(self, subject):
print(f"[Log] 记录状态: {subject.state}")
# 使用观察者模式
subject = Subject()
# 注册观察者
email_observer = EmailObserver()
sms_observer = SMSObserver()
log_observer = LogObserver()
subject.attach(email_observer)
subject.attach(sms_observer)
subject.attach(log_observer)
# 改变状态
print("订单状态: 已下单")
subject.state = "已下单"
print("\n订单状态: 已发货")
subject.state = "已发货"
# 移除观察者
subject.detach(sms_observer)
print("\n订单状态: 已送达")
subject.state = "已送达"
命令模式 (Command)
目的: 将请求封装为对象,从而可以参数化、排队或记录请求
from abc import ABC, abstractmethod
class Command(ABC):
@abstractmethod
def execute(self):
pass
@abstractmethod
def undo(self):
pass
class Light:
def __init__(self):
self.is_on = False
def turn_on(self):
self.is_on = True
print("灯已打开")
def turn_off(self):
self.is_on = False
print("灯已关闭")
class LightOnCommand(Command):
def __init__(self, light):
self.light = light
def execute(self):
self.light.turn_on()
def undo(self):
self.light.turn_off()
class LightOffCommand(Command):
def __init__(self, light):
self.light = light
def execute(self):
self.light.turn_off()
def undo(self):
self.light.turn_on()
class RemoteControl:
def __init__(self):
self.history = []
def execute_command(self, command):
command.execute()
self.history.append(command)
def undo_last(self):
if self.history:
command = self.history.pop()
command.undo()
# 使用命令模式
light = Light()
remote = RemoteControl()
# 创建命令
on_command = LightOnCommand(light)
off_command = LightOffCommand(light)
# 执行命令
remote.execute_command(on_command)
remote.execute_command(off_command)
remote.execute_command(on_command)
# 撤销
print("\n撤销操作:")
remote.undo_last()
remote.undo_last()
二十二、数据结构与算法进阶
22.1 时间复杂度与空间复杂度
常见时间复杂度
| 复杂度 | 名称 | 示例 |
|---|---|---|
| O(1) | 常数时间 | 数组访问 |
| O(log n) | 对数时间 | 二分查找 |
| O(n) | 线性时间 | 遍历数组 |
| O(n log n) | 线性对数 | 快速排序 |
| O(n²) | 平方时间 | 冒泡排序 |
| O(2ⁿ) | 指数时间 | 递归斐波那契 |
复杂度分析示例
import time
def measure_time(func):
"""测量执行时间"""
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start
print(f"{func.__name__}: {elapsed:.6f} 秒")
return result
return wrapper
# O(1) - 常数时间
@measure_time
def constant_time(arr, index):
return arr[index]
# O(n) - 线性时间
@measure_time
def linear_time(arr, target):
for item in arr:
if item == target:
return True
return False
# O(n²) - 平方时间
@measure_time
def quadratic_time(arr):
result = []
for i in arr:
for j in arr:
result.append((i, j))
return result
# O(log n) - 对数时间
@measure_time
def binary_search(arr, target):
left, right = 0, len(arr) - 1
while left <= right:
mid = (left + right) // 2
if arr[mid] == target:
return mid
elif arr[mid] < target:
left = mid + 1
else:
right = mid - 1
return -1
# 测试
arr = list(range(10000))
constant_time(arr, 5000)
linear_time(arr, 9999)
binary_search(arr, 9999)
small_arr = list(range(100))
quadratic_time(small_arr)
22.2 常用算法实现
排序算法
def quick_sort(arr):
"""快速排序 - O(n log n)"""
if len(arr) <= 1:
return arr
pivot = arr[len(arr) // 2]
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
return quick_sort(left) + middle + quick_sort(right)
def merge_sort(arr):
"""归并排序 - O(n log n)"""
if len(arr) <= 1:
return arr
mid = len(arr) // 2
left = merge_sort(arr[:mid])
right = merge_sort(arr[mid:])
# 合并
result = []
i = j = 0
while i < len(left) and j < len(right):
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
# 测试
arr = [64, 34, 25, 12, 22, 11, 90]
print(f"原数组: {arr}")
print(f"快速排序: {quick_sort(arr)}")
print(f"归并排序: {merge_sort(arr)}")
动态规划
def fibonacci_dp(n):
"""斐波那契数列 - 动态规划"""
if n <= 1:
return n
dp = [0] * (n + 1)
dp[1] = 1
for i in range(2, n + 1):
dp[i] = dp[i-1] + dp[i-2]
return dp[n]
def longest_common_subsequence(text1, text2):
"""最长公共子序列"""
m, n = len(text1), len(text2)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(1, m + 1):
for j in range(1, n + 1):
if text1[i-1] == text2[j-1]:
dp[i][j] = dp[i-1][j-1] + 1
else:
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
return dp[m][n]
def knapsack(weights, values, capacity):
"""0-1 背包问题"""
n = len(weights)
dp = [[0] * (capacity + 1) for _ in range(n + 1)]
for i in range(1, n + 1):
for w in range(1, capacity + 1):
if weights[i-1] <= w:
dp[i][w] = max(
dp[i-1][w],
dp[i-1][w - weights[i-1]] + values[i-1]
)
else:
dp[i][w] = dp[i-1][w]
return dp[n][capacity]
# 测试
print(f"Fibonacci(10): {fibonacci_dp(10)}")
text1 = "ABCDGH"
text2 = "AEDFHR"
print(f"LCS('{text1}', '{text2}'): {longest_common_subsequence(text1, text2)}")
weights = [2, 3, 4, 5]
values = [3, 4, 5, 6]
capacity = 8
print(f"背包问题: {knapsack(weights, values, capacity)}")
22.3 collections 高级容器
deque - 双端队列
from collections import deque
# 创建队列
queue = deque([1, 2, 3])
# 两端操作
queue.append(4) # 右端添加
queue.appendleft(0) # 左端添加
print(queue) # deque([0, 1, 2, 3, 4])
queue.pop() # 右端删除
queue.popleft() # 左端删除
print(queue) # deque([1, 2, 3])
# 旋转
queue.rotate(1) # 向右旋转
print(queue) # deque([3, 1, 2])
queue.rotate(-1) # 向左旋转
print(queue) # deque([1, 2, 3])
Counter - 计数器
from collections import Counter
# 统计元素
text = "hello world"
counter = Counter(text)
print(counter) # Counter({'l': 3, 'o': 2, ...})
# 最常见的元素
print(counter.most_common(3)) # [('l', 3), ('o', 2), ('h', 1)]
# 计数器运算
c1 = Counter(['a', 'b', 'c', 'a'])
c2 = Counter(['a', 'b', 'd'])
print(c1 + c2) # Counter({'a': 3, 'b': 2, 'c': 1, 'd': 1})
print(c1 - c2) # Counter({'a': 1, 'c': 1})
defaultdict - 默认字典
from collections import defaultdict
# 自动创建默认值
dd = defaultdict(list)
dd['fruits'].append('apple')
dd['fruits'].append('banana')
dd['vegetables'].append('carrot')
print(dict(dd))
# {'fruits': ['apple', 'banana'], 'vegetables': ['carrot']}
# 用于分组
students = [
('Alice', 'A'),
('Bob', 'B'),
('Charlie', 'A'),
('David', 'B')
]
groups = defaultdict(list)
for name, grade in students:
groups[grade].append(name)
print(dict(groups))
# {'A': ['Alice', 'Charlie'], 'B': ['Bob', 'David']}
namedtuple - 命名元组
from collections import namedtuple
# 创建命名元组类
Point = namedtuple('Point', ['x', 'y'])
Person = namedtuple('Person', ['name', 'age', 'city'])
# 创建实例
p = Point(10, 20)
print(f"坐标: ({p.x}, {p.y})")
person = Person('Alice', 25, 'Beijing')
print(f"姓名: {person.name}, 年龄: {person.age}")
# 转换为字典
print(person._asdict())
# {'name': 'Alice', 'age': 25, 'city': 'Beijing'}
22.4 heapq - 堆操作
基础使用
import heapq
# 创建最小堆
heap = []
heapq.heappush(heap, 5)
heapq.heappush(heap, 3)
heapq.heappush(heap, 7)
heapq.heappush(heap, 1)
print(heap) # [1, 3, 7, 5]
# 弹出最小值
print(heapq.heappop(heap)) # 1
print(heap) # [3, 5, 7]
# 查看最小值(不弹出)
print(heap[0]) # 3
# 一次性创建堆
numbers = [5, 3, 7, 1, 9, 2]
heapq.heapify(numbers)
print(numbers) # [1, 3, 2, 5, 9, 7]
实战案例:Top K 问题
import heapq
def find_k_largest(arr, k):
"""找出最大的 K 个元素"""
return heapq.nlargest(k, arr)
def find_k_smallest(arr, k):
"""找出最小的 K 个元素"""
return heapq.nsmallest(k, arr)
# 测试
numbers = [23, 1, 45, 67, 12, 89, 34, 56]
print(f"最大的 3 个: {find_k_largest(numbers, 3)}")
print(f"最小的 3 个: {find_k_smallest(numbers, 3)}")
# 自定义比较
students = [
{'name': 'Alice', 'score': 85},
{'name': 'Bob', 'score': 92},
{'name': 'Charlie', 'score': 78},
{'name': 'David', 'score': 95}
]
top_students = heapq.nlargest(2, students, key=lambda x: x['score'])
print(f"成绩最高的 2 名: {top_students}")
实战案例:合并有序列表
import heapq
def merge_sorted_lists(*lists):
"""合并多个有序列表"""
return list(heapq.merge(*lists))
# 测试
list1 = [1, 4, 7]
list2 = [2, 5, 8]
list3 = [3, 6, 9]
result = merge_sorted_lists(list1, list2, list3)
print(f"合并结果: {result}")
22.5 bisect - 二分查找
import bisect
# 有序列表
arr = [1, 3, 5, 7, 9]
# 查找插入位置
pos = bisect.bisect_left(arr, 6)
print(f"6 应插入的位置: {pos}") # 3
# 插入元素并保持有序
bisect.insort(arr, 6)
print(f"插入后: {arr}") # [1, 3, 5, 6, 7, 9]
# 查找元素
def binary_search_exists(arr, target):
i = bisect.bisect_left(arr, target)
return i < len(arr) and arr[i] == target
print(binary_search_exists(arr, 6)) # True
print(binary_search_exists(arr, 8)) # False
二十三、函数式编程
23.1 高阶函数与闭包
高阶函数
高阶函数: 接受函数作为参数或返回函数的函数
# map, filter, reduce
from functools import reduce
numbers = [1, 2, 3, 4, 5]
# map - 映射
squared = list(map(lambda x: x**2, numbers))
print(f"平方: {squared}")
# filter - 过滤
even = list(filter(lambda x: x % 2 == 0, numbers))
print(f"偶数: {even}")
# reduce - 归约
sum_all = reduce(lambda x, y: x + y, numbers)
print(f"求和: {sum_all}")
闭包
def make_multiplier(n):
"""创建乘法函数"""
def multiplier(x):
return x * n
return multiplier
# 创建不同的乘法器
double = make_multiplier(2)
triple = make_multiplier(3)
print(double(5)) # 10
print(triple(5)) # 15
# 闭包保存状态
def make_counter():
count = 0
def counter():
nonlocal count
count += 1
return count
return counter
c1 = make_counter()
c2 = make_counter()
print(c1()) # 1
print(c1()) # 2
print(c2()) # 1
23.2 函数组合与柯里化
函数组合
def compose(*functions):
"""组合多个函数"""
def inner(arg):
result = arg
for func in reversed(functions):
result = func(result)
return result
return inner
# 示例函数
def add_one(x):
return x + 1
def double(x):
return x * 2
def square(x):
return x ** 2
# 组合函数
combined = compose(square, double, add_one)
print(combined(3)) # ((3 + 1) * 2) ** 2 = 64
柯里化
def curry(func):
"""柯里化装饰器"""
def curried(*args, **kwargs):
if len(args) + len(kwargs) >= func.__code__.co_argcount:
return func(*args, **kwargs)
return lambda *more_args, **more_kwargs: curried(*(args + more_args), **{**kwargs, **more_kwargs})
return curried
@curry
def add_three_numbers(a, b, c):
return a + b + c
# 使用柯里化
print(add_three_numbers(1)(2)(3)) # 6
print(add_three_numbers(1, 2)(3)) # 6
print(add_three_numbers(1)(2, 3)) # 6
23.3 惰性求值与无限序列
生成器实现惰性求值
def fibonacci():
"""无限斐波那契数列"""
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# 获取前10个
fib = fibonacci()
first_10 = [next(fib) for _ in range(10)]
print(f"前10个斐波那契数: {first_10}")
# 素数生成器
def primes():
"""无限素数序列"""
def is_prime(n):
if n < 2:
return False
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return False
return True
n = 2
while True:
if is_prime(n):
yield n
n += 1
# 获取前10个素数
prime_gen = primes()
first_10_primes = [next(prime_gen) for _ in range(10)]
print(f"前10个素数: {first_10_primes}")
itertools 高级迭代
from itertools import islice, takewhile, dropwhile, cycle, repeat
# islice - 切片迭代器
def count_from(n):
while True:
yield n
n += 1
print(list(islice(count_from(10), 5))) # [10, 11, 12, 13, 14]
# takewhile - 条件获取
numbers = [1, 4, 6, 8, 10, 3, 5]
print(list(takewhile(lambda x: x < 10, numbers))) # [1, 4, 6, 8]
# dropwhile - 条件跳过
print(list(dropwhile(lambda x: x < 10, numbers))) # [10, 3, 5]
# cycle - 循环迭代
colors = cycle(['red', 'green', 'blue'])
print([next(colors) for _ in range(7)])
# ['red', 'green', 'blue', 'red', 'green', 'blue', 'red']
# repeat - 重复
print(list(repeat('A', 5))) # ['A', 'A', 'A', 'A', 'A']
23.4 functools 工具集
partial - 偏函数
from functools import partial
def power(base, exponent):
return base ** exponent
# 创建平方函数
square = partial(power, exponent=2)
print(square(5)) # 25
# 创建立方函数
cube = partial(power, exponent=3)
print(cube(5)) # 125
lru_cache - 缓存
from functools import lru_cache
@lru_cache(maxsize=128)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 测试性能
import time
start = time.time()
result = fibonacci(35)
elapsed = time.time() - start
print(f"Fibonacci(35) = {result}")
print(f"耗时: {elapsed:.6f} 秒")
# 查看缓存信息
print(fibonacci.cache_info())
reduce - 归约
from functools import reduce
# 累积求和
numbers = [1, 2, 3, 4, 5]
total = reduce(lambda x, y: x + y, numbers)
print(f"求和: {total}")
# 求最大值
maximum = reduce(lambda x, y: x if x > y else y, numbers)
print(f"最大值: {maximum}")
# 连接字符串
words = ['Hello', 'World', 'Python']
sentence = reduce(lambda x, y: x + ' ' + y, words)
print(f"句子: {sentence}")
实战案例:函数式数据处理管道
from functools import reduce, partial
class Pipeline:
"""函数式数据处理管道"""
def __init__(self, data):
self.data = data
def map(self, func):
"""映射"""
self.data = list(map(func, self.data))
return self
def filter(self, func):
"""过滤"""
self.data = list(filter(func, self.data))
return self
def reduce(self, func, initial=None):
"""归约"""
if initial is None:
return reduce(func, self.data)
return reduce(func, self.data, initial)
def take(self, n):
"""获取前n个"""
self.data = self.data[:n]
return self
def get(self):
"""获取结果"""
return self.data
# 使用管道
result = (Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
.filter(lambda x: x % 2 == 0) # 过滤偶数
.map(lambda x: x ** 2) # 平方
.take(3) # 取前3个
.get())
print(f"结果: {result}") # [4, 16, 36]
# 求和示例
total = (Pipeline([1, 2, 3, 4, 5])
.map(lambda x: x * 2)
.reduce(lambda x, y: x + y))
print(f"总和: {total}") # 30
二十四、描述符协议
24.1 描述符协议基础
概念
描述符是实现了 __get__、__set__ 或 __delete__ 中至少一个方法的类,用于在属性访问时插入自定义逻辑。property、classmethod、staticmethod 等均基于描述符实现。
协议方法:
__get__(self, obj, type=None) -> value:读取属性时调用__set__(self, obj, value):写入属性时调用(仅数据描述符可写)__delete__(self, obj):del 属性时调用
作用
- 集中校验与转换:在属性的读写路径上统一做类型检查、范围校验、格式化。
- 惰性计算与缓存:在
__get__中实现按需计算并缓存,避免重复开销。 - 透明封装:对外仍是「属性访问」,对内可接入日志、权限、审计等横切逻辑。
应用场景
- ORM 字段、配置项、环境变量映射等「带约束的属性」。
- 缓存属性、惰性属性、只读/只写属性。
- 框架中的绑定方法、类级配置(如 Django 的 Field)。
数据描述符与非数据描述符
# 非数据描述符:仅实现 __get__
class CachedAttribute:
"""只读缓存描述符"""
def __init__(self, func):
self.func = func
self.attrname = None
def __set_name__(self, owner, name):
self.attrname = name
def __get__(self, obj, type=None):
if obj is None:
return self
value = self.func(obj)
object.__setattr__(obj, self.attrname, value)
return value
# 数据描述符:实现 __set__ 或 __delete__,优先级高于实例字典
class Typed:
def __init__(self, name, expected_type):
self.name = name
self.expected_type = expected_type
def __get__(self, obj, type=None):
if obj is None:
return self
return obj.__dict__.get(self.name)
def __set__(self, obj, value):
if not isinstance(value, self.expected_type):
raise TypeError(f"{self.name} 期望 {self.expected_type}")
obj.__dict__[self.name] = value
class Person:
name = Typed("name", str)
age = Typed("age", int)
def __init__(self, name, age):
self.name = name
self.age = age
p = Person("Alice", 30)
# p.age = "30" # TypeError
24.2 property 与自定义描述符
用描述符实现 property 行为
class Property:
"""简易 property 描述符"""
def __init__(self, fget=None, fset=None, fdel=None):
self.fget = fget
self.fset = fset
self.fdel = fdel
def __get__(self, obj, type=None):
if obj is None:
return self
if self.fget is None:
raise AttributeError("只写属性")
return self.fget(obj)
def __set__(self, obj, value):
if self.fset is None:
raise AttributeError("只读属性")
self.fset(obj, value)
def __delete__(self, obj):
if self.fdel is None:
raise AttributeError("不可删除")
self.fdel(obj)
def setter(self, fset):
return type(self)(self.fget, fset, self.fdel)
def deleter(self, fdel):
return type(self)(self.fget, self.fset, fdel)
案例:带校验与缓存的属性
class Range:
"""数值范围校验描述符"""
def __init__(self, min_val, max_val, name=None):
self.min_val = min_val
self.max_val = max_val
self.name = name
def __set_name__(self, owner, name):
self.name = name
def __get__(self, obj, type=None):
if obj is None:
return self
return obj.__dict__.get(self.name)
def __set__(self, obj, value):
if not (self.min_val <= value <= self.max_val):
raise ValueError(f"{self.name} 须在 [{self.min_val}, {self.max_val}] 内")
obj.__dict__[self.name] = value
class Score:
math = Range(0, 100, "math")
english = Range(0, 100, "english")
def __init__(self, math, english):
self.math = math
self.english = english
s = Score(85, 90)
# s.math = 150 # ValueError
二十五、上下文管理器深入
25.1 协议与 with 语句
概念
上下文管理器通过 __enter__ 和 __exit__ 与 with 配合,在进入/退出代码块时自动完成资源的获取与释放,并统一处理异常。
作用
- 资源安全:确保文件、锁、连接等在使用后一定被关闭或释放。
- 减少样板:避免到处写 try/finally。
- 可组合:多个
with或contextlib.ExitStack可叠加使用。
应用场景
文件、socket、数据库连接、线程锁、临时目录、事务、计时/日志区块等。
基础用法
class FileGuard:
def __init__(self, path, mode="r"):
self.path = path
self.mode = mode
self.f = None
def __enter__(self):
self.f = open(self.path, self.mode)
return self.f
def __exit__(self, exc_type, exc_val, exc_tb):
if self.f:
self.f.close()
return False # 不吞掉异常
with FileGuard("/tmp/hello.txt", "w") as f:
f.write("hello")
25.2 contextlib 与常见模式
contextmanager 与 closing
from contextlib import contextmanager, closing
@contextmanager
def timer(name):
import time
start = time.perf_counter()
try:
yield
finally:
print(f"{name}: {time.perf_counter() - start:.4f}s")
with timer("计算"):
total = sum(range(10**6))
# closing:对含 close() 的对象做上下文管理
from urllib.request import urlopen
with closing(urlopen("https://www.python.org")) as resp:
print(len(resp.read()))
ExitStack 与嵌套资源
from contextlib import ExitStack
def open_many(paths, mode="r"):
stack = ExitStack()
files = [stack.enter_context(open(p, mode)) for p in paths]
stack.push(lambda *a: None) # 防止 with 结束后立刻 close
return stack, files
with ExitStack() as stack:
stack, (f1, f2) = open_many(["/tmp/a.txt", "/tmp/b.txt"])
stack.pop_all() # 把清理延到外层 with 结束
# 使用 f1, f2...
25.3 案例:可重入锁与事务上下文
import threading
from contextlib import contextmanager
@contextmanager
def lock_section(lock):
lock.acquire()
try:
yield
finally:
lock.release()
lock = threading.Lock()
with lock_section(lock):
# 临界区
pass
二十六、测试驱动开发
26.1 TDD 流程与原则
概念
TDD(Test-Driven Development)指先写失败测试,再写最少实现使测试通过,然后重构,循环进行。即「红 — 绿 — 重构」。
作用
- 需求即用例:测试充当可执行规格,降低理解与回归成本。
- 设计更简洁:为「可测」会倒逼接口清晰、依赖注入。
- 信心:改动后快速验证行为是否保持。
应用场景
业务逻辑、工具函数、API、边界与异常分支都适合先写测试再实现。
26.2 unittest 与 pytest 使用
unittest 示例
import unittest
def add(a, b):
return a + b
class TestAdd(unittest.TestCase):
def test_positive(self):
self.assertEqual(add(2, 3), 5)
def test_negative(self):
self.assertEqual(add(-1, -2), -3)
def test_type_error(self):
with self.assertRaises(TypeError):
add("1", 2)
if __name__ == "__main__":
unittest.main()
pytest 示例与 fixture
import pytest
def add(a, b):
return a + b
@pytest.fixture
def sample_data():
return [1, 2, 3]
def test_add_basic():
assert add(2, 3) == 5
def test_add_with_fixture(sample_data):
assert add(sample_data[0], sample_data[1]) == 3
@pytest.mark.parametrize("a,b,expected", [(1,2,3), (0,0,0), (-1,1,0)])
def test_add_param(a, b, expected):
assert add(a, b) == expected
26.3 Mock 与隔离
from unittest.mock import Mock, patch
def fetch_user(user_id):
# 假设这里会调外部 API
import requests
r = requests.get(f"https://api.example.com/users/{user_id}")
return r.json()
def test_fetch_user_mocked():
with patch("__main__.requests.get") as mock_get:
mock_get.return_value.json.return_value = {"id": 1, "name": "Alice"}
user = fetch_user(1)
assert user["name"] == "Alice"
mock_get.assert_called_once()
二十七、代码质量与规范
27.1 PEP 8 与风格约定
概念
PEP 8 是 Python 官方推荐编码风格,涵盖命名、缩进、行宽、空行、导入顺序等,便于团队协作与长期维护。
作用
- 可读性:统一风格减少认知负担。
- 工具链:许多检查与格式化工具(如 flake8、black)以 PEP 8 为基础。
常见要点
| 项目 | 建议 |
|---|---|
| 缩进 | 4 空格 |
| 行宽 | 一般 88 或 100 字符 |
| 命名 | 模块/函数/变量小写+下划线;类 CapWords |
| 导入 | 标准库 → 第三方 → 本地,分组并用空行隔开 |
27.2 类型注解与静态检查
类型注解
def greet(name: str, times: int = 1) -> str:
return (name + " ") * times
from typing import List, Optional, Union
def process(items: List[int], flag: Optional[bool] = None) -> Union[str, int]:
return sum(items) if flag else "skipped"
静态检查(mypy)
在项目根目录配置 mypy.ini 或 pyproject.toml 后运行 mypy .,可在不改动运行时的前提下发现类型不一致。
27.3 格式化与规范工具
- black:自动化格式化,减少风格争论。
- isort:统一 import 排序与分组。
- flake8 / ruff:风格与简单逻辑检查。
- pylint:更深度的代码质量与复杂度分析。
# 常见用法
black src/
isort src/
ruff check src/
二十八、C 扩展与性能提升
28.1 为何用 C 扩展
概念
在对计算密集或底层 I/O 有极高要求时,可用 C/C++ 实现扩展模块,由 Python 通过 ctypes、Cython 或 C API 调用,以突破解释器与 GIL 带来的性能天花板。
作用
- 算力:数值计算、编解码、压缩、图像处理等可获得数量级加速。
- 复用:直接调用已有 C/C++ 库,避免用 Python 重写。
- 底层能力:接触系统调用、硬件、专有 SDK 等。
应用场景
科学计算、游戏引擎、高频交易、协议解析、加解密、音视频处理等。
28.2 ctypes 调用 C 动态库
# 假设有 libcalc.so / calc.dll,提供 int add(int, int)
from ctypes import CDLL, c_int
lib = CDLL("./libcalc.so") # 或 "calc.dll" 等
lib.add.argtypes = [c_int, c_int]
lib.add.restype = c_int
print(lib.add(3, 5)) # 8
28.3 Cython 简述
Cython 将「类 Python」的代码编译为 C 再编译为扩展模块,适合在保留 Python 语法习惯的前提下做热点优化。
# 保存为 demo.pyx,用 cython 编译或通过 setuptools 构建
def fib(int n):
cdef int i, a = 0, b = 1
for i in range(n):
a, b = b, a + b
return a
构建常用 setuptools 的 Extension("demo", ["demo.pyx"]) 并指定 cythonize,得到 .so/.pyd 后即可在 Python 中 import demo 使用。
二十九、网络编程进阶
29.1 socket 与协议设计
概念
socket 是进程间或跨主机通信的通用句柄。TCP 面向连接、可靠;UDP 无连接、可丢包但延迟更低。协议设计需约定字节序、消息边界、头部与-body 等。
作用
- 自定义协议:在 TCP/UDP 上实现 RPC、游戏协议、物联网指令等。
- 调试与对接:直接用 socket 模拟客户端/服务端,验证协议与流控。
简单 TCP 服务端与客户端
# 服务端
import socket
import threading
def handle(client):
data = client.recv(1024)
client.sendall(b"echo: " + data)
client.close()
def serve():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("0.0.0.0", 9999))
sock.listen(5)
while True:
client, addr = sock.accept()
threading.Thread(target=handle, args=(client,)).start()
# 客户端
def client():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", 9999))
s.sendall(b"hello")
print(s.recv(1024).decode())
s.close()
29.2 异步 I/O 与 asyncio
import asyncio
async def tcp_echo_client():
reader, writer = await asyncio.open_connection("127.0.0.1", 9999)
writer.write(b"hello")
await writer.drain()
data = await reader.read(1024)
print(data.decode())
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client())
高并发时用 asyncio + 协议/传输层抽象,可在一线程内处理大量连接,适用于 I/O 密集型服务。
三十、数据库高级操作
30.1 ORM 进阶与连接池
概念
ORM 把表映射成类、行映射成对象,通过会话(Session)做增删改查。连接池在进程内复用数据库连接,降低建连/断连开销,并限制并发连接数。
作用
- 抽象与可移植:用 Python 对象和关系操作表达业务,减少手写 SQL 与方言差异。
- 性能与资源:连接池避免频繁建连,配合合理的提交与刷新策略,提高吞吐。
应用场景
Web 应用、后台任务、报表与数据分析中的持久化层;高 QPS 服务必须配合连接池与慢查优化。
SQLAlchemy 连接池与会话
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
"sqlite:///app.db",
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
)
Session = sessionmaker(bind=engine, autocommit=False, autoflush=False)
def get_db():
session = Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
30.2 事务与 SQL 优化
- 事务:在 Session 内多次写操作后一次性
commit(),异常时rollback(),保证原子性。 - 批量写:使用
bulk_insert_mappings、add_all或批量execute,减少往返次数。 - 查优:用
joinedload/selectinload控制 N+1;对统计、报表用纯 SQL 或仅查所需列;必要时建索引、分页。
from sqlalchemy.orm import joinedload
# 避免 N+1
rows = session.query(Order).options(joinedload(Order.items)).filter(Order.user_id == uid).all()
三十一、分布式系统
31.1 分布式中的基本概念
概念
分布式系统把存储与计算分散到多台节点,通过网络协作完成业务。会涉及一致性、可用性、分区容忍性(CAP)、共识、幂等、超时与重试等。
作用
- 扩展与高可用:水平扩展、多活、故障隔离。
- 职责拆分:数据库、缓存、消息队列、任务队列等各司其职。
应用场景
多实例 Web/API、跨机房部署、订单与库存等强一致或最终一致场景、异步任务与事件驱动架构。
31.2 消息队列与缓存
- 消息队列(如 RabbitMQ、Kafka):削峰、解耦、异步、顺序与重试;Python 常用
pika、kafka-python、celery等。 - 缓存(如 Redis):热点数据、会话、计数、分布式锁;Python 常用
redis库。
import redis
r = redis.Redis(host="localhost", port=6379, db=0)
r.set("key", "value", ex=3600)
print(r.get("key"))
31.3 简单架构示例
典型分层:接入层(负载均衡)→ 应用层(无状态多实例)→ 消息队列 + 任务 Worker → 数据库 + 缓存。通过配置中心、健康检查与优雅下线,实现可扩展与可运维。
三十二、Web 框架源码剖析
32.1 请求生命周期与 WSGI
概念
WSGI(Web Server Gateway Interface)是 Python Web 与服务器之间的标准接口:服务器调用 application(environ, start_response),应用在 environ 中读请求、写响应并调用 start_response。
作用
- 解耦:同一套应用可跑在 gunicorn、uwsgi、asgi 等不同服务器上。
- 理解框架:大部分 Python Web 框架都在 WSGI 之上封装路由、请求对象、响应、中间件。
最小 WSGI 应用
def app(environ, start_response):
status = "200 OK"
headers = [("Content-Type", "text/plain; charset=utf-8")]
start_response(status, headers)
return [b"Hello World\n"]
32.2 路由、请求与中间件
- 路由:根据 method + path 找到处理函数,常用正则或前缀树(如 werkzeug 的 Map)。
- 请求/响应:把 environ 封装成 Request,把返回值封装成 Response,统一 headers、body、状态码。
- 中间件:在应用外围包装若干层,每层可修改请求/响应、做鉴权、日志、异常捕获等。实现方式多为「包装 app:`middleware(env, start_response) → 调用下一层或最终 app」」。
Flask、Django、Bottle 等都在此模型上做了各自的路由、模板、ORM 集成;读其 WSGI 入口和一次请求的调用链,即可把握「从 socket 到视图」的路径。
三十三、安全编程
33.1 注入与 XSS
概念
- 注入:用户输入被拼进命令、SQL 或其它解释器,导致执行任意代码或越权访问。防范措施:参数化查询、白名单校验、最小权限、避免
eval/os.system拼接输入。 - XSS:在 HTML 中注入脚本,在他人浏览器执行。防范:输出编码、Content-Security-Policy、HttpOnly Cookie。
作用
在设计与实现阶段就减少可被利用面,保护数据与身份。
应用场景
所有接受用户输入或展示用户生成内容的系统:表单、搜索、评论、管理后台、API。
33.2 认证、授权与敏感数据
- 认证:确认「是谁」,常用密码+盐哈希、Token(JWT)、OAuth2 等,会话需安全存储与过期。
- 授权:确认「能做什么」,用 RBAC/ABAC、权限与资源绑定,避免越权访问。
- 敏感数据:传输用 HTTPS;存储尽量加密或脱敏;日志与错误信息中不出现密码、Token、完整卡号等。
33.3 依赖与配置安全
- 使用依赖前确认来源与版本,定期更新并关注安全公告(如
pip-audit、safety)。 - 密钥、数据库地址等放环境变量或加密配置中心,不进入代码库;生产环境关闭调试、禁用默认账号与多余端口。
三十四、项目架构与工程化
34.1 项目结构与配置
概念
通过清晰的分层与配置管理,把「跑得起来」的脚本变成可维护、可扩展、可交付的工程。
作用
- 可维护性:新人能快速找到入口、配置与业务边界。
- 可测可配:通过配置切换环境与开关,通过结构支持单测与集成测试。
推荐分层示例
my_project/
config/ # 配置与环境变量
src/
core/ # 领域/内核
api/ # HTTP/ RPC 接口
jobs/ # 定时/队列任务
tests/
scripts/ # 部署、迁移、运维脚本
pyproject.toml
34.2 日志、监控与部署
- 日志:按级别输出到文件或集中收集,附带 trace_id、用户/请求标识,便于排障与审计。
- 监控:指标(QPS、延迟、错误率)、健康检查、告警;必要时链路追踪与性能分析。
- 部署:使用虚拟环境或容器,通过 CI 跑测试与风格检查;发布采用蓝绿/金丝雀等策略,配合回滚与数据库迁移流程。
34.3 CI/CD 与可维护性
- CI:提交触发构建、单元测试、静态检查、安全扫描,未通过则不合并。
- CD:通过流水线将已通过 CI 的版本自动部署到测试/预发/生产,并保留人工审批或开关。
- 可维护性:接口稳定、文档与变更日志、依赖与 Python 版本固定、关键路径有测试与监控,便于长期迭代与故障定位。
34.4 综合项目案例
以下四个小项目将本教程中的配置、上下文管理、并发、测试、类型注解、数据处理、FastAPI 与 Agent/工具调用等知识点串起来,便于仿写与扩展。
项目案例一:待办事项 REST API(Flask + 配置 + 测试)
目标:一个最小可用的待办 API,支持列出、新增、完成/删除,带配置与单测。
涉及技术:项目结构、环境配置、上下文管理器(DB 会话)、REST 设计、pytest + Mock。
目录结构示意:
todo_api/
config.py # 从环境变量加载配置
app.py # Flask 应用与路由
models.py # 使用 SQLite + 简单封装(或 SQLAlchemy)
tests/
test_api.py # 接口测试
run.py # 启动入口
核心代码示例:
# config.py
import os
from dataclasses import dataclass
@dataclass
class Config:
env: str = "dev"
db_path: str = "todo.db"
secret_key: str = "change-me"
@classmethod
def from_env(cls) -> "Config":
return cls(
env=os.getenv("ENV", "dev"),
db_path=os.getenv("DB_PATH", "todo.db"),
secret_key=os.getenv("SECRET_KEY", "dev-secret"),
)
# app.py(精简)
from flask import Flask, request, jsonify
import sqlite3
from contextlib import contextmanager
from config import Config
config = Config.from_env()
app = Flask(__name__)
@contextmanager
def get_db():
conn = sqlite3.connect(config.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
@app.route("/todos", methods=["GET"])
def list_todos():
with get_db() as conn:
rows = conn.execute("SELECT id, title, done FROM todos").fetchall()
return jsonify([dict(r) for r in rows])
@app.route("/todos", methods=["POST"])
def create_todo():
data = request.get_json() or {}
title = data.get("title", "").strip()
if not title:
return jsonify({"error": "title required"}), 400
with get_db() as conn:
cur = conn.execute("INSERT INTO todos (title, done) VALUES (?, 0)", (title,))
tid = cur.lastrowid
return jsonify({"id": tid, "title": title, "done": False}), 201
# 初始化表(可在 run.py 里调用)
def init_db():
with get_db() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS todos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
done INTEGER DEFAULT 0
)
""")
测试示例(tests/test_api.py):
import pytest
from unittest.mock import patch, MagicMock
@pytest.fixture
def client():
with patch("app.get_db") as m:
conn = MagicMock()
m.return_value.__enter__ = lambda s: conn
m.return_value.__exit__ = lambda s, *a: None
from app import app
app.config["TESTING"] = True
with app.test_client() as c:
yield c
def test_list_todos_empty(client):
r = client.get("/todos")
assert r.status_code == 200
assert r.get_json() == []
项目案例二:本地异步任务队列(asyncio + 内存队列)
目标:实现一个「生产者—消费者」的异步任务队列,内存存储、可扩展为 Redis 队列。
涉及技术:asyncio、上下文管理器(生命周期)、类型注解、简单协议设计。
# async_queue.py
import asyncio
from typing import Any, Callable, Awaitable
from dataclasses import dataclass, field
@dataclass
class Task:
id: str
name: str
payload: dict[str, Any] = field(default_factory=dict)
class AsyncTaskQueue:
def __init__(self, workers: int = 2):
self._q: asyncio.Queue[Task] = asyncio.Queue()
self._workers = workers
self._handlers: list[Callable[[Task], Awaitable[None]]] = []
def register(self, handler: Callable[[Task], Awaitable[None]]) -> None:
self._handlers.append(handler)
async def put(self, task: Task) -> None:
await self._q.put(task)
async def _run_worker(self) -> None:
while True:
task = await self._q.get()
try:
for h in self._handlers:
await h(task)
finally:
self._q.task_done()
async def __aenter__(self) -> "AsyncTaskQueue":
self._workers_task = asyncio.gather(
*[self._run_worker() for _ in range(self._workers)]
)
return self
async def __aexit__(self, *args: Any) -> None:
await self._q.join()
self._workers_task.cancel()
try:
await self._workers_task
except asyncio.CancelledError:
pass
# 使用示例
async def main():
async def handler(task: Task) -> None:
print(f"处理任务: {task.name} -> {task.payload}")
async with AsyncTaskQueue(workers=2) as queue:
queue.register(handler)
await queue.put(Task(id="1", name="say_hi", payload={"msg": "hello"}))
await asyncio.sleep(0.5)
asyncio.run(main())
项目案例三:日志统计命令行工具(管道 + 类型注解 + 测试)
目标:从标准输入或文件读取类似 Nginx 的访问日志行,输出「按路径/状态码聚合」的简单统计。
涉及技术:函数式管道(迭代器/生成器)、类型注解、argparse、可测的函数设计。
# log_stats.py
import re
import sys
from collections import Counter
from typing import Iterator, TextIO
# 假定格式: 127.0.0.1 - - [日期] "GET /api/x HTTP/1.1" 200 1234
LOG_PATTERN = re.compile(
r'^.*?"(?:GET|POST|PUT|DELETE)\s+(\S+).*?"\s+(\d{3})'
)
def parse_line(line: str) -> tuple[str, str] | None:
m = LOG_PATTERN.match(line.strip())
if m:
return m.group(1), m.group(2)
return None
def read_lines(f: TextIO) -> Iterator[str]:
for line in f:
yield line
def pipeline(lines: Iterator[str]) -> tuple[Counter[str], Counter[str]]:
paths: Counter[str] = Counter()
codes: Counter[str] = Counter()
for line in lines:
parsed = parse_line(line)
if parsed:
path, code = parsed
paths[path] += 1
codes[code] += 1
return paths, codes
def main() -> None:
import argparse
p = argparse.ArgumentParser(description="简单访问日志统计")
p.add_argument("file", nargs="?", type=argparse.FileType("r"), default=sys.stdin)
args = p.parse_args()
paths, codes = pipeline(read_lines(args.file))
print("按路径:")
for path, n in paths.most_common(10):
print(f" {path} -> {n}")
print("按状态码:")
for code, n in codes.most_common():
print(f" {code} -> {n}")
if __name__ == "__main__":
main()
单测示例:
# tests/test_log_stats.py
from io import StringIO
from log_stats import parse_line, pipeline, read_lines
def test_parse_line():
assert parse_line('1.2.3.4 - - [...] "GET /api/users HTTP/1.1" 200 0') == ("/api/users", "200")
assert parse_line("invalid") is None
def test_pipeline():
lines = [
'... "GET /a HTTP/1.1" 200 0',
'... "GET /a HTTP/1.1" 200 0',
'... "GET /b HTTP/1.1" 404 0',
]
paths, codes = pipeline(read_lines(StringIO("\n".join(lines))))
assert paths["/a"] == 2 and paths["/b"] == 1
assert codes["200"] == 2 and codes["404"] == 1
项目案例四:功能完整的 Agent 服务(FastAPI + 工具调用)
目标:一个可直接跑通的「对话 Agent」服务,支持多轮对话、多种工具(计算、天气、时间、单位换算、知识检索、会话内待办)、工具链式调用与 ReAct 式多步推理、会话记忆、流式输出(SSE)、健康检查与接口文档,且不依赖真实 LLM 即可用规则引擎完整演示。
涉及技术:FastAPI 路由与依赖注入、Pydantic 请求/响应、异步与 SSE、工具注册与 Schema 描述、多轮会话与内存记忆、中间件与 CORS、可替换的 Agent「大脑」。
目录结构:
agent_api/
main.py # FastAPI 应用、路由、中间件、SSE
config.py # 环境配置、日志
schemas.py # 请求/响应/工具 Schema
agent/
__init__.py
brain.py # 规则引擎 + 多步 ReAct 循环(可换 LLM)
tools.py # 工具实现与注册表、get_tool_schemas()
middleware/
log_and_cors.py # 请求日志、CORS(可选)
tests/
test_tools.py
test_brain.py
test_chat_api.py
requirements.txt
1. 配置与依赖
# config.py
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class Config:
env: str = "dev"
log_level: str = "INFO"
max_react_steps: int = 5
session_ttl_seconds: int = 3600
api_key: Optional[str] = None # 若设置,则请求头需带 X-API-Key
@classmethod
def from_env(cls) -> "Config":
return cls(
env=os.getenv("ENV", "dev"),
log_level=os.getenv("LOG_LEVEL", "INFO"),
max_react_steps=int(os.getenv("MAX_REACT_STEPS", "5")),
session_ttl_seconds=int(os.getenv("SESSION_TTL", "3600")),
api_key=os.getenv("AGENT_API_KEY"),
)
# requirements.txt
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
pydantic>=2.0.0
2. 数据模型(schemas.py)
from pydantic import BaseModel, Field
from typing import Optional, Any, Literal
class ToolParameter(BaseModel):
name: str
type: str = "string"
description: Optional[str] = None
required: bool = True
class ToolSchema(BaseModel):
name: str
description: str
parameters: list[ToolParameter] = Field(default_factory=list)
class ToolCall(BaseModel):
name: str
arguments: dict[str, Any] = Field(default_factory=dict)
class ChatMessage(BaseModel):
role: Literal["user", "assistant", "tool", "system"]
content: str
tool_calls: Optional[list[ToolCall]] = None
tool_call_id: Optional[str] = None
class ChatRequest(BaseModel):
messages: list[ChatMessage]
stream: bool = False
session_id: Optional[str] = None # 多轮会话标识
class ToolUse(BaseModel):
name: str
arguments: dict[str, Any]
result: str
class ChatResponse(BaseModel):
message: ChatMessage
tool_calls_used: list[ToolUse] = Field(default_factory=list)
session_id: Optional[str] = None
3. 工具集(agent/tools.py)
from typing import Any, Callable, get_type_hints
import math
import re
from datetime import datetime, timezone
from schemas import ToolSchema, ToolParameter
TOOLS: dict[str, Callable[..., str]] = {}
_TOOL_SCHEMAS: dict[str, ToolSchema] = {}
def register_tool(name: str, description: str, param_schema: list[dict] | None = None):
def decorator(f: Callable[..., str]):
TOOLS[name] = f
params = param_schema or [
{"name": k, "type": "string", "description": "", "required": True}
for k in f.__code__.co_varnames[: f.__code__.co_argcount]
if k != "self"
]
_TOOL_SCHEMAS[name] = ToolSchema(
name=name, description=description,
parameters=[ToolParameter(**p) for p in params]
)
return f
return decorator
@register_tool("calculator", "对数学表达式求值,支持 + - * / ** 及 sin/cos/sqrt",
[{"name": "expression", "type": "string", "description": "数学表达式", "required": True}])
def calculator(expression: str) -> str:
expr = re.sub(r"\s+", "", expression)
if not re.match(r"^[\d+\-*/().\s\sin\cos\sqrt]+$", expr):
return "表达式包含不允许的字符"
try:
allowed = {"sin": math.sin, "cos": math.cos, "sqrt": math.sqrt}
val = eval(expr, {"__builtins__": {}}, allowed)
return str(round(val, 8) if isinstance(val, float) else val)
except Exception as e:
return f"计算错误: {e}"
@register_tool("get_weather", "查询城市天气(示例为模拟数据)",
[{"name": "city", "type": "string", "description": "城市名", "required": True}])
def get_weather(city: str) -> str:
# 模拟数据,实际可接天气 API
mock = {"北京": "晴 28°C", "上海": "多云 26°C", "深圳": "雷阵雨 30°C"}
return mock.get(city, f"{city} 晴 25°C")
@register_tool("get_current_time", "获取当前时间或指定时区时间",
[{"name": "timezone_name", "type": "string", "description": "如 UTC、Asia/Shanghai,默认本地", "required": False}])
def get_current_time(timezone_name: str = "Asia/Shanghai") -> str:
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(timezone_name)
except Exception:
tz = timezone.utc
return datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
# 简单单位换算:米↔英尺,千克↔磅
_UNIT_MAP = {"米": ("m", 1), "英尺": ("ft", 0.3048), "千克": ("kg", 1), "磅": ("lb", 0.453592)}
@register_tool("unit_convert", "单位换算,支持 米/英尺、千克/磅",
[{"name": "value", "type": "string", "description": "数值", "required": True},
{"name": "from_unit", "type": "string", "description": "原单位", "required": True},
{"name": "to_unit", "type": "string", "description": "目标单位", "required": True}])
def unit_convert(value: str, from_unit: str, to_unit: str) -> str:
try:
v = float(value)
except ValueError:
return "数值格式错误"
from_u = _UNIT_MAP.get(from_unit.strip()) or _UNIT_MAP.get(from_unit.strip() + ".")
to_u = _UNIT_MAP.get(to_unit.strip()) or _UNIT_MAP.get(to_unit.strip() + ".")
if not from_u or not to_u or from_u[0][0] != to_u[0][0]:
return "不支持该单位或单位类型不一致"
base = v * from_u[1]
result = base / to_u[1]
return f"{v} {from_unit} = {round(result, 6)} {to_unit}"
# 内存知识库(示例)
_KB: dict[str, str] = {
"python": "Python 是一种解释型、面向对象的高级编程语言。",
"fastapi": "FastAPI 是基于 Starlette 和 Pydantic 的现代 Python Web 框架,支持异步与自动 OpenAPI 文档。",
"agent": "Agent 指能根据输入自主选择调用工具并多步推理完成目标的程序,常与 LLM 结合。",
}
@register_tool("search_kb", "从知识库检索简短说明",
[{"name": "keyword", "type": "string", "description": "关键词", "required": True}])
def search_kb(keyword: str) -> str:
k = keyword.strip().lower()
for key, text in _KB.items():
if k in key or key in k:
return text
return f"未找到与「{keyword}」相关条目。已知关键词:{list(_KB.keys())}"
# 会话内待办(按 session 存,实际可接 Redis/DB)
_session_todos: dict[str, list[str]] = {}
@register_tool("todo_add", "向当前会话待办列表添加一项",
[{"name": "item", "type": "string", "description": "待办内容", "required": True}])
def todo_add(item: str, _session_id: str = "") -> str:
sid = _session_id or "default"
if sid not in _session_todos:
_session_todos[sid] = []
_session_todos[sid].append(item.strip())
return f"已添加:{item.strip()},当前共 {len(_session_todos[sid])} 项。"
@register_tool("todo_list", "列出当前会话的待办",
[{"name": "_session_id", "type": "string", "description": "会话ID", "required": False}])
def todo_list(_session_id: str = "") -> str:
sid = _session_id or "default"
items = _session_todos.get(sid, [])
if not items:
return "当前没有待办。"
return "待办列表:\n" + "\n".join(f"{i+1}. {x}" for i, x in enumerate(items, 1))
def run_tool(name: str, arguments: dict[str, Any], session_id: str = "") -> str:
if name not in TOOLS:
return f"未知工具: {name}"
fn = TOOLS[name]
varnames = set(fn.__code__.co_varnames[: fn.__code__.co_argcount])
kwargs = dict(arguments)
if "_session_id" in varnames:
kwargs.setdefault("_session_id", session_id)
if "session_id" in varnames:
kwargs.setdefault("session_id", session_id)
try:
return fn(**{k: v for k, v in kwargs.items() if k in varnames})
except TypeError as e:
return f"参数错误: {e}"
def get_tool_schemas() -> list[ToolSchema]:
return list(_TOOL_SCHEMAS.values())
4. Agent 大脑:多步 ReAct 与多轮会话(agent/brain.py)
import re
from schemas import ChatMessage, ToolCall, ToolUse
from agent.tools import run_tool, get_tool_schemas, TOOLS
# 会话历史:session_id -> [ChatMessage],仅保留最近 N 条
_sessions: dict[str, list[ChatMessage]] = {}
_MAX_HISTORY = 20
def _get_history(session_id: str | None) -> list[ChatMessage]:
if not session_id:
return []
return _sessions.get(session_id, [])
def _append_history(session_id: str | None, msg: ChatMessage) -> None:
if not session_id:
return
if session_id not in _sessions:
_sessions[session_id] = []
_sessions[session_id].append(msg)
_sessions[session_id] = _sessions[session_id][-_MAX_HISTORY:]
def _intent_tools(text: str) -> list[tuple[str, dict]]:
"""规则:返回 [(tool_name, arguments), ...],可多步。"""
text = text.strip().lower()
out: list[tuple[str, dict]] = []
if "算" in text or "计算" in text or ("多少" in text and ("+" in text or "-" in text or "*" in text)):
expr = re.sub(r"^.*?(算|计算|等于)\s*[::]?\s*", "", text, flags=re.I).strip()
expr = re.sub(r".*?([\d+\-*/().\s]+).*", r"\1", expr) or "0"
out.append(("calculator", {"expression": expr}))
if "天气" in text:
city = "北京"
for m in re.finditer(r"(\w+)\s*的?\s*天气", text):
city = m.group(1)
break
out.append(("get_weather", {"city": city}))
if "几点" in text or "时间" in text or "现在" in text:
tz = "Asia/Shanghai"
for m in re.finditer(r"(utc|asia/\w+)", text):
tz = m.group(1)
break
out.append(("get_current_time", {"timezone_name": tz}))
if "换算" in text or "等于多少" in text:
for m in re.finditer(r"(\d+(?:\.\d+)?)\s*(\S+)\s*[=等于]\s*(\S+)", text):
out.append(("unit_convert", {"value": m.group(1), "from_unit": m.group(2), "to_unit": m.group(3)}))
break
if "查" in text or "知识" in text or "什么是" in text or "介绍" in text:
for k in ["python", "fastapi", "agent"]:
if k in text:
out.append(("search_kb", {"keyword": k}))
break
if "待办" in text and ("添加" in text or "记" in text or "加入" in text):
for m in re.finditer(r"[添加记录下]+[::]\s*(.+)", text):
out.append(("todo_add", {"item": m.group(1).strip()}))
break
if "待办" in text and ("列" in text or "看" in text or "有哪些" in text):
out.append(("todo_list", {}))
return out
def decide_and_respond(
messages: list[ChatMessage],
session_id: str | None = None,
max_steps: int = 5,
) -> tuple[ChatMessage, list[ToolUse], list[ChatMessage]]:
"""多轮上下文 + 多步 ReAct:先合并会话历史与本次 messages,再对最后一条用户消息做意图识别并链式调用工具,直到无新工具或达到 max_steps。"""
all_msgs = _get_history(session_id) + messages
if not all_msgs or all_msgs[-1].role != "user":
return ChatMessage(role="assistant", content="请发送一条用户消息。"), [], all_msgs
last_content = all_msgs[-1].content.strip()
tool_uses: list[ToolUse] = []
steps = 0
current_input = last_content
while steps < max_steps:
steps += 1
planned = _intent_tools(current_input)
if not planned:
break
for name, args in planned:
result = run_tool(name, args, session_id or "")
tool_uses.append(ToolUse(name=name, arguments=args, result=result))
current_input = result # 下一步可基于结果再决策(此处规则引擎未做二次推理,仅链式执行)
# 生成最终回复文案
if not tool_uses:
content = "你好,我是 Agent。你可以问我:计算、某地天气、当前时间、单位换算、查知识(如 python/fastapi/agent)、添加/列出待办。"
else:
parts = [u.result for u in tool_uses]
content = "\n".join(parts) if len(parts) > 1 else parts[0]
msg = ChatMessage(role="assistant", content=content)
if session_id:
_append_history(session_id, all_msgs[-1])
_append_history(session_id, msg)
return msg, tool_uses, all_msgs
5. FastAPI 应用与路由(main.py)
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request, Depends, Header
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import json
import logging
import uuid
from config import Config
from schemas import ChatRequest, ChatResponse, ChatMessage, ToolUse
from agent.brain import decide_and_respond
from agent.tools import TOOLS, get_tool_schemas
cfg = Config.from_env()
logging.basicConfig(level=getattr(logging, cfg.log_level))
@asynccontextmanager
async def lifespan(app: FastAPI):
logging.info("Agent API starting, tools=%s", list(TOOLS.keys()))
yield
logging.info("Agent API shutdown")
app = FastAPI(title="Agent API", version="0.2.0", lifespan=lifespan)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
def check_api_key(x_api_key: str | None = Header(None)):
if cfg.api_key and x_api_key != cfg.api_key:
raise HTTPException(401, "Invalid or missing X-API-Key")
@app.get("/health")
async def health():
return {"status": "ok", "tools": list(TOOLS.keys())}
@app.get("/tools")
async def list_tools():
return {"tools": [s.model_dump() for s in get_tool_schemas()]}
@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest, _: None = Depends(check_api_key)):
if not req.messages:
raise HTTPException(400, "messages 不能为空")
msg, used, _ = decide_and_respond(
req.messages,
session_id=req.session_id or None,
max_steps=cfg.max_react_steps,
)
return ChatResponse(
message=msg,
tool_calls_used=used,
session_id=req.session_id,
)
@app.post("/chat/stream")
async def chat_stream(req: ChatRequest, _: None = Depends(check_api_key)):
"""SSE 流式返回:先发若干 tool_use 事件,再发 content 与 done。"""
if not req.messages:
raise HTTPException(400, "messages 不能为空")
msg, used, _ = decide_and_respond(
req.messages,
session_id=req.session_id,
max_steps=cfg.max_react_steps,
)
def event_gen():
for u in used:
yield f"data: {json.dumps({'type':'tool_use','name':u.name,'result':u.result})}\n\n"
yield f"data: {json.dumps({'type':'content','content':msg.content})}\n\n"
yield "data: {\"type\":\"done\"}\n\n"
return StreamingResponse(
event_gen(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
6. 运行与调用示例
# 安装并启动
pip install -r requirements.txt
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# 健康检查与工具列表
curl -s http://127.0.0.1:8000/health
curl -s http://127.0.0.1:8000/tools
# 计算
curl -s -X POST http://127.0.0.1:8000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"计算 2*3+1"}]}'
# 天气 + 多轮会话(同一 session_id 可继续问待办)
curl -s -X POST http://127.0.0.1:8000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"上海天气"}],"session_id":"s1"}'
# 时间、单位换算、知识检索
curl -s -X POST http://127.0.0.1:8000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"现在北京时间几点?"}]}'
curl -s -X POST http://127.0.0.1:8000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"10 米等于多少英尺?"}]}'
curl -s -X POST http://127.0.0.1:8000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"介绍一下 fastapi"}]}'
# 待办(需带 session_id 才会写入/读出同一列表)
curl -s -X POST http://127.0.0.1:8000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"待办添加:买牛奶"}],"session_id":"u1"}'
curl -s -X POST http://127.0.0.1:8000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"列出我的待办"}],"session_id":"u1"}'
# 流式接口
curl -s -N -X POST http://127.0.0.1:8000/chat/stream \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"计算 sqrt(2)"}]}'
7. 单测示例(tests/test_tools.py / test_brain.py)
# tests/test_tools.py
from agent.tools import calculator, get_weather, search_kb, run_tool, todo_add, todo_list
def test_calculator():
assert calculator("1+2") == "3"
assert "错误" in calculator("1/0") or "ZeroDivision" in calculator("1/0")
def test_get_weather():
assert "北京" in get_weather("北京")
def test_search_kb():
assert "Python" in search_kb("python") or "python" in search_kb("python")
def test_todo():
todo_add("a", _session_id="test_sid")
todo_add("b", _session_id="test_sid")
r = todo_list(_session_id="test_sid")
assert "a" in r and "b" in r
# tests/test_brain.py
from schemas import ChatMessage
from agent.brain import decide_and_respond
def test_calc_intent():
msg, used, _ = decide_and_respond([ChatMessage(role="user", content="计算 2+3")])
assert any(u.name == "calculator" and "5" in u.result for u in used)
def test_weather_intent():
msg, used, _ = decide_and_respond([ChatMessage(role="user", content="北京天气")])
assert any(u.name == "get_weather" for u in used)
8. 扩展:接入真实 LLM
将 agent/brain.py 中的 _intent_tools 改为调用 OpenAI/本地模型的 Function Calling 输出,解析出 tool_calls 后仍用 run_tool(name, arguments, session_id) 执行,把 result 填入 messages 再继续下一轮模型调用,即可实现完整 ReAct/Agent 循环;工具注册与 FastAPI 路由无需改动。
以上四个项目可分别作为「Web 小服务」「异步任务组件」「CLI 与数据处理」「Agent/LLM 服务」的起点,再按需接入真实数据库、Redis、日志库与 LLM API,逐步贴近生产结构。