主要参考UNIX网络编程卷1: 套接字联网API(第三版)

套接字对

发送端的IP地址和端口号与目的端的IP地址和端口号,组成一个套接字对,对应唯一的一个TCP链接

image-20211118171739791

套接字地址结构

套接字地址结构

大多数套接字函数都需要一个指向套接字地址结构的指针作为参数。

每个协议族都定义它的套接字地址结构。这些结构的名字均以sockaddr_开头,并以对应每个协议族的唯一后缀结尾,如IPv4套接字地址结构struct sockaddr_in,IPv6套接字地址结构struct sockaddr_in6

IPV4 sockaddr_in

通常也被称为”网际套接字地址结构”,它以sockaddr_in命名,定义在头文件中。

image-20211118172548113

image-20211118172732549

传递套接字地址结构的函数

从进程传到内核

bind,connect,sendto

image-20211118173036348

从内核传到进程

accept,recvfrom,getsockname,getpeername

image-20211118173043752

字节排序函数

网络字节序(network byte order)是大端字节序

大端小端字节序

  同时参考了博客大端(Big Endian)/小端(Little Endian)字节序 - Folm - 博客园 (cnblogs.com)。文章写得好,背景也养眼。

  术语大端小端表示多个字节值(如int, long)的哪一端字节存储在虚拟主存中该值的起始地址。其中MSB表示most significant bit最高位,LSB表示least significant bit最低位。

  注意!字节序是以字节为单位的,不要考虑单个字节的顺序。

image-20211118173256919

  看一下虚拟机的字节序:程序中令int a=127,64位系统中,int是4字节。gdb二进制查看内存,4表示看4个字节,t表示以二进制显示,b表示间隔1字节。我们发现内存的低位存放低位,是小端字节序。对于一个字节的8位来讲,并不需要我们去考虑实际存放的顺序。

image-20211119164701545

  而网络字节序是大端字节序。int a=127按照主机的小端字节序,一个4字节大小的值在内存中(地址由低到高)依次为0x 8f 00 00 00。在TCP,IP协议看来,高位放在起始地址,这是127左移24位的大小。

  为了TCP,IP能正确认出我们填写的服务器ip地址和端口地址,需要将按主机字节序存放的多字节数值,转换成按网络字节序存放的多字节数值。

 然而对于网络间用write,read传递的值,强制转换成了const void* buf,所以全是单字节值,不需要考虑字节序。

主机字节序和网络字节序的转换函数

其中h表示host,n表示network,s表示short,l表示long

1
2
3
4
5
#include <netinet/in.h>
uint16_t htons(uint16_t host16bitvalue);
uint32_t htonl(uint32_t host32bitvalue);
uint16_t ntohs(uint16_t net16bitvalue);
uint32_t ntohl(uint32_t net32bitvalue);

字节操纵函数

1
2
3
4
#include <strings.h>
void bzero(void* dest, size_t nbytes);
void bcopy(const void* src, void* dest, size_t nbytes);
int bcmp(const void *ptr1, const void* ptr2, size_t nbytes);

bzero把目标字节串中的指定数目的字节置为0;

bcopy把指定数目的字节从源字节串移到目标字节串;

bcmp比较两个任意字节串,相同返回0,不同返回非0.

IPV4的地址转换函数

在ASCII字符串(点分十进制数串)与网络字节序的二进制值之间转换网际地址

1
2
int inet_aton(const char* strptr, struct in_addr *addrptr);
char* inet_ntoa(struct in_addr inaddr);

inet_aton将strptr所指C字符串转换成一个32位的网络字节序二进制值,并通过指针addrptr来存储,成功返回1,否则返回0。

inet_ntoa将一个32位网络字节序的二进制IPv4地址转换成相应的点分十进制数串。

基本套接字函数

image-20211118175340842

socket

指定期望的通信协议类型,获得一个套接字描述符

1
2
3
#include <sys/socket.h>
int socket(int family, int type, int protocol);
//若成功则为非负描述符,若出错则为-1

bind

把一个本地协议地址(IP地址和端口号)赋予一个套接字描述符

1
2
3
#include <sys/socket.h>
int bind(int sockfd, const struct sockaddr *myaddr, socklen_t addrlen);
//成功返回0,失败返回-1

connect

TCP客户建立与TCP服务器的连接

image-20211130105238429

  1. 服务器必须准备好接受外来的连接。这通常通过调用socket、bind和listen这3个函数来完成,我们称之为被动打开(passive open)。
  2. 客户通过调用connect发起主动打开(active open)。这导致客户TCP发送一个SYN(同步)分节,它告诉服务器客户将在(待建立的)连接中发送的数据的初始序列号。通常SYN分节不携带数据,其所在的IP数据报只含有一个IP首部、一个TCP首部及可能有的TCP选项。
  3. 服务器必须确认(ACK)客户的SYN,同时自己也得发送一个SYN分节,它还有服务器将在同一连接中发送的数据的初始序列号。服务器在单个分节中发送SYN和对客户SYN的ACK(确认)。
  4. 客户必须确认服务器的SYN。
