0%

TinyWebServer-C++11

前言

前几天follow完了tinywebsever的项目,分析了很多代码,最后也能跑起来。不过感觉整体的代码框架有些杂乱,代码也有冗余、不清晰的地方,比如互斥锁在c++11已经有专门的实现,不需要自己实现了。

作者也推荐了另一个c++11写的更简洁更优雅的项目实现:markparticle/WebServer: C++ Linux WebServer服务器 (github.com)

上一个项目最大的好处是作者专门写了一系列分析的文章,而这个项目没有教程也没什么注释,因此打算再写篇博客分析一下代码,写下注释,更重要的是把代码框架、逻辑理清楚,以及看看c++11实现的方便之处。


另外,在这里说一下size_t,很多c系的程序员对这个类型用的比较少,但这个项目里经常出现。

可以参考下:(24 封私信 / 80 条消息) size_t 这个类型的意义是什么? - 知乎 (zhihu.com)

主要还是为了可移植性,不同平台对于size_t的大小不同,64位系统是8字节,32位系统是4字节。为了方便移植,许多库函数的参数、返回值都是size_t。当换了个平台时,可以不改动代码而传入、接收更大或更小的值;并且系统不会使用更大的类型,从而加快速度。注意这些都是相对只用int、unsigned int、unsigned long作为类型对比的结果,用size_t有弹性。

但是,一个size_t类型的参数的用途却是用户定义的,比如可以把size_t就当int用,用来数组寻址等等,也可以用它来接收函数返回的参数然后作为一些长度,这些长度表示字节、还是两个字节都是用户决定的,它本身的值是多少就是多少。

一般用于作索引和表示单字节长度:

  • size_t传达了语义:您立即知道它表示一个以字节为单位的大小或一个索引,而不仅仅是另一个整数。
  • std::size_t是任何sizeof表达式的类型,并且保证能够表达C ++中任何对象(包括任何数组)的最大大小。通过扩展,它也保证对任何数组索引都足够大,因此它是数组上逐个索引循环的自然类型。

C++11可以将{}初始化器用于任何类型(可以使用等号,也可以不使用),这是一种通用的初始化语法。

在C++11中,集合(列表)的初始化已经成为C++的一个基本功能,被称为“初始化列表(initializer list)”

1
2
3
4
int a[] = { 1, 2, 3 };            //C++98支持,C++11支持
int b[]{2, 3, 4}; //C++98不支持,C++11支持
vector<int> c{ 1, 2, 3 }; //C++98不支持,C++11支持
map<int, float> d = {{ 1, 1.0f }, { 2, 2.0f }, { 3, 3.0f } };//C++98不支持,C++11支持

线程池

应用了很多新特性,比较难理解,要耐心一点。

右值引用可参考:C++11右值引用(一看即懂) (biancheng.net)

std::move()可参考:C++11 move()函数:将左值强制转换为右值 (biancheng.net)

std::forward()可参考:C++11完美转发及实现方法详解 (biancheng.net)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
* @Author : mark
* @Date : 2020-06-15
* @copyleft Apache 2.0
*/

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
class ThreadPool {//线程创建后就开始运行,顶层只用往线程池插入任务即可
public:
explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {//初始化一个pool
assert(threadCount > 0);
for(size_t i = 0; i < threadCount; i++) {//创建count个线程,每个线程绑定工作函数并detach分离
//lambda匿名函数,按值捕获pool_,本身是个指针,指向同一个实例。没有参数,省略“()”
std::thread([pool = pool_] {//接下来是函数体
std::unique_lock<std::mutex> locker(pool->mtx);//灵活锁,因为要取一个元素。不放while可以避免重复定义
while(true) {
if(!pool->tasks.empty()) {//有元素就取,这里一定要先锁再判断
auto task = std::move(pool->tasks.front());//左值转右值,转移task内存的所有权,把function取出来,调用移动构造函数
pool->tasks.pop();
locker.unlock();//取完可以解锁了
task();//工作
locker.lock();//工作完因为是while,再锁,接下来再取元素。
}
else if(pool->isClosed) break;//结束线程
else pool->cond.wait(locker);//没有元素,解锁并等待唤醒
}
}).detach();//在创建线程后,实现线程从主线程(进程)分离,这使得线程能在工作完后自动回收资源
}
}

ThreadPool() = default;

ThreadPool(ThreadPool&&) = default;

~ThreadPool() {
if(static_cast<bool>(pool_)) {//强制转型,pool_有指向的话就是true,那么就准备让线程退出
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;//退出标识
}
pool_->cond.notify_all();//唤醒所有线程,要把工作都做完才退出
}
}

template<class F>//以模板定义的&&既能接受左值也能接受右值,但注意,task作为参数,有名且能寻址,成为了左值
void AddTask(F&& task) {//添加一个task,右值传入,使得传入的对象的所有权被task获取,
{
std::lock_guard<std::mutex> locker(pool_->mtx);
//c++11新特性,emplace使对象在内存中调用构造函数,push会先构造再拷贝构造
pool_->tasks.emplace(std::forward<F>(task));//完美转发,保留传入的左右值属性,
//直接传task是个左值,如果F是一个function<>类型,这会导致移动构造和拷贝构造的区别。如果F是一个普通函数或指针等,一律调用普通构造函数
}
pool_->cond.notify_one();//唤醒等待队列中的第一个线程
}

private:
struct Pool {//线程池结构体,相当于在类里再封装一层
std::mutex mtx;
std::condition_variable cond;
bool isClosed;
std::queue<std::function<void()>> tasks;//任务队列,元素是一个函数,执行任务,没有返回值。没有参数是因为bind绑定好了参数,不需要外部传
};
std::shared_ptr<Pool> pool_;//使用共享指针,能自动销毁pool实例
};


#endif //THREADPOOL_H


先介绍下std::mutex:头文件<mutex> ,实际上跟linux中pthread的互斥锁差不多,手动上锁和解锁。

  • lock(),调用线程将锁住该互斥量。线程调用该函数会发生下面 3 种情况:(1). 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁。(2). 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。(3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
  • unlock(), 解锁,释放对互斥量的所有权。
  • try_lock(),尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞。线程调用该函数也会出现下面 3 种情况,(1). 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。(2). 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。(3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。

真正好用的是std::lock_guard:头文件<mutex> ,使用RAII机制,退出作用域就解锁。

  • template <class _Mutex>
    class lock_guard { // class with destructor that unlocks a mutex
    public:
        using mutex_type = _Mutex;
        //无adopt_lock参数,构造时加锁
        explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { // construct and lock
            _MyMutex.lock();
        }
        //有adopt_lock参数,构造时不加锁
        lock_guard(_Mutex& _Mtx, adopt_lock_t) : _MyMutex(_Mtx) {} // construct but don't lock
        //析构解锁
        ~lock_guard() noexcept {
            _MyMutex.unlock();
        }
        //屏蔽拷贝构造
        lock_guard(const lock_guard&) = delete; 
        lock_guard& operator=(const lock_guard&) = delete; 
    
    private:
        _Mutex& _MyMutex;
    };
    
  • lock_guard具有两种构造方法:

    1. lock_guard(mutex& m)
    2. lock_guard(mutex& m, adopt_lock)其中mutex& m是互斥量,参数adopt_lock表示假定调用线程已经获得互斥体所有权并对其进行管理了。

再说下std::unique_lock:头文件<mutex>,也是使用RAII机制,定义和lock_guard相同。

主要还是说下二者的对比:

  • std::unique_lock 与std::lock_guard都能实现自动加锁与解锁功能,但是std::unique_lock要比std::lock_guard更灵活,但是更灵活的代价是占用空间相对更大一点且相对更慢一点。
  • 它提供了lock()unlock()接口,能记录现在处于上锁还是没上锁状态,在析构的时候,会根据当前状态来决定是否要进行解锁。而lock_guard一锁就锁住一个作用域,直到退出才解锁,没有lock和unlock接口,有时只想锁住一段代码,用unique_lock就更灵活。
  • unique_locklock_guard都不能复制,lock_guard不能移动,但是unique_lock可以
  • 可以参考[c++11]多线程编程(五)——unique_lock - 简书 (jianshu.com)

条件变量std::condition_variable:头文件 <condition_variable>,和linux的差不多了,可以看下(29条消息) C++11多线程条件变量std::condition_variable详解(转 )_山城盛夏的博客-CSDN博客_std::condition_variable 详解,当然不看也可以,无非是等待和唤醒。


关于std::function,主要是用来包装函数的,像函数一样调用,具体可以参考之前的博客:lambda表达式 | JySama

std::function是一个函数包装器,该函数包装器模板能包装任何类型的可调用实体,如普通函数,函数对象,lamda表达式等。包装器可拷贝,移动等,并且包装器类型仅仅依赖于调用特征,而不依赖于可调用元素自身的类型。std::function是C++11的新特性,包含在头文件<functional>中。

一个std::function类型对象实例可以包装下列这几种可调用实体:函数、函数指针、成员函数、静态函数、lamda表达式和函数对象。std::function对象实例可被拷贝和移动,并且可以使用指定的调用特征来直接调用目标元素。当std::function对象实例未包含任何实际可调用实体时,调用该std::function对象实例将抛出std::bad_function_call异常。


std::forward():完美转发

当我们将一个右值引用传入函数时,他在实参中有了命名,所以继续往下传或者调用其他函数时,根据C++ 标准的定义,这个参数变成了一个左值。那么他永远不会调用接下来函数的右值版本,这可能在一些情况下造成拷贝。为了解决这个问题 C++ 11引入了完美转发,根据右值判断的推倒,调用forward 传出的值,若原来是一个右值,那么他转出来就是一个右值,否则为一个左值。这样的处理就完美的转发了原有参数的左右值属性,不会造成一些不必要的拷贝。

std::forward必须配合T&&来使用。例如T&&接受左值int&时,T会被推断为int&,而T&&接受右值int&&时,T被推断为int。


std::thread:头文件<thread>,可移动不可复制

  • 默认构造函数,创建一个空的 std::thread 执行对象: thread() noexcept;
  • 初始化构造函数,创建一个 std::thread 对象,该 std::thread 对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出。
    • template <class Fn, class… Args> explicit thread(Fn&& fn, Args&&… args);

数据库

数据库如出一辙,很好理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
* @Author : mark
* @Date : 2020-06-16
* @copyleft Apache 2.0
*/
#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h>
#include <thread>
#include "../log/log.h"

class SqlConnPool {//创建全局唯一的数据库连接池,维护多个与数据库的连接
public:
static SqlConnPool *Instance();//单例,静态成员函数

MYSQL *GetConn();
void FreeConn(MYSQL * conn);
int GetFreeConnCount();

void Init(const char* host, int port,
const char* user,const char* pwd,
const char* dbName, int connSize);
void ClosePool();

private:
SqlConnPool();
~SqlConnPool();

int MAX_CONN_;
int useCount_;
int freeCount_;

std::queue<MYSQL *> connQue_;
std::mutex mtx_;
sem_t semId_;
};


#endif // SQLCONNPOOL_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
* @Author : mark
* @Date : 2020-06-17
* @copyleft Apache 2.0
*/

#include "sqlconnpool.h"
using namespace std;

SqlConnPool::SqlConnPool() {//放init也行,或者init放这
useCount_ = 0;
freeCount_ = 0;
}

SqlConnPool* SqlConnPool::Instance() {//单例模式
static SqlConnPool connPool;
return &connPool;
}
//初始化连接池
void SqlConnPool::Init(const char* host, int port,
const char* user,const char* pwd, const char* dbName,
int connSize = 10) {
assert(connSize > 0);//条件判断
for (int i = 0; i < connSize; i++) {
MYSQL *sql = nullptr;//定义一个sql指针
sql = mysql_init(sql);//用这个指针初始化一个sql结构体,返回一个指向这个结构体的指针
if (!sql) {//错误判断,写日志
LOG_ERROR("MySql init error!");
assert(sql);//报错
}
sql = mysql_real_connect(sql, host,
user, pwd,
dbName, port, nullptr, 0);//init后就connect,连接数据库,返回一个可用连接
if (!sql) {
LOG_ERROR("MySql Connect error!");
}
connQue_.push(sql);//放入队列中
}
MAX_CONN_ = connSize;
sem_init(&semId_, 0, MAX_CONN_);//初始化信号量的值,这个0表示只能在当前进程的所有线程之间共享
}
//获取一个可用连接
MYSQL* SqlConnPool::GetConn() {
MYSQL *sql = nullptr;//句柄
if(connQue_.empty()){
LOG_WARN("SqlConnPool busy!");
return nullptr;
}
//为什么前面判断了空这里还要用信号量呢?原因是线程可能在队列非空时纷涌而至,但实际上没有那么多连接可用,因此还是要信号量阻塞buffer
sem_wait(&semId_);
{//lock的作用域
lock_guard<mutex> locker(mtx_);//如果能取,要互斥
sql = connQue_.front();
connQue_.pop();
}
return sql;
}
//释放一个连接,放回队列中
void SqlConnPool::FreeConn(MYSQL* sql) {
assert(sql);//判空,不能放回虚假的连接
lock_guard<mutex> locker(mtx_);//互斥放回
connQue_.push(sql);
sem_post(&semId_);//post
}
//关闭连接池
void SqlConnPool::ClosePool() {
lock_guard<mutex> locker(mtx_);//锁住先,避免在close时被使用
while(!connQue_.empty()) {//循环取
auto item = connQue_.front();//auto真给力...
connQue_.pop();
mysql_close(item);//关闭连接
}
//避免在使用库完成应用程序后发生内存泄漏(例如,在关闭与服务器的连接之后),
//可以显式调用mysql_library_end()。这样可以执行内存 Management 以清理和释放库使用的资源。
mysql_library_end();
}
//获取当前可用连接大小
int SqlConnPool::GetFreeConnCount() {
lock_guard<mutex> locker(mtx_);
return connQue_.size();
}
//析构
SqlConnPool::~SqlConnPool() {
ClosePool();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
* @Author : mark
* @Date : 2020-06-19
* @copyleft Apache 2.0
*/

#ifndef SQLCONNRAII_H
#define SQLCONNRAII_H
#include "sqlconnpool.h"

/* 资源在对象构造初始化 资源在对象析构时释放*/
class SqlConnRAII {//以参数形式获取一个数据库连接
public:
SqlConnRAII(MYSQL** sql, SqlConnPool *connpool) {//双指针修改sql,为了获取连接,传入connpool
assert(connpool);
*sql = connpool->GetConn();//获取连接
sql_ = *sql;//记录
connpool_ = connpool;//为了释放,需要记录sql和connpool
}

~SqlConnRAII() {
if(sql_) { connpool_->FreeConn(sql_); }//析构释放
}

private:
MYSQL *sql_;
SqlConnPool* connpool_;
};

#endif //SQLCONNRAII_H

日志系统

阻塞队列,用互斥锁再封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/*
* @Author : mark
* @Date : 2020-06-16
* @copyleft Apache 2.0
*/
#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <mutex>
#include <deque>
#include <condition_variable>
#include <sys/time.h>

template<class T>
class BlockDeque {
public:
explicit BlockDeque(size_t MaxCapacity = 1000);

~BlockDeque();

void clear();

bool empty();

bool full();

void Close();

size_t size();

size_t capacity();

T front();

T back();

void push_back(const T &item);

void push_front(const T &item);

bool pop(T &item);

bool pop(T &item, int timeout);

void flush();

private:
std::deque<T> deq_;

size_t capacity_;

std::mutex mtx_;

bool isClose_;

std::condition_variable condConsumer_;

std::condition_variable condProducer_;
};


template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity) :capacity_(MaxCapacity) {
assert(MaxCapacity > 0);
isClose_ = false;
}

template<class T>
BlockDeque<T>::~BlockDeque() {
Close();
};

template<class T>
void BlockDeque<T>::Close() {
{
std::lock_guard<std::mutex> locker(mtx_);
deq_.clear();//清除所有元素
isClose_ = true;
}
condProducer_.notify_all();//唤醒所有生产者,准备退出
condConsumer_.notify_all();//唤醒所有消费者,准备退出
};

template<class T>
void BlockDeque<T>::flush() {
condConsumer_.notify_one();//刷新,唤醒一个线程,准备工作
};

template<class T>
void BlockDeque<T>::clear() {
std::lock_guard<std::mutex> locker(mtx_);
deq_.clear();
}

template<class T>
T BlockDeque<T>::front() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.front();
}

template<class T>
T BlockDeque<T>::back() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.back();
}

