CMU – 10 – 714 Deep Learning System – lab 1

与 lab 0 相比,lab 01 显然就非常的“亲切友好”。在这次实验中,我们将着手实现 needle 库;具体而言,我们将基于 python 实现一个基础的自动微分框架,并基于实现框架重新实现 MNIST 中关于手写数字分类的相关问题。

讲义提示当前项目中包含两个重要文件:autograd.py 以及 ops_mathematic.py 前者实现了计算图框架的基础并构成了自动微分的框架,后者聚焦于自动微分中的相关运算符。

我们先来看autograd.py

整体而言,autograd.py 包括如下类

autograd.py
├── Op    # 运算符基类
│   ├── TensorOp        # 输出张量的运算符操作类
│   └── TensorTupleOp   # 输出张量元组的操作符运算类
└── Value
    ├── TensorTuple     # 表示张量元组
    └── Tensor          # 表示张量,主要的数值容器

现在考虑两个基类 – Op 和 Value

Op 类

Op 类是操作符的基类,定义了所有运算符的基本接口和行为模式。在自动微分系统中,操作符表示计算图中的节点,负责定义前向计算和反向梯度传播的规则,该类下的方法有:

def __call__(self, *args):
    raise NotImplementedError()

__call__ 是 python 自定义类的一个魔法方法,用于实现函数式编程接口,此处直接抛出异常,要求子类必须重写此方法以实现具体操作.

同样的包括:

def compute(self, *args: Tuple[NDArray]):
    """计算运算符的前向传递。"""
    raise NotImplementedError()

def gradient(self, out_grad: "Value", node: "Value") -> Union["Value", Tuple["Value"]]:
    """为给定的输出旁点计算每个输入值的部分旁点。"""
    raise NotImplementedError()

以及

def gradient_as_tuple(self, out_grad: "Value", node: "Value") -> Tuple["Value"]:
    """Convenience method to always return a tuple from gradient call"""

其中 compute 用于实现操作的前向计算,它接收 Numpy 数组作为输入,返回计算结果,过程中只进行数学运算。

gradient 则用于实现反向传播的梯度计算,接收输出梯度out_grad 和当前节点 node,返回对每个输入节点的梯度。这也是自动微分的核心方法

gradient_as_tuple 确保梯度总是以元组形式返回

Value 类

Value 类则是计算图中所有值的基类,表示计算图中的一个节点。它封装了数据、操作和计算状态,是自动微分系统的核心数据结构。

属性

op: Optional[Op]  # 生成此值的操作
inputs: List["Value"]  # 输入值列表
cached_data: NDArray  # 缓存的计算结果
requires_grad: bool   # 是否需要计算梯度
  • op:指向生成此值的操作符,如果为 None 表示是常量(叶子节点)
  • inputs:此值的输入节点列表,构建了计算图的依赖关系
  • cached_data:缓存前向计算的结果,避免重复计算
  • requires_grad:标记此值是否需要参与梯度计算

核心方法

def realize_cached_data(self):
    """
    运行计算以实现缓存数据
    """
    # avoid recomputation
    if self.cached_data is not None:
        return self.cached_data
    # note: data implicitly calls realized cached data
    self.cached_data = self.op.compute(
        *[x.realize_cached_data() for x in self.inputs]
    )
    return self.cached_data

realize_cached_data 用于获取缓存数据,如果缓存不存在,则递归调用输入节点的方法然后运行相关计算,此处是惰性求值

def is_leaf(self):
    return self.op is None

此方法用于判断当前是否为叶子节点

剩下的方法包括:

  • _init :初始化方法,更新全局张量计数器并依据输入决定是否处理梯度
  • make_const:创建常量值
  • make_from_op:通过操作创建值
  • __del__:析构函数,用于实时跟踪并回收系统中的张量

我们着重看一下 make_from_op 这个主要方法:

@classmethod
def make_from_op(cls, op: Op, inputs: List["Value"]):
    # 创建新实例而不调用__init__
    value = cls.__new__(cls)
    
    # 使用_init方法初始化实例
    value._init(op, inputs)
    
    # 处理非懒加载模式
    if not LAZY_MODE:
        # 如果不需要梯度,返回分离的副本
        if not value.requires_grad:
            return value.detach()
        # 否则立即计算缓存数据
        value.realize_cached_data()
    
    return value

