YOLOv12_ultralytics-8.3.145_2025_5_27部分代码阅读笔记-torch_utils.py
torch_utils.py
ultralytics\utils\torch_utils.py
目录
torch_utils.py
1.所需的库和模块
2.def smart_inference_mode():
3.def autocast(enabled: bool, device: str = "cuda"):
4.def time_sync():
5.def fuse_conv_and_bn(conv, bn):
6.def fuse_deconv_and_bn(deconv, bn):
7.def get_num_params(model):
8.def get_num_gradients(model):
9.def get_flops(model, imgsz=640):
10.def get_flops_with_torch_profiler(model, imgsz=640):
11.def initialize_weights(model):
12.def scale_img(img, ratio=1.0, same_shape=False, gs=32):
13.def copy_attr(a, b, include=(), exclude=()):
14.def intersect_dicts(da, db, exclude=()):
15.def one_cycle(y1=0.0, y2=1.0, steps=100):
16.class ModelEMA:
17.def convert_optimizer_state_dict_to_fp16(state_dict):
18.def profile_ops(input, ops, n=10, device=None, max_num_obj=0):
1.所需的库和模块
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/licenseimport functools
import gc
import math
import os
import random
import time
from contextlib import contextmanager
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Unionimport numpy as np
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as Ffrom ultralytics import __version__
from ultralytics.utils import (DEFAULT_CFG_DICT,DEFAULT_CFG_KEYS,LOGGER,NUM_THREADS,PYTHON_VERSION,TORCHVISION_VERSION,WINDOWS,colorstr,
)
from ultralytics.utils.checks import check_version# Version checks (all default to version>=min_version)
TORCH_1_9 = check_version(torch.__version__, "1.9.0")
TORCH_1_13 = check_version(torch.__version__, "1.13.0")
TORCH_2_0 = check_version(torch.__version__, "2.0.0")
TORCH_2_4 = check_version(torch.__version__, "2.4.0")
TORCHVISION_0_10 = check_version(TORCHVISION_VERSION, "0.10.0")
TORCHVISION_0_11 = check_version(TORCHVISION_VERSION, "0.11.0")
TORCHVISION_0_13 = check_version(TORCHVISION_VERSION, "0.13.0")
TORCHVISION_0_18 = check_version(TORCHVISION_VERSION, "0.18.0")
if WINDOWS and check_version(torch.__version__, "==2.4.0"): # reject version 2.4.0 on WindowsLOGGER.warning("Known issue with torch==2.4.0 on Windows with CPU, recommend upgrading to torch>=2.4.1 to resolve ""https://github.com/ultralytics/ultralytics/issues/15049")
2.def smart_inference_mode():
# 这段代码定义了一个名为 smart_inference_mode 的装饰器工厂函数,其目的是根据 PyTorch 的版本和当前是否处于推理模式来决定是否对被装饰的函数进行包装。
# 定义了一个名为 smart_inference_mode 的函数,它是一个装饰器工厂函数,用于生成装饰器。
def smart_inference_mode():# 如果 torch>=1.9.0,则应用 torch.inference_mode() 装饰器,否则应用 torch.no_grad() 装饰器。"""Apply torch.inference_mode() decorator if torch>=1.9.0 else torch.no_grad() decorator."""# 定义了一个内部函数 decorate ,它接收一个函数 1.fn 作为参数,这个函数将被装饰。def decorate(fn):# 根据 torch 版本为推理模式应用适当的 torch 装饰器。"""Apply appropriate torch decorator for inference mode based on torch version."""# 检查两个条件:# TORCH_1_9 :这是一个布尔值,表示当前的 PyTorch 版本是否是 1.9 或更高版本。# torch.is_inference_mode_enabled() :检查当前 PyTorch 是否已经处于推理模式。# 如果这两个条件都为真,说明当前环境已经处于推理模式,不需要额外的包装。if TORCH_1_9 and torch.is_inference_mode_enabled():# 如果当前已经处于推理模式,直接返回原始函数 fn ,不对其进行任何修改,相当于一个透明的装饰器。return fn # already in inference_mode, act as a pass-through# 如果当前环境不满足上述条件(即 PyTorch 版本低于 1.9 或者当前不在推理模式),则需要对函数进行包装。else:# 根据 PyTorch 的版本选择合适的上下文管理器:# 如果 TORCH_1_9 为真,使用 torch.inference_mode 。# 否则,使用 torch.no_grad 。# 然后,通过上下文管理器对函数 fn 进行包装,使其在推理模式下运行。return (torch.inference_mode if TORCH_1_9 else torch.no_grad)()(fn)# 返回内部定义的 decorate 函数,使其可以作为装饰器使用。return decorate
# 这段代码实现了一个智能的推理模式装饰器,其主要功能是根据 PyTorch 的版本和当前是否处于推理模式来决定是否对被装饰的函数进行包装。如果当前环境已经处于推理模式,则直接返回原始函数,避免不必要的包装;否则,根据 PyTorch 的版本选择合适的上下文管理器( torch.inference_mode 或 torch.no_grad )来包装函数,确保函数在推理模式下运行。这种设计既灵活又高效,能够适应不同版本的 PyTorch 环境,同时避免了重复的推理模式设置。
3.def autocast(enabled: bool, device: str = "cuda"):
# 这段代码定义了一个名为 autocast 的函数,用于根据 PyTorch 的版本和指定的设备类型,返回合适的自动混合精度上下文管理器。
# 定义了一个函数 autocast ,它接受两个参数:
# 1.enabled :一个布尔值,表示是否启用自动混合精度。
# 2.device :一个字符串,默认值为 "cuda" ,表示设备类型,通常用于指定 GPU 设备。
def autocast(enabled: bool, device: str = "cuda"):# 根据 PyTorch 版本和 AMP 设置获取合适的自动转换上下文管理器。# 此函数返回一个用于自动混合精度 (AMP) 训练的上下文管理器,该管理器兼容 PyTorch 的新旧版本。它处理不同 PyTorch 版本之间自动转换 API 的差异。# 参数:# enabled(布尔值):是否启用自动混合精度。# device(字符串,可选):用于自动转换的设备。# 返回:# (torch.amp.autocast):合适的自动转换上下文管理器。# 说明:# - 对于 PyTorch 1.13 及更高版本,使用 `torch.amp.autocast`。# - 对于旧版本,使用 `torch.cuda.autocast`。# 示例:# >>> with autocast(enabled=True):# ... # 此处填写您的混合精度操作# ... pass"""Get the appropriate autocast context manager based on PyTorch version and AMP setting.This function returns a context manager for automatic mixed precision (AMP) training that is compatible with botholder and newer versions of PyTorch. It handles the differences in the autocast API between PyTorch versions.Args:enabled (bool): Whether to enable automatic mixed precision.device (str, optional): The device to use for autocast.Returns:(torch.amp.autocast): The appropriate autocast context manager.Notes:- For PyTorch versions 1.13 and newer, it uses `torch.amp.autocast`.- For older versions, it uses `torch.cuda.autocast`.Examples:>>> with autocast(enabled=True):... # Your mixed precision operations here... pass"""# 检查一个布尔变量 TORCH_1_13 ,该变量表示当前 PyTorch 的版本是否为 1.13 或更高版本。如果为真,表示可以使用 PyTorch 1.13 及以上版本提供的功能。if TORCH_1_13:# 如果 PyTorch 版本为 1.13 或更高,返回 torch.amp.autocast 上下文管理器,并传递 device 和 enabled 参数。 torch.amp.autocast 是 PyTorch 提供的自动混合精度工具,用于在训练和推理过程中自动选择合适的数据类型(如 FP16 和 FP32),以提高计算效率和减少内存占用。return torch.amp.autocast(device, enabled=enabled)# 如果 PyTorch 版本低于 1.13,则进入这个分支。else:# 对于 PyTorch 版本低于 1.13 的情况,返回 torch.cuda.amp.autocast 上下文管理器,并传递 enabled 参数。 torch.cuda.amp.autocast 是 PyTorch 早期版本中用于自动混合精度的工具,它仅支持 CUDA 设备。return torch.cuda.amp.autocast(enabled)
# 这段代码实现了一个简单的函数 autocast ,用于根据 PyTorch 的版本和指定的设备类型,返回合适的自动混合精度上下文管理器。它通过检查 TORCH_1_13 变量来判断当前 PyTorch 的版本,然后根据版本选择 torch.amp.autocast 或 torch.cuda.amp.autocast 。这种设计使得代码能够兼容不同版本的 PyTorch,同时提供了灵活性,允许用户指定设备类型和是否启用自动混合精度。这在实际使用中非常有用,尤其是在需要支持多种 PyTorch 版本的项目中。
4.def time_sync():
# 这段代码定义了一个名为 time_sync 的函数,用于在 PyTorch 环境中同步 GPU 操作并获取当前时间。
# 定义了一个名为 time_sync 的函数,该函数不接受任何参数。
def time_sync():# 返回 PyTorch 精确的时间。"""Return PyTorch-accurate time."""# 检查当前环境中是否可用 CUDA,即是否有可用的 GPU 设备。 torch.cuda.is_available() 是 PyTorch 提供的一个函数,用于检测系统是否支持 CUDA。if torch.cuda.is_available():# 如果 CUDA 可用,调用 torch.cuda.synchronize() 函数。这个函数会阻塞当前线程,直到所有 CUDA 核心完成它们的任务。这确保了在获取时间之前,所有 GPU 操作都已完成,从而避免了由于 GPU 异步执行而导致的时间测量不准确。torch.cuda.synchronize()# 返回当前的时间戳。 time.time() 是 Python 标准库 time 模块中的一个函数,返回自 Unix 纪元(1970 年 1 月 1 日)以来的秒数。由于在返回时间之前已经同步了 GPU 操作,因此返回的时间戳是准确的。return time.time()
# 这段代码实现了一个简单的函数 time_sync ,用于在 PyTorch 环境中同步 GPU 操作并获取当前时间。它通过检查 torch.cuda.is_available() 来确定是否有可用的 GPU 设备,如果有,则调用 torch.cuda.synchronize() 来确保所有 GPU 操作完成。最后,返回当前的时间戳。这种设计确保了在多线程和异步执行的环境中,时间测量的准确性,特别适用于需要精确测量时间的场景,如性能分析和基准测试。
5.def fuse_conv_and_bn(conv, bn):
# 这段代码定义了一个名为 fuse_conv_and_bn 的函数,用于将卷积层( conv )和批量归一化层( bn )融合成一个单一的卷积层,这在模型优化和推理加速中非常常见。
# 定义了一个名为 fuse_conv_and_bn 的函数,它接受两个参数:
# 1.conv :一个 nn.Conv2d 对象,表示卷积层。
# 2.bn :一个 nn.BatchNorm2d 对象,表示批量归一化层。
def fuse_conv_and_bn(conv, bn):# 融合 Conv2d() 和 BatchNorm2d() 层。"""Fuse Conv2d() and BatchNorm2d() layers."""# 创建一个新的 nn.Conv2d 对象 fusedconv ,其参数与输入的 conv 层相同,但设置 bias=True ,因为融合后的卷积层需要偏置项。然后,将该层的梯度计算设置为 False (即不参与反向传播),并将其移动到与 conv 层权重相同的设备上。fusedconv = (nn.Conv2d(conv.in_channels,conv.out_channels,kernel_size=conv.kernel_size,stride=conv.stride,padding=conv.padding,dilation=conv.dilation,groups=conv.groups,bias=True,).requires_grad_(False).to(conv.weight.device))# Prepare filters# 将卷积层的权重 conv.weight 重塑为二维张量,形状为 (conv.out_channels, -1) ,其中 -1 表示自动计算列数。这一步是为了方便后续的矩阵运算。w_conv = conv.weight.view(conv.out_channels, -1)# 计算批量归一化层的权重转换矩阵 w_bn 。具体来说, bn.weight 除以 sqrt(bn.eps + bn.running_var) ,然后取对角矩阵。 bn.eps 是一个小常数,用于数值稳定。w_bn = torch.diag(bn.weight.div(torch.sqrt(bn.eps + bn.running_var)))# 将 w_bn 和 w_conv 进行矩阵乘法,得到融合后的卷积权重,并将其复制到 fusedconv.weight 中。最后,将结果重塑为与 fusedconv.weight 相同的形状。fusedconv.weight.copy_(torch.mm(w_bn, w_conv).view(fusedconv.weight.shape))# Prepare spatial bias# 如果原始卷积层没有偏置项( conv.bias is None ),则创建一个全零张量作为偏置项,形状为 (conv.weight.shape[0],) ,并确保其数据类型和设备与 conv.weight 相同。否则,直接使用原始卷积层的偏置项。b_conv = (torch.zeros(conv.weight.shape[0], dtype=conv.weight.dtype, device=conv.weight.device)if conv.bias is Noneelse conv.bias)# 计算批量归一化层的偏置项转换 b_bn 。具体来说, bn.bias 减去 bn.weight 乘以 bn.running_mean 再除以 sqrt(bn.running_var + bn.eps) 。b_bn = bn.bias - bn.weight.mul(bn.running_mean).div(torch.sqrt(bn.running_var + bn.eps))# 将 w_bn 和 b_conv 进行矩阵乘法,得到融合后的偏置项,并将其与 b_bn 相加,最后复制到 fusedconv.bias 中。fusedconv.bias.copy_(torch.mm(w_bn, b_conv.reshape(-1, 1)).reshape(-1) + b_bn)# 返回融合后的卷积层 fusedconv 。return fusedconv
# 这段代码实现了一个函数 fuse_conv_and_bn ,用于将卷积层和批量归一化层融合成一个单一的卷积层。通过将批量归一化的参数融入卷积层的权重和偏置中,可以减少模型的推理时间,因为融合后的模型在推理时可以跳过批量归一化层的计算。这种优化在实际应用中非常有效,尤其是在需要部署轻量级模型的场景中。
6.def fuse_deconv_and_bn(deconv, bn):
# 这段代码定义了一个名为 fuse_deconv_and_bn 的函数,用于将反卷积层( deconv )和批量归一化层( bn )融合成一个单一的反卷积层。这种融合操作在模型优化和推理加速中非常有用,尤其是在需要减少计算量和内存占用的场景中。
# 定义了一个名为 fuse_deconv_and_bn 的函数,它接受两个参数:
# 1.deconv :一个 nn.ConvTranspose2d 对象,表示反卷积层。
# 2.bn :一个 nn.BatchNorm2d 对象,表示批量归一化层。
def fuse_deconv_and_bn(deconv, bn):# 融合 ConvTranspose2d() 和 BatchNorm2d() 层。"""Fuse ConvTranspose2d() and BatchNorm2d() layers."""# 创建一个新的 nn.ConvTranspose2d 对象 fuseddconv ,其参数与输入的 deconv 层相同,但设置 bias=True ,因为融合后的反卷积层需要偏置项。然后,将该层的梯度计算设置为 False (即不参与反向传播),并将其移动到与 deconv.weight 相同的设备上。fuseddconv = (nn.ConvTranspose2d(deconv.in_channels,deconv.out_channels,kernel_size=deconv.kernel_size,stride=deconv.stride,padding=deconv.padding,output_padding=deconv.output_padding,dilation=deconv.dilation,groups=deconv.groups,bias=True,).requires_grad_(False).to(deconv.weight.device))# Prepare filters# 将反卷积层的权重 deconv.weight 重塑为二维张量,形状为 (deconv.out_channels, -1) ,其中 -1 表示自动计算列数。这一步是为了方便后续的矩阵运算。w_deconv = deconv.weight.view(deconv.out_channels, -1)# 计算批量归一化层的权重转换矩阵 w_bn 。具体来说, bn.weight 除以 sqrt(bn.eps + bn.running_var) ,然后取对角矩阵。 bn.eps 是一个小常数,用于数值稳定。w_bn = torch.diag(bn.weight.div(torch.sqrt(bn.eps + bn.running_var)))# 将 w_bn 和 w_deconv 进行矩阵乘法,得到融合后的反卷积权重,并将其复制到 fuseddconv.weight 中。最后,将结果重塑为与 fuseddconv.weight 相同的形状。fuseddconv.weight.copy_(torch.mm(w_bn, w_deconv).view(fuseddconv.weight.shape))# Prepare spatial bias# 如果原始反卷积层没有偏置项( deconv.bias is None ),则创建一个全零张量作为偏置项,形状为 (deconv.weight.shape[1],) ,并确保其设备与 deconv.weight 相同。否则,直接使用原始反卷积层的偏置项。b_conv = torch.zeros(deconv.weight.shape[1], device=deconv.weight.device) if deconv.bias is None else deconv.bias# 计算批量归一化层的偏置项转换 b_bn 。具体来说, bn.bias 减去 bn.weight 乘以 bn.running_mean 再除以 sqrt(bn.running_var + bn.eps) 。b_bn = bn.bias - bn.weight.mul(bn.running_mean).div(torch.sqrt(bn.running_var + bn.eps))# 将 w_bn 和 b_conv 进行矩阵乘法,得到融合后的偏置项,并将其与 b_bn 相加,最后复制到 fuseddconv.bias 中。fuseddconv.bias.copy_(torch.mm(w_bn, b_conv.reshape(-1, 1)).reshape(-1) + b_bn)# 返回融合后的反卷积层 fuseddconv 。return fuseddconv
# 这段代码实现了一个函数 fuse_deconv_and_bn ,用于将反卷积层和批量归一化层融合成一个单一的反卷积层。通过将批量归一化的参数融入反卷积层的权重和偏置中,可以减少模型的推理时间,因为融合后的模型在推理时可以跳过批量归一化层的计算。这种优化在实际应用中非常有效,尤其是在需要部署轻量级模型的场景中。
7.def get_num_params(model):
# 这段代码定义了一个名为 get_num_params 的函数,用于计算 PyTorch 模型中所有参数的总数。
# 定义了一个名为 get_num_params 的函数,它接受一个参数:
# 1.model :一个 PyTorch 模型,通常是 torch.nn.Module 的实例。
def get_num_params(model):# 返回 YOLO 模型中的参数总数。"""Return the total number of parameters in a YOLO model."""# 这行代码是函数的核心部分,它使用了 Python 的生成器表达式和 sum 函数来计算模型中所有参数的总数。# model.parameters() :返回模型中所有参数的迭代器,这些参数通常是张量( torch.Tensor )。# x.numel() :对于每个参数张量 x , x.numel() 返回该张量中元素的总数。# sum(x.numel() for x in model.parameters()) :对所有参数张量的元素总数求和,得到模型中所有参数的总数。return sum(x.numel() for x in model.parameters())
# 这段代码实现了一个简单的函数 get_num_params ,用于计算 PyTorch 模型中所有参数的总数。通过遍历模型的所有参数张量并累加它们的元素总数,可以快速得到模型的参数总数。这种统计在模型分析、优化和调试中非常有用,例如在比较不同模型的复杂度、选择合适的模型大小以及进行资源分配时。
8.def get_num_gradients(model):
# 这段代码定义了一个名为 get_num_gradients 的函数,用于计算 PyTorch 模型中可训练参数(即需要计算梯度的参数)的总数。
# 定义了一个名为 get_num_gradients 的函数,它接受一个参数:
# 1.model :一个 PyTorch 模型,通常是 torch.nn.Module 的实例。
def get_num_gradients(model):# 返回 YOLO 模型中具有梯度的参数总数。"""Return the total number of parameters with gradients in a YOLO model."""# 这行代码是函数的核心部分,它使用了 Python 的生成器表达式和 sum 函数来计算模型中所有可训练参数的总数。# model.parameters() :返回模型中所有参数的迭代器,这些参数通常是张量( torch.Tensor )。# x.requires_grad :检查每个参数张量 x 是否需要计算梯度。如果 x.requires_grad 为 True ,则该参数是可训练的。# x.numel() :对于每个需要计算梯度的参数张量 x , x.numel() 返回该张量中元素的总数。# sum(x.numel() for x in model.parameters() if x.requires_grad) :对所有需要计算梯度的参数张量的元素总数求和,得到模型中所有可训练参数的总数。return sum(x.numel() for x in model.parameters() if x.requires_grad)
# 这段代码实现了一个简单的函数 get_num_gradients ,用于计算 PyTorch 模型中可训练参数的总数。通过遍历模型的所有参数张量,检查每个参数是否需要计算梯度( requires_grad ),并累加这些参数的元素总数,可以快速得到模型中可训练参数的总数。这种统计在模型分析、优化和调试中非常有用,例如在比较不同模型的复杂度、选择合适的模型大小以及进行资源分配时。特别是,在训练过程中,了解可训练参数的数量有助于调整学习率、优化器配置等。
9.def get_flops(model, imgsz=640):
# 这段代码定义了一个名为 get_flops 的函数,用于计算 PyTorch 模型的浮点运算次数(FLOPs),单位为 GFLOPs(十亿浮点运算次数)。它通过使用 thop 库来实现这一功能,并提供了两种方法来计算 FLOPs,以适应不同的模型需求。
# 定义了一个名为 get_flops 的函数,它接受两个参数:
# 1.model :一个 PyTorch 模型,通常是 torch.nn.Module 的实例。
# 2.imgsz :输入图像的大小,默认值为 640。如果输入是整数或浮点数,将被扩展为一个列表 [imgsz, imgsz] 。
def get_flops(model, imgsz=640):# 计算模型的 FLOP(浮点运算次数),单位为十亿。# 尝试两种计算方法:首先使用基于步长的张量以提高效率,然后根据需要(例如,对于 RTDETR 模型)回退到完整图像大小。如果 thop 库不可用或计算失败,则返回 0.0。"""Calculate FLOPs (floating point operations) for a model in billions.Attempts two calculation methods: first with a stride-based tensor for efficiency,then falls back to full image size if needed (e.g., for RTDETR models). Returns 0.0if thop library is unavailable or calculation fails.Args:model (nn.Module): The model to calculate FLOPs for.imgsz (int | list, optional): Input image size.Returns:(float): The model FLOPs in billions."""# 尝试导入 thop 库。如果导入失败(即 thop 未安装),则将 thop 设置为 None 。这允许函数在没有安装 thop 的情况下运行,但会返回 0.0 GFLOPs。try:import thopexcept ImportError:thop = None # conda support without 'ultralytics-thop' installed# 如果 thop 未安装,则直接返回 0.0 GFLOPs。if not thop:return 0.0 # if not installed return 0.0 GFLOPs# 尝试对模型进行去并行化处理。 de_parallel 是一个函数,用于将模型从并行状态转换为单 GPU 或 CPU 状态,确保 FLOPs 计算的准确性。try:model = de_parallel(model)# 获取模型的第一个参数张量 p ,用于确定输入图像的通道数。p = next(model.parameters())# 如果 imgsz 是整数或浮点数,将其扩展为一个列表 [imgsz, imgsz] ,表示图像的宽度和高度。if not isinstance(imgsz, list):imgsz = [imgsz, imgsz] # expand if int/float# 尝试获取模型的最大步长 stride 。如果模型有 stride 属性,则取其最大值并与 32 取较大值;否则,默认步长为 32。try:# Method 1: Use stride-based input tensorstride = max(int(model.stride.max()), 32) if hasattr(model, "stride") else 32 # max stride# 创建一个空的输入张量 im ,其形状为 (1, p.shape[1], stride, stride) ,表示批量大小为 1,通道数为 p.shape[1] ,高度和宽度均为 stride 。该张量的设备与模型的第一个参数张量 p 相同。im = torch.empty((1, p.shape[1], stride, stride), device=p.device) # input image in BCHW format# 使用 thop.profile 函数计算模型在输入张量 im 上的 FLOPs。 deepcopy(model) 确保在计算过程中不会修改原始模型。 inputs=[im] 指定输入张量。 verbose=False 表示不输出详细信息。计算结果除以 1e9 转换为 GFLOPs,并乘以 2(因为 FLOPs 通常包括加法和乘法操作)。flops = thop.profile(deepcopy(model), inputs=[im], verbose=False)[0] / 1e9 * 2 # stride GFLOPs# 将计算得到的 FLOPs 按比例调整到实际图像大小 imgsz 。具体来说,将 flops 乘以 imgsz[0] / stride 和 imgsz[1] / stride ,得到对应于实际图像大小的 FLOPs。return flops * imgsz[0] / stride * imgsz[1] / stride # imgsz GFLOPs# 如果第一种方法失败(例如,模型没有 stride 属性或 thop.profile 抛出异常),则尝试第二种方法。# 创建一个空的输入张量 im ,其形状为 (1, p.shape[1], *imgsz) ,表示批量大小为 1,通道数为 p.shape[1] ,高度和宽度分别为 imgsz[0] 和 imgsz[1] 。except Exception:# Method 2: Use actual image size (required for RTDETR models)im = torch.empty((1, p.shape[1], *imgsz), device=p.device) # input image in BCHW format# 使用 thop.profile 函数计算模型在输入张量 im 上的 FLOPs。计算结果除以 1e9 转换为 GFLOPs,并乘以 2。return thop.profile(deepcopy(model), inputs=[im], verbose=False)[0] / 1e9 * 2 # imgsz GFLOPs# 如果在计算过程中发生任何异常,返回 0.0 GFLOPs。except Exception:return 0.0
# 这段代码实现了一个函数 get_flops ,用于计算 PyTorch 模型的浮点运算次数(FLOPs),单位为 GFLOPs。它通过使用 thop 库来实现这一功能,并提供了两种方法来计算 FLOPs: 使用基于步长的输入张量(适用于大多数模型)。 使用实际图像大小的输入张量(适用于某些特殊模型,如 RTDETR)。这种设计确保了函数的灵活性和鲁棒性,能够在不同情况下准确计算模型的 FLOPs。如果 thop 未安装或计算过程中发生异常,函数会返回 0.0 GFLOPs,避免了程序崩溃。
10.def get_flops_with_torch_profiler(model, imgsz=640):
# 这段代码定义了一个名为 get_flops_with_torch_profiler 的函数,用于计算 PyTorch 模型的浮点运算次数(FLOPs),单位为 GFLOPs(十亿浮点运算次数)。它通过使用 PyTorch 2.0 及以上版本中的 torch.profiler 来实现这一功能,并提供了两种方法来计算 FLOPs,以适应不同的模型需求。
# 定义了一个名为 get_flops_with_torch_profiler 的函数,它接受两个参数:
# 1.model :一个 PyTorch 模型,通常是 torch.nn.Module 的实例。
# 2.imgsz :输入图像的大小,默认值为 640。如果输入是整数或浮点数,将被扩展为一个列表 [imgsz, imgsz] 。
def get_flops_with_torch_profiler(model, imgsz=640):# 使用 torch profiler(thop 包的替代包,但速度慢 2-10 倍)计算模型 FLOP。"""Compute model FLOPs using torch profiler (alternative to thop package, but 2-10x slower).Args:model (nn.Module): The model to calculate FLOPs for.imgsz (int | list, optional): Input image size.Returns:(float): The model's FLOPs in billions."""# 检查当前的 PyTorch 版本是否为 2.0 或更高版本。如果版本低于 2.0, torch.profiler 不可用,因此直接返回 0.0 GFLOPs。if not TORCH_2_0: # torch profiler implemented in torch>=2.0return 0.0# 对模型进行去并行化处理。 de_parallel 是一个函数,用于将模型从并行状态转换为单 GPU 或 CPU 状态,确保 FLOPs 计算的准确性。model = de_parallel(model)# 获取模型的第一个参数张量 p ,用于确定输入图像的通道数。p = next(model.parameters())# 如果 imgsz 是整数或浮点数,将其扩展为一个列表 [imgsz, imgsz] ,表示图像的宽度和高度。if not isinstance(imgsz, list):imgsz = [imgsz, imgsz] # expand if int/float# 尝试获取模型的最大步长 stride 。如果模型有 stride 属性,则取其最大值并与 32 取较大值,然后乘以 2;否则,默认步长为 32。try:# Use stride size for input tensorstride = (max(int(model.stride.max()), 32) if hasattr(model, "stride") else 32) * 2 # max stride# 创建一个空的输入张量 im ,其形状为 (1, p.shape[1], stride, stride) ,表示批量大小为 1,通道数为 p.shape[1] ,高度和宽度均为 stride 。该张量的设备与模型的第一个参数张量 p 相同。im = torch.empty((1, p.shape[1], stride, stride), device=p.device) # input image in BCHW format# 使用 torch.profiler.profile 上下文管理器,启用 FLOPs 计算( with_flops=True ),对模型进行一次前向传播。 prof 是一个 torch.profiler.Profiler 对象,用于记录和分析模型的性能。with torch.profiler.profile(with_flops=True) as prof:model(im)# 从 prof.key_averages() 获取所有关键事件的平均值,并计算每个事件的 FLOPs 总和。将结果除以 1e9 转换为 GFLOPs。flops = sum(x.flops for x in prof.key_averages()) / 1e9# 将计算得到的 FLOPs 按比例调整到实际图像大小 imgsz 。具体来说,将 flops 乘以 imgsz[0] / stride 和 imgsz[1] / stride ,得到对应于实际图像大小的 FLOPs。flops = flops * imgsz[0] / stride * imgsz[1] / stride # 640x640 GFLOPs# 如果第一种方法失败(例如,模型没有 stride 属性或 torch.profiler.profile 抛出异常),则尝试第二种方法。# 创建一个空的输入张量 im ,其形状为 (1, p.shape[1], *imgsz) ,表示批量大小为 1,通道数为 p.shape[1] ,高度和宽度分别为 imgsz[0] 和 imgsz[1] 。except Exception:# Use actual image size for input tensor (i.e. required for RTDETR models)im = torch.empty((1, p.shape[1], *imgsz), device=p.device) # input image in BCHW format# 使用 torch.profiler.profile 上下文管理器,启用 FLOPs 计算( with_flops=True ),对模型进行一次前向传播。 prof 是一个 torch.profiler.Profiler 对象,用于记录和分析模型的性能。with torch.profiler.profile(with_flops=True) as prof:model(im)# 从 prof.key_averages() 获取所有关键事件的平均值,并计算每个事件的 FLOPs 总和。将结果除以 1e9 转换为 GFLOPs。flops = sum(x.flops for x in prof.key_averages()) / 1e9# 返回计算得到的 FLOPs。return flops
# 这段代码实现了一个函数 get_flops_with_torch_profiler ,用于计算 PyTorch 模型的浮点运算次数(FLOPs),单位为 GFLOPs。它通过使用 PyTorch 2.0 及以上版本中的 torch.profiler 来实现这一功能,并提供了两种方法来计算 FLOPs: 使用基于步长的输入张量(适用于大多数模型)。 使用实际图像大小的输入张量(适用于某些特殊模型,如 RTDETR)。这种设计确保了函数的灵活性和鲁棒性,能够在不同情况下准确计算模型的 FLOPs。如果 PyTorch 版本低于 2.0 或计算过程中发生异常,函数会返回 0.0 GFLOPs,避免了程序崩溃。
11.def initialize_weights(model):
# 这段代码定义了一个名为 initialize_weights 的函数,用于初始化 PyTorch 模型中的权重和一些层的属性。它遍历模型的所有模块,并根据模块的类型进行不同的初始化操作。
# 定义了一个名为 initialize_weights 的函数,它接受一个参数:
# 1.model :一个 PyTorch 模型,通常是 torch.nn.Module 的实例。
def initialize_weights(model):# 将模型权重初始化为随机值。"""Initialize model weights to random values."""# 遍历模型中的所有模块。 model.modules() 返回一个生成器,包含模型中的所有子模块(包括嵌套的子模块)。for m in model.modules():# 获取当前模块 m 的类型,并将其存储在变量 t 中。这用于后续的类型检查。t = type(m)# 如果当前模块是 nn.Conv2d (二维卷积层),则执行 pass 操作。注释中提到可以使用 nn.init.kaiming_normal_ 初始化卷积层的权重,但实际代码中没有执行这一操作。 nn.init.kaiming_normal_ 是一种常用的权重初始化方法,适用于 ReLU 激活函数。if t is nn.Conv2d:pass # nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')# 如果当前模块是 nn.BatchNorm2d (二维批量归一化层),则设置其 eps 和 momentum 属性:# m.eps :设置为 1e-3 ,这是一个小常数,用于数值稳定。# m.momentum :设置为 0.03 ,这是批量归一化层的动量参数,用于控制移动平均的更新速度。elif t is nn.BatchNorm2d:m.eps = 1e-3m.momentum = 0.03# 如果当前模块是以下激活函数之一: nn.Hardswish 、 nn.LeakyReLU 、 nn.ReLU 、 nn.ReLU6 或 nn.SiLU ,则将 inplace 属性设置为 True 。 inplace=True 表示这些激活函数将直接在输入张量上进行操作,而不是创建新的张量,从而节省内存。elif t in {nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU}:m.inplace = True
# 这段代码实现了一个函数 initialize_weights ,用于初始化 PyTorch 模型中的权重和一些层的属性。它通过遍历模型的所有模块,并根据模块的类型进行不同的初始化操作: 对于 nn.Conv2d 层,代码中提供了权重初始化的注释,但实际没有执行。 对于 nn.BatchNorm2d 层,设置了 eps 和 momentum 属性。 对于常见的激活函数(如 nn.Hardswish 、 nn.LeakyReLU 、 nn.ReLU 、 nn.ReLU6 和 nn.SiLU ),将 inplace 属性设置为 True ,以节省内存。这种初始化方法在模型训练前非常有用,可以确保模型的权重和属性被合理设置,从而提高训练的稳定性和效率。
12.def scale_img(img, ratio=1.0, same_shape=False, gs=32):
# 这段代码定义了一个名为 scale_img 的函数,用于对输入图像进行缩放,并可选地调整图像大小以匹配指定的网格大小( gs )。它还提供了填充或裁剪图像的功能,以确保图像的尺寸符合特定要求。
# 定义了一个名为 scale_img 的函数,它接受以下参数:
# 1.img :输入图像,通常是一个四维张量,形状为 (batch_size, channels, height, width) 。
# 2.ratio :缩放比例,默认值为 1.0,表示不进行缩放。
# 3.same_shape :布尔值,表示是否保持图像的原始宽高比。如果为 False ,则会调整图像大小以匹配网格大小 gs 。
# 4.gs :网格大小,默认值为 32,表示图像的宽度和高度应为 32 的倍数。
def scale_img(img, ratio=1.0, same_shape=False, gs=32):# 缩放并填充图像张量,可选择保持宽高比并填充到 gs 倍数。"""Scale and pad an image tensor, optionally maintaining aspect ratio and padding to gs multiple.Args:img (torch.Tensor): Input image tensor.ratio (float, optional): Scaling ratio.same_shape (bool, optional): Whether to maintain the same shape.gs (int, optional): Grid size for padding.Returns:(torch.Tensor): Scaled and padded image tensor."""# 如果缩放比例 ratio 为 1.0,表示不需要缩放,直接返回原始图像。if ratio == 1.0:return img# 获取输入图像的高度 h 和宽度 w 。 img.shape[2:] 返回图像的高和宽,假设输入图像是一个四维张量。h, w = img.shape[2:]# 计算新的图像尺寸 s ,即原始尺寸乘以缩放比例 ratio ,并取整。s = (int(h * ratio), int(w * ratio)) # new size# 使用 PyTorch 的 F.interpolate 函数对图像进行双线性插值缩放。 size=s 指定新的图像尺寸, mode="bilinear" 表示使用双线性插值, align_corners=False 是一个常用的参数设置,用于避免插值时的坐标对齐问题。img = F.interpolate(img, size=s, mode="bilinear", align_corners=False) # resize# 如果 same_shape 为 False ,则调整图像的宽度和高度,使其为网格大小 gs 的倍数。具体来说,将原始尺寸乘以缩放比例后,除以 gs ,向上取整,再乘以 gs ,以确保尺寸是 gs 的倍数。if not same_shape: # pad/crop imgh, w = (math.ceil(x * ratio / gs) * gs for x in (h, w))# 使用 PyTorch 的 F.pad 函数对图像进行填充。 [0, w - s[1], 0, h - s[0]] 指定了填充的边界,分别对应左、右、上、下边界。 value=0.447 是填充的值,这里使用了 ImageNet 数据集的均值之一(0.447)作为填充值。如果图像尺寸已经符合要求,则填充的边界为 0,图像不会被修改。return F.pad(img, [0, w - s[1], 0, h - s[0]], value=0.447) # value = imagenet mean
# 这段代码实现了一个函数 scale_img ,用于对输入图像进行缩放,并可选地调整图像大小以匹配指定的网格大小。它通过以下步骤实现: 如果缩放比例为 1.0,直接返回原始图像。 计算新的图像尺寸,并使用双线性插值对图像进行缩放。 如果需要,调整图像的宽度和高度,使其为网格大小的倍数。 对图像进行填充,以确保其尺寸符合要求。这种操作在计算机视觉任务中非常常见,特别是在处理需要固定尺寸输入的模型时。通过填充或裁剪图像,可以确保图像的尺寸符合模型的要求,同时尽量减少对图像内容的影响。
13.def copy_attr(a, b, include=(), exclude=()):
# 这段代码定义了一个名为 copy_attr 的函数,用于将一个对象( b )的属性复制到另一个对象( a )中。它提供了灵活的选项,允许用户指定哪些属性需要包含或排除。
# 定义了一个名为 copy_attr 的函数,它接受四个参数:
# 1.a :目标对象,属性将被复制到这个对象中。
# 2.b :源对象,属性将从这个对象中复制。
# 3.include :一个元组,包含需要复制的属性名称。默认为空元组,表示没有特定的属性需要包含。
# 4.exclude :一个元组,包含需要排除的属性名称。默认为空元组,表示没有特定的属性需要排除。
def copy_attr(a, b, include=(), exclude=()):# 将属性从对象“b”复制到对象“a”,并提供包含/排除某些属性的选项。"""Copy attributes from object 'b' to object 'a', with options to include/exclude certain attributes.Args:a (Any): Destination object to copy attributes to.b (Any): Source object to copy attributes from.include (tuple, optional): Attributes to include. If empty, all attributes are included.exclude (tuple, optional): Attributes to exclude."""# 遍历源对象 b 的所有属性。 b.__dict__ 是一个字典,包含对象 b 的所有属性及其值。 items() 方法返回一个包含键值对的迭代器。for k, v in b.__dict__.items():# 检查当前属性 k 是否满足以下条件之一:# 如果 include 不为空且 k 不在 include 中。# 如果 k 以 _ 开头(通常表示私有属性)。# 如果 k 在 exclude 中。# 如果满足上述任意一个条件,则跳过当前属性,继续处理下一个属性。if (len(include) and k not in include) or k.startswith("_") or k in exclude:# 如果当前属性满足上述条件之一,则跳过当前循环迭代,继续处理下一个属性。continue# 如果当前属性 k 不满足上述条件,则使用 setattr 函数将属性 k 及其值 v 设置到目标对象 a 中。else:setattr(a, k, v)
# 这段代码实现了一个函数 copy_attr ,用于将一个对象的属性复制到另一个对象中。它提供了灵活的选项,允许用户指定哪些属性需要包含或排除。具体功能如下: 包含属性:通过 include 参数,用户可以指定需要复制的属性名称。如果 include 为空,则默认复制所有属性(除了私有属性和排除的属性)。 排除属性:通过 exclude 参数,用户可以指定需要排除的属性名称。 私有属性:默认情况下,以 _ 开头的私有属性不会被复制。这种设计使得函数非常灵活,适用于各种场景,特别是在需要部分复制对象属性或避免复制某些敏感属性时。
14.def intersect_dicts(da, db, exclude=()):
# 这段代码定义了一个名为 intersect_dicts 的函数,用于计算两个字典的交集,并排除指定的键。它返回一个新字典,其中只包含两个输入字典中都存在的键,并且这些键对应的值具有相同的形状(假设值是张量)。
# 定义了一个名为 intersect_dicts 的函数,它接受三个参数:
# 1.da :第一个字典。
# 2.db :第二个字典。
# 3.exclude :一个元组,包含需要排除的键。默认为空元组,表示没有特定的键需要排除。
def intersect_dicts(da, db, exclude=()):# 返回一个包含形状匹配的相交键的字典,排除“exclude”键,并使用 da 值。# 参数:# da (dict):第一个字典。# db (dict):第二个字典。# exclude (tuple,可选):要排除的键。# 返回:# (dict):包含形状匹配的相交键的字典。"""Return a dictionary of intersecting keys with matching shapes, excluding 'exclude' keys, using da values.Args:da (dict): First dictionary.db (dict): Second dictionary.exclude (tuple, optional): Keys to exclude.Returns:(dict): Dictionary of intersecting keys with matching shapes."""# 这行代码是一个字典推导式,用于构建返回的新字典。它遍历 da 中的每个键值对 (k, v) ,并根据以下条件筛选键:# k in db :键 k 必须同时存在于字典 db 中。# all(x not in k for x in exclude) :键 k 不应包含 exclude 中的任何子字符串。# v.shape == db[k].shape :键 k 对应的值 v 的形状必须与 db[k] 的形状相同。这里假设值是张量,因此使用了 .shape 属性。# 如果键 k 满足上述所有条件,则将其键值对 (k, v) 添加到新字典中。return {k: v for k, v in da.items() if k in db and all(x not in k for x in exclude) and v.shape == db[k].shape}
# 这段代码实现了一个函数 intersect_dicts ,用于计算两个字典的交集,并排除指定的键。它返回一个新字典,其中只包含两个输入字典中都存在的键,并且这些键对应的值具有相同的形状。这种设计在处理模型权重或配置字典时非常有用,特别是在需要合并或比较两个字典时。通过排除特定的键,可以避免不必要的冲突或错误。
15.def one_cycle(y1=0.0, y2=1.0, steps=100):
# 这段代码定义了一个名为 one_cycle 的函数,用于生成一个单周期(one-cycle)调度器的值。这种调度器通常用于调整学习率或其他超参数,使其在一个训练周期内按照特定的曲线变化。
# 定义了一个名为 one_cycle 的函数,它接受三个参数:
# 1.y1 :周期的最小值,默认为 0.0。
# 2.y2 :周期的最大值,默认为 1.0。
# 3.steps :周期的总步数,默认为 100。
def one_cycle(y1=0.0, y2=1.0, steps=100):# 返回一个用于计算从 y1 到 y2 的正弦曲线的 lambda 函数 https://arxiv.org/pdf/1812.01187.pdf。# 参数:# y1(浮点型,可选):初始值。# y2(浮点型,可选):最终值。# steps(整数,可选):步数。# 返回:# (函数):用于计算正弦曲线的 lambda 函数。"""Return a lambda function for sinusoidal ramp from y1 to y2 https://arxiv.org/pdf/1812.01187.pdf.Args:y1 (float, optional): Initial value.y2 (float, optional): Final value.steps (int, optional): Number of steps.Returns:(function): Lambda function for computing the sinusoidal ramp."""# 这行代码返回一个匿名函数( lambda 函数),该函数接受一个参数 x ,表示当前的步数。函数的逻辑如下:# x * math.pi / steps :将当前步数 x 映射到 [0, π] 区间内。# math.cos(x * math.pi / steps) :计算余弦值,其范围在 [-1, 1] 之间。# (1 - math.cos(x * math.pi / steps)) / 2 :将余弦值映射到 [0, 1] 区间内,形成一个从 0 到 1 的曲线。# max((1 - math.cos(x * math.pi / steps)) / 2, 0) :确保值不会小于 0。# max((1 - math.cos(x * math.pi / steps)) / 2, 0) * (y2 - y1) + y1 :将曲线缩放到 [y1, y2] 区间内。return lambda x: max((1 - math.cos(x * math.pi / steps)) / 2, 0) * (y2 - y1) + y1
# 这段代码实现了一个函数 one_cycle ,用于生成一个单周期调度器的值。这种调度器通常用于调整学习率或其他超参数,使其在一个训练周期内按照特定的曲线变化。具体来说,它返回一个匿名函数,该函数根据当前步数 x 计算出一个值,该值在 [y1, y2] 区间内变化,并且在 steps 步内完成一个完整的周期。这种单周期调度器在训练深度学习模型时非常有用,特别是在需要动态调整学习率以提高训练效率和模型性能时。通过调整 y1 、 y2 和 steps ,可以灵活地控制调度器的行为。
16.class ModelEMA:
# 这段代码定义了一个名为 ModelEMA 的类,用于实现模型的指数移动平均(Exponential Moving Average, EMA)。EMA 是一种常用的模型优化技术,用于在训练过程中维护一个模型参数的平滑版本,以提高模型的泛化能力和稳定性。
# 定义了一个名为 ModelEMA 的类,用于实现模型的指数移动平均(EMA)。
class ModelEMA:# 更新了指数移动平均线 (EMA) 的实现。# 保持模型 state_dict 中所有内容(参数和缓冲区)的移动平均值。有关 EMA 的详细信息,请参阅参考资料。# 要禁用 EMA,请将 `enabled` 属性设置为 `False`。# 参考资料:# - https://github.com/rwightman/pytorch-image-models# - https://www.tensorflow.org/api_docs/python/tf/train/ExponentialMovingAverage"""Updated Exponential Moving Average (EMA) implementation.Keeps a moving average of everything in the model state_dict (parameters and buffers).For EMA details see References.To disable EMA set the `enabled` attribute to `False`.Attributes:ema (nn.Module): Copy of the model in evaluation mode.updates (int): Number of EMA updates.decay (function): Decay function that determines the EMA weight.enabled (bool): Whether EMA is enabled.References:- https://github.com/rwightman/pytorch-image-models- https://www.tensorflow.org/api_docs/python/tf/train/ExponentialMovingAverage"""# 定义了 ModelEMA 类的初始化方法,它接受以下参数:# 1.model :需要进行 EMA 的模型,通常是 torch.nn.Module 的实例。# 2.decay :EMA 的衰减率,默认值为 0.9999。# 3.tau :用于调整衰减率的参数,控制衰减率的变化速度,默认值为 2000。# 4.updates :EMA 更新的次数,默认值为 0。def __init__(self, model, decay=0.9999, tau=2000, updates=0):# 使用给定参数初始化“模型”的 EMA。"""Initialize EMA for 'model' with given arguments.Args:model (nn.Module): Model to create EMA for.decay (float, optional): Maximum EMA decay rate.tau (int, optional): EMA decay time constant.updates (int, optional): Initial number of updates."""# 创建一个模型的深度拷贝 self.ema ,并将其设置为评估模式( eval() )。 de_parallel 是一个函数,用于将模型从并行状态转换为单 GPU 或 CPU 状态,确保 EMA 模型的参数与原始模型一致。self.ema = deepcopy(de_parallel(model)).eval() # FP32 EMA# 初始化 self.updates ,记录 EMA 更新的次数。self.updates = updates # number of EMA updates# 定义一个衰减函数 self.decay ,该函数根据当前的更新次数 x 计算衰减率。这个衰减率在训练初期会逐渐增加,有助于模型在早期阶段更快地适应数据。self.decay = lambda x: decay * (1 - math.exp(-x / tau)) # decay exponential ramp (to help early epochs)# 将 EMA 模型的所有参数的 requires_grad 属性设置为 False ,表示这些参数不需要计算梯度,从而节省内存和计算资源。for p in self.ema.parameters():p.requires_grad_(False)# 初始化 self.enabled ,表示 EMA 是否启用。默认值为 True ,表示 EMA 是启用的。self.enabled = True# 定义了一个名为 update 的方法,用于更新 EMA 模型的参数。它接受一个参数:# 1.model :当前训练的模型,通常是 torch.nn.Module 的实例。def update(self, model):# 更新 EMA 参数。"""Update EMA parameters.Args:model (nn.Module): Model to update EMA from."""# 如果 EMA 是启用的,则继续执行更新操作。if self.enabled:# 增加 EMA 更新的次数。self.updates += 1# 根据当前的更新次数 self.updates ,计算当前的衰减率 d 。d = self.decay(self.updates)# 获取当前模型的状态字典 msd , de_parallel 确保模型从并行状态转换为单 GPU 或 CPU 状态。msd = de_parallel(model).state_dict() # model state_dict# 遍历 EMA 模型的状态字典中的每个键值对 (k, v) 。for k, v in self.ema.state_dict().items():# 如果参数 v 的数据类型是浮点数(FP16 或 FP32),则进行更新操作。if v.dtype.is_floating_point: # true for FP16 and FP32# 根据 EMA 的更新公式,更新参数 v :# v *= d :将当前 EMA 参数乘以衰减率 d 。# v += (1 - d) * msd[k].detach() :加上当前模型参数的加权值, msd[k].detach() 确保不会计算梯度。v *= dv += (1 - d) * msd[k].detach()# assert v.dtype == msd[k].dtype == torch.float32, f'{k}: EMA {v.dtype}, model {msd[k].dtype}'# 定义了一个名为 update_attr 的方法,用于更新 EMA 模型的属性。它接受以下参数:# 1.model :当前训练的模型,通常是 torch.nn.Module 的实例。# 2.include :一个元组,包含需要更新的属性名称,默认为空元组。# 3.exclude :一个元组,包含需要排除的属性名称,默认为 ("process_group", "reducer") 。def update_attr(self, model, include=(), exclude=("process_group", "reducer")):# 更新属性并保存已移除优化器的精简模型。"""Update attributes and save stripped model with optimizer removed.Args:model (nn.Module): Model to update attributes from.include (tuple, optional): Attributes to include.exclude (tuple, optional): Attributes to exclude."""# 如果 EMA 是启用的,则继续执行属性更新操作。if self.enabled:# 调用 copy_attr 函数,将当前模型的属性复制到 EMA 模型中。 copy_attr 函数会根据 include 和 exclude 参数选择性地复制属性。copy_attr(self.ema, model, include, exclude)
# 这段代码实现了一个 ModelEMA 类,用于维护模型参数的指数移动平均(EMA)。EMA 是一种常用的模型优化技术,可以提高模型的泛化能力和稳定性。 ModelEMA 类提供了以下功能: 初始化:创建一个模型的深度拷贝作为 EMA 模型,并将其设置为评估模式。 更新:根据当前模型的参数和 EMA 的衰减率,更新 EMA 模型的参数。 属性更新:选择性地更新 EMA 模型的属性,以确保 EMA 模型与当前模型保持一致。这种设计使得 ModelEMA 类非常灵活,适用于各种深度学习任务,特别是在需要动态调整模型参数时。
17.def convert_optimizer_state_dict_to_fp16(state_dict):
# 这段代码定义了一个名为 convert_optimizer_state_dict_to_fp16 的函数,用于将优化器的状态字典中的浮点数张量从 32 位浮点数( torch.float32 )转换为 16 位浮点数( torch.float16 ),以节省内存并提高计算效率。
# 定义了一个名为 convert_optimizer_state_dict_to_fp16 的函数,它接受一个参数:
# 1.state_dict :优化器的状态字典,通常是一个包含优化器状态的嵌套字典。
def convert_optimizer_state_dict_to_fp16(state_dict):# 将给定优化器的 state_dict 转换为 FP16,并重点关注张量转换的“state”键。"""Convert the state_dict of a given optimizer to FP16, focusing on the 'state' key for tensor conversions.Args:state_dict (dict): Optimizer state dictionary.Returns:(dict): Converted optimizer state dictionary with FP16 tensors."""# 遍历 state_dict 中的 "state" 键对应的值。 state_dict["state"] 是一个字典,其键是参数的名称,值是另一个字典,包含该参数的优化器状态。for state in state_dict["state"].values():# 遍历每个参数状态字典中的键值对 (k, v) 。for k, v in state.items():# 检查当前的键值对 (k, v) 是否满足以下条件:# 键 k 不等于 "step" ,因为 "step" 通常是一个整数,表示优化器的步数,不需要转换。# 值 v 是一个 torch.Tensor 对象。# 值 v 的数据类型是 torch.float32 。if k != "step" and isinstance(v, torch.Tensor) and v.dtype is torch.float32:# 如果当前的键值对满足上述条件,则将值 v 转换为 torch.float16 类型,并更新状态字典中的对应值。 v.half() 是 PyTorch 提供的一个方法,用于将张量从 torch.float32 转换为 torch.float16 。state[k] = v.half()# 返回修改后的状态字典 state_dict 。return state_dict
# 这段代码实现了一个函数 convert_optimizer_state_dict_to_fp16 ,用于将优化器的状态字典中的浮点数张量从 32 位浮点数( torch.float32 )转换为 16 位浮点数( torch.float16 )。这种转换可以显著节省内存,特别是在处理大型模型时。此外,使用 16 位浮点数还可以提高计算效率,尤其是在支持半精度计算的硬件上。需要注意的是,这种转换可能会引入一些数值精度的损失,但在许多实际应用中,这种损失是可以接受的。
18.def profile_ops(input, ops, n=10, device=None, max_num_obj=0):
# 这段代码定义了一个名为 profile_ops 的函数,用于对给定的输入和操作进行性能分析,包括计算 FLOPs、GPU 内存使用量、前向传播和反向传播的时间等。它还提供了日志记录功能,以便用户可以直观地查看每个操作的性能指标。
# 定义了一个名为 profile_ops 的函数,它接受以下参数:
# 1.input :输入数据,可以是一个张量或张量列表。
# 2.ops :需要分析的操作,可以是一个模块或模块列表。
# 3.n :重复测试的次数,默认为 10。
# 4.device :设备类型,默认为 None ,表示自动选择设备。
# 5.max_num_obj :用于模拟训练时每张图像的预测数量,默认为 0。
def profile_ops(input, ops, n=10, device=None, max_num_obj=0):# Ultralytics 速度、内存和 FLOP 分析器。"""Ultralytics speed, memory and FLOPs profiler.Args:input (torch.Tensor | list): Input tensor(s) to profile.ops (nn.Module | list): Model or list of operations to profile.n (int, optional): Number of iterations to average.device (str | torch.device, optional): Device to profile on.max_num_obj (int, optional): Maximum number of objects for simulation.Returns:(list): Profile results for each operation.Examples:>>> from ultralytics.utils.torch_utils import profile_ops>>> input = torch.randn(16, 3, 640, 640)>>> m1 = lambda x: x * torch.sigmoid(x)>>> m2 = nn.SiLU()>>> profile_ops(input, [m1, m2], n=100) # profile over 100 iterations"""# 尝试导入 thop 库,用于计算 FLOPs。如果导入失败,则将 thop 设置为 None 。try:import thopexcept ImportError:thop = None # conda support without 'ultralytics-thop' installed# 初始化一个空列表 results ,用于存储每个操作的性能分析结果。这个列表将在后续的循环中被填充,最终返回给调用者。results = []# 检查 device 是否是一个 torch.device 对象。如果不是,则调用 select_device 函数来选择一个合适的设备。 select_device 函数通常会根据输入参数(如设备名称或索引)选择一个可用的设备(如 GPU 或 CPU)。这确保了代码可以在指定的设备上运行,即使用户没有直接提供一个 torch.device 对象。if not isinstance(device, torch.device):device = select_device(device)# 使用 LOGGER.info 打印性能分析的表头。 LOGGER 是一个日志记录器,通常用于记录信息、警告和错误等。这里使用了格式化字符串(f-string)来对齐和格式化表头,使其更加易读。表头包括以下列:# Params :参数数量,右对齐,宽度为 12 个字符。# GFLOPs :GFLOPs(十亿浮点运算次数),右对齐,宽度为 12 个字符。# GPU_mem (GB) :GPU 内存使用量(以 GB 为单位),右对齐,宽度为 14 个字符。# forward (ms) :前向传播时间(以毫秒为单位),右对齐,宽度为 14 个字符。# backward (ms) :反向传播时间(以毫秒为单位),右对齐,宽度为 14 个字符。# input :输入形状,右对齐,宽度为 24 个字符。# output :输出形状,右对齐,宽度为 24 个字符。LOGGER.info(f"{'Params':>12s}{'GFLOPs':>12s}{'GPU_mem (GB)':>14s}{'forward (ms)':>14s}{'backward (ms)':>14s}"f"{'input':>24s}{'output':>24s}")# 调用 Python 的垃圾回收器 gc.collect() ,尝试释放未使用的内存。这有助于减少内存泄漏,特别是在长时间运行的程序中。gc.collect() # attempt to free unused memory# 调用 torch.cuda.empty_cache() ,释放未使用的 GPU 内存。这不会减少已分配的张量的内存,但可以释放那些不再使用的内存块,从而减少 GPU 内存的碎片化。torch.cuda.empty_cache()# 如果 input 是一个列表,则遍历列表中的每个输入张量;否则,将 input 包装成一个列表并遍历。这确保了代码可以处理单个输入或多个输入的情况。for x in input if isinstance(input, list) else [input]:# 将输入张量 x 移动到指定的设备(如 GPU 或 CPU)。x = x.to(device)# 设置输入张量 x 的 requires_grad 属性为 True ,以便在后续的反向传播中计算梯度。x.requires_grad = True# 如果 ops 是一个列表,则遍历列表中的每个操作;否则,将 ops 包装成一个列表并遍历。这确保了代码可以处理单个操作或多个操作的情况。for m in ops if isinstance(ops, list) else [ops]:# 将操作 m 移动到指定的设备。 hasattr(m, "to") 检查 m 是否有 to 方法,这通常适用于 PyTorch 的模块(如 nn.Module )。m = m.to(device) if hasattr(m, "to") else m # device# 如果输入张量 x 是 16 位浮点数( torch.float16 ),并且操作 m 有 half 方法,则将操作 m 转换为 16 位浮点数。这有助于在支持半精度计算的硬件上提高计算效率。m = m.half() if hasattr(m, "half") and isinstance(x, torch.Tensor) and x.dtype is torch.float16 else m# 初始化前向传播时间和反向传播时间的变量。 tf 和 tb 分别表示前向传播和反向传播的时间, t 是一个列表,用于存储时间戳。tf, tb, t = 0, 0, [0, 0, 0] # dt forward, backward# 尝试计算操作 m 的 FLOPs:# 如果 thop 已安装,则使用 thop.profile 计算 FLOPs。 deepcopy(m) 确保在计算过程中不会修改原始操作 m 。 inputs=[x] 指定输入张量。 verbose=False 表示不输出详细信息。计算结果除以 1e9 转换为 GFLOPs,并乘以 2(因为 FLOPs 通常包括加法和乘法操作)。# 如果 thop 未安装或计算过程中发生异常,则将 FLOPs 设置为 0。try:flops = thop.profile(deepcopy(m), inputs=[x], verbose=False)[0] / 1e9 * 2 if thop else 0 # GFLOPsexcept Exception:flops = 0# 开始一个 try 块,用于捕获可能发生的任何异常。try:# 初始化变量 mem ,用于累积 GPU 内存使用量。mem = 0# 循环 n 次,以多次测量性能指标并取平均值,提高测量的准确性。for _ in range(n):# 使用上下文管理器 cuda_memory_usage ,记录当前 GPU 内存使用情况。 cuda_info 是一个字典,包含当前的内存使用信息。with cuda_memory_usage(device) as cuda_info:# 记录前向传播开始的时间。t[0] = time_sync()# 执行前向传播,将输入 x 传递给模块 m ,得到输出 y 。y = m(x)# 记录前向传播结束的时间。t[1] = time_sync()# 尝试执行反向传播:# 如果输出 y 是一个列表,则对列表中的每个元素求和,然后调用 backward() 。# 如果输出 y 是一个张量,则直接调用 backward() 。# 如果模块没有反向传播方法(例如,某些非可训练模块),捕获异常并设置 t[2] 为 NaN 。try:(sum(yi.sum() for yi in y) if isinstance(y, list) else y).sum().backward()t[2] = time_sync()except Exception: # no backward method# print(e) # for debugt[2] = float("nan")# 累积 GPU 内存使用量(单位为 GB)。mem += cuda_info["memory"] / 1e9 # (GB)# 计算前向传播时间(单位为毫秒)并累加。tf += (t[1] - t[0]) * 1000 / n # ms per op forward# 计算反向传播时间(单位为毫秒)并累加。tb += (t[2] - t[1]) * 1000 / n # ms per op backward# 检查 max_num_obj 是否大于 0。如果 max_num_obj 大于 0,则执行模拟训练的代码。 max_num_obj 通常表示每张图像的预测数量,用于模拟实际训练中的内存使用情况。if max_num_obj: # simulate training with predictions per image grid (for AutoBatch)# 使用上下文管理器 cuda_memory_usage ,记录当前 GPU 内存使用情况。 cuda_info 是一个字典,包含当前的内存使用信息。with cuda_memory_usage(device) as cuda_info:# 生成一个随机张量,模拟训练时的预测结果:# x.shape[0] :批量大小。# max_num_obj :每张图像的预测数量。# int(sum((x.shape[-1] / s) * (x.shape[-2] / s) for s in m.stride.tolist())) :计算每个预测的特征图大小。这里假设特征图的大小是输入图像大小除以步长 s 的平方。# device=device :将生成的张量放置在指定的设备上。# dtype=torch.float32 :指定张量的数据类型为 32 位浮点数。torch.randn(x.shape[0],max_num_obj,int(sum((x.shape[-1] / s) * (x.shape[-2] / s) for s in m.stride.tolist())),device=device,dtype=torch.float32,)# 将生成随机张量时的 GPU 内存使用量累加到 mem 中。 cuda_info["memory"] 是当前的 GPU 内存使用量(单位为字节),除以 1e9 转换为 GB。mem += cuda_info["memory"] / 1e9 # (GB)# 使用生成器表达式,分别获取输入 x 和输出 y 的形状。# 如果 x 或 y 是一个 torch.Tensor ,则获取其形状并转换为元组。# 如果 x 或 y 不是 torch.Tensor (例如,是一个列表),则返回字符串 "list" 。# s_in 和 s_out 分别存储输入和输出的形状。s_in, s_out = (tuple(x.shape) if isinstance(x, torch.Tensor) else "list" for x in (x, y)) # shapes# 如果 m 是一个 nn.Module ,则计算其所有参数的总数。 x.numel() 返回每个参数张量的元素数量, sum 函数将这些数量相加。# 如果 m 不是 nn.Module ,则参数数量 p 设置为 0。p = sum(x.numel() for x in m.parameters()) if isinstance(m, nn.Module) else 0 # parameters# 使用 LOGGER.info 打印性能分析结果,格式化输出以下信息:# p :参数数量,右对齐,宽度为 12 个字符。# flops :FLOPs(十亿浮点运算次数),右对齐,宽度为 12 个字符,格式化为 4 位有效数字。# mem :GPU 内存使用量(单位为 GB),右对齐,宽度为 14 个字符,格式化为 3 位小数。# tf :前向传播时间(单位为毫秒),右对齐,宽度为 14 个字符,格式化为 4 位小数。# tb :反向传播时间(单位为毫秒),右对齐,宽度为 14 个字符,格式化为 4 位小数。# s_in :输入形状,右对齐,宽度为 24 个字符。# s_out :输出形状,右对齐,宽度为 24 个字符。LOGGER.info(f"{p:12}{flops:12.4g}{mem:>14.3f}{tf:14.4g}{tb:14.4g}{str(s_in):>24s}{str(s_out):>24s}")# 将当前操作的性能分析结果存储到 results 列表中。每个结果是一个包含以下信息的列表:# 参数数量 p# FLOPs flops# GPU 内存使用量 mem# 前向传播时间 tf# 反向传播时间 tb# 输入形状 s_in# 输出形状 s_outresults.append([p, flops, mem, tf, tb, s_in, s_out])# 捕获在性能分析过程中可能发生的任何异常。 Exception as e 将捕获的异常存储在变量 e 中。except Exception as e:# 使用 LOGGER.info 记录异常信息。这有助于在发生错误时进行调试和问题追踪。LOGGER.info(e)# 在发生异常时,将 None 添加到 results 列表中。这表示当前操作的性能分析未能成功完成,结果不可用。results.append(None)# finally 块确保在退出时执行清理操作,无论是否发生异常。finally:# 调用 Python 的垃圾回收器 gc.collect() ,尝试释放未使用的内存。这有助于减少内存泄漏,特别是在长时间运行的程序中。gc.collect() # attempt to free unused memory# 调用 torch.cuda.empty_cache() ,释放未使用的 GPU 内存。这不会减少已分配的张量的内存,但可以释放那些不再使用的内存块,从而减少 GPU 内存的碎片化。torch.cuda.empty_cache()# 返回性能分析的结果列表 results 。这个列表包含了每个操作的性能分析结果,或者在发生异常时为 None 。return results
# 这段代码实现了一个函数 profile_ops ,用于对给定的输入和操作进行性能分析,包括计算 FLOPs、GPU 内存使用量、前向传播和反向传播的时间等。它还提供了日志记录功能,以便用户可以直观地查看每个操作的性能指标。这种性能分析在优化模型和选择合适的硬件时非常有用。