从实例看muduo网络库各模块交互过程

文章目录

    • muduo网络库的核心代码模块
    • 各模块功能解释
      • Channel
      • Poller
        • EpollPoller
      • EventLoop
      • EventLoopThr云计算技术与应用ead
      • EventLoopThreadPool
      • TcpServer
      • TcpConnection
    • 云计算简单解释实际应用出发

muduo网络库的核心代码模块

1、channel
2、Poller 和它的子类 EpollPoller
3、云计算是什么意思EventLoop
4、Thread、EventLoopThrea木多d、EventLoopThreadPool
5、Sock、Acceptor
6、Buffer
7、TcpServer、TCPConnection

至于其他还有Logger模块就不是重点了吧。


各模块json数据云计算是什么能解释

经过我三天的研究以及之前的源c#是什么语言码铺垫整理出来了第一个版本,当然后面会持续更新,预计更新到国庆节回来,那个版本应该是能看了。


Channel

根据收到的事件,调用相应的回调。

一个chjson是什么意思annel绑定一个fd
生命周期openstack核心组件:新连接产生 -> 该连接断开云计算是什么意思

由poller管理木多的女人命好不好,从属于loop,可配置epoll监听事件。


Poller

muduo中多路事件分发器的核心模块,包含了一个 channopenstack组件el 数组,同时也是一个抽象基类(我只继承了epoll模块),
可以说:One loop pjsonp跨域原理er poller.

EpollPoller

实现了:
epoll_c木铎之心reate:构造函数
epoll_ctl:一堆的muduo enable 函数
epoll_wait:poll方法。
通过epoll_wait,将有事件的channelc#教程通过参数传递给EventLoop。

此处参数:evenopenstack包含两个主要模块ts[i].data.ptr。(经验呐!!!我觉得有这么一点,这篇就亮了!!!还不止呢。)

void EpollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {
//for(Channel* channel:activeChannels){
//这样剧让不行了!!!
//eventloop 即将拿到它的poller返回的所有发生事件列表
for (int i = 0; i < numEvents; ++i) {
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);  //666666666,这一行代码,N年的功力,你接得住吗?
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
void EpollPoller::update(int operation, Channel* channel) {
epoll_event event;
int fd = channel->fd();
memset(&event, 0, sizeof(event));
event.events = channel->events();
event.data.ptr = channel;           //上面那行,配合上这行看
event.data.fd = fd;
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) {
if (operation == EPOLL_CTL_DEL) {
LOG_ERROR("epoll_ctl del error:%d\n", errno);
}
else {
LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);
}
}
}

EventLoop

事件循环,One loop per thread,per po木铎金声ller,mopenstacktrove概要any channels,per wakeupchannel.

这个 wakeupchannejs事件循环l 是干嘛的呢?专门用于监听唤醒 eventfd 相应的 loop,这个事件通知openstack是一个开源的机制没有见过吧,反正我是第一次见,基于文件描述符的,据说事件循环比 condition 要高档一些,co目多读什么ndition都显得有点老了,这个比较年轻。

//通过轮询的方式唤醒channel
int createEventfd() {
//创建一个能被用户应用程序用于时间等待唤醒机制的eventfd对象
//eventfd 单纯的使用文件描述符实现的线程间的通知机制,可以很好的融入select、poll、epoll的I/O复用机制中
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0) {
LOG_ERROR("Failed in eventfd%d\n", errno);
}
return evtfd;
}

每一个 EventLoop,都配备有一个wakeupfd,有一个wakeup channel 专门负责处理 wakeupfd 事件。
处理办法:随便读个数据,唤醒本 loop 起来干活了。(每一个 EventLoop 都监听了 wakeupchannel 的 EPOLL_IN 事件)


loop() 开始运行后,
1、通过poll函数进行epoll_wait,获取activeChannel。
2、唤醒相应channel。
3、执行doPendingFunction 方法(这里默认有子loop。)
这里又是个openstack与k8s区别巧夺天工的设计:

void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}

看这一行:functors.swap(pendingFunc#面试题ctors_);

这里为什么要把 pendingFunctors_ 置换出来?这个置openstack与k8s区别换有openstack组件有哪些意思吗?openstack是一个开源的那可太有意思了。

如果不置换,直接拿着 pendingFunctors_ 去执行,这个资源是不是要被锁住?那接下来有新事件过来要放哪里?再开个pendingFunctors_ 2号吗?

这样一置换,相当于这些事件可以并发执行了。

有意思吧。


再看这个queueIjson格式转换nLoop 和runIc#接口nLoop:

void EventLoop::runInLoop(Functor cb) {
if (isInLoopThread()) {
cb();
}
else {
queueInLoop(std::move(cb));
}
}
void EventLoop::queueInLoop(Functor cb) {
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_) {
//callingPendingFunctors_:我是在本线程,而且我还在执行回调,有啥事情赶紧的拿过来,不然一会儿loop转一圈过去又阻塞了
wakeup();
}
}

如果有子loop,木多主loop只处理 Acceptor(runInLoop),剩下的全都在q目多读什么ueueInLoop -c#是什么&gjson文件是干什么的t; wakeup 子Loop->handleRe云计算是对什么技术的发展和应用ad(或者事件已就绪)->唤醒 channel。


