0%

网络编程之 socket 实战

回顾

上一例子已经了解了 TCP 客户端与服务端的保活技术:心跳机制

现在通过模拟客户端请求服务端对远端用户的一些操作进行实战演练(远程唤醒用户实战列子。)。

本例子由客户端给服务器发送相关请求数据包,基本流程思路:

  1. 设计相关的通信协议(数据结构);
  2. 服务端根据客户端的不同请求命令,执行响应的操作予以回复客户端。

例子

相关通信协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/** 命令类型 枚举 */
typedef enum __cmdType {
Cmd_reqStatus = 0x01, // 请求用户状态
Cmd_respStatus = 0x02, // 回复用户状态
Cmd_reqWakeUp = 0x03, // 请求唤醒用户
Cmd_respWakeUp = 0x04, // 回复唤醒用户
Cmd_reqSuspend = 0x05, // 请求挂起用户
Cmd_respSuspend = 0x06, // 回复挂起用户
}CmdType;


/** 用户状态 枚举 */
typedef enum __userStatus {
Status_unknow = 0, // 未知状态
Status_online = 1, // 在线状态
Status_waking = 2, // 正在唤醒
Status_suspending = 3, // 正在挂起
Status_suspended = 4, // 挂起状态
Status_offline = 5, // 离线状态
}UserStatus;


/** 消息类型 枚举 */
typedef enum __msgType {
Msg_heart = 0, // 心跳包
Msg_other = 1, // 其他数据
}MsgType;


/** 消息头 结构体 */
typedef struct __msgHead {
MsgType msgType; // 消息类型
unsigned int msgLen; // 消息长度
char data[0]; // 消息实体
}MsgHead;


/** 请求消息体 结构体 */
typedef struct __msgReq {
unsigned int magic; // 魔术符
unsigned int cmd; // 命令类型
unsigned char userId[0]; // 用户id号
}MsgReq;


/** 响应消息体 结构体 */
typedef struct __msgResp {
unsigned int magic; // 魔术符
unsigned int cmd; // 命令类型
unsigned int status; // 用户状态
unsigned char userId[0]; // 用户id号
}MsgResp;

Server

server.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#ifndef TCPServer_hpp
#define TCPServer_hpp

#include <iostream> // cout
#include <sys/socket.h> // socket
#include <netinet/in.h> // sockaddr_in
#include <arpa/inet.h> // inet_addr
#include <unistd.h> // close
#include <map> // map


#define BUFF_SIZE 4096 // 缓存大小
#define HB_INTERVAL 3 // 心跳时间间隔(单位:秒)
#define CHECK_HB_COUNT 5 // 无法检测到心跳最大次数


/** 返回值 枚举 */
typedef enum __retValue {
Ret_error = -2, // 出错
Ret_failed = -1, // 失败
Ret_success = 0, // 成功
}RetValue;


void* CheckHeartBeat(void* arg); // 心跳检测


class TCPServer
{
private:
struct sockaddr_in serverAddr; // 服务端地址
socklen_t serAddrLen; // 地址长度
int serverSockfd; // 服务端 socket
struct timeval timeout; // 超时时间
int maxFd; // 最大 sockfd
fd_set masterSet; // 所有 sockfd 集合,包括服务端 sockfd 和所有已连接 sockfd
fd_set workingSet; // 工作集合
std::map<int, std::pair<std::string, int>> sockfdMap; // 记录连接的客户端 : <sockfd, <ip, checkCount>>

/**
接受客户端连接

@return 参见‘RetValue’
*/
RetValue AcceptConn();

/**
接收客户端消息

@return 参见‘RetValue’
*/
RetValue RecvMsg();

/**
格式化发送数据

@param buf 数据缓冲区
@param bufLen 缓冲区长度
@param cmd 命令类型
@param status 用户状态
@param userId 用户 ID
@return > 0:格式化成功(需要发送数据大小);否则,失败
*/
int RespDataFormat(char *buf, unsigned int bufLen, unsigned int cmd, unsigned int status, const char userId[]);

/**
解析接收的数据

@param buf 数据缓冲区
@param msgLen 消息长度
@param cmd 命令类型
@param userId 用户 ID
@return 解析是否成功;0:成功,否则失败
*/
int ReqDataParse(char *buf, unsigned int msgLen, unsigned int *cmd, char *userId);

/**
心跳检测

@param arg 线程参数(TCPServer实例指针)
*/
friend void* CheckHeartBeat(void* arg);

public:
TCPServer(std::string ipStr, unsigned int port);
~TCPServer();

/**
绑定 服务端 地址

@return 参见‘RetValue’
*/
RetValue BindAddr();

/**
监听端口连接请求

@param queueNum 最大等待处理连接队列数
@return 参见‘RetValue’
*/
RetValue Listen(unsigned int queueNum);

/**
运行服务端

@return 参见‘RetValue’
*/
RetValue Run();


};