1
2
3
#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr* servaddr, socklen_t addrlen);
//成功返回0,出错返回-1

激发三路握手,仅在连接建立成功或出错时才返回:SYN分节无响应,返回ETIMEDOUT错误;响应RST,返回ECONNREFUSED错误;ICMP不可达错误报文,返回EHOSTUNREACH或ENETUNREACH错误。

若connect失败则套接字不可再用,必须关闭并重新调用socket

close

关闭一个套接字描述符

image-20211130110745392

  1. 某个应用进程首先调用close,我们称该端执行主动关闭(active close)。该端的TCP于是发送一个FIN分节,表示数据发送完毕。
  2. 接收到这个FIN的对端执行被动关闭(passive close)。这个FIN由TCP确认。它的接收也作为一个文件结束符(end-of-file)传递给接收端应用进程(放在已排队等候该应用进程接收的任何其他数据之后),因为FIN的接收意味着接收端应用进程在相应连接上再无额外数据可接收。
  3. 一段时间后,接收到这个文件结束符的应用进程将调用close关闭它的套接字。这导致它的TCP也发送一个FIN。
  4. 接收这个最终FIN的原发送端TCP(即执行主动关闭的那一端)确认这个FIN。
1
2
3
#include <unistd.h>
int close(int sockfd);
//成功返回0,出错返回-1

listen

由TCP服务器调用,做两件事

1
2
3
#include <sys/socket.h>
int listen(int sockfd, int backlog);
//成功返回0,失败返回-1
  • listen函数指示内核应该接收指向该套接字描述符的连接请求。
  • backlog参数规定了内核应该为该套接字描述符排队的最大连接个数。

监听套接字描述符的两个队列:

image-20211118180647106

image-20211118180652448

accept

由TCP服务器调用,用于从已完成连接队列队头取一个已完成连接。

1
2
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr* cliaddr, socklen_t* addrlen);

内核为每个由服务器进程接受的客户连接创建一个已连接描述符。accept返回已连接描述符,填写客户的套接字地址。

fork和exec

fork

fork函数是Unix中派生进程的唯一方法

1
2
3
#include <unistd.h>
pid_t fork(void);
//返回:在子进程中为0,在父进程中为子进程ID,若出错则为-1

子进程分享父进程调用fork之前打开的所有描述符,网络服务器利用这个特性让子进程分享已连接套接字,而父进程则关闭这个已连接套接字,子进程则关闭监听套接字。

exec

在fork创建新进程后可以调用exec把自身替换成新的程序

典型的并发服务器程序轮廓

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pid_t pid;
int listenfd, connfd;
listenfd = socket( ... ); //fill in sockaddr_in{} with server's well-known port
listen(listenfd, listenq);
for ( ; ; ){
connfd = accept(listenfd, ... ); //probably blocks
if ( (pid = fork()) == 0 ){
close(listenfd); //child closes listening socket file descriptor
doit(connfd); //process the request
close(connfd); //done with this client
return 0; //child terminates
}
close(connfd); //parent closes connected socket fd, and continue listen
}

getsockname和getpeername

1
2
3
#include <sys/socket.h>
int getsockname(int sockfd, struct sockaddr* lockaddr, socklen_t* addrlen);
int getpeername(int sockfd, struct sockaddr* peeraddr, socklen_t* addrlen);

getsockname返回本地协议地址

在一个没有调用bind的TCP客户上,connect成功返回后,getsockname用于返回由内核赋予该连接的本地IP地址和本地端口号。

在以端口号0调用bind(告知内核去选择本地端口号)后,getsockname用于返回由内核赋予的本地端口号

getpeername返回外地协议地址

getpeername返回外地协议地址

服务器调用过accept后再调用fork时,外地套接字地址结构丢失,只能知道已连接套接字描述符,使用getpeername获取客户的套接字地址结构(IP地址和端口号)。

简易Remote Procedure Call

阻塞式/非阻塞式/复用 读写,多进程,多线程的细节将在各个版本迭代中呈现。由于数据都很小,所以read,write一次就行。

需求

编写一个简易RPC框架:YA-RPC,支持 At-least-once 语义。

使用YA-RPC编写一个demo程序,实现如下API:

1)远程调用 float sum(float a, float b)

2)远程调用 string uppercase(str)

3)不少于2个客户端,1个服务端

版本1 挨个服务,无读写,客户使用本地函数

YA_RPC框架由YA_PRC.h和YA_RPC.c构成

YA_RPC.h包含所有需要的库文件和服务器参数。

希望达到的效果

client.c,创建一个RPC_Client类的对象cli,调用cli提供的sum和uppercase方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "YA_RPC.h"

int main()
{
RPC_Client cli;

float a = 1.1;
float b = 2.2 ;
float c = cli.sum(a, b);
cout<<"c="<<c<<endl;

string s1 = "it is a test";
string s2 = cli.uppercase(s1);
cout<<"s2="<<s2<<endl;

return 0;
}

