装饰器模式详解
1. 装饰器基础
装饰器是 Python 中一种强大的设计模式,允许在不修改原函数代码的情况下,为函数添加额外的功能。本质上,装饰器是一个接受函数作为参数并返回新函数的高阶函数。
import time
from functools import wraps
def timer(func):
"""计时装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} 执行时间: {end_time - start_time:.4f}秒")
return result
return wrapper
@timer
def slow_function():
time.sleep(1)
return "完成"
# 等价于: slow_function = timer(slow_function)
注意:使用
@wraps 装饰器可以保留原函数的元数据(如函数名、文档字符串等)。
2. 带参数的装饰器
装饰器本身也可以接受参数,这需要额外的一层嵌套函数。
def repeat(times):
"""重复执行装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
results = []
for _ in range(times):
result = func(*args, **kwargs)
results.append(result)
return results
return wrapper
return decorator
@repeat(times=3)
def greet(name):
return f"Hello, {name}!"
print(greet("Alice")) # 执行3次,返回结果列表
3. 类装饰器
除了函数形式的装饰器,还可以使用类来实现装饰器,这在需要维护状态时特别有用。
class CountCalls:
"""统计函数调用次数的装饰器"""
def __init__(self, func):
self.func = func
self.call_count = 0
wraps(func)(self)
def __call__(self, *args, **kwargs):
self.call_count += 1
print(f"调用次数: {self.call_count}")
return self.func(*args, **kwargs)
@CountCalls
def process_data(data):
return data * 2
process_data(10)
process_data(20)
print(f"总调用次数: {process_data.call_count}")
4. 实用装饰器示例
import functools
import logging
# 缓存装饰器
def memoize(func):
"""缓存函数结果"""
cache = {}
@wraps(func)
def wrapper(*args):
if args in cache:
return cache[args]
result = func(*args)
cache[args] = result
return result
return wrapper
@memoize
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 日志装饰器
def log_execution(func):
"""记录函数执行日志"""
@wraps(func)
def wrapper(*args, **kwargs):
logging.info(f"执行 {func.__name__} with args={args}, kwargs={kwargs}")
try:
result = func(*args, **kwargs)
logging.info(f"{func.__name__} 成功返回: {result}")
return result
except Exception as e:
logging.error(f"{func.__name__} 抛出异常: {e}")
raise
return wrapper
# 权限检查装饰器
def require_permission(permission):
def decorator(func):
@wraps(func)
def wrapper(user, *args, **kwargs):
if not user.has_permission(permission):
raise PermissionError(f"用户缺少权限: {permission}")
return func(user, *args, **kwargs)
return wrapper
return decorator
5. 装饰器链
多个装饰器可以叠加使用,执行顺序从下到上(从内到外)。
@timer
@log_execution
@memoize
def complex_calculation(x, y):
return x ** y + y ** x
# 等价于:
# complex_calculation = timer(log_execution(memoize(complex_calculation)))
生成器与迭代器
1. 迭代器协议
迭代器是实现了 __iter__() 和 __next__() 方法的对象,可以在 for 循环中使用。
class Countdown:
"""倒计时迭代器"""
def __init__(self, start):
self.current = start
def __iter__(self):
return self
def __next__(self):
if self.current <= 0:
raise StopIteration
self.current -= 1
return self.current + 1
# 使用迭代器
for num in Countdown(5):
print(num) # 输出: 5, 4, 3, 2, 1
2. 生成器函数
生成器是一种特殊的迭代器,使用 yield 关键字定义,可以暂停和恢复执行状态。
def fibonacci_generator(n):
"""生成斐波那契数列"""
a, b = 0, 1
count = 0
while count < n:
yield a
a, b = b, a + b
count += 1
# 使用生成器
for num in fibonacci_generator(10):
print(num, end=' ')
# 生成器表达式
squares = (x**2 for x in range(10))
print(sum(squares))
3. 高级生成器特性
生成器支持双向通信,可以通过 send()、throw() 和 close() 方法与生成器交互。
def coroutine_example():
"""协程示例"""
print("协程启动")
try:
while True:
value = yield
if value is not None:
print(f"收到值: {value}")
except GeneratorExit:
print("协程关闭")
# 使用协程
coro = coroutine_example()
next(coro) # 启动协程
coro.send(10)
coro.send(20)
coro.close()
# 生成器委托
def delegate_generator():
yield from range(5)
yield from range(10, 15)
for value in delegate_generator():
print(value)
4. 实用生成器示例
import os
def read_large_file(file_path, chunk_size=1024):
"""逐块读取大文件"""
with open(file_path, 'r') as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
yield chunk
def walk_directory(path):
"""递归遍历目录"""
for root, dirs, files in os.walk(path):
for file in files:
yield os.path.join(root, file)
def pipeline(*generators):
"""生成器管道"""
def inner(data):
for gen in generators:
data = gen(data)
return data
return inner
# 数据处理管道
def filter_even(numbers):
for num in numbers:
if num % 2 == 0:
yield num
def square(numbers):
for num in numbers:
yield num ** 2
# 组合使用
data = range(10)
process = pipeline(filter_even, square)
result = list(process(data))
print(result) # [0, 4, 16, 36, 64]
异步编程
1. asyncio 基础
Python 的 asyncio 库提供了编写并发代码的能力,使用 async/await 语法定义协程。
import asyncio
async def fetch_data(name, delay):
"""模拟异步获取数据"""
print(f"开始获取 {name}")
await asyncio.sleep(delay)
print(f"完成获取 {name}")
return f"数据: {name}"
async def main():
# 并发执行多个协程
tasks = [
fetch_data("A", 2),
fetch_data("B", 1),
fetch_data("C", 3)
]
results = await asyncio.gather(*tasks)
print(f"所有结果: {results}")
# 运行异步程序
asyncio.run(main())
2. 异步上下文管理器
使用 async with 管理异步资源。
class AsyncResource:
"""异步资源管理器"""
async def __aenter__(self):
print("获取资源")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await asyncio.sleep(1)
async def do_work(self):
print("执行工作")
await asyncio.sleep(1)
async def use_resource():
async with AsyncResource() as resource:
await resource.do_work()
3. 异步迭代器
class AsyncCounter:
"""异步迭代器"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.current += 1
return self.current - 1
async def iterate_async():
async for num in AsyncCounter(0, 5):
print(num)
4. 实用异步模式
import aiohttp
import asyncio
async def fetch_url(session, url):
"""异步获取URL内容"""
async with session.get(url) as response:
return await response.text()
async def fetch_multiple_urls(urls):
"""并发获取多个URL"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
# 超时控制
async def fetch_with_timeout(url, timeout=5):
try:
async with asyncio.timeout(timeout):
async with aiohttp.ClientSession() as session:
return await fetch_url(session, url)
except asyncio.TimeoutError:
print(f"请求 {url} 超时")
return None
# 信号量限制并发数
async def fetch_with_semaphore(urls, max_concurrent=5):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_one(url):
async with semaphore:
async with aiohttp.ClientSession() as session:
return await fetch_url(session, url)
return await asyncio.gather(*[fetch_one(url) for url in urls])
元类与描述符
1. 元类基础
元类是创建类的类,可以控制类的创建过程。
class SingletonMeta(type):
"""单例模式元类"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class Database(metaclass=SingletonMeta):
def __init__(self):
print("初始化数据库连接")
# 测试单例
db1 = Database()
db2 = Database()
print(db1 is db2) # True
# 自动注册类
class PluginRegistry(type):
"""插件注册元类"""
plugins = {}
def __new__(mcs, name, bases, attrs):
cls = super().__new__(mcs, name, bases, attrs)
if name != 'Plugin':
mcs.plugins[name] = cls
return cls
class Plugin(metaclass=PluginRegistry):
pass
class ImagePlugin(Plugin):
pass
class VideoPlugin(Plugin):
pass
print(PluginRegistry.plugins) # {'ImagePlugin': ..., 'VideoPlugin': ...}
2. 描述符协议
描述符是实现了 __get__()、__set__() 或 __delete__() 方法的对象。
class TypedProperty:
"""类型检查描述符"""
def __init__(self, name, expected_type):
self.name = name
self.expected_type = expected_type
def __get__(self, instance, owner):
if instance is None:
return self
return instance.__dict__.get(self.name)
def __set__(self, instance, value):
if not isinstance(value, self.expected_type):
raise TypeError(
f"{self.name} 必须是 {self.expected_type.__name__} 类型"
)
instance.__dict__[self.name] = value
class Person:
name = TypedProperty("name", str)
age = TypedProperty("age", int)
def __init__(self, name, age):
self.name = name
self.age = age
# 使用
person = Person("Alice", 30)
# person.age = "thirty" # 抛出 TypeError
3. 属性验证器
class Validator:
"""通用验证器描述符"""
def __init__(self, name, validators):
self.name = f"_{name}"
self.validators = validators
def __get__(self, instance, owner):
if instance is None:
return self
return getattr(instance, self.name)
def __set__(self, instance, value):
for validator in self.validators:
validator(value)
setattr(instance, self.name, value)
def positive(value):
if value <= 0:
raise ValueError("值必须为正数")
def max_value(maximum):
def validator(value):
if value > maximum:
raise ValueError(f"值不能大于 {maximum}")
return validator
class Product:
price = Validator("price", [positive, max_value(10000)])
def __init__(self, price):
self.price = price
product = Product(99)
# product.price = -10 # 抛出 ValueError
提示:元类和描述符是 Python 的高级特性,主要用于框架开发和复杂的抽象设计。日常编程中应谨慎使用。
性能优化技巧
1. 性能分析
使用内置工具进行性能分析,找出性能瓶颈。
import cProfile
import pstats
from io import StringIO
def profile_function(func):
"""性能分析装饰器"""
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
# 打印统计信息
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('cumulative')
stats.print_stats(10)
print(stream.getvalue())
return result
return wrapper
@profile_function
def slow_operation():
total = 0
for i in range(1000000):
total += i
return total
2. 使用内置函数和库
Python 的内置函数和标准库通常用 C 语言实现,性能优于纯 Python 代码。
import operator
from functools import reduce
# 使用 map 代替列表推导式(大数据集时)
numbers = range(1000000)
squared = map(lambda x: x**2, numbers)
# 使用 filter
even_numbers = filter(lambda x: x % 2 == 0, numbers)
# 使用 reduce
product = reduce(operator.mul, range(1, 6)) # 120
# 使用 operator 模块
from operator import itemgetter
data = [('Alice', 30), ('Bob', 25), ('Charlie', 35)]
sorted_data = sorted(data, key=itemgetter(1))
3. 缓存和记忆化
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_function(n):
"""使用 LRU 缓存"""
if n < 2:
return n
return expensive_function(n-1) + expensive_function(n-2)
# 自定义缓存
class CachedProperty:
"""缓存属性"""
def __init__(self, func):
self.func = func
def __get__(self, instance, owner):
if instance is None:
return self
value = self.func(instance)
setattr(instance, self.func.__name__, value)
return value
class DataProcessor:
@CachedProperty
def expensive_computation(self):
print("执行昂贵的计算")
return sum(range(1000000))
4. 使用生成器节省内存
# 避免一次性加载大数据
def process_large_dataset(filename):
"""逐行处理大文件"""
with open(filename) as f:
for line in f:
yield process_line(line)
# 使用 itertools 处理大数据
from itertools import islice, chain
def chunked_reader(iterable, chunk_size):
"""分块读取迭代器"""
iterator = iter(iterable)
while True:
chunk = list(islice(iterator, chunk_size))
if not chunk:
break
yield chunk
# 使用
for chunk in chunked_reader(range(1000000), 1000):
process_chunk(chunk)
性能优化原则:
- 先保证代码正确性,再考虑优化
- 使用性能分析工具找出真正的瓶颈
- 优先使用算法优化,而非微优化
- 考虑使用 NumPy、Cython 等扩展库处理计算密集型任务