EventLoopThread

功能:用于绑定一个 loop 和一个thread。
类成员:一个 EventLoop 指针,一个thread 对象,锁、条件变量、回调等。

one loop per thread 在此处体现:

startloop:启动底层新线程,c#教程执行回调,配置 loo云计算的概念p 并返回,创建一个独立的 loop,并开启事件循环。


EventLoopThreadPool

事件循环线程池。
包含一个baseloop的指针,第一个EventLoopThread的vector,以及一个EventLoop的vector。

start:创建一定数量的事木铎之心件循环线程,添加到 sc#td::vectorjson格式<std::uniqueopenstack核心组件_ptr<EventLoopThread>>。并启动这些线程,添加到std::vectojson格式r<云计算是什么;EventLoop*>中。
Get云计算最简单解释NextLoop:如果工c#和java作在多线程中,bjs事件循环aseloop 会默认以轮询的方式分配channel给subloop。


TcpServer

负责处理新连接。
Acceptor、EventLoopThreadPool、TCPConnection。

TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option = kNoReusePort);
//设置底层subloop个数
void setThreadNum(int numThreads);
//开启服务器监听
void start();
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option = kNoReusePort)
:loop_(CheckLoopNotNull(loop)),
ipport_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Accept(loop, listenAddr, option == kReusePort)),
//在这里对sock进行了初始化,不过还没有监听,更没有accept,关于accept在后续章节再提,快了
//只有在 server start 之后才会listen,listen到才会去accept
threadpool_(new EventLoopThreadPool(loop, name_)),
//构建一个 EventLoopThreadPool (可以视为mainreactor)对象,
//不过也就是构建一下,不干啥,关于EventLoopThreadPool的章节后面会提
//start之后会创建制定数量的线程,并绑定新的loop,返回地址。
connectionCallback_(),
messageCallback_(),
nextConnId_(1)
{
//当有新用户连接时,会执行NewConnectionCallback
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
//在Acceptor 的handleread方法了被调用
}
//开启服务器监听
void TcpServer::start() {
if (started_ == 0) {  //防止被多次start
threadpool_->start(threadInitCallback_);	//EventLoopThreadPool的start
loop_->runInLoop(std::bind(&Accept::listen, acceptor_.get()));	//这里拿来run的loop就是mainloop
++started_;
}
}
//当有新链接来的时候,acceptor会调用这个回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
//根据轮询算法,选择一个subloop,唤醒subloop
EventLoop* ioloop = threadpool_->GetNextLoop();
char buf[64] = { 0 };
snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO("TcpConnnection::newConnection [%s] -new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
//通过sockfd获取其本机IP
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getSockname(sockfd, (sockaddr*)&local, &addrlen) < 0) {
//pass,日志打印,写漏了
}
InetAddress localAddr(::getLocalAddr(sockfd));
//根据连接成功的fd,创建TCPConnection连接对象
//一个连接对应一个 TCPConnectionptr 管理
//关于TCPConnection的事情也是接下来展开
TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
//把当前connfd封装成channel分发给subloop
}
//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {
threadpool_->setThreadNum(numThreads);
}

TcpCoc#接口nnection

目多读什么个conn对应一个fd。

创建channel、绑定读、写、关闭、错误回调。


从实际应用出发

直接看这段代码:

int main(int argc, char **argv){
EventLoop loop;
InetAddress addr(ip, port);
ChatServer server(&loop,addr,"ChatServer");
server.start();
loop.loop();
return 0;
}

这里面先初始化了一个 loop 的对象,这是一个baseloop,设置wakeup回调类型,以及时间发生后回调操作,监听wakeup chc#委托annel 的EPOLLIN事件。

然后是 ChatServer server(&loop,addr,“ChatServer”);
1、构建Acceptor,执行到bind之后,为acceptchannel设置 ReadCallback 回调,绑定了监听套接字,在回调函数中有 accept 和 NewConnectionCallBack 回调。

2、构建EventLoopThreadPool,啥也不干。

3、设置 newConnectc#是什么ionCallBack。

4、server.start(); 开启服务监听,将Acceptor::listen 函数绑定在 lc#面试题oop上,开启listen,配置channel enableReading。
EventLoop云计算技术与应用ThreadPool.start 创建事件循环线程,并运行起来。
将线程和事件循云计算技术与应用专业环绑定起c#来。

5、loop.loop() 启动主loop。


再看这段代码:

void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buff, Timestamp time){
string buf = buff->retrieveAllAsString();
json js = json::parse(buf);
//通过msgid获取业务回调,进行网络模块和任务模块之间的解耦合
auto msgHandler = ChatService::instance()->getHandle(js["msgid"].get<int>());
//回调消息绑定好的事件处理器,执行相应的业务处理
msgHandler(conn,js,time);
//成功解耦
}

1、调用Buc#程序设计ffer模块转码数据(这个模块的设计也很nice,可惜我还没把握住)。
2、调用JSON解析数据。
3、conn->send

在当前线程:sendInLoop;
不在当前线程:channel-> runInLoop->quejson解析ueInLoop,(为啥不直接调用呢。。)


且先云计算的概念总结到此处,夜以深了,我该去想我该想的人了,哎、