server.c,创建一个RPC_Server类的对象srv,执行srv提供的Do方法

1
2
3
4
5
6
7
#include "YA_RPC.h"

int main(){
RPC_Server srv;
srv.Do();
return 0;
}

YA_RPC实现

YA_RPC.h包含了所有库和宏,客户类和服务器类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#ifndef YA_RPC
#define YA_RPC

#include <iostream>
#include <string>
#include <algorithm>
#include <netinet/in.h> //sockaddr
#include <strings.h> //bzero
#include <arpa/inet.h> //inet_aton
#include <sys/socket.h> //socket
#include <unistd.h> //close
#include <string.h> //strerror

#define SERVADDR "192.168.1.103"
#define SERVPORT 9999
#define LISTENQ 20

using namespace std;

class RPC_Client
{
public:
RPC_Client();
~RPC_Client();
public:
float sum(float a, float b);
string uppercase(string s1);
private:
struct sockaddr_in server_addr; //server's sockaddr
struct sockaddr_in client_addr; //client's sockaddr
socklen_t clisocklen = sizeof(client_addr); //clent's sockaddr length
int sockfd;
};

class RPC_Server
{
public:
RPC_Server();
~RPC_Server();
public:
void Do();
private:
struct sockaddr_in server_addr; //server's sockaddr
struct sockaddr_in client_addr; //client's sockaddr
socklen_t clisocklen = sizeof(client_addr); //clent's sockaddr length
int listenfd, connfd;
};

#endif

YA_RPC.c给出了客户类和服务器类的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include "YA_RPC.h"

//Client
RPC_Client::RPC_Client()
{
// user fill in server's information (struct sockaddr_in)
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVPORT);
inet_aton(SERVADDR, &(server_addr.sin_addr));

//create sockfd
int sockfd;
if( (sockfd=socket(AF_INET, SOCK_STREAM, 0)) == -1 )
{
cout<<"Socket Error:"<< strerror(errno)<<endl;
}

//connect
connect(sockfd, (struct sockaddr*)(&server_addr), sizeof(struct sockaddr));
//print client's sockaddr information
getsockname(sockfd, (struct sockaddr*)&client_addr, &clisocklen);
cout<<"connect success! "<<"server's ip: "<<inet_ntoa(client_addr.sin_addr)<<" port: "<<ntohs(client_addr.sin_port)<<endl;
}

RPC_Client::~RPC_Client()
{
close(sockfd);
}

float RPC_Client::sum(float a, float b)
{
float c = a+b;
return c;
}

string RPC_Client::uppercase(string s1)
{
transform(s1.begin(), s1.end(), s1.begin(), ::toupper);
return s1;
}

//Server
RPC_Server::RPC_Server()
{
//create a socket
listenfd = socket(AF_INET, SOCK_STREAM, 0);

//fill in server's sockaddr information
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_aton(SERVADDR,&(server_addr.sin_addr)); //ip
server_addr.sin_port = htons(SERVPORT); //port

//bind server's sockaddr with socket
bind(listenfd,(sockaddr *)&server_addr, sizeof(server_addr));

//change the socket as listen socket
listen(listenfd, LISTENQ); //LISTENQ sets number of 2 listen array (connecting and conneted);
}

void RPC_Server::Do()
{
//accept and do
for( ; ; ){
connfd = accept(listenfd,(sockaddr*)&client_addr, &clisocklen);
cout<<"client's ip: "<<inet_ntoa(client_addr.sin_addr)<<" port: "<<ntohs(client_addr.sin_port)<<endl;
//here we wait user to close the tcp link
}
}

RPC_Server::~RPC_Server()
{
close(listenfd);
}

版本2 挨个服务,阻塞式读写,客户接收服务器的结果

修改YA_RPC.c中客户类的sum和uppercase方法,修改服务器类的Do方法,新增sum和uppercase方法。

后面的版本客户都是接收服务器返回的结果了。

套接字的发送缓冲区

  每一个socket都有一个发送缓冲区,当某个应用进程写数据到一个套接字中时,内核从该应用进程的缓冲区(这里用虚拟主存感觉比缓冲区要好)中复制所有数据到所写套接字的发送缓冲区。如果该套接字的发送缓冲区容不下该应用进程的所有数据(或是应用进程的缓冲区大于套接字的发送缓冲区,或是套接字的发送缓冲区中已有其他数据),该应用程序将被投入睡眠。

  可以用SO_SNDBUF套接字选项来更改缓冲区大小。MSS (Maximum Segment Szie)最大报文大小是网络层报文的最大字节数,MTU(Maximum Ttransmission Unit)最大传输单位是链路层帧的最大字节数。

image-20211119142148537

  这一端的TCP提取套接字发送缓冲区中的数据把它发送给对端TCP。对端TCP必须确认收到的数据,伴随来自对打的ACK不断到达,本段TCP才能从套接字发送缓冲区丢弃已确认的数据。TCP必须为已发送的数据保留一个副本,直到它被对端确认为止。

  每个套接字一个的发送缓冲区中的数据经过TCP,IP协议的处理,变成一个一个的分组传到数据链路的输出队列,如果队列已满,新到的分组将被丢弃,并沿着协议栈向上返回一个错误。TCP将注意到这个错误,并在以后某个时刻重传相应的分节。应用进程并不知道这种暂时的情况。