#endif /* TCPServer_hpp */

server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
#include "TCPServer.hpp"
#include "TransProtocol.h"


#define MAGIC "SYL "
#define DEFAULT_QUEUE_NUM 5


TCPServer::TCPServer(std::string ipStr, unsigned int port)
{
if (true == ipStr.empty()
|| 0 >= ipStr.length())
{
std::cout << "Ip 地址不能为空!" << std::endl;
return;
}
// 创建 服务端 socket
serverSockfd = socket(PF_INET, SOCK_STREAM, 0);
if (0 > serverSockfd)
{
std::cout << "创建 socket 失败!" << std::endl;
return;
}
std::cout << "创建 socket 成功!" << std::endl;

int opt = 1;
// 让端口释放后立即就可以被再次使用。
setsockopt(serverSockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 设置 服务端 地址
serAddrLen = sizeof(serverAddr);
memset(&serverAddr, 0, serAddrLen); // 清空结构体
serverAddr.sin_family = AF_INET; // 协议族 类型
serverAddr.sin_port = htons(port); // 服务端 端口
serverAddr.sin_addr.s_addr = inet_addr(ipStr.c_str()); // 服务端 IP 地址

maxFd = serverSockfd; // 初始化 maxFd
FD_ZERO(&masterSet); // 情况 masterSet
FD_SET(serverSockfd, &masterSet); // 添加监听 fd
}


TCPServer::~TCPServer()
{
for (int sockfd = 0; sockfd <= maxFd; sockfd++)
{
if (FD_ISSET(sockfd, &masterSet))
{
std::cout << "关闭(sockfd = " << sockfd << ") !" << std::endl;
close(sockfd); // 关闭 sockfd
}
}
}


RetValue TCPServer::BindAddr()
{
// 绑定 IP 地址和端口号
int ret = bind(serverSockfd, (struct sockaddr*)&serverAddr, serAddrLen);
if (0 != ret)
{
std::cout << "绑定 IP 地址和端口失败!" << std::endl;
return Ret_failed;
}
std::cout << "绑定 IP 地址和端口成功!" << std::endl;
return Ret_success;
}


RetValue TCPServer::Listen(unsigned int queueNum)
{
// 监听端口(使 Socket 变为被动连接,等待客户端 Socket 连接)
int ret = listen(serverSockfd, 0 >= queueNum ? DEFAULT_QUEUE_NUM : queueNum);
if (0 != ret)
{
std::cout << "监听端口失败!" << std::endl;
return Ret_failed;
}
std::cout << "监听端口成功!" << std::endl;
return Ret_success;
}


RetValue TCPServer::Run()
{
pthread_t threadId;
// 创建心跳检测线程
int ret = pthread_create(&threadId, NULL, CheckHeartBeat, (void*)this);
if (0 != ret)
{
std::cout << "创建 心跳检测 线程失败!" << std::endl;
return Ret_failed;
}
while (1)
{
FD_ZERO(&workingSet); // 每次循环清空 working set
memcpy(&workingSet, &masterSet, sizeof(masterSet));

timeout.tv_sec = 30;
timeout.tv_usec = 0;
int nums = select(maxFd + 1, &workingSet, NULL, NULL, &timeout);
if (0 > nums) // 出错
{
std::cout << "select 出错!" << std::endl;
return Ret_error;
}
else if (0 == nums) // 超时,可以通过心跳检测判断是否断开
{
continue;
}
else // 有新内容可读写
{
if (FD_ISSET(serverSockfd, &workingSet)) // 有新客户端连接请求
{
AcceptConn();
}
else // 客户端有消息更新
{
RecvMsg();
}
}
}
return Ret_success;
}


RetValue TCPServer::AcceptConn()
{
// 客户端 地址
struct sockaddr_in clientAddr;
socklen_t cliAddrLen = sizeof(clientAddr);

// 阻塞式等待客户端连接
std::cout << "等待客户端连接。。。" << std::endl;
int clientSockfd = accept(serverSockfd, (struct sockaddr*)&clientAddr, &cliAddrLen);
if (0 > clientSockfd)
{
std::cout << "服务器接受客户端连接失败!" << std::endl;
return Ret_failed;
}
std::string ipStr = inet_ntoa(clientAddr.sin_addr);
std::cout << "服务器接受客户端(sockfd = " << clientSockfd << " ,ip = " << ipStr << ")连接成功!" << std::endl;

// 添加 客户端 sockfd
sockfdMap.insert(make_pair(clientSockfd, make_pair(ipStr, 0)));

// 将新建的连接 clientSockfd 加入 masterSet
FD_SET(clientSockfd, &masterSet);

// 更新 max sockfd
if (clientSockfd > maxFd)
{
maxFd = clientSockfd;
}
return Ret_success;
}


#pragma mark -- 格式化发送数据
int TCPServer::RespDataFormat(char *buf, unsigned int bufLen, unsigned int cmd, unsigned int status, const char userId[])
{
if(NULL == buf)
{
return -1;
}
size_t uIdLen = (NULL == userId ? 0 : strlen(userId));
if(bufLen < (sizeof(MsgResp) + uIdLen))
{
return -2;
}
MsgResp *msgResp = (MsgResp *)buf;
memcpy(&msgResp->magic, MAGIC, 4);
msgResp->cmd = htonl(cmd);
msgResp->status = htonl(status);
if(userId != NULL)
{
memcpy(msgResp->userId, userId, uIdLen);
}
return (int)(uIdLen + sizeof(MsgResp));
}


#pragma mark -- 数据解析
int TCPServer::ReqDataParse(char *buf, unsigned int msgLen, unsigned int *cmd, char *userId)
{
if(NULL == buf)
{
return -1;
}
if(msgLen < (sizeof(MsgReq)))
{
return -2;
}

MsgReq *msgReq = (MsgReq *)buf;

if (0 != memcmp(MAGIC, &msgReq->magic, 4))
{
return -3;
}

if(NULL != cmd)
{
*cmd = ntohl(msgReq->cmd);
}

if(NULL != userId && msgLen > sizeof(MsgReq))
{
memcpy(userId, msgReq->userId, msgLen - sizeof(MsgReq));
}
return 0;
}


RetValue TCPServer::RecvMsg()
{
char recvBuf[BUFF_SIZE];
char sendBuf[BUFF_SIZE];

ssize_t recvLen = 0;
ssize_t sendMsgLen = 0;

for(int sockfd = 0; sockfd <= maxFd; sockfd++)
{
if(FD_ISSET(sockfd, &workingSet)) // 有数据可读写
{
bool closeConn = false; // 标记当前连接是否断开了

memset(recvBuf, 0, BUFF_SIZE);
memset(sendBuf, 0, BUFF_SIZE);

recvLen = recv(sockfd, &recvBuf, BUFF_SIZE, 0); // 接收消息
if (0 > recvLen) // 出错
{
std::cout << "接收消息出错(sockfd = " << sockfd << ")!" << std::endl;
closeConn = true;
}
else if (0 == recvLen) // 连接已断开
{
std::cout << "接收消息失败,(sockfd = " << sockfd << ")客户端连接已断开!" << std::endl;
closeConn = true;
}
else
{
MsgHead *recvMsgH = (MsgHead*)recvBuf;
if(Msg_heart == recvMsgH->msgType) // 心跳消息
{
sockfdMap[sockfd].second = 0; // 每次收到心跳包,count重置为0,标识连接中

std::cout << "接收客户端(sockfd = " << sockfd << ")的心跳数据:" << recvMsgH->data << std::endl;
}
else // 其他消息
{
unsigned int reqCmd = 0;
unsigned int respCmd = 0;
char userId[512];
memset(userId, 0, 512);

MsgHead *sendMsgH = (MsgHead *)sendBuf;

if (0 == ReqDataParse(recvMsgH->data, recvMsgH->msgLen, &reqCmd, userId))
{
std::cout << "接收到客户端(sockfd = " << sockfd << ")请求:cmd:" << reqCmd << "\tuserId:" << userId << std::endl;
// 休眠 3 秒 模拟相关操作
sleep(3);

respCmd = reqCmd + 1;

switch ((CmdType)reqCmd)
{
case Cmd_reqStatus:
{
sendMsgLen = RespDataFormat(sendMsgH->data, BUFF_SIZE - sizeof(MsgHead), respCmd, Status_suspended, userId);
}
break;

case Cmd_reqWakeUp:
{
sendMsgLen = RespDataFormat(sendMsgH->data, BUFF_SIZE - sizeof(MsgHead), respCmd, Status_waking, userId);
}
break;

case Cmd_reqSuspend:
{
sendMsgLen = RespDataFormat(sendMsgH->data, BUFF_SIZE - sizeof(MsgHead), respCmd, Status_suspending, userId);
}
break;

default:
break;
}
sendMsgH->msgLen = (unsigned int)sendMsgLen;
}
else
{
std::cout << "接收客户端(sockfd = " << sockfd << ")的数据:" << recvMsgH->data << std::endl;
//
sendMsgLen = recvLen - sizeof(MsgHead);
sendMsgH->msgLen = (unsigned int)sendMsgLen;
memcpy(sendMsgH->data, recvMsgH->data, sendMsgLen);
}

sendMsgH->msgType = Msg_other;

ssize_t sendLen = send(sockfd, sendBuf, sendMsgLen + sizeof(MsgHead), 0);

if (0 > sendLen) // 出错
{
std::cout << "发送消息出错!" << std::endl;
closeConn = true;
}
else if (0 == sendLen) // 连接已断开
{
std::cout << "发送消息失败,连接已断开!" << std::endl;
closeConn = true;
}
else // 发送成功!
{
if (0 == strcmp(recvMsgH->data, "quit")) // 遇到 “quit” 则退出
{
std::cout << "客户端(sockfd = " << sockfd << ")已主动关闭" << std::endl;
closeConn = true;
}
}
}

}

if(true == closeConn)
{
std::cout << "关闭客户端(sockfd = " << sockfd << ")" << std::endl;
close(sockfd); // 关闭套接字
FD_CLR(sockfd, &masterSet); // 从集合中清除相应 sockfd
sockfdMap.erase(sockfd); // 从map中移除该记录

if(sockfd == maxFd) // 需要更新 maxFd;
{
while(false == FD_ISSET(maxFd, &masterSet))
{
maxFd--;
}
}
}
}
}
return Ret_success;
}


#pragma mark -- 心跳检测 线程
void* CheckHeartBeat(void* arg)
{
std::cout << "心跳检测 线程启动!" << std::endl;
TCPServer* server = (TCPServer*)arg;
while(1)
{
std::cout << "当前已连接客户端数量:" << server->sockfdMap.size() << std::endl;

std::map<int, std::pair<std::string, int>>::iterator it = server->sockfdMap.begin();
for( ; it != server->sockfdMap.end(); ) // 循环检测所有客户端连接 sockfd 心跳
{
if(CHECK_HB_COUNT <= it->second.second) // (规定的时间内)没有收到心跳包,判定客户端掉线
{
int sockfd = it->first;
std::string ipStr = it->second.first;

std::cout << "客户端(sockfd = " << sockfd << "ipStr = " << ipStr << ")已经掉线" << std::endl;

close(sockfd); // 关闭该连接
FD_CLR(sockfd, &server->masterSet); // 从集合中清除相应 sockfd
if(sockfd == server->maxFd) // 需要更新 maxFd;
{
// 更新 maxFd
while(false == FD_ISSET(server->maxFd, &server->masterSet))
{
server->maxFd--;
}
}

server->sockfdMap.erase(it++); // 从map中移除该记录
}
else if(CHECK_HB_COUNT > it->second.second
&& 0 <= it->second.second)
{
it->second.second += 1;
it++;
}
else
{
it++;
}
}
sleep(HB_INTERVAL); // 定时检查
}
}

server.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include "TCPServer.hpp"


#define DEMO_PORT 5578
#define DEMO_IP "127.0.0.1"
#define MAX_LISTEN_QUEUE 10

int main()
{
TCPServer server(DEMO_IP, DEMO_PORT);
if (Ret_success != server.BindAddr())
{
exit(1);
}
if (Ret_success != server.Listen(MAX_LISTEN_QUEUE))
{
exit(1);
}
if (Ret_success != server.Run())
{
exit(1);
}

return 0;
}

Client

client.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#ifndef TCPClient_hpp
#define TCPClient_hpp

#include <iostream> // cout
#include <sys/socket.h> // socket
#include <netinet/in.h> // sockaddr_in
#include <arpa/inet.h> // inet_addr
#include <unistd.h> // close


#define BUFF_SIZE 4096
#define HB_INTERVAL 10 // 心跳时间间隔(单位:秒)


/** 返回值 枚举 */
typedef enum __retValue {
Ret_error = -2, // 出错
Ret_failed = -1, // 失败
Ret_success = 0, // 成功
}RetValue;


void* SendHeartBeat(void *arg); /** 发送心跳 */
void* SendMsg(void *arg); /** 发送消息 */
void* RecvMsg(void *arg); /** 接收消息 */


class TCPClient
{
private:
struct sockaddr_in serverAddr; // 服务端地址
socklen_t serAddrLen; // 地址长度
int clientSockfd; // 客户端 socket


/**
格式化发送数据

@param buf 数据缓冲区
@param bufLen 缓冲区长度
@param cmd 命令类型
@param userId 用户 ID
@return > 0:格式化成功(需要发送数据大小);否则,失败
*/
int SendDataFormat(char *buf, unsigned int bufLen, unsigned int cmd, const char userId[]);

/**
解析接收的数据

@param buf 数据缓冲区
@param msgLen 消息长度
@param cmd 命令类型
@param status 用户状态
@param userId 用户 ID
@return 解析是否成功;0:成功,否则失败
*/
int RecvDataParse(char *buf, unsigned int msgLen, unsigned int *cmd, unsigned int *status, char *userId);

/**
发送心跳

@param arg 线程参数(TCPClient实例指针)
*/
friend void* SendHeartBeat(void* arg);

/**
发送消息

@param arg 线程参数(TCPClient实例指针)
*/
friend void* SendMsg(void* arg);

/**
接收消息

@param arg 线程参数(TCPClient实例指针)
*/
friend void* RecvMsg(void* arg);


public:
TCPClient(std::string ipStr, unsigned int port);
~TCPClient();

/**
连接 服务端

@return 参见‘RetValue’
*/
RetValue Connect();

/**
运行 客户端

@return 参见‘RetValue’
*/
RetValue Run();

};

#endif /* TCPClient_hpp */

client.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
#include "TCPClient.hpp"
#include <pthread.h>
#include <sstream>
#include "TransProtocol.h"

#define MAGIC "SYL "
#define USER_ID "ShenYuanLuo"


TCPClient::TCPClient(std::string ipStr, unsigned int port)
{
if (true == ipStr.empty()
|| 0 >= ipStr.length())
{
std::cout << "Ip 地址不能为空!" << std::endl;
return;
}
// 创建 客户端 socket
clientSockfd = socket(PF_INET, SOCK_STREAM, 0);
if (0 > clientSockfd)
{
std::cout << "创建 socket 失败!" << std::endl;
return;
}
std::cout << "创建 socket 成功!" << std::endl;

// 设置 服务端 地址
serAddrLen = sizeof(serverAddr);
memset(&serverAddr, 0, serAddrLen); // 清空结构体
serverAddr.sin_family = AF_INET; // 协议族 类型
serverAddr.sin_port = htons(port); // 服务端 端口
serverAddr.sin_addr.s_addr = inet_addr(ipStr.c_str()); // 服务端 IP 地址
}


TCPClient::~TCPClient()
{
if (0 < clientSockfd)
{
std::cout << "关闭客户端 sockfd !" << std::endl;
close(clientSockfd); // 关闭客户端 sockfd
}
}


RetValue TCPClient::Connect()
{
std::cout << "开始连接 服务端。。。" << std::endl;
// 连接 服务端
int ret = connect(clientSockfd, (struct sockaddr*)&serverAddr, serAddrLen);
if (0 != ret)
{
std::cout << "连接 服务端 失败!" << std::endl;
return Ret_failed;
}
std::cout << "连接 服务端 成功!" << std::endl;
return Ret_success;
}

RetValue TCPClient::Run()
{
pthread_t heartBeatP;
pthread_t sendP;
pthread_t recvP;

// 创建发送心跳线程
int ret = pthread_create(&heartBeatP, NULL, SendHeartBeat, (void*)this);
if(0 != ret)
{
std::cout << "创建发送心跳线程失败!" << std::endl;
return Ret_failed;
}

// 创建发送消息线程
ret = pthread_create(&sendP, NULL, SendMsg, (void*)this);
if (0 != ret)
{
std::cout << "创建发送消息线程失败!" << std::endl;
return Ret_failed;
}

// 创建接收消息线程
ret = pthread_create(&recvP, NULL, RecvMsg, (void*)this);
if (0 != ret)
{
std::cout << "创建接收消息线程失败!" << std::endl;
return Ret_failed;
}

// 等待线程结束
void *result;
if (0 != pthread_join(heartBeatP, &result))
{
perror("等待 发送心跳 线程失败");
return Ret_failed;
}
if (0 != pthread_join(sendP, &result))
{
perror("等待 发送消息 线程失败");
return Ret_failed;
}
if (0 != pthread_join(recvP, &result))
{
perror("等待 接收消息 线程失败");
return Ret_failed;
}
return Ret_success;
}



#pragma mark -- 格式化发送数据
int TCPClient::SendDataFormat(char *buf, unsigned int bufLen, unsigned int cmd, const char userId[])
{
if(NULL == buf)
{
return -1;
}
size_t uIdLen = (NULL == userId ? 0 : strlen(userId));
if(bufLen < (sizeof(MsgReq) + uIdLen))
{
return -2;
}
memset(buf, 0, bufLen);
MsgReq *msgReq = (MsgReq *)buf;
memcpy(&msgReq->magic, MAGIC, 4);
msgReq->cmd = htonl(cmd);
if(userId != NULL)
{
memcpy(msgReq->userId, userId, uIdLen);
}
return (int)(uIdLen + sizeof(MsgReq));
}


#pragma mark -- 数据解析
int TCPClient::RecvDataParse(char *buf, unsigned int msgLen, unsigned int *cmd, unsigned int *status, char *userId)
{
if(NULL == buf)
{
return -1;
}
if(msgLen < (sizeof(MsgResp)))
{
return -2;
}

MsgResp *resp = (MsgResp *)buf;

if (0 != memcmp(MAGIC, &resp->magic, 4))
{
return -3;
}

if(NULL != cmd)
{
*cmd = ntohl(resp->cmd);
}

if(NULL != status)
{
*status = ntohl(resp->status);
}

if(NULL != userId && msgLen > sizeof(MsgResp))
{
memcpy(userId, resp->userId, msgLen - sizeof(MsgResp));
}
return 0;
}



#pragma mark -- 心跳发送
void* SendHeartBeat(void* arg)
{
std::cout << "发送心跳线程启动.\n";
TCPClient* client = (TCPClient*)arg;
MsgHead *head = (MsgHead *)malloc(BUFF_SIZE);
static int count = 0;
std::string msgStr;
size_t msgLen;
while(1)
{
count++;
memset(head, 0, BUFF_SIZE);
std::stringstream msgSS;
msgSS << "客户端心跳:"<< count;
msgStr = msgSS.str();
msgLen = msgStr.length();

head->msgType = Msg_heart;
head->msgLen = (unsigned int)msgLen;
memcpy(head->data, msgStr.c_str(), msgLen);

ssize_t sendLen = send(client->clientSockfd, head, sizeof(head) + msgLen, 0);
if (0 > sendLen) // 出错
{
std::cout << "发送心跳出错!" << std::endl;
free(head);
exit(1);
}
else if (0 == sendLen) // 连接已断开
{
std::cout << "连接已断开,发送心跳失败!" << std::endl;
free(head);
exit(1);
}
else // 发送成功
{
// std::cout << "发送心跳成功!" << std::endl;
}
sleep(HB_INTERVAL);
}

return NULL;
}


#pragma mark -- 发送消息
void* SendMsg(void *arg)
{
std::cout << "发送消息线程启动!" << std::endl;
TCPClient *client = (TCPClient *)arg;

char sendBuf[BUFF_SIZE];
MsgHead *reqMsgH = (MsgHead *)sendBuf;
unsigned int reqCmd;
size_t headLen = sizeof(MsgHead);
while(1)
{
std::cout << "请输入操作:" << std::endl;
std::cout << "【0】.退出\t" << "【1】.用户状态\t" << "【3】.唤醒用户\t" << "【5】.挂起用户" << std::endl;

memset(reqMsgH, 0, BUFF_SIZE);

std::cin >> reqCmd;

reqMsgH->msgType = Msg_other;

if (0 == reqCmd)
{
std::string msgStr = "quit";
reqMsgH->msgLen = (unsigned int)msgStr.length();
memcpy(reqMsgH->data, msgStr.c_str(), reqMsgH->msgLen);
}
else
{
reqMsgH->msgLen = client->SendDataFormat(reqMsgH->data, (unsigned int)(BUFF_SIZE - headLen), reqCmd, USER_ID);
}

ssize_t sendLen = send(client->clientSockfd, reqMsgH, reqMsgH->msgLen + headLen, 0);

if (0 > sendLen) // 出错
{
std::cout << "发送消息出错!" << std::endl;
exit(1);
}
else if (0 == sendLen) // 连接已断开
{
std::cout << "发送消息失败,连接已断开!" << std::endl;
exit(1);
}
else // 发送成功!
{
std::cout << "成功发送字节数:" << sendLen << std::endl;
}
}
return NULL;
}


#pragma mark -- 接收消息
void* RecvMsg(void *arg)
{
std::cout << "接收消息线程启动!" << std::endl;
TCPClient *client = (TCPClient *)arg;
char recvBuf[BUFF_SIZE];
ssize_t recvLen;

unsigned int cmd = 0;
unsigned int status = 0;
char userId[64];

while (1)
{
memset(recvBuf, 0, BUFF_SIZE);
recvLen = recv(client->clientSockfd, recvBuf, BUFF_SIZE, 0);
if (0 > recvLen) // 出错
{
std::cout << "接收消息出错!" << std::endl;
exit(1);
}
else if (0 == recvLen) // 连接已断开
{
std::cout << "接收消息失败,连接已断开!" << std::endl;
exit(1);
}
else // 接收成功
{
MsgHead *msgHead = (MsgHead*)recvBuf;
if(Msg_heart == msgHead->msgType) // 心跳消息
{

}
else
{
memset(userId, 0, 64);

if (0 == client->RecvDataParse(msgHead->data, msgHead->msgLen, &cmd, &status, userId))
{
std::cout << "接收到服务端消息:cmd:" << cmd << "\tstatus:" << status << "\tuserId:" << userId << std::endl;
}
else
{
std::cout << "接收到服务端消息:" << msgHead->data << std::endl;
if (0 == strcmp(msgHead->data, "quit")) // 遇到 “quit” 则退出
{
close(client->clientSockfd);
exit(1);
}
}
}
}
}
return NULL;
}

client.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include "TCPClient.hpp"


#define DEMO_PORT 5578
#define DEMO_IP "127.0.0.1"


int main()
{
TCPClient client(DEMO_IP, DEMO_PORT);
if (Ret_success != client.Connect())
{
exit(1);
}
if (Ret_success != client.Run())
{
exit(1);
}

return 0;
}

Demo下载