One Sentence.

简单的阻塞客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For gethostbyname */
#include <netdb.h>

#include <unistd.h>
#include <string.h>
#include <stdio.h>

/*
 * Minimal blocking HTTP client.
 *
 * Resolves www.google.com, connects to port 80, sends a fixed HTTP/1.0 GET
 * request and copies everything the server sends back to stdout.
 *
 * Returns 0 on success and 1 on any lookup, socket, connect, send or recv
 * failure. The command-line arguments are not used.
 */
int main(int c, char **v)
{
    const char query[] =
        "GET / HTTP/1.0\r\n"
        "Host: www.google.com\r\n"
        "\r\n";
    const char hostname[] = "www.google.com";
    struct sockaddr_in sin;
    struct hostent *h;
    const char *cp;
    int fd;
    ssize_t n_written, remaining;
    char buf[1024];

    (void)c;
    (void)v;

    /* Look up the IP address for the hostname. Watch out; this isn't
       threadsafe on most platforms. */
    h = gethostbyname(hostname);
    if (!h) {
        fprintf(stderr, "Couldn't lookup %s: %s\n", hostname, hstrerror(h_errno));
        return 1;
    }
    if (h->h_addrtype != AF_INET) {
        fprintf(stderr, "No ipv6 support, sorry.\n");
        return 1;
    }

    /* Allocate a new socket */
    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return 1;
    }

    /* Connect to the remote host. Zero the whole structure first so that
       padding members do not carry stack garbage into connect(). */
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(80);
    /* Copy the first resolved address byte-wise instead of casting the
       char pointer, which avoids alignment/aliasing assumptions. */
    memcpy(&sin.sin_addr, h->h_addr_list[0], sizeof(sin.sin_addr));
    if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) {
        perror("connect");
        close(fd);
        return 1;
    }

    /* Write the query. send() may accept only part of the data, so keep
       going until nothing remains. */
    cp = query;
    remaining = strlen(query);
    while (remaining) {
        n_written = send(fd, cp, remaining, 0);
        if (n_written <= 0) {
            perror("send");
            close(fd); /* do not leak the socket on the error path */
            return 1;
        }
        remaining -= n_written;
        cp += n_written;
    }

    /* Get an answer back: recv() returning 0 means the peer closed. */
    while (1) {
        ssize_t result = recv(fd, buf, sizeof(buf), 0);
        if (result == 0) {
            break;
        } else if (result < 0) {
            perror("recv");
            close(fd);
            return 1;
        }
        fwrite(buf, 1, result, stdout);
    }

    close(fd);
    return 0;
}

Bad example (reading from several sockets in turn with plain recv() calls):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* This won't work. */
/*
 * Illustrative fragment: loops over every socket in fd[] and issues one
 * recv() per socket on each pass, dispatching on the result (0 -> close,
 * negative -> error, positive -> input).
 * NOTE(review): nothing in this fragment sets O_NONBLOCK, so presumably each
 * recv() can stall on fd[i] while other sockets already have data waiting;
 * that appears to be why it is labelled as not working. The names fd,
 * n_sockets, i_still_want_to_read and the handle_* helpers are not defined
 * in this fragment.
 */
char buf[1024];
int i, n;
while (i_still_want_to_read()) {
for (i=0; i<n_sockets; ++i) {
n = recv(fd[i], buf, sizeof(buf), 0);
if (n==0)
handle_close(fd[i]);
else if (n<0)
handle_error(fd[i], errno);
else
handle_input(fd[i], buf, n);
}
}

对每个连接fork后进行处理的服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>

#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX_LINE 16384

/*
 * Apply ROT13 to a single character.
 *
 * ASCII letters are rotated by 13 places within their own case; every other
 * character is returned unchanged. isalpha() is deliberately avoided because
 * the current locale would change which characters count as alphabetic.
 */
char
rot13_char(char c)
{
    char base;

    if (c >= 'a' && c <= 'z')
        base = 'a';
    else if (c >= 'A' && c <= 'Z')
        base = 'A';
    else
        return c;

    /* Rotate inside the 26-letter range that starts at base. */
    return (char)(base + (c - base + 13) % 26);
}