template<class T>
size_t BlockDeque<T>::size() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.size();
}

template<class T>
size_t BlockDeque<T>::capacity() {
std::lock_guard<std::mutex> locker(mtx_);
return capacity_;
}
//有问题,前面close唤醒了线程,这里没有根据isclose变量直接退出,在pop那是退出了的,这里可能会卡住,一旦wait的太多,就可能一直写然后一直while。
//除非线程数严格小于容量
template<class T>
void BlockDeque<T>::push_back(const T &item) {
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.size() >= capacity_) {//条件变量的等待方式
condProducer_.wait(locker);//阻塞,等待唤醒,但唤醒后还是需要while看条件,因为有多个生产者在竞争
}
deq_.push_back(item);
condConsumer_.notify_one();//唤醒一个消费者线程
}

template<class T>
void BlockDeque<T>::push_front(const T &item) {
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.size() >= capacity_) {
condProducer_.wait(locker);
}
deq_.push_front(item);
condConsumer_.notify_one();
}

template<class T>
bool BlockDeque<T>::empty() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.empty();
}

template<class T>
bool BlockDeque<T>::full(){
std::lock_guard<std::mutex> locker(mtx_);
return deq_.size() >= capacity_;
}

template<class T>
bool BlockDeque<T>::pop(T &item) {
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.empty()){
condConsumer_.wait(locker);
if(isClose_){//如果close了就return了
return false;
}
}
item = deq_.front();
deq_.pop_front();
condProducer_.notify_one();//唤醒一个生产者线程
return true;
}

template<class T>
bool BlockDeque<T>::pop(T &item, int timeout) {//增加了超时处理,push没有超时处理
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.empty()){
if(condConsumer_.wait_for(locker, std::chrono::seconds(timeout))
== std::cv_status::timeout){
return false;//阻塞超时就结束
}
if(isClose_){//关了直接返回
return false;
}
}
item = deq_.front();
deq_.pop_front();
condProducer_.notify_one();
return true;
}

#endif // BLOCKQUEUE_H

std::chrono::seconds:一个类,获取多少时间,这里以临时变量的形式传给wait_for,持续…seconds,超时结果就是timeout,和cv_status的timeout相等,借此判断是否超时。

std::cv_status:定义于头文件 <condition_variable>,带作用域枚举 std::cv_status 描述定时等待是否因时限返回。成员:

  • no_timeout:条件变量因 notify_allnotify_one 或虚假地被唤醒
  • timeout:条件变量因时限耗尽被唤醒

wait_for:

返回值:若经过 rel_time 所指定的关联时限则为 std::cv_status::timeout,否则为 std::cv_status::no_timeout 。

1
2
std::cv_status wait_for( std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/*
* @Author : mark
* @Date : 2020-06-16
* @copyleft Apache 2.0
*/
#ifndef LOG_H
#define LOG_H

#include <mutex>
#include <string>
#include <thread>
#include <sys/time.h>
#include <string.h>
#include <stdarg.h> // vastart va_end
#include <assert.h>
#include <sys/stat.h> //mkdir
#include "blockqueue.h"
#include "../buffer/buffer.h"

class Log {
public:
void init(int level, const char* path = "./log",
const char* suffix =".log",
int maxQueueCapacity = 1024);

static Log* Instance();//单例
static void FlushLogThread();//异步线程的回调函数,需要是staic,没有this隐藏参数

void write(int level, const char *format,...);
void flush();

int GetLevel();
void SetLevel(int level);
bool IsOpen() { return isOpen_; }

private:
Log();
void AppendLogLevelTitle_(int level);
virtual ~Log();
void AsyncWrite_();//互斥写

private:
static const int LOG_PATH_LEN = 256;
static const int LOG_NAME_LEN = 256;
static const int MAX_LINES = 50000;

const char* path_;
const char* suffix_;

int MAX_LINES_;

int lineCount_;
int toDay_;

bool isOpen_;

Buffer buff_;//一个日志仅有一个buffer,因为write被互斥锁锁住了
int level_;
bool isAsync_;

FILE* fp_;
std::unique_ptr<BlockDeque<std::string>> deque_; //智能指针,还没有实例
std::unique_ptr<std::thread> writeThread_;//指向一个thread,还没有实例
std::mutex mtx_;
};
//以宏的形式,感觉写个string形式也行
#define LOG_BASE(level, format, ...) \
do {\
Log* log = Log::Instance();\
if (log->IsOpen() && log->GetLevel() <= level) {\
log->write(level, format, ##__VA_ARGS__); \
log->flush();\
}\
} while(0);

#define LOG_DEBUG(format, ...) do {LOG_BASE(0, format, ##__VA_ARGS__)} while(0);
#define LOG_INFO(format, ...) do {LOG_BASE(1, format, ##__VA_ARGS__)} while(0);
#define LOG_WARN(format, ...) do {LOG_BASE(2, format, ##__VA_ARGS__)} while(0);
#define LOG_ERROR(format, ...) do {LOG_BASE(3, format, ##__VA_ARGS__)} while(0);

#endif //LOG_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/*
* @Author : mark
* @Date : 2020-06-16
* @copyleft Apache 2.0
*/
#include "log.h"

using namespace std;

Log::Log() {
lineCount_ = 0;
isAsync_ = false;
writeThread_ = nullptr;
deque_ = nullptr;
toDay_ = 0;
fp_ = nullptr;
}

Log::~Log() {
if(writeThread_ && writeThread_->joinable()) {//如果写线程存在且需要join,就需要join
while(!deque_->empty()) {//不断唤醒消费者线程,把日志写完
deque_->flush();
};
deque_->Close();//关掉
writeThread_->join();//join
}
if(fp_) {//如果文件打开了
lock_guard<mutex> locker(mtx_);
flush();//刷新fp的缓冲区,还有唤醒线程的功能,这里没用
fclose(fp_);//关掉
}
}

int Log::GetLevel() {//日志系统级别,越高级能写的类型越多,日志系统没到对应级别不能写
lock_guard<mutex> locker(mtx_);
return level_;
}

void Log::SetLevel(int level) {
lock_guard<mutex> locker(mtx_);
level_ = level;
}

void Log::init(int level = 1, const char* path, const char* suffix,
int maxQueueSize) {
isOpen_ = true;
level_ = level;
if(maxQueueSize > 0) {//如果设置了阻塞队列大小,就是异步
isAsync_ = true;
if(!deque_) {
unique_ptr<BlockDeque<std::string>> newDeque(new BlockDeque<std::string>);//创建一个实例
deque_ = move(newDeque);//unique指针只能移动构造

std::unique_ptr<std::thread> NewThread(new thread(FlushLogThread));//异步同时要实例化一个写线程
writeThread_ = move(NewThread);//因为是单独线程,所以用unique指针,转移所有权用move
}
} else {
isAsync_ = false;
}

lineCount_ = 0;

time_t timer = time(nullptr);
struct tm *sysTime = localtime(&timer);
struct tm t = *sysTime;
path_ = path;
suffix_ = suffix;
char fileName[LOG_NAME_LEN] = {0};
snprintf(fileName, LOG_NAME_LEN - 1, "%s/%04d_%02d_%02d%s",
path_, t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, suffix_);//日志文件名写到名称缓冲区
toDay_ = t.tm_mday;

{//互斥锁作用域,感觉没必要,init就主线程调用
lock_guard<mutex> locker(mtx_);
buff_.RetrieveAll();
if(fp_) { //如果文本打开了,就关了重新开
flush();
fclose(fp_);
}

fp_ = fopen(fileName, "a");//根据名称创建or打开
if(fp_ == nullptr) {//打开失败,没有目标文件夹,要先创建
mkdir(path_, 0777);//0777是最大的访问权
fp_ = fopen(fileName, "a");
}
assert(fp_ != nullptr);
}
}

void Log::write(int level, const char *format, ...) {
//获取时间
struct timeval now = {0, 0};
gettimeofday(&now, nullptr);
time_t tSec = now.tv_sec;
struct tm *sysTime = localtime(&tSec);
struct tm t = *sysTime;
//宏参数初始化
va_list vaList;

/* 日志日期 日志行数 */
if (toDay_ != t.tm_mday || (lineCount_ && (lineCount_ % MAX_LINES == 0)))//如果日志行数太多写满了,或着换了一天,要重新创建一个文件
{
unique_lock<mutex> locker(mtx_);//这个锁感觉放if外好一点,因为让一个线程进来创建好新文件更新day喝line就可以了,这样就会很多线程一起进这个if
locker.unlock();//等下要用一个锁,先创建好,解锁。这对于同步写有用,因为同步的话有很多线程会调用
//把lock锁if外,然后处理完,更新day和line,打开新文件,再解锁,其他线程就不会进if了

char newFile[LOG_NAME_LEN];
char tail[36] = {0};
snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);

if (toDay_ != t.tm_mday)//如果是换了一天
{//-72是什么鬼,不需要那么长吗?
snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s%s", path_, tail, suffix_);
toDay_ = t.tm_mday;//给天赋值
lineCount_ = 0;
}
else {
snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s-%d%s", path_, tail, (lineCount_ / MAX_LINES), suffix_);
}

locker.lock();
flush();
fclose(fp_);
fp_ = fopen(newFile, "a");//重新打开
assert(fp_ != nullptr);
}
//然后正常写
{
unique_lock<mutex> locker(mtx_);
lineCount_++;//写一行
//buffer不只是为日志系统写的,而且感觉这里不用buffer更好
int n = snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d.%06ld ",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday,
t.tm_hour, t.tm_min, t.tm_sec, now.tv_usec);//向buffer写时间信息,n是写入的长度

buff_.HasWritten(n);//移动指针,前n个写时间信息
AppendLogLevelTitle_(level);//然后添加等级

va_start(vaList, format);//遍历参数
int m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaList);//不断向buffer写,返回写入长度
va_end(vaList);//关闭

buff_.HasWritten(m);//移动指针
buff_.Append("\n\0", 2);//加换行和终止

if(isAsync_ && deque_ && !deque_->full()) {//如果是异步的,放阻塞队列
deque_->push_back(buff_.RetrieveAllToStr());
}
else {//如果是同步的,开写
fputs(buff_.Peek(), fp_);
}
buff_.RetrieveAll();//清空缓冲区
}
}

