0%

RDT base on UDP

Note

项目代码和测试代码和结果在GitHub上:Chen-Jin-yuan/RDT-base-on-UDP (github.com)

实现的功能如下:

  • 重新设计的报文头部;
  • 两次握手(only 1 RTT);
  • 基于ID标识(参考Quic协议);
  • 乱序确认,number递增(参考Quic协议);
  • 超时重传,基于number采样rtt(参考Quic协议);
  • 使用offset标识流而非number(参考Quic协议);
  • 基于maxoffset进行流量控制(参考Quic协议);
  • 基于心跳检测确认双方存活与退出。

Quic:RFC 9000 - QUIC: A UDP-Based Multiplexed and Secure Transport (ietf.org)

在不同内网的P2P通信可以达到2MB/s的速度,如果UDPSEGSIZE设置得大即一次发送的报文很大,可以达到10MB/s,但在高丢包率情况下会有意外情况。如果在相同内网中可以设置大一些。

因为平时比较忙,项目设计时分了好几段时间来做,所以有些地方思路会断开,代码也会有不足的地方,欢迎评论。

报文

报文生成(编辑器)

读入的文件数据buffer添加头部信息,形成新的buffer。

报文结构

最好32bits对齐

  • 编号,递增
  • 报文内容长度,字节数,一个报文大小最好不要超过MTU(1500Bytes)
  • 使用offset标识数据流
    • 这个offset是字节尾部还是头部合适呢?这涉及到两种排序的方案
      • 使用头部的话,因为头部+len就是偏移的尾部,也即下一个偏移的初始。这样我们在写入文件时要找连续的offset,头部+len就能得到下一个包的头部offset,这样可以用哈希表来找(哈希offest->报文结构)
      • 如果只有尾部的话实际上只能排序,然后判断下一个是不是连起来的(offset-len是否等于当前的offset),这样就用链表,每次放入(直接放入buffer)都是排序插入链表。
  • ID:这个ID是目标的ID
  • tag:包含ack、syn、fin、rst四位。bitset:C++ bitset类详解 (biancheng.net)
  • 接收方窗口大小
  • 暂存…
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
class MyUdpSeg
{
public:
using num_type = unsigned int; //32bits
using off_type = size_t; //64bits or 32bits
using win_type = size_t; //64bits or 32bits
using len_type = unsigned short; //16bits
using id_type = unsigned short; //16bits
using tag_type = bitset<8>; //8bits,?|?|?|?|ack|syn|fin|rst
static int maxHeadSize;

private: //data members
num_type number;
off_type offset;
len_type length;
win_type window;
id_type id;
tag_type tag;
//save...
string data;

public:
//Constructor 1: Construct class objects based on data
MyUdpSeg(const char* buf, num_type Number, off_type Offset,
len_type Length, tag_type Tag, win_type Window = 0, id_type Id = 0);

//Constructor 2: Construct class objects based on the complete UDP packet segment(string)
MyUdpSeg(string& udpSeg);

//Constructor 3: Retransmit, adjust the number
MyUdpSeg(MyUdpSeg& udpSeg, num_type Number);

//Copy constructor
MyUdpSeg(const MyUdpSeg& udpSeg);
//Move constructor
MyUdpSeg(MyUdpSeg&& udpSeg) noexcept;
//operator=, copy-swap
MyUdpSeg& operator=(MyUdpSeg udpSeg);

private:
//parse the string
vector<string> parse(string& str);
//swap for copy-swap
void swap(MyUdpSeg& udpSeg);

public:
//convert to string to send data
string seg_to_string();

//return a default obj
static MyUdpSeg initSeg();

//read members only
num_type getNumber() { return number; }
off_type getOffset() { return offset; }
len_type getLength() { return length; }
win_type getWindow() { return window; }
id_type getId() { return id; }
bool getTag(size_t i) { return tag[i]; }
string& getData() { return data; }
};

/*
* The headers are converted to a string to send
* The maximum number of digits in 32-bit decimal is 10 digits: 4,294,967,296
* The maximum number of digits in 64-bit decimal is 20 digits: 18,446,744,073,709,551,616
* The maximum number of digits in 16-bit decimal is 5 digits: 65,536
* There are 8 bits of identification
* There are also 6 spaces
*/
int MyUdpSeg::maxHeadSize = 10 + 20 + 20 + 5 + 5 + 8 + 6;