注意到代码在初始化实例时没有采用可能被重写的的 __init__ ,而是调用了更基础的 __new__ 来确保所有子类进行相同的初始化。此外方法会根据 LAZY_MODE 标志位判断当前是否立即进行缓存数据的计算。

Tensor 类

Tensor 类作为 Value 的子类,在我们接下来要实现的自动微分系统中有着至关重要的作用,它对基类做出了大量扩展,使其成为实际可用的张量模型。

新增属性

  • grad :用于储存当前张量的梯度值,类型为 Tensor

构造函数

def __init__(
    self,
    array,  # 输入数据,可以是数组或另一个Tensor
    *,
    device: Optional[Device] = None,  # 设备信息
    dtype=None,  # 数据类型
    requires_grad=True,  # 是否需要梯度
    **kwargs  # 其他参数
):
    if isinstance(array, Tensor):
        if device is None:
            device = array.device
        if dtype is None:
            dtype = array.dtype
        if device == array.device and dtype == array.dtype:
            cached_data = array.realize_cached_data()
        else:
            # fall back, copy through numpy conversion
            cached_data = Tensor._array_from_numpy(
                array.numpy(), device=device, dtype=dtype
            )
    else:
        device = device if device else cpu()
        cached_data = Tensor._array_from_numpy(array, device=device, dtype=dtype)

    self._init(
        None,
        [],
        cached_data=cached_data,
        requires_grad=requires_grad,
    )

__init__ 的实现逻辑包括对输入进行类型判断、数据重用及转换,处理失败则尝试从 Numpy Array 创建数据,并在最后调用 _init 进行初始化

静态方法

静态方法下包括:

  • _array_from_numpy:将 Numpy 数组转化为指定设备和数据类型的数组
  • make_from_op:将返回类型进一步限定为 Tensor
  • make_const 同理

属性访问相关

这部分由 @property 修饰

  • data : 返回脱离计算图的张量副本并允许替换(setter)
  • shape : 返回张量形状
  • dtype : 返回数据类型
  • device : 返回当前设备

核心方法

def detach(self):
    """Create a new tensor that shares the data but detaches from the graph."""
    return Tensor.make_const(self.realize_cached_data())

detach 用于创建当前张量的独立副本,同样可用于阻止梯度传播到我们不愿意看见的部分

def backward(self, out_grad=None):
    out_grad = (
        out_grad
        if out_grad
        else init.ones(*self.shape, dtype=self.dtype, device=self.device)
    )
    compute_gradient_of_variables(self, out_grad)

顾名思义, backward 用于启动反向传播过程,但当梯度数值缺失时使用全 1 张量.

numpy() 用于转换,不多赘述

运算符重载

Tensor 类重载了大量运算符,使其行为类似于数学中的张量:

def __add__(self, other):  # +
def __mul__(self, other):  # *
def __pow__(self, other):  # **
def __sub__(self, other):  # -
def __truediv__(self, other):  # /
def __matmul__(self, other):  # @
def __neg__(self):  # - (一元)
# 以及
__radd__ = __add__  # 反向加法
__rmul__ = __mul__  # 反向乘法

所有运算符都通过调用 needle.ops 中的相应操作来实现:

  1. 检查操作数类型(Tensor 或标量)
  2. 选择适当的操作(元素级或标量操作)
  3. 返回新张量

张量操作方法

def sum(self, axes=None):
def broadcast_to(self, shape):
def reshape(self, shape):
def transpose(self, axes=None):

这些方法是对相应操作的封装,提高代码可读性

字符串表示

最后,Tensor 类提供了__repr__ 和 __str__ 方法,用于以字符串形式显示张量的数据内容。

总体来看,Tensor 的接口是高度统一的,且包含丰富的类型检查和校正,独立于设备,提供了丰富的微分支持。

问题 1:实现前向计算 && 问题 2:实现前向传播