/*
 * Serve one connection: read bytes one at a time from fd, ROT13 each byte
 * into a line buffer, and send the buffered line back whenever a newline is
 * read.
 *
 * Returns when the peer closes the connection or when recv()/send() fails.
 * Bytes that do not fit into the line buffer are dropped.
 */
void
child(int fd)
{
    char outbuf[MAX_LINE+1];
    size_t outbuf_used = 0;

    for (;;) {
        char ch;
        ssize_t result = recv(fd, &ch, 1, 0);
        if (result == 0) {
            break;
        } else if (result == -1) {
            perror("read");
            break;
        }

        /* We do this test to keep the user from overflowing the buffer. */
        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if (ch == '\n') {
            /* send() may accept fewer bytes than requested, so loop until
             * the whole line has been handed to the kernel. */
            size_t sent = 0;
            while (sent < outbuf_used) {
                ssize_t n = send(fd, outbuf + sent, outbuf_used - sent, 0);
                if (n <= 0) {
                    perror("send");
                    return;
                }
                sent += (size_t)n;
            }
            outbuf_used = 0;
        }
    }
}

/*
 * Forking ROT13 server: listens on TCP port 40713 on all IPv4 interfaces and
 * forks one child process per accepted connection.
 *
 * Returns only if the listening socket cannot be created, bound or put into
 * the listening state; otherwise it loops forever.
 * NOTE: exited children are never waited for in this function.
 */
void
run(void)
{
    int listener;
    struct sockaddr_in sin;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        return;
    }

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        close(listener);
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        close(listener);
        return;
    }

    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        pid_t pid;
        int fd = accept(listener, (struct sockaddr*)&ss, &slen);
        if (fd < 0) {
            perror("accept");
            continue;
        }

        pid = fork();
        if (pid == 0) {
            /* Child: it only needs the connection, not the listener. */
            close(listener);
            child(fd);
            exit(0);
        }
        if (pid < 0) {
            perror("fork");
        }
        /* Parent (or failed fork): drop our copy of the connection so the
         * descriptor is not leaked and the peer sees EOF once the child
         * is done. */
        close(fd);
    }
}

/* Entry point of the forking server: the arguments are ignored and the
 * accept loop in run() is started. Exits with status 0 if run() returns. */
int
main(int c, char **v)
{
    (void)c;
    (void)v;

    run();

    return 0;
}

引入非阻塞、忙轮询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* This will work, but the performance will be unforgivably bad:
 * every socket is made non-blocking and then polled in a tight loop. */
char buf[1024];
int i, n;

/* Switch every socket to non-blocking mode. */
for (i = 0; i < n_sockets; ++i) {
    fcntl(fd[i], F_SETFL, O_NONBLOCK);
}

while (i_still_want_to_read()) {
    for (i = 0; i < n_sockets; ++i) {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n > 0) {
            handle_input(fd[i], buf, n);
        } else if (n == 0) {
            handle_close(fd[i]);
        } else if (errno != EAGAIN) {
            /* EAGAIN only means the kernel had no data for us yet;
             * anything else is reported as an error. */
            handle_error(fd[i], errno);
        }
    }
}

使用 select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* If you only have a couple dozen fds, this version won't be awful:
 * select() is used to wait until at least one socket is readable. */
char buf[1024];
fd_set readset;
int i, n;

while (i_still_want_to_read()) {
    int maxfd = -1;

    /* Rebuild the read set from scratch, tracking the highest fd seen. */
    FD_ZERO(&readset);
    for (i = 0; i < n_sockets; ++i) {
        if (maxfd < fd[i]) {
            maxfd = fd[i];
        }
        FD_SET(fd[i], &readset);
    }

    /* Block until one or more fds are ready to read. */
    select(maxfd+1, &readset, NULL, NULL, NULL);

    /* Only the fds still set in readset are read from. */
    for (i = 0; i < n_sockets; ++i) {
        if (!FD_ISSET(fd[i], &readset)) {
            continue;
        }

        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n > 0) {
            handle_input(fd[i], buf, n);
        } else if (n == 0) {
            handle_close(fd[i]);
        } else if (errno != EAGAIN) {
            /* EAGAIN: the kernel didn't have any data for us to read. */
            handle_error(fd[i], errno);
        }
    }
}

select()-based ROT13 server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>
/* for select */
#include <sys/select.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