//Constructor 1: Construct class objects based on data
MyUdpSeg::MyUdpSeg(const char* buf, num_type Number, off_type Offset,
len_type Length, tag_type Tag, win_type Window, id_type Id) :
data(buf), number(Number), offset(Offset), window(Window),
length(Length), id(Id), tag(Tag)
{}

//Constructor 2: Construct class objects based on the complete UDP packet segment(string)
MyUdpSeg::MyUdpSeg(string& udpSeg)
{
vector<string> vec = parse(udpSeg);
// error packet segment
if (vec.size() < 6)
{
length = 0; //Indicates that this is a useless package
return;
}
/*
* have not string to unsigned int or size_t function
* but string to unsigned long is satisfiable
* just make sure it doesn't overflow
* use forced transformation to ignore warnings
*/
number = num_type(stoul(vec[0]));
offset = off_type(stoul(vec[1]));
window = win_type(stoul(vec[2]));
length = len_type(stoul(vec[3]));
id = id_type(stoul(vec[4]));
tag = tag_type(vec[5]);
if (vec.size() > 6) //if arry data. ACK may not carry data
data = vec[6];
}

//Constructor 3: Retransmit, adjust the number
MyUdpSeg::MyUdpSeg(MyUdpSeg& udpSeg, num_type Number) :
data(udpSeg.data), number(Number), offset(udpSeg.offset), window(udpSeg.window),
length(udpSeg.length), id(udpSeg.id), tag(udpSeg.tag)
{}

//Copy constructor
MyUdpSeg::MyUdpSeg(const MyUdpSeg& udpSeg) :
data(udpSeg.data), number(udpSeg.number), offset(udpSeg.offset), window(udpSeg.window),
length(udpSeg.length), id(udpSeg.id), tag(udpSeg.tag)
{}

//Move constructor
MyUdpSeg::MyUdpSeg(MyUdpSeg&& udpSeg) noexcept :
data(udpSeg.data), number(udpSeg.number), offset(udpSeg.offset), window(udpSeg.window),
length(udpSeg.length), id(udpSeg.id), tag(udpSeg.tag)
{}

//operator=, copy-swap
MyUdpSeg& MyUdpSeg::operator=(MyUdpSeg udpSeg)
{
swap(udpSeg);
return *this;
}
//swap for copy-swap
void MyUdpSeg::swap(MyUdpSeg& udpSeg)
{
using std::swap;
swap(this->number, udpSeg.number);
swap(this->offset, udpSeg.offset);
swap(this->length, udpSeg.length);
swap(this->window, udpSeg.window);
swap(this->id, udpSeg.id);
swap(this->tag, udpSeg.tag);
swap(this->data, udpSeg.data);
}

//parse the string
vector<string> MyUdpSeg::parse(string& str)
{
//Note that there are also spaces in the data, so parse up to six times
//Data should not be sliced

int spaceNum = 6;

str = str + " "; //add an space
vector<string> res;
size_t pos = 0;
size_t pos1;
while ((pos1 = str.find(' ', pos)) != string::npos)
{
if (spaceNum-- == 0)
break;

res.push_back(str.substr(pos, pos1 - pos));
while (str[pos1] == ' ')
pos1++;
pos = pos1;
}
//Get complete data
string data = str.substr(pos);
if (data != "")
res.push_back(data);

return res; //move construction
}

//convert to string to send data
string MyUdpSeg::seg_to_string()
{
string res;
res += to_string(number) + " ";
res += to_string(offset) + " ";
res += to_string(window) + " ";
res += to_string(length) + " ";
res += to_string(id) + " ";
res += tag.to_string() + " "; //std::bitset::to_string()
res += data;
return res;
}

MyUdpSeg MyUdpSeg::initSeg()
{
return MyUdpSeg("", 0, 0, 0, MyUdpSeg::tag_type(), 0, 0);
}

ACK

返回ACK报文,并通告可用窗口边界

  • ACK会告知接收方的可用窗口边界,如果有ACK通告更小的边界,发送方忽略它们
1
2
3
4
5
6
7
MyUdpSeg sendAck(MyUdpSeg::num_type number, MyUdpSeg::off_type offset, MyUdpSeg::win_type window,
MyUdpSeg::id_type id)
{
MyUdpSeg::tag_type tag;
tag.set(3, 1); //set ack from 0 to 1
return MyUdpSeg("", number, offset, 0, tag, window, id);
}

