> 文档中心 > OpenHarmony源码分析之分布式软总线:trans_service模块(2)/会话管理之新会话

OpenHarmony源码分析之分布式软总线:trans_service模块(2)/会话管理之新会话


一、 概述

trans_service模块基于系统内核提供的socket通信,向authmanager模块提供设备认证通道管理和设备认证数据的传输;向业务模块提供session管理和基于session的数据收发功能,并且通过GCM模块的加密功能提供收发报文的加解密保护。
在之前的博客OpenHarmony源码分析之分布式软总线:trans_service模块/认证通道管理中已经对认证通道管理的相关源码进行了详细的分析,因此本文重点介绍trans_service模块提供的第二个功能——会话管理。在OpenHarmony中,设备间的数据传输是基于TCP会话机制实现的,而在这一模块中主要是提供对会话的相关管理,接下来我们将重点分析关于新会话建立的相关源码。

二、 源码分析

  1. 此模块的入口函数CreateTcpSessionMgr() ,该函数的主要功能是创建会话管理器。源码分析如下:
/*函数功能:创建设备之间的TCP通信会话的管理器函数参数:    asServer:是否作为server端    localIp:设备IP函数返回值:    成功:返回绑定的端口号    失败:返回失败码详细:*/int CreateTcpSessionMgr(bool asServer, const char* localIp){    if (localIp == NULL) {//若设备IP为NULL则返回失败码 return TRANS_FAILED;    }    if (InitTcpMgrLock() != 0 || GetTcpMgrLock() != 0) {//初始化锁并加锁 return TRANS_FAILED;    }    int ret = InitGSessionMgr();//初始化全局会话管理器    if (ReleaseTcpMgrLock() != 0 || ret != 0) {//解锁 FreeSessionMgr(); return TRANS_FAILED;    }    g_sessionMgr->asServer = asServer;    //初始化server端套接字,生成监听套接字fd,并绑定IP和port    int listenFd = OpenTcpServer(localIp, DEFAULT_TRANS_PORT);    if (listenFd < 0) { SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr OpenTcpServer fail\n"); FreeSessionMgr(); return TRANS_FAILED;    }    //将该套接字fd从CLOSED转换到LISTEN状态,监听client端(主动端)发起的连接信息    int rc = listen(listenFd, LISTEN_BACKLOG);    if (rc != 0) { SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr listen fail\n"); CloseSession(listenFd); FreeSessionMgr(); return TRANS_FAILED;    }    g_sessionMgr->listenFd = listenFd;//赋值全局会话管理器的监听套接字描述符//捕获SIGPIPE信号,SIG_IGN表示忽视该信号,不执行SIGPIPE默认操作:终止程序。此处是为了防止对端突然关闭socket引起程序终止。    signal(SIGPIPE, SIG_IGN);    if (StartSelectLoop(g_sessionMgr) != 0) {//创建子线程启动select循环监听 SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr StartSelectLoop fail\n"); CloseSession(listenFd); FreeSessionMgr(); return TRANS_FAILED;    }    return GetSockPort(listenFd);}
  1. 在上述函数中,首先进行会话管理器的初始化,在函数 InitGSessionMgr() 中实现,代码分析如下:
/*函数功能:初始化全局会话管理器的属性,包括内存空间、各个属性的初始值函数参数:无函数返回值:    成功:返回0    失败:返回失败码详细:*/static int InitGSessionMgr(void){    if (g_sessionMgr != NULL) {//若g_sessionMgr不为NULL,表示已存在会话管理器 return 0;    }    g_sessionMgr = malloc(sizeof(TcpSessionMgr));//为全局会话管理器申请内存空间    if (g_sessionMgr == NULL) { return TRANS_FAILED;    }//将刚申请的地址空间清0    (void)memset_s(g_sessionMgr, sizeof(TcpSessionMgr), 0, sizeof(TcpSessionMgr));    //以下为该全局会话管理器的各个属性赋初始值    g_sessionMgr->listenFd = -1;    g_sessionMgr->isSelectLoopRunning = false;    for (int i = 0; i < MAX_SESSION_SUM_NUM; i++) {//将会话表初始化为NULL g_sessionMgr->sessionMap_[i] = NULL;    }    for (int i = 0; i < MAX_SESSION_SERVER_NUM; i++) {//将服务端监听器表初始化为NULL g_sessionMgr->serverListenerMap[i] = NULL;    }    return 0;}
  1. 然后调用 OpenTcpServer() 函数初始化server端套接字,生成监听套接字fd,并绑定IP和port,此函数代码的分析如下:
/*函数功能: 初始化server端套接字,绑定ip地址及port函数参数:    ip    需要进行绑定的ip地址;    port  需要进行绑定的port函数返回值:    成功    返回生成的套接字描述符    失败    返回错误码详细:*/int OpenTcpServer(const char *ip, uint16_t port){    if (ip == NULL) { return -DBE_BAD_PARAM;    }    struct sockaddr_in addr = {0};    errno = 0;    //将点分十进制的ip字符串转化为网络字节序的32位ip地址    int rc = inet_pton(AF_INET, ip, &addr.sin_addr);    if (rc <= 0) { return -DBE_BAD_IP;    }    addr.sin_family = AF_INET;//ipv4    addr.sin_port = htons(port);//网络字节序的port    errno = 0;    int fd = socket(AF_INET, SOCK_STREAM, 0);//生成基于TCP协议的套接字描述符    if (fd < 0) { return -DBE_OPEN_SOCKET;    }    SetServerOption(fd);//设置套接字选项,地址可重用和禁用Nagle算法    errno = 0;    //绑定server端IP地址和port    rc = bind(fd, (struct sockaddr *)&addr, sizeof(addr));    if (rc < 0) { ShutDown(fd); return -DBE_BIND_SOCKET;    }    return fd;}
  1. 接下来在函数 StartSelectLoop() 中创建子线程启动select循环监听(此处的线程管理是基于Linux内核的),主要是监听新的tcp连接或者新的传输数据的到达,该函数源码分析如下:
/*函数功能:启动select循环,创建子线程执行循环select监听函数参数:    tsm:会话管理器全局地址函数返回值:    成功:返回0    失败:返回失败码TRANS_FAILED详细:*/int StartSelectLoop(TcpSessionMgr *tsm){    if (tsm == NULL) { return TRANS_FAILED;    }    if (tsm->isSelectLoopRunning) {//通过标志位判断该会话管理器是否在执行select循环监听 return 0;    }    //初始化线程属性,"tcp"为线程名,线程栈大小为(0x800 | MIN_STACK_SIZE),单位为字节,线程优先级为20    ThreadAttr attr = {"tcp", 0x800, 20, 0, 0};    //创建子线程执行循环监听    register ThreadId threadId = (ThreadId)TcpCreate((Runnable)SelectSessionLoop, tsm, &attr);    if (threadId == NULL) { return TRANS_FAILED;    }    tsm->isSelectLoopRunning = true;    return 0;}
  1. 下面子线程执行回调函数 SelectSessionLoop() 启动循环监听,该函数如下:
/*函数功能:启动select执行会话循环监听函数参数:    tsm:会话管理器地址函数返回值:无详细:通过轮询方式调用select循环监听读事件和异常事件*/static void SelectSessionLoop(TcpSessionMgr *tsm){    if (tsm == NULL) { return;    }    SOFTBUS_PRINT("[TRANS] SelectSessionLoop begin\n");    tsm->isSelectLoopRunning = true;//将正在循环监听标志置位    while (true) {//轮询监听可读事件或者异常事件的变化 fd_set readfds;//可读描述符fd集合 fd_set exceptfds;//异常描述符fd集合 //初始化select监听列表,包括可读集合和异常集合 int maxFd = InitSelectList(tsm, &readfds, &exceptfds); if (maxFd < 0) {     break; } errno = 0; //启动监听,返回发生变化的描述符数量 int ret = select(maxFd + 1, &readfds, NULL, &exceptfds, NULL); if (ret < 0) {//发生错误,设置errno     SOFTBUS_PRINT("RemoveExceptSessionFd\r\n");     if (errno == EINTR || RemoveExceptSessionFd(tsm, &exceptfds) == 0) {  continue;     }     SOFTBUS_PRINT("[TRANS] SelectSessionLoop close all Session\n");     CloseAllSession(tsm);     break; } else if (ret == 0) {//监听集合中,没有描述符的状态发生变化     continue; } else {//监听到有描述符状态发生改变     ProcessData(tsm, &readfds);//处理新数据或者新连接 }    }    tsm->isSelectLoopRunning = false;//循环监听结束}
  1. 当有新的连接或者数据到达监听的描述符时,调用 ProcessData() 函数进行处理,具体分析如下:
/*函数功能:处理新数据或者新连接函数参数:    tsm:会话管理器地址    rfds:可读描述符集合函数返回值:无详细:*/static void ProcessData(TcpSessionMgr *tsm, fd_set *rfds){    if (tsm == NULL || tsm->listenFd == -1) { return;    }    if (FD_ISSET(tsm->listenFd, rfds)) {//新连接到达 ProcessConnection(tsm);//处理新会话连接事件 return;    }    ProcessSesssionData(tsm, rfds);//处理tcp会话中的新数据到达事件}
  1. 接下来先分析处理新的tcp连接到达的事件的源代码,主要是在函数 ProcessConnection() 中实现:
/*函数功能:处理新会话连接函数参数:    tsm:会话管理器地址函数返回值:无详细:*/static void ProcessConnection(TcpSessionMgr *tsm){    struct sockaddr_in addr = { 0 };    socklen_t addrLen = sizeof(addr);//建立socket连接    int cfd = accept(tsm->listenFd, (struct sockaddr *)&addr, &addrLen);    if (cfd < 0) { SOFTBUS_PRINT("[TRANS] ProcessConnection accept fail\n"); return;    }    TcpSession *session = CreateTcpSession();//新建一个tcp会话并初始化相关属性    if (session == NULL) { SOFTBUS_PRINT("[TRANS] ProcessConnection CreateTcpSession fail, fd = %d\n", cfd); CloseSession(cfd); return;    }//通过设备IP在认证连接列表中查找状态为OnLine的设备    AuthConn* authConn = GetOnLineAuthConnByIp(inet_ntoa(addr.sin_addr));    //若该设备已存在,则将认证连接的设备id拷贝给新建会话的设备id    if (authConn != NULL && strncpy_s(session->deviceId, MAX_DEV_ID_LEN, authConn->deviceId, strlen(authConn->deviceId)) != 0) { SOFTBUS_PRINT("[TRANS] Error on copy deviceId of session.");//拷贝失败,则释放会话资源并且关闭会话资源 free(session); CloseSession(cfd); return;    }//若拷贝成功,或者该设备不存在认证连接列表中,则继续执行    session->fd = cfd;//赋值通信fd    int result = AddSession(tsm, session);//将该会话添加到会话管理器中    if (result == false) { SOFTBUS_PRINT("[TRANS] AddSession fail\n"); free(session); CloseSession(cfd); return;    }    return;}
  1. 在函数 ProcessConnection() 中,调用accept()建立socket连接之后,系统创建了一个tcp会话并初始化相关属性,具体实现在函数 CreateTcpSession() 中:
/*函数功能:创建一个新的tcp会话并进行相关属性初始化函数参数:无函数返回值:    成功:返回tcp会话结构体地址    失败:返回NULL详细:*/TcpSession *CreateTcpSession(void){//申请会话地址空间    TcpSession *tcpSession = (TcpSession *)malloc(sizeof(TcpSession));    if (tcpSession == NULL) { return NULL;    }    //为新会话初始化名字    if (strcpy_s(tcpSession->sessionName, NAME_LENGTH, "softbus_Lite_unknown") != 0) { SOFTBUS_PRINT("[TRANS] CreateTcpSession cpy busname fail\n"); free(tcpSession); return NULL;    }    //初始化TCP会话结构体的相关属性:    (void)memset_s(tcpSession->deviceId, MAX_DEV_ID_LEN, 0, MAX_DEV_ID_LEN);    (void)memset_s(tcpSession->groupId, NAME_LENGTH, 0, NAME_LENGTH);    (void)memset_s(tcpSession->sessionKey, SESSION_KEY_LENGTH, 0, SESSION_KEY_LENGTH);    tcpSession->seqNum = 0;    tcpSession->fd = -1;    tcpSession->busVersion = 0;    tcpSession->routeType = 0;    tcpSession->isAccepted = false;    tcpSession->seqNumList = malloc(sizeof(List));    if (tcpSession->seqNumList == NULL) { free(tcpSession); return NULL;    }    ListInitHead(tcpSession->seqNumList);//初始化链表头指针    return tcpSession;}
  1. 最后,将刚创建的会话添加到会话管理器中,在函数 AddSession() 实现,源码分析如下:
/*函数功能:添加新tcp会话到会话管理器中函数参数:    tsm:会话管理器地址    session:新会话地址函数返回值:    成功:返回true    失败:返回false详细:*/static bool AddSession(TcpSessionMgr *tsm, TcpSession *session){    if (tsm == NULL || session == NULL) { SOFTBUS_PRINT("[TRANS] AddSession invalid para\n"); return false;    }    for (int i = 0; i < MAX_SESSION_SUM_NUM; i++) { if (tsm->sessionMap_[i] == NULL) {//将会话表中第一个为NULL的赋值为新会话     tsm->sessionMap_[i] = session;     return true; }    }    return false;}

至此,处理新连接的过程结束。对于会话中的数据收发,将在接下来的博客中进行介绍。