/*
 * ROT13 for one character: letters in a-m / A-M move forward 13 places,
 * letters in n-z / N-Z move back 13 places, anything else is returned as is.
 * isalpha() is not used because its answer depends on the locale.
 */
char
rot13_char(char c)
{
    if (c >= 'a' && c <= 'z')
        return (c <= 'm') ? c + 13 : c - 13;

    if (c >= 'A' && c <= 'Z')
        return (c <= 'M') ? c + 13 : c - 13;

    return c;
}

/* Per-connection bookkeeping for the select()-based server. */
struct fd_state {
/* Bytes received from the peer, already ROT13-transformed by do_read(). */
char buffer[MAX_LINE];
/* Number of bytes currently stored in buffer. */
size_t buffer_used;

/* Set to 1 by do_read() when a newline is seen, cleared by do_write();
 * run() adds the fd to the write set while this is nonzero. */
int writing;
/* How many bytes of buffer have been sent so far. */
size_t n_written;
/* Send limit: the value of buffer_used when the last newline was read. */
size_t write_upto;
};

/*
 * Allocate a fresh per-connection state with all counters and the writing
 * flag set to zero. The data buffer itself is left uninitialized.
 *
 * Returns NULL if the allocation fails.
 */
struct fd_state *
alloc_fd_state(void)
{
    struct fd_state *fresh = malloc(sizeof(*fresh));

    if (fresh == NULL)
        return NULL;

    fresh->buffer_used = 0;
    fresh->n_written = 0;
    fresh->writing = 0;
    fresh->write_upto = 0;

    return fresh;
}

/* Release a state object obtained from alloc_fd_state(). The socket itself
 * is not touched here. */
void
free_fd_state(struct fd_state *state)
{
    free(state);
}

/*
 * Put fd into non-blocking mode.
 *
 * The current file status flags are read first and O_NONBLOCK is OR-ed into
 * them, so flags that were already set (for example O_APPEND) are kept.
 * If the flags cannot be read, O_NONBLOCK alone is requested.
 */
void
make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        flags = 0;

    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Drain everything currently readable from fd into state->buffer.
 *
 * Each received byte is ROT13-transformed and appended while there is room;
 * bytes beyond the buffer capacity are dropped. Seeing a newline marks the
 * state as having output ready up to the current buffer position.
 *
 * Returns 1 when the peer closed the connection, 0 when recv() reported
 * EAGAIN (nothing more to read right now), and -1 on any other error.
 */
int
do_read(int fd, struct fd_state *state)
{
    char chunk[1024];
    ssize_t got;

    for (;;) {
        ssize_t pos;

        got = recv(fd, chunk, sizeof(chunk), 0);
        if (got <= 0)
            break;

        for (pos = 0; pos < got; ++pos) {
            const char in = chunk[pos];

            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = rot13_char(in);

            if (in == '\n') {
                state->writing = 1;
                state->write_upto = state->buffer_used;
            }
        }
    }

    /* The loop only exits with got <= 0. */
    if (got == 0)
        return 1;

    return (errno == EAGAIN) ? 0 : -1;
}

/*
 * Send the bytes of state->buffer between n_written and write_upto to fd.
 *
 * Returns 0 when everything up to write_upto was sent or when send()
 * reported EAGAIN (try again later), and -1 on any other send() error.
 * After a complete flush the writing flag is cleared, and the buffer is
 * rewound to empty if every buffered byte has been sent.
 */
int
do_write(int fd, struct fd_state *state)
{
    for (;;) {
        size_t pending;
        ssize_t sent;

        if (state->n_written >= state->write_upto)
            break;

        pending = state->write_upto - state->n_written;
        sent = send(fd, state->buffer + state->n_written, pending, 0);
        if (sent < 0)
            return (errno == EAGAIN) ? 0 : -1;
        assert(sent != 0);

        state->n_written += sent;
    }

    if (state->n_written == state->buffer_used) {
        state->buffer_used = 0;
        state->write_upto = 0;
        state->n_written = 0;
    }

    state->writing = 0;

    return 0;
}

/*
 * select()-based ROT13 server: listens on TCP port 40713 and multiplexes all
 * connections in one process, keeping one fd_state per descriptor in an
 * array indexed by the descriptor number.
 *
 * Returns only when the listening socket cannot be set up or select() fails
 * with an error other than EINTR.
 */