定时器

这部分主要考虑定时怎么设计,即如何得知超时。

简单的想法是,用链表维护定时器。对于超时的定时器,我们在定时器维护一个编号,告知发送窗口设置重发即可。

对于排序,有两种手段,一种是在插入节点时候按时间从小到大排序,这样检测就能从头检测知道第一个没超时就停下;一种是直接插入,检测超时是遍历所有节点。

显然在这个场景下,插入是频繁的,可能插入很多个节点才检测一次超时,因此使用直接插入要好得多。


定时器节点内容是:

  • 包的编号
  • 记录的时间信息
  • offset(补充)
  • 是否重传过(补充)

整体用一个STL list,可以用find函数(注意list本身没有find函数),list的remove必须是节点相等,则可以用remove_if;这时可以用algorithm头文件的remove函数,使用重载==,但是时间是O(n)。对于timer来说,编号不可能重复,因此用find()+list.erase()即可。

收到ACK时要删除定时器中的对应的包的节点,采样RTT就在此时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//timer node
struct timerNode
{
MyUdpSeg::num_type number; //segment number
chrono::system_clock::time_point time; //start time
timerNode(MyUdpSeg::num_type Number):number(Number),time(chrono::system_clock::now()){}
bool operator==(MyUdpSeg::num_type Number) { return number == Number; } //for find() function or remove()
};

//use example
using timerIter = list<timerNode>::iterator;
list<timerNode> timerList;
for (MyUdpSeg::num_type i = 0; i < 20; i++)
timerList.push_back(timerNode(i));
timerIter iter = find(timerList.begin(),timerList.end(),19); //O(n)
chrono::system_clock::time_point nowtime = chrono::system_clock::now();
cout << (*iter).number << endl;
cout << chrono::duration_cast<chrono::milliseconds>(nowtime - (*iter).time).count() << endl;
timerList.erase(iter); //O(1)
iter = find(timerList.begin(), timerList.end(), 19);
if (iter == timerList.end())
cout << "remove 19" << endl;
remove(timerList.begin(), timerList.end(), 1);
iter = find(timerList.begin(), timerList.end(), 1);
if (iter == timerList.end())
cout << "remove 1" << endl;

为了支持插入的时候排序以及其他功能,设计一个类封装。(发现using node_type = struct timerNode有问题,要去掉struct)

  • 允许插入节点
  • 处理超时事件,遍历链表,找出每个超时的节点
    • 对于这些节点,需要重传,因此要返回所有结点的包的编号;重传的包的节点插入交给上层
    • 尽管这些节点需要重传,但是还是需要保留的,因为原始包超时后重传了,但可能原始包的ack随后就到了,这时要用原始包来采样RTT。而对于重传的包的定时器就可以根据offset删除了,因为不需要再处理超时任务了,同时也不采样这些RTT。
    • 所以还要根据offset删除所有节点,这就要加一个offset
  • 当收到一个ack时,根据number删除节点,并返回计算的RTT;然后这个number可能有重传的包的定时器,根据offset一并删了,因为这个数据流offset已经不被需要了。
  • 还要注意,如果一个包重传了而不更改记录的时间(为了计算RTT)的话,那么下次检测这个节点肯定会再被重传,因为包的编号不一样的,可能会重传两个甚至更多相同的数据包;因此要加一个flag,一旦检测出一次超时了,下次就不会再报超时事件了,留下的作用是采样RTT。

重新设计和封装如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
struct timerNode
{
MyUdpSeg::num_type number; //segment number
MyUdpSeg::off_type offset; //data offset
chrono::system_clock::time_point time; //start time
bool timeout; //timeout flag
timerNode(MyUdpSeg::num_type Number, MyUdpSeg::off_type Offset) :
number(Number), offset(Offset),time(chrono::system_clock::now()), timeout(false) {}
timerNode(const timerNode& node) :
number(node.number), offset(node.offset),time(node.time), timeout(false) {}
bool operator==(MyUdpSeg::num_type Number) { return number == Number; } //for find() function or remove()
};