void Log::AppendLogLevelTitle_(int level) {//添加信息头
switch(level) {
case 0:
buff_.Append("[debug]: ", 9);
break;
case 1:
buff_.Append("[info] : ", 9);
break;
case 2:
buff_.Append("[warn] : ", 9);
break;
case 3:
buff_.Append("[error]: ", 9);
break;
default:
buff_.Append("[info] : ", 9);
break;
}
}

void Log::flush() {//刷新缓冲区
if(isAsync_) {
deque_->flush();
}
fflush(fp_);//刷新文本的缓冲区,强制写完
}

void Log::AsyncWrite_() {//回调函数的运行函数
string str = "";
//这里只用了非超时的,如果有大量任务突然到来,可以创建多个线程使用超时pop
while(deque_->pop(str)) {//不断取str,写进文本
lock_guard<mutex> locker(mtx_);
fputs(str.c_str(), fp_);
}
}

Log* Log::Instance() {//单例函数
static Log inst;
return &inst;
}

void Log::FlushLogThread() {//回调函数,调用运行函数
Log::Instance()->AsyncWrite_();
}

缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/*
* @Author : mark
* @Date : 2020-06-26
* @copyleft Apache 2.0
*/

#ifndef BUFFER_H
#define BUFFER_H
#include <cstring> //perror
#include <iostream>
#include <unistd.h> // write
#include <sys/uio.h> //readv
#include <vector> //readv
#include <atomic>
#include <assert.h>
class Buffer {
public:
Buffer(int initBuffSize = 1024);
~Buffer() = default;

size_t WritableBytes() const;
size_t ReadableBytes() const ;
size_t PrependableBytes() const;

const char* Peek() const;
void EnsureWriteable(size_t len);
void HasWritten(size_t len);

void Retrieve(size_t len);
void RetrieveUntil(const char* end);

void RetrieveAll() ;
std::string RetrieveAllToStr();

const char* BeginWriteConst() const;
char* BeginWrite();

void Append(const std::string& str);
void Append(const char* str, size_t len);
void Append(const void* data, size_t len);
void Append(const Buffer& buff);

ssize_t ReadFd(int fd, int* Errno);
ssize_t WriteFd(int fd, int* Errno);

private:
char* BeginPtr_();
const char* BeginPtr_() const;
void MakeSpace_(size_t len);

std::vector<char> buffer_;//buffer是一个vector...new一个得了,取地址比较简明
std::atomic<std::size_t> readPos_;//原子类型,感觉还是用个互斥锁吧...资料太少了,查不到。不过操作buffer在顶层是被互斥锁锁住的,也许不用互斥
std::atomic<std::size_t> writePos_;
};

#endif //BUFFER_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
* @Author : mark
* @Date : 2020-06-26
* @copyleft Apache 2.0
*/
#include "buffer.h"

Buffer::Buffer(int initBuffSize) : buffer_(initBuffSize), readPos_(0), writePos_(0) {}//初始化列表,使用构造函数

size_t Buffer::ReadableBytes() const {//准备好的字节数
return writePos_ - readPos_;//原子地相减
}
size_t Buffer::WritableBytes() const {//可写入的字节数
return buffer_.size() - writePos_;
}

size_t Buffer::PrependableBytes() const {//已读字节数
return readPos_;
}

const char* Buffer::Peek() const {//返回readpos之后的字符串/位置,即准备好但没有取出的数据的起始地址
return BeginPtr_() + readPos_;//定位
}

void Buffer::Retrieve(size_t len) {//buffer被取出了多少字节
assert(len <= ReadableBytes());
readPos_ += len;
}

void Buffer::RetrieveUntil(const char* end) {//过滤到end之后
assert(Peek() <= end );//已读的记录比已取出的小,地址的比较
Retrieve(end - Peek());//被出去了这么多字节(地址)
}

void Buffer::RetrieveAll() {//全部取出,清空
bzero(&buffer_[0], buffer_.size());
readPos_ = 0;
writePos_ = 0;
}

std::string Buffer::RetrieveAllToStr() {//全部取出并转字string
std::string str(Peek(), ReadableBytes());
RetrieveAll();
return str;
}

const char* Buffer::BeginWriteConst() const {//返回写到的位置之后的字符串,有何意义?后面不是没写到吗
return BeginPtr_() + writePos_;
}

char* Buffer::BeginWrite() {//指向第一个能写的位置
return BeginPtr_() + writePos_;
}

void Buffer::HasWritten(size_t len) {//已写入多少个字节
writePos_ += len;
}

void Buffer::Append(const std::string& str) {//重载函数
Append(str.data(), str.length());
}

void Buffer::Append(const void* data, size_t len) {//任何指针类型data
assert(data);
Append(static_cast<const char*>(data), len);//其他类型,就强制转型为char*
}
//最终调用这个函数
void Buffer::Append(const char* str, size_t len) {//char*优先匹配
assert(str);
EnsureWriteable(len);//确保空间足够
std::copy(str, str + len, BeginWrite());
HasWritten(len);
}

void Buffer::Append(const Buffer& buff) {//添加另一个buff的数据
Append(buff.Peek(), buff.ReadableBytes());
}

void Buffer::EnsureWriteable(size_t len) {//确保这么大的长度能写
if(WritableBytes() < len) {//
MakeSpace_(len);//扩容
}
assert(WritableBytes() >= len);
}

ssize_t Buffer::ReadFd(int fd, int* saveErrno) {//接收
char buff[65535];//如果第一个缓冲区填不满,就用到这个缓冲区
struct iovec iov[2];
const size_t writable = WritableBytes();
/* 分散读, 保证数据全部读完 */
iov[0].iov_base = BeginPtr_() + writePos_;//定位可写的地方,不就是beginwrite()吗
iov[0].iov_len = writable;
iov[1].iov_base = buff;
iov[1].iov_len = sizeof(buff);

const ssize_t len = readv(fd, iov, 2);//2是指iovec结构的个数,返回值是有符号整型
if(len < 0) {
*saveErrno = errno;
}
else if(static_cast<size_t>(len) <= writable) {//转为无符号整型,长度是一样的
writePos_ += len;//小于可写的地方就更新当前的buffer
}
else {//写的超出了buffer可写的空间
writePos_ = buffer_.size();//更新
Append(buff, len - writable);//添加buff的数据,会扩大buffer的空间
}
return len;
}

ssize_t Buffer::WriteFd(int fd, int* saveErrno) {//写出
size_t readSize = ReadableBytes();
ssize_t len = write(fd, Peek(), readSize);//从当前开始(peek),写入准备好的数据
if(len < 0) {
*saveErrno = errno;
return len;
}
readPos_ += len;//更新
return len;
}
//char* 返回指向的字符串的首地址、也可以返回第一个字符、也可以返回整个字符串
char* Buffer::BeginPtr_() {//指向第一个char的地址
return &*buffer_.begin();//*begin()取第一个字符,&取地址
}

const char* Buffer::BeginPtr_() const {//区别是返回值要不要当常量处理
return &*buffer_.begin();
}

void Buffer::MakeSpace_(size_t len) {//扩容函数
if(WritableBytes() + PrependableBytes() < len) {//如果可写和已读都小于len,就必须重新开辟空间
buffer_.resize(writePos_ + len + 1);//resize
}
else {//否则,可以把已读的覆盖
size_t readable = ReadableBytes();//准备好的数据大小
std::copy(BeginPtr_() + readPos_, BeginPtr_() + writePos_, BeginPtr_());//把中间未读的,从头开始覆盖
readPos_ = 0;//已读为0
writePos_ = readPos_ + readable;//写的位置是准备好的数据的位置
assert(readable == ReadableBytes());
}
}

定时器

注意:这里没有加锁,上层的调用要加锁

