Chatper 8 循环神经网络

新征程!

在前面我们关注的是 DL 中一类在图片识别方面应用相当广泛的模型 CNN ,而第八章介绍的则是针对时序数据处理的另一类模型—— RNN (Recurrent Neural Network)。RNN的核心思想源于 David Rumelhart 于1986年提出的时序反向传播(BPTT)算法,最开始是用于解决序列数据的梯度计算问题,不过随着 LSTM 、 GRU 以及 Transformer 等相关结构的提出,RNN 现在的生态位业已扩展至 ODE、类脑研究以及多模态时序建模等诸多领域

而由于其时序数据建模的理论特性,在自然语言处理方面可以说是得天独厚,事实上书中也正是以文本处理来引入 RNN 的。

AR 相关略去不表,自回归模型接触过建模基本上都会提及

事实上传统的 AR 模型并不适合文本预测,它本质上是线性模型,但语言本身是高度非线性的,一个词的含义和选择极大地依赖于远距离的上下文、语法结构和语义逻辑,这种依赖关系无法用简单的线性加权来刻画。

但并不是说 AR 在文本处理方面并无应用,实际上自回归模型(AR)的思想是所有现代生成式模型(包括RNN、Transformer)的基石。它们的核心思想都是:使用序列中过去的观测值来预测未来的值

文本预处理

实际上,文本可以被视为是单词或者字符序列的拼接,我们可以提取其中的元素进行统计预测,当然在此之前,必要的预处理是不可或缺的,包括数据读取,词元化以及词表构建。

数据读取

这个很简单,使用 requests 获取网站下的对应文本内容即可

# Data_loader
import os
import requests

url = 'https://www.gutenberg.org/files/35/35-0.txt'
response = requests.get(url)

# 检查请求是否成功
if response.status_code == 200:
    text = response.text
    
    # 确保数据目录存在
    os.makedirs('./data', exist_ok=True)
    
    file_name = './data/time-machine-data.txt'
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write(text)
    print("下载成功!文件保存为:", file_name)
    print("文件大小:", os.path.getsize(file_name))
    print(f"文本行数:{len(text.split('\n'))}")
else:
    print(f"下载失败,状态码: {response.status_code}")

然后是文本清洗以及词元分割

def data_clean_for_time_machine(text):
    """
    清理文本,只针对《时间机器》这本书
    """
    if not isinstance(text, str):
        raise TypeError("text should be a string")
    start_idx = text.find(r"*** START OF THE PROJECT GUTENBERG EBOOK 35 ***")
    end_idx = text.find(r"*** END OF THE PROJECT GUTENBERG EBOOK 35 ***")
    if start_idx == -1 or end_idx == -1:
        raise ValueError("text is not a valid Project Gutenberg book")
    text = text[start_idx:end_idx]
    lines = []
    for line in text.split("\n"):
        if not line.strip() or line.strip().isupper():
            continue
        lines.append(line.strip())
    return " ".join(lines).lower()  # 合并为单字符串并转化为小写

def tokenize(text):
    """
    文本分词:将连续的文本序列切分成一个个词元
    """
    if not isinstance(text, str):
        raise TypeError("text should be a string")
    return text.split()

对分割后的文本进行统计,使用映射就构建出了我们的词表:

from collections import Counter

class Vocab:
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=[]):
        if tokens is None:
            tokens = []
        
        # 统计词频
        counter = Counter(tokens)
        self.min_freq = sorted(counter.items(), key=lambda x: x[1], reverse=True)

        # 构建词表
        self.idx_to_token = list(reserved_tokens)
        self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}

        # 加入词频大于等于min_freq的词
        for token, freq in self.min_freq:
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)
    
def numericalize(tokens: list, vocab: Vocab):
    """
    将词元列表转换为数字列表,并同步返回词表
    :param tokens: 词元列表
    :param vocab: 词表
    :return: 数字列表,词表
    """
    indices = []
    for token in tokens:
        if token in vocab.token_to_idx:
            indices.append(vocab.token_to_idx[token])
        else:
            indices.append(vocab.token_to_idx['<unk>'])
    return indices, vocab

那么通过将离散的单词索引转换为连续的向量表示后,我们可以考虑将其组织成批次以支持并行训练