class TimerList
{
public:
using node_type = timerNode;
using rtt_type = unsigned int;
using rto_type = double;
using timerIter = list<node_type>::iterator;
private:
list<node_type> timerList;
public:
TimerList(){}
~TimerList(){}
//insert by node
void insertTimer(const node_type& node);
//insert by number
void insertTimer(MyUdpSeg::num_type Number, MyUdpSeg::off_type Offset);

//delete by number, return RTT/ms
rtt_type deleteTimer(MyUdpSeg::num_type Number);

//deal with timeout packets, return numbers
vector<MyUdpSeg::num_type> tick(rto_type RTO);

//return size
size_t size() { return timerList.size(); }
private:
//delete all by offset, call by "rtt_type deleteTimer(MyUdpSeg::num_type Number);"
void deleteTimer_(MyUdpSeg::off_type Offset);
};

void TimerList::insertTimer(const node_type& node)
{
timerList.push_back(node); // call move or copy constructor function, not need to construct
}
void TimerList::insertTimer(MyUdpSeg::num_type Number, MyUdpSeg::off_type Offset)
{
timerList.emplace_back(Number, Offset); //call constructor function
}

TimerList::rtt_type TimerList::deleteTimer(MyUdpSeg::num_type Number)
{
//find the node
timerIter iter = find(timerList.begin(), timerList.end(), Number); //O(n)
if (iter == timerList.end()) //not found
return 0; //zero used to error detect

//sample RTT
chrono::system_clock::time_point nowtime = chrono::system_clock::now();
rtt_type RTT = rtt_type(chrono::duration_cast<chrono::milliseconds>(nowtime - (*iter).time).count());

//get offset
MyUdpSeg::off_type offset = (*iter).offset;

//delete
timerList.erase(iter); //O(1)

//delete node with the same offset
deleteTimer_(offset);

return RTT;
}

void TimerList::deleteTimer_(MyUdpSeg::off_type Offset)
{
for (timerIter iter = timerList.begin(); iter != timerList.end();)
{
if ((*iter).offset == Offset)
iter = timerList.erase(iter); //erase return next iterator
else
iter++;
}
}

vector<MyUdpSeg::num_type> TimerList::tick(rto_type RTO)
{
if (RTO <= 0) return{};

vector<MyUdpSeg::num_type> number_retrans;
for (timerIter iter = timerList.begin(); iter != timerList.end(); iter++)
{
if ((*iter).timeout == true)
continue;
else
{
chrono::system_clock::time_point nowtime = chrono::system_clock::now();
rtt_type interval = rtt_type(chrono::duration_cast<chrono::milliseconds>(nowtime - (*iter).time).count());
if (interval > RTO) //need to retransmit
{
number_retrans.push_back((*iter).number);
(*iter).timeout = true;
}
}
}
return number_retrans;
}

缓冲(窗口)

用于建立接收窗口和发送窗口,并维护它们的发送与接收

TCP的缓冲区是一个双向链表,所以这里也使用双向链表,这插入和删除上还是很方便的。

对于缓冲区节点,收到一个ack时,可以知道编号,删除该节点和定时器并计算RTT;问题是可能有重传的包,编号是不一样的,此时用offset找所有重传的包,找到就删了,因为这个包是否收到已经不重要了(RTT也不采样这个)。

而节点只能重载一个operator==(因为offset和number参数类型可能是一样的,重载失败),前面要find编号,可以重载编号。然后因为要根据offset删所有重传的包,这时就要多实现一个函数,然后用remove_if了。这个函数在remove_if内传参是传入节点,我们还要一个参数offset,所以要用函数对象bind。(当然也可以遍历链表自己删,定时器用的是自己遍历的)

1
auto bindFunc1 = bind(lambda,std::placeholders::_1,std::placeholders::_2);

对于发送和接收窗口来说,通用的设计如下:

链表的设计是:STL list。

节点的设计是:

  • 有一个报文对象MyUdpSeg
  • flag标识这个节点发过了没

类的设计是:

  • 有添加和删除节点的功能
  • 有查找节点并返回其引用的功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//buffer node
struct bufferNode
{
MyUdpSeg udpSeg; //segment object
bool isSent;

//Constructor 1: construct by lvalue
bufferNode(MyUdpSeg& UdpSeg) :udpSeg(UdpSeg), isSent(false) {}
//Constructor 2: construct by rvalue
bufferNode(MyUdpSeg&& UdpSeg) noexcept :udpSeg(UdpSeg), isSent(false) {}
//Constructor 3: Retransmit, adjust the number
bufferNode(MyUdpSeg& UdpSeg, MyUdpSeg::num_type Number) :udpSeg(UdpSeg, Number), isSent(false) {}

//copy constructor
bufferNode(const bufferNode& node):udpSeg(node.udpSeg),isSent(node.isSent){}

bool operator==(MyUdpSeg::num_type Number) { return udpSeg.getNumber() == Number; } //for find() function
};
1
2
3
4
5
6
7
8
9
10
11
//lambda for remove_if
auto pred_lambda = [](bufferNode& bufNode, MyUdpSeg::off_type offset) -> bool // "-> bool" is omittable
{
return bufNode.udpSeg.getOffset() == offset;
};
auto pred = bind(pred_lambda,std::placeholders::_1, myOffset);