套接字的接收缓冲区

  相应的,每一个套接字除了有一个发送缓冲区,还有一个接收缓冲区。对端发来的数据会被缓存入套接字的接收缓冲区,应用进程一直没有读取的话,此数据会一直缓存在相应socket的接收缓冲区内。

阻塞式写接收缓冲区

  write不一定写入你希望它写的字节数,因为发送缓冲区可能不够,成功时它返回实际写入的字节数;

  如果失败返回-1,错误码放入errno(在errno.h中定义)中;具体错误和对应的errno如下:

  • 阻塞时被中断,进程被信号唤醒,由内核态返回用户态。(The call was interrupted by a signal before any data was written),errno为EINTR

  缓冲区满时阻塞。

1
2
3
4
5
#include <unistd.h>

ssize_t write(int fd, const void *buf, size_t count);

如果顺利write()会返回实际写入的字节数。当有错误发生时则返回-1,错误代码存入errno中。

  为了尽可能一次写n个字节,构造writen函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <unistd.h>

ssize_t writen(int fd, void* vptr, size_t n)
{
size_t nleft; //left bytes to write
ssize_t nwritten; //bytes have written
const char* ptr;

ptr = vptr;
nleft = n;
while (nleft>0){
if( ( nwritten = write(fd, ptr, nleft) ) <= 0 ){
if(nwritten<0 && errno==EINTR)
nwritten = 0;
else
return -1;
}
nleft -= nwritten;
ptr += nwritten;
}
return n;
}

阻塞式读接收缓冲区

  read不一定读取你希望它读的字节数,因为数据可能没有全部赋值到接收缓冲区中,成功时它返回实际读取的字节数;

  如果失败返回-1并设置errno;

  接收缓冲区空时阻塞。

  如果返回0,表示链接关闭。

1
2
3
4
5
#include <unistd.h>

ssize_t read(int fd, void *buf, size_t count);

成功返回读取的字节数,出错返回-1并设置errno,如果在调read之前已到达文件末尾,则这次read返回0

  为了尽可能读n个字节,构造readn函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <unistd.h>

ssize_t readn(int fd, void* vptr, size_t n)
{
size_t nleft; //left bytes to read
ssize_t nread; //bytes have read
const char* ptr;

ptr = vptr;
nleft = n;
while (nleft>0){
if( ( nread = read(fd, ptr, nleft) ) < 0 ){
if(errno==EINTR)
nread = 0;
else
return -1;
} else if (nread == 0)
break; /*EOF*/
nleft -= nread;
ptr += nread;
}
return (n-nleft);
}

  从一个套接字接收缓冲区中读文本行,一次1字节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <unistd.h>

ssize_t readline(int fd, void* vptr, size_t maxlen)
{
ssize_t n, readnum;
char c, *ptr;

ptr = vptr;
for (n=1; n<maxlen; n++){
again:
if( (readnum = read(fd, &c, 1)) == 1 ){
*ptr++ = c; /* equals ptr=c ptr+=1 */
if (c == '\n')
break; /*new line is stored*/
}else if (readnum == 0){
*ptr = 0;
return (n-1); /* EOF, n-1 bytes were read*/
}else{
if (errno == EINTR)
goto again;
else
return -1; /* error */
}
}
*ptr = 0; /* null terminate */
return n;
}

  每读一字节就调用一次系统的read函数,PAINFULLY SLOW!(极端痛苦的慢)。我们可以一次读很多字节到应用进程的缓存,再挨个读缓存中的字节,就这样改写readline。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <unistd.h>

static int read_cnt; //bytes in readbuf
static char* read_ptr; //address of readbuf
static char readbuf[MAXLINE];

static ssize_t my_read(int fd, char* ptr)
{
if (read_cnt <= 0){
again:
if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0 ){ //try to read read_buf bytes
if(errno == EINTR) /* interrupt */
goto again;
}else if(read_cnt == 0) /* EOF */
return 0;
read_ptr = read_buf;
}

read_cnt--;
*ptr = *read_ptr++; //return char to c and readptr+=1
return 1;
}

ssize_t readline(int fd, void* vptr, size_t maxlen)
{
ssize_t n, readnum;
char c, *ptr;

ptr = vptr;
for (n=1; n<maxlen; n++){
again:
if( (readnum = my_read(fd, &c)) == 1 ){
*ptr++ = c; /* equals ptr=c ptr+=1 */
if (c == '\n')
break; /*new line is stored*/
}else if (readnum == 0){
*ptr = 0;
return (n-1); /* EOF, n-1 bytes were read*/
}else{
return -1; /* error */
}
}
*ptr = 0; /* null terminate */
return n;
}

