OpenHarmony源码分析之分布式软总线:trans_service模块(2)/会话管理之新会话
一、 概述
trans_service模块基于系统内核提供的socket通信,向authmanager模块提供设备认证通道管理和设备认证数据的传输;向业务模块提供session管理和基于session的数据收发功能,并且通过GCM模块的加密功能提供收发报文的加解密保护。
在之前的博客OpenHarmony源码分析之分布式软总线:trans_service模块/认证通道管理中已经对认证通道管理的相关源码进行了详细的分析,因此本文重点介绍trans_service模块提供的第二个功能——会话管理。在OpenHarmony中,设备间的数据传输是基于TCP会话机制实现的,而在这一模块中主要是提供对会话的相关管理,接下来我们将重点分析关于新会话建立的相关源码。
二、 源码分析
/*函数功能:创建设备之间的TCP通信会话的管理器函数参数: asServer:是否作为server端 localIp:设备IP函数返回值: 成功:返回绑定的端口号 失败:返回失败码详细:*/int CreateTcpSessionMgr(bool asServer, const char* localIp){ if (localIp == NULL) {//若设备IP为NULL则返回失败码 return TRANS_FAILED; } if (InitTcpMgrLock() != 0 || GetTcpMgrLock() != 0) {//初始化锁并加锁 return TRANS_FAILED; } int ret = InitGSessionMgr();//初始化全局会话管理器 if (ReleaseTcpMgrLock() != 0 || ret != 0) {//解锁 FreeSessionMgr(); return TRANS_FAILED; } g_sessionMgr->asServer = asServer; //初始化server端套接字,生成监听套接字fd,并绑定IP和port int listenFd = OpenTcpServer(localIp, DEFAULT_TRANS_PORT); if (listenFd < 0) { SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr OpenTcpServer fail\n"); FreeSessionMgr(); return TRANS_FAILED; } //将该套接字fd从CLOSED转换到LISTEN状态,监听client端(主动端)发起的连接信息 int rc = listen(listenFd, LISTEN_BACKLOG); if (rc != 0) { SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr listen fail\n"); CloseSession(listenFd); FreeSessionMgr(); return TRANS_FAILED; } g_sessionMgr->listenFd = listenFd;//赋值全局会话管理器的监听套接字描述符//捕获SIGPIPE信号,SIG_IGN表示忽视该信号,不执行SIGPIPE默认操作:终止程序。此处是为了防止对端突然关闭socket引起程序终止。 signal(SIGPIPE, SIG_IGN); if (StartSelectLoop(g_sessionMgr) != 0) {//创建子线程启动select循环监听 SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr StartSelectLoop fail\n"); CloseSession(listenFd); FreeSessionMgr(); return TRANS_FAILED; } return GetSockPort(listenFd);}
- 在上述函数中,首先进行会话管理器的初始化,在函数 InitGSessionMgr() 中实现,代码分析如下:
/*函数功能:初始化全局会话管理器的属性,包括内存空间、各个属性的初始值函数参数:无函数返回值: 成功:返回0 失败:返回失败码详细:*/static int InitGSessionMgr(void){ if (g_sessionMgr != NULL) {//若g_sessionMgr不为NULL,表示已存在会话管理器 return 0; } g_sessionMgr = malloc(sizeof(TcpSessionMgr));//为全局会话管理器申请内存空间 if (g_sessionMgr == NULL) { return TRANS_FAILED; }//将刚申请的地址空间清0 (void)memset_s(g_sessionMgr, sizeof(TcpSessionMgr), 0, sizeof(TcpSessionMgr)); //以下为该全局会话管理器的各个属性赋初始值 g_sessionMgr->listenFd = -1; g_sessionMgr->isSelectLoopRunning = false; for (int i = 0; i < MAX_SESSION_SUM_NUM; i++) {//将会话表初始化为NULL g_sessionMgr->sessionMap_[i] = NULL; } for (int i = 0; i < MAX_SESSION_SERVER_NUM; i++) {//将服务端监听器表初始化为NULL g_sessionMgr->serverListenerMap[i] = NULL; } return 0;}
- 然后调用 OpenTcpServer() 函数初始化server端套接字,生成监听套接字fd,并绑定IP和port,此函数代码的分析如下:
/*函数功能: 初始化server端套接字,绑定ip地址及port函数参数: ip 需要进行绑定的ip地址; port 需要进行绑定的port函数返回值: 成功 返回生成的套接字描述符 失败 返回错误码详细:*/int OpenTcpServer(const char *ip, uint16_t port){ if (ip == NULL) { return -DBE_BAD_PARAM; } struct sockaddr_in addr = {0}; errno = 0; //将点分十进制的ip字符串转化为网络字节序的32位ip地址 int rc = inet_pton(AF_INET, ip, &addr.sin_addr); if (rc <= 0) { return -DBE_BAD_IP; } addr.sin_family = AF_INET;//ipv4 addr.sin_port = htons(port);//网络字节序的port errno = 0; int fd = socket(AF_INET, SOCK_STREAM, 0);//生成基于TCP协议的套接字描述符 if (fd < 0) { return -DBE_OPEN_SOCKET; } SetServerOption(fd);//设置套接字选项,地址可重用和禁用Nagle算法 errno = 0; //绑定server端IP地址和port rc = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); if (rc < 0) { ShutDown(fd); return -DBE_BIND_SOCKET; } return fd;}
- 接下来在函数 StartSelectLoop() 中创建子线程启动select循环监听(此处的线程管理是基于Linux内核的),主要是监听新的tcp连接或者新的传输数据的到达,该函数源码分析如下:
/*函数功能:启动select循环,创建子线程执行循环select监听函数参数: tsm:会话管理器全局地址函数返回值: 成功:返回0 失败:返回失败码TRANS_FAILED详细:*/int StartSelectLoop(TcpSessionMgr *tsm){ if (tsm == NULL) { return TRANS_FAILED; } if (tsm->isSelectLoopRunning) {//通过标志位判断该会话管理器是否在执行select循环监听 return 0; } //初始化线程属性,"tcp"为线程名,线程栈大小为(0x800 | MIN_STACK_SIZE),单位为字节,线程优先级为20 ThreadAttr attr = {"tcp", 0x800, 20, 0, 0}; //创建子线程执行循环监听 register ThreadId threadId = (ThreadId)TcpCreate((Runnable)SelectSessionLoop, tsm, &attr); if (threadId == NULL) { return TRANS_FAILED; } tsm->isSelectLoopRunning = true; return 0;}
- 下面子线程执行回调函数 SelectSessionLoop() 启动循环监听,该函数如下:
/*函数功能:启动select执行会话循环监听函数参数: tsm:会话管理器地址函数返回值:无详细:通过轮询方式调用select循环监听读事件和异常事件*/static void SelectSessionLoop(TcpSessionMgr *tsm){ if (tsm == NULL) { return; } SOFTBUS_PRINT("[TRANS] SelectSessionLoop begin\n"); tsm->isSelectLoopRunning = true;//将正在循环监听标志置位 while (true) {//轮询监听可读事件或者异常事件的变化 fd_set readfds;//可读描述符fd集合 fd_set exceptfds;//异常描述符fd集合 //初始化select监听列表,包括可读集合和异常集合 int maxFd = InitSelectList(tsm, &readfds, &exceptfds); if (maxFd < 0) { break; } errno = 0; //启动监听,返回发生变化的描述符数量 int ret = select(maxFd + 1, &readfds, NULL, &exceptfds, NULL); if (ret < 0) {//发生错误,设置errno SOFTBUS_PRINT("RemoveExceptSessionFd\r\n"); if (errno == EINTR || RemoveExceptSessionFd(tsm, &exceptfds) == 0) { continue; } SOFTBUS_PRINT("[TRANS] SelectSessionLoop close all Session\n"); CloseAllSession(tsm); break; } else if (ret == 0) {//监听集合中,没有描述符的状态发生变化 continue; } else {//监听到有描述符状态发生改变 ProcessData(tsm, &readfds);//处理新数据或者新连接 } } tsm->isSelectLoopRunning = false;//循环监听结束}
- 当有新的连接或者数据到达监听的描述符时,调用 ProcessData() 函数进行处理,具体分析如下:
/*函数功能:处理新数据或者新连接函数参数: tsm:会话管理器地址 rfds:可读描述符集合函数返回值:无详细:*/static void ProcessData(TcpSessionMgr *tsm, fd_set *rfds){ if (tsm == NULL || tsm->listenFd == -1) { return; } if (FD_ISSET(tsm->listenFd, rfds)) {//新连接到达 ProcessConnection(tsm);//处理新会话连接事件 return; } ProcessSesssionData(tsm, rfds);//处理tcp会话中的新数据到达事件}
- 接下来先分析处理新的tcp连接到达的事件的源代码,主要是在函数 ProcessConnection() 中实现:
/*函数功能:处理新会话连接函数参数: tsm:会话管理器地址函数返回值:无详细:*/static void ProcessConnection(TcpSessionMgr *tsm){ struct sockaddr_in addr = { 0 }; socklen_t addrLen = sizeof(addr);//建立socket连接 int cfd = accept(tsm->listenFd, (struct sockaddr *)&addr, &addrLen); if (cfd < 0) { SOFTBUS_PRINT("[TRANS] ProcessConnection accept fail\n"); return; } TcpSession *session = CreateTcpSession();//新建一个tcp会话并初始化相关属性 if (session == NULL) { SOFTBUS_PRINT("[TRANS] ProcessConnection CreateTcpSession fail, fd = %d\n", cfd); CloseSession(cfd); return; }//通过设备IP在认证连接列表中查找状态为OnLine的设备 AuthConn* authConn = GetOnLineAuthConnByIp(inet_ntoa(addr.sin_addr)); //若该设备已存在,则将认证连接的设备id拷贝给新建会话的设备id if (authConn != NULL && strncpy_s(session->deviceId, MAX_DEV_ID_LEN, authConn->deviceId, strlen(authConn->deviceId)) != 0) { SOFTBUS_PRINT("[TRANS] Error on copy deviceId of session.");//拷贝失败,则释放会话资源并且关闭会话资源 free(session); CloseSession(cfd); return; }//若拷贝成功,或者该设备不存在认证连接列表中,则继续执行 session->fd = cfd;//赋值通信fd int result = AddSession(tsm, session);//将该会话添加到会话管理器中 if (result == false) { SOFTBUS_PRINT("[TRANS] AddSession fail\n"); free(session); CloseSession(cfd); return; } return;}
- 在函数 ProcessConnection() 中,调用accept()建立socket连接之后,系统创建了一个tcp会话并初始化相关属性,具体实现在函数 CreateTcpSession() 中:
/*函数功能:创建一个新的tcp会话并进行相关属性初始化函数参数:无函数返回值: 成功:返回tcp会话结构体地址 失败:返回NULL详细:*/TcpSession *CreateTcpSession(void){//申请会话地址空间 TcpSession *tcpSession = (TcpSession *)malloc(sizeof(TcpSession)); if (tcpSession == NULL) { return NULL; } //为新会话初始化名字 if (strcpy_s(tcpSession->sessionName, NAME_LENGTH, "softbus_Lite_unknown") != 0) { SOFTBUS_PRINT("[TRANS] CreateTcpSession cpy busname fail\n"); free(tcpSession); return NULL; } //初始化TCP会话结构体的相关属性: (void)memset_s(tcpSession->deviceId, MAX_DEV_ID_LEN, 0, MAX_DEV_ID_LEN); (void)memset_s(tcpSession->groupId, NAME_LENGTH, 0, NAME_LENGTH); (void)memset_s(tcpSession->sessionKey, SESSION_KEY_LENGTH, 0, SESSION_KEY_LENGTH); tcpSession->seqNum = 0; tcpSession->fd = -1; tcpSession->busVersion = 0; tcpSession->routeType = 0; tcpSession->isAccepted = false; tcpSession->seqNumList = malloc(sizeof(List)); if (tcpSession->seqNumList == NULL) { free(tcpSession); return NULL; } ListInitHead(tcpSession->seqNumList);//初始化链表头指针 return tcpSession;}
- 最后,将刚创建的会话添加到会话管理器中,在函数 AddSession() 实现,源码分析如下:
/*函数功能:添加新tcp会话到会话管理器中函数参数: tsm:会话管理器地址 session:新会话地址函数返回值: 成功:返回true 失败:返回false详细:*/static bool AddSession(TcpSessionMgr *tsm, TcpSession *session){ if (tsm == NULL || session == NULL) { SOFTBUS_PRINT("[TRANS] AddSession invalid para\n"); return false; } for (int i = 0; i < MAX_SESSION_SUM_NUM; i++) { if (tsm->sessionMap_[i] == NULL) {//将会话表中第一个为NULL的赋值为新会话 tsm->sessionMap_[i] = session; return true; } } return false;}
至此,处理新连接的过程结束。对于会话中的数据收发,将在接下来的博客中进行介绍。