void
run(void)
{
    int listener;
    struct fd_state *state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, maxfd;
    fd_set readset, writeset, exset;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    for (i = 0; i < FD_SETSIZE; ++i)
        state[i] = NULL;

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        return;
    }
    make_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        close(listener);
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        close(listener);
        return;
    }

    while (1) {
        maxfd = listener;

        /* The sets are rebuilt on every pass because select() overwrites
         * them with the ready descriptors. */
        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for (i=0; i < FD_SETSIZE; ++i) {
            if (state[i]) {
                if (i > maxfd)
                    maxfd = i;
                FD_SET(i, &readset);
                if (state[i]->writing) {
                    FD_SET(i, &writeset);
                }
            }
        }

        if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) {
            if (errno == EINTR)
                continue; /* interrupted by a signal: just retry */
            perror("select");
            return;
        }

        if (FD_ISSET(listener, &readset)) {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
            if (fd < 0) {
                perror("accept");
            } else if (fd >= FD_SETSIZE) {
                /* state[] and fd_set only hold descriptors in the range
                 * 0..FD_SETSIZE-1, so fd == FD_SETSIZE must be refused too. */
                close(fd);
            } else {
                make_nonblocking(fd);
                state[fd] = alloc_fd_state();
                if (!state[fd]) {
                    /* Out of memory: drop this connection instead of
                     * relying on an assert that vanishes under NDEBUG. */
                    close(fd);
                }
            }
        }

        for (i=0; i < maxfd+1; ++i) {
            int r = 0;
            if (i == listener || !state[i])
                continue;

            if (FD_ISSET(i, &readset)) {
                r = do_read(i, state[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) {
                r = do_write(i, state[i]);
            }
            if (r) {
                /* Nonzero means EOF or error: tear the connection down. */
                free_fd_state(state[i]);
                state[i] = NULL;
                close(i);
            }
        }
    }
}

/* Entry point of the select() server: stdout is switched to unbuffered mode
 * and the server loop in run() is started. The arguments are ignored. */
int
main(int c, char **v)
{
    (void)c;
    (void)v;

    setvbuf(stdout, NULL, _IONBF, 0);
    run();

    return 0;
}

A low-level ROT13 server with Libevent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

/*
 * Return the ROT13 image of c. Only the ASCII ranges a-z and A-Z are
 * affected; isalpha() is avoided on purpose because the locale would change
 * which characters are considered alphabetical.
 */
char
rot13_char(char c)
{
    int shift = 0;

    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        shift = 13;     /* first half of the alphabet moves forward */
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        shift = -13;    /* second half moves back */

    return (char)(c + shift);
}

/* Per-connection state for the low-level Libevent server. */
struct fd_state {
/* Bytes received from the peer, already ROT13-transformed by do_read(). */
char buffer[MAX_LINE];
/* Number of bytes currently stored in buffer. */
size_t buffer_used;

/* How many bytes of buffer have been sent so far. */
size_t n_written;
/* Send limit: the value of buffer_used when the last newline was read. */
size_t write_upto;

/* Persistent EV_READ event for the connection; its callback is do_read(). */
struct event *read_event;
/* Persistent EV_WRITE event; added by do_read() when a newline arrives and
 * removed again by do_write() once the pending output is flushed. */
struct event *write_event;
};

/*
 * Create the state for a new connection on fd, including its read and write
 * events on base. The events are created but not added here.
 *
 * Returns the new state, or NULL if the allocation or either event_new()
 * call fails; anything created up to that point is released again.
 */
struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
    struct fd_state *st = malloc(sizeof(struct fd_state));

    if (st == NULL)
        return NULL;

    st->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, st);
    if (st->read_event == NULL)
        goto fail_state;

    st->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, st);
    if (st->write_event == NULL)
        goto fail_read_event;

    st->buffer_used = 0;
    st->n_written = 0;
    st->write_upto = 0;

    assert(st->write_event);
    return st;

fail_read_event:
    event_free(st->read_event);
fail_state:
    free(st);
    return NULL;
}

void
free_fd_state(struct fd_state *state)
{
event_free(state->read_event);
event_free(state->write_event);
free(state);
}