ssize_t readlinebuf(void** vptrptr)
{
if (read_cnt)
*vptrptr = read_ptr;
return read_cnt;
}

  my_read每次最多读MAXLINE个字符,然后每次返回一个字符。readlinebuf函数可以展示应用进程虚拟主存中的缓冲区是什么状态(还剩多少字符,剩余字符的起始位置),便于调用者查看当前文本行之后是否收到了新的数据。

改写版本1至版本2时,出现的bug

1. sockfd没用类的成员变量,用了局部变量

  DEBUG半天,发现自己在客户类的构造函数中用了局部变量sockfd,而没用类的成员变量sockfd,导致退出构造函数后,sockfd的值3丢失,变成了1。我还纳闷为什么我write东西给服务器,本地却显示一些奇怪的字符呢。原来套接字描述符变了。我在茫然地debug2个小时候,gdb打印sockfd发现是0。实在是印象深刻。

image-20211119194223391

image-20211119194311326

2. char* s=”it is a test”是常量,地址不可访问,故不可修改

  用char s[] = “it is a test”就可以了。或者在客户端的uppercase函数里再申请一个新数组,就不麻烦用户了。

版本2的具体实现

RPC_Server::Do监听客户想调用什么函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void RPC_Server::Do()
{
char require; // 1-sum; 2-uppercase
//accept and do
for( ; ; ){
connfd = accept(listenfd,(sockaddr*)&client_addr, &clisocklen);
cout<<"client's ip: "<<inet_ntoa(client_addr.sin_addr)<<" port: "<<ntohs(client_addr.sin_port)<<endl;

for( ; ; ){//read the require
ssize_t r;
if ( ( r = read(connfd, &require, sizeof(char))) <= 0 ){
break; //link terminate
}else{
if( require == '1'){
this->sum();
}else if( require == '2'){
this->uppercase();
}else{
cout<<"require error!";
}
}
}
//wait user close the tcp link
}
}

服务器和客户端的sum

1
2
3
4
5
6
7
8
void RPC_Server::sum()
{
float a, b;
read(connfd, &a, sizeof(float));
read(connfd, &b, sizeof(float));
float c = a+b;
write(connfd, &c, sizeof(float));
}
1
2
3
4
5
6
7
8
9
10
float RPC_Client::sum(float a, float b)
{
char require = '1';
float c;
write(sockfd, &require, sizeof(char));
write(sockfd, &a, sizeof(float));
write(sockfd, &b, sizeof(float));
read(sockfd, &c, sizeof(float));
return c;
}

服务器和客户端的uppercase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void RPC_Server::uppercase()
{
int size;
read(connfd, &size, sizeof(int));

char* s = (char *)malloc(size); //require mem
read(connfd, s, size);

for(int i=0; i<size-1; i++){
if(s[i]>='a' && s[i]<='z')
s[i] += ('A'-'a');
}

write(connfd, s, size);
}
1
2
3
4
5
6
7
8
9
10
11
12
char* RPC_Client::uppercase(char* s)
{
char require = '2';
write(sockfd, &require, sizeof(char));
int size = strlen(s) + 1; //contain '\0'
write(sockfd, &size, sizeof(int));
write(sockfd, s, size);
char* s1 = (char *)malloc(size);
read(sockfd, s1, size);

return s1;
}

版本3 多进程服务,阻塞式读写

把要写的内容放在一起

  客户类的sum函数先后调用系统write三次发送请求标志和两个参数,可以把它们放到一个结构体中中,仅调用一次write。服务器类在Do函数中读一次请求标志选择执行的函数,在sum函数中读出2个剩余的float。由于uppercase函数传递长度未知的字符串,所以仍然需要先读size,再在堆上申请一块空间来存放字符串。

1
2
3
4
5
struct SumData{
char require;
float a;
float b;
};
1
2
3
4
5
6
7
8
9
10
11
12
float RPC_Client::sum(float a, float b)
{
struct SumData sumdata;
sumdata.require = '1';
sumdata.a = a;
sumdata.b = b;

float c;
write(sockfd, &sumdata, sizeof(sumdata));
read(sockfd, &c, sizeof(float));
return c;
}
1
2
3
4
5
6
7
8
void RPC_Server::sum()
{
struct SumData sumdata;
struct SumData* p = &sumdata;
(connfd, (char*)p+1, sizeof(sumdata)-1); //char require has read by Do()
float c = sumdata.a+sumdata.b;
write(connfd, &c, sizeof(float));
}

fork

服务器改写Do函数:子进程分享父进程调用fork之前打开的所有描述符,网络服务器利用这个特性让子进程分享已连接套接字,而父进程则关闭这个已连接套接字,子进程则关闭监听套接字。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void RPC_Server::Do()
{
pid_t childpid;
char require; // 1-sum; 2-uppercase
//accept and do
for( ; ; ){
connfd = accept(listenfd,(sockaddr*)&client_addr, &clisocklen);
if( (childpid = fork()) == 0 ){
close(listenfd); /* child closes listen socketfd */
//read the require
for( ; ; ){
if ( ( read(connfd, &require, sizeof(char))) <= 0 ){
break; //link terminate
}else{
if( require == '1'){
this->sum();
}else if( require == '2'){
this->uppercase();
}else{
cout<<"require error!";
}
}
}
return;
}
close(connfd); /* parent closes connected socketfd */
}
}

