Home Python 高级教程:异步与并发编程详解.md
Post
Cancel

Python 高级教程:异步与并发编程详解.md

在现代软件开发中,高性能和响应速度是应用程序的关键要求。Python 作为一门流行的编程语言,提供了强大的异步和并发编程工具来满足这些需求。本教程将深入探讨 Python 中的异步编程和并发编程,通过实际示例帮助你掌握这些高级概念。

目录

  • 异步编程
    • asyncio 基础
    • 事件循环
    • async/await 语法
    • 任务和 Future
    • 异步上下文管理器
  • 并发编程
    • 多线程编程
    • 多进程编程
    • 线程池和进程池
    • 锁和同步原语
    • 并发模式设计

1. 异步编程

异步编程是一种编程范式,它允许程序在等待某些操作完成时继续执行其他任务,而不是被阻塞。在 Python 中,asyncio 是实现异步编程的核心库。

1.1 asyncio

协程基础

协程(Coroutine)是异步编程的核心概念。它们是可以暂停执行的特殊函数,允许其他代码在等待期间运行。在使用协程时,需要注意以下关键点:

  • 使用 async def 定义协程函数
  • 使用 await 等待协程执行完成
  • 理解 awaitasyncio.create_task() 的区别:
    • await 直接等待协程完成
    • create_task() 创建任务并允许并发执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
import time

async def make_coffee():
    print("开始煮咖啡...")
    # 模拟煮咖啡的过程
    await asyncio.sleep(3)
    print("咖啡准备好了!")
    return "一杯香浓的咖啡"

async def make_toast():
    print("开始烤面包...")
    # 模拟烤面包的过程
    await asyncio.sleep(2)
    print("面包烤好了!")
    return "一片金黄的吐司"

async def prepare_breakfast():
    # 创建任务实现真正的并发
    coffee_task = asyncio.create_task(make_coffee())
    toast_task = asyncio.create_task(make_toast())
    
    try:
        # 等待所有任务完成
        coffee, toast = await asyncio.gather(coffee_task, toast_task)
        print(f"早餐准备完成:{coffee}{toast}")
    except Exception as e:
        print(f"准备早餐时发生错误:{e}")
        # 取消未完成的任务
        for task in [coffee_task, toast_task]:
            if not task.done():
                task.cancel()

if __name__ == "__main__":
    asyncio.run(prepare_breakfast())
事件循环

事件循环是 asyncio 的核心,它负责协调所有异步操作的执行。以下示例展示了如何正确使用事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
import time

async def check_weather(city):
    print(f"正在查询{city}的天气...")
    await asyncio.sleep(1)  # 模拟 API 调用
    return f"{city}的天气晴朗"

async def main():
    try:
        # 获取事件循环
        loop = asyncio.get_running_loop()
        
        # 创建多个任务
        cities = ["北京", "上海", "广州", "深圳"]
        tasks = [loop.create_task(check_weather(city)) for city in cities]
        
        # 使用超时机制
        try:
            # 设置 5 秒超时
            results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=5.0)
            for result in results:
                print(result)
        except asyncio.TimeoutError:
            print("查询超时!")
            # 取消所有未完成的任务
            for task in tasks:
                if not task.done():
                    task.cancel()
    except Exception as e:
        print(f"发生错误:{e}")

if __name__ == "__main__":
    asyncio.run(main())
异步上下文管理器

下面是一个改进的异步数据库连接示例,展示了正确的资源管理和错误处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import asyncio
import contextlib

class AsyncDatabase:
    def __init__(self):
        self.connected = False
    
    async def __aenter__(self):
        try:
            print("连接数据库...")
            await asyncio.sleep(1)  # 模拟连接过程
            self.connected = True
            return self
        except Exception as e:
            print(f"连接数据库失败:{e}")
            raise
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.connected:
            print("关闭数据库连接...")
            try:
                await asyncio.sleep(0.5)  # 模拟关闭过程
                self.connected = False
            except Exception as e:
                print(f"关闭数据库连接时发生错误:{e}")
                raise
    
    async def query(self, sql):
        if not self.connected:
            raise RuntimeError("数据库未连接")
        await asyncio.sleep(1)  # 模拟查询
        return f"查询结果:{sql}"

async def main():
    try:
        async with AsyncDatabase() as db:
            result = await db.query("SELECT * FROM users")
            print(result)
    except Exception as e:
        print(f"数据库操作失败:{e}")

if __name__ == "__main__":
    asyncio.run(main())

1.2 并发编程

在开始并发编程部分之前,需要理解 Python 中的 GIL(全局解释器锁):

  • GIL 确保同一时刻只有一个线程执行 Python 字节码
  • 这意味着 Python 的多线程在 CPU 密集型任务上并不能提供真正的并行性
  • 对于 I/O 密集型任务,多线程仍然是有效的
  • 对于 CPU 密集型任务,应该使用多进程
