0%

网络编程之 socket 多端通信

回顾

上一例子已经了解了与socket相关几个基本函数,知道了 TCP 连接通信的基本流程:

TCP连接通信流程

上一个例子中仅仅是客户端与服务的连接后通信一条消息,这是最简单的 TCP 例子;现在则进一步引入Pthread,为客户端单独创建一个接收信息和发送信息的线程,使其可以随时的跟服务端通信。而为服务端单独创建一个与客户单进行通信的线程,负责接收客户的消息的接收以及响应(这里直接回复客户端发来的消息)。

pthread

POSIX线程(POSIX threads),简称 pthreads,是线程的POSIX标准。该标准定义了创建和操纵线程的一整套API。在类Unix操作系统(Unix、Linux、Mac OS X等)中,都使用 pthreads 作为操作系统的线程。

在多线程程序中经常用到的两个函数:pthread_create()pthread_join()

pthread_create

  1. 头文件:#include<pthread.h>

  2. 功能:创建线程,在线程成功创建以后,就开始运行相关的线程函数。

  3. 原型:

    1
    int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void*), void *arg);
  4. 参数:

    • tidp:指向线程 ID 的指针。
    • restrict_attr:用于指定各种不同的线程属性。
    • start_rtn:线程运行函数的起始地址。
    • arg:运行函数的参数。如果需要向start_rtn函数传递的参数不止一个,那么需要把这些参数放到一个结构中,然后把这个结构的地址作为 arg 的参数传入。
  5. 返回值:

    • 成功:返回 0。
    • 失败:返回出错编号。
  6. 注意:因为pthread并非Linux系统的默认库,而是 POSIX 线程库。在Linux中将其作为一个库来使用,因此加上 -lpthread(或 -pthread)以显式链接该库。函数在执行错误时的错误信息将作为返回值返回,**并不修改系统全局变量 errno**,所以无法使用perror()打印错误信息。

pthread_join

  1. 头文件:#include<pthread.h>

  2. 功能:以阻塞的方式等待 thread 指定的线程结束。当函数返回时,被等待线程的资源被收回。如果线程已经结束,那么该函数会立即返回。并且 thread 指定的线程必须是joinable的。

  3. 原型:

    1
    int pthread_join(pthread_t thread, void **retval);
  4. 参数:

    • thread:线程ID,标识唯一线程
    • retval:用户定义的指针,用来存储被等待线程的返回值。
  5. 返回值:

    • 成功:返回 0。
    • 失败:返回出错编号。
  6. 注意:一个线程不能被多个线程等待,也就是说对一个线程只能调用一次 pthread_join,否则只有一个能正确返回,其他的将返回 ESRCH 错误。

  7. 补充:在Linux中,默认情况下是在一个线程被创建后,必须使用此函数对创建的线程进行资源回收,但是可以设置Threads attributes来设置当一个线程结束时,直接回收此线程所占用的系统资源。

在Linux中,新建的线程并不是在原先的进程中,而是系统通过一个系统调用clone()。该系统调用copy了一个和原先进程完全一样的进程,并在这个进程中执行线程函数。不过这个copy过程和fork不一样。 copy后的进程和原先的进程共享了所有的变量,运行环境。这样,原先进程中的变量变动在copy后的进程中便能体现出来。

例子

Server

server.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#ifndef TCPServer_hpp
#define TCPServer_hpp

#include <iostream> // cout
#include <sys/socket.h> // socket
#include <netinet/in.h> // sockaddr_in
#include <arpa/inet.h> // inet_addr
#include <unistd.h> // close


#define BUFF_SIZE 4096


/** 返回值 枚举 */
typedef enum __retValue {
Ret_error = -2, // 出错
Ret_failed = -1, // 失败
Ret_success = 0, // 成功
}RetValue;


void* HandleClient(void* arg); /* 处理客户端消息 */


