1. PyTorch Cheat Sheet
- 1. PyTorch Cheat Sheet
- 1.1 Getting Started
- 1.2 Tensors
- 1.3 Neural Networks
- 1.4 Training
- 1.5 Evaluation
- 1.6 Prediction and Inference
- 1.7 Saving and Loading Models
- 1.8 CUDA (GPU Support)
- 1.9 Distributed Training
- 1.10 Autograd (Automatic Differentiation)
- 1.11 Data Augmentation
- 1.12 Learning Rate Schedulers
- 1.13 TensorBoard Integration
- 1.14 ONNX Export
- 1.15 TorchScript
- 1.16 Deployment
- 1.17 Additional Training Techniques
- 1.18 Common Neural Network Architectures
- 1.19 Common Issues and Debugging
- 1.20 Best Practices
- 1.21 Quick Reference
This cheat sheet provides a broad overview of the PyTorch deep learning library, covering essential concepts, code snippets, and best practices for building, training, and deploying models efficiently. It aims to be a one-stop reference for common tasks.
1.1 Getting Started
1.1.1 Installation
# CPU-only version
pip install torch torchvision torchaudio
# CUDA 11.8 support
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# CUDA 12.1 support
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
Replace cu121 with your CUDA version. Check the PyTorch website for the most up-to-date installation instructions.
1.1.2 Importing PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms
# Check version
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"CUDA version: {torch.version.cuda}")
1.2 Tensors
1.2.1 Tensor Creation Flow
┌────────────────────────────────────────┐
│ Tensor Creation Methods │
└───┬────────────────────────────────┬───┘
│ │
↓ ↓
┌─────────┐ ┌─────────┐
│ From │ │ From │
│ Data │ │ Scratch │
└────┬────┘ └────┬────┘
│ │
↓ ↓
┌──────────┐ ┌──────────┐
│ .tensor()│ │ .zeros() │
│ .from_ │ │ .ones() │
│ numpy() │ │ .rand() │
│ .as_ │ │ .randn() │
│ tensor()│ │ .empty() │
└──────────┘ └──────────┘
1.2.2 Creating Tensors
From a List:
# Direct creation from list
data = [1, 2, 3, 4, 5]
tensor = torch.tensor(data)
print(tensor) # tensor([1, 2, 3, 4, 5])
# 2D tensor
data_2d = [[1, 2, 3], [4, 5, 6]]
tensor_2d = torch.tensor(data_2d)
print(tensor_2d.shape) # torch.Size([2, 3])
From a NumPy Array:
import numpy as np
# Convert NumPy to tensor (shares memory)
data = np.array([1, 2, 3, 4, 5])
tensor = torch.from_numpy(data)
# Changes in NumPy array affect tensor
data[0] = 100
print(tensor) # tensor([100, 2, 3, 4, 5])
# Create independent copy
tensor_copy = torch.tensor(data)
Zeros, Ones, and Filled Tensors:
# Create tensor filled with zeros
zeros = torch.zeros(3, 4)
print(zeros.shape) # torch.Size([3, 4])
# Create tensor filled with ones
ones = torch.ones(3, 4)
# Create tensor filled with specific value
full = torch.full((3, 4), fill_value=7)
# Create tensor like another tensor
x = torch.tensor([[1, 2], [3, 4]])
zeros_like = torch.zeros_like(x)
ones_like = torch.ones_like(x)
# Empty tensor (uninitialized memory)
empty = torch.empty(3, 4)
Ranges:
# Create range of integers
arange = torch.arange(start=0, end=10, step=2)
print(arange) # tensor([0, 2, 4, 6, 8])
# Create linearly spaced values
linspace = torch.linspace(start=0, end=1, steps=5)
print(linspace) # tensor([0.0000, 0.2500, 0.5000, 0.7500, 1.0000])
# Logarithmically spaced values
logspace = torch.logspace(start=0, end=2, steps=5)
print(logspace) # tensor([1., 3.1623, 10., 31.6228, 100.])
Random Number Generation:
# Set random seed for reproducibility
torch.manual_seed(42)
# Uniform distribution [0, 1)
rand = torch.rand(3, 4)
# Standard normal distribution (mean=0, std=1)
randn = torch.randn(3, 4)
# Random integers in range [low, high)
randint = torch.randint(low=0, high=10, size=(3, 4))
# Random permutation
perm = torch.randperm(10) # tensor([2, 5, 1, 9, 0, 3, 7, 4, 6, 8])
# Random sampling from normal distribution
normal = torch.normal(mean=0.0, std=1.0, size=(3, 4))
1.2.3 Tensor Attributes
tensor.shape # Shape of the tensor
tensor.size() # Same as shape
tensor.ndim # Number of dimensions
tensor.dtype # Data type of the tensor
tensor.device # Device where the tensor is stored (CPU or GPU)
tensor.requires_grad # Whether gradients are tracked
tensor.layout # Memory layout (torch.strided, torch.sparse_coo)
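A quick sketch inspecting these attributes on a concrete tensor (the device shown depends on where the tensor was created):
import torch

x = torch.randn(2, 3)
print(x.shape)          # torch.Size([2, 3])
print(x.ndim)           # 2
print(x.dtype)          # torch.float32
print(x.device)         # cpu (or cuda:0 if created on a GPU)
print(x.requires_grad)  # False
print(x.layout)         # torch.strided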
1.2.4 Tensor Operations
┌──────────────────────────────────┐
│ Tensor Operation Types │
└────┬────────────────────────┬────┘
│ │
↓ ↓
┌─────────┐ ┌─────────┐
│Arithmetic│ │ Shape │
│Operations│ │Operations│
└────┬────┘ └────┬────┘
│ │
┌────┴────┐ ┌────┴────┐
│ +, -, *,│ │ reshape │
│ /, **, @│ │ view │
│ add_() │ │ squeeze │
│ sub_() │ │ unsqueeze│
└─────────┘ └─────────┘
Arithmetic Operations:
a = torch.tensor([1.0, 2.0, 3.0])
b = torch.tensor([4.0, 5.0, 6.0])
# Element-wise operations
c = a + b # tensor([5., 7., 9.])
d = a - b # tensor([-3., -3., -3.])
e = a * b # tensor([4., 10., 18.])
f = a / b # tensor([0.25, 0.4, 0.5])
g = a ** 2 # tensor([1., 4., 9.])
# In-place operations (modify original tensor)
a.add_(b) # a = tensor([5., 7., 9.])
a.sub_(b) # Subtract in-place
a.mul_(b) # Multiply in-place
a.div_(b) # Divide in-place
# Functional form
result = torch.add(a, b)
result = torch.mul(a, b)
# Scalar operations
a = torch.tensor([1.0, 2.0, 3.0])
scaled = a * 2 # tensor([2., 4., 6.])
shifted = a + 5 # tensor([6., 7., 8.])
Slicing and Indexing:
tensor = torch.tensor([[1, 2, 3], [4, 5, 6]])
# Basic indexing
first_row = tensor[0] # tensor([1, 2, 3])
second_col = tensor[:, 1] # tensor([2, 5])
element = tensor[0, 1] # tensor(2)
# Slicing
sliced = tensor[0:2, 1:3] # tensor([[2, 3], [5, 6]])
# Boolean indexing
mask = tensor > 3
filtered = tensor[mask] # tensor([4, 5, 6])
# Advanced indexing
indices = torch.tensor([0, 1, 0])
selected = tensor[:, indices] # Select columns
# Fancy indexing
rows = torch.tensor([0, 1])
cols = torch.tensor([1, 2])
elements = tensor[rows, cols] # tensor([2, 6])
Reshaping:
tensor = torch.arange(12)
print(tensor) # tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
# Reshape to 2D
reshaped = tensor.reshape(3, 4)
print(reshaped.shape) # torch.Size([3, 4])
# View (shares memory with original)
viewed = tensor.view(3, 4) # Same as reshape but requires contiguous tensor
# Transpose (2D tensors)
tensor_2d = torch.tensor([[1, 2, 3], [4, 5, 6]])
transposed = tensor_2d.T
print(transposed.shape) # torch.Size([3, 2])
# Permute dimensions (generalized transpose)
tensor_3d = torch.randn(2, 3, 4)
permuted = tensor_3d.permute(2, 0, 1) # New shape: (4, 2, 3)
# Flatten to 1D
flattened = tensor_2d.flatten()
print(flattened) # tensor([1, 2, 3, 4, 5, 6])
# Squeeze (remove dimensions of size 1)
tensor_with_ones = torch.randn(1, 3, 1, 4)
squeezed = tensor_with_ones.squeeze() # Shape: (3, 4)
# Unsqueeze (add dimension of size 1)
tensor_1d = torch.tensor([1, 2, 3])
unsqueezed = tensor_1d.unsqueeze(0) # Shape: (1, 3)
unsqueezed = tensor_1d.unsqueeze(1) # Shape: (3, 1)
# Contiguous (ensure tensor is contiguous in memory)
contiguous = transposed.contiguous()
Concatenation and Stacking:
tensor1 = torch.tensor([[1, 2], [3, 4]])
tensor2 = torch.tensor([[5, 6], [7, 8]])
# Concatenate along existing dimension
cat_rows = torch.cat((tensor1, tensor2), dim=0) # Shape: (4, 2)
print(cat_rows)
# tensor([[1, 2],
# [3, 4],
# [5, 6],
# [7, 8]])
cat_cols = torch.cat((tensor1, tensor2), dim=1) # Shape: (2, 4)
print(cat_cols)
# tensor([[1, 2, 5, 6],
# [3, 4, 7, 8]])
# Stack creates new dimension
stacked = torch.stack((tensor1, tensor2), dim=0) # Shape: (2, 2, 2)
print(stacked.shape) # torch.Size([2, 2, 2])
# Split tensor into chunks
tensor = torch.arange(10)
chunks = torch.chunk(tensor, 3) # Split into 3 chunks
print(chunks) # (tensor([0, 1, 2, 3]), tensor([4, 5, 6, 7]), tensor([8, 9]))
# Split with specific sizes
split = torch.split(tensor, [2, 3, 5]) # Sizes: 2, 3, 5
Matrix Multiplication:
# Matrix multiplication
a = torch.randn(3, 4)
b = torch.randn(4, 5)
# Three equivalent ways
c = torch.matmul(a, b) # Functional form
d = a @ b # Operator form (preferred)
e = torch.mm(a, b) # Explicit matrix multiplication
print(c.shape) # torch.Size([3, 5])
# Batch matrix multiplication
batch1 = torch.randn(10, 3, 4) # 10 matrices of shape (3, 4)
batch2 = torch.randn(10, 4, 5) # 10 matrices of shape (4, 5)
batch_result = torch.bmm(batch1, batch2) # Shape: (10, 3, 5)
# Element-wise multiplication (Hadamard product)
a = torch.tensor([1, 2, 3])
b = torch.tensor([4, 5, 6])
c = a * b # tensor([4, 10, 18])
# Dot product (1D tensors)
dot_product = torch.dot(a, b) # tensor(32)
# Outer product
outer = torch.outer(a, b) # Shape: (3, 3)
# Matrix-vector multiplication
matrix = torch.randn(3, 4)
vector = torch.randn(4)
result = torch.mv(matrix, vector) # Shape: (3,)
1.2.5 Data Types
- torch.float32 or torch.float: 32-bit floating point
- torch.float64 or torch.double: 64-bit floating point
- torch.float16 or torch.half: 16-bit floating point
- torch.bfloat16: BFloat16 floating point (useful for mixed precision)
- torch.int8: 8-bit integer (signed)
- torch.int16 or torch.short: 16-bit integer (signed)
- torch.int32 or torch.int: 32-bit integer (signed)
- torch.int64 or torch.long: 64-bit integer (signed)
- torch.uint8: 8-bit integer (unsigned)
- torch.bool: Boolean
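A small sketch of specifying a dtype at creation and converting afterwards:
# Specify dtype at creation
a = torch.tensor([1, 2, 3], dtype=torch.float32)
b = torch.zeros(2, 2, dtype=torch.int64)
# Convert an existing tensor
c = a.to(torch.float16)   # or a.half()
d = a.long()              # 64-bit integer
e = a.bool()              # boolean
print(a.dtype, c.dtype, d.dtype, e.dtype)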
1.2.6 Device Management
┌─────────────────────────┐
│ Device Management │
└──────────┬──────────────┘
│
┌──────┴──────┐
│ │
↓ ↓
┌───────┐ ┌────────┐
│ CPU │ │ GPU │
│ tensor│←──→│ tensor │
└───────┘ └────────┘
│ │
↓ ↓
.cpu() .cuda()
.to('cpu') .to('cuda')
# Check device availability
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"CUDA device count: {torch.cuda.device_count()}")
if torch.cuda.is_available():
print(f"CUDA device name: {torch.cuda.get_device_name(0)}")
# Set default device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Create tensor on specific device
tensor_gpu = torch.randn(3, 4, device=device)
print(tensor_gpu.device) # cuda:0 or cpu
# Move tensor between devices
tensor_cpu = torch.randn(3, 4)
tensor_gpu = tensor_cpu.to(device) # Move to GPU
tensor_gpu = tensor_cpu.cuda() # Alternative
tensor_cpu_back = tensor_gpu.cpu() # Move back to CPU
# Check tensor device
print(f"Tensor is on CUDA: {tensor_gpu.is_cuda}")
# Multiple GPU support
if torch.cuda.device_count() > 1:
tensor_gpu0 = tensor.to('cuda:0')
tensor_gpu1 = tensor.to('cuda:1')
# Set active GPU
torch.cuda.set_device(0) # Use GPU 0
1.3 Neural Networks
1.3.1 Neural Network Architecture
┌──────────┐
│ Input │
│ Layer │
└────┬─────┘
│
↓
┌──────────┐
│ Hidden │
│ Layer 1 │──→ Activation (ReLU)
└────┬─────┘
│
↓
┌──────────┐
│ Hidden │
│ Layer 2 │──→ Activation (ReLU)
└────┬─────┘
│
↓
┌──────────┐
│ Output │
│ Layer │──→ Softmax/Sigmoid
└──────────┘
1.3.2 Defining a Model
import torch
import torch.nn as nn
import torch.nn.functional as F
# Simple feedforward network
class SimpleNet(nn.Module):
def __init__(self, input_size, hidden_size, num_classes):
super(SimpleNet, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, num_classes)
self.dropout = nn.Dropout(0.2)
def forward(self, x):
# Flatten input if needed
x = x.view(x.size(0), -1)
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = F.relu(self.fc2(x))
x = self.dropout(x)
x = self.fc3(x) # No activation for logits
return x
# Instantiate model
model = SimpleNet(input_size=784, hidden_size=128, num_classes=10)
print(model)
# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
1.3.3 Layers
- nn.Linear: Fully connected layer.
- nn.Conv1d: 1D convolution layer.
- nn.Conv2d: 2D convolution layer.
- nn.Conv3d: 3D convolution layer.
- nn.ConvTranspose2d: Transposed convolution layer (deconvolution).
- nn.MaxPool1d, nn.MaxPool2d, nn.MaxPool3d: Max pooling layers.
- nn.AvgPool1d, nn.AvgPool2d, nn.AvgPool3d: Average pooling layers.
- nn.AdaptiveAvgPool2d: Adaptive average pooling layer.
- nn.ReLU: ReLU activation function.
- nn.Sigmoid: Sigmoid activation function.
- nn.Tanh: Tanh activation function.
- nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d: Batch normalization layers.
- nn.LayerNorm: Layer normalization layer.
- nn.Dropout: Dropout layer.
- nn.Embedding: Embedding layer.
- nn.LSTM: LSTM layer.
- nn.GRU: GRU layer.
- nn.Transformer: Transformer layer.
- nn.TransformerEncoder, nn.TransformerDecoder: Transformer encoder and decoder layers.
- nn.MultiheadAttention: Multi-head attention layer.
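For illustration, a few of these layers composed into a small convolutional block with nn.Sequential (a sketch assuming 32x32 RGB inputs, not tied to any particular dataset):
conv_block = nn.Sequential(
    nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1),
    nn.BatchNorm2d(16),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=2),   # 32x32 -> 16x16
    nn.Flatten(),
    nn.Dropout(0.2),
    nn.Linear(16 * 16 * 16, 10)    # 16 channels * 16 * 16 spatial positions
)
x = torch.randn(8, 3, 32, 32)      # batch of 8 RGB 32x32 images
print(conv_block(x).shape)         # torch.Size([8, 10])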
1.3.4 Activation Functions
All available in torch.nn.functional (imported as F above), with module equivalents in nn (e.g. nn.ReLU):
- F.relu: Rectified Linear Unit.
- F.sigmoid: Sigmoid function.
- F.tanh: Hyperbolic tangent function.
- F.softmax: Softmax function (for multi-class classification).
- F.elu: Exponential Linear Unit.
- F.selu: Scaled Exponential Linear Unit.
- F.leaky_relu: Leaky Rectified Linear Unit.
- F.gelu: Gaussian Error Linear Unit (GELU).
- F.silu: SiLU (Sigmoid Linear Unit), also known as Swish.
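A quick sketch applying a few of these to a tensor (the module forms behave the same inside a model):
x = torch.randn(4)
print(F.relu(x))
print(torch.sigmoid(x))
print(F.leaky_relu(x, negative_slope=0.01))
print(F.gelu(x))
print(F.softmax(x, dim=0))   # probabilities summing to 1 along dim 0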
1.3.5 Loss Functions
- nn.CrossEntropyLoss: Cross-entropy loss (for multi-class classification).
- nn.BCELoss: Binary cross-entropy loss (for binary classification).
- nn.BCEWithLogitsLoss: Binary cross-entropy with logits (more stable).
- nn.MSELoss: Mean squared error loss (for regression).
- nn.L1Loss: Mean absolute error loss (for regression).
- nn.SmoothL1Loss: Huber loss (for robust regression).
- nn.CTCLoss: Connectionist Temporal Classification loss (for sequence labeling).
- nn.TripletMarginLoss: Triplet margin loss (for learning embeddings).
- nn.CosineEmbeddingLoss: Cosine embedding loss.
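Typical usage for a classification and a regression loss, shown here with random data as a minimal sketch:
# Classification: CrossEntropyLoss expects raw logits and integer class labels
logits = torch.randn(4, 10)               # batch of 4, 10 classes
targets = torch.randint(0, 10, (4,))
ce_loss = nn.CrossEntropyLoss()(logits, targets)

# Regression: MSELoss compares continuous predictions and targets
preds = torch.randn(4, 1)
labels = torch.randn(4, 1)
mse_loss = nn.MSELoss()(preds, labels)

print(ce_loss.item(), mse_loss.item())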
1.3.6 Optimizers
- optim.SGD: Stochastic Gradient Descent.
- optim.Adam: Adaptive Moment Estimation.
- optim.RMSprop: Root Mean Square Propagation.
- optim.Adagrad: Adaptive Gradient Algorithm.
- optim.Adadelta: Adaptive Delta.
- optim.AdamW: Adam with decoupled weight decay regularization.
- optim.SparseAdam: Adam optimizer for sparse tensors.
1.3.7 Optimizer Configuration
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.01, amsgrad=False)
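Per-parameter options are also supported by passing parameter groups, e.g. a smaller learning rate for early layers. A sketch using the SimpleNet layers (fc1, fc2, fc3) defined above:
optimizer = optim.Adam([
    {'params': model.fc1.parameters(), 'lr': 1e-4},        # smaller LR for the first layer
    {'params': model.fc2.parameters()},                    # uses the default lr below
    {'params': model.fc3.parameters(), 'weight_decay': 0.0}  # no weight decay here
], lr=1e-3, weight_decay=0.01)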
1.3.8 Learning Rate Schedulers
from torch.optim.lr_scheduler import StepLR
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = StepLR(optimizer, step_size=30, gamma=0.1)
for epoch in range(100):
# Training loop
scheduler.step()
Common Schedulers:
- StepLR: Decays the learning rate by a factor every step_size epochs.
- MultiStepLR: Decays the learning rate at specified milestones.
- ExponentialLR: Decays the learning rate exponentially.
- CosineAnnealingLR: Uses a cosine annealing schedule.
- ReduceLROnPlateau: Reduces the learning rate when a metric has stopped improving.
- CyclicLR: Sets the learning rate cyclically.
- OneCycleLR: Sets the learning rate according to the 1cycle policy.
- CosineAnnealingWarmRestarts: Cosine annealing with warm restarts.
1.3.9 Metrics
- Accuracy
- Precision
- Recall
- F1-Score
- AUC (Area Under the Curve)
- IoU (Intersection over Union)
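A quick way to compute several of these from predicted and true labels (a sketch; scikit-learn is also used in the evaluation section below):
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

y_true = [0, 1, 1, 0, 1, 2, 2, 0]
y_pred = [0, 1, 0, 0, 1, 2, 1, 0]
print("Accuracy:", accuracy_score(y_true, y_pred))
print("Precision:", precision_score(y_true, y_pred, average='macro'))
print("Recall:", recall_score(y_true, y_pred, average='macro'))
print("F1:", f1_score(y_true, y_pred, average='macro'))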
1.4 Training
1.4.1 Training Pipeline
┌─────────────────────────────────────────┐
│ Training Pipeline │
└───────────────┬─────────────────────────┘
│
↓
┌───────────────────────────────┐
│ 1. Load Data (DataLoader) │
└──────────────┬────────────────┘
│
↓
┌───────────────────────────────┐
│ 2. Zero Gradients │
│ optimizer.zero_grad() │
└──────────────┬────────────────┘
│
↓
┌───────────────────────────────┐
│ 3. Forward Pass │
│ outputs = model(inputs) │
└──────────────┬────────────────┘
│
↓
┌───────────────────────────────┐
│ 4. Compute Loss │
│ loss = criterion(...) │
└──────────────┬────────────────┘
│
↓
┌───────────────────────────────┐
│ 5. Backward Pass │
│ loss.backward() │
└──────────────┬────────────────┘
│
↓
┌───────────────────────────────┐
│ 6. Update Weights │
│ optimizer.step() │
└──────────────┬────────────────┘
│
└──→ Repeat for all batches
1.4.2 Training Loop
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# Sample data
X_train = torch.randn(1000, 784)
y_train = torch.randint(0, 10, (1000,))
X_val = torch.randn(200, 784)
y_val = torch.randint(0, 10, (200,))
# Create datasets and dataloaders
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
# Model, loss, optimizer
model = SimpleNet(784, 128, 10)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training loop
num_epochs = 10
for epoch in range(num_epochs):
# Training phase
model.train()
train_loss = 0.0
train_correct = 0
train_total = 0
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
# Zero the gradients
optimizer.zero_grad()
# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)
# Backward pass and optimization
loss.backward()
optimizer.step()
# Statistics
train_loss += loss.item() * inputs.size(0)
_, predicted = torch.max(outputs.data, 1)
train_total += labels.size(0)
train_correct += (predicted == labels).sum().item()
# Calculate average training metrics
train_loss = train_loss / len(train_loader.dataset)
train_acc = 100 * train_correct / train_total
# Validation phase
model.eval()
val_loss = 0.0
val_correct = 0
val_total = 0
with torch.no_grad():
for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_loss += loss.item() * inputs.size(0)
_, predicted = torch.max(outputs.data, 1)
val_total += labels.size(0)
val_correct += (predicted == labels).sum().item()
val_loss = val_loss / len(val_loader.dataset)
val_acc = 100 * val_correct / val_total
print(f'Epoch [{epoch+1}/{num_epochs}] '
f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | '
f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
1.4.3 Custom Datasets and DataLoaders
from torch.utils.data import Dataset, DataLoader
import torch
import pandas as pd
# Custom Dataset
class CustomDataset(Dataset):
def __init__(self, data, labels, transform=None):
"""
Args:
data: Input data (numpy array or list)
labels: Target labels (numpy array or list)
transform: Optional transform to be applied
"""
self.data = torch.FloatTensor(data)
self.labels = torch.LongTensor(labels)
self.transform = transform
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
sample = self.data[idx]
label = self.labels[idx]
if self.transform:
sample = self.transform(sample)
return sample, label
# CSV Dataset example
class CSVDataset(Dataset):
def __init__(self, csv_file, transform=None):
self.data = pd.read_csv(csv_file)
self.transform = transform
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
# Assuming last column is label
features = self.data.iloc[idx, :-1].values.astype('float32')
label = self.data.iloc[idx, -1]
if self.transform:
features = self.transform(features)
        # Return a scalar label so batches collate to shape (batch,) for CrossEntropyLoss
        return torch.FloatTensor(features), torch.tensor(int(label), dtype=torch.long)
# Create dataset and dataloader
dataset = CustomDataset(data, labels)
# DataLoader parameters
dataloader = DataLoader(
dataset,
batch_size=32, # Number of samples per batch
shuffle=True, # Shuffle data at every epoch
num_workers=4, # Number of subprocesses for data loading
pin_memory=True, # Pin memory for faster data transfer to CUDA
drop_last=False, # Drop last incomplete batch
persistent_workers=True # Keep workers alive between epochs
)
# Iterate through batches
for batch_idx, (inputs, labels) in enumerate(dataloader):
print(f"Batch {batch_idx}: inputs shape {inputs.shape}, labels shape {labels.shape}")
1.4.4 Transforms
import torchvision.transforms as transforms
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])
trainset = torchvision.datasets.ImageFolder(root='./data/train', transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=32,
shuffle=True, num_workers=4, pin_memory=True)
Common Augmentations:
- transforms.RandomHorizontalFlip: Horizontally flips the image.
- transforms.RandomVerticalFlip: Vertically flips the image.
- transforms.RandomRotation: Rotates the image by a random angle.
- transforms.RandomAffine: Applies random affine transformations.
- transforms.RandomPerspective: Applies a random perspective transformation with a given magnitude.
- transforms.RandomCrop: Crops a random portion of the image.
- transforms.CenterCrop: Crops the image from the center.
- transforms.ColorJitter: Randomly changes the brightness, contrast, saturation, and hue of an image.
- transforms.RandomGrayscale: Converts the image to grayscale with a certain probability.
- transforms.RandomErasing: Randomly erases a rectangular region in the image.
1.4.5 Mixed Precision Training
┌────────────────────────────────────┐
│ Mixed Precision Training Flow │
└─────────────┬──────────────────────┘
│
↓
┌────────────────────────────┐
│ Forward Pass (FP16) │
│ with autocast() │
└──────────┬─────────────────┘
│
↓
┌────────────────────────────┐
│ Compute Loss (FP16) │
└──────────┬─────────────────┘
│
↓
┌────────────────────────────┐
│ Scale Loss │
│ scaler.scale(loss) │
└──────────┬─────────────────┘
│
↓
┌────────────────────────────┐
│ Backward Pass (FP32) │
│ scaled_loss.backward() │
└──────────┬─────────────────┘
│
↓
┌────────────────────────────┐
│ Unscale & Update │
│ scaler.step(optimizer) │
└────────────────────────────┘
import torch
from torch.cuda.amp import autocast, GradScaler
# Initialize GradScaler
scaler = GradScaler()
# Training loop with mixed precision
for epoch in range(num_epochs):
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
# Forward pass with autocast
with autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
# Backward pass with scaled gradients
scaler.scale(loss).backward()
# Unscale gradients and perform optimizer step
scaler.step(optimizer)
# Update scaler for next iteration
scaler.update()
# Benefits:
# - Typically up to 2-3x faster training on GPUs with Tensor Cores
# - Roughly half the activation memory
# - Maintains model accuracy (GradScaler prevents gradient underflow)
1.5 Evaluation
┌────────────────────────────┐
│ Evaluation Mode │
└────────┬───────────────────┘
│
↓
┌─────────────────┐
│ model.eval() │──→ Disable dropout
└────────┬────────┘ Freeze batch norm
│
↓
┌─────────────────────┐
│ torch.no_grad() │──→ Disable gradient
└────────┬────────────┘ computation
│
↓
┌─────────────────────┐
│ Forward Pass │
│ Compute Metrics │
└─────────────────────┘
import torch
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
def evaluate_model(model, test_loader, device):
"""Comprehensive model evaluation"""
model.eval()
all_preds = []
all_labels = []
total_loss = 0.0
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)
# Get predictions
_, predicted = torch.max(outputs.data, 1)
# Store results
all_preds.extend(predicted.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
total_loss += loss.item() * inputs.size(0)
# Calculate metrics
avg_loss = total_loss / len(test_loader.dataset)
accuracy = 100 * np.sum(np.array(all_preds) == np.array(all_labels)) / len(all_labels)
print(f"Test Loss: {avg_loss:.4f}")
print(f"Test Accuracy: {accuracy:.2f}%")
print("\nClassification Report:")
print(classification_report(all_labels, all_preds))
print("\nConfusion Matrix:")
print(confusion_matrix(all_labels, all_preds))
return accuracy, avg_loss
# Quick evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}%')
1.6 Prediction and Inference
import torch
import torch.nn.functional as F
# Single prediction
model.eval()
with torch.no_grad():
# Prepare input
input_tensor = torch.randn(1, 3, 224, 224).to(device)
# Get prediction
output = model(input_tensor)
# For classification
probabilities = F.softmax(output, dim=1)
confidence, predicted_class = torch.max(probabilities, 1)
print(f'Predicted class: {predicted_class.item()}')
print(f'Confidence: {confidence.item():.4f}')
print(f'All probabilities: {probabilities[0].cpu().numpy()}')
# Batch prediction
def predict_batch(model, inputs, device):
"""Make predictions for a batch of inputs"""
model.eval()
inputs = inputs.to(device)
with torch.no_grad():
outputs = model(inputs)
probabilities = F.softmax(outputs, dim=1)
confidences, predictions = torch.max(probabilities, 1)
return predictions.cpu().numpy(), confidences.cpu().numpy()
# Example usage
batch_inputs = torch.randn(10, 3, 224, 224)
predictions, confidences = predict_batch(model, batch_inputs, device)
for i, (pred, conf) in enumerate(zip(predictions, confidences)):
print(f"Sample {i}: Class {pred}, Confidence {conf:.4f}")
# Top-k predictions
def get_top_k_predictions(model, input_tensor, k=5):
"""Get top-k predictions with probabilities"""
model.eval()
with torch.no_grad():
output = model(input_tensor.to(device))
probabilities = F.softmax(output, dim=1)
top_probs, top_classes = torch.topk(probabilities, k, dim=1)
return top_classes[0].cpu().numpy(), top_probs[0].cpu().numpy()
# Usage
input_tensor = torch.randn(1, 3, 224, 224)
classes, probs = get_top_k_predictions(model, input_tensor, k=5)
print("Top 5 predictions:")
for cls, prob in zip(classes, probs):
print(f" Class {cls}: {prob:.4f}")
1.7 Saving and Loading Models
┌──────────────────────────────────┐
│ Model Saving Strategies │
└────────┬─────────────────────────┘
│
┌────┴─────┐
│ │
↓ ↓
┌────────┐ ┌──────────────┐
│ Full │ │ State Dict │
│ Model │ │ (Preferred) │
└────────┘ └──────────────┘
1.7.1 Save Model State Dictionary (Recommended)
import torch
import os
# Save only state dictionary (recommended)
torch.save(model.state_dict(), 'model_weights.pth')
# Save checkpoint with additional info
checkpoint = {
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': loss,
'accuracy': accuracy
}
torch.save(checkpoint, 'checkpoint.pth')
# Save best model
best_acc = 0.0
if val_acc > best_acc:
best_acc = val_acc
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'best_acc': best_acc
}, 'best_model.pth')
1.7.2 Load Model State Dictionary
# Load state dictionary
model = SimpleNet(784, 128, 10)
model.load_state_dict(torch.load('model_weights.pth'))
model.eval()
# Load checkpoint
checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']
# Resume training
model.train()
# Load for inference on different device
device = torch.device('cpu')
model = SimpleNet(784, 128, 10)
model.load_state_dict(torch.load('model_weights.pth', map_location=device))
model.eval()
1.7.3 Save Entire Model (Not Recommended)
# Save entire model
torch.save(model, 'full_model.pth')
# Load entire model
model = torch.load('full_model.pth')
model.eval()
# Note: This is less flexible and may break with PyTorch version changes
1.7.4 Model Versioning and Management
import os
from datetime import datetime
def save_model_with_metadata(model, optimizer, epoch, metrics, save_dir='models'):
"""Save model with comprehensive metadata"""
os.makedirs(save_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'model_epoch{epoch}_{timestamp}.pth'
filepath = os.path.join(save_dir, filename)
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'metrics': metrics,
'timestamp': timestamp,
'pytorch_version': torch.__version__
}, filepath)
print(f"Model saved to {filepath}")
return filepath
# Usage
metrics = {'train_loss': 0.5, 'train_acc': 0.85, 'val_loss': 0.6, 'val_acc': 0.82}
save_model_with_metadata(model, optimizer, epoch=10, metrics=metrics)
1.8 CUDA (GPU Support)
1.8.1 Check CUDA Availability
torch.cuda.is_available()
1.8.2 Set Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
1.8.3 Move Tensors to GPU
tensor = tensor.to(device)
1.8.4 CUDA Best Practices
- Use pinned memory for data transfer: torch.utils.data.DataLoader(..., pin_memory=True)
- Use asynchronous data transfer: torch.cuda.Stream()
- Use mixed precision training: torch.cuda.amp.autocast() and torch.cuda.amp.GradScaler()
- Set torch.backends.cudnn.benchmark = True for faster convolutions when input sizes are fixed.
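Putting a few of these together in a sketch that reuses model, device, and train_dataset from the training example in section 1.4 (non_blocking transfers only help when the source batches sit in pinned memory):
torch.backends.cudnn.benchmark = True   # Autotune conv algorithms for fixed input sizes

loader = DataLoader(train_dataset, batch_size=64, shuffle=True,
                    num_workers=4, pin_memory=True)

for inputs, labels in loader:
    # Overlap host-to-device copies with compute
    inputs = inputs.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)
    outputs = model(inputs)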
1.9 Distributed Training
┌─────────────────────────────────────┐
│ Distributed Training Methods │
└───────────────┬─────────────────────┘
│
┌───────────┴──────────┐
│ │
↓ ↓
┌────────────┐ ┌───────────────┐
│DataParallel│ │DistributedData│
│ (DP) │ │Parallel (DDP) │
│ │ │ │
│ • Single │ │ • Multi-node │
│ node │ │ • Faster │
│ • Easier │ │ • Scalable │
└────────────┘ └───────────────┘
1.9.1 DataParallel (Simple but limited)
import torch
import torch.nn as nn
# Check available GPUs
if torch.cuda.device_count() > 1:
print(f"Using {torch.cuda.device_count()} GPUs")
# Wrap model with DataParallel
model = nn.DataParallel(model)
model.to('cuda')
# Training works the same way
for inputs, labels in train_loader:
inputs, labels = inputs.to('cuda'), labels.to('cuda')
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# Note: DataParallel splits batches across GPUs but has limitations:
# - Single process (GIL bottleneck)
# - Slower than DDP
# - Uneven GPU utilization
1.9.2 DistributedDataParallel (Recommended)
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
import os
def setup(rank, world_size):
"""Initialize the distributed environment"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# Initialize process group
dist.init_process_group(
backend='nccl', # Use 'gloo' for CPU, 'nccl' for GPU
init_method='env://',
rank=rank,
world_size=world_size
)
# Set device
torch.cuda.set_device(rank)
def cleanup():
"""Clean up the distributed environment"""
dist.destroy_process_group()
def train(rank, world_size, epochs):
"""Training function for each process"""
print(f"Running DDP on rank {rank}")
setup(rank, world_size)
# Create model and move to GPU
model = SimpleNet(784, 128, 10).to(rank)
ddp_model = DDP(model, device_ids=[rank])
# Create dataset and sampler
train_dataset = YourDataset()
train_sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank,
shuffle=True
)
train_loader = DataLoader(
train_dataset,
batch_size=32,
sampler=train_sampler,
num_workers=2,
pin_memory=True
)
# Optimizer and loss
optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss().to(rank)
# Training loop
for epoch in range(epochs):
# Set epoch for sampler (important for shuffling)
train_sampler.set_epoch(epoch)
ddp_model.train()
epoch_loss = 0.0
for batch_idx, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.to(rank), labels.to(rank)
optimizer.zero_grad()
outputs = ddp_model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
        # Average loss over this process's batches (not synchronized across ranks)
avg_loss = epoch_loss / len(train_loader)
if rank == 0: # Only print from main process
print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
# Save checkpoint from main process
if rank == 0:
checkpoint = {
'epoch': epoch,
'model_state_dict': ddp_model.module.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': avg_loss
}
torch.save(checkpoint, f'checkpoint_epoch_{epoch}.pth')
cleanup()
def main():
"""Main function to spawn processes"""
world_size = torch.cuda.device_count()
if world_size < 2:
print("Need at least 2 GPUs for DDP")
return
print(f"Training on {world_size} GPUs")
# Spawn processes
mp.spawn(
train,
args=(world_size, 10), # world_size, epochs
nprocs=world_size,
join=True
)
if __name__ == "__main__":
main()
1.9.3 Multi-Node DDP
import torch.distributed as dist
# On each node, set these environment variables:
# MASTER_ADDR: IP address of rank 0 node
# MASTER_PORT: Free port on rank 0 node
# WORLD_SIZE: Total number of processes across all nodes
# RANK: Global rank of this process
def setup_multinode(rank, world_size):
"""Setup for multi-node training"""
# These should be set via environment variables
master_addr = os.environ.get('MASTER_ADDR', 'localhost')
master_port = os.environ.get('MASTER_PORT', '12355')
dist.init_process_group(
backend='nccl',
init_method=f'tcp://{master_addr}:{master_port}',
rank=rank,
world_size=world_size
)
# Launch on node 0 (4 GPUs):
# MASTER_ADDR=node0 MASTER_PORT=12355 WORLD_SIZE=8 RANK=0 python train.py
# Launch on node 1 (4 GPUs):
# MASTER_ADDR=node0 MASTER_PORT=12355 WORLD_SIZE=8 RANK=4 python train.py
1.9.4 DDP with Mixed Precision
from torch.cuda.amp import autocast, GradScaler
def train_with_amp(rank, world_size):
setup(rank, world_size)
model = SimpleNet(784, 128, 10).to(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.001)
scaler = GradScaler()
for inputs, labels in train_loader:
inputs, labels = inputs.to(rank), labels.to(rank)
optimizer.zero_grad()
# Mixed precision forward pass
with autocast():
outputs = ddp_model(inputs)
loss = criterion(outputs, labels)
# Scaled backward pass
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
cleanup()
1.10 Autograd (Automatic Differentiation)
1.10.1 Computational Graph
┌─────────────────────────────────┐
│ Computational Graph │
└───────────────┬─────────────────┘
│
↓ Forward
┌───────┐ ┌───────┐ ┌───────┐
│ x │─────→│ y=x+2│─────→│z=y*y*3│
│(leaf) │ │ │ │ │
└───────┘ └───────┘ └───┬────┘
↑ │
│ ↓
│ Backward ┌────────┐
└────────────────────────│out.mean│
└────────┘
1.10.2 Tracking Gradients
import torch
# Create tensor with gradient tracking
x = torch.randn(3, requires_grad=True)
print(f"x: {x}")
print(f"requires_grad: {x.requires_grad}")
# Perform operations
y = x + 2
z = y * y * 3
out = z.mean()
print(f"\nout: {out}")
# Compute gradients
out.backward()
# Access gradients
print(f"\nx.grad: {x.grad}")
print(f"\nGradient of out with respect to x: {x.grad}")
# Gradient accumulation (multiple backward passes)
x = torch.randn(3, requires_grad=True)
for i in range(3):
y = (x ** 2).sum()
y.backward()
print(f"Iteration {i+1}, x.grad: {x.grad}")
# Zero gradients manually
x.grad.zero_()
print(f"After zeroing: {x.grad}")
1.10.3 Controlling Gradient Tracking
# Disable gradient tracking (for inference)
with torch.no_grad():
y = x + 2
z = y * y
print(f"z.requires_grad: {z.requires_grad}") # False
# Alternative: use inference mode (faster than no_grad)
with torch.inference_mode():
y = x + 2
z = y * y
# Temporarily disable gradient
x = torch.randn(3, requires_grad=True)
with torch.set_grad_enabled(False):
y = x * 2
# Detach tensor from computation graph
x = torch.randn(3, requires_grad=True)
y = x.detach() # y shares data with x but has no gradient
z = x.detach().clone() # Create independent copy without gradient
print(f"x.requires_grad: {x.requires_grad}") # True
print(f"y.requires_grad: {y.requires_grad}") # False
print(f"z.requires_grad: {z.requires_grad}") # False
1.10.4 Computing Higher-Order Derivatives
# Second derivative
x = torch.tensor([2.0], requires_grad=True)
y = x ** 3
# First derivative
first_grad = torch.autograd.grad(y, x, create_graph=True)[0]
print(f"First derivative (3x^2): {first_grad}") # 12.0
# Second derivative
second_grad = torch.autograd.grad(first_grad, x)[0]
print(f"Second derivative (6x): {second_grad}") # 12.0
1.10.5 Gradient for Non-Scalar Outputs
# For non-scalar outputs, provide gradient argument
x = torch.randn(3, requires_grad=True)
y = x * 2
# Create gradient tensor matching y's shape
grad_output = torch.ones_like(y)
y.backward(grad_output)
print(f"x.grad: {x.grad}")
1.10.6 Custom Autograd Functions
import torch
class CustomReLU(torch.autograd.Function):
"""
Custom ReLU implementation with autograd support
"""
@staticmethod
def forward(ctx, input):
# Save input for backward pass
ctx.save_for_backward(input)
# Apply ReLU: max(0, x)
return input.clamp(min=0)
@staticmethod
def backward(ctx, grad_output):
# Retrieve saved input
input, = ctx.saved_tensors
# Compute gradient
grad_input = grad_output.clone()
grad_input[input < 0] = 0 # Gradient is 0 where input < 0
return grad_input
# Use custom function
custom_relu = CustomReLU.apply
x = torch.randn(5, requires_grad=True)
y = custom_relu(x)
loss = y.sum()
loss.backward()
print(f"Input: {x}")
print(f"Output: {y}")
print(f"Gradient: {x.grad}")
# Custom function with multiple inputs/outputs
class CustomMultiply(torch.autograd.Function):
@staticmethod
def forward(ctx, input1, input2, constant):
ctx.save_for_backward(input1, input2)
ctx.constant = constant
return input1 * input2 * constant
@staticmethod
def backward(ctx, grad_output):
input1, input2 = ctx.saved_tensors
constant = ctx.constant
grad_input1 = grad_output * input2 * constant
grad_input2 = grad_output * input1 * constant
# Return gradient for each input (constant has no gradient)
return grad_input1, grad_input2, None
# Usage
a = torch.tensor([2.0], requires_grad=True)
b = torch.tensor([3.0], requires_grad=True)
c = CustomMultiply.apply(a, b, 5.0)
c.backward()
print(f"a.grad: {a.grad}") # 15.0 (3 * 5)
print(f"b.grad: {b.grad}") # 10.0 (2 * 5)
1.11 Data Augmentation
import torchvision.transforms as transforms
transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.RandomRotation(degrees=15),
transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.1),
transforms.ToTensor(),
transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])
Common Augmentations:
- transforms.RandomHorizontalFlip: Horizontally flips the image.
- transforms.RandomVerticalFlip: Vertically flips the image.
- transforms.RandomRotation: Rotates the image by a random angle.
- transforms.RandomAffine: Applies random affine transformations.
- transforms.RandomPerspective: Applies a random perspective transformation with a given magnitude.
- transforms.RandomCrop: Crops a random portion of the image.
- transforms.CenterCrop: Crops the image from the center.
- transforms.ColorJitter: Randomly changes the brightness, contrast, saturation, and hue of an image.
- transforms.RandomGrayscale: Converts the image to grayscale with a certain probability.
- transforms.RandomErasing: Randomly erases a rectangular region in the image.
- transforms.RandomResizedCrop: Crops a random portion of the image and resizes it.
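Note that transforms which operate on tensors, such as RandomErasing, must come after ToTensor. A minimal sketch of such a pipeline:
augment = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),                  # PIL image -> tensor
    transforms.RandomErasing(p=0.25),       # tensor-only transform
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])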
1.12 Learning Rate Schedulers
Learning rate schedule shapes (conceptual): StepLR holds the learning rate constant and drops it by a fixed factor every step_size epochs, producing a staircase over training, while CosineAnnealingLR decays it smoothly along a cosine curve from the initial value toward eta_min.
1.12.1 Common Schedulers
import torch
import torch.optim as optim
from torch.optim import lr_scheduler
model = SimpleNet(784, 128, 10)
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 1. StepLR: Decay LR by gamma every step_size epochs
scheduler = lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)
# LR: 0.001 → 0.0001 (epoch 30) → 0.00001 (epoch 60)
# 2. MultiStepLR: Decay at specific milestones
scheduler = lr_scheduler.MultiStepLR(optimizer, milestones=[30, 60, 90], gamma=0.1)
# 3. ExponentialLR: Exponential decay
scheduler = lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
# LR = initial_lr * (gamma ** epoch)
# 4. CosineAnnealingLR: Cosine annealing
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=100, eta_min=0)
# 5. ReduceLROnPlateau: Reduce when metric plateaus
scheduler = lr_scheduler.ReduceLROnPlateau(
optimizer,
mode='min', # 'min' for loss, 'max' for accuracy
factor=0.1, # Multiply LR by factor
patience=10, # Wait 10 epochs before reducing
verbose=True,
min_lr=1e-6
)
# 6. CyclicLR: Cycle between two boundaries
scheduler = lr_scheduler.CyclicLR(
optimizer,
base_lr=0.001,
max_lr=0.01,
step_size_up=2000,
mode='triangular'
)
# 7. OneCycleLR: 1cycle policy (very effective)
scheduler = lr_scheduler.OneCycleLR(
optimizer,
max_lr=0.01,
epochs=100,
steps_per_epoch=len(train_loader)
)
# 8. CosineAnnealingWarmRestarts: Cosine with restarts
scheduler = lr_scheduler.CosineAnnealingWarmRestarts(
optimizer,
T_0=10, # First restart after 10 epochs
T_mult=2, # Double period after each restart
eta_min=0
)
# 9. LambdaLR: Custom schedule
scheduler = lr_scheduler.LambdaLR(
optimizer,
lr_lambda=lambda epoch: 0.95 ** epoch
)
1.12.2 Usage in Training Loop
# For most schedulers
for epoch in range(num_epochs):
# Training
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# Step scheduler after each epoch
scheduler.step()
# Print current learning rate
current_lr = optimizer.param_groups[0]['lr']
print(f"Epoch {epoch+1}, LR: {current_lr:.6f}")
# For ReduceLROnPlateau (needs validation metric)
for epoch in range(num_epochs):
# Training
train_loss = train_one_epoch(model, train_loader)
# Validation
val_loss = validate(model, val_loader)
# Step with validation loss
scheduler.step(val_loss)
print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
# For OneCycleLR (step after each batch)
for epoch in range(num_epochs):
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
scheduler.step() # Step after each batch
# Get learning rate history
lr_history = []
for epoch in range(100):
lr_history.append(optimizer.param_groups[0]['lr'])
scheduler.step()
# Plot learning rate schedule
import matplotlib.pyplot as plt
plt.plot(lr_history)
plt.xlabel('Epoch')
plt.ylabel('Learning Rate')
plt.title('Learning Rate Schedule')
plt.show()
1.13 TensorBoard Integration
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter("runs/experiment_1")
# Log scalar values
writer.add_scalar('Loss/train', loss.item(), epoch)
writer.add_scalar('Accuracy/train', accuracy, epoch)
# Log model graph
writer.add_graph(model, images)
# Log images
writer.add_image('Image', img_grid, epoch)
# Log histograms
writer.add_histogram('fc1.weight', model.fc1.weight, epoch)
# Log embeddings
writer.add_embedding(features, metadata=labels, tag='my_embedding')
writer.close()
Run TensorBoard:
tensorboard --logdir=runs
1.14 ONNX Export
┌─────────────────────────────────┐
│ ONNX Export Pipeline │
└─────────────┬───────────────────┘
│
↓
┌─────────────────────────────────┐
│ PyTorch Model (.pth) │
└─────────────┬───────────────────┘
│
↓
┌─────────────────────────────────┐
│ torch.onnx.export() │
└─────────────┬───────────────────┘
│
↓
┌─────────────────────────────────┐
│ ONNX Model (.onnx) │
│ (Cross-platform inference) │
└─────────────────────────────────┘
import torch
import onnx
import onnxruntime as ort
import numpy as np
# Export model to ONNX
model.eval()
dummy_input = torch.randn(1, 3, 224, 224).to(device)
# Export with dynamic axes for variable batch size
torch.onnx.export(
model,
dummy_input,
"model.onnx",
export_params=True,
opset_version=14, # ONNX opset version
do_constant_folding=True, # Optimize constant folding
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'}, # Variable batch size
'output': {0: 'batch_size'}
},
verbose=False
)
print("ONNX model exported successfully!")
# Verify ONNX model
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)
print("ONNX model is valid!")
# Print model info
print(f"\nONNX Model Info:")
print(f"Inputs: {[(i.name, i.type) for i in onnx_model.graph.input]}")
print(f"Outputs: {[(o.name, o.type) for o in onnx_model.graph.output]}")
# Test ONNX model with ONNX Runtime
ort_session = ort.InferenceSession("model.onnx")
# Prepare input
test_input = np.random.randn(1, 3, 224, 224).astype(np.float32)
# Run inference
ort_inputs = {ort_session.get_inputs()[0].name: test_input}
ort_outputs = ort_session.run(None, ort_inputs)
print(f"\nONNX Runtime output shape: {ort_outputs[0].shape}")
# Compare PyTorch and ONNX outputs
with torch.no_grad():
pytorch_output = model(torch.from_numpy(test_input).to(device))
pytorch_output = pytorch_output.cpu().numpy()
difference = np.abs(pytorch_output - ort_outputs[0])
print(f"Max difference between PyTorch and ONNX: {np.max(difference)}")
print(f"Mean difference: {np.mean(difference)}")
# Optimize ONNX model
from onnxruntime.quantization import quantize_dynamic, QuantType
# Dynamic quantization (INT8)
quantize_dynamic(
"model.onnx",
"model_quantized.onnx",
weight_type=QuantType.QInt8
)
print("\nQuantized ONNX model created!")
# Compare model sizes
import os
original_size = os.path.getsize("model.onnx") / (1024 * 1024)
quantized_size = os.path.getsize("model_quantized.onnx") / (1024 * 1024)
print(f"Original model size: {original_size:.2f} MB")
print(f"Quantized model size: {quantized_size:.2f} MB")
print(f"Size reduction: {(1 - quantized_size/original_size) * 100:.1f}%")
1.15 TorchScript
┌──────────────────────────────┐
│ TorchScript Methods │
└──────────┬───────────────────┘
│
┌──────┴──────┐
│ │
↓ ↓
┌────────┐ ┌──────────┐
│ Tracing│ │ Scripting│
│ (Record │ │ (Analyze │
│ ops) │ │ source) │
└────────┘ └──────────┘
1.15.1 Tracing (Recommended for most models)
import torch
# Set model to evaluation mode
model.eval()
# Create example input
example_input = torch.rand(1, 3, 224, 224).to(device)
# Trace the model
traced_model = torch.jit.trace(model, example_input)
# Save traced model
traced_model.save("model_traced.pt")
print("Traced model saved!")
# Load traced model
loaded_model = torch.jit.load("model_traced.pt")
loaded_model.eval()
# Test traced model
with torch.no_grad():
test_input = torch.randn(1, 3, 224, 224).to(device)
output = loaded_model(test_input)
print(f"Output shape: {output.shape}")
# Optimize for mobile deployment
optimized_model = torch.jit.optimize_for_inference(traced_model)
optimized_model.save("model_optimized.pt")
1.15.2 Scripting (For models with control flow)
import torch
import torch.nn as nn
# Script entire model
class ScriptableModel(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(784, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = x.view(x.size(0), -1)
x = torch.relu(self.fc1(x))
# Control flow is preserved
if x.sum() > 0:
x = self.fc2(x)
else:
x = torch.zeros(x.size(0), 10)
return x
model = ScriptableModel()
scripted_model = torch.jit.script(model)
scripted_model.save("model_scripted.pt")
# Script individual functions
@torch.jit.script
def custom_function(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
"""Custom function with type annotations"""
if x.sum() > y.sum():
return x + y
else:
return x - y
# Use scripted function
result = custom_function(torch.randn(3, 4), torch.randn(3, 4))
# Combine tracing and scripting: a scripted function (which keeps its
# control flow) can be called from a module that is then traced
@torch.jit.script
def pool_features(x: torch.Tensor) -> torch.Tensor:
    if x.dim() == 4:
        return x.mean(dim=[2, 3])  # Global average pooling
    return x

class HybridModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(3, 64, 3)
        self.fc = nn.Linear(64, 10)

    def forward(self, x):
        x = torch.relu(self.conv(x))
        x = pool_features(x)  # Scripted function is embedded in the trace
        return self.fc(x)

hybrid_traced = torch.jit.trace(HybridModel(), torch.randn(1, 3, 32, 32))
# Mobile deployment
from torch.utils.mobile_optimizer import optimize_for_mobile
model.eval()
traced = torch.jit.trace(model, example_input)
optimized = optimize_for_mobile(traced)
optimized._save_for_lite_interpreter("model_mobile.ptl")
print("Mobile model saved!")
1.15.3 Performance Comparison
import time
import torch
# Benchmark function
def benchmark(model, input_tensor, num_runs=100):
model.eval()
with torch.no_grad():
# Warmup
for _ in range(10):
_ = model(input_tensor)
# Benchmark
start = time.time()
for _ in range(num_runs):
_ = model(input_tensor)
end = time.time()
avg_time = (end - start) / num_runs * 1000 # ms
return avg_time
# Compare models
input_tensor = torch.randn(1, 3, 224, 224).to(device)
original_time = benchmark(model, input_tensor)
traced_time = benchmark(traced_model, input_tensor)
print(f"Original model: {original_time:.3f} ms")
print(f"Traced model: {traced_time:.3f} ms")
print(f"Speedup: {original_time/traced_time:.2f}x")
1.16 Deployment
┌───────────────────────────────────┐
│ Deployment Pipeline │
└──────────────┬────────────────────┘
│
↓
┌───────────────────────────────────┐
│ 1. Train Model │
└──────────────┬────────────────────┘
│
↓
┌───────────────────────────────────┐
│ 2. Save Model (.pth/.onnx) │
└──────────────┬────────────────────┘
│
↓
┌───────────────────────────────────┐
│ 3. Create API (Flask/FastAPI) │
└──────────────┬────────────────────┘
│
↓
┌───────────────────────────────────┐
│ 4. Containerize (Docker) │
└──────────────┬────────────────────┘
│
↓
┌───────────────────────────────────┐
│ 5. Deploy (Cloud/On-Premise) │
└───────────────────────────────────┘
1.16.1 Serving with Flask
from flask import Flask, request, jsonify
import torch
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import io
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize Flask app
app = Flask(__name__)
# Load model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SimpleNet(784, 128, 10)
model.load_state_dict(torch.load('model_weights.pth', map_location=device))
model.to(device)
model.eval()
logger.info(f"Model loaded successfully on {device}")
# Define image transformations
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def transform_image(image_bytes):
"""Transform image bytes to tensor"""
try:
image = Image.open(io.BytesIO(image_bytes)).convert('RGB')
return transform(image).unsqueeze(0)
except Exception as e:
logger.error(f"Error transforming image: {str(e)}")
raise
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({'status': 'healthy', 'device': str(device)})
@app.route('/predict', methods=['POST'])
def predict():
"""Prediction endpoint"""
if 'image' not in request.files:
return jsonify({'error': 'No image provided'}), 400
try:
# Read and transform image
image_bytes = request.files['image'].read()
img_tensor = transform_image(image_bytes)
img_tensor = img_tensor.to(device)
# Make prediction
with torch.no_grad():
outputs = model(img_tensor)
probabilities = F.softmax(outputs, dim=1)
confidence, predicted_class = torch.max(probabilities, 1)
# Get top 3 predictions
top3_probs, top3_classes = torch.topk(probabilities, 3, dim=1)
# Prepare response
response = {
'predicted_class': predicted_class.item(),
'confidence': confidence.item(),
'top3_predictions': [
{
'class': top3_classes[0][i].item(),
'probability': top3_probs[0][i].item()
}
for i in range(3)
]
}
logger.info(f"Prediction: {response}")
return jsonify(response)
except Exception as e:
logger.error(f"Error during prediction: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/batch_predict', methods=['POST'])
def batch_predict():
"""Batch prediction endpoint"""
if 'images' not in request.files:
return jsonify({'error': 'No images provided'}), 400
try:
images = request.files.getlist('images')
batch_tensors = []
for img in images:
img_bytes = img.read()
img_tensor = transform_image(img_bytes)
batch_tensors.append(img_tensor)
# Stack tensors into batch
batch = torch.cat(batch_tensors, dim=0).to(device)
# Make predictions
with torch.no_grad():
outputs = model(batch)
probabilities = F.softmax(outputs, dim=1)
confidences, predictions = torch.max(probabilities, 1)
# Prepare response
results = [
{
'index': i,
'predicted_class': predictions[i].item(),
'confidence': confidences[i].item()
}
for i in range(len(predictions))
]
return jsonify({'predictions': results})
except Exception as e:
logger.error(f"Error during batch prediction: {str(e)}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
1.16.2 Test API
import requests
# Test single prediction
with open('test_image.jpg', 'rb') as f:
response = requests.post(
'http://localhost:5000/predict',
files={'image': f}
)
print(response.json())
# Test batch prediction
files = [
('images', open('image1.jpg', 'rb')),
('images', open('image2.jpg', 'rb')),
('images', open('image3.jpg', 'rb'))
]
response = requests.post(
'http://localhost:5000/batch_predict',
files=files
)
print(response.json())
1.16.3 Docker Deployment
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy model and application
COPY model_weights.pth .
COPY app.py .
COPY model.py .
# Expose port
EXPOSE 5000
# Set environment variables
ENV FLASK_APP=app.py
ENV MODEL_PATH=model_weights.pth
# Run application
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'
services:
pytorch-api:
build: .
ports:
- "5000:5000"
environment:
- MODEL_PATH=/app/model_weights.pth
volumes:
- ./models:/app/models
deploy:
resources:
limits:
cpus: '2'
memory: 4G
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
interval: 30s
timeout: 10s
retries: 3
# Build and run Docker container
docker build -t pytorch-api:latest .
docker run -p 5000:5000 pytorch-api:latest
# Or use docker-compose
docker-compose up -d
# Scale service
docker-compose up -d --scale pytorch-api=3
1.16.4 Serving with TorchServe
# Install TorchServe
pip install torchserve torch-model-archiver torch-workflow-archiver
# Create handler.py
cat > handler.py << 'EOF'
import torch
import torch.nn.functional as F
from torchvision import transforms
from ts.torch_handler.base_handler import BaseHandler
from PIL import Image
import io
import base64
class ImageClassifier(BaseHandler):
def __init__(self):
super(ImageClassifier, self).__init__()
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def preprocess(self, data):
images = []
for row in data:
image = row.get("data") or row.get("body")
if isinstance(image, str):
image = base64.b64decode(image)
image = Image.open(io.BytesIO(image))
image = self.transform(image)
images.append(image)
return torch.stack(images).to(self.device)
def postprocess(self, inference_output):
probabilities = F.softmax(inference_output, dim=1)
confidences, predictions = torch.max(probabilities, 1)
return [
{
"class": pred.item(),
"confidence": conf.item()
}
for pred, conf in zip(predictions, confidences)
]
EOF
# Create model archive
torch-model-archiver \
--model-name image_classifier \
--version 1.0 \
--model-file model.py \
--serialized-file model_weights.pth \
--handler handler.py \
--extra-files index_to_name.json
# Create model store directory
mkdir -p model_store
mv image_classifier.mar model_store/
# Start TorchServe
torchserve --start \
--model-store model_store \
--models image_classifier=image_classifier.mar \
--ncs
# Test inference
curl -X POST http://localhost:8080/predictions/image_classifier \
-T test_image.jpg
# Management API
curl http://localhost:8081/models
# Stop TorchServe
torchserve --stop
1.17 Additional Training Techniques
1.17.1 DataParallel
model = nn.DataParallel(model)
1.17.2 DistributedDataParallel (DDP)
import torch.distributed as dist
import torch.multiprocessing as mp
import os
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def train(rank, world_size):
setup(rank, world_size)
model = Net().to(rank)
ddp_model = nn.parallel.DistributedDataParallel(model, device_ids=[rank])
# Training loop
cleanup()
if __name__ == "__main__":
world_size = torch.cuda.device_count()
mp.spawn(train,
args=(world_size,),
nprocs=world_size,
join=True)
1.17.3 Gradient Clipping
┌────────────────────────────┐
│ Gradient Clipping Flow │
└───────────┬────────────────┘
│
↓
┌────────────────────────────┐
│ Compute Gradients │
│ loss.backward() │
└───────────┬────────────────┘
│
↓
┌────────────────────────────┐
│ Check Gradient Norm │
└───────────┬────────────────┘
│
┌───────┴───────┐
│ │
↓ >max_norm ↓ <=max_norm
┌─────────┐ ┌─────────┐
│ Scale │ │ Use │
│Gradients│ │ As-Is │
└────┬────┘ └────┬────┘
└────────┬───────┘
│
↓
┌────────────────────────────┐
│ optimizer.step() │
└────────────────────────────┘
import torch
import torch.nn as nn
# Gradient clipping by norm (most common)
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
# Clip gradients to prevent exploding gradients
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
# Gradient clipping by value
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
# Clip each gradient to [-clip_value, clip_value]
torch.nn.utils.clip_grad_value_(model.parameters(), clip_value=0.5)
optimizer.step()
# Monitor gradient norms
def get_gradient_norm(model):
"""Calculate total gradient norm"""
total_norm = 0.0
for p in model.parameters():
if p.grad is not None:
param_norm = p.grad.data.norm(2)
total_norm += param_norm.item() ** 2
total_norm = total_norm ** 0.5
return total_norm
# Usage in training loop
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
# Monitor gradients
grad_norm = get_gradient_norm(model)
print(f"Gradient norm: {grad_norm:.4f}")
# Clip if needed
if grad_norm > 5.0:
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
optimizer.step()
1.17.4 Weight Decay
Weight decay (L2 regularization) is often included directly in the optimizer:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
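A common refinement is to decay only weight matrices, skipping biases and normalization parameters, and to use AdamW for decoupled weight decay. A minimal sketch, assuming the model is already built:
decay, no_decay = [], []
for name, param in model.named_parameters():
    if not param.requires_grad:
        continue
    # 1-D parameters are biases and norm-layer scales/shifts
    if param.ndim == 1 or name.endswith(".bias"):
        no_decay.append(param)
    else:
        decay.append(param)
optimizer = torch.optim.AdamW([
    {"params": decay, "weight_decay": 1e-2},
    {"params": no_decay, "weight_decay": 0.0},
], lr=1e-3)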
1.17.5 Early Stopping
import torch
import numpy as np
class EarlyStopping:
"""Early stopping to stop training when validation loss doesn't improve"""
def __init__(self, patience=7, min_delta=0.0, verbose=True, path='best_model.pth'):
"""
Args:
patience: How many epochs to wait after last improvement
min_delta: Minimum change to qualify as improvement
verbose: Print messages
path: Path to save best model
"""
self.patience = patience
self.min_delta = min_delta
self.verbose = verbose
self.path = path
self.counter = 0
self.best_loss = None
self.early_stop = False
        self.val_loss_min = np.inf  # np.Inf was removed in NumPy 2.0; np.inf works everywhere
def __call__(self, val_loss, model):
if self.best_loss is None:
self.best_loss = val_loss
self.save_checkpoint(val_loss, model)
elif val_loss > self.best_loss - self.min_delta:
self.counter += 1
if self.verbose:
print(f'EarlyStopping counter: {self.counter}/{self.patience}')
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_loss = val_loss
self.save_checkpoint(val_loss, model)
self.counter = 0
def save_checkpoint(self, val_loss, model):
"""Save model when validation loss decreases"""
if self.verbose:
print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
torch.save(model.state_dict(), self.path)
self.val_loss_min = val_loss
# Usage in training loop
early_stopping = EarlyStopping(patience=10, verbose=True)
for epoch in range(num_epochs):
# Training phase
model.train()
train_loss = 0.0
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Validation phase
model.eval()
val_loss = 0.0
with torch.no_grad():
for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_loss += loss.item()
avg_train_loss = train_loss / len(train_loader)
avg_val_loss = val_loss / len(val_loader)
print(f'Epoch [{epoch+1}/{num_epochs}] Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')
# Check early stopping
early_stopping(avg_val_loss, model)
if early_stopping.early_stop:
print("Early stopping triggered!")
break
# Load best model
model.load_state_dict(torch.load('best_model.pth'))
1.17.6 Learning Rate Finders
# Requires a separate library like `torch_lr_finder`
from torch_lr_finder import LRFinder
optimizer = torch.optim.Adam(model.parameters(), lr=1e-7, weight_decay=0.01)
lr_finder = LRFinder(model, optimizer, criterion, device="cuda")
lr_finder.range_test(trainloader, end_lr=1, num_iter=100)
lr_finder.plot()
lr_finder.reset()
1.17.7 Gradient Accumulation
Gradient accumulation allows you to simulate larger batch sizes when you are limited by GPU memory. It works by accumulating gradients over multiple smaller batches before performing the optimization step.
Effective Batch Size = batch_size × accumulation_steps
┌───────────────────────────────────────┐
│ Gradient Accumulation Process │
└─────────────────┬─────────────────────┘
│
┌─────────────┴─────────────┐
│ optimizer.zero_grad() │
└─────────────┬──────────────┘
│
↓
┌────────────────────────────┐
│ Batch 1: Forward + Back │
│ (gradients accumulate) │
└─────────────┬───────────────┘
│
↓
┌────────────────────────────┐
│ Batch 2: Forward + Back │
│ (gradients accumulate) │
└─────────────┬───────────────┘
│
↓
┌────────────────────────────┐
│ Batch N: Forward + Back │
│ (gradients accumulate) │
└─────────────┬───────────────┘
│
↓
┌────────────────────────────┐
│ optimizer.step() │
│ (update weights) │
└────────────────────────────┘
import torch
# Configuration
accumulation_steps = 4 # Simulate 4x larger batch size
# Training loop with gradient accumulation
model.train()
optimizer.zero_grad()
for i, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.to(device), labels.to(device)
# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)
# Normalize loss (important!)
loss = loss / accumulation_steps
# Backward pass (gradients accumulate)
loss.backward()
# Update weights every accumulation_steps
if (i + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
print(f"Step {(i+1)//accumulation_steps}: Loss = {loss.item() * accumulation_steps:.4f}")
# Handle remaining batches if dataset size not divisible by accumulation_steps
if (i + 1) % accumulation_steps != 0:
optimizer.step()
optimizer.zero_grad()
# Example: Compare memory usage
# Without accumulation: batch_size = 64 → GPU memory = X GB
# With accumulation: batch_size = 16, accumulation_steps = 4 → GPU memory = X/4 GB
# But effective batch size is still 64
1.18 Common Neural Network Architectures
1.18.1 Convolutional Neural Network (CNN)
┌────────────────────────────────────┐
│ CNN Architecture Flow │
└────────────────┬───────────────────┘
│
Input Image ↓
(3, 224, 224) │
│
┌────────────────┴──────────────────┐
│ Conv2d + BatchNorm + ReLU │
│ (64 filters) │
└────────────────┬──────────────────┘
│
↓
┌──────────────────────────────────┐
│ MaxPool2d (2x2) │
└────────────────┬─────────────────┘
│
↓ (Repeat blocks)
┌──────────────────────────────────┐
│ Flatten │
└────────────────┬─────────────────┘
│
↓
┌──────────────────────────────────┐
│ Fully Connected Layers │
└────────────────┬─────────────────┘
│
↓
┌──────────────────────────────────┐
│ Output (num_classes) │
└──────────────────────────────────┘
import torch
import torch.nn as nn
import torch.nn.functional as F
class SimpleCNN(nn.Module):
def __init__(self, num_classes=10):
super(SimpleCNN, self).__init__()
# Convolutional layers
self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
self.bn1 = nn.BatchNorm2d(64)
self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
self.bn2 = nn.BatchNorm2d(128)
self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
self.bn3 = nn.BatchNorm2d(256)
# Pooling
self.pool = nn.MaxPool2d(2, 2)
# Fully connected layers
self.fc1 = nn.Linear(256 * 28 * 28, 512)
self.fc2 = nn.Linear(512, num_classes)
# Dropout
self.dropout = nn.Dropout(0.5)
def forward(self, x):
# Block 1
x = self.pool(F.relu(self.bn1(self.conv1(x))))
# Block 2
x = self.pool(F.relu(self.bn2(self.conv2(x))))
# Block 3
x = self.pool(F.relu(self.bn3(self.conv3(x))))
# Flatten
x = x.view(x.size(0), -1)
# Fully connected
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
# Usage
model = SimpleCNN(num_classes=10)
print(model)
# Count parameters
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params:,}")
1.18.2 ResNet-style Residual Block
class ResidualBlock(nn.Module):
"""Residual block with skip connection"""
def __init__(self, in_channels, out_channels, stride=1):
super(ResidualBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
# Skip connection
self.shortcut = nn.Sequential()
if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=1,
stride=stride, bias=False),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
identity = x
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
# Add skip connection
out += self.shortcut(identity)
out = F.relu(out)
return out
class ResNet(nn.Module):
def __init__(self, num_classes=10):
super(ResNet, self).__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
# Residual layers
self.layer1 = self._make_layer(64, 64, 2)
self.layer2 = self._make_layer(64, 128, 2, stride=2)
self.layer3 = self._make_layer(128, 256, 2, stride=2)
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(256, num_classes)
def _make_layer(self, in_channels, out_channels, num_blocks, stride=1):
layers = []
layers.append(ResidualBlock(in_channels, out_channels, stride))
for _ in range(1, num_blocks):
layers.append(ResidualBlock(out_channels, out_channels))
return nn.Sequential(*layers)
def forward(self, x):
x = F.relu(self.bn1(self.conv1(x)))
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
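Thanks to the adaptive average pooling, this ResNet accepts a range of input sizes. A quick shape check with an example 224×224 batch:
model = ResNet(num_classes=10)
x = torch.randn(2, 3, 224, 224)
print(model(x).shape)   # torch.Size([2, 10])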
1.18.3 Recurrent Neural Network (LSTM)
class LSTMClassifier(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes, num_layers=2):
super(LSTMClassifier, self).__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(
embedding_dim,
hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=0.5,
bidirectional=True
)
# *2 for bidirectional
self.fc = nn.Linear(hidden_dim * 2, num_classes)
self.dropout = nn.Dropout(0.5)
def forward(self, x):
# x shape: (batch_size, seq_length)
embedded = self.embedding(x) # (batch_size, seq_length, embedding_dim)
# LSTM output
lstm_out, (hidden, cell) = self.lstm(embedded)
# lstm_out: (batch_size, seq_length, hidden_dim * 2)
# Use last output or pooling
output = lstm_out[:, -1, :] # Last time step
# Or use mean pooling: output = torch.mean(lstm_out, dim=1)
output = self.dropout(output)
output = self.fc(output)
return output
# Usage
model = LSTMClassifier(
vocab_size=10000,
embedding_dim=300,
hidden_dim=256,
num_classes=5
)
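The model expects integer token indices. A quick shape check with a fake batch of padded sequences (sequence length of 50 is illustrative):
x = torch.randint(0, 10000, (8, 50))   # batch of 8 sequences, 50 tokens each
logits = model(x)
print(logits.shape)                     # torch.Size([8, 5])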
1.18.4 Transformer Encoder
import math
class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_layers, num_classes):
        super(TransformerClassifier, self).__init__()
        self.d_model = d_model  # stored for the sqrt(d_model) scaling in forward()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=d_model * 4,
dropout=0.1
)
self.transformer_encoder = nn.TransformerEncoder(
encoder_layer,
num_layers=num_layers
)
self.fc = nn.Linear(d_model, num_classes)
def forward(self, src, src_mask=None):
# src: (batch_size, seq_length)
src = self.embedding(src) * math.sqrt(self.d_model)
src = self.pos_encoder(src)
# Transformer expects (seq_length, batch_size, d_model)
src = src.transpose(0, 1)
output = self.transformer_encoder(src, src_mask)
# Use CLS token or mean pooling
output = output.mean(dim=0) # Average over sequence
output = self.fc(output)
return output
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
return x + self.pe[:, :x.size(1), :]
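A usage sketch with illustrative hyperparameters (nhead must divide d_model):
model = TransformerClassifier(vocab_size=10000, d_model=256, nhead=8,
                              num_layers=4, num_classes=5)
tokens = torch.randint(0, 10000, (8, 64))   # batch of 8 sequences, 64 tokens each
logits = model(tokens)
print(logits.shape)                          # torch.Size([8, 5])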
1.18.5 Autoencoder
class Autoencoder(nn.Module):
def __init__(self, input_dim, encoding_dim):
super(Autoencoder, self).__init__()
# Encoder
self.encoder = nn.Sequential(
nn.Linear(input_dim, 512),
nn.ReLU(),
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, encoding_dim),
nn.ReLU()
)
# Decoder
self.decoder = nn.Sequential(
nn.Linear(encoding_dim, 256),
nn.ReLU(),
nn.Linear(256, 512),
nn.ReLU(),
nn.Linear(512, input_dim),
nn.Sigmoid() # For normalized inputs [0, 1]
)
def forward(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
def encode(self, x):
return self.encoder(x)
def decode(self, z):
return self.decoder(z)
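An autoencoder is trained to reconstruct its input, so the target is the input itself. A minimal training sketch, assuming train_loader yields inputs scaled to [0, 1] to match the Sigmoid output (e.g., 28×28 images flattened to input_dim=784):
model = Autoencoder(input_dim=784, encoding_dim=32)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
model.train()
for inputs, _ in train_loader:                   # labels are ignored for reconstruction
    inputs = inputs.view(inputs.size(0), -1)     # flatten images to vectors
    optimizer.zero_grad()
    reconstruction = model(inputs)
    loss = criterion(reconstruction, inputs)     # reconstruction error against the input
    loss.backward()
    optimizer.step()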
1.19 Common Issues and Debugging
┌─────────────────────────────────────┐
│ Debugging Workflow │
└────────────────┬─────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 1. Check Tensor Shapes/Devices │
└────────────────┬─────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 2. Verify Gradients Flow │
└────────────────┬─────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 3. Monitor Loss/Metrics │
└────────────────┬─────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 4. Profile Performance │
└─────────────────────────────────────┘
1.19.1 Debugging Tools
import torch
import torch.nn as nn
# 1. Register hooks to monitor gradients
def print_grad_hook(name):
def hook(grad):
print(f"{name} gradient: {grad.norm():.4f}")
return hook
# Register hooks
for name, param in model.named_parameters():
if param.requires_grad:
param.register_hook(print_grad_hook(name))
# 2. Check for NaN/Inf values
def check_nan_inf(tensor, name="tensor"):
if torch.isnan(tensor).any():
print(f"NaN detected in {name}")
if torch.isinf(tensor).any():
print(f"Inf detected in {name}")
# 3. Print model summary
def print_model_summary(model, input_size):
from torchsummary import summary
summary(model, input_size)
# Usage
print_model_summary(model, (3, 224, 224))
# 4. Visualize gradient flow
def plot_grad_flow(named_parameters):
import matplotlib.pyplot as plt
ave_grads = []
max_grads = []
layers = []
for n, p in named_parameters:
if p.requires_grad and p.grad is not None:
layers.append(n)
ave_grads.append(p.grad.abs().mean().cpu())
max_grads.append(p.grad.abs().max().cpu())
plt.bar(range(len(max_grads)), max_grads, alpha=0.5, label="max")
plt.bar(range(len(ave_grads)), ave_grads, alpha=0.5, label="mean")
plt.xticks(range(len(ave_grads)), layers, rotation="vertical")
plt.legend()
plt.show()
# 5. Memory profiling
def profile_memory():
if torch.cuda.is_available():
print(f"Allocated: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
print(f"Cached: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")
# 6. Torch profiler
from torch.profiler import profile, ProfilerActivity
with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
for _ in range(10):
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))
1.19.2 Common Issues and Solutions
- CUDA Out of Memory:
  - Reduce batch size
  - Use mixed precision training (torch.cuda.amp)
  - Use gradient checkpointing
  - Clear cache: torch.cuda.empty_cache()
  - Use a smaller model or fewer layers
- NaN/Inf Losses:
  - Reduce learning rate
  - Use gradient clipping: torch.nn.utils.clip_grad_norm_()
  - Check for division by zero
  - Normalize input data
  - Use stable loss functions (e.g., BCEWithLogitsLoss instead of BCELoss)
- Slow Training:
  - Profile code to find bottlenecks
  - Use GPU acceleration
  - Increase num_workers in DataLoader
  - Use pin_memory=True in DataLoader
  - Enable torch.backends.cudnn.benchmark = True for fixed input sizes
  - Use mixed precision training
- Overfitting:
  - Add dropout layers
  - Use data augmentation
  - Implement early stopping
  - Reduce model complexity
  - Add L2 regularization (weight decay)
  - Increase training data
- Underfitting:
  - Increase model capacity (more layers/neurons)
  - Train for more epochs
  - Reduce regularization
  - Check if data preprocessing is correct
  - Use a better optimizer (e.g., Adam instead of SGD)
- Incorrect Tensor Shapes:
  # Debug tensor shapes
  print(f"Input shape: {x.shape}")
  print(f"Expected shape: (batch, channels, height, width)")
  # Use assertions
  assert x.shape[1] == 3, f"Expected 3 channels, got {x.shape[1]}"
- Device Mismatch:
  # Check device
  print(f"Model device: {next(model.parameters()).device}")
  print(f"Input device: {inputs.device}")
  # Move everything to the same device
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model.to(device)
  inputs = inputs.to(device)
- Gradients Not Flowing:
  # Check requires_grad
  for name, param in model.named_parameters():
      print(f"{name}: requires_grad={param.requires_grad}, grad={param.grad is not None}")
  # Check for detached tensors
  # Make sure you are not using .detach() or .data unintentionally
- Dead Neurons (ReLU):
  - Use Leaky ReLU: nn.LeakyReLU(0.01)
  - Use ELU: nn.ELU()
  - Reduce learning rate
  - Use better weight initialization: nn.init.kaiming_normal_()
- Data Loading Bottlenecks:
  # Increase workers and use prefetching
  train_loader = DataLoader(
      dataset,
      batch_size=32,
      num_workers=4,
      pin_memory=True,
      prefetch_factor=2,
      persistent_workers=True
  )
1.20 Best Practices
1.20.1 Code Organization
# Organize code into modular components
# model.py
class MyModel(nn.Module):
def __init__(self, config):
super().__init__()
# Define layers
def forward(self, x):
# Forward pass
return x
# dataset.py
class MyDataset(Dataset):
def __init__(self, data_path):
# Load data
pass
def __getitem__(self, idx):
# Return sample
pass
# train.py
def train_epoch(model, loader, optimizer, criterion, device):
model.train()
total_loss = 0
for inputs, labels in loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(loader)
# config.py
class Config:
# Hyperparameters
BATCH_SIZE = 32
LEARNING_RATE = 0.001
NUM_EPOCHS = 100
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
1.20.2 Performance Optimization
# 1. Enable cuDNN autotuner for fixed input sizes
torch.backends.cudnn.benchmark = True
# 2. Use appropriate data types
# Use fp16 for training when possible
from torch.cuda.amp import autocast, GradScaler
# 3. Optimize DataLoader
train_loader = DataLoader(
dataset,
batch_size=32,
num_workers=4, # Multi-process data loading
pin_memory=True, # Fast data transfer to GPU
persistent_workers=True, # Keep workers alive
prefetch_factor=2 # Prefetch batches
)
# 4. Use in-place operations when possible
x.add_(1) # In-place
x.relu_() # In-place
# 5. Avoid unnecessary CPU-GPU transfers
# Keep data on GPU as much as possible
# 6. Use torch.no_grad() for inference
with torch.no_grad():
predictions = model(inputs)
# 7. Clear unused variables
del intermediate_tensor
torch.cuda.empty_cache()
1.20.3 Reproducibility
import torch
import numpy as np
import random
def set_seed(seed=42):
"""Set seed for reproducibility"""
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
# Make cudnn deterministic
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(42)
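DataLoader workers and shuffling have their own randomness; PyTorch's reproducibility notes recommend re-seeding each worker and passing a seeded generator for the shuffle order. A minimal sketch, assuming a dataset object:
def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)
g = torch.Generator()
g.manual_seed(42)
train_loader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    worker_init_fn=seed_worker,   # re-seed NumPy/random inside every worker process
    generator=g                   # controls the shuffling order
)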
1.20.4 Model Initialization
import torch.nn as nn
def init_weights(m):
"""Initialize model weights"""
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
# Apply initialization
model.apply(init_weights)
1.20.5 Logging and Monitoring
import logging
from pathlib import Path
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('training.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Log training progress
logger.info(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")
# Use TensorBoard
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter('runs/experiment1')
for epoch in range(num_epochs):
writer.add_scalar('Loss/train', train_loss, epoch)
writer.add_scalar('Accuracy/train', train_acc, epoch)
writer.close()
1.20.6 Model Deployment Checklist
# 1. Model validation
model.eval()
with torch.no_grad():
test_acc = evaluate(model, test_loader)
print(f"Test Accuracy: {test_acc:.2f}%")
# 2. Save model with metadata
torch.save({
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'epoch': epoch,
'test_accuracy': test_acc,
'config': config,
'pytorch_version': torch.__version__
}, 'model_final.pth')
# 3. Export to ONNX for production
dummy_input = torch.randn(1, 3, 224, 224).to(device)  # example input matching the model's expected shape
torch.onnx.export(model, dummy_input, 'model.onnx')
# 4. Test exported model
import onnxruntime as ort
ort_session = ort.InferenceSession('model.onnx')
# 5. Create API endpoint
# See deployment section above
# 6. Containerize with Docker
# See Docker section above
# 7. Set up monitoring and logging
# Track inference time, memory usage, error rates
# 8. Implement versioning
# Use model registry (MLflow, DVC, etc.)
1.20.7 Development Tips
- Virtual Environments: Use conda or venv to isolate dependencies
- Code Style: Follow PEP 8, use black for formatting
- Version Control: Use Git, commit frequently with meaningful messages
- Testing: Write unit tests for data loading, model forward pass, etc. (see the sketch after this list)
- Documentation: Add docstrings to classes and functions
- GPU Memory: Monitor with nvidia-smi or torch.cuda.memory_summary()
- Hyperparameter Tuning: Use Optuna, Ray Tune, or Weights & Biases
- Model Compression: Quantization, pruning, knowledge distillation
- Regular Updates: Keep PyTorch and dependencies up to date
- Experiment Tracking: Use MLflow, Weights & Biases, or TensorBoard
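For the testing tip above, a minimal pytest-style sketch that checks the forward pass; it reuses the SimpleCNN from section 1.18.1, and the shapes are illustrative:
import torch
def test_forward_output_shape():
    model = SimpleCNN(num_classes=10)
    model.eval()
    x = torch.randn(2, 3, 224, 224)   # dummy batch
    with torch.no_grad():
        out = model(x)
    assert out.shape == (2, 10)       # batch size preserved, one logit per class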
1.21 Quick Reference
1.21.1 Essential Operations
# Tensor creation
x = torch.tensor([1, 2, 3])
x = torch.zeros(3, 4)
x = torch.randn(3, 4)
# Tensor operations
y = x.view(12) # Reshape
y = x.permute(1, 0) # Transpose
y = x.unsqueeze(0) # Add dimension
y = x.squeeze() # Remove dimensions of size 1
# Move to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
x = x.to(device)
# Gradient tracking
x = torch.tensor([1.0], requires_grad=True)
y = x ** 2
y.backward()
print(x.grad) # dy/dx
# No gradient context
with torch.no_grad():
y = x * 2
1.21.2 Training Template
# Setup
model = MyModel().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# Training loop
for epoch in range(num_epochs):
model.train()
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# Validation
model.eval()
with torch.no_grad():
for inputs, labels in val_loader:
outputs = model(inputs)
# Calculate metrics
1.21.3 Common Layer Patterns
# Conv block
nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
nn.MaxPool2d(2)
)
# Dense block
nn.Sequential(
nn.Linear(in_features, out_features),
nn.ReLU(),
nn.Dropout(0.5)
)
# Residual connection
out = F.relu(self.conv(x))
out = out + x # Skip connection
1.21.4 Useful Commands
# Model info
print(model)
total_params = sum(p.numel() for p in model.parameters())
# Save/Load
torch.save(model.state_dict(), 'model.pth')
model.load_state_dict(torch.load('model.pth'))
# Learning rate
for param_group in optimizer.param_groups:
print(param_group['lr'])
param_group['lr'] = 0.0001 # Update LR
# Freeze layers
for param in model.parameters():
param.requires_grad = False
# GPU memory
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.memory_allocated()/1e9:.2f}GB")
1.21.5 Performance Tips
# Speed up training
torch.backends.cudnn.benchmark = True
# Mixed precision
from torch.cuda.amp import autocast, GradScaler
scaler = GradScaler()
with autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
# Gradient clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# DataLoader optimization
DataLoader(dataset, batch_size=32, num_workers=4,
pin_memory=True, persistent_workers=True)
Happy PyTorch Coding! 🔥