chrono可以稍微参考下:(29条消息) C++11的chrono库,可实现毫秒微秒级定时_oncealong的博客-CSDN博客_chrono sleep。里面提了三种类型,虽然不详细。

  • std::chrono::high_resolution_clock:high_resolution_clock只不过是system_clock或者steady_clock的typedef。用于获取时间点。
    • std::chrono::system_clock 它表示当前的系统时钟,系统中运行的所有进程使用now()得到的时间是一致的。
    • std::chrono::steady_clock 为了表示稳定的时间间隔,后一次调用now()得到的时间总是比前一次的值大。用在需要得到时间间隔,并且这个时间间隔不会因为修改系统时间而受影响的场景;它是单调的时钟,相当于教练手中的秒表;只会增长,适合用于记录程序耗时,他表示的时钟是不能设置的。
    • 可以使用now()方法取得时间,是一个纳秒,相对系统启动的时间多少。一般用time_point:std::chrono::high_resolution_clock::time_point t1=std::chrono::high_resolution_clock::now();或者auto t1=std::chrono::high_resolution_clock::now();
  • std::chrono::milliseconds:表示毫秒,是一个时间间隔。
  • 在代码里面,now()+MS(timeout)被赋值到high_resolution_clock的time_point上,毫秒会转换为纳秒加上去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/*
* @Author : mark
* @Date : 2020-06-17
* @copyleft Apache 2.0
*/
#ifndef HEAP_TIMER_H
#define HEAP_TIMER_H

#include <queue>
#include <unordered_map>
#include <time.h>
#include <algorithm>
#include <arpa/inet.h>
#include <functional>
#include <assert.h>
#include <chrono>
#include "../log/log.h"

typedef std::function<void()> TimeoutCallBack;
typedef std::chrono::high_resolution_clock Clock;//时钟
typedef std::chrono::milliseconds MS;//时间间隔
typedef Clock::time_point TimeStamp;//时钟内的时间点,获取now()方法结果

struct TimerNode {//时间结构
int id;//这个id用来给哈希表映射,这样查找时间是O(1),通过id映射到位置。
TimeStamp expires;//时间点
TimeoutCallBack cb;//回调函数
bool operator<(const TimerNode& t) {
return expires < t.expires;//比较
}
};
class HeapTimer {
public:
HeapTimer() { heap_.reserve(64); }//先指定vector有64个空间,其他情况下会两倍两倍的扩容

~HeapTimer() { clear(); }

void adjust(int id, int newExpires);

void add(int id, int timeOut, const TimeoutCallBack& cb);

void doWork(int id);

void clear();

void tick();

void pop();

int GetNextTick();

private:
//用size_t作为索引
void del_(size_t i);

void siftup_(size_t i);

bool siftdown_(size_t index, size_t n);

void SwapNode_(size_t i, size_t j);

std::vector<TimerNode> heap_;//用vector实现堆,是一个小顶堆

std::unordered_map<int, size_t> ref_;//哈希表,i->size_t
};

#endif //HEAP_TIMER_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
* @Author : mark
* @Date : 2020-06-17
* @copyleft Apache 2.0
*/
#include "heaptimer.h"

void HeapTimer::siftup_(size_t i) {//向上过滤,用于插入节点
assert(i >= 0 && i < heap_.size());
size_t j = (i - 1) / 2;
while(j >= 0) {
if(heap_[j] < heap_[i]) { break; }
SwapNode_(i, j);
i = j;
j = (i - 1) / 2;
}
}

void HeapTimer::SwapNode_(size_t i, size_t j) {//交换节点的辅助函数
assert(i >= 0 && i < heap_.size());
assert(j >= 0 && j < heap_.size());
std::swap(heap_[i], heap_[j]);//交换vector元素
ref_[heap_[i].id] = i;//更改哈希表的映射
ref_[heap_[j].id] = j;
}

bool HeapTimer::siftdown_(size_t index, size_t n) {//向下过滤,用于建堆和删除顶点。这里是左闭右开的,n是取不到的右边界
assert(index >= 0 && index < heap_.size());
assert(n >= 0 && n <= heap_.size());
size_t i = index;
size_t j = i * 2 + 1;
while(j < n) {
if(j + 1 < n && heap_[j + 1] < heap_[j]) j++;
if(heap_[i] < heap_[j]) break;
SwapNode_(i, j);
i = j;
j = i * 2 + 1;
}
return i > index;//如果向下过滤了就返回true
}

void HeapTimer::add(int id, int timeout, const TimeoutCallBack& cb) {//插入节点,关联回调函数
assert(id >= 0);
size_t i;
if(ref_.count(id) == 0) {
/* 新节点:堆尾插入,调整堆 */
i = heap_.size();
ref_[id] = i;//先放i处
//结构体可以struct A = {...},调用默认构造函数,但必须把所有成员都赋值。常见的是struct A; A.x = ...逐个赋值
heap_.push_back({id, Clock::now() + MS(timeout), cb});//放i处,调用默认构造函数
siftup_(i);//向上过滤
}
else {//如果原来就有这个节点,说明没到时,重设时间,调整一下即可
/* 已有结点:调整堆 */
i = ref_[id];//获得位置
heap_[i].expires = Clock::now() + MS(timeout);//调整
heap_[i].cb = cb;
if(!siftdown_(i, heap_.size())) {//调整之后看看向上还是向下,如果不用向下过滤,那就向上过滤;如果向下过滤了,就不用向上了
siftup_(i);
}
}
}

void HeapTimer::doWork(int id) {
/* 删除指定id结点,并触发回调函数 */
if(heap_.empty() || ref_.count(id) == 0) {
return;
}
size_t i = ref_[id];
TimerNode node = heap_[i];//拷贝节点,如果是串行的话,是不是不需要拷贝,反正是调用完再删除
node.cb();//调用回调函数
del_(i);//删除
}

void HeapTimer::del_(size_t index) {
/* 删除指定位置的结点 */
assert(!heap_.empty() && index >= 0 && index < heap_.size());
/* 将要删除的结点换到队尾,然后调整堆 */
size_t i = index;
size_t n = heap_.size() - 1;//下标=大小-1
assert(i <= n);
if(i < n) {//删的不是最后一个元素就交换
SwapNode_(i, n);//把目前最后的元素换到前面去,此时要删的元素放到了最后
if(!siftdown_(i, n)) {//然后要调整这个元素的位置,先试一下向下过滤
siftup_(i);//不向下过滤的话就向上过滤。这里为什么要向上过滤呢?因为堆的兄弟之间没有关系,大堆有两个子堆,如果交换到另一个子堆就可能要向上
}
}
/* 队尾元素删除 */
ref_.erase(heap_.back().id);
heap_.pop_back();
}

void HeapTimer::adjust(int id, int timeout) {
/* 调整指定id的结点 */
assert(!heap_.empty() && ref_.count(id) > 0);
heap_[ref_[id]].expires = Clock::now() + MS(timeout);
siftdown_(ref_[id], heap_.size());//调整只可能更大,向下过滤
}

void HeapTimer::tick() {//这里的tick()时间复杂度要比链表形式的高
/* 清除超时结点 */
if(heap_.empty()) {
return;
}
while(!heap_.empty()) {
TimerNode node = heap_.front();//取顶
//这里先用预设时间-当前时间,结果是一个纳秒级的时间间隔,用间隔转换转到毫秒级,调用count(),它的作用是返回当前级别还有多少ticks(单位时间)
//比如3ms就有3ticks(在毫秒级下),因此这里是忽略毫秒级以下的数,只有剩余1毫秒及以上才不算超时。这是因为设定的超时时间是毫秒的,当然只看毫秒
if(std::chrono::duration_cast<MS>(node.expires - Clock::now()).count() > 0) { //等于0或小于0都超时
break;
}
node.cb();//超时,调用回调函数
pop();//删除顶部
}
}

void HeapTimer::pop() {
assert(!heap_.empty());
del_(0);//删除0号位置的节点
}

void HeapTimer::clear() {//清除
ref_.clear();
heap_.clear();
}

int HeapTimer::GetNextTick() {//看未超时的顶点剩下多少ticks
tick();//处理完超时的节点
size_t res = -1;
if(!heap_.empty()) {//如果非空
res = std::chrono::duration_cast<MS>(heap_.front().expires - Clock::now()).count();//看顶点还剩多少ticks(毫秒级别下)
if(res < 0) { res = 0; }//负数说明预设时间小于当前时间,也就是时间间隔是负的,说明超时了
}
return res;//-1说明空了,0说明超时了,大于0说明剩下的ticks
}

HTTP

处理响应

涉及到一个string.data(),看到比较好的文章里面提到了一点:

为什么C语言风格的字符串要以’\0’结尾,C++(string)可以不要?

c语言用char*指针作为字符串时,在读取字符串时需要一个特殊字符0来标记指针的结束位置,也就是通常认为的字符串结束标记。而c++语言则是面向对象的(string),长度信息直接被存储在了对象的成员中,读取字符串可以直接根据这个长度来读取,所以就没必要需要结束标记了。而且结束标记也不利于读取字符串中夹杂0字符的字符串。


  • 首先会尝试把文件信息写入stat结构体,根据文件找不找得到、文件权限,得到对应的状态码。stat结构体主要是获得文件size
  • 如果状态码是404那些,就把路径和stat结构体修改为404那些html文件的路径,如果是200OK,就再不修改。
  • 然后添加状态行、头部信息
  • 最后添加文件内容信息:
    • 先根据文件路径打开文件,可能是404那些html,也可能是真的文件。如果打开失败,会返回一个file not found的html
    • 打开成功的话会尝试去内存映射,stat结构体的size在这有用。如果映射失败,也会返回一个file not found的html
    • 如果打开成功,会添加文件的长度信息,把内存地址指针保存,可通过接口调用。因为不会真正写入文件内容
    • 在添加文件长度信息后,顺便添加一个空行。
  • 没有写入内容,等待外部写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*
* @Author : mark
* @Date : 2020-06-25
* @copyleft Apache 2.0
*/
#ifndef HTTP_RESPONSE_H
#define HTTP_RESPONSE_H

#include <unordered_map>
#include <fcntl.h> // open
#include <unistd.h> // close
#include <sys/stat.h> // stat
#include <sys/mman.h> // mmap, munmap

#include "../buffer/buffer.h"
#include "../log/log.h"

class HttpResponse {
public:
HttpResponse();
~HttpResponse();

void Init(const std::string& srcDir, std::string& path, bool isKeepAlive = false, int code = -1);
void MakeResponse(Buffer& buff);
void UnmapFile();
char* File();
size_t FileLen() const;
void ErrorContent(Buffer& buff, std::string message);
int Code() const { return code_; }

private:
void AddStateLine_(Buffer &buff);
void AddHeader_(Buffer &buff);
void AddContent_(Buffer &buff);

void ErrorHtml_();
std::string GetFileType_();

int code_;//响应状态码
bool isKeepAlive_;

std::string path_;//资源路径
std::string srcDir_;//资源文件夹路径

char* mmFile_; //指向内存映射后的文件的内存空间
struct stat mmFileStat_;//存储文件信息

//静态的且不允许修改
static const std::unordered_map<std::string, std::string> SUFFIX_TYPE;//把后缀类型映射到http的文件类型
static const std::unordered_map<int, std::string> CODE_STATUS;//把响应码映射到响应字符串
static const std::unordered_map<int, std::string> CODE_PATH;//把响应码映射到响应需要发送的html文件的路径
};


#endif //HTTP_RESPONSE_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/*
* @Author : mark
* @Date : 2020-06-27
* @copyleft Apache 2.0
*/
#include "httpresponse.h"