不过为了批次大小的统一性,我们通常需要使用无意义词元(通常是 <pad>)进行填充,填充后的张量形状通常为 (batch_size, sequence_length),其中 sequence_length 是批次中最长序列的长度(或预设的截断长度)

在输入进 RNN 之前,我们还需要了解一个 RNN 中独有的结构:嵌入层(Embedding Layer)。这个层的作用是将里算的索引转化为稠密的实数向量从而让 RNN 易于捕获相关语义信息。

具体来说,嵌入层是一个可学习的参数矩阵 $E \in \mathbb{R}^{V \times d}$ 其中:

  • $V$ 是词表大小。
  • $d$ 是嵌入维度(超参数)。
  • 对于索引 $i$,其嵌入向量为 $E[i]$(即矩阵的第 $i$ 行)。

那么将批张量输入嵌入层后,输出将转换为形状为 (batch_size, sequence_length, embedding_dim) 的浮点数张量。每个时间步对应一个词的嵌入向量

RNN 的基本架构

基本 RNN(Vanilla RNN) 是一种简单的循环神经网络,其核心是循环连接使得隐藏状态能够作为输入传递历史信息,具体来说

RNN维护一个隐藏状态 $h_t$,它编码了到时间步 $t$ 为止的序列信息。隐藏状态初始化为零向量或可学习参数:

$$h_0 = \mathbf{0}$$

而在在每个时间步 $t$,RNN根据当前输入 $x_t$ 和前一个隐藏状态 $h_{t-1}$ 计算新隐藏状态 $h_t$:

$$h_t = \tanh(W_{xh} x_t + W_{hh} h_{t-1} + b_h)$$

其中:

  • $W_{xh} \in \mathbb{R}^{h \times d}$:输入到隐藏的权重矩阵($h$ 是隐藏层维度)。
  • $W_{hh} \in \mathbb{R}^{h \times h}$:隐藏到隐藏的权重矩阵(循环连接)。
  • $b_h \in \mathbb{R}^{h}$:隐藏层偏置向量。
  • $\tanh$:激活函数,将输出压缩到 $[-1, 1]$,帮助稳定梯度。

那么对于追中的隐藏状态 $h_t$,RNN 通常通过一个全连接层输出 $y_t$

$$y_t = W_{hy} h_t + b_y$$

其中:

$W_{hy} \in \mathbb{R}^{o \times h}$:隐藏到输出的权重矩阵($o$ 是输出维度,如词表大小)。

$b_y \in \mathbb{R}^{o}$:输出层偏置向量。

而对于分类任务(如语言建模),$y_t$ 后接 softmax 函数得到概率分布:

$$p_t = \text{softmax}(y_t)$$

也就是说,基本 RNN 的参数包括 $W_{xh}, W_{hh}, b_h, W_{hy}, b_y$, 常见的损失被定义为每个时间步预测负对数似然之和

$$L = -\sum_{t=1}^{T} \log p_t(x_{t+1})$$

其中 $p_t(x_{t+1})$ 是模型预测下一个词 $x_{t+1}$ 的概率。

那么基于核心框架,我们来实现自己的 RNN 框架:

由于 RNN 的参数较多,我们这边采用一个参数类来对模型超参数进行统一配置:

class RNNConfig:
    """配置类,集中管理所有超参数"""
    def __init__(self):
        self.batch_size = 32
        self.seq_len = 50        # 序列长度
        self.embed_size = 64    # 嵌入长度
        self.hidden_size = 128
        self.lr = 0.01
        self.epochs = 20  
        self.dropout_rate = 0.3
        self.weight_decay = 1e-4 
        self.train_split = 0.8
        self.min_freq = 3 
        self.reserved_tokens = ['<pad>', '<unk>', '<eos>']  

接下来首先构建 RNN 的基本单元$h_t = \tanh(W_{xh} x_t + W_{hh} h_{t-1} + b_h)$,不过这块在训练时发现 $\text{tanh}$ 太容易梯度爆炸了所以换成了 ReLU

