第二周核心定位是从 PyTorch 框架层的流使用,深入到 CUDA 流底层原理,属于 **“知其然并知其所以然”的进阶阶段,重点解决“流的同步怎么精细化做”“隐式同步为什么会触发 / 怎么精准排查”“多 GPU 流的基础调度规则”三大核心问题,同时首次引入GPU 性能分析工具 nsight-systems**,实现 “从代码到硬件性能的可视化排查”。
本阶段所有内容均衔接第一周的基础 API 和 GPU 物理结构知识点,无全新的流核心规则,但会把第一周的 “入门级隐式同步” 拓展为工业级高频隐式同步场景,并新增 **CUDA Event(事件)** 这一精细化同步工具,为第三周的分布式流管理、工程化优化打下底层基础。
python
运行
import torch
import ctypes
# 加载CUDA驱动库(libcudart.so,Linux/macOS;cudart64_117.dll,Windows)
cudart = ctypes.CDLL("libcudart.so")
# 1. 创建PyTorch自定义流(默认/高优先级)
stream_default = torch.cuda.Stream(device=0)
stream_high = torch.cuda.Stream(device=0, priority=-1) # 高优先级
default_stream = torch.cuda.default_stream(device=0) # 默认流
# 2. 获取PyTorch流的底层CUDA句柄(PyTorch流对象的_cuda_stream属性)
def get_cuda_stream_handle(pytorch_stream):
return pytorch_stream._cuda_stream
# 3. 打印CUDA流句柄(唯一标识,验证一一对应)
print(f"PyTorch默认流的CUDA句柄:{get_cuda_stream_handle(default_stream)}")
print(f"PyTorch自定义默认优先级流句柄:{get_cuda_stream_handle(stream_default)}")
print(f"PyTorch自定义高优先级流句柄:{get_cuda_stream_handle(stream_high)}")
# 4. 验证CUDA流的优先级(调用CUDA底层API cudaStreamGetPriority)
def get_stream_priority(handle):
priority = ctypes.c_int()
cudart.cudaStreamGetPriority(handle, ctypes.byref(priority))
return priority.value
print(f"默认流优先级:{get_stream_priority(get_cuda_stream_handle(default_stream))}")
print(f"高优先级流优先级:{get_stream_priority(get_cuda_stream_handle(stream_high))}")
python
运行
import torch
import time
device = torch.device("cuda:0")
# 1. 创建2个自定义流+2个Event
stream1 = torch.cuda.Stream(device)
stream2 = torch.cuda.Stream(device)
event1 = torch.cuda.Event(enable_timing=True) # enable_timing:开启计时功能
event2 = torch.cuda.Event(enable_timing=True)
# 2. 创建独立张量(规避隐式同步)
x1 = torch.randn(10000, 10000, device=device)
x2 = torch.randn(10000, 10000, device=device)
# 示例1:Event实现精准同步(流1的任务完成后,流2才开始)
start = time.time()
# 流1执行任务,记录Event1
with torch.cuda.stream(stream1):
y1 = x1 @ x1
event1.record(stream1) # Event1标记在y1计算完成后
# 流2等待Event1,再执行任务
stream2.wait_event(event1)
with torch.cuda.stream(stream2):
y2 = x2 @ x2
torch.cuda.synchronize()
print(f"Event同步耗时:{time.time()-start:.4f}s")
# 示例2:对比stream.wait_stream(流级同步,流1所有任务完成后流2才开始)
start2 = time.time()
with torch.cuda.stream(stream1):
y1 = x1 @ x1
stream2.wait_stream(stream1)
with torch.cuda.stream(stream2):
y2 = x2 @ x2
torch.cuda.synchronize()
print(f"stream.wait_stream同步耗时:{time.time()-start2:.4f}s")
# 示例3:Event的计时功能(精准测量流中任务的执行时间)
with torch.cuda.stream(stream1):
event1.record(stream1)
y1 = x1 @ x1
event2.record(stream1)
torch.cuda.synchronize()
# 计算两个Event之间的时间(单位:ms)
print(f"流1中矩阵乘法的实际执行时间:{event1.elapsed_time(event2):.2f}ms")
python
运行
import torch
device = torch.device("cuda:0")
# 练习1:用stream.wait_stream实现流的链式依赖(流1→流2→流3)
print("=== 流级链式依赖 ===")
s1, s2, s3 = [torch.cuda.Stream(device) for _ in range(3)]
x1, x2, x3 = [torch.randn(8000, 8000, device=device) for _ in range(3)]
# 定义依赖:s2等s1,s3等s2
s2.wait_stream(s1)
s3.wait_stream(s2)
# 提交任务(即使提交顺序打乱,依赖仍保证执行顺序)
with torch.cuda.stream(s3):
y3 = x3 **2
with torch.cuda.stream(s2):
y2 = x2 **2
with torch.cuda.stream(s1):
y1 = x1** 2
torch.cuda.synchronize()
print("链式依赖执行完成")
# 练习2:用Event实现事件级链式依赖(流1的任务1→流2的任务2→流3的任务3)
print("
=== 事件级链式依赖 ===")
s1, s2, s3 = [torch.cuda.Stream(device) for _ in range(3)]
e1, e2 = torch.cuda.Event(), torch.cuda.Event()
x1, x2, x3 = [torch.randn(8000, 8000, device=device) for _ in range(3)]
# 流1执行任务1,记录e1
with torch.cuda.stream(s1):
y1 = x1 @ x1
e1.record(s1)
# 流2等待e1,执行任务2,记录e2
s2.wait_event(e1)
with torch.cuda.stream(s2):
y2 = x2 @ x2
e2.record(s2)
# 流3等待e2,执行任务3
s3.wait_event(e2)
with torch.cuda.stream(s3):
y3 = x3 @ x3
torch.cuda.synchronize()
print("事件级链式依赖执行完成")
# 练习3:多流并行+主流程等待所有流完成(实战高频场景)
print("
=== 多流并行+统一同步 ===")
streams = [torch.cuda.Stream(device) for _ in range(4)]
tensors = [torch.randn(6000, 6000, device=device) for _ in range(4)]
# 多流并行提交任务
for s, x in zip(streams, tensors):
with torch.cuda.stream(s):
_ = x @ x
# 主流程等待所有流完成(两种方式)
# 方式1:逐个流同步(推荐,开销略小)
for s in streams:
s.synchronize()
# 方式2:全局同步(简单,开销大)
# torch.cuda.synchronize()
print("多流并行执行完成")
表格
场景编号 | 触发条件 | 底层硬件原因 | 核心规避方法 |
1(入门级) | 跨流操作同一张 GPU 张量(同一全局内存地址) | 同步管理单元检测到 RAW/WAW/WAR 读写冲突 | 流 - 张量一一对应,一个流只操作专属张量 |
2(最高频) | 默认流与任意自定义流操作同一设备的张量(无论是否同一地址) | CUDA 传统默认流是 **“每线程默认流”**,会与所有自定义流隐式同步 | 尽量少用默认流,所有任务均用自定义流;或使用 CUDA 11.0 + 的每设备默认流 |
3 | 跨流操作同一共享内存(Shared Memory) 区域 | 共享内存是 SM 内私有,跨流访问会触发 SM 级的同步 | 为每个流分配专属的 SM 资源(编译器自动处理,用户只需保证流 - 张量独立) |
4 | 不同优先级的流,操作同一全局内存地址 | 高优先级流的调度会抢占低优先级流的内存访问,硬件触发同步避免冲突 | 同一内存区域的操作,使用相同优先级的流 |
5 | 跨流执行需要 GPU 全局资源的核函数(如原子操作、全局内存栅栏) | 全局资源的操作需要硬件级的原子性,触发全流同步 | 将全局资源操作放到单个流中执行,或使用 CUDA 原子操作 API 规避 |
python
运行
import torch
import time
device = torch.device("cuda:0")
# 验证:默认流与自定义流操作不同张量,仍触发隐式同步
print("=== 默认流与自定义流的隐式同步 ===")
custom_stream = torch.cuda.Stream(device)
# 两个独立张量(不同全局内存地址)
x1 = torch.randn(10000, 10000, device=device) # 自定义流操作
x2 = torch.randn(10000, 10000, device=device) # 默认流操作
# 场景1:自定义流+默认流交替执行(触发隐式同步,并行失效)
start = time.time()
with torch.cuda.stream(custom_stream):
y1 = x1 @ x1 # 自定义流任务
# 默认流任务(即使张量独立,仍触发隐式同步)
y2 = x2 @ x2
torch.cuda.synchronize()
sync_time = time.time() - start
# 场景2:两个自定义流执行(无隐式同步,并行生效)
s1, s2 = torch.cuda.Stream(device), torch.cuda.Stream(device)
start2 = time.time()
with torch.cuda.stream(s1):
y1 = x1 @ x1
with torch.cuda.stream(s2):
y2 = x2 @ x2
torch.cuda.synchronize()
async_time = time.time() - start2
print(f"默认流+自定义流耗时:{sync_time:.4f}s")
print(f"双自定义流并行耗时:{async_time:.4f}s")
print(f"耗时差:{sync_time-async_time:.4f}s(验证隐式同步触发)")
从 NVIDIA 官网下载对应系统的安装包(
https://developer.nvidia.com/nsight-systems),默认安装即可,无需额外配置。
python
运行
import torch
import time
torch.cuda.set_device(0)
device = torch.device("cuda:0")
# 代码1:有隐式同步(默认流+自定义流)
def sync_code():
custom_stream = torch.cuda.Stream(device)
x1 = torch.randn(10000, 10000, device=device)
x2 = torch.randn(10000, 10000, device=device)
start = time.time()
with torch.cuda.stream(custom_stream):
y1 = x1 @ x1
y2 = x2 @ x2
torch.cuda.synchronize()
return time.time() - start
# 代码2:无隐式同步(双自定义流)
def async_code():
s1, s2 = torch.cuda.Stream(device), torch.cuda.Stream(device)
x1 = torch.randn(10000, 10000, device=device)
x2 = torch.randn(10000, 10000, device=device)
start = time.time()
with torch.cuda.stream(s1):
y1 = x1 @ x1
with torch.cuda.stream(s2):
y2 = x2 @ x2
torch.cuda.synchronize()
return time.time() - start
# 运行代码,供nsight-systems采集
if __name__ == "__main__":
print(f"有隐式同步耗时:{sync_code():.4f}s")
print(f"无隐式同步耗时:{async_code():.4f}s")
# 休眠5秒,保证采集完整
time.sleep(5)
python
运行
import torch
# 验证是否有多个GPU
assert torch.cuda.device_count() >= 2, "需要至少2个GPU才能运行此代码"
gpu0, gpu1 = 0, 1
# 1. 为每个GPU创建专属的自定义流(流与GPU一一对应)
s0 = torch.cuda.Stream(device=gpu0)
s1 = torch.cuda.Stream(device=gpu1)
# 2. 为每个GPU创建专属的张量(张量与GPU一一对应)
x0 = torch.randn(15000, 15000, device=gpu0)
x1 = torch.randn(15000, 15000, device=gpu1)
# 3. 多GPU流并行执行任务(无任何同步,完全并行)
start = time.time()
with torch.cuda.stream(s0):
y0 = x0 @ x0 # GPU0的流执行GPU0的张量计算
with torch.cuda.stream(s1):
y1 = x1 @ x1 # GPU1的流执行GPU1的张量计算
# 同步所有GPU(torch.cuda.synchronize()会同步所有设备)
torch.cuda.synchronize()
total_time = time.time() - start
# 4. 单GPU串行执行(对比耗时,验证多GPU流并行)
start2 = time.time()
with torch.cuda.stream(s0):
y0 = x0 @ x0
torch.cuda.synchronize(gpu0)
with torch.cuda.stream(s1):
y1 = x1 @ x1
torch.cuda.synchronize(gpu1)
serial_time = time.time() - start2
print(f"多GPU流并行耗时:{total_time:.4f}s")
print(f"多GPU流串行耗时:{serial_time:.4f}s")
print(f"并行比串行快:{(serial_time-total_time)/serial_time*100:.2f}%")
# 5. 验证:跨GPU流操作不会触发同步(即使操作同名张量)
print("
=== 跨GPU流操作无同步 ===")
with torch.cuda.stream(s0):
x0 = x0 + 1 # GPU0的流操作GPU0的张量
with torch.cuda.stream(s1):
x1 = x1 + 1 # GPU1的流操作GPU1的张量
torch.cuda.synchronize()
print("跨GPU流操作执行完成,无隐式同步")
表格
同步方式 | API | 适用场景 | 开销 |
全局同步 | torch.cuda.synchronize() | 主流程等待所有 GPU 任务完成 | 最大 |
流级同步 | stream.synchronize() | 等待单个流的所有任务完成 | 中等 |
事件级同步 | event.wait(stream) | 等待流中某个具体任务完成 | 最小 |
验收要求:独立完成所有操作,无参考代码,能口头解释底层原因 + 工具分析结果,环境要求:PyTorch2.0+、CUDA11.7+、nsight-systems、单 / 多 GPU 均可。
第二周是连接基础使用和工程化优化的桥梁,掌握本阶段的内容后,第三周的分布式流管理、Autograd 与流的交互、大模型流优化等工程化内容,都会变得水到渠成。
更新时间:2026-02-26
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight All Rights Reserved.
Powered By 61893.com 闽ICP备11008920号
闽公网安备35020302035593号