class TCPServer
{
private:
struct sockaddr_in serverAddr; // 服务端地址
socklen_t serAddrLen; // 地址长度
int serverSockfd; // 服务端 socket

/**
处理客户端消息

@param arg 线程参数(TCPServer实例指针)
*/
friend void* HandleClient(void* arg);


public:
TCPServer(std::string ipStr, unsigned int port);
~TCPServer();

/**
绑定 服务端 地址

@return 参见‘RetValue’
*/
RetValue BindAddr();

/**
监听端口连接请求

@param queueNum 最大等待处理连接队列数
@return 参见‘RetValue’
*/
RetValue Listen(unsigned int queueNum);

/**
运行服务端

@return 参见‘RetValue’
*/
RetValue Run();

};

#endif /* TCPServer_hpp */

server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#include "TCPServer.hpp"


#define DEFAULT_QUEUE_NUM 5


TCPServer::TCPServer(std::string ipStr, unsigned int port)
{
if (true == ipStr.empty()
|| 0 >= ipStr.length())
{
std::cout << "Ip 地址不能为空!" << std::endl;
return;
}
// 创建 服务端 socket
serverSockfd = socket(PF_INET, SOCK_STREAM, 0);
if (0 > serverSockfd)
{
std::cout << "创建 socket 失败!" << std::endl;
return;
}
std::cout << "创建 socket 成功!" << std::endl;

// 设置 服务端 地址
serAddrLen = sizeof(serverAddr);
memset(&serverAddr, 0, serAddrLen); // 清空结构体
serverAddr.sin_family = AF_INET; // 协议族 类型
serverAddr.sin_port = htons(port); // 服务端 端口
serverAddr.sin_addr.s_addr = inet_addr(ipStr.c_str()); // 服务端 IP 地址
}


TCPServer::~TCPServer()
{
if (0 < serverSockfd)
{
std::cout << "关闭服务端 sockfd !" << std::endl;
close(serverSockfd); // 关闭服务端 sockfd
}
}


RetValue TCPServer::BindAddr()
{
// 绑定 IP 地址和端口号
int ret = bind(serverSockfd, (struct sockaddr*)&serverAddr, serAddrLen);
if (0 != ret)
{
std::cout << "绑定 IP 地址和端口失败!" << std::endl;
return Ret_failed;
}
std::cout << "绑定 IP 地址和端口成功!" << std::endl;
return Ret_success;
}


RetValue TCPServer::Listen(unsigned int queueNum)
{
// 监听端口(使 Socket 变为被动连接,等待客户端 Socket 连接)
int ret = listen(serverSockfd, 0 >= queueNum ? DEFAULT_QUEUE_NUM : queueNum);
if (0 != ret)
{
std::cout << "监听端口失败!" << std::endl;
return Ret_failed;
}
std::cout << "监听端口成功!" << std::endl;
return Ret_success;
}


RetValue TCPServer::Run()
{
// 客户端 地址
struct sockaddr_in clientAddr;
socklen_t cliAddrLen = sizeof(clientAddr);

while (1) // 循环等待客户端连接
{
// 阻塞式等待客户端连接
std::cout << "等待客户端连接。。。" << std::endl;
int clientSockfd = accept(serverSockfd, (struct sockaddr*)&clientAddr, &cliAddrLen);
if (0 > clientSockfd)
{
std::cout << "服务器接受客户端连接失败!" << std::endl;
continue;
}
std::cout << "服务器接受客户端(sockfd = " << clientSockfd << ")连接成功!" << std::endl;

pthread_t commP;
// 创建 与 客户通信线程
int ret = pthread_create(&commP, NULL, HandleClient, (void*)&clientSockfd);
if (0 != ret)
{
std::cout << "创建客户端(sockfd = " << clientSockfd << ")消息处理线程失败!" << std::endl;
close(clientSockfd);
continue;
}
std::cout << "创建客户端(sockfd = " << clientSockfd << ")消息处理线程成功!" << std::endl;
}

return Ret_success;
}