客户端也改成多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "YA_RPC.h"

int main()
{
for(int i=0; i<=100; i++)
{
cout<<"int process "<<i<<" :"<<endl;
pid_t childpid;
if( (childpid = fork()) == 0 )
{
RPC_Client cli;

float a = 1.1;
float b = 2.2 ;
float c = cli.sum(a, b);
cout<<"c="<<c<<endl;

char* s1 = "it is a test";
char* s2 = cli.uppercase(s1);
cout<<"s2="<<s2<<endl;

return 0;
}
}
return 1;
}

版本4 多线程服务,阻塞式读写

进程与线程

fork是昂贵的,线程的创建比进程快10-100倍,线程间通信也更容易。

线程共享:全局变量,进程指令,大多数数据,打开的文件(即描述符),信号处理函数和信号处置,当前工作目录,用户ID和组ID。

线程独有:线程ID,寄存器集合包括程序计数器和栈指针,栈(用于存放局部变量和返回地址),errno,信号掩码,优先级。

现在在主线程中accept已连接套接字,创建线程服务该已连接套接字

线程的创建和终止

pthread_create创建一个线程

1
2
#include <pthread.h>
int pthread_create(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg);
  • pthread_t *tid:一个进程内的每个线程都由线程ID标识,数据类型是pthread_t(往往是unsigned int)。如果新的线程成功创建,其ID就通过tid指针返回。
  • pthread_attr_t *attr:每个线程有许多属性:优先级、初始栈大小、是否该成为一个守护线程,等等。可以在创建线程时初始化一个取代默认设置的pthread_attr_t变量指定这些属性。空指针即采纳默认设置
  • void (func)(void ),void arg:func所指函数作为参数接受一个通用指针(void ),又作为返回值返回一个通用指针(void )。该函数的唯一调用参数是指针arg,如果我们需要给该函数传递多个参数,我们就得把它们打包成一个结构。
  • 返回值:成功返回0,出错返回某个非0值,与套接字函数及大多数系统调用出错时返回-1并设置errno为某个正直的做法不同的是:Pthread函数出错时作为函数返回值返回正值错误指示。Pthread函数不设置errno

pthread_join等待某个线程终止

1
2
#include <pthread.h>
int pthread_join(pthread_t *tid, void **status);

如果status指针非空,来自所等待线程的返回值(一个指向某个对象的指针)将存入由status指向的位置。

pthread_self获取自身线程ID

1
2
#include <pthread.h>
pthread_t pthread_self(void)

pthread_detach将指定线程变成脱离状态

1
2
#include <pthread.h>
int pthread_detach(pthread_t tid);

一个线程是可汇合的(joinable,默认值),或者是脱离的(detached)。当一个可汇合的线程终止时,它的线程ID和退出状态将留存到另一个线程对它调用pthread_join。脱离的线程却像守护进程,当它们终止时,所有相关资源都被释放。本函数通常由想让自己脱离的线程调用,就如该语句pthread_detach(pthread_self());

pthread_exit退出线程

1
2
#include <pthread.h>
void pthread_exit(void *status);

如果线程未曾脱离,它的线程ID和退出状态将一直留存到调用进程内的某个其他线程对它调用pthread_join。指针status不能指向局部于调用线程的对象,因为线程终止时这样的对象也消失。

实现

客户类没有变动,服务器类发生改变

Do函数现在为每一个accept的已连接套接字创建一个线程,传递套接字去执行doit函数。

考虑到线程切换引起的内核态和用户态的切换,可能导致系统调用中断,所以用版本2中提到的相对可靠的readn和writen取代read和write。

