CS144-Lab1-实验笔记

Lab 1


0x00 Overview

checkpoint 0 中使用 Linux 内置的 TCP 协议 通过互联网套接字访问网站和发送邮件,体验了 TCP 提供的可靠、有序的双向字节流服务。但底层网络 (IP) 实际只提供“尽力而为”的数据报服务,即数据包可能丢失、乱序、篡改或重复。接下来的实验任务是:自己实现 TCP 协议,在不可靠的数据报网络之上,实现两台计算机之间的可靠字节流抽象。

本实验要求实现两个跨网络的字节流:

  1. “出站”字节流:本地应用写入套接字,由你的 TCP 发送给对端。
  2. “入站”字节流:从对端接收数据,供本地应用读取。

0x01 Implementation: putting substrings in sequence

TCP 发送方将其字节流分割成较短的段 (每个子串不超过约 1,460 字节),以便每个段都能放入一个数据报中。但网络可能会重新排序这些数据报、丢弃它们或多次传递它们。接收器必须将这些段重新组装成它们最初的连续字节流。

在本实验中,需要编写负责这种重装的数据结构:一个重装器 (Reassembler)。它将接收子串,每个子串包含一串字节,以及该字符串在更大流中的起始字节索引。流的每个字节都有其唯一的索引,从零开始向上计数。一旦重装器知道流的下一个字节,就会将其写入 ByteStream 的写入端——也就是在 checkpoint0 中实现的相同 ByteStream。重装器的“客户”可以从同一 ByteStream 的读取端读取数据。

实验要求实现 Reassembler 类 (在文件 reassembler.hhreassembler.cc 中),其公共接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Insert a new substring to be reassembled into a ByteStream.
void insert( uint64_t first_index, std::string data, bool is_last_substring );

// How many bytes are stored in the Reassembler itself?
// This function is for testing only; don't add extra state to support it.
uint64_t count_bytes_pending() const;

// Access output stream reader
Reader& reader() { return output_.reader(); }
const Reader& reader() const { return output_.reader(); }

// Access output stream writer, but const-only (can't write from outside)
const Writer& writer() const { return output_.writer(); }

插入子串时,遵循的逻辑如下:

我们首先用一个 map 数据结构来记录所有红色的片段 (std::map<std::pair<uint64_t, uint64_t>, std::string> storage_)。在 insert 函数中,实现逻辑如下:

识别 first_unassembled_idxfirst_unacceptable_idx

首先拿到 Writer 变量,first_unassembled_idx 是第一个没有写入 ByteStream 的字节索引,first_unacceptable_idx 是第一个不被接受的字节索引:因此,first_unassembled_idxWriter 已经写入的字节数,而 first_unacceptable_idxWriter 可用容量加上 first_unassembled_idx

1
2
3
4
Writer &writer = output_.writer();

uint64_t first_unassembled_idx = writer.bytes_pushed();
uint64_t first_unacceptable_idx = writer.available_capacity() + first_unassembled_idx;

记录流结束下标

如果 is_last_substring 为真,说明这是流的最后一个子串,我们需要记录流的结束下标 last_index_,它是 first_index + data.size() 和当前 last_index_ 的最小值:

1
2
if (is_last_substring)
last_index_ = std::min(last_index_, first_index + data.size());

丢弃超出容量的数据或空数据

如果 first_index 大于等于 first_unacceptable_idx,说明该子串的起始下标已经超出接收窗口,或者数据为空字符串,我们就直接丢弃该子串。如果已经接收到了流的最后一个子串,并且所有数据都已经写入 ByteStream,则关闭写入端:

1
2
3
4
5
6
if (first_index >= first_unacceptable_idx || data.empty())
{
if (last_index_ != UINT64_MAX && writer.bytes_pushed() == last_index_)
writer.close();
return;
}

截断超出容量的数据和已写入部分

如果 first_index + data.size() 超过了 first_unacceptable_idx,说明该子串的一部分数据超出了接收窗口,我们需要截断该子串,只保留在接收窗口内的数据。如果 first_index + data.size() 小于等于 first_unassembled_idx,说明该子串的所有数据都已经写入 ByteStream,我们直接返回即可。如果 first_index 小于 first_unassembled_idx,说明该子串的前面一部分数据已经写入 ByteStream,我们需要截断该子串,只保留未写入的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 截断超出容量的数据
if (first_index + data.size() > first_unacceptable_idx)
data = data.substr(0, first_unacceptable_idx - first_index);

// 截断已写入部分
if (first_index + data.size() <= first_unassembled_idx)
{
if (last_index_ != UINT64_MAX && writer.bytes_pushed() == last_index_)
writer.close();
return;
}
if (first_index < first_unassembled_idx)
{
data = data.substr(first_unassembled_idx - first_index);
first_index = first_unassembled_idx;
}

合并重叠区间

首先,插入新的子串到 storage_ 中,然后遍历 storage_,检查相邻的区间是否有重叠,如果有重叠则合并它们:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
uint64_t seg_start = first_index;
uint64_t seg_end = first_index + data.size();
storage_.insert({{seg_start, seg_end}, data});

for (auto it = storage_.begin(); it != storage_.end();)
{
auto next_it = next(it);
if (next_it == storage_.end())
break;
uint64_t exist_start = it->first.first, exist_end = it->first.second;
uint64_t next_start = next_it->first.first, next_end = next_it->first.second;
std::string &exist_data = it->second, &next_data = next_it->second;

if (next_it->first.first <= it->first.second)
{
if (next_end > exist_end)
{
exist_data += next_data.substr(exist_end - next_start);
exist_end = next_end;
}
pair<pair<uint64_t, uint64_t>, string> new_data = {{exist_start, exist_end}, exist_data};
storage_.erase(it);
storage_.erase(next_it);
storage_.insert(new_data);
it = storage_.begin();
}
else
it++;
}

尝试写入连续数据

此时,如果有数据可以写入,一定是第一个数据,且起点一定等于 first_unassembled_idx,我们就将这个连续的数据写入 ByteStream

1
2
3
4
5
6
7
if (storage_.empty())
return;
if (storage_.begin()->first.first == first_unassembled_idx)
{
writer.push(storage_.begin()->second);
storage_.erase(storage_.begin());
}

处理流结束

如果已经接收到了流的最后一个子串,并且所有数据都已经写入 ByteStream,则关闭写入端:

1
2
if (last_index_ != UINT64_MAX && writer.bytes_pushed() == last_index_)
writer.close();

计数当前缓存的字节数

记录在重装器中但尚未写入 ByteStream 的字节数,可以通过遍历 storage_ 中的所有区间,计算它们的长度之和来实现:

1
2
3
4
5
6
7
8
9
uint64_t Reassembler::count_bytes_pending() const
{
uint64_t cnt = 0;
for (const auto &kv : storage_)
{
cnt += kv.first.second - kv.first.first;
}
return cnt;
}