0%

「orange」-server2

任务分发-epoll

  • 使用非阻塞的、ET模式的IO

每次添加事件、修改事件、删除事件都是差不多的流程,可以封装一下,并且epollfd的create和close可以变成RAII的管理模式。

因为上层调用时,事件的类型是不一样的(读转读、读转写、写转写等),为了接口易用与通用,让上层传入events描述事件类型。events是一个uinit32_t,也即32位无符号整数类型。

封装很简单,只需要支持添加、修改、删除、调用epoll_wait、设置非阻塞(这个也让上层决定,默认非阻塞)即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//Epoller.h
#ifndef EPOLLER_H
#define EPOLLER_H

#include <sys/epoll.h> //epoll操作
#include <fcntl.h> // fcntl()
#include <unistd.h> // close()
#include <cassert>
#include <vector>
class Epoller
{
private:
int eventSize;
const bool nonBlock;
int epollFd;

std::vector<struct epoll_event> events;
public:
Epoller(const int eventsize = 1024, const bool ifNonBlock = true):
eventSize(eventsize),nonBlock(ifNonBlock),epollFd(epoll_create(5)),events(eventsize)
{
assert(epollFd>=0);
}
~Epoller()
{
close(epollFd);
}

int setFdNonblock(int fd)
{
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);//出错返回-1
}

bool addFd(int fd, uint32_t events)
{
if(fd < 0) return false;
//如果需要设置非阻塞,根据布尔运算就会调用SetFdNonblock函数,如果返回不是-1就成功,是-1就返回false
if(nonBlock && (setFdNonblock(fd)==-1))
return false;

epoll_event ev = {0};
ev.data.fd = fd;//关联fd
ev.events = events;//上层设置好类型
return 0 == epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ev);//add,成功返回0
}

bool modFd(int fd, uint32_t events)
{
if(fd < 0) return false;
epoll_event ev = {0};
ev.data.fd = fd;
ev.events = events;
return 0 == epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ev);//mod
}

bool delFd(int fd)
{
if(fd < 0) return false;
return 0 == epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0);
}

//对于timeout:-1:永远等待;0:不等待直接返回,执行下面的代码;其他:在超时时间内没有事件发生,返回0,如果有事件发生立即返回
//默认不等待
int wait(int timeoutMs = 0) //成功返回多少事件就绪,超时返回0,出错返回-1
{
return epoll_wait(epollFd, &events[0], eventSize, timeoutMs);
}
int getEventFd(size_t i)
{
return events[i].data.fd;
}
uint32_t getEvents(size_t i)
{
return events[i].events;
}
};

#endif

调用epoller伪代码:

简单说明一下异常事件:

  • EPOLLRDHUP:对方调用close()正常断开连接,服务器会得知该信息;或者直接终止进程,操作系统会自动向服务器发FIN,也会得知。
  • EPOLLHUP:异常断开,当socket的一端认为对方发来了一个不存在的4元组请求的时候,会回复一个RST响应,在epoll上会响应为EPOLLHUP事件。
    • RST:表示重置连接、复位连接。一般有两种场景:
    • 对于客户端:当客户端向一个没有在listen的服务器端口发送的connect的时候,服务器会返回一个RST,因为服务器根本不知道这个4元组的存在。
    • 对于服务器:当客户端的系统突然崩溃(kill pid或者正常关机不行的,因为操作系统会发送FIN给对方),这时服务器从4元组发到客户端的数据就发回一个RST。
      • 使用shutdown、close关闭套接字,发送的是FIN,不是RST
      • 套接字关闭前,使用sleep。对运行的程序Ctrl+C,会发送FIN,不是RST
      • 套接字关闭前,执行return、exit(0)、exit(1),会发送FIN、不是RST
    • 以上后两个是操作系统干的。
  • EPOLLERR:上面两个异常事件都是服务器被动从客户端返回的(FIN或RST)信息得知的,EPOLLERR是服务器主动采取动作,如read和write时,发现对方出问题了,就出现这个异常。