此时我们的目光转向 ops_mathematic.py 这一文件是对新运算符的具体实现,讲义提供了 EWiseAdd 作为参考:

class EWiseAdd(TensorOp):
    def compute(self, a: NDArray, b: NDArray):
        return a + b

    def gradient(self, out_grad: Tensor, node: Tensor):
        return out_grad, out_grad

def add(a, b):
    return EWiseAdd()(a, b)

方法的意图十分明显:

  • compute 用于执行当前运算符
  • gradient 用于计算当前运算符下的前向传播(即求偏导),其中 out_grad 索引对应自变量

Compute

所以对于剩下的方法,调用 array_api 中的相关方法即可

注意一下名称中带 Scalar 的的方法,此处参数只有一个参数,此时操作符的另一个对象要调用 self.scalar 进行操作。

此外注意一下 Transpose ,它的默认行为是转置最后两个轴,讲义当中有写但我开始没看到…

给出部分参考实现:

   // Transpose 
   def compute(self, a):
        if self.axes is None:
            """ 
            [注]:这块和numpy的transpose有些区别,numpy的transpose默认是默认是转置所有维度,而这里转置最后两个维度 
            """
            # 批量矩阵转置:交换最后两个维度
            return array_api.swapaxes(a, -1, -2)
        elif len(self.axes) == 2 and not all(isinstance(x,int) for x in self.axes):
            raise ValueError("axes 元素必须是整数")
        elif len(self.axes) == 2:
            # 只交换两个指定的轴
            return array_api.swapaxes(a, self.axes[0], self.axes[1])
        elif len(self.axes) == a.ndim:
            # 完整置换
            return array_api.transpose(a, axes=self.axes)
        else:
            raise ValueError("axes 参数不合法")
    // 其他目前没啥好写的,调 api 就成

Gradient

Compute 中我们已经实现了当前运算,即前向传播,但为了实现自动微分,我们还需要能够计算反向传播,即将函数的相关导数与传入的反向梯度相乘。

实现思路其实很简单,将除了当前所求量外的所有量视为标量,也就是偏导,然后匹配维度,具体而言,我们想要达到:

$$\begin{equation}
\frac{\partial \ell}{\partial x} = \frac{\partial \ell}{\partial f(x,y)} \frac{\partial f(x,y)}{\partial x}.
\end{equation}$$

的结果,因为”传入的反向梯度”正是项 $\frac{\partial \ell}{\partial f(x,y)}$,因此我们希望我们的 gradient() 函数最终计算这个反向梯度与函数自身导数 $\frac{\partial f(x,y)}{\partial x}$ 的乘积。

参考乘法的梯度计算,我们不难发现:node.inputs 中依序储存了当前变量,所以整体的实现就是 求偏导,写代码的过程。

不过有一个需要注意的点,此处我们操作的是 needle 对象,所以不能调用 numpy 包进行操作而是通过 array_api 进行间接调用(因为后面的hw中我们将编写自己的 numpy 包)。

给出部分参考实现:

class Reshape(TensorOp):
    def __init__(self, shape):
        self.shape = shape

    def compute(self, a):
        return array_api.reshape(a, self.shape)

    def gradient(self, out_grad, node):
        """
        对于reshape(a, shape),a的梯度是reshape(out_grad, a.shape)
        """
        return (reshape(out_grad, node.inputs[0].shape),)

# ...

class BroadcastTo(TensorOp):
    def __init__(self, shape):
        self.shape = shape

    def compute(self, a):
        return array_api.broadcast_to(a, self.shape)

    def gradient(self, out_grad, node):
        """
        此处相当于是 broadcast_to(a, shape),a的梯度是out_grad
        注意:
        - 返回必须是张量元组形式
        - 考虑输入张量形状与目标形状不一致的情况,返回 (0,)
        """
        input_shape = node.inputs[0].shape
        reduce_axes = tuple(i for i in range(len(self.shape))
                            if i >= len(input_shape) or input_shape[i] == 1)
        grad = summation(out_grad, axes=reduce_axes)
        grad = reshape(grad, input_shape)
        return (grad,)

# ...