//the simplified form is as follows
auto pred = bind([](bufferNode& bufNode, MyUdpSeg::off_type offset) { //lambda
return bufNode.udpSeg.getOffset() == offset; },
placeholders::_1, myOffset); //parameters

类的设计如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
//buffer node
struct bufferNode
{
MyUdpSeg udpSeg; //segment object
bool isSent;

//Constructor 1: construct by lvalue
bufferNode(MyUdpSeg& UdpSeg) :udpSeg(UdpSeg), isSent(false) {}
//Constructor 2: construct by rvalue
bufferNode(MyUdpSeg&& UdpSeg) noexcept :udpSeg(UdpSeg), isSent(false) {}
//Constructor 3: Retransmit, adjust the number
bufferNode(MyUdpSeg& UdpSeg, MyUdpSeg::num_type Number) :udpSeg(UdpSeg, Number), isSent(false) {}

//copy constructor
bufferNode(const bufferNode& node):udpSeg(node.udpSeg),isSent(node.isSent){}

bool operator==(MyUdpSeg::num_type Number) { return udpSeg.getNumber() == Number; } //for find() function
};



class BufferList
{
public:
using node_type = bufferNode;
using bufferIter = list<node_type>::iterator;
using node_return_type = pair<node_type&, bool>;
private:
list<node_type> bufferList;
public:
BufferList(){}
~BufferList(){}

//insert node
void insertNode(MyUdpSeg& UdpSeg);
void insertNode(MyUdpSeg&& UdpSeg);
void insertNode(MyUdpSeg& UdpSeg, MyUdpSeg::num_type Number);
void sortInsertNode(MyUdpSeg& UdpSeg); //insert sorted by offset(used by recvbuf)

//delete by number
void deleteNode(MyUdpSeg::num_type Number);

//get node by number, return pair: reference、bool (to check node)
node_return_type getNode(MyUdpSeg::num_type Number);
/*
* Note for getNode function:
* For each return value, initialize it with a new variable
* Do not assign a value to pair again after initialization, it will act on the initialized node
*/

//return size
size_t size() { return bufferList.size(); }

bufferIter begin() { return bufferList.begin(); }
bufferIter end() { return bufferList.end(); }
void deleteFront() { bufferList.erase(begin()); }

//return a default node to ues getNode function
static node_type initNode();

private:
void deleteNode_(MyUdpSeg::off_type Offset);
};

void BufferList::insertNode(MyUdpSeg& UdpSeg)
{
bufferList.emplace_back(UdpSeg);
}
void BufferList::insertNode(MyUdpSeg&& UdpSeg)
{
bufferList.emplace_back(move(UdpSeg));
}
void BufferList::insertNode(MyUdpSeg& UdpSeg, MyUdpSeg::num_type Number)
{
bufferList.emplace_back(UdpSeg, Number);
}
void BufferList::sortInsertNode(MyUdpSeg& UdpSeg)
{
MyUdpSeg::off_type offset = UdpSeg.getOffset();
bufferIter iter = bufferList.begin();
for (; iter != bufferList.end(); iter++)
{
if ((*iter).udpSeg.getOffset() < offset)
continue;
else if ((*iter).udpSeg.getOffset() == offset) //A package with the same data
return;
else
break;
}
bufferList.insert(iter,UdpSeg);

}
void BufferList::deleteNode(MyUdpSeg::num_type Number)
{
//find the node
bufferIter iter = find(bufferList.begin(), bufferList.end(), Number); //O(n)
if (iter == bufferList.end()) //not found
{
cout << "no delete" << endl;
return; //do nothing
}


//get offset
MyUdpSeg::off_type offset = (*iter).udpSeg.getOffset();

//remove
bufferList.erase(iter); //O(1)

//remove all nodes with the same offset
deleteNode_(offset);

}
void BufferList::deleteNode_(MyUdpSeg::off_type Offset)
{
bufferList.remove_if(bind([](bufferNode& bufNode, MyUdpSeg::off_type offset) {
return bufNode.udpSeg.getOffset() == offset; },
placeholders::_1, Offset));

}
BufferList::node_return_type BufferList::getNode(MyUdpSeg::num_type Number)
{

//find the node
bufferIter iter = find(bufferList.begin(), bufferList.end(), Number); //O(n)
if (iter == bufferList.end()) //not found
{
node_type node = initNode(); //must check bool value firstly, node may be inexistent
return node_return_type(node, false);
}

return node_return_type(*iter,true);
}

