// Listen on the socket for new peers to connect. When a new peer connects, // the on_peer_connected callback will be invoked. if ((rc = uv_listen((uv_stream_t*)&server_stream, N_BACKLOG, on_peer_connected)) < 0) { die("uv_listen failed: %s", uv_strerror(rc)); }
voidon_peer_connected(uv_stream_t* server_stream, int status){ if (status < 0) { fprintf(stderr, "Peer connection error: %s\n", uv_strerror(status)); return; }
// client will represent this peer; it's allocated on the heap and only // released when the client disconnects. The client holds a pointer to // peer_state_t in its data field; this peer state tracks the protocol state // with this client throughout interaction. uv_tcp_t* client = (uv_tcp_t*)xmalloc(sizeof(*client)); int rc; if ((rc = uv_tcp_init(uv_default_loop(), client)) < 0) { die("uv_tcp_init failed: %s", uv_strerror(rc)); } client->data = NULL;
if (uv_accept(server_stream, (uv_stream_t*)client) == 0) { structsockaddr_storage peername; int namelen = sizeof(peername); if ((rc = uv_tcp_getpeername(client, (struct sockaddr*)&peername, &namelen)) < 0) { die("uv_tcp_getpeername failed: %s", uv_strerror(rc)); } report_peer_connected((conststruct sockaddr_in*)&peername, namelen);
// Initialize the peer state for a new client: we start by sending the peer // the initial '*' ack. peer_state_t* peerstate = (peer_state_t*)xmalloc(sizeof(*peerstate)); peerstate->state = INITIAL_ACK; peerstate->sendbuf[0] = '*'; peerstate->sendbuf_end = 1; peerstate->client = client; client->data = peerstate;
// Enqueue the write request to send the ack; when it's done, // on_wrote_init_ack will be called. The peer state is passed to the write // request via the data pointer; the write request does not own this peer // state - it's owned by the client handle. uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end); uv_write_t* req = (uv_write_t*)xmalloc(sizeof(*req)); req->data = peerstate; if ((rc = uv_write(req, (uv_stream_t*)client, &writebuf, 1, on_wrote_init_ack)) < 0) { die("uv_write failed: %s", uv_strerror(rc)); } } else { uv_close((uv_handle_t*)client, on_client_closed); } }
这些代码都有很好的注释,但是,这里有一些重要的 libuv 语法我想去强调一下:
传入自定义数据到回调中:因为 C 语言还没有闭包,这可能是个挑战,libuv 在它的所有的处理类型中有一个 void* data 字段;这些字段可以被用于传递用户数据。例如,注意 client->data 是如何指向到一个 peer_state_t 结构上,以便于 uv_write 和 uv_read_start 注册的回调可以知道它们正在处理的是哪个客户端的数据。
voidon_wrote_init_ack(uv_write_t* req, int status){ if (status) { die("Write error: %s\n", uv_strerror(status)); } peer_state_t* peerstate = (peer_state_t*)req->data; // Flip the peer state to WAIT_FOR_MSG, and start listening for incoming data // from this peer. peerstate->state = WAIT_FOR_MSG; peerstate->sendbuf_end = 0;
int rc; if ((rc = uv_read_start((uv_stream_t*)peerstate->client, on_alloc_buffer, on_peer_read)) < 0) { die("uv_read_start failed: %s", uv_strerror(rc)); }
// Note: the write request doesn't own the peer state, hence we only free the // request itself, not the state. free(req); }
void on_peer_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) { if (nread < 0) { if (nread != uv_eof) { fprintf(stderr, "read error: %s\n", uv_strerror(nread)); } uv_close((uv_handle_t*)client, on_client_closed); } elseif (nread == 0) { // from the documentation of uv_read_cb: nread might be 0, which does not // indicate an error or eof. this is equivalent to eagain or ewouldblock // under read(2). } else { // nread > 0 assert(buf->len >= nread);
peer_state_t* peerstate = (peer_state_t*)client->data; if (peerstate->state == initial_ack) { // if the initial ack hasn't been sent for some reason, ignore whatever // the client sends in. free(buf->base); return; }
// run the protocol state machine. for (int i = 0; i < nread; ++i) { switch (peerstate->state) { case initial_ack: assert(0 && "can't reach here"); break; case wait_for_msg: if (buf->base[i] == '^') { peerstate->state = in_msg; } break; case in_msg: if (buf->base[i] == '$') { peerstate->state = wait_for_msg; } else { assert(peerstate->sendbuf_end < sendbuf_size); peerstate->sendbuf[peerstate->sendbuf_end++] = buf->base[i] + 1; } break; } }
if (peerstate->sendbuf_end > 0) { // we have data to send. the write buffer will point to the buffer stored // in the peer state for this client. uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end); uv_write_t* writereq = (uv_write_t*)xmalloc(sizeof(*writereq)); writereq->data = peerstate; int rc; if ((rc = uv_write(writereq, (uv_stream_t*)client, &writebuf, 1, on_wrote_buf)) < 0) { die("uv_write failed: %s", uv_strerror(rc)); } } } free(buf->base); }