关于两个监听端口:

  • 之前在客户端中简单描述为“两个监听线程”,本来想用两个线程、两个epoll池的。但实际上可以只在主线程里根据epoll响应两个端口上的连接请求
  • 因此实际上两个监听端口都在主线程accept,通过epoll_wait判断是listenfd1还是listenfd2即可。
  • 这是因为发送线程不会接收信息,它唯一的作用就是用于判断连接这个端口的是客户端的接收线程。那么两个监听端口就可以共用一个epoll池,因为epoll池响应到的connfd一定是listenfd1上的,就不需要两个epoll池分别响应connfd了。
  • 不过因为这样叫方便,还是叫做什么什么线程来区分这两个东西,但其实都在主线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include "epoller.h"
#include "threadpool.h"
const int MAX_EVENT = 20;
const int timeout = 0;//不阻塞
unique_ptr<Epoller> epoller(1024);
void start()//主进程
{
threadpool threadp(20);
while(true)
{
int eventCnt = epoller.wait(timeout);
for(int i=0; i<eventCnt; i++)
{
int fd = epoller.getEventFd(i);
uint32_t events = epoller.getEvents(i)
//处理epoll出错和对端关闭情况
if(fd == listenfd1)//注意有两个listen端口
{
listen_1();//把所有connect的都accept,ET模式要循环到底
}
else if(fd == listenfd2)
{
listen_2();
}
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))//异常事件
{//一般来讲,某个异常是客户端出问题,这样两个connfd都应该会异常,然后都关闭,
//所以不用根据一个把另一个同时关了,关这次的事件即可

/*
* 帮这个用户调exit_,删除表和让影响的对方退出,exit内会close套接字并删除事件
* 这样用户直接exit_后,因为exit_内close了,就不会进入这里再exit_重复调用了
* 而如果用户没有exit_直接关闭或崩溃,就帮忙exit_,总之就是只调用一次
*/
if(usermap.fvalue_conn1_ip(fd) != "")//如果是conn1
exit_(fd);//nologin的话break_直接不会做什么事情
else//conn2
{
epoller.delFd(fd);//从epoll内删除事件
close(fd);//conn2直接close
}

//其他处理
}
else if(events & EPOLLIN)//处理读事件
{
threadp.addTask(bind(task,fd));//添加任务,该任务处理读与写
}
}
//其他处理
}
}

recv处理

task中的接收,只有recv才被epoll响应,send是直接工作的,所以不受ET影响,也不用重新注册

非阻塞情况下,recv返回大于0是字节数,返回等于0是网络断开了或copy出错,这是严重错误,不会设置errno;小于0(**-1**)是其他错误,保存在errno

注意:EPOLLHUP、EPOLLERR这两个在add和mod时不需要手动注册这两个事件,内核会自动添加这两个事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <errno.h>
const uint32_t CONNEVENT = EPOLLONESHOT | EPOLLRDHUP | EPOLLET | EPOLLIN;
string recv_str(int conn1)
{
char recvbuf[256];
string recvstr;
while(true)
{
memset(recvbuf, 0, sizeof(recvbuf));//把接收缓冲清零
size_t len = recv(conn1, recvbuf, sizeof(recvbuf), 0);
if(len == 0)//copy出错或断开
{
string myuser = usermap.fvalue_conn1_user(conn1);//获取发送方userid
LOG_ERROR("%s [recv] error!", myuser.c_str());
uint32_t connEvent = CONNEVENT;//因为是oneshot,每次重新注册
epoller.modFd(conn1, connEvent);//重新设置,等下一次
return "";//这次就不做了
}
else if(len == -1)
{
if(errno == EAGAIN || errno == EWOULDBLOCK)//说明读完了,两个errno是一样的意思
{
uint32_t connEvent = CONNEVENT;//因为是oneshot,每次重新注册
epoller.modFd(conn1, connEvent);
break;//正常结束
}
else if(errno == EINTR)//被中断了,重新读
continue;
else //其他错误
{
string myuser = usermap.fvalue_conn1_user(conn1);//获取发送方userid
LOG_ERROR("%s [recv] error!", myuser.c_str());
uint32_t connEvent = CONNEVENT;//因为是oneshot,每次重新注册
epoller.modFd(conn1, connEvent);//重新设置,等下一次
return "";//这次就不做了
}
}
else//接收到了信息
{
recvstr += string(recvbuf);
}
}
return recvstr;
}