多线程编程

以下是一个改进的多线程示例,包含了正确的线程管理和退出机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import threading
import time
from queue import Queue
import signal
import sys

class CoffeeShop:
    def __init__(self):
        self.orders_queue = Queue()
        self.ready_orders = Queue()
        self.should_stop = threading.Event()
        
        # 设置信号处理
        signal.signal(signal.SIGINT, self.handle_shutdown)
        signal.signal(signal.SIGTERM, self.handle_shutdown)
    
    def handle_shutdown(self, signum, frame):
        print("\n正在关闭咖啡店...")
        self.should_stop.set()
    
    def take_order(self, order_id):
        if not self.should_stop.is_set():
            print(f"收到订单 #{order_id}")
            self.orders_queue.put(order_id)
    
    def make_coffee(self):
        while not self.should_stop.is_set():
            try:
                order_id = self.orders_queue.get(timeout=1)
                print(f"正在制作订单 #{order_id} 的咖啡...")
                time.sleep(2)  # 模拟制作咖啡的时间
                self.ready_orders.put(order_id)
                print(f"订单 #{order_id} 的咖啡制作完成!")
                self.orders_queue.task_done()
            except Queue.Empty:
                continue
            except Exception as e:
                print(f"制作咖啡时发生错误:{e}")
    
    def serve_coffee(self):
        while not self.should_stop.is_set():
            try:
                order_id = self.ready_orders.get(timeout=1)
                print(f"服务员正在派送订单 #{order_id}")
                time.sleep(1)  # 模拟派送时间
                print(f"订单 #{order_id} 已送达!")
                self.ready_orders.task_done()
            except Queue.Empty:
                continue
            except Exception as e:
                print(f"派送咖啡时发生错误:{e}")

def run_coffee_shop():
    shop = CoffeeShop()
    
    # 创建工作线程
    threads = [
        threading.Thread(target=shop.make_coffee, name="Barista"),
        threading.Thread(target=shop.serve_coffee, name="Server")
    ]
    
    # 将线程设置为守护线程
    for thread in threads:
        thread.daemon = True
        thread.start()
    
    try:
        # 模拟接收订单
        for i in range(5):
            shop.take_order(i)
            time.sleep(0.5)
        
        # 等待所有订单处理完成
        shop.orders_queue.join()
        shop.ready_orders.join()
    except KeyboardInterrupt:
        print("\n收到中断信号,正在关闭...")
    finally:
        # 设置停止标志
        shop.should_stop.set()
        
        # 等待所有线程完成
        for thread in threads:
            thread.join(timeout=2)
        
        print("咖啡店已关闭")

if __name__ == "__main__":
    run_coffee_shop()
多进程编程

以下是一个改进的多进程示例,包含了更好的资源管理和错误处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import multiprocessing as mp
import time
import os
import signal
from functools import partial

def init_worker():
    # 忽略子进程的 SIGINT 信号
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def process_image(image_name, processing_time=2):
    try:
        print(f"进程 {os.getpid()} 开始处理图片:{image_name}")
        time.sleep(processing_time)  # 模拟图片处理
        return f"{image_name} 处理完成"
    except Exception as e:
        print(f"处理图片 {image_name} 时发生错误:{e}")
        return f"{image_name} 处理失败"

def batch_process_images(images, max_workers=None):
    if max_workers is None:
        # 设置进程池大小上限
        max_workers = min(len(images), os.cpu_count() or 1, 4)
    
    print(f"使用 {max_workers} 个进程进行处理")
    
    try:
        # 创建进程池,设置初始化函数
        with mp.Pool(processes=max_workers, initializer=init_worker) as pool:
            # 使用进程池处理图片
            results = pool.map_async(process_image, images)
            
            try:
                # 等待所有任务完成,设置超时
                processed_results = results.get(timeout=30)
                return processed_results
            except mp.TimeoutError:
                print("处理超时!")
                pool.terminate()
                return []
            
    except KeyboardInterrupt:
        print("\n检测到中断信号,正在优雅关闭...")
        pool.terminate()
        pool.join()
        return []
    except Exception as e:
        print(f"批处理过程中发生错误:{e}")
        return []

if __name__ == "__main__":
    # 准备测试数据
    test_images = [f"image_{i}.jpg" for i in range(4)]
    
    try:
        results = batch_process_images(test_images)
        
        # 打印结果
        for result in results:
            print(result)
    except Exception as e:
        print(f"主程序发生错误:{e}")
线程安全和锁机制

以下是一个改进的银行账户示例,展示了正确的锁使用和死锁预防:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import threading
import time
from contextlib import contextmanager

class InsufficientFundsError(Exception):
    pass

