博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Muduo网络库源码分析之Acceptor和TcpServer
阅读量:4184 次
发布时间:2019-05-26

本文共 14347 字,大约阅读时间需要 47 分钟。

Acceptor

用于 accept 一个 TCP 连接,accept 接受成功后通知 TCP 连接的使用者。Acceptor 主要是供 TcpServer 使用的,其生命期由后者控制。一个 Acceptor 相当于持有服务端的一个 socket 描述符,该 socket 可以 accept 多个 TCP 客户连接,这个 accept 操作就是 Acceptor 实现的。

这里用到了一些封装好的 socket 和地址结构,如 class InetAddress 表示 sockaddr_in 的封装,如可以通过ip地址和port端口生成一个sockaddr_in; class Socket封装了部分关于socket套接字的操作,如Socket::bindAddress(InetAddress&) 将socket和一个sockaddr_in地址绑定,Socket::accept(InetAddress& peerAddr)将一个socket允许连接一个客户端地址peerAddr,Socket::listen()监听socket,Socket::shutdownWrite()实现关闭socket的写。这些类的封装可以看我这篇博客的分析。

Acceptor在构造的时候会创建一个 socket 描述符 acceptSocket_(这是一个Socket类型即socket的RAII封装),并通过一个 Channel(注册事件及回调函数)管理 acceptSocket_::fd 成员(即socket描述符),一旦该 socket 可读即有TCP 客户连接请求,则 Channel::handleEvent() 将会调用 Acceptor::hanleRead() 执行 accept 接受一个TCP客户连接。Acceptor::handleRead() 还会将新的TCP客户连接和客户端地址通过回调函数 newConnectionCallback(connfd,peerAddr) 传给该TCP客户连接的使用者,通常是 TcpServer 类,这里的回调函数 newConnectionCallback 是在 Acceptor::setNewConnectionCallback(newConnectionCallback) 指定的,TcpServer 构造时 new 一个 Acceptor 后,会通过这个函数指定回调函数为 TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)。值得注意的是这里又是统一事件源的思想,即通过 Channel 和 Poller 管理事件。Acceptor::listen() 的工作是:启动acceptSocket_::listen() 监听 socket 描述符,并通过 Channel::enableReading() 将socket 的可读事件注册到 Poller 的事件集合中。

Acceptor.h