using namespace std;
//静态成员定义,用{}构造整体,用{key,value}构造元素
const unordered_map<string, string> HttpResponse::SUFFIX_TYPE = {
{ ".html", "text/html" },
{ ".xml", "text/xml" },
{ ".xhtml", "application/xhtml+xml" },
{ ".txt", "text/plain" },
{ ".rtf", "application/rtf" },
{ ".pdf", "application/pdf" },
{ ".word", "application/nsword" },
{ ".png", "image/png" },
{ ".gif", "image/gif" },
{ ".jpg", "image/jpeg" },
{ ".jpeg", "image/jpeg" },
{ ".au", "audio/basic" },
{ ".mpeg", "video/mpeg" },
{ ".mpg", "video/mpeg" },
{ ".avi", "video/x-msvideo" },
{ ".gz", "application/x-gzip" },
{ ".tar", "application/x-tar" },
{ ".css", "text/css "},
{ ".js", "text/javascript "},
};

const unordered_map<int, string> HttpResponse::CODE_STATUS = {
{ 200, "OK" },
{ 400, "Bad Request" },
{ 403, "Forbidden" },
{ 404, "Not Found" },
};

const unordered_map<int, string> HttpResponse::CODE_PATH = {
{ 400, "/400.html" },
{ 403, "/403.html" },
{ 404, "/404.html" },
};

HttpResponse::HttpResponse() {
code_ = -1;
path_ = srcDir_ = "";
isKeepAlive_ = false;
mmFile_ = nullptr;
mmFileStat_ = { 0 };
};

HttpResponse::~HttpResponse() {
UnmapFile();//解除内存映射,不用参数,用mmFile_指针
}

void HttpResponse::Init(const string& srcDir, string& path, bool isKeepAlive, int code){
assert(srcDir != "");
if(mmFile_) { UnmapFile(); }//一个上层实例由一个线程控制,那么一个类实例可以多次init,多次的话就要把原来的映射解除
code_ = code;//传入的状态,会根据之后的文件打开/访问成功与否改变
isKeepAlive_ = isKeepAlive;
//访问的文件路径
path_ = path;
srcDir_ = srcDir;
mmFile_ = nullptr;
mmFileStat_ = { 0 };
}
//向buff写入响应信息
void HttpResponse::MakeResponse(Buffer& buff) {//传入一个buff,没有真正写入文件内容(file not found除外,自定义返回了一个html)
/* 判断请求的资源文件 */
//stat函数,向stat结构体中写入path指定的文件信息,成功返回0,失败返回-1
//S_ISDIR()函数的作用是判断一个路径是不是目录,st_mode表示了文件对应的模式:文件,目录等。函数返回0表示是文件,返回1是文件夹
if(stat((srcDir_ + path_).data(), &mmFileStat_) < 0 || S_ISDIR(mmFileStat_.st_mode)) {//如果请求的文件不存在或者是文件夹,就404not found
code_ = 404;
}
//st_mode是个32位的整型变量,不过现在的linux操作系统只用了低16位(估计是鉴于以后拓展的考虑)
//最低9位代表了文件的许可权限,它标识了文件所有者(owner)、组用户(group)、其他用户(other)的读(r)、写(w)、执行(x)权限。
//S_IROTH:00004(无符号八进制数):others have read permission
else if(!(mmFileStat_.st_mode & S_IROTH)) {//也即,如果其他用户没有读权限的话就返回403forbidden
code_ = 403;
}
else if(code_ == -1) { //如果上面都没有,且没有被init为400,那就是初始值-1,表示ok
code_ = 200;
}
ErrorHtml_();//如果是error状态,会把对应的html页面文件信息添加到stat结构体中,把路径改了,成功就不做任何事情
AddStateLine_(buff);//添加状态行
AddHeader_(buff);//添加头部信息
AddContent_(buff);//返回文件内容,会尝试真正地打开文件,映射到内存,但是没有写入buff,会把内存指针放到mmFile_,File()接口调用
}

char* HttpResponse::File() {//返回文件映射到内存的位置
return mmFile_;
}

size_t HttpResponse::FileLen() const {//返回文件的大小
return mmFileStat_.st_size;
}

void HttpResponse::ErrorHtml_() {//如果是200OK,就不做任何事情
if(CODE_PATH.count(code_) == 1) {//如果有响应码对应的html文件,count找到返回1,否则0
path_ = CODE_PATH.find(code_)->second;//find返回一个迭代器,first是key,second是value。
//感觉可以用CODE_PATH[code_],因为前面已经找到了才执行,虽说内部也会遍历去find。另外,如果直接用[],没有这个元素会插入
stat((srcDir_ + path_).data(), &mmFileStat_);//把这个错误页面文件信息保存到stat中
}
}

void HttpResponse::AddStateLine_(Buffer& buff) {//向buff添加状态头部
string status;
if(CODE_STATUS.count(code_) == 1) {//如果有code_对应的状态
status = CODE_STATUS.find(code_)->second;//获取状态字符串
}
else {//上面处理了200,404,403,如果code_不知道被赋值成什么了,就400
code_ = 400;
status = CODE_STATUS.find(400)->second;
}
buff.Append("HTTP/1.1 " + to_string(code_) + " " + status + "\r\n");
}

void HttpResponse::AddHeader_(Buffer& buff) {//添加头部信息
buff.Append("Connection: ");
if(isKeepAlive_) {
buff.Append("keep-alive\r\n");
buff.Append("keep-alive: max=6, timeout=120\r\n");
} else{
buff.Append("close\r\n");
}
buff.Append("Content-type: " + GetFileType_() + "\r\n");
}

void HttpResponse::AddContent_(Buffer& buff) {//添加返回内容
//打开文件,string.data()返回c式字符串指针,c++11后与c_str()等价,结尾加'\0'。
int srcFd = open((srcDir_ + path_).data(), O_RDONLY);//O_RDONLY表示只读
if(srcFd < 0) { //打开失败
ErrorContent(buff, "File NotFound!");
return;
}

/* 将文件映射到内存提高文件的访问速度
MAP_PRIVATE 建立一个写入时拷贝的私有映射*/
LOG_DEBUG("file path %s", (srcDir_ + path_).data());

//成功返回创建的映射区的首地址;失败返回宏MAP_FAILED,这个宏就是-1。mmap返回一个void*
//返回值用mmret指向,表示指向一个int类型,解指针时以int类型解析,找4个字节
int* mmRet = (int*)mmap(0, mmFileStat_.st_size, PROT_READ, MAP_PRIVATE, srcFd, 0);
if(*mmRet == -1) {//取int,如果是-1表示失败了
ErrorContent(buff, "File NotFound!");//这里的notfound,实际上打开了文件了,但是写进内存出错了
return;
}
//取地址,修改指向的类型,两个指针指向的地址起始相同,但现在解指针按照char的解析类型解析,找一个字节。
//多少个字节都无妨,因为munmap传入的地址参数类型是void*指针,只要首地址正确就好
mmFile_ = (char*)mmRet;
close(srcFd);//关闭文件
buff.Append("Content-length: " + to_string(mmFileStat_.st_size) + "\r\n\r\n");//只添加内容长度,两个\r\n,后面那个表示空行
}

void HttpResponse::UnmapFile() {
if(mmFile_) {
munmap(mmFile_, mmFileStat_.st_size);//解除映射
mmFile_ = nullptr;
}
}

string HttpResponse::GetFileType_() {
/* 判断文件类型 */
//find_last_of返回最后一个.的位置,就是后缀类型前面那个.。逆向查找,返回的是下标
//size_type是string的长度表示方式,不同的机器大小不同,为了匹配机器上string的最大长度。因此找位置、长度这些要用size_type
string::size_type idx = path_.find_last_of('.');//找不到返回string::npos,表示不存在位置,值是-1
if(idx == string::npos) {
return "text/plain";
}
string suffix = path_.substr(idx);//从.开始返回后缀
if(SUFFIX_TYPE.count(suffix) == 1) {//有相应的类型就返回
return SUFFIX_TYPE.find(suffix)->second;
}
return "text/plain";//没有相应类型就返回这个
}

void HttpResponse::ErrorContent(Buffer& buff, string message) //自定义错误信息,在添加内容时遇到错误就返回这个html
{
string body;
string status;
body += "<html><title>Error</title>";
body += "<body bgcolor=\"ffffff\">";
if(CODE_STATUS.count(code_) == 1) {
status = CODE_STATUS.find(code_)->second;
} else {
status = "Bad Request";
}
body += to_string(code_) + " : " + status + "\n";
body += "<p>" + message + "</p>";
body += "<hr><em>TinyWebServer</em></body></html>";

buff.Append("Content-length: " + to_string(body.size()) + "\r\n\r\n");
buff.Append(body);
}

处理请求

用到正则,语法:正则表达式 – 语法 | 菜鸟教程 (runoob.com)

