{ "cells": [ { "cell_type": "markdown", "id": "be16f748-e12a-44a9-ad2b-81e320efdac4", "metadata": {}, "source": [ "\n", "\n", "\n", "\n", "\n", "
\n", "\n", "Supplementary code for the Build a Large Language Model From Scratch book by Sebastian Raschka
\n", "
Code repository: https://github.com/rasbt/LLMs-from-scratch\n", "
汉化的库: https://github.com/GoatCsu/CN-LLMs-from-scratch.git\n", "
\n", "
\n", "\n", "
\n" ] }, { "cell_type": "markdown", "id": "6f678e62-7bcb-4405-86ae-dce94f494303", "metadata": {}, "source": [ "# 多头注意力在数据载入的运用" ] }, { "cell_type": "code", "execution_count": 1, "id": "ac9b5847-0515-45cd-87b0-46541f6a1f79", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "torch version: 2.2.2\n" ] } ], "source": [ "# NBVAL_IGNORE_OUTPUT\n", "from importlib.metadata import version\n", "\n", "print(\"torch version:\", version(\"torch\"))" ] }, { "cell_type": "markdown", "id": "070000fc-a7b7-4c56-a2c0-a938d413a790", "metadata": {}, "source": [ "完整的章节代码位于[ch03.ipynb](./ch03.ipynb)。\n", "\n", "该笔记本包含了本章的核心内容——多头注意力实现(以及第二章中的数据加载pipeline)。" ] }, { "cell_type": "markdown", "id": "3f60dc93-281d-447e-941f-aede0c7ff7fc", "metadata": {}, "source": [ "## 第二章的数据载入器" ] }, { "cell_type": "code", "execution_count": 2, "id": "0ed4b7db-3b47-4fd3-a4a6-5f4ed5dd166e", "metadata": {}, "outputs": [], "source": [ "class GPTDatasetV1(Dataset):\n", " def __init__(self, txt, tokenizer, max_length, stride):\n", " self.input_ids = [] # 输入ID列表\n", " self.target_ids = [] # 目标ID列表\n", "\n", " # 对整个文本进行分词\n", " token_ids = tokenizer.encode(txt, allowed_special={'<|endoftext|>'})\n", "\n", " # 使用滑动窗口将文本分割成重叠的最大长度序列\n", " for i in range(0, len(token_ids) - max_length, stride):\n", " input_chunk = token_ids[i:i + max_length] # 输入片段\n", " target_chunk = token_ids[i + 1: i + max_length + 1] # 目标片段(右移一个位置)\n", " self.input_ids.append(torch.tensor(input_chunk)) # 将输入片段转换为张量\n", " self.target_ids.append(torch.tensor(target_chunk)) # 将目标片段转换为张量\n", "\n", " def __len__(self):\n", " return len(self.input_ids) # 返回数据集的大小\n", "\n", " def __getitem__(self, idx):\n", " return self.input_ids[idx], self.target_ids[idx] # 获取特定索引的输入和目标\n", "\n", "def create_dataloader(txt, batch_size=4, max_length=256, stride=128, shuffle=True):\n", " # 初始化分词器\n", " tokenizer = tiktoken.get_encoding(\"gpt2\")\n", "\n", " # 创建数据集\n", " dataset = GPTDatasetV1(txt, tokenizer, max_length, stride)\n", "\n", " # 创建数据加载器\n", " dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)\n", "\n", " return dataloader # 返回数据加载器\n", "\n", "\n", "with open(\"small-text-sample.txt\", \"r\", encoding=\"utf-8\") as f:\n", " raw_text = f.read() # 读取文本文件\n", "\n", "tokenizer = tiktoken.get_encoding(\"gpt2\") # 初始化分词器\n", "encoded_text = tokenizer.encode(raw_text) # 对文本进行编码\n", "\n", "vocab_size = 50257 # 词汇表大小\n", "output_dim = 256 # 输出维度\n", "max_len = 1024 # 最大序列长度\n", "context_length = max_len # 上下文长度\n", "\n", "\n", "token_embedding_layer = nn.Embedding(vocab_size, output_dim) # 创建词嵌入层\n", "pos_embedding_layer = torch.nn.Embedding(context_length, output_dim) # 创建位置嵌入层\n", "\n", "max_length = 4 # 每个输入片段的最大长度\n", "dataloader = create_dataloader(raw_text, batch_size=8, max_length=max_length, stride=max_length) # 创建数据加载器" ] }, { "cell_type": "code", "execution_count": 3, "id": "664397bc-6daa-4b88-90aa-e8fc1fbd5846", "metadata": {}, "outputs": [], "source": [ "for batch in dataloader:\n", " x, y = batch\n", "\n", " token_embeddings = token_embedding_layer(x)\n", " pos_embeddings = pos_embedding_layer(torch.arange(max_length))\n", "\n", " input_embeddings = token_embeddings + pos_embeddings\n", "\n", " break" ] }, { "cell_type": "code", "execution_count": 4, "id": "d3664332-e6bb-447e-8b96-203aafde8b24", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "torch.Size([8, 4, 256])\n" ] } ], "source": [ "print(input_embeddings.shape)" ] }, { "cell_type": "markdown", "id": 
"bd298bf4-e320-40c1-9084-6526d07e6d5d", "metadata": {}, "source": [ "# 第三章的多头注意力" ] }, { "cell_type": "markdown", "id": "58b2297b-a001-49fd-994c-b1700866cd01", "metadata": {}, "source": [ "## 一种变体" ] }, { "cell_type": "code", "execution_count": 5, "id": "a44e682d-1c3c-445d-85fa-b142f89f8503", "metadata": {}, "outputs": [], "source": [ "class CausalSelfAttention(nn.Module):\n", " \"\"\"\n", " 该类实现了因果自注意力机制(Causal Self Attention),\n", " 用于自回归模型(例如GPT模型中的注意力层)。\n", " \"\"\"\n", "\n", " def __init__(self, d_in, d_out, context_length, dropout, qkv_bias=False):\n", " \"\"\"\n", " 初始化因果自注意力层。\n", " \n", " 参数:\n", " - d_in: 输入维度\n", " - d_out: 输出维度\n", " - context_length: 上下文长度(即注意力机制能“看到”的最大令牌数)\n", " - dropout: Dropout率\n", " - qkv_bias: 是否为查询、键和值使用偏置(默认为False)\n", " \"\"\"\n", " super().__init__()\n", " self.d_out = d_out\n", " # 定义查询、键、值的线性变换\n", " self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)\n", " self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)\n", " self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)\n", " \n", " # Dropout层\n", " self.dropout = nn.Dropout(dropout) # 新增的Dropout层\n", "\n", " # 注册一个buffer,用于存储因果掩码\n", " self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal=1)) # 新增掩码,禁止未来的信息\n", "\n", " def forward(self, x):\n", " \"\"\"\n", " 前向传播函数计算因果自注意力输出。\n", " \n", " 参数:\n", " - x: 输入的张量,形状为 (batch_size, num_tokens, d_in)\n", " \n", " 返回:\n", " - context_vec: 自注意力机制的输出,形状为 (batch_size, num_tokens, d_out)\n", " \"\"\"\n", " b, n_tokens, d_in = x.shape # 获取输入张量的维度\n", " keys = self.W_key(x) # 键(K)\n", " queries = self.W_query(x) # 查询(Q)\n", " values = self.W_value(x) # 值(V)\n", "\n", " # 计算注意力分数(查询和键的点积)\n", " attn_scores = queries @ keys.transpose(1, 2) # 这里的转置(transpose)是为了匹配维度\n", " \n", " # 使用掩码阻止未来的tokens看到当前token\n", " attn_scores.masked_fill_( # 这里的操作是原地修改\n", " self.mask.bool()[:n_tokens, :n_tokens], -torch.inf) # 将掩码区域填充为负无穷\n", "\n", " # 计算注意力权重并进行softmax归一化\n", " attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1)\n", " attn_weights = self.dropout(attn_weights) # 使用Dropout层\n", "\n", " # 计算上下文向量(加权和)\n", " context_vec = attn_weights @ values\n", " return context_vec\n", "\n", "\n", "class MultiHeadAttentionWrapper(nn.Module):\n", " \"\"\"\n", " 该类实现了多头注意力机制(Multi-Head Attention)包装器。\n", " 它包含多个因果自注意力头,并在输出时将多个头的结果合并。\n", " \"\"\"\n", "\n", " def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):\n", " \"\"\"\n", " 初始化多头注意力层。\n", " \n", " 参数:\n", " - d_in: 输入维度\n", " - d_out: 输出维度\n", " - context_length: 上下文长度\n", " - dropout: Dropout率\n", " - num_heads: 注意力头的数量\n", " - qkv_bias: 是否为查询、键和值使用偏置(默认为False)\n", " \"\"\"\n", " super().__init__()\n", " # 定义多个因果自注意力头\n", " self.heads = nn.ModuleList(\n", " [CausalSelfAttention(d_in, d_out, context_length, dropout, qkv_bias) \n", " for _ in range(num_heads)] # 为每个头创建一个CausalSelfAttention实例\n", " )\n", " \n", " # 定义最终的线性变换,用于将多个头的输出合并\n", " self.out_proj = nn.Linear(d_out * num_heads, d_out * num_heads)\n", "\n", " def forward(self, x):\n", " \"\"\"\n", " 前向传播函数,计算多头注意力输出。\n", " \n", " 参数:\n", " - x: 输入的张量,形状为 (batch_size, num_tokens, d_in)\n", " \n", " 返回:\n", " - out: 多头注意力的输出,形状为 (batch_size, num_tokens, d_out * num_heads)\n", " \"\"\"\n", " # 将多个头的输出拼接在一起\n", " context_vec = torch.cat([head(x) for head in self.heads], dim=-1)\n", " \n", " # 通过线性变换得到最终输出\n", " return self.out_proj(context_vec)" ] }, { "cell_type": "code", "execution_count": 6, "id": "7898551e-f582-48ac-9f66-3632abe2a93f", "metadata": {}, "outputs": [ { "name": "stdout", 
"output_type": "stream", "text": [ "context_vecs.shape: torch.Size([8, 4, 256])\n" ] } ], "source": [ "torch.manual_seed(123)\n", "\n", "context_length = max_length\n", "d_in = output_dim\n", "\n", "num_heads=2\n", "d_out = d_in // num_heads\n", "\n", "mha = MultiHeadAttentionWrapper(d_in, d_out, context_length, 0.0, num_heads)\n", "\n", "batch = input_embeddings\n", "context_vecs = mha(batch)\n", "\n", "print(\"context_vecs.shape:\", context_vecs.shape)" ] }, { "cell_type": "markdown", "id": "1e288239-5146-424d-97fe-74024ae711b9", "metadata": {}, "source": [ "## 另一种变体" ] }, { "cell_type": "code", "execution_count": 7, "id": "2773c09d-c136-4372-a2be-04b58d292842", "metadata": {}, "outputs": [], "source": [ "class MultiHeadAttention(nn.Module):\n", " \"\"\"\n", " 该类实现了多头自注意力机制(Multi-Head Attention),\n", " 用于自回归模型(如Transformer和GPT中的注意力层)。\n", " \"\"\"\n", "\n", " def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):\n", " \"\"\"\n", " 初始化多头自注意力层。\n", " \n", " 参数:\n", " - d_in: 输入维度\n", " - d_out: 输出维度\n", " - context_length: 上下文长度(即注意力机制能“看到”的最大令牌数)\n", " - dropout: Dropout率\n", " - num_heads: 注意力头的数量\n", " - qkv_bias: 是否为查询、键和值使用偏置(默认为False)\n", " \"\"\"\n", " super().__init__()\n", "\n", " # 检查输出维度是否能被头数整除\n", " assert d_out % num_heads == 0, \"d_out must be divisible by num_heads\"\n", "\n", " self.d_out = d_out\n", " self.num_heads = num_heads\n", " self.head_dim = d_out // num_heads # 将输出维度除以头数,得到每个头的维度\n", "\n", " # 定义查询、键、值的线性变换\n", " self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)\n", " self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)\n", " self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)\n", "\n", " # 定义输出的线性变换层,用于合并多个头的输出\n", " self.out_proj = nn.Linear(d_out, d_out)\n", " self.dropout = nn.Dropout(dropout)\n", "\n", " # 注册一个buffer,用于存储因果掩码\n", " self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal=1)) # 新增掩码,禁止未来的信息\n", "\n", " def forward(self, x):\n", " \"\"\"\n", " 前向传播函数,计算多头自注意力输出。\n", " \n", " 参数:\n", " - x: 输入张量,形状为 (batch_size, num_tokens, d_in)\n", " \n", " 返回:\n", " - context_vec: 多头自注意力的输出,形状为 (batch_size, num_tokens, d_out)\n", " \"\"\"\n", " b, num_tokens, d_in = x.shape # 获取输入张量的维度\n", "\n", " # 计算键、查询和值的表示\n", " keys = self.W_key(x) # 键(K)\n", " queries = self.W_query(x) # 查询(Q)\n", " values = self.W_value(x) # 值(V)\n", "\n", " # 将最后的维度按头数进行拆分:\n", " # 将 (b, num_tokens, d_out) 转换为 (b, num_tokens, num_heads, head_dim)\n", " keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) \n", " values = values.view(b, num_tokens, self.num_heads, self.head_dim)\n", " queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)\n", "\n", " # 转置以适配矩阵相乘:\n", " # (b, num_tokens, num_heads, head_dim) -> (b, num_heads, num_tokens, head_dim)\n", " keys = keys.transpose(1, 2)\n", " queries = queries.transpose(1, 2)\n", " values = values.transpose(1, 2)\n", "\n", " # 计算缩放点积注意力(self-attention)\n", " attn_scores = queries @ keys.transpose(2, 3) # 点积计算每个头的注意力分数\n", "\n", " # 通过掩码将未来的信息遮掩(变成负无穷)\n", " mask_bool = self.mask.bool()[:num_tokens, :num_tokens] # 将掩码转换为布尔类型\n", " attn_scores.masked_fill_(mask_bool, -torch.inf) # 使用掩码将未来的信息填充为负无穷\n", "\n", " # 计算注意力权重并进行softmax归一化\n", " attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1)\n", " attn_weights = self.dropout(attn_weights) # 使用Dropout层\n", "\n", " # 计算上下文向量(加权和)\n", " context_vec = (attn_weights @ values).transpose(1, 2) # 恢复维度 (b, num_tokens, num_heads, head_dim)\n", "\n", " # 合并头部的输出,并进行线性变换\n", " context_vec = 
    "        context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)  # flatten the head outputs\n",
    "        context_vec = self.out_proj(context_vec)  # optional output projection\n",
    "\n",
    "        return context_vec"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "779fdd04-0152-4308-af08-840800a7f395",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "context_vecs.shape: torch.Size([8, 4, 256])\n"
     ]
    }
   ],
   "source": [
    "torch.manual_seed(123)\n",
    "\n",
    "context_length = max_length\n",
    "d_in = output_dim\n",
    "d_out = d_in\n",
    "\n",
    "mha = MultiHeadAttention(d_in, d_out, context_length, 0.0, num_heads=2)\n",
    "\n",
    "batch = input_embeddings\n",
    "context_vecs = mha(batch)\n",
    "\n",
    "print(\"context_vecs.shape:\", context_vecs.shape)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}