class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self._lock = threading.RLock()  # 使用可重入锁
        
    @contextmanager
    def _account_lock(self, timeout=1.0):
        """安全的获取锁的上下文管理器"""
        if not self._lock.acquire(timeout=timeout):
            raise TimeoutError("无法获取账户锁,操作超时")
        try:
            yield
        finally:
            self._lock.release()
    
    def get_balance(self):
        """安全地获取余额"""
        with self._account_lock():
            return self.balance
    
    def deposit(self, amount):
        """存款操作"""
        if amount <= 0:
            raise ValueError("存款金额必须大于 0")
            
        with self._account_lock():
            print(f"存款 ¥{amount}...")
            # 模拟处理时间
            time.sleep(0.1)
            self.balance += amount
            print(f"存款完成,当前余额:¥{self.balance}")
    
    def withdraw(self, amount):
        """取款操作"""
        if amount <= 0:
            raise ValueError("取款金额必须大于 0")
            
        with self._account_lock():
            if self.balance >= amount:
                print(f"取款 ¥{amount}...")
                # 模拟处理时间
                time.sleep(0.1)
                self.balance -= amount
                print(f"取款完成,当前余额:¥{self.balance}")
            else:
                raise InsufficientFundsError("余额不足!")
    
    def transfer(self, other_account, amount):
        """
        转账操作
        使用固定的锁定顺序来防止死锁
        """
        if self is other_account:
            raise ValueError("不能转账给自己")
            
        if amount <= 0:
            raise ValueError("转账金额必须大于 0")
        
        # 始终按照内存地址顺序获取锁,防止死锁
        first_account, second_account = sorted([self, other_account], 
                                             key=lambda x: id(x))
        
        with first_account._account_lock():
            with second_account._account_lock():
                if self.balance >= amount:
                    print(f"转账 ¥{amount}...")
                    self.balance -= amount
                    other_account.balance += amount
                    print(f"转账完成")
                else:
                    raise InsufficientFundsError("余额不足!")

def test_bank_account():
    # 创建测试账户
    account1 = BankAccount(1000)
    account2 = BankAccount(500)
    
    # 创建多个操作线程
    threads = []
    
    # 存款操作
    threads.append(threading.Thread(
        target=account1.deposit, 
        args=(500,), 
        name="Deposit-1"
    ))
    
    # 取款操作
    threads.append(threading.Thread(
        target=account1.withdraw, 
        args=(300,), 
        name="Withdraw-1"
    ))
    
    # 转账操作
    threads.append(threading.Thread(
        target=account1.transfer, 
        args=(account2, 400), 
        name="Transfer-1-to-2"
    ))
    
    # 启动所有线程
    for thread in threads:
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()
    
    # 打印最终状态
    print("\n最终状态:")
    print(f"账户 1 余额:¥{account1.get_balance()}")
    print(f"账户 2 余额:¥{account2.get_balance()}")

if __name__ == "__main__":
    test_bank_account()

常见陷阱和调试技巧

  1. 异步编程陷阱:
    • 避免在协程中使用阻塞操作
    • 正确处理异步上下文管理器
    • 注意任务取消和清理
    • 使用 asyncio.shield() 保护关键操作
  2. 多线程陷阱:
    • 理解 GIL 的限制
    • 避免死锁
    • 正确使用线程安全的数据结构
    • 合理设置超时机制
  3. 多进程陷阱:
    • 注意进程间通信的开销
    • 合理设置进程池大小
    • 正确处理进程终止
    • 避免共享状态问题
  4. 调试技巧:
    • 使用 logging 模块记录日志
    • 设置适当的超时机制
    • 使用 threading.current_thread().name 跟踪线程
    • 使用 multiprocessing.current_process().name 跟踪进程

总结

本教程详细介绍了 Python 中的异步编程和并发编程的核心概念和实践应用:

  1. 异步编程(asyncio)
    • 通过协程实现非阻塞操作
    • 使用事件循环管理异步任务
    • async/await语法简化异步代码
    • 任务和 Future 处理异步操作结果
    • 异步上下文管理器管理资源
  2. 并发编程
    • 多线程处理 I/O 密集型任务
    • 多进程处理 CPU 密集型任务
    • 线程池和进程池管理并发任务
    • 锁机制确保数据安全
    • 并发设计模式解决实际问题

在选择并发策略时,需要考虑以下因素:

  • I/O 密集型任务:选择 asyncio 或多线程
  • CPU 密集型任务:选择多进程
  • 混合型任务:考虑组合使用不同策略

最佳实践:

  1. 始终进行适当的错误处理
  2. 实现优雅的关闭机制
  3. 使用超时机制防止无限等待
  4. 注意资源管理和清理
  5. 选择合适的并发策略
  6. 注意线程安全性
  7. 使用锁和其他同步原语保护共享资源
This post is licensed under CC BY 4.0 by the author.