几种用法:C++ regex库的三种正则表达式操作 - 上官栋 - 博客园 (cnblogs.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/*
* @Author : mark
* @Date : 2020-06-25
* @copyleft Apache 2.0
*/
#ifndef HTTP_REQUEST_H
#define HTTP_REQUEST_H

#include <unordered_map>
#include <unordered_set>
#include <string>
#include <regex>
#include <errno.h>
#include <mysql/mysql.h> //mysql

#include "../buffer/buffer.h"
#include "../log/log.h"
#include "../pool/sqlconnpool.h"
#include "../pool/sqlconnRAII.h"

class HttpRequest {
public:
enum PARSE_STATE {//解析状态
REQUEST_LINE,
HEADERS,
BODY,
FINISH,
};

enum HTTP_CODE {
NO_REQUEST = 0,
GET_REQUEST,
BAD_REQUEST,
NO_RESOURSE,
FORBIDDENT_REQUEST,
FILE_REQUEST,
INTERNAL_ERROR,
CLOSED_CONNECTION,
};

HttpRequest() { Init(); }
~HttpRequest() = default;

void Init();
bool parse(Buffer& buff);

std::string path() const;
std::string& path();
std::string method() const;
std::string version() const;
std::string GetPost(const std::string& key) const;
std::string GetPost(const char* key) const;

bool IsKeepAlive() const;

/*
todo
void HttpConn::ParseFormData() {}
void HttpConn::ParseJson() {}
*/

private:
bool ParseRequestLine_(const std::string& line);
void ParseHeader_(const std::string& line);
void ParseBody_(const std::string& line);

void ParsePath_();
void ParsePost_();
void ParseFromUrlencoded_();

static bool UserVerify(const std::string& name, const std::string& pwd, bool isLogin);

PARSE_STATE state_;//描述目前的解析状态
std::string method_, path_, version_, body_;
std::unordered_map<std::string, std::string> header_;//保存信息,描述->参数
std::unordered_map<std::string, std::string> post_;

static const std::unordered_set<std::string> DEFAULT_HTML;
static const std::unordered_map<std::string, int> DEFAULT_HTML_TAG;
static int ConverHex(char ch);
};


#endif //HTTP_REQUEST_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
/*
* @Author : mark
* @Date : 2020-06-26
* @copyleft Apache 2.0
*/
#include "httprequest.h"
using namespace std;

const unordered_set<string> HttpRequest::DEFAULT_HTML{
"/index", "/register", "/login",
"/welcome", "/video", "/picture", };

const unordered_map<string, int> HttpRequest::DEFAULT_HTML_TAG {
{"/register.html", 0}, {"/login.html", 1}, };

void HttpRequest::Init() {
method_ = path_ = version_ = body_ = "";
state_ = REQUEST_LINE;
header_.clear();
post_.clear();
}

bool HttpRequest::IsKeepAlive() const {
if(header_.count("Connection") == 1) {//如果有头部有这个参数,就去找要不要keep
return header_.find("Connection")->second == "keep-alive" && version_ == "1.1";
}
return false;
}
//解析
bool HttpRequest::parse(Buffer& buff) {
const char CRLF[] = "\r\n";
if(buff.ReadableBytes() <= 0) {
return false;
}
while(buff.ReadableBytes() && state_ != FINISH) {
//search:查找 [first1, last1) 范围内第一个 [first2, last2) 子序列,返回指向first2的首地址。未找到就返回last1,请求数据就没/r/n
const char* lineEnd = search(buff.Peek(), buff.BeginWriteConst(), CRLF, CRLF + 2);//每次获取一行
std::string line(buff.Peek(), lineEnd);//拷贝一行,左闭右开
switch(state_)//看看现在是在解析什么
{
case REQUEST_LINE://请求行
if(!ParseRequestLine_(line)) {//会根据要访问什么资源把path弄好
return false;//有数据但是请求行都没准备好
}
ParsePath_();//解析path
break;
case HEADERS:
ParseHeader_(line);//不断解析头部
if(buff.ReadableBytes() <= 2) {//解析完一个头部后会判断是不是/r/n,两个字节就是空行,没有请求数据,就结束
state_ = FINISH;
}
break;
case BODY:
ParseBody_(line);
break;
default:
break;
}
if(lineEnd == buff.BeginWrite()) { break; }//空了,提前结束,避免buff出错,因为请求数据可能没有/r/n
buff.RetrieveUntil(lineEnd + 2);//过滤/r/n
}
LOG_DEBUG("[%s], [%s], [%s]", method_.c_str(), path_.c_str(), version_.c_str());
return true;
}
//解析请求文件路径
void HttpRequest::ParsePath_() {
if(path_ == "/") {//输入ip地址最后只有一个/,返回的就是主页面
path_ = "/index.html";
}
else {
for(auto &item: DEFAULT_HTML) {
if(item == path_) {//遍历集合
path_ += ".html";
break;
}
}
}
}

//看一个请求行的实例:GET /562f25980001b1b106000338.jpg HTTP/1.1
//第一个^表示开始,匹配的字符串开始必须是[^ ]*,这个表示匹配除了空格的所有,遇到空格结束。然后会跟一个空格,再匹配下一个连续的串,遇到空格为止
//然后匹配一个空格,匹配HTTP/,再匹配除了空格的内容,如果有/r/n,也会匹配。最后$表示结束,后面不再匹配
bool HttpRequest::ParseRequestLine_(const string& line) {
regex patten("^([^ ]*) ([^ ]*) HTTP/([^ ]*)$");//匹配模式
smatch subMatch;//存储结果,每个()中是一个子表达式,第0个参数是完整结果,1-n是()中的结果
if(regex_match(line, subMatch, patten)) {//严格要求各参数之间只有一个空格
method_ = subMatch[1];
path_ = subMatch[2];
version_ = subMatch[3];//说明传进来的line的/r/n被去掉了
state_ = HEADERS;//接下来解析请求头
return true;
}
LOG_ERROR("RequestLine Error");
return false;
}
//解析请求头,比如host: ... 或者host:...
//首先匹配冒号前面的,到冒号停下,然后匹配冒号,然后是一个空格,问号表示这个空格可以匹配0次或1次,因为报文中空格可以0次或多次
//然后匹配剩下的除换行符之外的所有字符,.相当于[^\n\r]。
void HttpRequest::ParseHeader_(const string& line) {
regex patten("^([^:]*): ?(.*)$");
smatch subMatch;
if(regex_match(line, subMatch, patten)) {//匹配成功
header_[subMatch[1]] = subMatch[2];//添加头部信息
}
else {//失败就是到空行了,准备解析请求体。上层会判断是否有请求体,没有就结束
state_ = BODY;
}
}
//解析请求体,就是post的请求数据
void HttpRequest::ParseBody_(const string& line) {
body_ = line;//拷贝
ParsePost_();//调用解析post
state_ = FINISH;//结束咯
LOG_DEBUG("Body:%s, len:%d", line.c_str(), line.size());
}

int HttpRequest::ConverHex(char ch) {//把一个十六进制的字符转为int数字
if(ch >= 'A' && ch <= 'F') return ch -'A' + 10;//+10是因为A本身在十六进制代表10
if(ch >= 'a' && ch <= 'f') return ch -'a' + 10;
return ch;//是不是忘了-'0'
}

void HttpRequest::ParsePost_() {
//application/x-www-form-urlencoded是最常见的 POST 提交数据方式。这里只解析这种格式
//请求数据实例:name=Professional%20Ajax&publisher=Wiley
if(method_ == "POST" && header_["Content-Type"] == "application/x-www-form-urlencoded") {
ParseFromUrlencoded_();//把body解码
if(DEFAULT_HTML_TAG.count(path_)) {//login的话,post的url就是当前请求的页面,就能解析到是/login.html,得到对应tag
int tag = DEFAULT_HTML_TAG.find(path_)->second;
LOG_DEBUG("Tag:%d", tag);
if(tag == 0 || tag == 1) {
bool isLogin = (tag == 1);//看是登录还是注册
if(UserVerify(post_["username"], post_["password"], isLogin)) {//处理登录和处理注册
path_ = "/welcome.html";
}
else {
path_ = "/error.html";
}
}
}
}
}

//name1=value1&name2=value2&name3=value3&.....&nameN=valueN
//用"+"取代空字符
//非数字, 字母用%HH格式进行替换, 其中HH是两位16进制数字, 表示被替换字符的ASCII码(例如"?"会被替换成"%3F", 对应十进制数是63,也就是问号对应的ASCII值)
//换行符用CR LF字符对表示, 对应的值是"%0D%0A";
void HttpRequest::ParseFromUrlencoded_() {
if(body_.size() == 0) { return; }

string key, value;
int num = 0;
int n = body_.size();
int i = 0, j = 0;

for(; i < n; i++) {//逐个解析
char ch = body_[i];
switch (ch) {
case '='://如果是等号,key就是j-i-1这一段,
key = body_.substr(j, i - j);//长度是i-j
j = i + 1;//下一个str起始
break;
case '+'://如果是+,换回空格
body_[i] = ' ';
break;
case '%'://后面跟两个十六进制的字符
num = ConverHex(body_[i + 1]) * 16 + ConverHex(body_[i + 2]);//转化为对应的ascii码
body_[i + 2] = num % 10 + '0';
body_[i + 1] = num / 10 + '0';//并不是转换为ascii对应的符号,转换为数字对应的字符形式而已,就是16进制转十进制
i += 2;
break;
case '&'://如果是&,就得到value
value = body_.substr(j, i - j);
j = i + 1;
post_[key] = value;//存储
LOG_DEBUG("%s = %s", key.c_str(), value.c_str());
break;
default:
break;
}
}
assert(j <= i);//处理最后一个value,它没有&
if(post_.count(key) == 0 && j < i) {
value = body_.substr(j, i - j);
post_[key] = value;
}
}

bool HttpRequest::UserVerify(const string &name, const string &pwd, bool isLogin) {
if(name == "" || pwd == "") { return false; }//空的话返回错误
LOG_INFO("Verify name:%s pwd:%s", name.c_str(), pwd.c_str());//否则就记录
MYSQL* sql;
SqlConnRAII(&sql, SqlConnPool::Instance());//初始化连接数据库,返回全局的静态的连接池
assert(sql);

bool flag = false;
unsigned int j = 0;
char order[256] = { 0 };
MYSQL_FIELD *fields = nullptr;
MYSQL_RES *res = nullptr;

if(!isLogin) { flag = true; }//如果是注册
/* 查询用户及密码 */
snprintf(order, 256, "SELECT username, password FROM user WHERE username='%s' LIMIT 1", name.c_str());//sql语句,查一个
LOG_DEBUG("%s", order);

if(mysql_query(sql, order)) { //执行语句,成功返回0,错误返回非0
mysql_free_result(res);//错误的话释放结果集并返回
return false;
}

res = mysql_store_result(sql);//完整的结果集
j = mysql_num_fields(res); //返回结果集中的列数
fields = mysql_fetch_fields(res);//返回所有字段结构的数组

while(MYSQL_ROW row = mysql_fetch_row(res)) {//遍历行,实际上只有一行,但这样可以取出行
LOG_DEBUG("MYSQL ROW: %s %s", row[0], row[1]);
string password(row[1]);
// 能select到说明又对应的username,看是登录还是注册
if(isLogin) {
if(pwd == password) { flag = true; }//标记成功,可以直接return的
else {
flag = false;
LOG_DEBUG("pwd error!");
//可以直接return的
}
}
//如果是注册,注意能进到这个while说明取出了row,就说明前面res select到了一个username,重名了
else {
flag = false;
LOG_DEBUG("user used!");
//可以直接return的
}
}
mysql_free_result(res);//释放结果集使用的内存,store后要释放

/* 注册行为 且 用户名未被使用*/
if(!isLogin && flag == true) {
LOG_DEBUG("regirster!");
bzero(order, 256);
snprintf(order, 256,"INSERT INTO user(username, password) VALUES('%s','%s')", name.c_str(), pwd.c_str());//插入表
LOG_DEBUG( "%s", order);
if(mysql_query(sql, order)) {
LOG_DEBUG( "Insert error!");
flag = false;
}
flag = true;
}

SqlConnPool::Instance()->FreeConn(sql);//这行不用,使用了RAII机制了
LOG_DEBUG( "UserVerify success!!");
return flag;
}

std::string HttpRequest::path() const{
return path_;
}

std::string& HttpRequest::path(){//外部可修改path_
return path_;
}
std::string HttpRequest::method() const {
return method_;
}

std::string HttpRequest::version() const {
return version_;
}
//接口函数,供外部使用,用于获取解析请求体后获得的参数
std::string HttpRequest::GetPost(const std::string& key) const {
assert(key != "");
if(post_.count(key) == 1) {
return post_.find(key)->second;
}
return "";
}

std::string HttpRequest::GetPost(const char* key) const {
assert(key != nullptr);
if(post_.count(key) == 1) {
return post_.find(key)->second;
}
return "";
}

上层调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/*
* @Author : mark
* @Date : 2020-06-15
* @copyleft Apache 2.0
*/

#ifndef HTTP_CONN_H
#define HTTP_CONN_H

#include <sys/types.h>
#include <sys/uio.h> // readv/writev
#include <arpa/inet.h> // sockaddr_in
#include <stdlib.h> // atoi()
#include <errno.h>

#include "../log/log.h"
#include "../pool/sqlconnRAII.h"
#include "../buffer/buffer.h"
#include "httprequest.h"
#include "httpresponse.h"

class HttpConn {
public:
HttpConn();

~HttpConn();

void init(int sockFd, const sockaddr_in& addr);

ssize_t read(int* saveErrno);

ssize_t write(int* saveErrno);

void Close();

int GetFd() const;

int GetPort() const;

const char* GetIP() const;

sockaddr_in GetAddr() const;

bool process();

int ToWriteBytes() {
return iov_[0].iov_len + iov_[1].iov_len;
}

bool IsKeepAlive() const {
return request_.IsKeepAlive();
}

static bool isET;//是否是ET触发模式
static const char* srcDir;
static std::atomic<int> userCount;

private:

int fd_;
struct sockaddr_in addr_;//internet环境下套接字的地址形式

bool isClose_;

int iovCnt_;//根据有没有文件要传,有的话就是两个。打开文件失败的话,会返回自定义的html,这个html是写到buff的,此时没有文件要传
struct iovec iov_[2];//0发送响应报文,1发送响应文件

Buffer readBuff_; // 读缓冲区
Buffer writeBuff_; // 写缓冲区

HttpRequest request_;
HttpResponse response_;
};


#endif //HTTP_CONN_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
* @Author : mark
* @Date : 2020-06-15
* @copyleft Apache 2.0
*/
#include "httpconn.h"
using namespace std;

const char* HttpConn::srcDir;
std::atomic<int> HttpConn::userCount;
bool HttpConn::isET;

HttpConn::HttpConn() {
fd_ = -1;
addr_ = { 0 };
isClose_ = true;
};

HttpConn::~HttpConn() {
Close();
};

void HttpConn::init(int fd, const sockaddr_in& addr) {
assert(fd > 0);
userCount++;
addr_ = addr;
fd_ = fd;
writeBuff_.RetrieveAll();
readBuff_.RetrieveAll();
isClose_ = false;
LOG_INFO("Client[%d](%s:%d) in, userCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}

void HttpConn::Close() {
response_.UnmapFile();
if(isClose_ == false){
isClose_ = true;
userCount--;
close(fd_);
LOG_INFO("Client[%d](%s:%d) quit, UserCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}
}

int HttpConn::GetFd() const {
return fd_;
};

struct sockaddr_in HttpConn::GetAddr() const {
return addr_;
}

//ntoa:network to ascii,将网络地址转换成“.”点隔的字符串格式
//<arpa/inet.h>,char *inet_ntoa (struct in_addr);参数是结构体
//相反的函数是inet_addr,讲ip转换为长整型,参数是ip字符串,如addr_.sin_addr.s_addr = inet_addr("132.241.5.10");
const char* HttpConn::GetIP() const {
return inet_ntoa(addr_.sin_addr);
}

int HttpConn::GetPort() const {
return addr_.sin_port;//sin_port存储端口号(使用网络字节顺序)
}

ssize_t HttpConn::read(int* saveErrno) {//读数据到自己的缓冲区
ssize_t len = -1;
do {
len = readBuff_.ReadFd(fd_, saveErrno);//调用读缓冲区
if (len <= 0) {
break;
}
} while (isET);//如果是ET模式就一直读取直到len==0,LT模式就读一次就结束
return len;
}

ssize_t HttpConn::write(int* saveErrno) {//iov里的数据写出去
ssize_t len = -1;
do {
len = writev(fd_, iov_, iovCnt_);//写多个非连续缓冲区(聚集写),成功返回字节数,失败返回-1,因此用ssize_t,是signed的
if(len <= 0) {
*saveErrno = errno;
break;
}
if(iov_[0].iov_len + iov_[1].iov_len == 0) { break; } /* 传输结束 *///也就是TuWriteBytes()==0
else if(static_cast<size_t>(len) > iov_[0].iov_len) {//第一个缓冲区以写完
iov_[1].iov_base = (uint8_t*) iov_[1].iov_base + (len - iov_[0].iov_len);//更新第二个缓冲区剩下的
iov_[1].iov_len -= (len - iov_[0].iov_len);
if(iov_[0].iov_len) {//然后更新第一个缓冲区为0
writeBuff_.RetrieveAll();//写缓冲区已经发送完毕
iov_[0].iov_len = 0;
}
}
else {//c++中指针相互赋值要显式转换,void*可接受任意类型的赋值(不必转换);反过来不行,void*要给其他变量赋值要显示转换
iov_[0].iov_base = (uint8_t*)iov_[0].iov_base + len;//typedef unsigned char uint8_t;
iov_[0].iov_len -= len;
writeBuff_.Retrieve(len);//已经发送了len长度
}
} while(isET || ToWriteBytes() > 10240);//ET模式一直写,或者有太多字节要写了,就多写几次
return len;
}

bool HttpConn::process() {//解析请求数据,并把响应数据放到iovec结构体
request_.Init();//处理请求初始化
if(readBuff_.ReadableBytes() <= 0) {//如果一点数据都没接收到
return false;
}
else if(request_.parse(readBuff_)) {//有数据就处理,如果解析请求行错误就false
LOG_DEBUG("%s", request_.path().c_str());//解析请求行成功,后面数据有无无所谓。get的话请求行就有路径,post的话会检查请求体,没有就返回error
response_.Init(srcDir, request_.path(), request_.IsKeepAlive(), 200);//然后初始化响应
} else {
response_.Init(srcDir, request_.path(), false, 400);//请求错误,返回400 bad request
}

response_.MakeResponse(writeBuff_);//响应数据写到缓冲区
/* 响应头 */
iov_[0].iov_base = const_cast<char*>(writeBuff_.Peek());//赋予内存起始地址,用const_cast消去const
iov_[0].iov_len = writeBuff_.ReadableBytes();//大小
iovCnt_ = 1;

/* 文件 */
if(response_.FileLen() > 0 && response_.File()) {//文件不写到缓冲区,直接返回文件内存映射的指针,给iov1
iov_[1].iov_base = response_.File();
iov_[1].iov_len = response_.FileLen();
iovCnt_ = 2;
}
LOG_DEBUG("filesize:%d, %d to %d", response_.FileLen() , iovCnt_, ToWriteBytes());
return true;
}

服务器顶层

事务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/*
* @Author : mark
* @Date : 2020-06-15
* @copyleft Apache 2.0
*/
#ifndef EPOLLER_H
#define EPOLLER_H

#include <sys/epoll.h> //epoll_ctl()
#include <fcntl.h> // fcntl()
#include <unistd.h> // close()
#include <assert.h> // close()
#include <vector>
#include <errno.h>

class Epoller {
public:
explicit Epoller(int maxEvent = 1024);

~Epoller();

bool AddFd(int fd, uint32_t events);

bool ModFd(int fd, uint32_t events);

bool DelFd(int fd);

int Wait(int timeoutMs = -1);

int GetEventFd(size_t i) const;

uint32_t GetEvents(size_t i) const;

private:
int epollFd_;

std::vector<struct epoll_event> events_;//存放监听到的事件
};

#endif //EPOLLER_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/*
* @Author : mark
* @Date : 2020-06-19
* @copyleft Apache 2.0
*/

#include "epoller.h"

Epoller::Epoller(int maxEvent):epollFd_(epoll_create(512)), events_(maxEvent){//构造内核事件表描述符,以及事件集合
assert(epollFd_ >= 0 && events_.size() > 0);
}

Epoller::~Epoller() {
close(epollFd_);//关闭句柄
}

bool Epoller::AddFd(int fd, uint32_t events) {
if(fd < 0) return false;
epoll_event ev = {0};//创建一个epoll_event,前面的vector存放事件,这里是为了描述事件的类型,关联fd。不初始化也可以:epoll_event ev;
ev.data.fd = fd;//关联fd
ev.events = events;//上层设置好类型
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);//add,成功返回0
}

bool Epoller::ModFd(int fd, uint32_t events) {//和add差不多
if(fd < 0) return false;
epoll_event ev = {0};
ev.data.fd = fd;
ev.events = events;
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev);//mod
}

bool Epoller::DelFd(int fd) {
if(fd < 0) return false;
epoll_event ev = {0};//可以不创建,epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, 0)
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, &ev);
}

//对于timeout:-1:永远等待;0:不等待直接返回,执行下面的代码;其他:在超时时间内没有事件发生,返回0,如果有事件发生立即返回
int Epoller::Wait(int timeoutMs) {//成功返回多少事件就绪,超时返回0,出错返回-1
return epoll_wait(epollFd_, &events_[0], static_cast<int>(events_.size()), timeoutMs);
}//&events_[0]等价于events;vector.size()返回类型是size_t,unsigned int转int

int Epoller::GetEventFd(size_t i) const {//调用wait后,从events事件集合中取出对应的可io的文件描述符
assert(i < events_.size() && i >= 0);
return events_[i].data.fd;
}

uint32_t Epoller::GetEvents(size_t i) const {//调用wait后,从events事件集合中取出对应的事件的类型描述
assert(i < events_.size() && i >= 0);
return events_[i].events;
}

顶层调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/*
* @Author : mark
* @Date : 2020-06-17
* @copyleft Apache 2.0
*/
#ifndef WEBSERVER_H
#define WEBSERVER_H

#include <unordered_map>
#include <fcntl.h> // fcntl()
#include <unistd.h> // close()
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "epoller.h"
#include "../log/log.h"
#include "../timer/heaptimer.h"
#include "../pool/sqlconnpool.h"
#include "../pool/threadpool.h"
#include "../pool/sqlconnRAII.h"
#include "../http/httpconn.h"

class WebServer {
public:
WebServer(
int port, int trigMode, int timeoutMS, bool OptLinger,
int sqlPort, const char* sqlUser, const char* sqlPwd,
const char* dbName, int connPoolNum, int threadNum,
bool openLog, int logLevel, int logQueSize);

~WebServer();
void Start();

private:
bool InitSocket_();
void InitEventMode_(int trigMode);
void AddClient_(int fd, sockaddr_in addr);

void DealListen_();
void DealWrite_(HttpConn* client);
void DealRead_(HttpConn* client);

void SendError_(int fd, const char*info);
void ExtentTime_(HttpConn* client);
void CloseConn_(HttpConn* client);

void OnRead_(HttpConn* client);
void OnWrite_(HttpConn* client);
void OnProcess(HttpConn* client);

static const int MAX_FD = 65536;

static int SetFdNonblock(int fd);

int port_;
bool openLinger_;
int timeoutMS_; /* 毫秒MS */
bool isClose_;
int listenFd_;//fd是socket
char* srcDir_;

//监听是接受tcp连接,所谓的连接是指维护客户与服务器之间的数据交换
uint32_t listenEvent_;//监听模式,维护服务器的监听事件的类型
uint32_t connEvent_;//连接模式,维护客户端与服务器之间的连接事件的类型
//动态创建的,都用智能指针
std::unique_ptr<HeapTimer> timer_;
std::unique_ptr<ThreadPool> threadpool_;
std::unique_ptr<Epoller> epoller_;
std::unordered_map<int, HttpConn> users_;//一个用户的socket匹配一个连接
};


#endif //WEBSERVER_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
/*
* @Author : mark
* @Date : 2020-06-17
* @copyleft Apache 2.0
*/

#include "webserver.h"

using namespace std;

WebServer::WebServer(
int port, int trigMode, int timeoutMS, bool OptLinger,
int sqlPort, const char* sqlUser, const char* sqlPwd,
const char* dbName, int connPoolNum, int threadNum,
bool openLog, int logLevel, int logQueSize):
port_(port), openLinger_(OptLinger), timeoutMS_(timeoutMS), isClose_(false),
timer_(new HeapTimer()), threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller())
{
srcDir_ = getcwd(nullptr, 256);//获取当前路径
assert(srcDir_);
strncat(srcDir_, "/resources/", 16);
HttpConn::userCount = 0;
HttpConn::srcDir = srcDir_;
SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);

InitEventMode_(trigMode);
if(!InitSocket_()) { isClose_ = true;}

if(openLog) {
Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
if(isClose_) { LOG_ERROR("========== Server init error!=========="); }
else {
LOG_INFO("========== Server init ==========");
LOG_INFO("Port:%d, OpenLinger: %s", port_, OptLinger? "true":"false");
LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
(listenEvent_ & EPOLLET ? "ET": "LT"),
(connEvent_ & EPOLLET ? "ET": "LT"));
LOG_INFO("LogSys level: %d", logLevel);
LOG_INFO("srcDir: %s", HttpConn::srcDir);
LOG_INFO("SqlConnPool num: %d, ThreadPool num: %d", connPoolNum, threadNum);
}
}
}