listen处理

简单一点的返回就accept返回-1就直接return了,但详细一点可以继续细分:

说明一下ECONNABORTED,这涉及TCP三次握手:

  • 首先客户端在connect时,发出第一次握手SYN
  • 此时epoll中listenfd收到响应,进入accept处理,返回SYN+ACK第二次握手,并等待第三次握手
  • 如果客户端返回ACK则accept成功,但如果客户端此时返回的是RST,就是这个异常事件了,但这个异常事件会被内核处理,上层直接跳过就好了。
  • 前面说到RST是很偶然的情况,所以一般不进行处理也行。包括被系统中断其实也不常见。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//listen不用oneshot,所以不用重新注册

void listen_1()//accept一次是一个客户连接,ET模式下要连续accept把连接建立完
{
while(true)
{
struct sockaddr_in client_addr;//获取客户的地址和端口号,连接后的不分配新端口
socklen_t len = sizeof(client_addr);//socklen_t 相当于 int,但使用int必须强制转型告知编译器
int conn1 = accept(listenfd1, (struct sockaddr*)&client_addr, &len);
//成功返回非负数
if(conn1 == -1)
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
return;//读完了就返回
else if(errno == ECONNABORTED || errno == EINTR)
continue;//被中断了或客户端断开了就继续
else
{
LOG_DEBUG("server accept error");
continue;
}
}

//向内核注册conn1
uint32_t connEvent = CONNEVENT;
epoller.addFd(conn1, connEvent);

string ip = string(inet_ntoa(client_addr.sin_addr));
usermap.ins_conn1_ip(conn1,ip);//添加映射
LOG_DEBUG("server accept ip-%s",ip.c_str());

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void listen_2()//accept一次是一个客户连接,ET模式下要连续accept把连接建立完
{
while(true)
{
struct sockaddr_in client_addr;//获取客户的地址和端口号,连接后的不分配新端口
socklen_t len = sizeof(client_addr);//socklen_t 相当于 int,但使用int必须强制转型告知编译器
int conn2 = accept(listenfd2, (struct sockaddr*)&client_addr, &len);
//成功返回非负数
if(conn2 == -1)
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
return;//读完了就返回
else if(errno == ECONNABORTED || errno == EINTR)
continue;//被中断了或客户端断开了就继续
else
{
LOG_DEBUG("server accept error");
continue;
}
}

//向内核注册conn2,注册是因为可能会有异常事件要处理
uint32_t connEvent = CONNEVENT;
epoller.addFd(conn2, connEvent);

string ip = string(inet_ntoa(client_addr.sin_addr));
usermap.ins_ip_conn2(ip,conn2);//添加映射
LOG_DEBUG("server accept [2] ip-%s",ip.c_str());

}

}

初始化监听端口

命令主线程端口为9000,发送线程端口为8000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
const int port1 = 9000;
const int port2 = 8000;
int listenfd1;
int listenfd2;
const uint32_t LISTENEVENT = EPOLLRDHUP | EPOLLET | EPOLLIN;
//因为accept是在主线程执行的,所以listenfd不需要oneshot,
//因为监听套接字在执行accept时主线程不会去调用wait,也就不会导致多线程竞争相同套接字

void init_all_Socket()
{
init_one_Socket(listenfd1,port1);
init_one_Socket(listenfd2,port2);//代码复用
}

