Grad-CAM and Hook Functions
Knowledge review
- Callback functions
- Lambda functions
- Hook functions: module hooks and tensor hooks
- A Grad-CAM example
Assignment: just make sure you understand today's code.
1. Callback Functions
```python
def handle_result(result):
    """Callback that processes the computation result."""
    print(f"The result is: {result}")

def with_callback(callback):
    """Decorator factory: creates a decorator that passes the result to a callback."""
    def decorator(func):
        """The actual decorator that wraps the target function."""
        def wrapper(a, b):
            """Wrapped function: runs the computation and invokes the callback."""
            result = func(a, b)   # run the original computation
            callback(result)      # hand the result to the callback
            return result         # return the result (optional)
        return wrapper
    return decorator

# Wrap the original computation with the decorator
@with_callback(handle_result)
def calculate(a, b):
    """Perform the addition."""
    return a + b

# Call the decorated function directly
calculate(3, 5)
```
Output:
```
The result is: 8
```
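Callbacks are the mental model behind PyTorch hooks: you hand the framework a function, and the framework decides when to call it. As a minimal sketch (the `run_training` loop and the `on_epoch_end` parameter below are made up for illustration, not part of today's code), a training loop can report progress through a callback in exactly the same way the decorator above hands its result to `handle_result`:

```python
def run_training(epochs, on_epoch_end=None):
    """Hypothetical training loop that reports progress through a callback."""
    for epoch in range(epochs):
        loss = 1.0 / (epoch + 1)       # stand-in for a real training step
        if on_epoch_end is not None:
            on_epoch_end(epoch, loss)  # the loop calls *your* function at the right moment

def log_epoch(epoch, loss):
    print(f"epoch {epoch}: loss = {loss:.3f}")

run_training(3, on_epoch_end=log_epoch)
```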
2. Lambda (Anonymous) Functions
```python
square = lambda x: x ** 2
print(square(5))
```
Output:
```
25
```
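A lambda is just an anonymous function object, so it can be passed anywhere a short callback is expected, for example as a sort key here, or as a hook function in the next section. A quick illustration (the `points` list is arbitrary example data):

```python
points = [(2, 5), (1, 9), (3, 1)]
# The lambda acts as a tiny callback: sorted() calls it once per element
print(sorted(points, key=lambda p: p[1]))  # [(3, 1), (2, 5), (1, 9)]
```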
3. Hook Functions
```python
import os
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

# Fix the random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Work around the duplicate OpenMP runtime issue
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'  # temporary workaround
# Optionally limit the number of OpenMP threads
# os.environ['OMP_NUM_THREADS'] = '1'

# Define a simple convolutional neural network
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        # Conv layer: 1 input channel, 2 output channels, 3x3 kernel, padding 1 keeps the spatial size
        self.conv = nn.Conv2d(1, 2, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        # Fully connected layer: 2*4*4 input features, 10 output classes
        self.fc = nn.Linear(2 * 4 * 4, 10)

    def forward(self, x):
        x = self.conv(x)
        x = self.relu(x)
        x = x.view(-1, 2 * 4 * 4)
        x = self.fc(x)
        return x


def analyze_model_hooks():
    # Create the model
    model = SimpleModel()

    # Store intermediate layer outputs
    conv_outputs = []

    # Forward hook
    def forward_hook(module, input, output):
        """Forward hook, called automatically after the module's forward pass.

        Args:
            module: the current module instance
            input: tuple of input tensors
            output: output tensor
        """
        print(f"Forward hook called - module type: {type(module)}")
        print(f"Input shape: {input[0].shape}")
        print(f"Output shape: {output.shape}")
        # Save the conv output for later analysis
        conv_outputs.append(output.detach())

    # Register the forward hook
    forward_hook_handle = model.conv.register_forward_hook(forward_hook)

    # Create an input tensor
    x = torch.randn(1, 1, 4, 4)

    # Run the forward pass
    output = model(x)

    # Remove the forward hook
    forward_hook_handle.remove()

    # Print and visualize the conv layer output
    if conv_outputs:
        print(f"\nConv layer output shape: {conv_outputs[0].shape}")
        print(f"First output channel of the conv layer:\n{conv_outputs[0][0, 0, :, :]}")

        # Try to visualize if the environment supports it
        try:
            plt.figure(figsize=(12, 4))

            # Input image
            plt.subplot(1, 3, 1)
            plt.title('Input image')
            plt.imshow(x[0, 0].detach().numpy(), cmap='gray')

            # Output of the first kernel
            plt.subplot(1, 3, 2)
            plt.title('Kernel 1 output')
            plt.imshow(conv_outputs[0][0, 0].detach().numpy(), cmap='gray')

            # Output of the second kernel
            plt.subplot(1, 3, 3)
            plt.title('Kernel 2 output')
            plt.imshow(conv_outputs[0][0, 1].detach().numpy(), cmap='gray')

            plt.tight_layout()
            plt.show()
        except Exception as e:
            print(f"Cannot display images: {e}. A GUI environment may be required.")

    # Store gradients
    conv_gradients = []

    # Backward hook
    def backward_hook(module, grad_input, grad_output):
        """Backward hook, called automatically during the module's backward pass.

        Args:
            module: the current module instance
            grad_input: tuple of gradients w.r.t. the inputs
            grad_output: tuple of gradients w.r.t. the outputs
        """
        print(f"\nBackward hook called - module type: {type(module)}")
        print(f"Number of input gradients: {len(grad_input)}")
        print(f"Number of output gradients: {len(grad_output)}")
        # Save the gradients for later analysis
        conv_gradients.append((grad_input, grad_output))

    # Register the backward hook
    # Note: register_backward_hook is deprecated in recent PyTorch releases;
    # register_full_backward_hook is the recommended replacement.
    backward_hook_handle = model.conv.register_backward_hook(backward_hook)

    # Create an input that requires gradients and run the forward pass
    x = torch.randn(1, 1, 4, 4, requires_grad=True)
    output = model(x)

    # Define a loss and run the backward pass
    loss = output.sum()
    loss.backward()

    # Remove the backward hook
    backward_hook_handle.remove()

    # Tensor hook example
    def demonstrate_tensor_hook():
        print("\n=== Tensor hook example ===")

        # Create a tensor that requires gradients
        x = torch.tensor([2.0], requires_grad=True)
        y = x ** 2
        z = y ** 3

        # Gradient-modifying hook
        def tensor_hook(grad):
            print(f"Original gradient: {grad}")
            # Modify the gradient (halve it here)
            return grad / 2

        # Register the tensor hook
        tensor_hook_handle = y.register_hook(tensor_hook)

        # Run the backward pass
        z.backward()
        print(f"Modified gradient: {x.grad}")  # x.grad is computed from the halved upstream gradient

        # Remove the tensor hook
        tensor_hook_handle.remove()

    # Run the tensor hook example
    demonstrate_tensor_hook()


if __name__ == "__main__":
    analyze_model_hooks()
```
Output:
```
Forward hook called - module type: <class 'torch.nn.modules.conv.Conv2d'>
Input shape: torch.Size([1, 1, 4, 4])
Output shape: torch.Size([1, 2, 4, 4])

Conv layer output shape: torch.Size([1, 2, 4, 4])
First output channel of the conv layer:
tensor([[-0.4173, -0.5642,  0.3407, -0.5395],
        [-0.0755,  0.8618,  0.5276, -0.2671],
        [-0.1973, -0.4461, -1.1223,  0.1371],
        [ 0.2291, -0.1226,  0.2020,  0.3061]])

Backward hook called - module type: <class 'torch.nn.modules.conv.Conv2d'>
Number of input gradients: 3
Number of output gradients: 1

=== Tensor hook example ===
Original gradient: tensor([48.])
Modified gradient: tensor([96.])
```
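One caveat: `register_backward_hook` used above is deprecated in recent PyTorch releases; `register_full_backward_hook` (available since PyTorch 1.8) takes the same `(module, grad_input, grad_output)` arguments and handles modules with multiple inputs or outputs more reliably. A minimal sketch of the replacement, assuming a reasonably recent PyTorch version:

```python
import torch
import torch.nn as nn

conv = nn.Conv2d(1, 2, kernel_size=3, padding=1)

def full_backward_hook(module, grad_input, grad_output):
    # grad_input / grad_output are tuples of gradients w.r.t. the module's inputs / outputs
    print(f"grad_output[0] shape: {grad_output[0].shape}")

handle = conv.register_full_backward_hook(full_backward_hook)

x = torch.randn(1, 1, 4, 4, requires_grad=True)
conv(x).sum().backward()

handle.remove()  # always release hooks you no longer need
```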
4. Grad-CAM
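For reference, the code below follows the standard Grad-CAM formulation from the original paper: the weight of feature map $A^k$ for class $c$ is the global average of the gradients flowing into that map, and the heatmap is a ReLU of the weighted sum:

$$
\alpha_k^c = \frac{1}{Z}\sum_i\sum_j \frac{\partial y^c}{\partial A^k_{ij}},
\qquad
L^c_{\text{Grad-CAM}} = \mathrm{ReLU}\Big(\sum_k \alpha_k^c A^k\Big)
$$

In the `GradCAM` class this corresponds to `weights = torch.mean(gradients, dim=(2, 3), keepdim=True)` for $\alpha_k^c$ and `F.relu(torch.sum(weights * activations, dim=1, keepdim=True))` for $L^c$.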
```python
import os

# Work around the duplicate OpenMP runtime issue
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'  # allow duplicate OpenMP libraries to load
# Optional: limit the number of OpenMP threads to reduce conflicts
# os.environ['OMP_NUM_THREADS'] = '1'

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import warnings

# Global settings
warnings.filterwarnings("ignore")
plt.rcParams["font.family"] = ["SimHei"]    # font with CJK support (only needed for non-Latin labels)
plt.rcParams["axes.unicode_minus"] = False  # render the minus sign correctly
torch.manual_seed(42)                       # fix the random seed
np.random.seed(42)

# Device configuration
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


# ---------------------- Data handling ---------------------- #
class DataLoader:
    """Dataset loading and preprocessing."""
    def __init__(self, data_root="./data"):
        self.data_root = data_root
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

    def load_dataset(self, train=True):
        """Load the CIFAR-10 dataset."""
        try:
            dataset = torchvision.datasets.CIFAR10(
                root=self.data_root,
                train=train,
                download=True,
                transform=self.transform
            )
            return dataset
        except Exception as e:
            raise RuntimeError(f"Failed to load the dataset: {e}")

    def create_dataloader(self, dataset, batch_size=64, shuffle=True, num_workers=2):
        """Create a data loader."""
        return torch.utils.data.DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
            pin_memory=(device.type == "cuda"),      # GPU optimization
            persistent_workers=(num_workers > 0)
        )


# ---------------------- Model definition ---------------------- #
class SimpleCNN(nn.Module):
    """Simple CNN model."""
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(128 * 4 * 4, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x


# ---------------------- Training ---------------------- #
class Trainer:
    """Model training helper."""
    def __init__(self, model, device):
        self.model = model.to(device)
        self.device = device
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    def train_epoch(self, dataloader):
        """Train for a single epoch."""
        self.model.train()
        running_loss = 0.0
        for i, (inputs, labels) in enumerate(dataloader, 1):
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()
            if i % 100 == 0:
                print(f"Batch {i:4d} average loss: {running_loss / 100:.3f}")
                running_loss = 0.0

    def train(self, dataloader, epochs=1):
        """Full training loop."""
        for epoch in range(1, epochs + 1):
            print(f"\nEpoch {epoch}/{epochs}")
            self.train_epoch(dataloader)
        print("Training finished")


# ---------------------- Grad-CAM visualization ---------------------- #
class GradCAM:
    """Grad-CAM visualizer (handles gradient tracking and the OpenMP conflict)."""
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None    # stores detached gradients
        self.activations = None  # stores detached activations
        self._register_hooks()   # register the hooks

    def _register_hooks(self):
        """Register forward/backward hooks that store detached tensors."""
        def forward_hook(module, _, output):
            # Save the detached activations during the forward pass
            self.activations = output.detach()

        def backward_hook(module, grad_input, grad_output):
            # Save the detached gradients during the backward pass
            self.gradients = grad_output[0].detach()

        self.target_layer.register_forward_hook(forward_hook)
        # Note: register_backward_hook is deprecated in recent PyTorch versions;
        # register_full_backward_hook is the recommended replacement.
        self.target_layer.register_backward_hook(backward_hook)

    def _generate_cam(self, input_tensor, target_class):
        """Generate the class activation map (CAM)."""
        # Switch to train mode for the backward pass (eval mode is restored below;
        # note that train() by itself does not control gradient tracking)
        self.model.train()

        # Enable gradient tracking on the input tensor
        input_tensor.requires_grad_(True)

        # Forward pass
        outputs = self.model(input_tensor)

        # Build a one-hot vector for the target class
        one_hot = torch.zeros_like(outputs)
        one_hot[0, target_class] = 1

        # Backward pass to compute gradients (keep the graph)
        self.model.zero_grad()
        outputs.backward(gradient=one_hot, retain_graph=True)

        # Fetch the activations and gradients (already detached in the hooks)
        activations = self.activations
        gradients = self.gradients

        # Channel weights via global average pooling of the gradients
        weights = torch.mean(gradients, dim=(2, 3), keepdim=True)

        # Weighted sum over channels gives the CAM
        cam = torch.sum(weights * activations, dim=1, keepdim=True)

        # ReLU keeps only regions with a positive contribution
        cam = F.relu(cam)

        # Resize to the input resolution and normalize to [0, 1]
        cam = F.interpolate(cam, size=32, mode="bilinear", align_corners=False)
        cam = (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)

        # Restore evaluation mode
        self.model.eval()

        # Convert to a NumPy array (detached and moved to CPU, so this is safe)
        return cam.squeeze().cpu().numpy()

    def __call__(self, input_image, target_class=None):
        """Entry point: returns the heatmap and the predicted class."""
        input_tensor = input_image.unsqueeze(0).to(device)
        if target_class is None:
            # Get the predicted class without gradient tracking (cheaper)
            with torch.no_grad():
                target_class = self.model(input_tensor).argmax(dim=1).item()
        return self._generate_cam(input_tensor, target_class), target_class


# ---------------------- Utility functions ---------------------- #
def tensor_to_image(tensor):
    """Convert a tensor to a displayable image (detach and de-normalize)."""
    # Make sure the tensor is detached and on the CPU
    tensor = tensor.detach().cpu()
    img = tensor.numpy().transpose(1, 2, 0)
    mean = np.array([0.5, 0.5, 0.5])
    std = np.array([0.5, 0.5, 0.5])
    return std * img + mean  # undo the normalization


def visualize_cam(image, heatmap, pred_class, true_class, save_path="grad_cam_result.png"):
    """Visualize the heatmap (works in headless environments as well)."""
    try:
        plt.figure(figsize=(12, 4))

        # Original image (detached and de-normalized)
        image_np = tensor_to_image(image)

        # Original image
        plt.subplot(1, 3, 1)
        plt.imshow(image_np)
        plt.title(f"True class: {classes[true_class]}")
        plt.axis("off")

        # Heatmap
        plt.subplot(1, 3, 2)
        plt.imshow(heatmap, cmap="jet")
        plt.title(f"Predicted class: {classes[pred_class]}")
        plt.axis("off")

        # Overlay
        plt.subplot(1, 3, 3)
        heatmap_colored = plt.cm.jet(heatmap)[:, :, :3]
        superimposed = image_np * 0.6 + heatmap_colored * 0.4
        plt.imshow(superimposed)
        plt.title("Overlay")
        plt.axis("off")

        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Result saved to {save_path}")
        plt.show()
    except Exception as e:
        print(f"Visualization failed: {e}; a GUI environment may be missing, skipping display")
        plt.close()


# ---------------------- Main ---------------------- #
if __name__ == "__main__":
    classes = ("plane", "car", "bird", "cat", "deer",
               "dog", "frog", "horse", "ship", "truck")

    # 1. Initialize the data loader
    data_loader = DataLoader()
    try:
        testset = data_loader.load_dataset(train=False)
    except RuntimeError as e:
        print(f"Data loading failed: {e}, exiting")
        exit(1)

    # 2. Initialize the model
    model = SimpleCNN()
    print("Model created")

    # 3. Load or train the model
    model_path = "cifar10_cnn.pth"
    try:
        model.load_state_dict(torch.load(model_path, map_location=device))
        print(f"Loaded pretrained model: {model_path}")
    except FileNotFoundError:
        print("No pretrained model found, starting training...")
        trainset = data_loader.load_dataset(train=True)
        trainloader = data_loader.create_dataloader(trainset)
        trainer = Trainer(model, device)
        trainer.train(trainloader, epochs=1)  # increase epochs if desired
        torch.save(model.state_dict(), model_path)
        print(f"Model saved to {model_path}")
    except Exception as e:
        print(f"Model loading failed: {e}, using a randomly initialized model")

    model = model.to(device)
    model.eval()  # evaluation mode

    # 4. Grad-CAM visualization
    try:
        idx = 102  # fixed index for reproducibility (or use np.random.randint(len(testset)))
        image, true_label = testset[idx]
        print(f"Selected image index {idx}, true class: {classes[true_label]}")

        # Pick the last convolutional layer (index -3 of conv_layers,
        # i.e. nn.Conv2d(64, 128, kernel_size=3, padding=1))
        target_layer = model.conv_layers[-3]
        grad_cam = GradCAM(model, target_layer)

        # Generate the heatmap (gradient tracking is handled internally)
        heatmap, pred_label = grad_cam(image, target_class=None)

        # Visualize the result (tensors are already detached)
        visualize_cam(image, heatmap, pred_label, true_label)
    except Exception as e:
        print(f"Visualization failed: {e}")
```
Output:
```
Using device: cpu
Model created
Loaded pretrained model: cifar10_cnn.pth
Selected image index 102, true class: frog
Result saved to grad_cam_result.png
```
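One caveat worth noting: the `GradCAM` class never stores the handles returned by `register_forward_hook` / `register_backward_hook`, so the hooks stay attached to the target layer for as long as the model lives, and creating several `GradCAM` objects on the same model makes them accumulate. Below is a minimal sketch of how the class could keep the handles and expose a cleanup method; this subclass and its `remove_hooks` method are an illustrative extension, not part of today's code, and it reuses the `GradCAM` class defined above (plus the non-deprecated backward-hook API):

```python
class GradCAMWithCleanup(GradCAM):
    """Hypothetical GradCAM variant that keeps its hook handles so they can be removed."""

    def _register_hooks(self):
        def forward_hook(module, _, output):
            self.activations = output.detach()

        def backward_hook(module, grad_input, grad_output):
            self.gradients = grad_output[0].detach()

        # Keep the handles so the hooks can be detached later
        self.handles = [
            self.target_layer.register_forward_hook(forward_hook),
            self.target_layer.register_full_backward_hook(backward_hook),
        ]

    def remove_hooks(self):
        """Detach all hooks registered by this object."""
        for handle in self.handles:
            handle.remove()
        self.handles = []


# Usage: same as GradCAM, plus explicit cleanup afterwards
# grad_cam = GradCAMWithCleanup(model, target_layer)
# heatmap, pred_label = grad_cam(image)
# grad_cam.remove_hooks()
```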
@浙大疏锦行