1
2
3
4
5
6
7
8
9
10
11
void RPC_Server::Do()
{
//accept and create thread to handle
for( ; ; ){
connfd = accept(listenfd,(sockaddr*)&client_addr, &clisocklen);
cout<<"client's ip: "<<inet_ntoa(client_addr.sin_addr)<<" port: "<<ntohs(client_addr.sin_port)<<endl;
pthread_t tid;
cout<<"new connfd is "<<connfd<<endl;
pthread_create(&tid, NULL, &doit, (void*) connfd);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void* RPC_Server::doit(void* connfd)
{
int tconnfd = (int)connfd;
pthread_detach(pthread_self());
ssize_t r;
char require;
for( ; ; )
{
if ( ( r = readn(tconnfd, &require, sizeof(char))) <= 0 ){
break;
}else{
cout<<pthread_self()<<" doit connfd "<<tconnfd<<" r="<<r<<",require="<<require<<endl;
if( require == '1'){
sum(tconnfd);
}else if( require == '2'){
uppercase(tconnfd);
}else{
cout<<pthread_self()<<" doit connfd "<<tconnfd<<" require error!";
}
}
}
close(tconnfd); /* done with connected socket */
return NULL;
}

将服务器类的sum和uppercase成员函数改成static函数,因为只传递了connfd给线程(原因见下面的错误3),为了让线程在没有服务器类对象的this指针的情况下,可以调用该对象的方法,才将sum和uppercase成员函数改成static函数。

犯的错误

  1. 在服务器每个线程执行的doit函数中,没有for循环监听客户的require,导致服务完一个函数sum后,链接关闭,不再服务uppercase
  2. 每个线程都根据客户的require字节调用sum或uppercase函数处理之后客户传来的参数,在sum和uppercase中用的是成员变量connfd,多进程中,每个进程都有一个不同的connfd并没有问题,然而多线程中,线程共用一个connfd,就会出错,需要给sum和uppercase传递已连接套接字描述符作为参数。
  3. 仅仅观察三个线程,发现第三个线程和第二个线程的文件描述符相同。原因是我传参给线程的时候,传递了地址,因为void*只能传8字节的数据或者地址。而我想传递connfd和对象指针,传递connfd是为了区分链接,传递对象指针是为了调用sum和uppercase,看来只能用键或者不传递this指针了,我选择不传this。这个错误最为致命,我打印了所有的线程id和connfd才看出问题来。

版本5 线程池服务,阻塞式读写

网络编程卷1:联网套接字API中,第三十章客户/服务器程序设计范式中给出的图:

image-20211129221657131

每个线程各自accept取listen套接字维护的的队列中的已连接套接字,进行服务,不停重复这一过程。或者由主线程统一accept再分配给线程进行服务。

前者比后者更快,所以让每个线程各自accept。每个线程中用互斥锁保护一个accept函数,因为如果使用无锁版本,可能同时有多个accept函数阻塞,当listenfd监听套接字的已连接套接字队列中有已连接套接字时,还要内核花时间决定分给哪个accept,花费一些用户时间来决定哪个accept接收已连接套接字将节省更多内核时间。

实现

在YA_PRC.h文件中增加NTHREADS宏为线程数量,线程数组(线程池)的组成结构,其中包含线程ID和已服务链接的计数器,在YA_RPC.h中的Server类中加一个线程数组指针和互斥锁。

1
2
3
4
5
6
7
8
9
#define NTHREADS 20

typedef struct {
pthread_t thread_tid; /* thread ID */
long thread_count; /* # connections handled */
} Thread;

Thread *tptr;
pthread_mutex_t mlock;

在Do中初始化互斥锁,初始化NTHREADS个线程

1
2
3
4
5
6
7
8
9
10
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;

tptr = Calloc(NTHREADS, sizeof(Thread));

for (i=0; i<NTHREADS; i++)
{
pthread_create(&tptr[i].thread_tid, NULL, &doit, (void*)i);
}


每个线程执行的doit函数

1
2
3
4
pthread_mutex_lock(&mlock);
connfd = Accept(listennfd, cliaddr, &clilen)
pthread_mutex_unlock(&mlock);
tptr[(int) arg].thread_count++;

错误

error: invalid use of member ‘RPC_Server::listenfd’ in static member function。在doit静态函数中使用了成员变量,因为doit函数是void func (void)类型的函数,所以它没有对象指针this。这次我们没传connfd而是传递了Thread结构在线程数组中的位置,因为connfd线程自己accept,不用从主线程中传参进来了,而为了给每个线程的服务链接计数,要传递i,即Thread结构在线程池中的位置。

所以我先尝试把listenfd和tptr改成静态成员变量。由于静态的意图是只在一个源文件中生效,会导致链接器报undefend reference问题,所以直接改成全局变量。结果由于YA_RPC.c和server.c都应用了YA_RPC.h导致编译生成的两个.o文件都有全局变量,链接器报错multiple definition。所以我把listenfd,tptr和mlock都扔到YA_RPC.c中了。另一种解决办法是在.h中使用extern关键字声明全局变量,在YA_RPC.c中定义这些全局变量。

版本6 IO复用之epoll监听事件与非阻塞式读写

非阻塞套接字

1
2
3
4
5
6
7
8
9
//将套接字设置为非阻塞模式
void setnonblock(int fd)
{
int flags;
flags = fcntl(fd, F_GETFL);
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}

套接字的默认状态是阻塞的。这就意味着当发出一个不能立即完成的套接字调用时,其进程将被投入睡眠,等待相应操作的完成。

输入操作

  • 包括read,readv,recv,recvfrom和recvmsg共5个函数。
  • 阻塞条件:该套接字阻塞,接收缓冲区中没有数据可读
  • 唤醒条件:TCP是字节流协议,只要有一些数据到达,这些数据既可能是单个字节,也可以是一个完整的TCP分节中的数据。如果想等到某个固定数目的数据可读为止,那么可以调用我们的readn函数,或指定MSG_WAITALL标志。
  • 非阻塞:如果输入操作不能被满足(对于TCP套接字即至少有一个字节的数据可读,对于UDP套接字即有一个完整的数据报可读),相应调用将立即返回一个EWOULDBLOCK错误。

输出操作

  • 包括write、writev、send、sendto和sendmsg共5个函数
  • 阻塞条件:如果其发送缓冲区中没有空间,进程将被投入睡眠,直到有空间为止。
  • 唤醒条件:发送缓冲区有空间
  • 非阻塞:如果其发送缓冲区中没有空间,输出函数调用将立即返回一个EWOULDBLOCK错误。如果其发送缓冲区中有一些空间,返回值将是能够复制到该缓冲区中的字节数(不足计数 short count)

接收外来链接

对于一个阻塞的套接字调用accept函数,并且尚无新的连接到达,accept调用将立即返回一个EWOULDBLOCK错误。

发起外出链接

connect函数一直要等到客户收到对于自己的SYN的ACK为止才返回。这意味着TCP的每个connect总会阻塞其调用进程至少一个服务器的RTT时间。

如果对于一个非阻塞的TCP套接字调用connect,并且连接不能立即建立,那么连接的建立能照样发起,不过会返回一个EINPROGRESS错误。(立即建立的连接通常发生在服务器和客户处于同一个主机的情况下)

epoll

可以监听建立连接、读、写事件。

epoll_create,创建一个epoll句柄

1
2
int epoll_create(int size);
//返回值为一个文件描述符

size为最大可监听的描述符个数

epoll_ctl,注册要监听的描述符和事件

1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
  • int epfd:是epoll_create()的返回值
  • int op:EPOLL_CTL_ADD,注册新的fd到epfd中;EPOLL_CTL_MOD,修改已经注册的fd的监听事件;EPOLL_CTL_DEL,从epfd中删除一个fd。
  • int fd:需要监听的描述符
  • struct epoll_event *event
1
2
3
4
5
6
7
8
9
10
11
typedef union epoll_data{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
}epoll_data_t;

struct epoll_event{
__uint32_t events; /*Epoll events*/
epoll_data_t data; /*User data variable*/
};

events代表事件:EPOLLIN表示对应的文件描述符可读(包括对端socket正常关闭);EPOLLOUT表示对应的文件描述符可以写;EPOLLPRI表示对应的文件描述符有紧急的数据可读(带外数据);EPOLLERR表示对应的文件描述符发生错误;EPOLLHUP表示对应的文件描述符被挂断;EPOLLET将EPOLL设为边缘触发(Edge Triggered),处理完事件后才会再收到事件

epoll_wait

events里存储所有读写事件,函数返回需要处理的事件数目,如返回0表示已超时

1
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
  • int epfd:epfd为epoll_create()创建的描述符
  • struct epoll_event *events:从内核得到事件的集合
  • int maxevents:返回描述符事件的最大数目
  • int timeout:超时时间,0立刻返回,-1一直等下去

shutdown

epoll监听建立链接,读,写事件并分别处理,可能会出现读到EOF,链接关闭, 然而还有一些本该处理的写事件没处理完

close把描述符的引用计数减1,仅在该计数变为0时才关闭套接字。使用shutdown可以不管引用计数就激发TCP的正常连接终止序列。

close终止读和写两个方向的数据传递。shutdown可以关闭一半TCP链接。

1
2
#inlcude <sys/socket.h>
int shutdown(int sockfd, int howto);

该函数的行为依赖于howto参数的值。

  • SHUT_RD:关闭链接的读一半——套接字中不再有数据可接收,而且套接字接收缓冲区中的现有数据都被丢弃。进程不能再对这样的套接字调用任何读函数。对一个TCP套接字这样调用shutdown函数后,由该套接字接收的来自对端的任何数据都被确认,然后丢弃。
  • SHUT_WR:关闭链接的写一半——当前留在套接字发送缓冲区中的数据将被发送掉,后跟TCP的正常连接中止序列。进程不能再对这样的套接字调用任何写函数。

然而在本次实现中,客户端通过客户类,在构造函数中建立连接,完成所有任务后在析构函数中关闭连接,所以不用使用shutdown,只使用close就好。

实现

首先创建一个epoll实例

注册listenfd和EPOLLIN,监听建立链接的事件,如果有已建立的链接,服务器accept然后注册connfd和读事件到epoll中。

循环接收已发生的事件,如果有某已链接套接字的可读事件,就读该套接字,如果链接结束就删除,如果要写,就注册该套接字的写事件到epoll中。

如果有某已链接套接字的可写事件,就写该套接字。

在服务器类的构造函数中,创建epoll实例并注册listenfd的EPOLLIN事件,以accept链接。

1
2
3
4
5
6
7
//create epoll
epfd = epoll_create(MAXFDS);

//regit listenfd and accept event
ev.data.fd = listenfd;
ev.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

在Do中循环接收监听到的事件

此时发现,epoll监听注册的套接字的读写事件,将读写分离开来,没有连续地处理客户请求。比较复杂。所以咕咕咕。

目前就实现到线程池。