class Summation(TensorOp):
    def __init__(self, axes: Optional[tuple] = None):
        self.axes = axes

    def compute(self, a):
        return array_api.sum(a, axis=self.axes)

    def gradient(self, out_grad, node):
        """
        对于summation(a),a的梯度是out_grad
        """
        input_shape = node.inputs[0].shape
        if self.axes is None:
            axes = range(len(input_shape))
        elif isinstance(self.axes, int):
            axes = (self.axes,)
        else:
            axes = self.axes
        # 添加维度
        grad = out_grad
        grad_shape = list(grad.shape)
        for ax in sorted(axes):
            grad_shape.insert(ax, 1)
        grad = reshape(grad, tuple(grad_shape))
        grad = broadcast_to(grad, input_shape)
        return (grad,)

# ...

class MatMul(TensorOp):
    def compute(self, a, b):
        return array_api.matmul(a, b)

    def gradient(self, out_grad, node):
        """
        本质上是双输入运算符,需要分别返回每个输入张量的梯度
        对于matmul(a, b),a的梯度是matmul(out_grad, b.T),b的梯度是matmul(a.T, out_grad)
        """
        lhs, rhs = node.inputs
        # return (matmul(out_grad, transpose(rhs, axes=(-1, -2))),
        #         matmul(transpose(lhs, axes=(-1, -2)), out_grad))
        lhs_T = transpose(lhs, axes=(-1, -2))
        rhs_T = transpose(rhs, axes=(-1, -2))
        grad_lhs = matmul(out_grad, rhs_T)
        grad_rhs = matmul(lhs_T, out_grad)
        # 处理广播还原成 lhs/rhs 的原始形状
        while len(grad_lhs.shape) > len(lhs.shape):
            grad_lhs = summation(grad_lhs, axes=0)
        while len(grad_rhs.shape) > len(rhs.shape):
            grad_rhs = summation(grad_rhs, axes=0)
        return grad_lhs, grad_rhs

问题3:拓扑排序

在问题 1,2 中我们已经实现了计算图的基本构件,但图的节点之间的计算顺序我们并未进行考虑,计算图中的节点之间存在严格的依赖关系 – 每个节点的计算依赖于其输入节点的值。而拓扑排序能够确保节点按照正确的依赖关系进行处理:

  • 前向传播:按照拓扑顺序计算每个节点的值
  • 反向传播:按照拓扑排序的逆序计算梯度

讲义在这块提示我们需要使用后序遍历,这样能够保证父节点的位置一定在子节点后,所以实现一个 dfs 进行深搜即可

我靠 python 写 dfs 是真方便

def find_topo_sort(node_list: List[Value]) -> List[Value]:
    """
    给定节点列表,返回以这些节点结尾的拓扑排序列表。

    简单的算法是对给定的节点进行后序 DFS 遍历、
    根据输入的边倒序遍历。由于一个节点是在其所有前节点被遍历后才被添加到排序中的
    后,我们会得到一个拓扑排序。
    排序。
    """
    # topo_sort_dfs 所用参数
    visited = set()
    topo_order = []
    # 遍历所有节点
    for node in node_list:
        topo_sort_dfs(node, visited, topo_order)
    return topo_order


def topo_sort_dfs(node, visited, topo_order):
    """Post-order DFS"""
    # 判断有没有访问过这个节点
    if node in visited:
        return
    visited.add(node)

    # 遍历所有输出节点
    for input_node in node.inputs:
        topo_sort_dfs(input_node, visited, topo_order)

    # 添加节点到拓扑排序
    topo_order.append(node)

问题4:实现反向模式微分

那么在拓扑排序的基础上,我们将利用它来实现反向传播的自动微分。讲义贴心的给出了具体步骤:

1. 拓扑排序:首先获取计算图中节点的拓扑排序

2. 反向遍历:按照拓扑排序的逆序处理节点

3. 梯度累积:对于每个节点,计算其对输出的梯度贡献

4. 链式法则:应用链式法则将梯度传播到输入节点

所以给出伪代码实现框架:

def compute_gradient_of_variables(output_tensor, out_grad):
    # 1. 获取拓扑排序
    topo_order = find_topo_sort()
  
    # 2. 初始化梯度字典
    grad_dict = {}
    grad_dict[output_tensor] = out_grad
  
    # 3. 反向遍历拓扑排序
    for node in reversed(topo_order):
        # 获取当前节点的梯度
        current_grad = grad_dict[node]
      
        # 4. 调用节点的gradient方法计算输入梯度
        input_grads = node.op.gradient()
      
        # 5. 将梯度累积到输入节点
        for input_tensor, input_grad in (node.inputs, input_grads):
            if input_tensor in grad_dict:
                grad_dict[input_tensor] += input_grad
            else:
                grad_dict[input_tensor] = input_grad
  
    # 返回所有需要梯度的变量的梯度
    return grad_dict

问题 5 :Softmax 损失

在 hw0 的相关问题中,我们已经基于 Numpy 实现了 Softmax loss ,当时的实现是:

def softmax_loss(Z, y):
    """ Return softmax loss.  Note that for the purposes of this assignment,
    you don't need to worry about "nicely" scaling the numerical properties
    of the log-sum-exp computation, but can just compute this directly.

    Args:
        Z (np.ndarray[np.float32]): 2D numpy array of shape
            (batch_size, num_classes), containing the logit predictions for
            each class.
        y (np.ndarray[np.uint8]): 1D numpy array of shape (batch_size, )
            containing the true label of each example.

    Returns:
        Average softmax loss over the sample.
    """
    # 一行流
    # return -np.mean(    # 平均值    
    #     np.log(         # 对数
    #         np.exp(     # 指数
    #             (Z-np.max(Z,axis=1,keepdims=True))[np.arange(Z.shape[0]),y]      # 取最大值后再取对应标签的logit
    #             )/np.sum(    # 求和
    #                 np.exp(Z-np.max(Z,axis=1,keepdims=True)),axis=1,keepdims=True
    #                 )    # 求softmax
    #             )
    #         )

    # 数值稳定性:减去每行的最大值
    max_Z = np.max(Z, axis=1, keepdims=True)
    exp_Z = np.exp(Z - max_Z)  # 形状: (batch_size, num_classes)
    
    # 计算softmax分母
    sum_exp_Z = np.sum(exp_Z, axis=1, keepdims=True)  # 形状: (batch_size, 1)
    
    # 获取每个样本真实类别的概率
    n = Z.shape[0]
    true_class_probs = exp_Z[np.arange(n), y]  # 形状: (batch_size,)
    
    # 计算log softmax概率 for true class
    log_probs = np.log(true_class_probs / sum_exp_Z.flatten())  # 除法后形状匹配
    
    # 返回平均损失
    return -np.mean(log_probs)

注意到当时的实现使用到了 numpy 的 max 和 mean 函数,而这两个函数在我们当前的 needle 框架中并未实现。所用优先考虑进行实现:

class Max(TensorOp):
    def __init__(self, axes: Optional[tuple] = None, keepdims=False):
        self.axes = axes
        self.keepdims = keepdims
    def compute(self, a):
        return array_api.max(a, axis=self.axes, keepdims=self.keepdims)
    def gradient(self, out_grad, node):
        """
        最大值的梯度计算,注意需要使用 needle 的 Op 来构建计算图。
        """
        a = node.inputs[0]
        # 直接使用前向计算得到的 Tensor 结果 (node 本身)
        # 注意 node.realize_cached_data() 才是 NDArray, node 是 Value/Tensor
        max_val = node 
        
        # 恢复维度和形状
        # 如果 keepdims=False, 需要恢复被压缩的维度
        if not self.keepdims and self.axes is not None:
            out_shape = list(a.shape)
            if isinstance(self.axes, int): axes = (self.axes,)
            else: axes = self.axes
            
            for ax in axes:
                out_shape[ax] = 1
            max_val = reshape(max_val, out_shape)
        
        # 将最大值广播到原始输入的形状
        max_val_broadcasted = broadcast_to(max_val, a.shape)
        
        # 创建掩码
        mask = (a == max_val_broadcasted)
        
        # 处理多个最大值的情况
        # 使用 summation 和 broadcast_to 来计算每个区域最大值的数量
        div_factor = summation(mask, axes=self.axes, keepdims=True)
        mask = mask / broadcast_to(div_factor, a.shape)
        
        # 应用链式法则 (反向传播梯度)
        # 如果 keepdims=False, out_grad 也需要恢复维度并广播
        if not self.keepdims and self.axes is not None:
             # out_grad 的 shape 就是 max_val 在 keepdims=False 时的 shape, 
             # 所以可以用上面相同的逻辑恢复
            out_shape = list(a.shape)
            if isinstance(self.axes, int): axes = (self.axes,)
            else: axes = self.axes
            for ax in axes:
                out_shape[ax] = 1
            out_grad = reshape(out_grad, out_shape)
        return broadcast_to(out_grad, a.shape) * mask