#ifndef MUDUO_NET_ACCEPTOR_H#define MUDUO_NET_ACCEPTOR_H#include 
#include
#include
#include
namespace muduo{namespace net{class EventLoop;class InetAddress;////// Acceptor of incoming TCP connections.///// accept一个TCP连接class Acceptor : boost::noncopyable{ public: /* 新建立连接之后的回调 */ typedef boost::function
NewConnectionCallback; Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport); ~Acceptor(); /* 设置用户任务回调 */ void setNewConnectionCallback(const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } bool listenning() const { return listenning_; } void listen(); // 开始监听 private: void handleRead(); // listenfd -> Channel上的可读事件回调 EventLoop* loop_; // 所在的 EventLoop Socket acceptSocket_; // listenfd Channel acceptChannel_; // listenfd 对应的 Channel NewConnectionCallback newConnectionCallback_; // 处理新连接的回调函数,accept 后调用 bool listenning_; // 是否正在 listen int idleFd_; //占位fd,用于fd满的情况};}}#endif // MUDUO_NET_ACCEPTOR_H

Acceptor.cc

#include 
#include
#include
#include
#include
#include
#include
#include
//#include
//#include
#include
using namespace muduo;using namespace muduo::net;/* Acceptor 的数据成员包括Socket、Channel等,用于接受一个连接 */Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport) : loop_(loop), /* 创建 listenfd */ acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), /* 创建 listenfd 对应的 Channel */ acceptChannel_(loop, acceptSocket_.fd()), listenning_(false), /* 打开空的fd,用于占位 */ idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)){ assert(idleFd_ >= 0); acceptSocket_.setReuseAddr(true); //设置listenfd 复用addr acceptSocket_.setReusePort(reuseport);//复用port acceptSocket_.bindAddress(listenAddr);//绑定ip和port acceptChannel_.setReadCallback( //设置 Channel 的可读回调函数为 handleRead() boost::bind(&Acceptor::handleRead, this));}Acceptor::~Acceptor(){ acceptChannel_.disableAll(); acceptChannel_.remove(); ::close(idleFd_);}/* 构造函数和listen()执行创建TCP服务端的传统步骤 socket bind listen */void Acceptor::listen(){ loop_->assertInLoopThread(); listenning_ = true; //改变这个标志 acceptSocket_.listen(); //listen acceptChannel_.enableReading();// 注册读事件,有读事件发生时调用handleRead()}/* 当epoll监听到listenfd时,开始执行此回调函数 */void Acceptor::handleRead(){ loop_->assertInLoopThread(); InetAddress peerAddr; //FIXME loop until no more /* accept 一个连接 */ int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { // string hostport = peerAddr.toIpPort(); // LOG_TRACE << "Accepts of " << hostport; /* 接受完连接后回调 newConnectionCallback_ * 传回connfd,创建TcpConnection 再将连接分配给其他线程 */ if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); } else { sockets::close(connfd); } } else { /* * 本进程的fd达到上限后无法为新连接创建socket描述符 * 既然没有socketfd来表示这个连接,也就无法close它 * 程序继续运行,下次epoll_wait会直接返回,因为listenfd还是可读 * 这样程序就陷入了 busy loop * * 处理fd 满的时候,采用了这样一种方法 * 就是先占住一个空的fd,然后当fd满的时候 * 先关闭这个空闲文件,获得一个文件描述符的名额 * 再调用accept拿到新socket连接的描述符 * 随后立即close调它,这样就优雅地断开了客户端连接 * 最后重新打开一个空闲文件,把"坑"占住,以备情况再发生 */ LOG_SYSERR << "in Acceptor::handleRead"; // Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of libev. if (errno == EMFILE) //fd的数目达到上限 { ::close(idleFd_); //关闭占位fd idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);//接受这个连接 ::close(idleFd_); //关掉 idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);//重新打开此fd 占位 } }}

TcpServer

管理 accept 获得的 TcpConnection,是供用户直接使用的,生命期由用户控制。用户只需设置好相应的几个回调函数,然后调用TcpServer::start() 即可。

每个 TCP 客户连接由一个类 TcpConenction 管理(具体执行消息的接收发送之类的),而 TcpServer 的工作就是管理这些 TcpConnection,TcpConnection 类将在后面文章中给出。TcpServer 持有 boost::scoped_ptr< TcpConnection> 的指针 TcpConnectionPtr。

TcpServer 在构造时接收一个由 ip 地址和 port 构成的 InetAddress 参数,并将此地址传给 Acceptor 用于接收该地址的 TCP 连接请求。TcpServer 持有 scoped_ptr< Acceptor> acceptor_ 用于接收 TcpServer 监听端口上的 TCP 连接请求,注意 Accpetor 每次 accept 连接后都要将新连接的描述符 connfd 和地址 peerAddr 返回给使用者,这里 TcpServer在构造时通过 accptor_->setNewConnectionCallback(bind(&TcpServer::newConnection,this,_1,_2)) 将TcpServer::newConnection 传给 Acceptor,acceptor_ 在接受 TCP 客户连接后将调用TcpServer::newConnection(connfd,peerAddr),而 TcpSrever::newConnection() 的主要功能就是为 < connfd,peerAddr> 创建一个 TcpConnection 管理该 TCP 客户连接,并向 TcpConnection 注册一些回调函数,比如 connectionCallback 主要是在 TcpServer 中由用户指定的一些连接处理函数最后一路经由 TcpSrever 传到 TcpConnection 中才被调用,此外还有用户指定的消息处理回调等都是经由 TcpServer 传给 TcpConnection 中去具体执行。此外 TcpServer::newConnection() 中还会执行 TcpConnection::connectEstablished() 该函数将会使这个具体的 TcpConnection 连接对应的描述符 connfd 加入 poll 的事件表,即也是通过一个 Channel 管理一个具体的 TCP 客户连接。

TcpServer 采用 map< string,TcpConnectionPtr> 管理所有的 TCP 客户连接,其中 string 是由 TcpServer 的服务端地址加上一个 int 构成表示 TcpConnectionPtr 的名字。

TcpServer 中由用户指定的回调有:

  • connectionCallback(),当 TcpConnection 建立时调用( 由 TcpConnection::connectEstablished() 调用 connectionCallback() )用于执行用户指定的连接回调。

  • messageCallback(), 当 TcpConenction 有网络消息的时候执行该函数由 Channel::handleEvent() -> TcpConnection::handleRead() -> messageCallback()

  • writeCompleteCallback(), 由用户指定的当 TCP 连接上的消息发送完毕时执行的回调。

这些函数都是用户在 TcpServer 创建后通过 TcpServer::set*Callback 系列函数注册的。当 Acceptor 接受一个新的 TCP 连接时执行 Acceptor::handleRead()->TcpServer::newConnection()->TcpConnection::set*Callback() 这样完成用于指定函数的传递。那么执行呢?这个要在 TcpConenction 对应的 socket 事件就绪时可读/可写时由 Channel::handEvent() 执行这些用户指定的回调。

TcpServer::removeConnection() 主要功能从 TcpServer 中移除一个TcpConnection,但是不能直接移除,而要通过线程转移函数完成。TcpServer::removeConenction() 将执行EventLoop::runInLoop(bind(&TcpServer::removeConnectionInLoop) -> EventLoop::runInLoop() -> TcpServer::removeConnectionInLoop()TcpServer::removeConenctionInLoop() 将一个 TcpConnection 从 TcpServer 中移除,并向EventLoop 注册回调 EventLoop::runInLoop(bind(&TcpConenction::connectDestroyed)),然后执行 TcpConnection::connectDestroyed()

TcpServer.h

#ifndef MUDUO_NET_TCPSERVER_H#define MUDUO_NET_TCPSERVER_H#include 
#include
#include
#include
#include
#include
#include
namespace muduo{namespace net{class Acceptor;class EventLoop;class EventLoopThreadPool;////// TCP server, supports single-threaded and thread-pool models.////// This is an interface class, so don't expose too much details./* */class TcpServer : boost::noncopyable{ public: typedef boost::function
ThreadInitCallback; enum Option { kNoReusePort, kReusePort, }; //TcpServer(EventLoop* loop, const InetAddress& listenAddr); /* 构造时接受一个ip和port组成的InetAddress参数,用来构造 Acceptor(listenfd) */ TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort); ~TcpServer(); // force out-line dtor, for scoped_ptr members. const string& ipPort() const { return ipPort_; } const string& name() const { return name_; } EventLoop* getLoop() const { return loop_; } /// Set the number of threads for handling input. /// /// Always accepts new connection in loop's thread. /// Must be called before @c start /// @param numThreads /// - 0 means all I/O in loop's thread, no thread will created. /// this is the default value. /// - 1 means all I/O in another thread. /// - N means a thread pool with N threads, new connections /// are assigned on a round-robin basis. /* 设置线程数目 */ void setThreadNum(int numThreads); void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = cb; } /// valid after calling start() boost::shared_ptr
threadPool() { return threadPool_; } /// Starts the server if it's not listenning. /// /// It's harmless to call it multiple times. /// Thread safe. void start(); /// Set connection callback. /// Not thread safe. void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } /// Set message callback. /// Not thread safe. void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } /// Set write complete callback. /// Not thread safe. void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } private: /// Not thread safe, but in loop /* 新连接到达后,Acceptor会回调 newConnection */ void newConnection(int sockfd, const InetAddress& peerAddr); /// Thread safe. void removeConnection(const TcpConnectionPtr& conn); /// Not thread safe, but in loop void removeConnectionInLoop(const TcpConnectionPtr& conn); /* TcpConnection对象的名字到指向它的share_ptr,TcpServer用map来管理所有的连接 */ typedef std::map
ConnectionMap; /* 负责接受tcp连接的EventLoop,如果threadnum为1,那么它是唯一的IO线程 */ EventLoop* loop_; // the acceptor loop const string ipPort_; //ip port const string name_; //server 的名字 /* 内部通过 Acceptor 负责 listenfd 的建立和 accept 连接 */ boost::scoped_ptr
acceptor_; // avoid revealing Acceptor 避免暴露给用户 boost::shared_ptr
threadPool_;//线程池,每个线程运行一个EventLoop ConnectionCallback connectionCallback_; //连接建立和关闭时的callback MessageCallback messageCallback_; //消息到来时的callback WriteCompleteCallback writeCompleteCallback_; //消息写入对方缓冲区时的callback ThreadInitCallback threadInitCallback_; //EventLoop线程初始化时的回调函数 AtomicInt32 started_; // always in loop thread int nextConnId_; //下一个连接的id,用于给tcp连接构造名字 ConnectionMap connections_; //使用这个map管理所有的连接 };}}#endif // MUDUO_NET_TCPSERVER_H

TcpServer.c

#include 
#include
#include
#include
#include
#include
#include
#include
// snprintfusing namespace muduo;using namespace muduo::net;TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option) : loop_(CHECK_NOTNULL(loop)), /* 由InetAddress拿到ip和port */ ipPort_(listenAddr.toIpPort()), /* server 的 name */ name_(nameArg), /* 用传入的listenAddr构造Acceptor */ acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), threadPool_(new EventLoopThreadPool(loop, name_)), /* 用默认的处理连接和消息的回调函数 初始化 */ connectionCallback_(defaultConnectionCallback), messageCallback_(defaultMessageCallback), /* id 从1 开始 */ nextConnId_(1){ /* 将newConnection传给acceptor_,acceptor_执行完accept后会调用这个函数 */ acceptor_->setNewConnectionCallback( boost::bind(&TcpServer::newConnection, this, _1, _2));}TcpServer::~TcpServer(){ loop_->assertInLoopThread(); LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing"; for (ConnectionMap::iterator it(connections_.begin()); it != connections_.end(); ++it) { TcpConnectionPtr conn(it->second); it->second.reset(); conn->getLoop()->runInLoop( boost::bind(&TcpConnection::connectDestroyed, conn)); }}/* 设置线程数目,这一步就可以决定采用的是多线程还是单线程 */void TcpServer::setThreadNum(int numThreads){ assert(0 <= numThreads); threadPool_->setThreadNum(numThreads);}/* * TcpServer 的启动流程 * 1. 启动线程池的线程 * 2. 开始listen * 3. 注册listenfd的读事件 */void TcpServer::start(){ if (started_.getAndSet(1) == 0) { /* 启动一个IO线程 */ threadPool_->start(threadInitCallback_); /* 断言没有在监听 */ assert(!acceptor_->listenning()); /* 开始listen */ loop_->runInLoop( boost::bind(&Acceptor::listen, get_pointer(acceptor_))); }}/* * Acceptor接受连接后 调用这个回调函数 * 为
创建一个TcpConnection对象conn来管理该连接 * 把它加入 ConnectionMap * 设置好 callback * 再调用conn->connectEstablished() * 注册connfd的可读事件并回调用户提供的ConnectionCallback */void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr){ loop_->assertInLoopThread(); /* 从线程池取一个loop 线程 */ EventLoop* ioLoop = threadPool_->getNextLoop(); char buf[64]; /* 构造tcp连接的名称 每个TcpConnection 对象有一个名字 */ snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); /* connid++ */ ++nextConnId_; /*连接名字格式:servername + server.ip+server.port + connid */ string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toIpPort(); InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary /* 新建TcpConnection 对象 conn */ TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); /* * 把它加入 ConnectionMap * key 是连接的name,value 为指向这个对象的 shared_ptr */ connections_[connName] = conn; /* 设置好 callback */ /* TcpConnection 建立时调用 */ conn->setConnectionCallback(connectionCallback_); /* 消息到来时调用 */ conn->setMessageCallback(messageCallback_); /* 成功将所有数据写入对方内核缓冲区时调用 */ conn->setWriteCompleteCallback(writeCompleteCallback_); /* TCP连接关闭时的回调函数,内部使用,不能用户指定 */ conn->setCloseCallback( boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe /* * 在loop线程中执行建立tcp连接的流程 * 主要是设置tcp状态,注册读事件 * 以及执行tcp 建立的回调函数connectionCallback_() */ ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));}/* 从 TcpServer 中移除一个 TcpConnection */void TcpServer::removeConnection(const TcpConnectionPtr& conn){ // FIXME: unsafe // loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));}void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn){ loop_->assertInLoopThread(); LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ << "] - connection " << conn->name(); /* 从 TcpServer 中删除这个 TcpConnection */ size_t n = connections_.erase(conn->name()); (void)n; assert(n == 1); EventLoop* ioLoop = conn->getLoop(); /* 在 ioLoop 中执行 connectDestroyed() */ ioLoop->queueInLoop( boost::bind(&TcpConnection::connectDestroyed, conn));}

转载地址:http://dquoi.baihongyu.com/

你可能感兴趣的文章
三十之惑–面霸的八月(第二部分)
查看>>
在Mac上通过VMware Fushion 15.1配置静态IP虚拟机实录
查看>>
CentOS 7.7 x86-64安装系统字体及美化实录
查看>>
在macOS 10.13.6上安装go 1.13.8实录!
查看>>
在macOS 10.13.6下安装Grafana实录
查看>>
使用Go语言遇到的“坑”收集
查看>>
在Mac上设置环境变量并永久生效的方法
查看>>
使用govendor灵活管理Go程序中的依赖包
查看>>
安装vim-go插件之后遇到的gopls警告信息不消失的问题的解决方法
查看>>
在CentOS 7.7 x86_64上安装python3.7.7
查看>>
在CentOS 7.7 x86_64上安装python3的selenium 3模块实录
查看>>
在CentOS 7.7 x86_64上安装InfluxDB 1.8.0实录
查看>>
CentOS 7.5 如何升级Git实录
查看>>
Python中的urllib.quote和Go中的url.QueryEscape关系探讨
查看>>
在Mac上使用pip3安装python的数据统计模块实录
查看>>
在Mac上使用pip3安装交互式环境IPython实录
查看>>
在Mac上使用pip3安装Jupyter Notebook并简单使用
查看>>
在Mac上利用pip3安装pyecharts模块
查看>>
go连接Kafka报错kafka: client has run out of available brokers to talk to
查看>>
在CentOS 7.5上升级SQLite3过程实录
查看>>