WebServer::~WebServer() {
close(listenFd_);
isClose_ = true;
free(srcDir_);
SqlConnPool::Instance()->ClosePool();
}

void WebServer::InitEventMode_(int trigMode) {
//对端正常断开连接(调用 close()),在服务器端会触发一个 epoll 事件,EPOLLRDHUP监听挂断事件,在底层完成
//EPOLLRDHUP:https://blog.csdn.net/midion9/article/details/49883063
//EPOLLHUP:https://blog.csdn.net/voidccc/article/details/8619632
listenEvent_ = EPOLLRDHUP;//说明要监听对端是否挂断
connEvent_ = EPOLLONESHOT | EPOLLRDHUP;//连接事务每次响应后要重新设置
switch (trigMode)
{
case 0:
break;
case 1:
connEvent_ |= EPOLLET;
break;
case 2:
listenEvent_ |= EPOLLET;
break;
case 3:
listenEvent_ |= EPOLLET;
connEvent_ |= EPOLLET;
break;
default:
listenEvent_ |= EPOLLET;
connEvent_ |= EPOLLET;
break;
}
HttpConn::isET = (connEvent_ & EPOLLET);
}

void WebServer::Start() {
int timeMS = -1; /* epoll wait timeout == -1 无事件将阻塞 */
if(!isClose_) { LOG_INFO("========== Server start =========="); }
while(!isClose_) {
if(timeoutMS_ > 0) {//设置了超时处理的话,每轮都处理超时的事件
timeMS = timer_->GetNextTick();//获取下一个定时器超时的剩余时间
}
int eventCnt = epoller_->Wait(timeMS);//等待事件触发,有事件触发立即返回,没有会等timeMS的时间
//等待的事件包括监听事件和读写事件
for(int i = 0; i < eventCnt; i++) {
/* 处理事件 */
int fd = epoller_->GetEventFd(i);//获取fd描述符
uint32_t events = epoller_->GetEvents(i);//获取时间的类型
if(fd == listenFd_) {//如果监听到一个新连接
DealListen_();
}
//如果是触发的事件是对端断开或者对端异常(ERR通常是服务器读写(自身采取行动)发现对方异常触发)
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {//异常事件
assert(users_.count(fd) > 0);//确定user内保存了这个描述符
CloseConn_(&users_[fd]);
}
else if(events & EPOLLIN) {//如果是读事件
assert(users_.count(fd) > 0);
DealRead_(&users_[fd]);
}
else if(events & EPOLLOUT) {//如果是写事件
assert(users_.count(fd) > 0);
DealWrite_(&users_[fd]);
} else {//非预期事件
LOG_ERROR("Unexpected event");
}
}
}
}