BufferList::node_type BufferList::initNode()
{
return node_type(MyUdpSeg::initSeg());
}

发送窗口

功能

  • 发送窗口里有缓冲链表对象和定时器链表对象
  • 加载数据
    • 将传入的数据封装成节点放到缓冲链表里(支持普通发送)
    • 读取文件流,如果直到把缓冲链表对象读满(支持文件发送)
  • 发送数据,标记为已发送,并注册定时器;最后一个数据进行标记,发送结束
  • 根据定时器tick后需要重发的包编号向量,重新插入新的节点数据。
  • 收到ack,删除节点(及其重传的节点)
  • 握手,syn;发送完毕是fin

能发一条消息也能发文件,因此要解耦。对于加载数据来说用一个goOn来指示,文件读完就goOn为false否则为true即需要继续发;对于自行调用发送信息也是如此,指示即可;默认为false。

变量

  • 递增的number序列

  • 数据偏移量offset

  • window的大小,窗口能发送的字节数,窗口更新:

    • 因为是最多能发送的字节数,所以包括重传在内的所有包都不能太多,因此bufferList.size()* SegDataSize < window才允许继续发;
    • 更新时最开始想使用window = MyUdpSeg::win_type(offset) + Window,Window是接收窗口传来的还能接收的字节数recvWindow-maxOffset,用这个来更新是因为当一切正常时,offset和maxOffset是相等的,这个更新符合直觉。但是这样更新的话,window只会越来越大,肯定是错误的。
    • 后来打算用window = MyUdpSeg::win_type(bufferList.size() * SegDataSize) + Window,因为结合前面的判断,能新发送的字节数就是Window这么多,也符合直觉。但这可能导致重传的包很多也能继续发,导致越来越堵。
    • 因此再定义一个maxWindow,这个和接收窗口的大小一样,window = min(updateWindow, maxWindow),即如果已经很满了就不增大了。
  • 动态更新的RTO

接收窗口

功能

  • 接收窗口有缓冲链表对象
  • 接收数据
    • 把收到的一个数据包放入缓冲链表,注意插入链表要按offset排序插入
    • 发送ack和当前的剩余窗口
  • 写入数据,检测到一连串的offset超过阈值就写入;
    • 写入后更新窗口
    • 如果最后一个数据包里有结束标记,就通知关闭窗口,然后进行关闭挥手
  • 握手,syn+ack;接收完毕是fin+ack

变量

  • 最大offset偏移
  • window的大小,最大能接收的字节数

退出(基于心跳检测)

如何沟通退出呢,当接收会话得知自己接收完了会返回true,如果此时就退出了,那么发送会话可能因为ack丢了而无法退出也无法检测,因为udp是无连接的,对端close了也不会通知。

需要一个可靠的措施,它不会像tcp一样复杂,确保接收器不需要定时器来重传某些包。

这里的考量是,发送器为了确保对方存活,用一个额外的线程去进行心跳检测,下面是一般的心跳检测步骤:

  • 1.客户端每隔一个时间间隔发生一个探测包给服务器(秒级别的较长一段时间)
  • 2.客户端发包时启动一个超时定时器
  • 3.服务器端接收到检测包,应该回应一个包
  • 4.如果客户机收到服务器的应答包,则说明服务器正常,删除超时定时器
  • 5.如果客户端的超时定时器超时,依然没有收到应答包,则说明服务器挂了

这里的心跳检测主要是用于保活,但我这里是为了退出,可以直接复用接收器发回的ack包,发送器也不需要再进行发送包,就检测最近一次接收ack到当前时间的间隔即可,间隔多长结束呢,这里感觉好一点是4个rto(2个rto太容易发生了)。