class RNNCell(nn.Module):
    """RNN基础单元"""
    def __init__(self, input_size, hidden_size):
        super(RNNCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size


        # 参数初始化
        self.W_xh = nn.Parameter(torch.randn(input_size, hidden_size) * 0.01)
        self.W_hh = nn.Parameter(torch.randn(hidden_size, hidden_size) * 0.01)
        self.b_h = nn.Parameter(torch.zeros(hidden_size))
        self.layer_norm = nn.LayerNorm(hidden_size)

    def forward(self, x, h):
        """前向传播"""
        h_next = torch.relu(self.layer_norm(x @ self.W_xh + h @ self.W_hh + self.b_h))
        return h_next

那么在 RNNCell 的基础上,我们尝试构建 RNN 类:

class RNN(nn.Module):
    def __init__(self, config):
        super(RNN, self).__init__()
        self.config = config
        
        self.embedding = nn.Embedding(config.vocab_size, config.embed_size)
        self.rnn_cell = RNNCell(config.embed_size, config.hidden_size)
        self.dropout = nn.Dropout(config.dropout_rate) 
        self.output_layer = nn.Linear(config.hidden_size, config.vocab_size)
        
        self._initialize_weights()
        
    def _initialize_weights(self):
        """权重初始化"""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Embedding):
                nn.init.uniform_(module.weight, -0.1, 0.1)

    def forward(self, x, h=None):
        batch_size, seq_len = x.shape
        
        if h is None:
            h = torch.zeros(batch_size, self.config.hidden_size, device=x.device)
        
        x_embed = self.embedding(x)
        x_embed = self.dropout(x_embed)  # 嵌入层后dropout
        
        h_seq = []
        for i in range(seq_len):
            h = self.rnn_cell(x_embed[:, i, :], h)
            h_seq.append(h)

        h_seq = torch.stack(h_seq, dim=1)
        h_seq = self.dropout(h_seq)  # RNN输出后dropout
        output = self.output_layer(h_seq)
        
        return output, h

以及对应的训练类

