开篇吐槽C/C++没有像python或者GO那样的包管理和调用,干个啥玩意都要折腾半天,还涉及到gcc/msvc/c++版本/vs版本等平台的不同导致无法编译痛苦的嗷嗷叫。怨不得有很多人重复造轮子,你的轮子我用不了呀!本篇就写了一个如何把一个轮子改成自己喜欢的姿势。
背景需求
每个工作背景都不同,需要总结一下:
1.C++编写,跨平台(windows上用的vs2013,所以是C++11标准)
2.支持http和ws的server,支持多线程,支持uri映射(类似addhandler("/hello",onrequestcallback);),支持请求中相关字段的获取(get/post参数读取、header、cookie等读取)
3.不用性能超强,并发数也不大(几十并发)
4.依赖库少一点,不想为了一个小功能引入boost之类的大家伙
之前一直用的是libevent库提供的http服务作为一个接口服务器,后来有了新的需求要增加ws支持,官方的不支持ws,自己魔改libevent增加ws支持实力不够改的兼容性不好,框架网上有大把的框架,用现成的吧。
筛选
查找了很多库https://github.com/fffaraz/awesome-cpp#networking
前后测试了两遍没有完全满足的,都要二次封装,那么就选一个库作为基石来鼓捣。
最后选择的是mongoose(具体过程不说了,完全是个人情况而定的)
mongoose一个很单纯的C编写的库,包含了client和server的功能,单线程业务处理,不过提供的方法倒是挺多的,非常方便使用。不过这个库不是性能优先,如果非常注重性能(几万并发起步)那还是不要用了,不过却满足我的当前需求。
建立http服务
mongoose简单示例(官方)
#include "mongoose.h"
//连接上所有事件的回调函数
static void ev_handler(struct mg_connection *nc, int ev, void *p)
{
//http请求
if (ev == MG_EV_HTTP_REQUEST)
{
//发送返回值,有多个函数可以用不同姿势发送返回值
mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK");
}
}
int main(void)
{
struct mg_mgr mgr;
struct mg_connection *nc;
mg_mgr_init(&mgr, NULL); //初始化连接管理器
//绑定端口,设置回调函数
nc = mg_bind(&mgr, "8000", ev_handler);
if (nc == NULL) {
printf("Failed to create listener\n");
return 1;
}
//允许http和websocket
mg_set_protocol_http_websocket(nc);
//事件循环
for (;;) {
mg_mgr_poll(&mgr, 1000);
}
//释放连接管理器
mg_mgr_free(&mgr);
return 0;
}
先建立连接管理器(mg_mgr),然后所有的连接都在mg_mgr中以链表的形式管理。
然后在mg_mgr中bind一个本地端口,并设置该连接允许http和websocket(mg_set_protocol_http_websocket),然后处理回调消息就完事了,非常简单。
那么也简单了,按照这几个方法,很容易封装出来一个C++的版本。
多线程的使用
多线程的官方示例:
#include "mongoose.h"
static sig_atomic_t s_received_signal = 0;
static const char *s_http_port = "8000";
static const int s_num_worker_threads = 5;
static unsigned long s_next_id = 0;
static void signal_handler(int sig_num) {
signal(sig_num, signal_handler);
s_received_signal = sig_num;
}
static struct mg_serve_http_opts s_http_server_opts;
static sock_t sock[2];
// This info is passed to the worker thread
struct work_request {
unsigned long conn_id; // needed to identify the connection where to send the reply
// optionally, more data that could be required by worker
};
// This info is passed by the worker thread to mg_broadcast
struct work_result {
unsigned long conn_id;
int sleep_time;
};
static void on_work_complete(struct mg_connection *nc, int ev, void *ev_data) {
(void) ev;
char s[32];
struct mg_connection *c;
for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) {
if (c->user_data != NULL) {
struct work_result *res = (struct work_result *)ev_data;
if ((unsigned long)c->user_data == res->conn_id) {
sprintf(s, "conn_id:%lu sleep:%d", res->conn_id, res->sleep_time);
mg_send_head(c, 200, strlen(s), "Content-Type: text/plain");
mg_printf(c, "%s", s);
}
}
}
}
void *worker_thread_proc(void *param) {
struct mg_mgr *mgr = (struct mg_mgr *) param;
struct work_request req = {0};
while (s_received_signal == 0) {
if (read(sock[1], &req, sizeof(req)) < 0)
perror("Reading worker sock");
int r = rand() % 10;
sleep(r);
struct work_result res = {req.conn_id, r};
mg_broadcast(mgr, on_work_complete, (void *)&res, sizeof(res));
}
return NULL;
}
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) {
(void) nc;
(void) ev_data;
switch (ev) {
case MG_EV_ACCEPT:
nc->user_data = (void *)++s_next_id;
break;
case MG_EV_HTTP_REQUEST: {
struct work_request req = {(unsigned long)nc->user_data};
if (write(sock[0], &req, sizeof(req)) < 0)
perror("Writing worker sock");
break;
}
case MG_EV_CLOSE: {
if (nc->user_data) nc->user_data = NULL;
}
}
}
int main(void) {
struct mg_mgr mgr;
struct mg_connection *nc;
int i;
if (mg_socketpair(sock, SOCK_STREAM) == 0) {
perror("Opening socket pair");
exit(1);
}
signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
mg_mgr_init(&mgr, NULL);
nc = mg_bind(&mgr, s_http_port, ev_handler);
if (nc == NULL) {
printf("Failed to create listener\n");
return 1;
}
mg_set_protocol_http_websocket(nc);
s_http_server_opts.document_root = "."; // Serve current directory
s_http_server_opts.enable_directory_listing = "no";
for (i = 0; i < s_num_worker_threads; i++) {
mg_start_thread(worker_thread_proc, &mgr);
}
printf("Started on port %s\n", s_http_port);
while (s_received_signal == 0) {
mg_mgr_poll(&mgr, 200);
}
mg_mgr_free(&mgr);
closesocket(sock[0]);
closesocket(sock[1]);
return 0;
}
开始看的我有点懵逼,建立了一个mg_socketpair来完成业务线程与工作线程的通信,在多线程的工作线程中要用mg_broadcast回调在回调里面真正发送数据,开始没有很好地理解写错了,崩溃了整整一宿(20:00到3:00)。后来仔细阅读了代码结合一些其他人的描述,才弄明白原理。
首先mongoose是业务线程是单线程,就是mg_mgr_poll那个循环就是业务线程,也就是说ev_handler这个回调和发送返回值操作都要在业务线程中做,不在业务线程中调用发送函数(例如mg_send_head)是错误的直接蹦。工作线程是可以多个线程来处理业务,处理完毕后要通知回业务线程。这个通知的方法就是mg_broadcast。
至于这个mg_socketpair可以认为是一个队列,用于业务线程接收到事件后,封装数据的发送到工作线程。补个图。
多线程之间通信
ev_handler回调的数据发送到工作线程
官方示例中使用mg_socketpair通信,其实就是建立了一对socket一个作为发送者一个作为接收者,就是socket通信而已。实际上可以用队列比较方便。建议使用无锁队列效率比较高(写的第一版用的是加锁队列,效率差了十多倍),无锁队列库:https://github.com/cameron314/concurrentqueue (个人比较喜欢的header only方式)
数据传输这里,开始我直接传递的event_data指针,发现完全不行,因为ev_handler返回后该数据会被销毁,所以要在回调中把event_data中所需要的数据都缓存下来。比如http的回调事件MG_EV_HTTP_REQUEST,event_data是http_message*数据,里面包含uri、mehtod、body、message、header等等信息,都要自己另存下来,否则回调一返回数据就会被销毁了。保存好这份数据后,将其插入队列等待工作线程取数据处理。这时候回调返回,业务线程可以继续loop去干其他事情。
http_message *msg = (http_message *)event_data;
//保存回调里面的关键数据
MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA;
cbdata->event_type = event_type;
cbdata->body.assign(msg->body.p, msg->body.len);
cbdata->method.assign(msg->method.p, msg->method.len);
cbdata->uri.assign(msg->uri.p, msg->uri.len);
cbdata->query.assign(msg->query_string.p, msg->query_string.len);
cbdata->proto.assign(msg->proto.p, msg->proto.len);
cbdata->contentlength = msg->content_length;
for (int i = 0; i < MG_MAX_HTTP_HEADERS; i++)
{
if (msg->header_names[i].len == 0)
break;
cbdata->headers.emplace_back(std::string(msg->header_names[i].p, msg->header_names[i].len), std::string(msg->header_values[i].p, msg->header_values[i].len));
}
//将cbdata插入队列
工作线程处理与返回
工作线程是多线程,例如4线程就用std::thread开启4个work_handler()。里面的逻辑就是先从队列里面取出来数据,然后取出来数据后回调给用户接口,用户接口干活,干完之后封装返回值,这些都不用多说就是个简单的callback而已。这里再次强调不能直接在工作线程中调用发送方法(例如mg_send_head),要用回调将数据返回给业务线程去处理。重点就是mg_broadcast这个坑,写的真尼玛晦涩难懂,还都是坑,怪不得网上很多人都说官方的多线程例子是坨屎。
首先说说mg_broadcast这个方法
//工作线程中调用
void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, size_t len) {
struct ctl_msg ctl_msg;
/*
* Mongoose manager has a socketpair, `struct mg_mgr::ctl`,
* where `mg_broadcast()` pushes the message.
* `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls
* specified callback for each connection. Thus the callback function executes
* in event manager thread.
*/
if (mgr->ctl[0] != INVALID_SOCKET && data != NULL &&
len < sizeof(ctl_msg.message)) {
size_t dummy;
ctl_msg.callback = cb;
memcpy(ctl_msg.message, data, len);
dummy = MG_SEND_FUNC(mgr->ctl[0], (char *) &ctl_msg,
offsetof(struct ctl_msg, message) + len, 0);
dummy = MG_RECV_FUNC(mgr->ctl[0], (char *) &len, 1, 0);
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
}
}
//业务线程中接收
static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) {
struct ctl_msg ctl_msg;
int len = (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0);
size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0);
DBG(("read %d from ctl socket", len));
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) {
struct mg_connection *nc;
for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
//回调方法
ctl_msg.callback(nc, MG_EV_POLL,
ctl_msg.message MG_UD_ARG(nc->user_data));
}
}
}
void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, size_t len);
第二个参数是广播的回调方法,data和len是数据。工作线程中调用mg_broadcast来广播,触发的回调方法里面就是业务线程了(开始没理解,后来通过打印调用和回调的threadid才明白)。
内部很简单其实就是一个在内部建立了一个mg_socketpair(上面描述过),将要传递的数据封装到结构体里面,工作线程中调用send把结构体发送出去,业务线程里面recv到这个结构体,然后触发回调,很简单的逻辑。这里官方代码很坑,connection在连接后 ( MG_EV_ACCEPT 事件),初始化了一个唯一connid( s_next_id )给每个connection(存储到了user_data里面),广播会将每个connection都回调一遍(上面代码里面nc那个链表的遍历),然后在回调里面判断当前的connection是不是我要发送数据的connid,不是的话返回,是的话继续发送数据。官方示例里面在on_work_complete又遍历了一遍所有connection这里其实是错误的,一个大坑,因为回调已经是遍历所有connection回调了。这个也符合mg_broadcast这个名字,广播给所有connection回调。
这里还有一个大坑,因为mg_broadcast其实也是异步的,利用socket发送用户数据,那个结构体的定义为:
struct ctl_msg {
mg_event_handler_t callback;
char message[MG_CTL_MSG_MESSAGE_SIZE]; //默认是8192
};
所以这里传递的数据不能是原始返回内容的buffer,太小了不合适呀,所以要new一个buffer传递进去,然后在回调里面去delete掉。但这里会引入另外一个问题,broadcast里面是遍历所有正常的connection,如果某个用户接口处理时间比较长比如要2秒,返回的时候connection已经断开了(多线程下是可能存在的),那么这个广播后就不能触发回调,导致这份buffer无法被删除造成内存泄露。
问题总结:
1.广播时connection断开了无法删除buffer
2.异步的,不知道当前connection的状态
3.工作线程与业务线程传递数据使用广播,意味着每次都要通知一遍所有的connection,这完全没有必要呀,我有connid那只回调一个connection就可以了哇。当然原本的broadcast到所有nc还是有必要的。
开始魔改mg_broadcast吧。。。改后的代码:
struct ctl_msg {
mg_event_handler_t callback;
uint32_t connid; //connID,可以遍历指定的connection
int syncflag; //异步和同步执行标记
void *data; //传递的数据
};
//增加connid参数,可以指定connid来触发回调,如果connid为0那就是广播(给connection初始化connid的时候不能设置为0)
//data: 用户数据指针
//ret:返回值指针,NULL为异步,如果设置了就意味着同步发送 发送完毕后才返回。同步方式可以节省内存,异步发送需要新开辟内存
void mg_broadcast(struct mg_mgr *mgr, uint32_t connid, mg_event_handler_t cb, void *data, int *ret) {
struct ctl_msg ctl_msg;
/*
* Mongoose manager has a socketpair, `struct mg_mgr::ctl`,
* where `mg_broadcast()` pushes the message.
* `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls
* specified callback for each connection. Thus the callback function executes
* in event manager thread.
*/
if (mgr->ctl[0] != INVALID_SOCKET && data != NULL) {
size_t dummy;
ctl_msg.callback = cb;
ctl_msg.connid = connid;
if (ret)
ctl_msg.syncflag = 1;
ctl_msg.data = data;
dummy = MG_SEND_FUNC(mgr->ctl[0], (char *)&ctl_msg, sizeof(ctl_msg), 0);
dummy = MG_RECV_FUNC(mgr->ctl[0], (char *)&ctl_msg.syncflag, sizeof(int), 0);
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
if (ret)
*ret = ctl_msg.syncflag;
}
}
static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) {
struct ctl_msg ctl_msg;
size_t dummy;
int ret = 1;
int len = (int)MG_RECV_FUNC(mgr->ctl[1], (char *)&ctl_msg, sizeof(ctl_msg), 0);
//不是同步执行 就立刻返回
if (ctl_msg.syncflag == 0)
dummy = MG_SEND_FUNC(mgr->ctl[1], (char *)&ret, sizeof(int), 0);
DBG(("read %d from ctl socket", len));
(void)dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) {
struct mg_connection *nc;
if (ctl_msg.connid == 0)
{
//先广播所有正常的连接
for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
ctl_msg.callback(nc, MG_EV_POLL,
ctl_msg.data MG_UD_ARG(nc->user_data));
}
//最后发送一个nc为NULL的回调 表示广播结束了 用于清理内存等等
ctl_msg.callback(NULL, MG_EV_POLL,
ctl_msg.data MG_UD_ARG(nc->user_data));
}
else
{
for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
if (nc->connid == ctl_msg.connid)
break;
}
if (nc == NULL)
ret = 0;
ctl_msg.callback(nc, MG_EV_POLL,
ctl_msg.data MG_UD_ARG(nc->user_data));
}
}
//同步执行的情况下 执行完毕了 返回ret
if (ctl_msg.syncflag)
dummy = MG_SEND_FUNC(mgr->ctl[1], (char *)&ret, sizeof(int), 0);
}
增加connid参数,可以指定connid来触发回调,如果connid为0那就是广播到所有connection(给connection初始化connid的时候不能设置为0)。
增加异步、同步控制变量(int *ret),传入NULL还为异步执行,如果设置了就意味着同步发送,发送完毕后才返回。同步方式可以节省内存,异步发送需要新开辟内存。
回调变动:
指定connid:如果connection还活着,那么正常回调(一次),如果connection关闭了没了,那么回调的mg_connection*是个NULL,用于清理缓冲区
未指定connid:广播到所有connection上,并且在所有connection回调完毕后,附加一次mg_connection*为NULL的广播,表示广播结束,用于清理缓冲区
小魔改:
上述保存connection的connid用的是mg_connection的user_data变量,但是我们实现业务肯定也有一堆数据要存储,而且不同类型的请求例如http和ws要保存的数据类型也不一样,区分处理很麻烦,随手魔改了mg_connection结构体,直接给他加上了connid这个变量。开始做第一版的时候,用了mg_connection结构体里面的sock这个变量作为connid,也就是socket的句柄,不过后来想想大并发下socket关闭后再建立新连接可能句柄是会重复的,那就失去唯一的作用了,所以新增了connid变量。
好啦,完事!
用压力测试跑了下几十个并发,对比发现低并发下跟其他库比如libevent差不多,满足要求。
完整代码:
FHttpServer.h
#pragma once
#include <string>
#include <chrono>
#include <vector>
#include <map>
#include <thread>
#include <mutex>
#include <functional>
#include "mongoose.h"
#include "blockingconcurrentqueue.h"
typedef std::vector <std::pair<std::string, std::string>> StringPairArray;
typedef struct
{
union socket_address saddr;
//uint32_t connFlags;
uint32_t connID;
int event_type;
std::string body; //如果是websocket 这里是data
std::string method;
std::string uri;
std::string query;
std::string proto;
StringPairArray headers;
size_t contentlength;
int wsflags; //ws frame事件中的flags
}MG_EVENT_DATA, *MG_EVENT_DATA_PTR;
//http请求信息
class fsRequest
{
MG_EVENT_DATA_PTR m_pEventData;
public:
fsRequest(MG_EVENT_DATA_PTR eventdata) : m_pEventData(eventdata){};
~fsRequest(){};
uint32_t GetConnID();
std::string GetURI();
std::string GetMethod();
std::string GetRemoteIP();
std::string GetQueryParam(const char *name);
std::string GetPostParam(const char *name);
std::string GetHeader(const char *name);
std::string GetCookies();
size_t GetDataLen();
std::string GetData();
private:
std::string GetParam(mg_str *query_string, const char *name);
};
//http返回值操作
class fsResponse
{
struct mg_mgr *m_mgr;
uint32_t m_connID;
int m_nStatusCode;
std::string m_strStatus;
StringPairArray *m_pDefaultHeaders; //默认的返回headers
StringPairArray m_arrHeaders; //如果修改了,那么使用自己的副本
bool m_bsenddata;
public:
fsResponse(uint32_t connID, struct mg_mgr *mgr, StringPairArray *defaultHeaders);
~fsResponse();
//设置http返回code和reason,要在Send之前调用
void SetHttpStatusCode(int nStatusCode, const char *reason);
//设置header 唯一header名 要在Send之前调用
void SetHttpHeader(const std::string &key, const std::string &value);
//追加header,类似多个SetCookie这种头部 要在Send之前调用
void AppendHttpHeader(const std::string &key, const std::string &value);
//发送数据 直接发送
void SendHttpContent(const std::string &data, const char *type = NULL);
//以chunk方式发送数据,调用SendChunkContent发送数据,不要直接用SendContent
void SendHttpChunkBegin();
//发送chunk数据,结束chunk发送一个空buf
void SendHttpChunkContent(const std::string &data);
private:
void _SendData(const std::string *content);
};
//websocket的消息信息
class fsWSMessage
{
MG_EVENT_DATA_PTR m_pEventData;
public:
fsWSMessage(MG_EVENT_DATA_PTR eventdata) : m_pEventData(eventdata){};
~fsWSMessage(){};
uint32_t GetConnID();
std::string GetMsg();
int GetFlag();
};
//websocket回话操作
class fsWSSession
{
struct mg_mgr *m_mgr;
uint32_t m_connID;
public:
fsWSSession(uint32_t connID, struct mg_mgr *mgr) :
m_connID(connID),
m_mgr(mgr){};
~fsWSSession(){};
//设置该ws连接的name
void SetName(const std::string &name);
//发送websocket数据 同步执行
bool SendFrame(const std::string &data, int type = WEBSOCKET_OP_TEXT);
//关闭连接
void CloseConnect();
};
//websocket连接的user_data数据,描述该连接的一些关键信息
struct _websocketconndata
{
uint32_t m_connID; //这里要保存connid是因为close事件中nc的sock是-1 程序关联是靠着connid
std::string uri; //用于后面映射表查找的key
std::string name; //用于自定义组广播
};
class FHttpServer
{
//struct mg_serve_http_opts m_mHttpServerOpts;
struct mg_mgr m_mMgr;
bool m_exitflag;
//默认的返回headers
StringPairArray m_defaultHeaders;
//工作线程和队列
tsBlockQuque<MG_EVENT_DATA_PTR> m_TaskQueue;
std::vector <std::thread> m_threadPool;
//http回调方法和映射表
typedef std::function<void(fsRequest &, fsResponse &)> funcHttpCallback;
std::map <std::string, funcHttpCallback> m_mapHttpRouter;
//websocket回调方法和映射表
typedef std::function<void(fsRequest &, fsWSSession &)> funcWSOnReadyCallback;
typedef std::function<void(fsWSMessage &, fsWSSession &)> funcWSOnMsgCallback;
typedef std::function<void(uint32_t)> funcWSOnCloseCallback;
struct stWebSocketCallback
{
funcWSOnReadyCallback onready;
funcWSOnMsgCallback onmessage;
funcWSOnCloseCallback onclose;
};
std::map <std::string, stWebSocketCallback> m_mapWSRouter;
public:
FHttpServer();
~FHttpServer();
//启动服务 addr可以是"8888", ":8888", "127.0.0.1:8888"; nThread为工作线程数
bool StartServer(const std::string &addr, int nThread);
//停止服务
void StopServer();
//添加默认返回的http头,唯一值
void AddDefaultHeader(const std::string &key, const std::string &value);
//添加http回调接口
void AddHttpHandler(const std::string &uri, funcHttpCallback callback);
//添加websocket回调接口
void AddWSHandler(const std::string &uri,
funcWSOnReadyCallback onready,
funcWSOnMsgCallback onmessage,
funcWSOnCloseCallback onclose);
//////////////////////////////////////////////////////////////////////////
//websocket使用的接口
//////////////////////////////////////////////////////////////////////////
//发送数据到connID这个ws连接上 同步执行
bool SendWSFrame(uint32_t connID, const std::string &data, int type = WEBSOCKET_OP_TEXT);
//关闭ws连接
void CloseWSConn(uint32_t connID);
//广播数据到name这个组
void BroadcastWS(const std::string &name, const std::string &data, int type = WEBSOCKET_OP_TEXT);
//////////////////////////////////////////////////////////////////////////
//内部使用的接口,外部不要调用
void _settaskquque(MG_EVENT_DATA_PTR eventdata);
bool _iswshandler(const std::string &uri);
bool _isstop();
private:
static void onEventCallback(mg_connection *nc, int event_type, void *event_data);
void TaskQuqueRunner();
void WorkEventHandler(MG_EVENT_DATA_PTR eventdata);
};
FHttpServer.cpp
#include "stdafx.h"
#include "FHttpServer.h"
#define is_websocket(a) (a & MG_F_IS_WEBSOCKET)
FHttpServer::FHttpServer() : m_exitflag(false)
{
mg_mgr_init(&m_mMgr, this);
AddDefaultHeader("Content-Type", "text/plain; charset=GBK");
AddDefaultHeader("Connection", "Keep-Alive");
//待补充...
}
FHttpServer::~FHttpServer()
{
StopServer();
}
bool FHttpServer::StartServer(const std::string &addr, int nThread)
{
//mg_connection *connection = mg_bind(&m_mMgr, addr.c_str(), FHttpServer::OnEventHandler, this);
mg_connection *connection = mg_bind(&m_mMgr, addr.c_str(), onEventCallback);
if (connection == NULL)
return false;
//both http and websocket
mg_set_protocol_http_websocket(connection);
//初始化工作队列
for (int i = 0; i < nThread; i++)
m_threadPool.emplace_back(&FHttpServer::TaskQuqueRunner, this);
printf("starting http server on: %s(tid:%d)\n", addr.c_str(), GetCurrentThreadId());
std::thread([this]{
// loop
while (!m_exitflag)
mg_mgr_poll(&m_mMgr, 200); // ms
}).detach();
return true;
}
void FHttpServer::StopServer()
{
if (m_exitflag)
return;
printf("set stop flag.\n");
m_exitflag = true;
printf("free mgr.\n");
mg_mgr_free(&m_mMgr);
printf("wait for work thread finish...\n");
for (std::thread &th : m_threadPool)
{
if (th.joinable())
th.join();
}
printf("server stopped.");
}
void FHttpServer::AddDefaultHeader(const std::string &key, const std::string &value)
{
for (auto &header : m_defaultHeaders)
{
if (mg_ncasecmp(key.c_str(), header.first.c_str(), key.length()) == 0)
{
header.second = value;
return;
}
}
m_defaultHeaders.emplace_back(key, value);
}
void FHttpServer::AddHttpHandler(const std::string &uri, funcHttpCallback callback)
{
m_mapHttpRouter.emplace(uri, callback);
}
void FHttpServer::AddWSHandler(const std::string &uri,
funcWSOnReadyCallback onready,
funcWSOnMsgCallback onmessage,
funcWSOnCloseCallback onclose)
{
stWebSocketCallback cb = {onready, onmessage, onclose};
m_mapWSRouter.emplace(uri, cb);
}
bool FHttpServer::SendWSFrame(uint32_t connID, const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/)
{
return fsWSSession(connID, &m_mMgr).SendFrame(data, type);
}
void FHttpServer::CloseWSConn(uint32_t connID)
{
fsWSSession(connID, &m_mMgr).CloseConnect();
}
void FHttpServer::BroadcastWS(const std::string &name, const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/)
{
struct wsData
{
int type;
std::string name;
std::string buf;
};
wsData *param = new wsData{ type, name, data };
//传入connid为0,广播到所有nc上
mg_broadcast(&m_mMgr, 0, [](struct mg_connection *nc, int ev, void *data){
if (nc)
{
if (is_websocket(nc->flags))
{
wsData *p = (wsData *)data;
_websocketconndata *pWSData = (_websocketconndata *)nc->user_data;
if (pWSData->name == p->name)
mg_send_websocket_frame(nc, p->type, p->buf.c_str(), p->buf.length());
}
}
else
{
//nc为NULL就是广播完毕了 这类清理内存
delete ((wsData *)data);
}
}, param, NULL);
}
void FHttpServer::_settaskquque(MG_EVENT_DATA_PTR eventdata)
{
m_TaskQueue.enqueue(eventdata);
}
bool FHttpServer::_iswshandler(const std::string &uri)
{
if (m_mapWSRouter.find(uri) == m_mapWSRouter.end())
return false;
return true;
}
bool FHttpServer::_isstop()
{
return m_exitflag;
}
void FHttpServer::onEventCallback(mg_connection *nc, int event_type, void *event_data)
{
if (event_type == MG_EV_HTTP_REQUEST ||
event_type == MG_EV_WEBSOCKET_HANDSHAKE_REQUEST ||
event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE ||
event_type == MG_EV_WEBSOCKET_FRAME ||
(event_type == MG_EV_CLOSE && is_websocket(nc->flags))//只有websocket才需要close事件
)
{
FHttpServer *pServer = (FHttpServer *)nc->mgr->user_data;
//printf("EventCallback(%d): type:%d, sockid:%d, stop:%d\n", GetCurrentThreadId(), event_type, connection->connid, pServer->_isstop());
//ws的握手
if (event_type == MG_EV_WEBSOCKET_HANDSHAKE_REQUEST)
{
//检测ws的映射表是否允许继续
std::string uri = std::string(((http_message *)event_data)->uri.p, ((http_message *)event_data)->uri.len);
if (!pServer->_iswshandler(uri))
{
mg_http_send_error(nc, 403, "");
return;
}
//创建user_data
_websocketconndata *pWSData = new _websocketconndata;
pWSData->m_connID = nc->connid;
pWSData->uri = uri;
pWSData->name = pWSData->uri; //默认uri就是name
nc->user_data = pWSData;
//websocket request不用回调 直接返回就好了
return;
}
MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA;
cbdata->event_type = event_type;
cbdata->connID = nc->connid;
if (event_type == MG_EV_WEBSOCKET_FRAME)
{
websocket_message *msg = (websocket_message *)event_data;
cbdata->body.assign((char *)msg->data, msg->size);
cbdata->wsflags = msg->flags;
//从user_data里面读取uri
cbdata->uri = ((_websocketconndata *)nc->user_data)->uri;
}
else if (event_type == MG_EV_CLOSE)
{
//已经做过过滤了 这里就是websocket的close
_websocketconndata *pWSData = (_websocketconndata *)nc->user_data;
//connect的sock在close事件中是-1 所以要用user_data中的还原出来
cbdata->connID = pWSData->m_connID;
//从user_data中读取uri
cbdata->uri = pWSData->uri;
//删掉user_data
delete pWSData;
nc->user_data = NULL;
}
else
{
//用于读取远程IP
cbdata->saddr = nc->sa;
/*
std::string body; //如果是websocket 这里是data
std::string method;
std::string uri;
std::string query;
std::string proto;
std::vector<std::pair<std::string, std::string>> headers;
size_t contentlength;
*/
http_message *msg = (http_message *)event_data;
cbdata->body.assign(msg->body.p, msg->body.len);
cbdata->method.assign(msg->method.p, msg->method.len);
cbdata->uri.assign(msg->uri.p, msg->uri.len);
cbdata->query.assign(msg->query_string.p, msg->query_string.len);
cbdata->proto.assign(msg->proto.p, msg->proto.len);
cbdata->contentlength = msg->content_length;
for (int i = 0; i < MG_MAX_HTTP_HEADERS; i++)
{
if (msg->header_names[i].len == 0)
break;
cbdata->headers.emplace_back(std::string(msg->header_names[i].p, msg->header_names[i].len), std::string(msg->header_values[i].p, msg->header_values[i].len));
}
}
//printf("AddEvent Data(%d): %d\n", GetCurrentThreadId(), data->event_type);
pServer->_settaskquque(cbdata);
}
}
void FHttpServer::TaskQuqueRunner()
{
while (true)
{
MG_EVENT_DATA_PTR eventdata;
if (m_TaskQueue.wait_dequeue_timed(eventdata, std::chrono::seconds(1)))
{
//printf("TaskQuque Get Data(%d): %d\n", GetCurrentThreadId(), eventdata->event_type);
WorkEventHandler(eventdata);
//用完了要删掉
delete eventdata;
}
else
{
//printf("Empty quque.\n");
//结束标记并且队列没有数据了 返回
if (m_exitflag)
break;
}
}
return;
}
void FHttpServer::WorkEventHandler(MG_EVENT_DATA_PTR eventdata)
{
//printf("WorkHandler Data(%d): %d\n", GetCurrentThreadId(), eventdata->event_type);
if (eventdata->event_type == MG_EV_HTTP_REQUEST)
{
//普通http请求
//OnHttpHandler(eventdata);
//处理映射表
auto iter = m_mapHttpRouter.find(eventdata->uri);
if (iter == m_mapHttpRouter.end())
{
fsResponse(eventdata->connID, &m_mMgr, NULL).SetHttpStatusCode(404, "Not Found");
return;
}
//准备变量
fsRequest request(eventdata);
fsResponse response(eventdata->connID, &m_mMgr, &m_defaultHeaders);
//回调
iter->second(request, response);
return;
}
//剩下的都是ws的事件了
stWebSocketCallback wsCB = m_mapWSRouter[eventdata->uri];
if (eventdata->event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE && wsCB.onready)
wsCB.onready(fsRequest(eventdata), fsWSSession(eventdata->connID, &m_mMgr));
else if (eventdata->event_type == MG_EV_WEBSOCKET_FRAME && wsCB.onmessage)
wsCB.onmessage(fsWSMessage(eventdata), fsWSSession(eventdata->connID, &m_mMgr));
else if (eventdata->event_type == MG_EV_CLOSE && wsCB.onclose)
wsCB.onclose(eventdata->connID);
}
uint32_t fsRequest::GetConnID()
{
return m_pEventData->connID;
}
std::string fsRequest::GetURI()
{
return m_pEventData->uri;
}
std::string fsRequest::GetMethod()
{
return m_pEventData->method;
}
std::string fsRequest::GetRemoteIP()
{
char addr[32];
mg_sock_addr_to_str(&m_pEventData->saddr, addr, sizeof(addr), MG_SOCK_STRINGIFY_IP);
return std::string(addr);
}
std::string fsRequest::GetParam(mg_str *query_string, const char *name)
{
char value[128];
int ret = mg_get_http_var(query_string, name, value, sizeof(value) - 1);
if (ret <= 0)
return std::string();
if (ret < sizeof(value))
return std::string(value, ret);
//数据过长了
std::unique_ptr<char> buf(new char[ret + 1]);
ret = mg_get_http_var(query_string, name, buf.get(), ret + 1);
return std::string(buf.get(), ret);
}
std::string fsRequest::GetQueryParam(const char *name)
{
mg_str query_string;
query_string.p = m_pEventData->query.c_str();
query_string.len = m_pEventData->query.length();
return GetParam(&query_string, name);
}
std::string fsRequest::GetPostParam(const char *name)
{
mg_str query_string;
query_string.p = m_pEventData->body.c_str();
query_string.len = m_pEventData->body.length();
return GetParam(&query_string, name);
}
std::string fsRequest::GetHeader(const char *name)
{
for (auto &header : m_pEventData->headers)
{
if (mg_ncasecmp(name, header.first.c_str(), header.first.length()) == 0)
return header.second;
}
return std::string();
}
std::string fsRequest::GetCookies()
{
return GetHeader("Cookie");
}
size_t fsRequest::GetDataLen()
{
return m_pEventData->body.length();
}
std::string fsRequest::GetData()
{
return m_pEventData->body;
}
fsResponse::fsResponse(uint32_t connID, struct mg_mgr *mgr, StringPairArray *defaultHeaders) :
m_connID(connID),
m_mgr(mgr),
m_nStatusCode(200),
m_strStatus("OK"),
m_pDefaultHeaders(defaultHeaders),
m_bsenddata(false)
{
}
fsResponse::~fsResponse()
{
if (!m_bsenddata)
{
std::string str;
_SendData(&str);
}
}
void fsResponse::SetHttpStatusCode(int nStatusCode, const char *reason)
{
m_nStatusCode = nStatusCode;
m_strStatus = reason;
}
void fsResponse::SetHttpHeader(const std::string &key, const std::string &value)
{
//修改了header 就建立自己的副本
if (m_pDefaultHeaders)
{
m_arrHeaders = *m_pDefaultHeaders;
m_pDefaultHeaders = NULL;
}
for (auto &header : m_arrHeaders)
{
if (mg_ncasecmp(key.c_str(), header.first.c_str(), key.length()) == 0)
{
header.second = value;
return;
}
}
m_arrHeaders.emplace_back(key, value);
}
void fsResponse::AppendHttpHeader(const std::string &key, const std::string &value)
{
//修改了header 就建立自己的副本
if (m_pDefaultHeaders)
{
m_arrHeaders = *m_pDefaultHeaders;
m_pDefaultHeaders = NULL;
}
m_arrHeaders.emplace_back(key, value);
}
void fsResponse::SendHttpContent(const std::string &data, const char *type /*= NULL*/)
{
if (type)
SetHttpHeader("Content-Type", type);
_SendData(&data);
}
void fsResponse::SendHttpChunkBegin()
{
_SendData(NULL);
}
void fsResponse::SendHttpChunkContent(const std::string &data)
{
mg_broadcast(m_mgr, m_connID, [](struct mg_connection *nc, int ev, void *data){
std::string *buf = (std::string *)data;
if (nc)
mg_send_http_chunk(nc, buf->c_str(), buf->length());
delete buf;
}, new std::string(data), NULL);
}
void fsResponse::_SendData(const std::string *content)
{
//不要重复发送
if (m_bsenddata)
return;
std::string *pSendData = new std::string("HTTP/1.1 " + std::to_string(m_nStatusCode) + " " + m_strStatus + "\r\n");
//拼接header
StringPairArray *headers = m_pDefaultHeaders;
if (headers == NULL)
headers = &m_arrHeaders;
size_t nHeanderSize = headers->size();
for (size_t i = 0; i < nHeanderSize; i++)
{
std::string str = headers->at(i).first + ": " + headers->at(i).second + "\r\n";
pSendData->append(str);
}
if (content == NULL)
{
//开启了chunk模式
pSendData->append("Transfer-Encoding: chunked\r\n\r\n");
}
else
{
//有Content
pSendData->append("Content-Length: " + std::to_string(content->length()) + "\r\n\r\n");
//拼接数据
pSendData->append(*content);
}
//发送 要用mg_broadcast来完成回调 异步发送
mg_broadcast(m_mgr, m_connID, [](struct mg_connection *nc, int ev, void *data){
std::string *buf = (std::string *)data;
if (nc)
mg_send(nc, buf->c_str(), buf->length());
delete buf;
}, pSendData, NULL);
m_bsenddata = true;
}
uint32_t fsWSMessage::GetConnID()
{
return m_pEventData->connID;
}
std::string fsWSMessage::GetMsg()
{
return m_pEventData->body;
}
int fsWSMessage::GetFlag()
{
return m_pEventData->wsflags;
}
void fsWSSession::SetName(const std::string &name)
{
mg_broadcast(m_mgr, m_connID, [](struct mg_connection *nc, int ev, void *data){
std::string *name = (std::string *)data;
if (nc)
((_websocketconndata *)nc->user_data)->name = *name;
delete name;
}, new std::string(name), NULL);
}
bool fsWSSession::SendFrame(const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/)
{
int ret = 0;
struct wsData
{
int type;
const std::string *buf;
};
wsData param = { type, &data };
//需要返回值 所以使用同步的方式发送
mg_broadcast(m_mgr, m_connID, [](struct mg_connection *nc, int ev, void *data){
if (nc)
{
wsData *p = (wsData *)data;
mg_send_websocket_frame(nc, p->type, p->buf->c_str(), p->buf->length());
}
}, ¶m, &ret);
if (ret)
return true;
return false;
}
void fsWSSession::CloseConnect()
{
mg_broadcast(m_mgr, m_connID, [](struct mg_connection *nc, int ev, void *data){
if (nc)
mg_send_websocket_frame(nc, WEBSOCKET_OP_CLOSE, "bye", 3);
}, NULL, NULL);
}
最后使用方法:
FHttpServer server;
server.AddHttpHandler("/hello", [](fsRequest &req, fsResponse &response){
printf("uri:%s\n", req.GetURI().c_str());
printf("remote:%s\n", req.GetRemoteIP().c_str());
printf("method:%s\n", req.GetMethod().c_str());
printf("p:%s\n", req.GetQueryParam("p").c_str());
printf("body(%d):%s\n", req.GetDataLen(), req.GetData().c_str());
response.SendHttpContent("hello word!");
});
uint32_t connID = -1;
server.AddWSHandler("/ws",
[&](fsRequest &req, fsWSSession &session)
{
printf("ws on ready %s.\n", req.GetQueryParam("id").c_str());
session.SendFrame("ready.");
session.SetName(req.GetQueryParam("id"));
connID = req.GetConnID();
}, //onready
[](fsWSMessage &req, fsWSSession &session)
{
printf("body:%s\n", req.GetMsg().c_str());
session.SendFrame(req.GetMsg());
},
[](uint32_t connID)
{
printf("ws on close.\n");
} //onclose
);
server.StartServer("8881", 32);
while (true)
{
int x = getchar();
if (x == 'q')
break;
server.SendWSFrame(connID, "hello");
server.BroadcastWS("123", "broadcase msg.");
}
server.StopServer();
代码刚刚写完还未彻底测试过可能很多地方还有bug,趁着思路还在行文总结学习思考的过程,分享一下。