void* HandleClient(void* arg)
{
int sockfd = *(int*)arg;
while (1) // 循环处理客户端消息
{
// 接收消息
char recvBuf[BUFF_SIZE];
memset(recvBuf, 0, BUFF_SIZE);
ssize_t recvLen = recv(sockfd, recvBuf, BUFF_SIZE, 0);
if (0 > recvLen) // 出错
{
std::cout << "接收消息出错(sockfd = " << sockfd << ")!" << std::endl;
close(sockfd);
break;
}
else if (0 == recvLen) // 连接已断开
{
std::cout << "接收消息失败,(sockfd = " << sockfd << ")客户端连接已断开!" << std::endl;
close(sockfd);
break;
}
else // 接收成功
{
recvBuf[recvLen] = '\0';
std::cout << "接收到客户端(sockfd = " << sockfd << ")消息:" << recvBuf << std::endl;
}

// 发送消息(这里直接回复客户端发来的数据)
ssize_t sendLen = send(sockfd, recvBuf, recvLen, 0);
if (0 > sendLen) // 出错
{
std::cout << "发送消息出错(sockfd = " << sockfd << ")!" << std::endl;
close(sockfd);
break;
}
else if (0 == sendLen) // 连接已断开
{
std::cout << "发送消息失败,(sockfd = " << sockfd << ")客户端连接已断开!" << std::endl;
close(sockfd);
break;
}
else // 发送成功
{
// std::cout << "发送消息成功!" << std::endl;
if (0 == strcmp(recvBuf, "quit"))
{
std::cout << "客户端(sockfd = " << sockfd << ")已主动关闭连接!" << std::endl;
break;
}
}
}
std::cout << "关闭客户端 (sockfd = " << sockfd << ")" << std::endl;
close(sockfd);

return NULL;
}

server.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include "TCPServer.hpp"


#define DEMO_PORT 5578
#define DEMO_IP "127.0.0.1"
#define MAX_LISTEN_QUEUE 10

int main()
{
TCPServer server(DEMO_IP, DEMO_PORT);
if (Ret_success != server.BindAddr())
{
exit(1);
}
if (Ret_success != server.Listen(MAX_LISTEN_QUEUE))
{
exit(1);
}
if (Ret_success != server.Run())
{
exit(1);
}

return 0;
}

Client

client.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#ifndef TCPClient_hpp
#define TCPClient_hpp

#include <iostream> // cout
#include <sys/socket.h> // socket
#include <netinet/in.h> // sockaddr_in
#include <arpa/inet.h> // inet_addr
#include <unistd.h> // close


#define BUFF_SIZE 4096


/** 返回值 枚举 */
typedef enum __retValue {
Ret_error = -2, // 出错
Ret_failed = -1, // 失败
Ret_success = 0, // 成功
}RetValue;


void* SendMsg(void *arg); /** 发送消息 */
void* RecvMsg(void *arg); /** 接收消息 */


class TCPClient
{
private:
struct sockaddr_in serverAddr; // 服务端地址
socklen_t serAddrLen; // 地址长度
int clientSockfd; // 客户端 socket

/**
发送消息

@param arg 线程参数(TCPClient实例指针)
*/
friend void* SendMsg(void* arg);

/**
接收消息

@param arg 线程参数(TCPClient实例指针)
*/
friend void* RecvMsg(void* arg);


public:
TCPClient(std::string ipStr, unsigned int port);
~TCPClient();

/**
连接 服务端

@return 参见‘RetValue’
*/
RetValue ConnServer();

/**
运行 客户端

@return 参见‘RetValue’
*/
RetValue Run();

};

#endif /* TCPClient_hpp */

client.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include "TCPClient.hpp"
#include <pthread.h>