class RNNTrainer:
    """训练器类,封装训练逻辑"""
    def __init__(self, model, config, device):
        self.model = model
        self.config = config
        self.device = device
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(model.parameters(), lr=config.lr, weight_decay=config.weight_decay)
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=0.5,
            patience=12
        )
        
    def train_epoch(self, train_loader, epoch):
        """单epoch训练"""
        self.model.train()
        self.config.seq_len = np.random.randint(32, 64)  # 序列长度随机变化
        total_loss = 0
        
        for batch_idx, (X, y) in enumerate(train_loader):
            X, y = X.to(self.device), y.to(self.device)
            
            self.optimizer.zero_grad()
            output, _ = self.model(X)
            loss = self.criterion(output.reshape(-1, self.config.vocab_size), 
                                 y.reshape(-1))
            loss.backward()
            
            # 梯度裁剪,防止梯度爆炸
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            
            self.optimizer.step()
            total_loss += loss.item()
            
            if batch_idx % 100 == 0:
                print(f'Train Epoch: {epoch} [{batch_idx * len(X)}/{len(train_loader.dataset)} '
                      f'({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
        
        return total_loss / len(train_loader)

    def evaluate(self, test_loader):
        """评估模型"""
        self.model.eval()
        test_loss = 0
        
        with torch.no_grad():
            for X, y in test_loader:
                X, y = X.to(self.device), y.to(self.device)
                output, _ = self.model(X)
                loss = self.criterion(output.reshape(-1, self.config.vocab_size), 
                                     y.reshape(-1))
                test_loss += loss.item() * X.size(0)
        
        return test_loss / len(test_loader.dataset)

    def train(self, train_loader, test_loader, visualizer=None):
        """完整训练流程"""
        for epoch in range(1, self.config.epochs + 1):
            train_loss = self.train_epoch(train_loader, epoch)
            test_loss = self.evaluate(test_loader)

            self.scheduler.step(test_loss)
            
            print(f'Epoch {epoch}/{self.config.epochs}: '
                  f'Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')
            
            if visualizer:
                visualizer.update_train(epoch, train_loss, 0)  # 准确率设为0,语言建模不计算准确率
                visualizer.update_test(epoch, test_loss, 0)

这块我在多次固定序列长度进行训练效果不理想后考虑了一下随机序列变化,不过看上去似乎也只是抑制了 test_loss 进一步上升,并不能很好的改善模型的泛化能力

固定序列:

随机序列:

前前后后考虑了权重衰减、自适应学习率、正则化、 Dropout 和层归一化,这模型的 loss 始终就是降不下来。说实话孩子没辙了

那么就先这样吧,算算上次更新已经是半个月以前了,这模型一直在调笔记压着没发。明天补补 GRU 和 LSTM吧


GRU(Gate Recurrent Unit)

那么在了解完 RNN 的基础架构后,我们知道基本 RNN 的架构决定了它的一些缺点,比如梯度消失和长程依赖问题,实际上,为了解决这些问题,业界对 RNN 尝试过不少改进,而 LSTM 和 GRU 就是其中的代表。 LSTM 和 GRU 在思想上是一脉相承的,只是 LSTM 的结构会更复杂一些。所有我们先来看 GRU 的组成和架构。

我们可以将 GRU 看作是对基本 RNN 的一次“智能升级”。因为它的核心思想非常直观:既然 RNN 存在长程依赖以及 To remember or not 的问题,那么引入一个由神经网络组成的“门控”机制,来有选择地记住重要的历史信息,并忘记无关的信息是不是就能有效解决这个问题呢。

所以 GRU 的解决方案是引入两个新的单元,我们称之为门控单元:

  • 更新门:决定有多少旧记忆需要被保留到未来
  • 重置门:决定有多少旧记忆需要被用来帮助理解现在

听上去似乎很抽象,但我们接下来来看一下数学表述应该就会很清晰了

运作机制

一个GRU单元在每一个时间步 $t$ 接收两个输入:

  • 当前时间的输入 $x_t$
  • 上一个时间的隐藏状态 $h_{t – 1}$(即“旧记忆”)

它会产生一个新的隐藏状态 $h_{t}$ (即“新记忆”)。

具体来说整个过程为:

1. 计算两个“门”

GRU会同时计算出两个门的数值。它们都是通过当前输入和旧记忆计算出来的,数值范围通过 Sigmoid 函数映射到 0 到 1 之间,代表“打开”的程度。

对于重置门,它的计算公式为:

$$r_t = \sigma(W_{xr} x_t + W_{hr} h_{t-1} + b_r)$$

  • $r_t$ 的值接近 1 表示“重要,需要保留”旧记忆来帮助处理当前输入。
  • $r_t$ 的值接近 0 表示“无关,可以重置”旧记忆,更关注当前输入。

对应更新门的计算方式为:

$$z_t = \sigma(W_{xz} x_t + W_{hz} h_{t-1} + b_z)$$

  • z_t 的值接近 1 表示“几乎完全保留”旧记忆。
  • z_t 的值接近 0 表示“几乎完全忽略”旧记忆,用新信息取而代之。

那么依据 重置门 和 更新门, GRU 可以依靠他们准备生成下一阶段的 “新记忆” ,具体来说:

$$\tilde{h}_t = \tanh(W_{xh} x_t + W_{hh} (r_t \odot h_{t-1}) + b_h)\\
h_t = z_t \odot h_{t-1} + (1 – z_t) \odot \tilde{h}_t$$

这里面有两个关键操作:

  • $r_t ⊙ h_{t-1}$(⊙是逐元素乘法):如果重置门 $r_t$ 接近0,那么旧记忆 $h_{t-1}$ 的影响就会被大幅削弱,候选状态 $\tilde{h}_t$ 将主要基于当前输入 $x_t$。这适用于当前信息与遥远过去无关的场景。
  • 其次是一个加权组合:
    • 如果 $z_t \approx 1$,则 $h_t \approx h_{t-1}$。这意味着直接拷贝旧记忆,忽略当前输入。这完美地实现了长期信息的保留,跨越多个时间步而几乎不衰减,从而解决了梯度消失问题。
    • 如果 $z_t \approx 0$,则 $h_t \approx \tilde{h}_t$。这意味着用候选状态完全更新记忆,更像一个基本RNN。

所以从这个角度出发

重置门控制的是:在理解当前新输入 $x_t$ 时,我们应该在多大程度上“忘记”过去的记忆 $h_{t-1}$

更新门 z_t 决定的是:最终的隐状态 $x_t$ 由多少比例的旧状态 $h_{t-1}$”和多少比例的候选新状态 $\tilde{h}_t$组合而成。

通过引入 重置门 和 更新门 两个机制,GRU 能有效缓解梯度消失。更新门提供了从旧状态 $h_{t-1}$ 到新状态 $h_t$ 的快速路,允许梯度无损地流动,从而能够学习长程依赖

实现

在实现上,我们考虑两个环节:

首先是门控单元 – GRUCell 的实现:

class GRUCell(nn.Module):
    """GRU基础单元"""
    def __init__(self, input_size, hidden_size):
        super(GRUCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # GRU参数
        self.linear_x = nn.Linear(input_size, 3 * hidden_size)
        self.linear_h = nn.Linear(hidden_size, 3 * hidden_size, bias=False)
        
        self._initialize_weights()

    def _initialize_weights(self):
        """权重初始化"""
        nn.init.xavier_uniform_(self.linear_x.weight)
        nn.init.xavier_uniform_(self.linear_h.weight)
        if self.linear_x.bias is not None:
            nn.init.zeros_(self.linear_x.bias)

    def forward(self, x, h_prev):
        """前向传播 - GRU公式实现"""
        # 计算线性变换
        x_linear = self.linear_x(x)      # (batch, 3*hidden)
        h_linear = self.linear_h(h_prev) # (batch, 3*hidden)
        
        # 分割成三个门的分量
        x_r, x_z, x_n = x_linear.chunk(3, dim=1)  # 每块: (batch, hidden)
        h_r, h_z, h_n = h_linear.chunk(3, dim=1)
        
        # GRU核心计算
        reset_gate = torch.sigmoid(x_r + h_r)      # r_t
        update_gate = torch.sigmoid(x_z + h_z)     # z_t
        candidate = torch.tanh(x_n + reset_gate * h_n)  # \tilde{h}_t
        
        # 计算新隐藏状态
        h_t = (1 - update_gate) * candidate + update_gate * h_prev
        
        return h_t

然后将其嵌入 RNN 即可

class GRUModel(nn.Module):
    """GRU模型"""
    def __init__(self, config):
        super(GRUModel, self).__init__()
        self.config = config
        
        # 与RNN相同的组件
        self.embedding = nn.Embedding(config.vocab_size, config.embed_size)
        self.dropout = nn.Dropout(config.dropout_rate) 
        self.output_layer = nn.Linear(config.hidden_size, config.vocab_size)
        
        # GRU特定组件 - 多层GRU
        self.gru_layers = nn.ModuleList()
        for layer in range(config.num_layers):
            input_size = config.embed_size if layer == 0 else config.hidden_size
            self.gru_layers.append(GRUCell(input_size, config.hidden_size))
        
        self._initialize_weights()
        
    def _initialize_weights(self):
        """权重初始化"""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Embedding):
                nn.init.uniform_(module.weight, -0.1, 0.1)

    def forward(self, x, h=None):
        """前向传播"""
        batch_size, seq_len = x.shape
        
        # 初始化多层隐藏状态
        if h is None:
            h = torch.zeros(self.config.num_layers, batch_size, 
                        self.config.hidden_size, device=x.device)
        
        x_embed = self.embedding(x)
        x_embed = self.dropout(x_embed)
        
        new_h = []  # 存储每层新的隐藏状态
        current_input = x_embed
        
        for layer in range(self.config.num_layers):
            h_layer = h[layer]  
            layer_hidden_seq = []
            
            # 时间步处理
            for t in range(seq_len):
                h_layer = self.gru_layers[layer](current_input[:, t, :], h_layer)
                layer_hidden_seq.append(h_layer)
            
            # 当前层输出作为下一层输入
            current_input = torch.stack(layer_hidden_seq, dim=1)
            
            new_h.append(h_layer)
            
            # 层间dropout
            if layer < self.config.num_layers - 1:
                current_input = self.dropout(current_input)
        
        # 最终输出
        output = self.output_layer(current_input)
        
        new_h = torch.stack(new_h, dim=0)
        return output, new_h

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注