与 lab 0 相比,lab 01 显然就非常的“亲切友好”。在这次实验中,我们将着手实现 needle 库;具体而言,我们将基于 python 实现一个基础的自动微分框架,并基于实现框架重新实现 MNIST 中关于手写数字分类的相关问题。
讲义提示当前项目中包含两个重要文件:autograd.py
以及 ops_mathematic.py
前者实现了计算图框架的基础并构成了自动微分的框架,后者聚焦于自动微分中的相关运算符。
我们先来看autograd.py
。
整体而言,autograd.py
包括如下类
autograd.py
├── Op # 运算符基类
│ ├── TensorOp # 输出张量的运算符操作类
│ └── TensorTupleOp # 输出张量元组的操作符运算类
└── Value
├── TensorTuple # 表示张量元组
└── Tensor # 表示张量,主要的数值容器
现在考虑两个基类 – Op 和 Value
Op 类
Op
类是操作符的基类,定义了所有运算符的基本接口和行为模式。在自动微分系统中,操作符表示计算图中的节点,负责定义前向计算和反向梯度传播的规则,该类下的方法有:
def __call__(self, *args):
raise NotImplementedError()
__call__
是 python 自定义类的一个魔法方法,用于实现函数式编程接口,此处直接抛出异常,要求子类必须重写此方法以实现具体操作.
同样的包括:
def compute(self, *args: Tuple[NDArray]):
"""计算运算符的前向传递。"""
raise NotImplementedError()
def gradient(self, out_grad: "Value", node: "Value") -> Union["Value", Tuple["Value"]]:
"""为给定的输出旁点计算每个输入值的部分旁点。"""
raise NotImplementedError()
以及
def gradient_as_tuple(self, out_grad: "Value", node: "Value") -> Tuple["Value"]:
"""Convenience method to always return a tuple from gradient call"""
其中 compute
用于实现操作的前向计算,它接收 Numpy 数组作为输入,返回计算结果,过程中只进行数学运算。
gradient
则用于实现反向传播的梯度计算,接收输出梯度out_grad
和当前节点 node
,返回对每个输入节点的梯度。这也是自动微分的核心方法。
gradient_as_tuple
确保梯度总是以元组形式返回
Value 类
Value
类则是计算图中所有值的基类,表示计算图中的一个节点。它封装了数据、操作和计算状态,是自动微分系统的核心数据结构。
属性
op: Optional[Op] # 生成此值的操作
inputs: List["Value"] # 输入值列表
cached_data: NDArray # 缓存的计算结果
requires_grad: bool # 是否需要计算梯度
op
:指向生成此值的操作符,如果为None
表示是常量(叶子节点)inputs
:此值的输入节点列表,构建了计算图的依赖关系cached_data
:缓存前向计算的结果,避免重复计算requires_grad
:标记此值是否需要参与梯度计算
核心方法
def realize_cached_data(self):
"""
运行计算以实现缓存数据
"""
# avoid recomputation
if self.cached_data is not None:
return self.cached_data
# note: data implicitly calls realized cached data
self.cached_data = self.op.compute(
*[x.realize_cached_data() for x in self.inputs]
)
return self.cached_data
realize_cached_data
用于获取缓存数据,如果缓存不存在,则递归调用输入节点的方法然后运行相关计算,此处是惰性求值
def is_leaf(self):
return self.op is None
此方法用于判断当前是否为叶子节点
剩下的方法包括:
_init
:初始化方法,更新全局张量计数器并依据输入决定是否处理梯度make_const
:创建常量值make_from_op
:通过操作创建值__del__
:析构函数,用于实时跟踪并回收系统中的张量
我们着重看一下 make_from_op
这个主要方法:
@classmethod
def make_from_op(cls, op: Op, inputs: List["Value"]):
# 创建新实例而不调用__init__
value = cls.__new__(cls)
# 使用_init方法初始化实例
value._init(op, inputs)
# 处理非懒加载模式
if not LAZY_MODE:
# 如果不需要梯度,返回分离的副本
if not value.requires_grad:
return value.detach()
# 否则立即计算缓存数据
value.realize_cached_data()
return value
注意到代码在初始化实例时没有采用可能被重写的的 __init__
,而是调用了更基础的 __new__
来确保所有子类进行相同的初始化。此外方法会根据 LAZY_MODE 标志位判断当前是否立即进行缓存数据的计算。
Tensor 类
Tensor 类作为 Value 的子类,在我们接下来要实现的自动微分系统中有着至关重要的作用,它对基类做出了大量扩展,使其成为实际可用的张量模型。
新增属性
grad
:用于储存当前张量的梯度值,类型为 Tensor
构造函数
def __init__(
self,
array, # 输入数据,可以是数组或另一个Tensor
*,
device: Optional[Device] = None, # 设备信息
dtype=None, # 数据类型
requires_grad=True, # 是否需要梯度
**kwargs # 其他参数
):
if isinstance(array, Tensor):
if device is None:
device = array.device
if dtype is None:
dtype = array.dtype
if device == array.device and dtype == array.dtype:
cached_data = array.realize_cached_data()
else:
# fall back, copy through numpy conversion
cached_data = Tensor._array_from_numpy(
array.numpy(), device=device, dtype=dtype
)
else:
device = device if device else cpu()
cached_data = Tensor._array_from_numpy(array, device=device, dtype=dtype)
self._init(
None,
[],
cached_data=cached_data,
requires_grad=requires_grad,
)
__init__ 的实现逻辑包括对输入进行类型判断、数据重用及转换,处理失败则尝试从 Numpy Array 创建数据,并在最后调用 _init
进行初始化
静态方法
静态方法下包括:
_array_from_numpy
:将 Numpy 数组转化为指定设备和数据类型的数组make_from_op
:将返回类型进一步限定为 Tensormake_const
同理
属性访问相关
这部分由 @property
修饰
- data : 返回脱离计算图的张量副本并允许替换(setter)
- shape : 返回张量形状
- dtype : 返回数据类型
- device : 返回当前设备
核心方法
def detach(self):
"""Create a new tensor that shares the data but detaches from the graph."""
return Tensor.make_const(self.realize_cached_data())
detach 用于创建当前张量的独立副本,同样可用于阻止梯度传播到我们不愿意看见的部分
def backward(self, out_grad=None):
out_grad = (
out_grad
if out_grad
else init.ones(*self.shape, dtype=self.dtype, device=self.device)
)
compute_gradient_of_variables(self, out_grad)
顾名思义, backward 用于启动反向传播过程,但当梯度数值缺失时使用全 1 张量.
numpy()
用于转换,不多赘述
运算符重载
Tensor 类重载了大量运算符,使其行为类似于数学中的张量:
def __add__(self, other): # +
def __mul__(self, other): # *
def __pow__(self, other): # **
def __sub__(self, other): # -
def __truediv__(self, other): # /
def __matmul__(self, other): # @
def __neg__(self): # - (一元)
# 以及
__radd__ = __add__ # 反向加法
__rmul__ = __mul__ # 反向乘法
所有运算符都通过调用 needle.ops
中的相应操作来实现:
- 检查操作数类型(Tensor 或标量)
- 选择适当的操作(元素级或标量操作)
- 返回新张量
张量操作方法
def sum(self, axes=None):
def broadcast_to(self, shape):
def reshape(self, shape):
def transpose(self, axes=None):
这些方法是对相应操作的封装,提高代码可读性
字符串表示
最后,Tensor 类提供了__repr__
和 __str__
方法,用于以字符串形式显示张量的数据内容。
总体来看,Tensor 的接口是高度统一的,且包含丰富的类型检查和校正,独立于设备,提供了丰富的微分支持。
问题 1:实现前向计算 && 问题 2:实现前向传播
此时我们的目光转向 ops_mathematic.py
这一文件是对新运算符的具体实现,讲义提供了 EWiseAdd
作为参考:
class EWiseAdd(TensorOp):
def compute(self, a: NDArray, b: NDArray):
return a + b
def gradient(self, out_grad: Tensor, node: Tensor):
return out_grad, out_grad
def add(a, b):
return EWiseAdd()(a, b)
方法的意图十分明显:
compute
用于执行当前运算符gradient
用于计算当前运算符下的前向传播(即求偏导),其中out_grad
索引对应自变量
Compute
所以对于剩下的方法,调用 array_api
中的相关方法即可
注意一下名称中带 Scalar
的的方法,此处参数只有一个参数,此时操作符的另一个对象要调用 self.scalar
进行操作。
此外注意一下 Transpose ,它的默认行为是转置最后两个轴,讲义当中有写但我开始没看到…
给出部分参考实现:
// Transpose
def compute(self, a):
if self.axes is None:
"""
[注]:这块和numpy的transpose有些区别,numpy的transpose默认是默认是转置所有维度,而这里转置最后两个维度
"""
# 批量矩阵转置:交换最后两个维度
return array_api.swapaxes(a, -1, -2)
elif len(self.axes) == 2 and not all(isinstance(x,int) for x in self.axes):
raise ValueError("axes 元素必须是整数")
elif len(self.axes) == 2:
# 只交换两个指定的轴
return array_api.swapaxes(a, self.axes[0], self.axes[1])
elif len(self.axes) == a.ndim:
# 完整置换
return array_api.transpose(a, axes=self.axes)
else:
raise ValueError("axes 参数不合法")
// 其他目前没啥好写的,调 api 就成
Gradient
Compute 中我们已经实现了当前运算,即前向传播,但为了实现自动微分,我们还需要能够计算反向传播,即将函数的相关导数与传入的反向梯度相乘。
实现思路其实很简单,将除了当前所求量外的所有量视为标量,也就是偏导,然后匹配维度,具体而言,我们想要达到:
$$\begin{equation}
\frac{\partial \ell}{\partial x} = \frac{\partial \ell}{\partial f(x,y)} \frac{\partial f(x,y)}{\partial x}.
\end{equation}$$
的结果,因为”传入的反向梯度”正是项 $\frac{\partial \ell}{\partial f(x,y)}$,因此我们希望我们的 gradient() 函数最终计算这个反向梯度与函数自身导数 $\frac{\partial f(x,y)}{\partial x}$ 的乘积。
参考乘法的梯度计算,我们不难发现:node.inputs 中依序储存了当前变量,所以整体的实现就是 求偏导,写代码的过程。
不过有一个需要注意的点,此处我们操作的是 needle 对象,所以不能调用 numpy 包进行操作而是通过 array_api 进行间接调用(因为后面的hw中我们将编写自己的 numpy 包)。
给出部分参考实现:
class Reshape(TensorOp):
def __init__(self, shape):
self.shape = shape
def compute(self, a):
return array_api.reshape(a, self.shape)
def gradient(self, out_grad, node):
"""
对于reshape(a, shape),a的梯度是reshape(out_grad, a.shape)
"""
return (reshape(out_grad, node.inputs[0].shape),)
# ...
class BroadcastTo(TensorOp):
def __init__(self, shape):
self.shape = shape
def compute(self, a):
return array_api.broadcast_to(a, self.shape)
def gradient(self, out_grad, node):
"""
此处相当于是 broadcast_to(a, shape),a的梯度是out_grad
注意:
- 返回必须是张量元组形式
- 考虑输入张量形状与目标形状不一致的情况,返回 (0,)
"""
input_shape = node.inputs[0].shape
reduce_axes = tuple(i for i in range(len(self.shape))
if i >= len(input_shape) or input_shape[i] == 1)
grad = summation(out_grad, axes=reduce_axes)
grad = reshape(grad, input_shape)
return (grad,)
# ...
class Summation(TensorOp):
def __init__(self, axes: Optional[tuple] = None):
self.axes = axes
def compute(self, a):
return array_api.sum(a, axis=self.axes)
def gradient(self, out_grad, node):
"""
对于summation(a),a的梯度是out_grad
"""
input_shape = node.inputs[0].shape
if self.axes is None:
axes = range(len(input_shape))
elif isinstance(self.axes, int):
axes = (self.axes,)
else:
axes = self.axes
# 添加维度
grad = out_grad
grad_shape = list(grad.shape)
for ax in sorted(axes):
grad_shape.insert(ax, 1)
grad = reshape(grad, tuple(grad_shape))
grad = broadcast_to(grad, input_shape)
return (grad,)
# ...
class MatMul(TensorOp):
def compute(self, a, b):
return array_api.matmul(a, b)
def gradient(self, out_grad, node):
"""
本质上是双输入运算符,需要分别返回每个输入张量的梯度
对于matmul(a, b),a的梯度是matmul(out_grad, b.T),b的梯度是matmul(a.T, out_grad)
"""
lhs, rhs = node.inputs
# return (matmul(out_grad, transpose(rhs, axes=(-1, -2))),
# matmul(transpose(lhs, axes=(-1, -2)), out_grad))
lhs_T = transpose(lhs, axes=(-1, -2))
rhs_T = transpose(rhs, axes=(-1, -2))
grad_lhs = matmul(out_grad, rhs_T)
grad_rhs = matmul(lhs_T, out_grad)
# 处理广播还原成 lhs/rhs 的原始形状
while len(grad_lhs.shape) > len(lhs.shape):
grad_lhs = summation(grad_lhs, axes=0)
while len(grad_rhs.shape) > len(rhs.shape):
grad_rhs = summation(grad_rhs, axes=0)
return grad_lhs, grad_rhs
问题3:拓扑排序
在问题 1,2 中我们已经实现了计算图的基本构件,但图的节点之间的计算顺序我们并未进行考虑,计算图中的节点之间存在严格的依赖关系 – 每个节点的计算依赖于其输入节点的值。而拓扑排序能够确保节点按照正确的依赖关系进行处理:
- 前向传播:按照拓扑顺序计算每个节点的值
- 反向传播:按照拓扑排序的逆序计算梯度
讲义在这块提示我们需要使用后序遍历,这样能够保证父节点的位置一定在子节点后,所以实现一个 dfs 进行深搜即可
我靠 python 写 dfs 是真方便
def find_topo_sort(node_list: List[Value]) -> List[Value]:
"""
给定节点列表,返回以这些节点结尾的拓扑排序列表。
简单的算法是对给定的节点进行后序 DFS 遍历、
根据输入的边倒序遍历。由于一个节点是在其所有前节点被遍历后才被添加到排序中的
后,我们会得到一个拓扑排序。
排序。
"""
# topo_sort_dfs 所用参数
visited = set()
topo_order = []
# 遍历所有节点
for node in node_list:
topo_sort_dfs(node, visited, topo_order)
return topo_order
def topo_sort_dfs(node, visited, topo_order):
"""Post-order DFS"""
# 判断有没有访问过这个节点
if node in visited:
return
visited.add(node)
# 遍历所有输出节点
for input_node in node.inputs:
topo_sort_dfs(input_node, visited, topo_order)
# 添加节点到拓扑排序
topo_order.append(node)
问题4:实现反向模式微分
那么在拓扑排序的基础上,我们将利用它来实现反向传播的自动微分。讲义贴心的给出了具体步骤:
1. 拓扑排序:首先获取计算图中节点的拓扑排序
2. 反向遍历:按照拓扑排序的逆序处理节点
3. 梯度累积:对于每个节点,计算其对输出的梯度贡献
4. 链式法则:应用链式法则将梯度传播到输入节点
所以给出伪代码实现框架:
def compute_gradient_of_variables(output_tensor, out_grad):
# 1. 获取拓扑排序
topo_order = find_topo_sort()
# 2. 初始化梯度字典
grad_dict = {}
grad_dict[output_tensor] = out_grad
# 3. 反向遍历拓扑排序
for node in reversed(topo_order):
# 获取当前节点的梯度
current_grad = grad_dict[node]
# 4. 调用节点的gradient方法计算输入梯度
input_grads = node.op.gradient()
# 5. 将梯度累积到输入节点
for input_tensor, input_grad in (node.inputs, input_grads):
if input_tensor in grad_dict:
grad_dict[input_tensor] += input_grad
else:
grad_dict[input_tensor] = input_grad
# 返回所有需要梯度的变量的梯度
return grad_dict
问题 5 :Softmax 损失
在 hw0 的相关问题中,我们已经基于 Numpy 实现了 Softmax loss ,当时的实现是:
def softmax_loss(Z, y):
""" Return softmax loss. Note that for the purposes of this assignment,
you don't need to worry about "nicely" scaling the numerical properties
of the log-sum-exp computation, but can just compute this directly.
Args:
Z (np.ndarray[np.float32]): 2D numpy array of shape
(batch_size, num_classes), containing the logit predictions for
each class.
y (np.ndarray[np.uint8]): 1D numpy array of shape (batch_size, )
containing the true label of each example.
Returns:
Average softmax loss over the sample.
"""
# 一行流
# return -np.mean( # 平均值
# np.log( # 对数
# np.exp( # 指数
# (Z-np.max(Z,axis=1,keepdims=True))[np.arange(Z.shape[0]),y] # 取最大值后再取对应标签的logit
# )/np.sum( # 求和
# np.exp(Z-np.max(Z,axis=1,keepdims=True)),axis=1,keepdims=True
# ) # 求softmax
# )
# )
# 数值稳定性:减去每行的最大值
max_Z = np.max(Z, axis=1, keepdims=True)
exp_Z = np.exp(Z - max_Z) # 形状: (batch_size, num_classes)
# 计算softmax分母
sum_exp_Z = np.sum(exp_Z, axis=1, keepdims=True) # 形状: (batch_size, 1)
# 获取每个样本真实类别的概率
n = Z.shape[0]
true_class_probs = exp_Z[np.arange(n), y] # 形状: (batch_size,)
# 计算log softmax概率 for true class
log_probs = np.log(true_class_probs / sum_exp_Z.flatten()) # 除法后形状匹配
# 返回平均损失
return -np.mean(log_probs)
注意到当时的实现使用到了 numpy 的 max 和 mean 函数,而这两个函数在我们当前的 needle 框架中并未实现。所用优先考虑进行实现:
class Max(TensorOp):
def __init__(self, axes: Optional[tuple] = None, keepdims=False):
self.axes = axes
self.keepdims = keepdims
def compute(self, a):
return array_api.max(a, axis=self.axes, keepdims=self.keepdims)
def gradient(self, out_grad, node):
"""
最大值的梯度计算,注意需要使用 needle 的 Op 来构建计算图。
"""
a = node.inputs[0]
# 直接使用前向计算得到的 Tensor 结果 (node 本身)
# 注意 node.realize_cached_data() 才是 NDArray, node 是 Value/Tensor
max_val = node
# 恢复维度和形状
# 如果 keepdims=False, 需要恢复被压缩的维度
if not self.keepdims and self.axes is not None:
out_shape = list(a.shape)
if isinstance(self.axes, int): axes = (self.axes,)
else: axes = self.axes
for ax in axes:
out_shape[ax] = 1
max_val = reshape(max_val, out_shape)
# 将最大值广播到原始输入的形状
max_val_broadcasted = broadcast_to(max_val, a.shape)
# 创建掩码
mask = (a == max_val_broadcasted)
# 处理多个最大值的情况
# 使用 summation 和 broadcast_to 来计算每个区域最大值的数量
div_factor = summation(mask, axes=self.axes, keepdims=True)
mask = mask / broadcast_to(div_factor, a.shape)
# 应用链式法则 (反向传播梯度)
# 如果 keepdims=False, out_grad 也需要恢复维度并广播
if not self.keepdims and self.axes is not None:
# out_grad 的 shape 就是 max_val 在 keepdims=False 时的 shape,
# 所以可以用上面相同的逻辑恢复
out_shape = list(a.shape)
if isinstance(self.axes, int): axes = (self.axes,)
else: axes = self.axes
for ax in axes:
out_shape[ax] = 1
out_grad = reshape(out_grad, out_shape)
return broadcast_to(out_grad, a.shape) * mask
def max(a, axes=None, keepdims=False):
return Max(axes, keepdims)(a)
class Mean(TensorOp):
def __init__(self, axes: Optional[tuple] = None, keepdims=False):
self.axes = axes
self.keepdims = keepdims
def compute(self, a):
return array_api.mean(a, axis=self.axes, keepdims=self.keepdims)
def gradient(self, out_grad, node):
"""
平均值的梯度计算,同样需要在计算图内。
"""
a = node.inputs[0]
input_shape = a.shape
# 计算参与平均的元素数量
if self.axes is None:
count = 1
for dim in input_shape:
count *= dim
else:
count = 1
axes = self.axes if isinstance(self.axes, tuple) else (self.axes,)
for ax in axes:
count *= input_shape[ax]
# 如果 keepdims=False,恢复维度并广播
# 不需要手动 reshape, broadcast_to 内部会处理
grad = broadcast_to(out_grad, input_shape)
# 梯度均匀分配,使用 DivScalar Op
return grad / count
def mean(a, axes=None, keepdims=False):
return Mean(axes, keepdims)(a)
注意到 max 类的实现用到了 summation 并且对输入维度做出了要求。但框架原本的实现是不包含 keepdims 这个输入参数,所以对 Summation类进行拓展:
class Summation(TensorOp):
def __init__(self, axes: Optional[tuple] = None, keepdims: bool = False):
self.axes = axes
self.keepdims = keepdims # 添加 keepdims 属性
def compute(self, a):
# 将 keepdims 参数传递给 numpy.sum
return array_api.sum(a, axis=self.axes, keepdims=self.keepdims)
def gradient(self, out_grad, node):
"""
此处对原代码做出了一些修改,主要是为了处理 keepdims=False 的情况。
对于 summation(a, axes=None, keepdims=False),a的梯度是reshape(out_grad, a.shape)
对于 summation(a, axes=None, keepdims=True),a的梯度是broadcast_to(out_grad, a.shape)
"""
input_shape = node.inputs[0].shape
if not self.keepdims:
if self.axes is None:
axes = range(len(input_shape))
elif isinstance(self.axes, int):
axes = (self.axes,)
else:
axes = self.axes
grad_shape = list(out_grad.shape)
for ax in sorted(axes):
grad_shape.insert(ax, 1)
grad = reshape(out_grad, tuple(grad_shape))
else:
grad = out_grad
return (broadcast_to(grad, input_shape),)
def summation(a, axes=None, keepdims=False): # 添加 keepdims 参数
return Summation(axes=axes, keepdims=keepdims)(a) # 传递 keepdims
所以 hw1 的 Softmax 的实现可以是:
def softmax_loss(Z, y_one_hot):
"""
返回Softmax loss。 请注意,就本作业而言、
你不需要担心如何 "很好地 "缩放数值属性
的数值特性,而只需直接计算即可。
参数
Z(ndl.Tensor[np.float32]):形状为
(batch_size,num_classes)的二维张量,包含每个类别的 logit 预测值。
每个类别的 logit 预测值。
y_one_hot(ndl.Tensor[np.int8]):形状(batch_size、num_classes)的二维张量,包含每个类别的对数预测。
在每个示例的真实标签索引处包含一个 1,其他地方为 0。
其他地方为 0。
返回值
样本的Average Softmax Loss(ndl.张量[np.float32])
"""
batch_size = Z.shape[0]
# log_sum_exp 部分
# Z(batch, classes), max_Z(batch, 1)
max_Z = ndl.max(Z, axes=1, keepdims=True)
# log_sum_exp(batch, 1)
log_sum_exp = ndl.log(ndl.summation(ndl.exp(Z - max_Z.broadcast_to(Z.shape)), axes=1)).reshape((batch_size, 1)) + max_Z
# 提取真实类别对应的 logits (z_y)
# Z(batch, classes), y_one_hot(batch, classes) -> z_y(batch, 1)
z_y = ndl.summation(Z * y_one_hot, axes=1).reshape((batch_size, 1))
# 计算每个样本的损失 (log_sum_exp - z_y)
# loss_per_sample (batch, 1)
loss_per_sample = log_sum_exp - z_y
# 计算平均损失
# 使用 sum / batch_size, 这与 mean() 等价但更明确
avg_loss = ndl.summation(loss_per_sample) / batch_size
return avg_loss
问题 6 两层神经网络的 SGD
其实这块框架中有完整的反向传播链条,不过此处似乎是为了加深对反向传播的理解让我们手写 SGD
def nn_epoch(X, y, W1, W2, lr=0.1, batch=100):
"""
为两层神经网络运行一轮SGD
参数:
X (np.ndarray[np.float32]): 2D输入数组,形状为 (num_examples x input_dim)
y (np.ndarray[np.uint8]): 1D类别标签数组,形状为 (num_examples,)
W1 (ndl.Tensor[np.float32]): 第一层权重,形状为 (input_dim, hidden_dim)
W2 (ndl.Tensor[np.float32]): 第二层权重,形状为 (hidden_dim, num_classes)
lr (float): SGD的学习率
batch (int): SGD小批量大小
返回:
Tuple: (W1, W2)
W1: ndl.Tensor[np.float32]
W2: ndl.Tensor[np.float32]
"""
num_examples = X.shape[0]
for start in range(0, num_examples, batch):
end = start + batch
X_batch = ndl.Tensor(X[start:end]) # (B, in_dim)
y_batch = y[start:end] # (B,)
# Forward
h = ndl.relu(ndl.matmul(X_batch, W1)) # (B, hid_dim)
z = ndl.matmul(h, W2) # (B, num_classes)
# Softmax
max_z = ndl.max(z, axes=1, keepdims=True)
exp_z = ndl.exp(z - max_z.broadcast_to(z.shape))
sum_exp_z = exp_z.sum(axes=1, keepdims=True)
probs = exp_z / sum_exp_z
# One-hot
y_one_hot = np.zeros((end-start, W2.shape[1]), dtype=np.float32)
y_one_hot[np.arange(end-start), y_batch] = 1
y_one_hot = ndl.Tensor(y_one_hot)
# Backward
B = end - start
dZ = (probs - y_one_hot) / B # (B, num_classes)
dW2 = ndl.matmul(h.transpose(), dZ) # (hid_dim, num_classes)
dH = ndl.matmul(dZ, W2.transpose()) # (B, hid_dim)
relu_mask = (h.numpy() > 0).astype(np.float32) # 取 mask
dH_relu = ndl.Tensor(dH.numpy() * relu_mask) # 屏蔽负区梯度
dW1 = ndl.matmul(X_batch.transpose(), dH_relu) # (in_dim, hid_dim)
# 框架中的完整实现
# Softmax loss
# loss = softmax_loss(z, y_one_hot)
# # Backward
# loss.backward()
# dW1 = W1.grad
# dW2 = W2.grad
# 参数更新
W1 = W1 - lr * dW1
W2 = W2 - lr * dW2
return W1, W2
以上
发表回复