void init_one_Socket(int& listenfd, const int port)
{
//定义socketfd,它要绑定监听的网卡地址和端口
listenfd = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);//第三个参数写0也可以,这里表示创建tcp套接字
if(listenfd < 0)
{
LOG_ERROR("create listen socket error, port-%d",port);
exit(1);
}
//定义sockaddr_in
struct sockaddr_in socketaddr;
socketaddr.sin_family = AF_INET;//ipv4
socketaddr.sin_port = htons(port);//字节序转换
socketaddr.sin_addr.s_addr = htonl(INADDR_ANY);//INADDR_ANY表示监听所有网卡地址,0.0.0.0;

//端口复用,在bind前设置,否则bind时出错就晚了
int optval = 1;
int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&optval, sizeof(int));
if(ret == -1)
{
LOG_ERROR("set socket setsockopt error !");
close(listenfd);
exit(1);
}

//绑定套接字和地址端口信息,sockaddr_in转成sockaddr
if(bind(listenfd,(struct sockaddr *)&socketaddr,sizeof(socketaddr))==-1)
{
LOG_ERROR("bind port-%d error !",port);
close(listenfd);
exit(1);
}
//开始监听,SOMAXCONN是系统给出的请求队列最大长度
if(listen(listenfd,SOMAXCONN) == -1)
{
LOG_ERROR("listen port-%d error!", port);
close(listenfd);
exit(1);
}
uint32_t listenEvent = LISTENEVENT;
if(!epoller.addFd(listenfd, listenEvent))
{
LOG_ERROR("add listen to epoll error!");
close(listenfd);
exit(1);
}
LOG_INFO("server listenning to port-%d", port);
}

说明一下SO_REUSEADDR

  • SO_REUSEADDR:

  • 端口复用

  • 一般来说,一个端口释放后会等待一会之后才能再被使用,因为主动关闭会time_wait

  • SO_REUSEADDR只有针对time-wait连接,确保server重启成功的这一个作用

  • linux系统time-wait连接持续时间为1min。

  • SOL_SOCKET表示在套接字级别上设置选项

  • optval=1:不等于0打开,等于0关闭

  • SO_REUSEADDR是让端口释放后立即就可以被再次使用。

  • SO_REUSEPORT是让多个进程可以绑定相同端口,并发性更好,可扩展性强

日志系统

之前的博客有完整的从头编写的步骤,可以完整地学习:WebServer模块单元测试 | JySama,在日志系统部分。

阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <mutex>
#include <queue>
#include <condition_variable>
#include <cassert>
#include <chrono>
template<class T>
class blockqueue
{
private:
std::queue<T> que;
std::mutex mux;
std::condition_variable condprod;
std::condition_variable condcons;
size_t size;
bool isclose;
public:
blockqueue(int maxsize = 1024):size(maxsize),isclose(false)
{
assert(maxsize>0);//初始化检查
}

~blockqueue()//上层调用close即可
{
}

void close();
void clear();
bool empty();
bool full();

void push(const T &task);
bool pop(T& task);
bool pop(T& task, int timeout);


};


template<class T>
bool blockqueue<T>::empty()
{
//这是日志系统调用的函数,阻塞队列的pop不能调用,否则会死锁
std::lock_guard<std::mutex> locker(mux);
return que.empty();
}

template<class T>
bool blockqueue<T>::full()
{
//这是日志系统调用的函数,阻塞队列的push不能调用,否则会死锁
std::lock_guard<std::mutex> locker(mux);
return que.size()>=size;
}

template<class T>
void blockqueue<T>::push(const T &task)
{
//插入元素,首先抢占互斥锁,但即使抢占了互斥锁也可能不能插入,队列可能是满的,这时要释放锁让消费者线程获得锁
std::unique_lock<std::mutex> locker(mux);//要用条件变量,用unique锁
while(que.size()>=size)//避免虚假唤醒,notify_one一般不会导致虚假唤醒,但要随时最好准备。并且当要关闭时会notify_all
condprod.wait(locker);//等待唤醒
if(isclose)
return;//不能插入元素
que.push(task);//插入元素
condcons.notify_one();//唤醒消费者
}