/*
 * Read-event callback: drain everything currently readable from fd.
 *
 * Each received byte is ROT13-transformed and appended to the state's buffer
 * while there is room; bytes beyond the capacity are dropped. Every newline
 * schedules the write event and moves the send limit to the current end of
 * the buffer.
 *
 * When the peer closes the connection, or recv() fails with anything other
 * than EAGAIN, the state is freed. arg is the connection's struct fd_state.
 */
void
do_read(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;
    char chunk[1024];
    ssize_t got;

    for (;;) {
        ssize_t pos;

        assert(state->write_event);
        got = recv(fd, chunk, sizeof(chunk), 0);
        if (got <= 0)
            break;

        for (pos = 0; pos < got; ++pos) {
            const char in = chunk[pos];

            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = rot13_char(in);

            if (in == '\n') {
                assert(state->write_event);
                event_add(state->write_event, NULL);
                state->write_upto = state->buffer_used;
            }
        }
    }

    /* The loop only exits with got <= 0. */
    if (got == 0) {
        free_fd_state(state);
        return;
    }

    if (errno == EAGAIN) // XXXX use evutil macro
        return;

    perror("recv");
    free_fd_state(state);
}

/*
 * Write-event callback: send the bytes of the buffer between n_written and
 * write_upto to fd.
 *
 * On EAGAIN the function returns and leaves the write event armed so it is
 * retried later; on any other send() error the state is freed. Once the
 * pending range is flushed the write event is removed, and if every buffered
 * byte has been sent the buffer is rewound to empty.
 * arg is the connection's struct fd_state.
 */
void
do_write(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;

    (void)events;

    while (state->n_written < state->write_upto) {
        ssize_t result = send(fd, state->buffer + state->n_written,
                              state->write_upto - state->n_written, 0);
        if (result < 0) {
            if (errno == EAGAIN) // XXX use evutil macro
                return;
            free_fd_state(state);
            return;
        }
        assert(result != 0);

        state->n_written += result;
    }

    /* Rewind to offset 0 (not 1): resetting the counters to 1 would leave
     * byte 0 of the buffer unused after the first flush. */
    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 0;

    event_del(state->write_event);
}

/*
 * Listener callback: accept one pending connection and start reading on it.
 *
 * arg is the event_base on which the connection's events are created.
 * Connections whose descriptor is FD_SETSIZE or above are closed at once,
 * and a connection whose state cannot be allocated is closed instead of
 * being left to an assert.
 */
void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);

    (void)event;

    if (fd < 0) { // XXXX eagain??
        perror("accept");
    } else if (fd >= FD_SETSIZE) {
        close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET
    } else {
        struct fd_state *state;
        evutil_make_socket_nonblocking(fd);
        state = alloc_fd_state(base, fd);
        if (!state) {
            /* Allocation failed: with NDEBUG an assert would compile away
             * and the NULL state would be dereferenced below. */
            close(fd);
            return;
        }
        event_add(state->read_event, NULL);
    }
}

/*
 * Low-level Libevent ROT13 server: listens on TCP port 40713 and dispatches
 * the event loop, with do_accept() as the listener callback.
 *
 * Returns when setup fails or when event_base_dispatch() returns. The
 * listening socket, the listener event and the event base are released
 * before returning.
 */
void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        event_base_free(base);
        return;
    }
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        goto done;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        goto done;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    if (!listener_event) {
        fprintf(stderr, "event_new failed\n");
        goto done;
    }
    if (event_add(listener_event, NULL) < 0) {
        fprintf(stderr, "event_add failed\n");
        event_free(listener_event);
        goto done;
    }

    event_base_dispatch(base);

    event_free(listener_event);
done:
    close(listener);
    event_base_free(base);
}

/* Entry point of the low-level Libevent server: make stdout unbuffered,
 * then hand control to run(). The arguments are ignored. */
int
main(int c, char **v)
{
    (void)c;
    (void)v;

    setvbuf(stdout, NULL, _IONBF, 0);
    run();

    return 0;
}

A simpler ROT13 server with Libevent, using bufferevents:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

/* NOTE(review): no definitions of do_read/do_write appear in this listing,
 * which uses readcb/errorcb instead; presumably these prototypes are
 * leftovers and can be dropped -- confirm before removing. */
void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

/*
 * ROT13 a single character. Letters in the first half of the ASCII alphabet
 * (either case) are moved forward by 13, letters in the second half are
 * moved back by 13, and all other characters pass through untouched.
 * isalpha() is avoided because its result depends on the locale.
 */