def max(a, axes=None, keepdims=False):
    return Max(axes, keepdims)(a)


class Mean(TensorOp):
    def __init__(self, axes: Optional[tuple] = None, keepdims=False):
        self.axes = axes
        self.keepdims = keepdims
    def compute(self, a):
        return array_api.mean(a, axis=self.axes, keepdims=self.keepdims)
    def gradient(self, out_grad, node):
        """
        平均值的梯度计算,同样需要在计算图内。
        """
        a = node.inputs[0]
        input_shape = a.shape
        
        # 计算参与平均的元素数量
        if self.axes is None:
            count = 1
            for dim in input_shape:
                count *= dim
        else:
            count = 1
            axes = self.axes if isinstance(self.axes, tuple) else (self.axes,)
            for ax in axes:
                count *= input_shape[ax]
        # 如果 keepdims=False,恢复维度并广播
        # 不需要手动 reshape, broadcast_to 内部会处理
        grad = broadcast_to(out_grad, input_shape)
        # 梯度均匀分配,使用 DivScalar Op
        return grad / count
def mean(a, axes=None, keepdims=False):
    return Mean(axes, keepdims)(a)

注意到 max 类的实现用到了 summation 并且对输入维度做出了要求。但框架原本的实现是不包含 keepdims 这个输入参数,所以对 Summation类进行拓展:

class Summation(TensorOp):
    def __init__(self, axes: Optional[tuple] = None, keepdims: bool = False):
        self.axes = axes
        self.keepdims = keepdims # 添加 keepdims 属性
    def compute(self, a):
        # 将 keepdims 参数传递给 numpy.sum
        return array_api.sum(a, axis=self.axes, keepdims=self.keepdims)
    def gradient(self, out_grad, node):
        """
        此处对原代码做出了一些修改,主要是为了处理 keepdims=False 的情况。
        对于 summation(a, axes=None, keepdims=False),a的梯度是reshape(out_grad, a.shape)
        对于 summation(a, axes=None, keepdims=True),a的梯度是broadcast_to(out_grad, a.shape)
        """
        input_shape = node.inputs[0].shape
        
        if not self.keepdims:
            if self.axes is None:
                axes = range(len(input_shape))
            elif isinstance(self.axes, int):
                axes = (self.axes,)
            else:
                axes = self.axes
            
            grad_shape = list(out_grad.shape)
            for ax in sorted(axes):
                grad_shape.insert(ax, 1)
            grad = reshape(out_grad, tuple(grad_shape))
        else:
            grad = out_grad
        return (broadcast_to(grad, input_shape),)
    
def summation(a, axes=None, keepdims=False): # 添加 keepdims 参数
    return Summation(axes=axes, keepdims=keepdims)(a) # 传递 keepdims

所以 hw1 的 Softmax 的实现可以是:

def softmax_loss(Z, y_one_hot):
    """
    返回Softmax loss。 请注意,就本作业而言、
    你不需要担心如何 "很好地 "缩放数值属性
    的数值特性,而只需直接计算即可。

    参数
        Z(ndl.Tensor[np.float32]):形状为
            (batch_size,num_classes)的二维张量,包含每个类别的 logit 预测值。
            每个类别的 logit 预测值。
        y_one_hot(ndl.Tensor[np.int8]):形状(batch_size、num_classes)的二维张量,包含每个类别的对数预测。
            在每个示例的真实标签索引处包含一个 1,其他地方为 0。
            其他地方为 0。

    返回值
        样本的Average Softmax Loss(ndl.张量[np.float32])
    """
    batch_size = Z.shape[0]
    # log_sum_exp 部分
    # Z(batch, classes), max_Z(batch, 1)
    max_Z = ndl.max(Z, axes=1, keepdims=True)
    # log_sum_exp(batch, 1)
    log_sum_exp = ndl.log(ndl.summation(ndl.exp(Z - max_Z.broadcast_to(Z.shape)), axes=1)).reshape((batch_size, 1)) + max_Z
    
    # 提取真实类别对应的 logits (z_y)
    # Z(batch, classes), y_one_hot(batch, classes) -> z_y(batch, 1)
    z_y = ndl.summation(Z * y_one_hot, axes=1).reshape((batch_size, 1))
    # 计算每个样本的损失 (log_sum_exp - z_y)
    # loss_per_sample (batch, 1)
    loss_per_sample = log_sum_exp - z_y
    
    # 计算平均损失
    # 使用 sum / batch_size, 这与 mean() 等价但更明确
    avg_loss = ndl.summation(loss_per_sample) / batch_size
    
    return avg_loss

问题 6 两层神经网络的 SGD

其实这块框架中有完整的反向传播链条,不过此处似乎是为了加深对反向传播的理解让我们手写 SGD

def nn_epoch(X, y, W1, W2, lr=0.1, batch=100):
    """
    为两层神经网络运行一轮SGD
    
    参数:
    X (np.ndarray[np.float32]): 2D输入数组,形状为 (num_examples x input_dim)
    y (np.ndarray[np.uint8]): 1D类别标签数组,形状为 (num_examples,)
    W1 (ndl.Tensor[np.float32]): 第一层权重,形状为 (input_dim, hidden_dim)
    W2 (ndl.Tensor[np.float32]): 第二层权重,形状为 (hidden_dim, num_classes)
    lr (float): SGD的学习率
    batch (int): SGD小批量大小
    返回:
    Tuple: (W1, W2)
        W1: ndl.Tensor[np.float32]
        W2: ndl.Tensor[np.float32]
    """
    num_examples = X.shape[0]
    for start in range(0, num_examples, batch):
        end = start + batch
        X_batch = ndl.Tensor(X[start:end])   # (B, in_dim)
        y_batch = y[start:end]               # (B,)
        # Forward
        h = ndl.relu(ndl.matmul(X_batch, W1))                  # (B, hid_dim)
        z = ndl.matmul(h, W2)                                  # (B, num_classes)
        # Softmax
        max_z = ndl.max(z, axes=1, keepdims=True)
        exp_z = ndl.exp(z - max_z.broadcast_to(z.shape))
        sum_exp_z = exp_z.sum(axes=1, keepdims=True)
        probs = exp_z / sum_exp_z
        # One-hot
        y_one_hot = np.zeros((end-start, W2.shape[1]), dtype=np.float32)
        y_one_hot[np.arange(end-start), y_batch] = 1
        y_one_hot = ndl.Tensor(y_one_hot)

        # Backward
        B = end - start
        dZ = (probs - y_one_hot) / B                            # (B, num_classes)
        dW2 = ndl.matmul(h.transpose(), dZ)                     # (hid_dim, num_classes)
        dH = ndl.matmul(dZ, W2.transpose())                     # (B, hid_dim)
        relu_mask = (h.numpy() > 0).astype(np.float32)           # 取 mask
        dH_relu = ndl.Tensor(dH.numpy() * relu_mask)             # 屏蔽负区梯度
        dW1 = ndl.matmul(X_batch.transpose(), dH_relu)           # (in_dim, hid_dim)
        
        
        # 框架中的完整实现
        # Softmax loss
        # loss = softmax_loss(z, y_one_hot)

        # # Backward
        # loss.backward()

        # dW1 = W1.grad
        # dW2 = W2.grad
        
        # 参数更新
        W1 = W1 - lr * dW1
        W2 = W2 - lr * dW2
    return W1, W2

以上

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注