template<class T>
bool blockqueue<T>::pop(T& task)
{
std::unique_lock<std::mutex> locker(mux);
while(que.empty() and !isclose)
condcons.wait(locker);

if(isclose)//关闭的信号
return false;

task = que.front();
que.pop();
condprod.notify_one();
return true;
}

template<class T>
bool blockqueue<T>::pop(T& task, int timeout)
{
std::unique_lock<std::mutex> locker(mux);
while(que.empty() and !isclose)//为空就等待,等待过程中如果超时就返回
if(condcons.wait_for(locker,std::chrono::seconds(timeout)) == std::cv_status::timeout)//超时
return false;

if(isclose)//关闭的信号
return false;
task = que.front();
que.pop();
condprod.notify_one();
return true;
}

template<class T>
void blockqueue<T>::close()
{
/*
* 当关闭时,我们的目标是要让所有的线程都退出,也就是不能被阻塞到push和pop里
* 调用此函数的时机是,上层日志系统已经把任务都做完,然后关闭队列
* 则调用这个函数后,所有想再次尝试push和pop的线程都不允许
*/
std::lock_guard<std::mutex> locker(mux);//要锁住,然后clear队列
clear();
isclose = true;//修改信号
//唤醒所有线程
condprod.notify_all();
condcons.notify_all();
}

template<class T>
void blockqueue<T>::clear()
{
//close已经锁住了,不用锁了
//高效的方式,swap一个空队列
std::queue<T> empty;
std::swap(empty, que);
}

#endif

上层日志系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//头文件
#ifndef LOG_H
#define LOG_H

#include<mutex>
#include "time.h"
#include<stdarg.h>
#include<string>
#include<stdio.h>
#include<cassert>
#include<thread>
#include <dirent.h> //opendir
#include <sys/stat.h> //mkdir
#include "blockqueue.h"
using namespace std;
class Log
{
private:
Log();//单例模式构造函数私有,成员函数才能调用构造函数
public:
Log(Log const&) = delete;
Log& operator=(Log const&) = delete;
~Log();//关闭...//析构函数实际上和构造函数一样,可以private,因为本质上是成员函数调用
static Log* instance();//单例

//函数声明和定义,只能有一个使用默认参数,如果函数的声明和定义是分开的,那缺省函数不能在函数声明和定义中同时出现
//默认参数在函数声明中提供,当又有声明又有定义时,定义中不允许默认参数(定义中的默认参数是无用的,必须传入参数才能找到匹配的函数)
void init(int level=1, const char* fpath = "./log",int maxqueue_size=1024,int threadnum=1);//不能用构造函数传参,使用一个init传参初始化

void setlevel(int level){loglevel = level;}//修改level的接口,只允许主线程修改,因此不用互斥
int getlevel(){return loglevel;}
bool isopen(){return logisopen;}//看是否打开日志的接口

void createthread(int threadnum);
static void logthread();//异步线程的回调函数,需要是staic,没有this隐藏参数
void write(int level, const char *format,...);//同步写,解耦
void close();
private:
void asyncwrite();//互斥写,不用lambda表达式,因为要用到log类的变量,并修改它们
void changefile(struct tm *nowtime);//write函数的解耦
struct tm* gettime();
private:
static const int maxlines = 52000;
FILE *logfp;
int linecounts;
int filenum;
const char* path;
int logday;
bool isasync;
bool logisopen;
int loglevel;
unique_ptr<blockqueue<string>> blockque;//不用lambda表达式可以用uniqueptr,因为一个指针一起用。用指针是因为要根据队列长度动态构造
mutex mux;
};