char
rot13_char(char c)
{
    const int first_half = (c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M');
    const int second_half = (c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z');

    if (!first_half && !second_half)
        return c;

    return first_half ? c + 13 : c - 13;
}

/*
 * Bufferevent read callback.
 *
 * Every complete LF-terminated line in the input buffer is ROT13-transformed
 * and queued on the output buffer followed by a newline. If MAX_LINE or more
 * bytes remain without a line ending, they are transformed and flushed in
 * 1024-byte pieces, followed by a newline, so the input buffer cannot grow
 * without bound. ctx is unused.
 */
void
readcb(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input, *output;
    char *line;
    size_t n;
    int i;

    input = bufferevent_get_input(bev);
    output = bufferevent_get_output(bev);

    for (;;) {
        line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF);
        if (!line)
            break;

        for (i = 0; i < n; ++i)
            line[i] = rot13_char(line[i]);
        evbuffer_add(output, line, n);
        evbuffer_add(output, "\n", 1);
        free(line);
    }

    /* Nothing more to do unless an over-long partial line is pending. */
    if (evbuffer_get_length(input) < MAX_LINE)
        return;

    {
        char buf[1024];

        while (evbuffer_get_length(input)) {
            int got = evbuffer_remove(input, buf, sizeof(buf));
            for (i = 0; i < got; ++i)
                buf[i] = rot13_char(buf[i]);
            evbuffer_add(output, buf, got);
        }
        evbuffer_add(output, "\n", 1);
    }
}

/*
 * Bufferevent event callback for EOF, error and timeout notifications.
 *
 * The individual branches are placeholders with no code of their own; in
 * every case the bufferevent is freed at the end. ctx is unused.
 */
void
errorcb(struct bufferevent *bev, short error, void *ctx)
{
    if (error & BEV_EVENT_EOF) {
        /* The connection has been closed; per-connection cleanup goes here. */
        /* ... */
    } else if (error & BEV_EVENT_ERROR) {
        /* Inspect errno here to find out which error occurred. */
        /* ... */
    } else if (error & BEV_EVENT_TIMEOUT) {
        /* A timeout fired; handle it here. */
        /* ... */
    }

    bufferevent_free(bev);
}

/*
 * Listener callback: accept one pending connection and wrap it in a
 * bufferevent that uses readcb/errorcb.
 *
 * arg is the event_base the bufferevent is attached to. Connections whose
 * descriptor is FD_SETSIZE or above are closed at once, and the socket is
 * closed if the bufferevent cannot be created.
 */
void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);

    (void)event;

    if (fd < 0) {
        perror("accept");
    } else if (fd >= FD_SETSIZE) {
        close(fd);
    } else {
        struct bufferevent *bev;
        evutil_make_socket_nonblocking(fd);
        bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
        if (!bev) {
            /* No bufferevent owns the socket yet, so close it ourselves
             * rather than passing NULL to the calls below. */
            close(fd);
            return;
        }
        bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
        /* Cap the amount of unread input held per connection at MAX_LINE. */
        bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
        bufferevent_enable(bev, EV_READ|EV_WRITE);
    }
}

/*
 * Bufferevent-based ROT13 server: listens on TCP port 40713 and dispatches
 * the event loop, with do_accept() as the listener callback.
 *
 * Returns when setup fails or when event_base_dispatch() returns. The
 * listening socket, the listener event and the event base are released
 * before returning.
 */
void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event = NULL;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        event_base_free(base);
        return;
    }
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
    } else if (listen(listener, 16)<0) {
        perror("listen");
    } else {
        listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
        if (!listener_event) {
            fprintf(stderr, "event_new failed\n");
        } else if (event_add(listener_event, NULL) < 0) {
            fprintf(stderr, "event_add failed\n");
        } else {
            event_base_dispatch(base);
        }
    }

    /* Common teardown for both the error paths and a finished loop. */
    if (listener_event)
        event_free(listener_event);
    close(listener);
    event_base_free(base);
}

/* Entry point of the bufferevent server: disable stdout buffering and start
 * the event loop via run(). The arguments are ignored. */
int
main(int c, char **v)
{
    (void)c;
    (void)v;

    setvbuf(stdout, NULL, _IONBF, 0);
    run();

    return 0;
}