接收器也是如此,但是接收器没有rto。所以接收器和发送器都约定好用1秒钟。发送方退出的时候就是检测到没有接收方响应的时候,此时接收方可能已经接受完文件退出、或者出错。接收方退出的时候是处理完所有数据的时候,这不需要心跳检测;也可能在心跳检测发现发送方出错时退出。因此发送方退出情况只用一种、接收方退出有两种(或的关系)

  • 在开始时握手阶段,因为双方开启的时候不是同步的,所以一开始握手检测时间要长一些(10~30s),这里设置为30s。当发送方成功从握手退出时(此时收到了syn+ack),可以修改为1s的心跳;而接收方收到syn时,还不能直接修改,因为可能丢包,要等第一个不是握手包到来才修改为1s。
  • 当接收方认为对方退出时,自己还不能退出,因为可能数据接收完整但是还没写完。这里的逻辑是:写完一次后收到后面的包,然后也许等了许久(别管为什么,只是也许)导致超时才再次准备写,但这次写之前因为超时了所以没去写。这时只需要再执行一次写数据的任务即可,如果完整就可以一次写完;如果是其他情况也没什么损失。

因为用到一些线程,要加锁。这里面发送器线程之间共享sendOver和lastAckTime,用一个互斥锁即可。

UDP会话

socket需要设置成非阻塞,否则recvfrom可能无法退出sendto不管怎么样都不会阻塞,因为sendto没有缓冲区,不需要拷贝:

send 和 sendto 函数在 UDP 层没有概念上的输出缓冲区(总要拷贝数据,但这个缓冲区和TCP的缓冲区在概念上不同),在 TCP 层有输出缓冲区,recv 和recvfrom 无论在 UDP 层还是 TCP 层都有接收缓冲区。

更详细的介绍是:

  • udp sendto 函数,它的作用和tcp一样,是拷贝到缓冲区,但是请注意udp栈底层实现的原理是:
  • 针对每个udp包如果目前有物理连路带宽可以发送,那么立即发送;如果没有,那么直接丢弃该udp包
  • 这个过程是非常非常快的,而且因为是在内核态执行,因此优先级高于普通操作
  • 从这个意义上来说,sendto函数根本不会阻塞,事实上也不会阻塞,因为只有2个结果:立即发送出去,或者直接丢包

总结一下就是:UDP不可靠,它不必保存应用程序的数据拷贝,因此无需真正的发送缓冲区(应用进程的数据在沿协议栈往下传递,以某种形式拷贝到内核缓冲区,然而数据链路层在送出数据之后将丢弃该拷贝)。

发送器

发送文件:

  • 首先建立一个发送窗口,窗口中有多个buffer;并且打开文件(fp);
  • 每个buffer依次读取文件,然后编辑成报文后发送;
  • 等待ACK,更新发送窗口;
  • 如果发送窗口可用,继续读取文件、发送

这里的问题是,整个流程是串行的吗?需要思考的是发送文件和接收ACK是怎么个交流法。

  • 如果是串行的,那么每次把发送窗口发完,然后等待ACK;这里的问题是一旦有一个ACK来了,是不继续等直接更新窗口(这样更新太频繁了)还是继续等(要等多少个呢?)没更新发送窗口时,假设数据不会发送,那么等ACK就可以一直等待直到发送窗口能增大一个阈值为止。这里的问题是实际上有数据需要重传,如果窗口一直没更新就会导致数据重传不了,这是致命的问题。
  • 因此需要两个线程,一个维护发送一个维护接收,它们动态更新发送窗口

接收器

接收文件:

  • 首先建立一个接收窗口,这里文件名由于在主项目里能根据服务器得到,所以这里不设置udp交流得到文件名,然后打开文件
  • 对于收到的每个包,返回一个ack确认,这样可以乱序确认,可以做到tcp中sack的优化(TCP乱序确认就不能快速重传)。接收的是一个char的buffer,要进行解析恢复成报文的数据结构。
  • 写入文件,要按序写,所以从接收窗口左边界对连续的包进行检测,找到连续的段就写入(基于quic的话是1/2的最大窗口再写入),然后更新窗口。

还是那个问题,流程是否是串行的?

  • 首先要一直接收,接收到就发一个ack这个没问题。但是如果接收到一个包就检测能不能写入文件就太慢了,会导致ack延迟太多。
  • 所以另开一个线程不断检测能否写入文件和更新窗口。