//我们想用一个函数封装write函数,比如logoinfo调用level1的write,并且还要能判断loglevel支不支持
//但函数封装变参函数,为了传递可变参数,实际上还要修改write的实现,不如用宏来实现,使用##__VA_ARGS__传递可变参数,让编译器把宏替换为真实的函数
//##__VA_ARGS__的优点是,对于宏调用,如果format是一个字符串也即后面没有可变参数,## 操作将使预处理器(preprocessor)去除掉它前面的那个逗号。
//宏与类无关了,这里必须isopen了才能使用
#define LOG_BASE(level, format, ...) \
do {\
Log* log = Log::instance();\
if (log->isopen() && log->getlevel() <= level) {\
log->write(level, format, ##__VA_ARGS__); \
}\
} while(0);

#define LOG_DEBUG(format, ...) do {LOG_BASE(0, format, ##__VA_ARGS__)} while(0);
#define LOG_INFO(format, ...) do {LOG_BASE(1, format, ##__VA_ARGS__)} while(0);
#define LOG_WARN(format, ...) do {LOG_BASE(2, format, ##__VA_ARGS__)} while(0);
#define LOG_ERROR(format, ...) do {LOG_BASE(3, format, ##__VA_ARGS__)} while(0);

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#include "log.h"
using namespace std;

Log* Log::instance()
{
static Log instance;//调用构造函数
return &instance;
}
struct tm* Log::gettime()
{
struct tm *nowtime;
time_t t;
t = time(NULL);
nowtime = localtime(&t);
return nowtime;
}

void Log::changefile(struct tm *nowtime)//完成行数增加、判断文件切换
{
//接下来涉及行数的改写,以及文件的切换,要互斥。因为实际上一个线程切换文件即可,如果有线程在切换其他线程不可动
//unique_lock<mutex> locker(mux);互斥交给上层,因为fputs也要互斥
linecounts++;//先++,因为行数是从0开始的,++后刚好判断是不是满了,这个操作要互斥
if(logday != nowtime->tm_mday || linecounts == maxlines)//如果换了一天或行数满了
{
char newname[48];//用snprintf,不能用string了
if(logday != nowtime->tm_mday)//如果是换了一天
{
logday = nowtime->tm_mday;//修改天
linecounts = 0;//换文件了
filenum = 0;//文件份数从0开始
//为了格式化命名,要用format,这里用snprintf写入str
snprintf(newname, 48, "%s/%d-%02d-%02d_log%05d.txt",//补充一个前缀-文件夹
path, nowtime->tm_year+1900, nowtime->tm_mon+1, nowtime->tm_mday, filenum);
}
else//行数满了
{
linecounts = 0;
filenum++;
snprintf(newname, 48, "%s/%d-%02d-%02d_log%05d.txt",
path, nowtime->tm_year+1900, nowtime->tm_mon+1, nowtime->tm_mday, filenum);
}

fflush(logfp);//在关闭文件前要把文件缓存区的内容写完
fclose(logfp);
logfp = fopen(newname,"w");
assert(logfp != nullptr);//创建失败报错
}
//locker.unlock();
}

void Log::write(int level, const char *format,...)
{
//初始化时间
struct tm *nowtime = gettime();

//-----------------根据传入的信息整理成一行日志-------------------------
char infobuffer[128];//一般一行日志没那么长,128足够了
char timebuffer[36];//时间头
string allinfo;
//分级
switch(level)
{
case 0:
allinfo += "[debug]";
break;
case 1:
allinfo += "[info]";
break;
case 2:
allinfo += "[warning]";
break;
case 3:
allinfo += "[error]";
break;
default:
allinfo += "[info]";
break;
}
//添加时间信息
snprintf(timebuffer,36, "%d-%02d-%02d_%02d:%02d:%02d:",
nowtime->tm_year+1900, nowtime->tm_mon+1, nowtime->tm_mday,
nowtime->tm_hour,nowtime->tm_min,nowtime->tm_sec);//只精确到秒,更具体的信息交给内容体现

allinfo += string(timebuffer);

//写内容
va_list vaList;
va_start(vaList,format);
vsnprintf(infobuffer,128,format,vaList);
va_end(vaList);

allinfo += string(infobuffer)+"\n";//注意换个行
//----------------------------------------------------------------------------------------------

//分异步还是同步,要不要切换文件交给异步线程判断
if(isasync && !blockque->full())
blockque->push(allinfo);//异步直接插入
else
{
//在写之前看要不要创建新文件
//如果当前日期变了,一般来说判断day就可以了;或是行数已满,就换一个文件

lock_guard<mutex> locker(mux);//互斥
changefile(nowtime);//直接交给该函数完成
fputs(allinfo.c_str(),logfp);//操作文件缓冲区,也要互斥
fflush(logfp);
}
}

Log::Log()//初始化一部分变量
{
//初始行数为-1,因为是先++然后判断再写入,初始为0的话,第一份文件会少一行,
//比如最大行为2,初始为0;则++,写入;++就换文件了,只写了一行,所以初始要是-1。换文件后置为0
//因为换文件后没++了,写了一行,就是正确的
linecounts = -1;
filenum = 0;
isasync = false;
blockque = nullptr;
logday = 0;
logfp = nullptr;
logisopen = false;//init才算打开
}
void Log::close()
{

logisopen = false;
if(isasync)//异步的话要让线程退出
{
while(!blockque->empty());//等待工作完成

blockque->close();
}
if(logfp)//如果打开了文件要关闭
{
//由于其他线程可能正在使用,因此要等待互斥锁
lock_guard<mutex> locker(mux);
fflush(logfp);//刷新缓冲区
fclose(logfp);
logfp = nullptr;
}
}
Log::~Log()
{
close();
}

void Log::logthread()//异步线程回调函数
{
Log::instance()->asyncwrite();//调用类成员函数
}
//等级越低允许记录的权限越低,fpath是文件夹,文件名内部设置
void Log::init(int level, const char* fpath,int maxqueue_size,int threadnum)
{
if(logisopen == true)
return;//只允许init一次
logisopen = true;
loglevel = level;
if(maxqueue_size>0)//有阻塞队列则异步
{
isasync = true;
//创建阻塞队列
unique_ptr<blockqueue<string>> que(new blockqueue<string>(maxqueue_size));
blockque = move(que);//移动赋值
createthread(threadnum);
}
else
isasync = false;

//初始化时间
struct tm *nowtime = gettime();
logday = nowtime->tm_mday;
path = fpath;

char filename[48];//用snprintf,不能用string了
snprintf(filename, 48, "%s/%d-%02d-%02d_log%05d.txt",//补充一个前缀-文件夹
path, nowtime->tm_year+1900, nowtime->tm_mon+1, nowtime->tm_mday, filenum);

//初步打开文件,没有文件夹就创建文件夹
if(opendir(path) == NULL)//如果文件夹不存在
mkdir(path,0777);//0777是最大的访问权

logfp = fopen(filename,"w");
assert(logfp!=nullptr);
}

void Log::asyncwrite()
{
string str;
while(blockque->pop(str))
{
struct tm* nowtime = gettime();
lock_guard<mutex> locker(mux);//互斥
changefile(nowtime);//每写一行判断要不要换文件
fputs(str.c_str(),logfp);
fflush(logfp);
}
}

void Log::createthread(int threadnum)
{
for(int i=0;i<threadnum;i++)
{
thread(logthread).detach();//因为内部函数采用单例调用,logthread不用传入this指针
}
}

聊天记录存储

有了日志系统,要不要也设置一个聊天记录系统呢?实际上聊天记录要比日志系统简单很多,因为日志系统要把多个线程的日志记录写到一个文件,而聊天记录的存储实际上只是单线程记录的。因为设置了oneshot,在一次响应的过程中不会被其他线程响应,因此一个user的聊天记录存储工作就在这个线程里。

其次,聊天记录是同步的还是异步的,这也需要考量一番。我习惯把日志系统设置为异步的,是因为日志系统可以看成“汇聚所有应写的日志”,放入阻塞队列里用工作线程慢慢写入日志即可,这里面还有一个要点就是写入的文件在一段时间永远是同一个。

而聊天记录不同,一个user的聊天记录应该保存在一个文件里(目前先只考虑一个文件,而不考虑文件行数过多切换文件的情况),这样多个user在操作时就有多个文件的打开关闭。这样子用一个阻塞队列把这些记录汇集起来从逻辑上来说是有点奇怪的:

  • 一方面工作线程在获取阻塞队列的信息时把文件切换来切换去
  • 一方面这个系统(如果真设计出来)保存的行数信息、天数信息都没什么用。
  • 以及,因为聊天记录的字节大小不像一般的log记录那么精简,如果用阻塞队列很可能会有一段延时,导致记录的时间不对

因此,聊天记录同步存储即可,就在task内完成记录,因为task本身就是一个线程,这也该是线程完成的事情。并且每次只需要userid即可知道写哪个文件,所以实际上写一个函数即可。

每记录一次都打开、关闭文件,而不是当用户时登录时打开文件并记录映射,等用户退出时关闭文件,因为经验不足还无法预知用户会干什么事情以及退出代码写的对不对,所以先这样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include<stdio.h>
#include<string>
#include "time.h"
bool msg_log(string userid, string& msglog)
{
string filename = userid+".txt";
FILE *fp = fopen(filename.c_str(),"a");//不用互斥,同时间只有一个线程操作
if(fp == nullptr)
return false;
//初始化时间
struct tm *nowtime;
time_t t;
t = time(NULL);
nowtime = localtime(&t);
char timebuffer[36];//时间头
//添加时间信息
snprintf(timebuffer,36, "%d-%02d-%02d_%02d:%02d:%02d:",
nowtime->tm_year+1900, nowtime->tm_mon+1, nowtime->tm_mday,
nowtime->tm_hour,nowtime->tm_min,nowtime->tm_sec);//只精确到秒,更具体的信息交给内容体现
string allmsg = string(timebuffer)+msglog;
fputs(allmsg.c_str(),fp);
fclose(fp);
return true;
}

退出函数

写一个退出函数,在服务器开启时用一个线程接收退出信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//定义一个退出密码
const string exitpwd = "jysama";
void deal_close()
{
char buf[64];
const string exitstr = "exit -p"+exitpwd+"\n";
const string quitstr = "quit -p"+exitpwd+"\n";
while (fgets(buf, sizeof(buf), stdin) != NULL)//gets已不被编译器支持,不太安全
{
if(strcmp(buf,exitstr.c_str())==0)
break;
else if(strcmp(buf,quitstr.c_str())==0)
break;
else
cout << "error" << endl;

bzero(buf,sizeof(buf));
}
//close all
}

顶层

所有工作已经完成,现在进行封装。

比较复杂,不放代码了。。。

数据库建立

1
2
3
4
5
6
7
8
9
// 建立yourdb库
create database yourdb;

// 创建user表
USE yourdb;
CREATE TABLE user(
userid char(50) NULL,
password char(50) NULL
)ENGINE=InnoDB;

makefile

g++ -o:

-O: 同时减少代码的长度和执行时间,其效果等价于 -O1

-O0: 表示不做优化

-O1: 表示默认优化

-O2: 告诉 g++ 产生尽可能小和尽可能快的代码。除了完成-O1 的优化之外,还进行一些额外的调整工作,如指令调整等

-O3: 包括循环展开和其他一些与处理性相关的优化工作,选项将使编译的速度比 -O 慢,但通常产生的代码执行速度会更快。


g++ -Wall:-Wall 打印警告信息


1
2
3
4
5
目标...: 依赖...
命令1
命令2
...
注意每条命令前必须有且仅有一个 tab 保持缩进,这是语法要求。
1
2
3
4
5
6
7
8
9
CXX = g++
CFLAGS = -std=c++14 -O2 -Wall
TARGET = server
OBJS = global_variables.cpp log.cpp usermap.cpp sqlconnpool.cpp udp_hole_punch.cpp server.cpp main.cpp

server: $(OBJS)
$(CXX) $(CFLAGS) $(OBJS) -o $(TARGET) -lpthread -lmysqlclient
clean:
rm -f $(TARGET)

编译

激动人心的时刻到了,因为一直是在markdown手写的,没有编写边测试,所以也不知道有多少错。


没什么大的bug,接下来还有一些问题要调。