TCPClient::TCPClient(std::string ipStr, unsigned int port)
{
if (true == ipStr.empty()
|| 0 >= ipStr.length())
{
std::cout << "Ip 地址不能为空!" << std::endl;
return;
}
// 创建 客户端 socket
clientSockfd = socket(PF_INET, SOCK_STREAM, 0);
if (0 > clientSockfd)
{
std::cout << "创建 socket 失败!" << std::endl;
return;
}
std::cout << "创建 socket 成功!" << std::endl;

// 设置 服务端 地址
serAddrLen = sizeof(serverAddr);
memset(&serverAddr, 0, serAddrLen); // 清空结构体
serverAddr.sin_family = AF_INET; // 协议族 类型
serverAddr.sin_port = htons(port); // 服务端 端口
serverAddr.sin_addr.s_addr = inet_addr(ipStr.c_str()); // 服务端 IP 地址
}


TCPClient::~TCPClient()
{
if (0 < clientSockfd)
{
std::cout << "关闭客户端 sockfd !" << std::endl;
close(clientSockfd); // 关闭客户端 sockfd
}
}


RetValue TCPClient::ConnServer()
{
std::cout << "开始连接 服务端。。。" << std::endl;
// 连接 服务端
int ret = connect(clientSockfd, (struct sockaddr*)&serverAddr, serAddrLen);
if (0 != ret)
{
std::cout << "连接 服务端 失败!" << std::endl;
return Ret_failed;
}
std::cout << "连接 服务端 成功!" << std::endl;
return Ret_success;
}


RetValue TCPClient::Run()
{
pthread_t sendP;
pthread_t recvP;

// 创建发送消息线程
int ret = pthread_create(&sendP, NULL, SendMsg, (void*)this);
if (0 != ret)
{
std::cout << "创建发送消息线程失败!" << std::endl;
return Ret_failed;
}
// 创建接收消息线程
ret = pthread_create(&recvP, NULL, RecvMsg, (void*)this);
if (0 != ret)
{
std::cout << "创建接收消息线程失败!" << std::endl;
return Ret_failed;
}
// 等待线程结束
void *result;
if (0 != pthread_join(sendP, &result))
{
perror("等待 发送消息 线程失败");
exit(1);
}
if (0 != pthread_join(recvP, &result))
{
perror("等待 接收消息 线程失败");
exit(1);
}
return Ret_success;
}


void* SendMsg(void *arg)
{
std::cout << "发送消息线程启动!" << std::endl;
TCPClient *client = (TCPClient *)arg;
std::string sendMsg;
while(1)
{
std::cin >> sendMsg;

ssize_t sendLen = send(client->clientSockfd, sendMsg.c_str(), sendMsg.length(), 0);
if (0 > sendLen) // 出错
{
std::cout << "发送消息出错!" << std::endl;
break;
}
else if (0 == sendLen) // 连接已断开
{
std::cout << "发送消息失败,连接已断开!" << std::endl;
break;
}
else // 发送成功!
{
if (0 == strcmp(sendMsg.c_str(), "quit"))
{
break;
}
// std::cout << "发送消息成功!" << std::endl;
}
}
return NULL;
}


void* RecvMsg(void *arg)
{
std::cout << "接收消息线程启动!" << std::endl;
TCPClient *client = (TCPClient *)arg;
char recvBuf[BUFF_SIZE];
ssize_t recvLen;
while (1)
{
memset(recvBuf, 0, BUFF_SIZE);
recvLen = recv(client->clientSockfd, recvBuf, BUFF_SIZE, 0);
if (0 > recvLen) // 出错
{
std::cout << "接收消息出错!" << std::endl;
break;
}
else if (0 == recvLen) // 连接已断开
{
std::cout << "接收消息失败,连接已断开!" << std::endl;
break;
}
else // 接收成功
{
recvBuf[recvLen] = '\0';
std::cout << "接收到服务端消息:" << recvBuf << std::endl;
if (0 == strcmp(recvBuf, "quit")) // 遇到 “quit” 则退出
{
break;
}
}
}
return NULL;
}

client.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include "TCPClient.hpp"


#define DEMO_PORT 5578
#define DEMO_IP "127.0.0.1"


int main()
{
TCPClient client(DEMO_IP, DEMO_PORT);
if (Ret_success != client.ConnServer())
{
exit(1);
}
if (Ret_success != client.Run())
{
exit(1);
}

return 0;
}

Demo下载