void WebServer::SendError_(int fd, const char*info) {
assert(fd > 0);
int ret = send(fd, info, strlen(info), 0);//向某个socket发生信息,送到对方的socket由对方处理这个信息
if(ret < 0) {
LOG_WARN("send error to client[%d] error!", fd);
}
close(fd);
}

void WebServer::CloseConn_(HttpConn* client) {//关闭连接
assert(client);//首先会有一个连接类维护这个连接
LOG_INFO("Client[%d] quit!", client->GetFd());
epoller_->DelFd(client->GetFd());//获取连接类维护的socket并从事件表中删除,并不关闭
client->Close();//关闭连接,这个会关闭fd。实际上连接类的析构也会close(),但在使用过程中要手动释放资源
}

void WebServer::AddClient_(int fd, sockaddr_in addr) {
assert(fd > 0);
//创建http连接
users_[fd].init(fd, addr);//不指定value的写法,会讲value以0初始化,然后这里再调用init
//创建计时器关联http连接
if(timeoutMS_ > 0) {//有超时处理的话,就绑定一个定时器,回调函数用来关闭连接。这种形式的回调函数不再需要是static
timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));//绑定参数,this是第一个参数,client是
}
//向内核事件表注册
epoller_->AddFd(fd, EPOLLIN | connEvent_);
//设置非阻塞
SetFdNonblock(fd);
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}

void WebServer::DealListen_() {
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
do {
int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);//尝试通过listen socket连接,获取地址
if(fd <= 0) { return;}//连接失败,即没有数据了,返回
else if(HttpConn::userCount >= MAX_FD) {//请求过多,无法连接
SendError_(fd, "Server busy!");//向socket发送错误信息
LOG_WARN("Clients is full!");
return;
}
AddClient_(fd, addr);//添加连接
} while(listenEvent_ & EPOLLET);//ET模式要把事件全部处理,应该就是把socket缓冲区的地址全部读完
}

void WebServer::DealRead_(HttpConn* client) {//处理读事件
assert(client);
ExtentTime_(client);//有响应就刷新时间
threadpool_->AddTask(std::bind(&WebServer::OnRead_, this, client));//唤醒一个线程处理读事件
}

void WebServer::DealWrite_(HttpConn* client) {//处理写事件
assert(client);
ExtentTime_(client);
threadpool_->AddTask(std::bind(&WebServer::OnWrite_, this, client));
}

void WebServer::ExtentTime_(HttpConn* client) {//重新设置时间
assert(client);
if(timeoutMS_ > 0) { timer_->adjust(client->GetFd(), timeoutMS_); }
}

void WebServer::OnRead_(HttpConn* client) {//调用读
assert(client);
int ret = -1;
int readErrno = 0;
ret = client->read(&readErrno);//读取数据,把数据读到缓冲区
if(ret <= 0 && readErrno != EAGAIN) {//如果读不到数据且不是因为缓冲区空了,那么就异常,关闭连接
CloseConn_(client);
return;
}
OnProcess(client);//读完了就解析请求数据
}

void WebServer::OnProcess(HttpConn* client) {//解析数据
if(client->process()) {//如果解析成功了,无论是正常请求还是bad请求,通知内核事件输出,即修改成out
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
} else {//缓冲区一点数据都没收到,,继续读
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
}
}

void WebServer::OnWrite_(HttpConn* client) {//调用写
assert(client);
int ret = -1;
int writeErrno = 0;
ret = client->write(&writeErrno);//传输数据
if(client->ToWriteBytes() == 0) {
/* 传输完成 */
if(client->IsKeepAlive()) {//如果是keep
OnProcess(client);//改成in
return;
}
}
else if(ret < 0) {//传输数据小于0
if(writeErrno == EAGAIN) {//如果是socket的发送缓存被占满,要继续写
/* 继续传输 */
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);//总是告知继续写,除非写完
return;
}
}
CloseConn_(client);//成功且不keep就关掉连接
}

/* Create listenFd */
bool WebServer::InitSocket_() {//初始化监听窗口,注入内核事件表
int ret;
struct sockaddr_in addr;
if(port_ > 65535 || port_ < 1024) {
LOG_ERROR("Port:%d error!", port_);
return false;
}
addr.sin_family = AF_INET;//AF_INET 表示 IPv4 地址
addr.sin_addr.s_addr = htonl(INADDR_ANY);//本函数将一个32位数从主机字节顺序转换成网络字节顺序,ANY泛指本机,监听所有网卡
addr.sin_port = htons(port_);////将整型变量从主机字节顺序转变成网络字节顺序,就是整数在地址空间存储方式变为高位字节存放在内存的低地址处。
struct linger optLinger = { 0 };//设置tcp连接断开方式,默认是优雅退出即全0
if(openLinger_) {
/* 优雅关闭: 直到所剩数据发送完毕或超时 */
optLinger.l_onoff = 1;
optLinger.l_linger = 1;//在close前延迟linger的时间,这段时间是优雅退出时间,超时则返回错误
}

listenFd_ = socket(AF_INET, SOCK_STREAM, 0);//开启一个连接,返回描述符,SOCK_STREAM基于TCP
if(listenFd_ < 0) {
LOG_ERROR("Create socket error!", port_);
return false;
}

ret = setsockopt(listenFd_, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger));
if(ret < 0) {
close(listenFd_);
LOG_ERROR("Init linger error!", port_);
return false;
}

int optval = 1;
/* 端口复用 */
/* 只有最后一个套接字会正常接收数据。 */
//打开地址复用功能,允许服务器bind一个地址,即使这个地址当前已经存在已建立的连接
//optval=true:如果在已经处于 ESTABLISHED状态下的socket调用closesocket(一般不会立即关闭而经历TIME_WAIT的过程)后想继续重用该socket
//参考:https://blog.csdn.net/c_base_jin/article/details/94353956
//https://blog.csdn.net/u010144805/article/details/78579528
ret = setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, (const void*)&optval, sizeof(int));
if(ret == -1) {
LOG_ERROR("set socket setsockopt error !");
close(listenFd_);
return false;
}

ret = bind(listenFd_, (struct sockaddr *)&addr, sizeof(addr));//设置完就可以bind一个地址了,监听所有网卡
if(ret < 0) {
LOG_ERROR("Bind Port:%d error!", port_);
close(listenFd_);
return false;
}

//第一个参数即为要监听的socket描述字,第二个参数为相应socket可以排队的最大连接个数。
//socket()函数创建的socket默认是一个主动类型的,listen函数将socket变为被动类型的,等待客户的连接请求。
ret = listen(listenFd_, 6);
if(ret < 0) {
LOG_ERROR("Listen port:%d error!", port_);
close(listenFd_);
return false;
}
ret = epoller_->AddFd(listenFd_, listenEvent_ | EPOLLIN);//添加事件
if(ret == 0) {
LOG_ERROR("Add listen error!");
close(listenFd_);
return false;
}
/*
当 listenfd 设置成阻塞模式(默认行为,无需额外设置)时,如果连接 pending 队列中有需要处理的连接,accept 函数会立即返回,
否则会一直阻塞下去,直到有新的连接到来。
当 listenfd 设置成非阻塞模式,无论连接 pending 队列中是否有需要处理的连接,accept 都会立即返回,不会阻塞。
如果有连接,则 accept 返回一个大于 0 的值,这个返回值即是我们上文所说的 clientfd;如果没有连接,accept 返回值小于 0
*/
SetFdNonblock(listenFd_);//设置为非阻塞
LOG_INFO("Server port:%d", port_);
return true;
}

/*
阻塞方式是文件读写操作的默认方式,但是应用程序员可通过使用O_NONBLOCK 标志来人为
的设置读写操作为非阻塞方式 .( 该标志定义在 < linux/fcntl.h > 中,在打开文件时指定 ) .

如果设置了 O_NONBLOCK 标志,read 和 write 的行为是不同的 ,如果进程没有数据就绪时调用了 read ,
或者在缓冲区没有空间时调用了 write ,系统只是简单的返回 EAGAIN,而不会阻塞进程.
*/

//fcntl系统调用可以用来对已打开的文件描述符进行各种控制操作以改变已打开文件的的各种属性
//F_GETFL:获取文件打开方式的标志,标志值含义与open调用一致,然后或上非阻塞标志
//F_SETFL:设置文件打开方式标志为arg指定方式
int WebServer::SetFdNonblock(int fd) {
assert(fd > 0);
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);//这里感觉是F_GETFL不是F_GETFD
}

main

config里啥东西没有,主要也没啥好配的,就直接main里面启动。服务器顶层的isclose没作用,ctrl+c终止进程,资源由操作系统自动回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* @Author : mark
* @Date : 2020-06-18
* @copyleft Apache 2.0
*/
#include <unistd.h>
#include "server/webserver.h"

int main() {
/* 守护进程 后台运行 */
//daemon(1, 0);

//服务器端口1316,监听和连接事件都是ET模式,连接1分钟无动作就关闭,linger全0是优雅退出
//mysql端口3306,用户名、密码、数据库名称
//连接池数量(同时维持连接的个数)、...
WebServer server(
1316, 3, 60000, false, /* 端口 ET模式 timeoutMs 优雅退出 */
3306, "root", "root", "webserver", /* Mysql配置 */
12, 6, true, 1, 1024); /* 连接池数量 线程池数量 日志开关 日志等级 日志异步队列容量 */
server.Start();
}

压力测试截图

image-20220925220717478