diff --git a/tcp.c b/tcp.c index 91b82ee..830bf4a 100644 --- a/tcp.c +++ b/tcp.c @@ -66,20 +66,15 @@ * ------ * * To avoid the need for dynamic memory allocation, a maximum, reasonable amount - * of connections is defined by MAX_TAP_CONNS below (currently 1M, close to - * the maximum amount of file descriptors typically available to a process on - * Linux). + * of connections is defined by MAX_TAP_CONNS below (currently 128k). * - * While fragmentation and reassembly are not implemented, tracking of missing - * segments and retransmissions needs to be, thus data needs to linger on - * sockets as long as it's not acknowledged by the guest, and read using - * MSG_PEEK into a single, preallocated static buffer sized to the maximum - * supported window, 16MiB. This imposes a practical limitation on window - * scaling, that is, the maximum factor is 512. If a bigger window scaling - * factor is observed during connection establishment, connection is reset and - * reestablished by omitting the scaling factor in the SYN segment. This - * limitation only applies to the window scaling advertised by the guest, but - * if exceeded, no window scaling will be allowed at all toward either endpoint. + * Data needs to linger on sockets as long as it's not acknowledged by the + * guest, and is read using MSG_PEEK into preallocated static buffers sized + * to the maximum supported window, 64MiB ("discard" buffer, for already-sent + * data) plus a number of maximum-MSS-sized buffers. This imposes a practical + * limitation on window scaling, that is, the maximum factor is 1024. Larger + * factors will be accepted, but resulting, larger values are never advertised + * to the other side, and not used while queueing data. * * * Ports @@ -245,8 +240,7 @@ * * @seq_init_from_tap: initial sequence number from tap * - * @tap_window: last window size received from tap, scaled - * @tcpi_acked_last: most recent value of tcpi_bytes_acked (TCP_INFO) + * @wnd_from_tap: last window size received from tap, scaled * * - from socket to tap: * - on new data from socket: @@ -255,7 +249,7 @@ * - starting at offset (@seq_to_tap - @seq_ack_from_tap) * - in MSS-sized segments * - increasing @seq_to_tap at each segment - * - up to window (until @seq_to_tap - @seq_ack_from_tap <= @tap_window) + * - up to window (until @seq_to_tap - @seq_ack_from_tap <= @wnd_from_tap) * - mark socket in bitmap for periodic ACK check, set @last_ts_to_tap * - on read error, send RST to tap, close socket * - on zero read, send FIN to tap, enter ESTABLISHED_SOCK_FIN @@ -271,25 +265,20 @@ * - periodically: * - if @seq_ack_from_tap < @seq_to_tap and the retransmission timer * (TODO: implement requirements from RFC 6298, currently 3s fixed) from - * @ts_sock elapsed, reset @seq_to_tap to @seq_ack_from_tap, and + * @ts_tap_from_ack elapsed, reset @seq_to_tap to @seq_ack_from_tap, and * resend data with the steps listed above * * - from tap to socket: * - on packet from tap: - * - set @ts_tap + * - set @ts_tap_ack * - set TCP_WINDOW_CLAMP from TCP header from tap * - check seq from header against @seq_from_tap, if data is missing, send * two ACKs with number @seq_ack_to_tap, discard packet * - otherwise queue data to socket, set @seq_from_tap to seq from header * plus payload length - * - query socket for TCP_INFO, on tcpi_bytes_acked > @tcpi_acked_last, - * set @tcpi_acked_last to tcpi_bytes_acked, set @seq_ack_to_tap - * to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and - * send ACK to tap - * - periodically: - * - query socket for TCP_INFO, on 
tcpi_bytes_acked > @tcpi_acked_last, - * set @tcpi_acked_last to tcpi_bytes_acked, set @seq_ack_to_tap - * to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and + * - in ESTABLISHED state, send ACK to tap as soon as we queue to the + * socket. In other states, query socket for TCP_INFO, set + * @seq_ack_to_tap to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and * send ACK to tap * * @@ -351,13 +340,13 @@ #define TCP_TAP_FRAMES 8 -#define PIPE_SIZE (1024 * 1024) +#define MAX_PIPE_SIZE (2 * 1024 * 1024) #define TCP_HASH_TABLE_LOAD 70 /* % */ #define TCP_HASH_TABLE_SIZE (MAX_TAP_CONNS * 100 / \ TCP_HASH_TABLE_LOAD) -#define MAX_WS 9 +#define MAX_WS 10 #define MAX_WINDOW (1 << (16 + (MAX_WS))) #define MSS_DEFAULT 536 #define WINDOW_DEFAULT 14600 /* RFC 6928 */ @@ -369,12 +358,21 @@ #define FIN_TIMEOUT 240000 #define LAST_ACK_TIMEOUT 240000 +#define TCP_SOCK_POOL_SIZE 256 +#define TCP_SOCK_POOL_TSH 128 /* Refill in ns if > x used */ +#define TCP_SPLICE_PIPE_POOL_SIZE 256 +#define REFILL_INTERVAL 1000 /* We need to include for tcpi_bytes_acked, instead of * , but that doesn't include a definition for SOL_TCP */ #define SOL_TCP IPPROTO_TCP +#define SEQ_LE(a, b) ((b) - (a) < MAX_WINDOW) +#define SEQ_LT(a, b) ((b) - (a) - 1 < MAX_WINDOW) +#define SEQ_GE(a, b) ((a) - (b) < MAX_WINDOW) +#define SEQ_GT(a, b) ((a) - (b) - 1 < MAX_WINDOW) + enum tcp_state { CLOSED = 0, TAP_SYN_SENT, @@ -409,7 +407,9 @@ static char *tcp_state_str[TCP_STATE_STR_SIZE] __attribute((__unused__)) = { #define RST (1 << 2) #define ACK (1 << 4) /* Flags for internal usage */ -#define ZERO_WINDOW (1 << 5) +#define UPDATE_WINDOW (1 << 5) +#define DUP_ACK (1 << 6) +#define FORCE_ACK (1 << 7) #define OPT_EOL 0 #define OPT_NOP 1 @@ -439,18 +439,21 @@ struct tcp_tap_conn; * @seq_ack_from_tap: Last ACK number received from tap * @seq_from_tap: Next sequence for packets from tap (not actually sent) * @seq_ack_to_tap: Last ACK number sent to tap + * @seq_dup_ack: Last duplicate ACK number sent to tap * @seq_init_from_tap: Initial sequence number from tap - * @tcpi_acked_last: Most recent value of tcpi_bytes_acked (TCP_INFO query) - * @ws_allowed: Window scaling allowed + * @seq_init_from_tap: Initial sequence number to tap + * @ws_tap: Window scaling factor from tap * @ws: Window scaling factor - * @tap_window: Last window size received from tap, scaled + * @wnd_from_tap: Last window size received from tap, scaled + * @wnd_to_tap: Socket-side sending window, advertised to tap * @window_clamped: Window was clamped on socket at least once - * @no_snd_wnd: Kernel won't report window (without commit 8f7baad7f035) - * @tcpi_acked_last: Most recent value of tcpi_snd_wnd (TCP_INFO query) - * @ts_sock: Last activity timestamp from socket for timeout purposes - * @ts_tap: Last activity timestamp from tap for timeout purposes - * @ts_ack_tap: Last ACK segment timestamp from tap for timeout purposes + * @ts_sock_act: Last activity timestamp from socket for timeout purposes + * @ts_tap_act: Last activity timestamp from tap for timeout purposes + * @ts_ack_from_tap: Last ACK segment timestamp from tap + * @ts_ack_to_tap: Last ACK segment timestamp to tap + * @tap_data_noack: Last unacked data to tap, set to { 0, 0 } on ACK * @mss_guest: Maximum segment size advertised by guest + * @events: epoll events currently enabled for socket */ struct tcp_tap_conn { struct tcp_tap_conn *next; @@ -473,23 +476,24 @@ struct tcp_tap_conn { uint32_t seq_ack_from_tap; uint32_t seq_from_tap; uint32_t seq_ack_to_tap; + uint32_t seq_dup_ack; uint32_t 
seq_init_from_tap; uint32_t seq_init_to_tap; - uint64_t tcpi_acked_last; - int ws_allowed; - int ws; - uint32_t tap_window; + uint16_t ws_tap; + uint16_t ws; + uint32_t wnd_from_tap; + uint32_t wnd_to_tap; int window_clamped; - int no_snd_wnd; - uint32_t tcpi_snd_wnd; + int snd_buf; - struct timespec ts_sock; - struct timespec ts_tap; - struct timespec ts_ack_tap; + struct timespec ts_sock_act; + struct timespec ts_tap_act; + struct timespec ts_ack_from_tap; + struct timespec ts_ack_to_tap; + struct timespec tap_data_noack; int mss_guest; - uint32_t sndbuf; uint32_t events; }; @@ -626,7 +630,7 @@ static int tcp6_l2_buf_mss_nr_set; static int tcp6_l2_buf_mss_tap; static int tcp6_l2_buf_mss_tap_nr_set; -/* recvmmsg()/sendmmsg() data for tap */ +/* recvmsg()/sendmsg() data for tap */ static struct iovec tcp4_l2_iov_sock [TCP_TAP_FRAMES + 1]; static struct iovec tcp6_l2_iov_sock [TCP_TAP_FRAMES + 1]; static char tcp_buf_discard [MAX_WINDOW]; @@ -647,8 +651,9 @@ static struct mmsghdr tcp_l2_mh_tap [TCP_TAP_FRAMES] = { /* sendmsg() to socket */ static struct iovec tcp_tap_iov [UIO_MAXIOV]; -/* Bitmap, activity monitoring needed for connection via tap */ -static uint8_t tcp_act[MAX_TAP_CONNS / 8] = { 0 }; +/* SO_RCVLOWAT set on source ([0]) or destination ([1]) socket, and activity */ +static uint8_t splice_rcvlowat_set[MAX_SPLICE_CONNS / 8][2]; +static uint8_t splice_rcvlowat_act[MAX_SPLICE_CONNS / 8][2]; /* TCP connections */ static struct tcp_tap_conn tt[MAX_TAP_CONNS]; @@ -657,6 +662,13 @@ static struct tcp_splice_conn ts[MAX_SPLICE_CONNS]; /* Table for lookup from remote address, local port, remote port */ static struct tcp_tap_conn *tt_hash[TCP_HASH_TABLE_SIZE]; +/* Pools for pre-opened sockets and pipes */ +static int splice_pipe_pool [TCP_SPLICE_PIPE_POOL_SIZE][2][2]; +static int init_sock_pool4 [TCP_SOCK_POOL_SIZE]; +static int init_sock_pool6 [TCP_SOCK_POOL_SIZE]; +static int ns_sock_pool4 [TCP_SOCK_POOL_SIZE]; +static int ns_sock_pool6 [TCP_SOCK_POOL_SIZE]; + /** * tcp_tap_state() - Set given TCP state for tap connection, report to stderr * @conn: Connection pointer @@ -681,6 +693,21 @@ static void tcp_splice_state(struct tcp_splice_conn *conn, enum tcp_state state) conn->state = state; } +/** + * tcp_sock_set_bufsize() - Set SO_RCVBUF and SO_SNDBUF to maximum values + * @s: Socket, can be -1 to avoid check in the caller + */ +static void tcp_sock_set_bufsize(int s) +{ + int v = INT_MAX / 2; /* Kernel clamps and rounds, no need to check */ + + if (s == -1) + return; + + setsockopt(s, SOL_SOCKET, SO_RCVBUF, &v, sizeof(v)); + setsockopt(s, SOL_SOCKET, SO_SNDBUF, &v, sizeof(v)); +} + /** * tcp_update_check_ip4() - Update IPv4 with variable parts from stored one * @buf: L2 packet buffer with final IPv4 header @@ -1077,7 +1104,6 @@ static void tcp_table_tap_compact(struct ctx *c, struct tcp_tap_conn *hole) uint32_t events; if ((hole - tt) == --c->tcp.tap_conn_count) { - bitmap_clear(tcp_act, hole - tt); debug("TCP: hash table compaction: index %i (%p) was max index", hole - tt, hole); return; @@ -1112,8 +1138,10 @@ static void tcp_tap_destroy(struct ctx *c, struct tcp_tap_conn *conn) epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->sock, NULL); tcp_tap_state(conn, CLOSED); close(conn->sock); - tcp_hash_remove(conn); - tcp_table_tap_compact(c, conn); + + /* Removal from hash table and connection table compaction deferred to + * timer. 
+ */ } static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn); @@ -1123,51 +1151,34 @@ static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn); * @c: Execution context * @conn: Connection pointer * @flags: TCP flags to set - * @in: Payload buffer - * @len: Payload length + * @now: Current timestamp, can be NULL * * Return: negative error code on connection reset, 0 otherwise */ -static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, - int flags, char *in, int len) +static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, int flags, + struct timespec *now) { - uint32_t ack_offset = conn->seq_from_tap - conn->seq_ack_to_tap; - char buf[USHRT_MAX] = { 0 }, *data; + char buf[sizeof(struct tcphdr) + OPT_MSS_LEN + OPT_WS_LEN + 1] = { 0 }; + uint32_t prev_ack_to_tap = conn->seq_ack_to_tap; struct tcp_info info = { 0 }; - int ws = 0, err, ack_pending; socklen_t sl = sizeof(info); struct tcphdr *th; + char *data; - if (conn->state == ESTABLISHED && !ack_offset && !flags && - conn->tcpi_snd_wnd) { - err = 0; - info.tcpi_bytes_acked = conn->tcpi_acked_last; - info.tcpi_snd_wnd = conn->tcpi_snd_wnd; - info.tcpi_snd_wscale = conn->ws; - } else if (conn->no_snd_wnd && !(flags & SYN)) { - err = 0; - } else { - err = getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, &sl); - if (err && !(flags & RST)) { - tcp_rst(c, conn); - return err; - } + if (SEQ_GE(conn->seq_ack_to_tap, conn->seq_from_tap) && + !flags && conn->wnd_to_tap) + return 0; - sl = sizeof(conn->sndbuf); - if (getsockopt(conn->sock, SOL_SOCKET, SO_SNDBUF, - &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; - - info.tcpi_snd_wnd = MIN(info.tcpi_snd_wnd, - conn->sndbuf * 90 / 100); - conn->tcpi_snd_wnd = info.tcpi_snd_wnd; + if (getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, &sl)) { + tcp_rst(c, conn); + return -ECONNRESET; } th = (struct tcphdr *)buf; data = (char *)(th + 1); th->doff = sizeof(*th) / 4; - if ((flags & SYN) && !err) { + if (flags & SYN) { /* Options: MSS, NOP and window scale if allowed (4-8 bytes) */ *data++ = OPT_MSS; *data++ = OPT_MSS_LEN; @@ -1175,73 +1186,43 @@ static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, data += OPT_MSS_LEN - 2; th->doff += OPT_MSS_LEN / 4; - /* Check if kernel includes commit: - * 8f7baad7f035 ("tcp: Add snd_wnd to TCP_INFO") - */ - conn->no_snd_wnd = !info.tcpi_snd_wnd; + if (!c->tcp.kernel_snd_wnd && info.tcpi_snd_wnd) + c->tcp.kernel_snd_wnd = 1; - if (conn->ws_allowed && (ws = info.tcpi_snd_wscale) && - !conn->no_snd_wnd) { - *data++ = OPT_NOP; + conn->ws = MIN(MAX_WS, info.tcpi_snd_wscale); - *data++ = OPT_WS; - *data++ = OPT_WS_LEN; - *data++ = ws; - - th->doff += (1 + OPT_WS_LEN) / 4; - } + *data++ = OPT_NOP; + *data++ = OPT_WS; + *data++ = OPT_WS_LEN; + *data++ = conn->ws; + th->doff += (1 + OPT_WS_LEN) / 4; /* RFC 793, 3.1: "[...] and the first data octet is ISN+1." 
*/ th->seq = htonl(conn->seq_to_tap++); - } else { - th->seq = htonl(conn->seq_to_tap); - conn->seq_to_tap += len; - } - if (flags & SYN) { - ack_pending = 0; - } else if (conn->state == ESTABLISHED || conn->no_snd_wnd) { - ack_pending = conn->seq_from_tap != conn->seq_ack_to_tap && - (conn->seq_from_tap - conn->seq_ack_to_tap) < - MAX_WINDOW; + th->ack = !!(flags & ACK); } else { - ack_pending = info.tcpi_bytes_acked > conn->tcpi_acked_last; - } - - if (!err && (ack_pending || (flags & ACK) || len)) { th->ack = 1; - - if (conn->no_snd_wnd) { - conn->seq_ack_to_tap = conn->seq_from_tap; - } else { - if (conn->state == ESTABLISHED) - conn->seq_ack_to_tap = conn->seq_from_tap; - else - conn->seq_ack_to_tap = info.tcpi_bytes_acked + - conn->seq_init_from_tap; - - conn->tcpi_acked_last = info.tcpi_bytes_acked; - } - - /* seq_ack_to_tap matching seq_from_tap means, in these states, - * that we shut the writing half down, but the FIN segment - * wasn't acknowledged yet. We sent the FIN for sure, so adjust - * the sequence number in that case. - */ - if ((conn->state == LAST_ACK || - conn->state == FIN_WAIT_1_SOCK_FIN || - conn->state == FIN_WAIT_1) && - conn->seq_ack_to_tap == conn->seq_from_tap) - th->ack_seq = htonl(conn->seq_ack_to_tap + 1); - else - th->ack_seq = htonl(conn->seq_ack_to_tap); - } else { - if (!len && !flags) - return 0; - - th->ack = th->ack_seq = 0; + th->seq = htonl(conn->seq_to_tap); } + if (conn->state > ESTABLISHED || (flags & (DUP_ACK | FORCE_ACK))) { + conn->seq_ack_to_tap = conn->seq_from_tap; + } else { + conn->seq_ack_to_tap = info.tcpi_bytes_acked + + conn->seq_init_from_tap; + + if (SEQ_LT(conn->seq_ack_to_tap, prev_ack_to_tap)) + conn->seq_ack_to_tap = prev_ack_to_tap; + } + + if (!flags && + conn->seq_ack_to_tap == prev_ack_to_tap && + c->tcp.kernel_snd_wnd && conn->wnd_to_tap == info.tcpi_snd_wnd) + return 0; + + th->ack_seq = htonl(conn->seq_ack_to_tap); + th->rst = !!(flags & RST); th->syn = !!(flags & SYN); th->fin = !!(flags & FIN); @@ -1249,29 +1230,40 @@ static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, th->source = htons(conn->sock_port); th->dest = htons(conn->tap_port); - if (flags & ZERO_WINDOW) { - th->window = 0; - } else if (!err && !conn->no_snd_wnd) { + if (th->syn) { /* First value sent by receiver is not scaled */ - th->window = htons(info.tcpi_snd_wnd >> - (th->syn ? 
0 : info.tcpi_snd_wscale)); + th->window = htons(conn->wnd_to_tap = WINDOW_DEFAULT); } else { - th->window = htons(WINDOW_DEFAULT); - } + if (c->tcp.kernel_snd_wnd) { + conn->wnd_to_tap = MIN(info.tcpi_snd_wnd, + conn->snd_buf); + } else { + conn->wnd_to_tap = conn->snd_buf; + } + conn->wnd_to_tap = MIN(conn->wnd_to_tap, MAX_WINDOW); - if (!th->window) - conn->tcpi_snd_wnd = 0; + th->window = htons(MIN(conn->wnd_to_tap >> conn->ws, + USHRT_MAX)); + } th->urg_ptr = 0; th->check = 0; - memcpy(data, in, len); + if (th->ack && now) + conn->ts_ack_to_tap = *now; - tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4 + len, + tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4, conn->seq_init_to_tap); - if (th->fin) + if (flags & DUP_ACK) { + tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4, + conn->seq_init_to_tap); + } + + if (th->fin) { + conn->tap_data_noack = *now; conn->seq_to_tap++; + } return 0; } @@ -1286,7 +1278,7 @@ static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn) if (conn->state == CLOSED) return; - tcp_send_to_tap(c, conn, RST, NULL, 0); + tcp_send_to_tap(c, conn, RST, NULL); tcp_tap_destroy(c, conn); } @@ -1302,37 +1294,39 @@ static void tcp_clamp_window(struct tcp_tap_conn *conn, struct tcphdr *th, int len, unsigned int window, int init) { if (init) { - conn->ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL); - conn->ws_allowed = conn->ws >= 0 && conn->ws <= MAX_WS; - conn->ws *= conn->ws_allowed; + int ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL); + + conn->ws_tap = ws; /* RFC 7323, 2.2: first value is not scaled. Also, don't clamp * yet, to avoid getting a zero scale just because we set a * small window now. */ - conn->tap_window = ntohs(th->window); + conn->wnd_from_tap = ntohs(th->window); conn->window_clamped = 0; } else { if (th) - window = ntohs(th->window) << conn->ws; + window = ntohs(th->window) << conn->ws_tap; else - window <<= conn->ws; + window <<= conn->ws_tap; + + window = MIN(MAX_WINDOW, window); if (conn->window_clamped) { - if (conn->tap_window == window) + if (conn->wnd_from_tap == window) return; /* Discard +/- 1% updates to spare some syscalls. 
*/ - if ((window > conn->tap_window && - window * 99 / 100 < conn->tap_window) || - (window < conn->tap_window && - window * 101 / 100 > conn->tap_window)) { - conn->tap_window = window; + if ((window > conn->wnd_from_tap && + window * 99 / 100 < conn->wnd_from_tap) || + (window < conn->wnd_from_tap && + window * 101 / 100 > conn->wnd_from_tap)) { + conn->wnd_from_tap = window; return; } } - conn->tap_window = window; + conn->wnd_from_tap = window; if (window < 256) window = 256; setsockopt(conn->sock, SOL_TCP, TCP_WINDOW_CLAMP, @@ -1420,17 +1414,33 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, union epoll_ref ref = { .proto = IPPROTO_TCP }; const struct sockaddr *sa; struct tcp_tap_conn *conn; + int i, s, *sock_pool_p; struct epoll_event ev; socklen_t sl; - int s; if (c->tcp.tap_conn_count >= MAX_TAP_CONNS) return; - ref.s = s = socket(af, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + for (i = 0; i < TCP_SOCK_POOL_SIZE; i++) { + if (af == AF_INET6) + sock_pool_p = &init_sock_pool6[i]; + else + sock_pool_p = &init_sock_pool4[i]; + if ((ref.s = s = *sock_pool_p) > 0) { + *sock_pool_p = -1; + break; + } + } + + if (s < 0) + ref.s = s = socket(af, SOCK_STREAM | SOCK_NONBLOCK, + IPPROTO_TCP); + if (s < 0) return; + tcp_sock_set_bufsize(s); + if (af == AF_INET && addr4.sin_addr.s_addr == c->gw4) addr4.sin_addr.s_addr = htonl(INADDR_LOOPBACK); else if (af == AF_INET6 && !memcmp(addr, &c->gw6, sizeof(c->gw6))) @@ -1447,10 +1457,9 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, conn = &tt[c->tcp.tap_conn_count++]; conn->sock = s; + conn->events = 0; - sl = sizeof(conn->sndbuf); - if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; + conn->wnd_to_tap = WINDOW_DEFAULT; conn->mss_guest = tcp_opt_get(th, len, OPT_MSS, NULL, NULL); if (conn->mss_guest < 0) @@ -1488,9 +1497,8 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, conn->sock_port = ntohs(th->dest); conn->tap_port = ntohs(th->source); - conn->ts_sock = conn->ts_tap = conn->ts_ack_tap = *now; - - bitmap_set(tcp_act, conn - tt); + conn->ts_sock_act = conn->ts_tap_act = *now; + conn->ts_ack_to_tap = conn->ts_ack_from_tap = *now; conn->seq_init_from_tap = ntohl(th->seq); conn->seq_from_tap = conn->seq_init_from_tap + 1; @@ -1514,12 +1522,18 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, } else { tcp_tap_state(conn, TAP_SYN_RCVD); - if (tcp_send_to_tap(c, conn, SYN | ACK, NULL, 0)) + if (tcp_send_to_tap(c, conn, SYN | ACK, now)) return; ev.events = EPOLLIN | EPOLLRDHUP; } + sl = sizeof(conn->snd_buf); + if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->snd_buf, &sl)) + conn->snd_buf = WINDOW_DEFAULT; + else + conn->snd_buf /= 2; + conn->events = ev.events; ref.tcp.index = conn - tt; ev.data.u64 = ref.u64; @@ -1542,10 +1556,22 @@ static void tcp_table_splice_compact(struct ctx *c, struct epoll_event ev_from; struct epoll_event ev_to; + hole->from_fin_sent = hole->to_fin_sent = 0; + hole->from_read = hole->from_written = 0; + hole->to_read = hole->to_written = 0; + + bitmap_clear(splice_rcvlowat_set[0], hole - ts); + bitmap_clear(splice_rcvlowat_set[1], hole - ts); + bitmap_clear(splice_rcvlowat_act[0], hole - ts); + bitmap_clear(splice_rcvlowat_act[1], hole - ts); + if ((hole - ts) == --c->tcp.splice_conn_count) return; move = &ts[c->tcp.splice_conn_count]; + if (move->state == CLOSED) + return; + memcpy(hole, move, sizeof(*hole)); move->state = CLOSED; move = hole; @@ -1558,8 +1584,8 @@ static void tcp_table_splice_compact(struct ctx 
*c, if (move->state == SPLICE_ACCEPTED) { ev_from.events = ev_to.events = 0; } else if (move->state == SPLICE_CONNECT) { - ev_from.events = EPOLLET | EPOLLRDHUP; - ev_to.events = EPOLLET | EPOLLOUT | EPOLLRDHUP; + ev_from.events = 0; + ev_to.events = EPOLLOUT; } else { ev_from.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; ev_to.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; @@ -1579,11 +1605,17 @@ static void tcp_table_splice_compact(struct ctx *c, */ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn) { + int epoll_del_done = 0; + switch (conn->state) { + case CLOSED: + epoll_del_done = 1; + /* Falls through */ case SPLICE_FIN_BOTH: case SPLICE_FIN_FROM: case SPLICE_FIN_TO: case SPLICE_ESTABLISHED: + /* Flushing might need to block: don't recycle them. */ if (conn->pipe_from_to[0] != -1) { close(conn->pipe_from_to[0]); conn->pipe_from_to[0] = -1; @@ -1598,18 +1630,17 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn) } /* Falls through */ case SPLICE_CONNECT: - epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL); - epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL); + if (!epoll_del_done) { + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL); + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL); + } close(conn->to); /* Falls through */ case SPLICE_ACCEPTED: close(conn->from); tcp_splice_state(conn, CLOSED); tcp_table_splice_compact(c, conn); - conn->from_fin_sent = conn->to_fin_sent = 0; - conn->from_read = conn->from_written = 0; - conn->to_read = conn->to_written = 0; - return; + break; default: return; } @@ -1622,21 +1653,15 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn) */ static void tcp_sock_consume(struct tcp_tap_conn *conn, uint32_t ack_seq) { - uint32_t to_ack; - - /* Implicitly take care of wrap-arounds */ - to_ack = ack_seq - conn->seq_ack_from_tap; - /* Simply ignore out-of-order ACKs: we already consumed the data we * needed from the buffer, and we won't rewind back to a lower ACK * sequence. */ - - if (to_ack > MAX_WINDOW) + if (SEQ_LE(ack_seq, conn->seq_ack_from_tap)) return; - if (to_ack) - recv(conn->sock, NULL, to_ack, MSG_DONTWAIT | MSG_TRUNC); + recv(conn->sock, NULL, ack_seq - conn->seq_ack_from_tap, + MSG_DONTWAIT | MSG_TRUNC); conn->seq_ack_from_tap = ack_seq; } @@ -1665,24 +1690,24 @@ static int tcp_data_from_sock(struct ctx *c, struct tcp_tap_conn *conn, already_sent = conn->seq_to_tap - conn->seq_ack_from_tap; - if (already_sent > MAX_WINDOW) { + if (SEQ_LT(already_sent, 0)) { /* RFC 761, section 2.1. */ seq_to_tap = conn->seq_to_tap = conn->seq_ack_from_tap; already_sent = 0; } - if (!conn->tap_window || already_sent >= conn->tap_window) { + if (!conn->wnd_from_tap || already_sent >= conn->wnd_from_tap) { tcp_tap_epoll_mask(c, conn, conn->events | EPOLLET); return 0; } - fill_bufs = DIV_ROUND_UP(conn->tap_window - already_sent, + fill_bufs = DIV_ROUND_UP(conn->wnd_from_tap - already_sent, conn->mss_guest); if (fill_bufs > TCP_TAP_FRAMES) { fill_bufs = TCP_TAP_FRAMES; iov_rem = 0; } else { - iov_rem = (conn->tap_window - already_sent) % conn->mss_guest; + iov_rem = (conn->wnd_from_tap - already_sent) % conn->mss_guest; } /* Adjust iovec length for recvmsg() based on what was set last time. */ @@ -1712,11 +1737,11 @@ static int tcp_data_from_sock(struct ctx *c, struct tcp_tap_conn *conn, tcp6_l2_mh_sock.msg_iovlen = fill_bufs + 1; /* Don't dequeue until acknowledged by guest. */ -recvmmsg: +recvmsg: len = recvmsg(s, v4 ? 
&tcp4_l2_mh_sock : &tcp6_l2_mh_sock, MSG_PEEK); if (len < 0) { if (errno == EINTR) - goto recvmmsg; + goto recvmsg; goto err; } @@ -1726,7 +1751,7 @@ recvmmsg: send = len - already_sent; if (send <= 0) { tcp_tap_epoll_mask(c, conn, conn->events | EPOLLET); - goto out_restore_iov; + goto out; } tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET); @@ -1762,32 +1787,22 @@ recvmmsg: iov_tap[send_bufs - 1].iov_len = mss_tap - conn->mss_guest + last_len; /* Likely, some new data was acked too. */ - if (conn->seq_from_tap != conn->seq_ack_to_tap || !conn->tcpi_snd_wnd) { - if (conn->no_snd_wnd) { + if (conn->seq_from_tap != conn->seq_ack_to_tap || !conn->wnd_to_tap) { + if (conn->state != ESTABLISHED || + getsockopt(s, SOL_TCP, TCP_INFO, &info, &sl)) { conn->seq_ack_to_tap = conn->seq_from_tap; } else { - if (getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, - &sl)) - goto err; + conn->seq_ack_to_tap = info.tcpi_bytes_acked + + conn->seq_init_from_tap; - if (getsockopt(conn->sock, SOL_SOCKET, - SO_SNDBUF, &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; - - info.tcpi_snd_wnd = MIN(info.tcpi_snd_wnd, - conn->sndbuf * 90 / 100); - - if (conn->state == ESTABLISHED) - conn->seq_ack_to_tap = conn->seq_from_tap; - else - conn->seq_ack_to_tap = info.tcpi_bytes_acked + - conn->seq_init_from_tap; - - conn->tcpi_acked_last = info.tcpi_bytes_acked; + if (c->tcp.kernel_snd_wnd) { + conn->wnd_to_tap = MIN(info.tcpi_snd_wnd, + conn->snd_buf); + } else { + conn->wnd_to_tap = conn->snd_buf; + } + conn->wnd_to_tap = MIN(conn->wnd_to_tap, MAX_WINDOW); } - } else { - info.tcpi_snd_wscale = conn->ws; - info.tcpi_snd_wnd = conn->tcpi_snd_wnd; } plen = conn->mss_guest; @@ -1815,29 +1830,22 @@ recvmmsg: b->th.dest = htons(conn->tap_port); b->th.seq = htonl(seq_to_tap); b->th.ack_seq = htonl(conn->seq_ack_to_tap); - - if (conn->no_snd_wnd) { - b->th.window = htons(WINDOW_DEFAULT); - } else { - b->th.window = htons(info.tcpi_snd_wnd >> - info.tcpi_snd_wscale); - conn->tcpi_snd_wnd = info.tcpi_snd_wnd; - } + b->th.window = htons(MIN(conn->wnd_to_tap >> conn->ws, + USHRT_MAX)); tcp_update_check_tcp4(b); - if (c->mode == MODE_PASTA) { - ip_len += sizeof(struct ethhdr); - write(c->fd_tap, &b->eh, ip_len); - pcap((char *)&b->eh, ip_len); - - conn->seq_to_tap += plen; + if (c->mode == MODE_PASST) { + b->vnet_len = htonl(sizeof(struct ethhdr) + + ip_len); + mh->msg_hdr.msg_iov = &tcp4_l2_iov_tap[i]; + seq_to_tap += plen; continue; } - b->vnet_len = htonl(sizeof(struct ethhdr) + ip_len); - - mh->msg_hdr.msg_iov = &tcp4_l2_iov_tap[i]; + ip_len += sizeof(struct ethhdr); + pcap((char *)&b->eh, ip_len); + ret = write(c->fd_tap, &b->eh, ip_len); } else { struct tcp6_l2_buf_t *b = &tcp6_l2_buf[i]; uint32_t flow = conn->seq_init_to_tap; @@ -1857,14 +1865,8 @@ recvmmsg: b->th.dest = htons(conn->tap_port); b->th.seq = htonl(seq_to_tap); b->th.ack_seq = htonl(conn->seq_ack_to_tap); - - if (conn->no_snd_wnd) { - b->th.window = htons(WINDOW_DEFAULT); - } else { - b->th.window = htons(info.tcpi_snd_wnd >> - info.tcpi_snd_wscale); - conn->tcpi_snd_wnd = info.tcpi_snd_wnd; - } + b->th.window = htons(MIN(conn->wnd_to_tap >> conn->ws, + USHRT_MAX)); memset(b->ip6h.flow_lbl, 0, 3); tcp_update_check_tcp6(b); @@ -1873,21 +1875,32 @@ recvmmsg: b->ip6h.flow_lbl[1] = (flow >> 8) & 0xff; b->ip6h.flow_lbl[2] = (flow >> 0) & 0xff; - if (c->mode == MODE_PASTA) { - ip_len += sizeof(struct ethhdr); - write(c->fd_tap, &b->eh, ip_len); - pcap((char *)&b->eh, ip_len); - - conn->seq_to_tap += plen; + if (c->mode == MODE_PASST) { + b->vnet_len = 
htonl(sizeof(struct ethhdr) + + ip_len); + mh->msg_hdr.msg_iov = &tcp6_l2_iov_tap[i]; + seq_to_tap += plen; continue; } - b->vnet_len = htonl(sizeof(struct ethhdr) + ip_len); - - mh->msg_hdr.msg_iov = &tcp6_l2_iov_tap[i]; + ip_len += sizeof(struct ethhdr); + pcap((char *)&b->eh, ip_len); + ret = write(c->fd_tap, &b->eh, ip_len); } - seq_to_tap += plen; + if (ret < ip_len) { + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return 0; + + tap_handler(c, EPOLLERR, now); + } + + i--; + continue; + } + + conn->seq_to_tap += plen; } if (c->mode == MODE_PASTA) @@ -1902,6 +1915,7 @@ sendmmsg: if (ret <= 0) goto out; + conn->tap_data_noack = *now; conn->seq_to_tap += conn->mss_guest * (ret - 1) + last_len; /* sendmmsg() indicates how many messages were sent at least partially. @@ -1925,6 +1939,8 @@ sendmmsg: *iov_base -= part_sent; } + conn->ts_ack_to_tap = *now; + pcapmm(tcp_l2_mh_tap, ret); goto out; @@ -1934,23 +1950,16 @@ err: tcp_rst(c, conn); ret = -errno; } - goto out_restore_iov; + goto out; zero_len: if (conn->state == ESTABLISHED_SOCK_FIN) { - uint8_t probe; - - if (!recv(conn->sock, &probe, 1, MSG_PEEK)) { - tcp_tap_epoll_mask(c, conn, EPOLLET); - tcp_send_to_tap(c, conn, FIN | ACK, NULL, 0); - tcp_tap_state(conn, ESTABLISHED_SOCK_FIN_SENT); - } + tcp_tap_epoll_mask(c, conn, EPOLLET); + tcp_send_to_tap(c, conn, FIN | ACK, now); + tcp_tap_state(conn, ESTABLISHED_SOCK_FIN_SENT); } out: - conn->ts_sock = *now; - -out_restore_iov: if (iov_rem) iov[fill_bufs - 1].iov_len = conn->mss_guest; if (send_bufs) @@ -1971,12 +1980,14 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, struct tap_l4_msg *msg, int count, struct timespec *now) { - int i, iov_i, ack = 0, fin = 0, retr = 0, keep = -1; + int i, iov_i, ack = 0, fin = 0, psh = 0, retr = 0, keep = -1; struct msghdr mh = { .msg_iov = tcp_tap_iov }; uint32_t max_ack_seq = conn->seq_ack_from_tap; + uint16_t max_ack_seq_wnd = conn->wnd_from_tap; uint32_t seq_from_tap = conn->seq_from_tap; - uint16_t max_ack_seq_wnd = WINDOW_DEFAULT; - ssize_t len; + int partial_send = 0; + uint16_t len; + ssize_t n; for (i = 0, iov_i = 0; i < count; i++) { uint32_t seq, seq_offset, ack_seq; @@ -2008,14 +2019,12 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, seq = ntohl(th->seq); ack_seq = ntohl(th->ack_seq); - if (!i) - max_ack_seq_wnd = ntohs(th->window); if (th->ack) { ack = 1; - if (ack_seq - conn->seq_ack_from_tap < MAX_WINDOW && - ack_seq - max_ack_seq < MAX_WINDOW) { + if (SEQ_GE(ack_seq, conn->seq_ack_from_tap) && + SEQ_GE(ack_seq, max_ack_seq)) { /* Fast re-transmit */ retr = !len && !th->fin && ack_seq == max_ack_seq && @@ -2029,6 +2038,9 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, if (th->fin) fin = 1; + if (th->psh) + psh = 1; + if (!len) continue; @@ -2046,19 +2058,22 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, * |--------| <-- len |--------| <-- len * '--------' <-- offset '-----| <- offset * ^ seq ^ seq - * (offset >= 0 i.e. < MAX_WINDOW, seq + len <= seq_from_tap) + * (offset >= 0, seq + len <= seq_from_tap) * * keep, look for another buffer, then go back, in this case: * , seq_from_tap * |--------| <-- len * '===' <-- offset * ^ seq - * (offset < 0 i.e. 
> MAX_WINDOW) + * (offset < 0) */ - if (seq_offset < MAX_WINDOW && seq + len <= seq_from_tap) + if (SEQ_GE(seq_offset, 0) && SEQ_LE(seq + len, seq_from_tap)) { + /* Force sending ACK, sender might have lost one */ + psh = 1; continue; + } - if (seq_offset > MAX_WINDOW) { + if (SEQ_LT(seq_offset, 0)) { if (keep == -1) keep = i; continue; @@ -2079,14 +2094,15 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, tcp_clamp_window(conn, NULL, 0, max_ack_seq_wnd, 0); if (ack) { - conn->ts_ack_tap = *now; + conn->ts_ack_from_tap = *now; + conn->tap_data_noack = ((struct timespec) { 0, 0 }); tcp_sock_consume(conn, max_ack_seq); } if (retr) { + conn->seq_ack_from_tap = max_ack_seq; conn->seq_to_tap = max_ack_seq; tcp_data_from_sock(c, conn, now); - return; } if (!iov_i) @@ -2094,30 +2110,41 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, mh.msg_iovlen = iov_i; eintr: - len = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL); - if (len < 0) { + n = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL); + if (n < 0) { + if (errno == EPIPE) { + /* Here's the wrap, said the tap. + * In my pocket, said the socket. + * Then swiftly looked away and left. + */ + conn->seq_from_tap = seq_from_tap; + tcp_send_to_tap(c, conn, FORCE_ACK, now); + } + if (errno == EINTR) goto eintr; if (errno == EAGAIN || errno == EWOULDBLOCK) { - tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0); + tcp_send_to_tap(c, conn, UPDATE_WINDOW, now); return; } tcp_rst(c, conn); return; } - if (len < (seq_from_tap - conn->seq_from_tap)) { - conn->seq_from_tap += len; - tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0); - } else { - conn->seq_from_tap += len; + if (n < (seq_from_tap - conn->seq_from_tap)) { + partial_send = 1; + tcp_send_to_tap(c, conn, UPDATE_WINDOW, now); } + conn->seq_from_tap += n; + out: if (keep != -1) { - tcp_send_to_tap(c, conn, ACK, NULL, 0); - tcp_send_to_tap(c, conn, ACK, NULL, 0); + if (conn->seq_dup_ack != conn->seq_from_tap) { + conn->seq_dup_ack = conn->seq_from_tap; + tcp_send_to_tap(c, conn, DUP_ACK, now); + } return; } @@ -2127,19 +2154,27 @@ out: tcp_tap_state(conn, CLOSE_WAIT); } - if (!fin) { - tcp_send_to_tap(c, conn, 0, NULL, 0); - return; - } + if (fin && !partial_send) { + conn->seq_from_tap++; - if (conn->state == ESTABLISHED) { - shutdown(conn->sock, SHUT_WR); - tcp_tap_state(conn, FIN_WAIT_1); - tcp_send_to_tap(c, conn, ACK, NULL, 0); - } else if (conn->state == CLOSE_WAIT) { - shutdown(conn->sock, SHUT_WR); - tcp_tap_state(conn, LAST_ACK); - tcp_send_to_tap(c, conn, ACK, NULL, 0); + if (conn->state == ESTABLISHED) { + shutdown(conn->sock, SHUT_WR); + tcp_tap_state(conn, FIN_WAIT_1); + tcp_send_to_tap(c, conn, ACK, now); + } else if (conn->state == CLOSE_WAIT) { + shutdown(conn->sock, SHUT_WR); + tcp_tap_state(conn, LAST_ACK); + tcp_send_to_tap(c, conn, ACK, now); + } + } else { + int ack_to_tap = timespec_diff_ms(now, &conn->ts_ack_to_tap); + int ack_offset = conn->seq_from_tap - conn->seq_ack_to_tap; + + if (c->mode == MODE_PASTA || + psh || SEQ_GE(ack_offset, conn->wnd_to_tap / 2) || + ack_to_tap > ACK_INTERVAL) { + tcp_send_to_tap(c, conn, psh ? 
FORCE_ACK : 0, now); + } } } @@ -2170,16 +2205,16 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, if (th->rst) { tcp_tap_destroy(c, conn); - return 1; + return count; } - conn->ts_tap = *now; + conn->ts_tap_act = *now; switch (conn->state) { case SOCK_SYN_SENT: if (!th->syn || !th->ack) { tcp_rst(c, conn); - return 1; + return count; } tcp_clamp_window(conn, th, len, 0, 1); @@ -2198,17 +2233,6 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, conn->mss_guest); } - ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL); - if (ws > MAX_WS) { - if (tcp_send_to_tap(c, conn, RST, NULL, 0)) - return 1; - - conn->seq_to_tap = 0; - conn->ws_allowed = 0; - tcp_send_to_tap(c, conn, SYN, NULL, 0); - return 1; - } - /* info.tcpi_bytes_acked already includes one byte for SYN, but * not for incoming connections. */ @@ -2222,35 +2246,54 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, * dequeue waiting for SYN,ACK from tap -- check now. */ tcp_data_from_sock(c, conn, now); - tcp_send_to_tap(c, conn, 0, NULL, 0); + tcp_send_to_tap(c, conn, 0, now); tcp_tap_epoll_mask(c, conn, EPOLLIN | EPOLLRDHUP); break; case TAP_SYN_RCVD: if (th->fin) { + conn->seq_from_tap++; + shutdown(conn->sock, SHUT_WR); - tcp_send_to_tap(c, conn, ACK, NULL, 0); + tcp_send_to_tap(c, conn, ACK, now); tcp_tap_state(conn, FIN_WAIT_1); break; } if (!th->ack) { tcp_rst(c, conn); - return 1; + return count; } tcp_clamp_window(conn, th, len, 0, 0); tcp_tap_state(conn, ESTABLISHED); - break; + if (count == 1) + break; + + /* Falls through */ case ESTABLISHED: case ESTABLISHED_SOCK_FIN: case ESTABLISHED_SOCK_FIN_SENT: + tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET); + tcp_data_from_tap(c, conn, msg, count, now); + return count; case CLOSE_WAIT: case FIN_WAIT_1_SOCK_FIN: case FIN_WAIT_1: + if (th->ack) { + conn->tap_data_noack = ((struct timespec) { 0, 0 }); + conn->ts_ack_from_tap = *now; + } + + tcp_sock_consume(conn, ntohl(th->ack_seq)); + if (conn->state == FIN_WAIT_1_SOCK_FIN && + conn->seq_ack_from_tap == conn->seq_to_tap) { + tcp_tap_destroy(c, conn); + return count; + } + tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET); - tcp_data_from_tap(c, conn, msg, count, now); return count; case TAP_SYN_SENT: case LAST_ACK: @@ -2271,8 +2314,10 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, * tcp_connect_finish() - Handle completion of connect() from EPOLLOUT event * @c: Execution context * @s: File descriptor number for socket + * @now: Current timestamp */ -static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn) +static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn, + struct timespec *now) { socklen_t sl; int so; @@ -2283,7 +2328,7 @@ static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn) return; } - if (tcp_send_to_tap(c, conn, SYN | ACK, NULL, 0)) + if (tcp_send_to_tap(c, conn, SYN | ACK, now)) return; /* Drop EPOLLOUT, only used to wait for connect() to complete */ @@ -2308,28 +2353,31 @@ static void tcp_splice_connect_finish(struct ctx *c, .tcp = { .splice = 1, .v6 = v6, .index = conn - ts } }; struct epoll_event ev_from, ev_to; + int i; - if (conn->state == SPLICE_CONNECT) { - socklen_t sl; - int so; + conn->pipe_from_to[0] = conn->pipe_to_from[0] = -1; + conn->pipe_from_to[1] = conn->pipe_to_from[1] = -1; + for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE; i++) { + if (splice_pipe_pool[i][0][0] > 0) { + SWAP(conn->pipe_from_to[0], splice_pipe_pool[i][0][0]); + SWAP(conn->pipe_from_to[1], splice_pipe_pool[i][0][1]); - sl = sizeof(so); - if 
(getsockopt(conn->to, SOL_SOCKET, SO_ERROR, &so, &sl) || - so) { - tcp_splice_destroy(c, conn); - return; + SWAP(conn->pipe_to_from[0], splice_pipe_pool[i][1][0]); + SWAP(conn->pipe_to_from[1], splice_pipe_pool[i][1][1]); + break; } } - conn->pipe_from_to[0] = conn->pipe_to_from[0] = -1; - if (pipe2(conn->pipe_to_from, O_NONBLOCK) || - pipe2(conn->pipe_from_to, O_NONBLOCK)) { - tcp_splice_destroy(c, conn); - return; - } + if (conn->pipe_from_to[0] <= 0) { + if (pipe2(conn->pipe_to_from, O_NONBLOCK) || + pipe2(conn->pipe_from_to, O_NONBLOCK)) { + tcp_splice_destroy(c, conn); + return; + } - fcntl(conn->pipe_from_to[0], F_SETPIPE_SZ, PIPE_SIZE); - fcntl(conn->pipe_to_from[0], F_SETPIPE_SZ, PIPE_SIZE); + fcntl(conn->pipe_from_to[0], F_SETPIPE_SZ, c->tcp.pipe_size); + fcntl(conn->pipe_to_from[0], F_SETPIPE_SZ, c->tcp.pipe_size); + } if (conn->state == SPLICE_CONNECT) { tcp_splice_state(conn, SPLICE_ESTABLISHED); @@ -2338,7 +2386,7 @@ static void tcp_splice_connect_finish(struct ctx *c, ev_from.data.u64 = ref_from.u64; ev_to.data.u64 = ref_to.u64; - epoll_ctl(c->epollfd, EPOLL_CTL_MOD, conn->from, &ev_from); + epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_from); epoll_ctl(c->epollfd, EPOLL_CTL_MOD, conn->to, &ev_to); } } @@ -2353,20 +2401,19 @@ static void tcp_splice_connect_finish(struct ctx *c, * Return: 0 for connect() succeeded or in progress, negative value on error */ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn, - int v6, in_port_t port) + int s, int v6, in_port_t port) { - int sock_conn = socket(v6 ? AF_INET6 : AF_INET, - SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + int sock_conn = (s > 0) ? s : socket(v6 ? AF_INET6 : AF_INET, + SOCK_STREAM | SOCK_NONBLOCK, + IPPROTO_TCP); union epoll_ref ref_accept = { .proto = IPPROTO_TCP, .s = conn->from, .tcp = { .splice = 1, .v6 = v6, .index = conn - ts } }; union epoll_ref ref_conn = { .proto = IPPROTO_TCP, .s = sock_conn, .tcp = { .splice = 1, .v6 = v6, .index = conn - ts } }; - struct epoll_event ev_accept = { .events = EPOLLET, - .data.u64 = ref_accept.u64 }; - struct epoll_event ev_conn = { .events = EPOLLET, - .data.u64 = ref_conn.u64 }; + struct epoll_event ev_accept = { .data.u64 = ref_accept.u64 }; + struct epoll_event ev_conn = { .data.u64 = ref_conn.u64 }; struct sockaddr_in6 addr6 = { .sin6_family = AF_INET6, .sin6_port = htons(port), @@ -2381,11 +2428,11 @@ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn, socklen_t sl; int ret; - if (sock_conn < 0) - return -errno; - conn->to = sock_conn; + if (s <= 0) + tcp_sock_set_bufsize(sock_conn); + if (v6) { sa = (struct sockaddr *)&addr6; sl = sizeof(addr6); @@ -2402,16 +2449,17 @@ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn, } tcp_splice_state(conn, SPLICE_CONNECT); - ev_conn.events |= EPOLLOUT; + ev_conn.events = EPOLLOUT; } else { tcp_splice_state(conn, SPLICE_ESTABLISHED); tcp_splice_connect_finish(c, conn, v6); - ev_conn.events |= EPOLLIN; - ev_accept.events |= EPOLLIN; + ev_accept.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; + ev_conn.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; + + epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_accept); } - epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_accept); epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->to, &ev_conn); return 0; @@ -2445,7 +2493,7 @@ static int tcp_splice_connect_ns(void *arg) a = (struct tcp_splice_connect_ns_arg *)arg; ns_enter(a->c->pasta_pid); - a->ret = tcp_splice_connect(a->c, a->conn, a->v6, a->port); + a->ret = 
tcp_splice_connect(a->c, a->conn, -1, a->v6, a->port); return 0; } @@ -2462,13 +2510,26 @@ static int tcp_splice_new(struct ctx *c, struct tcp_splice_conn *conn, int v6, in_port_t port) { struct tcp_splice_connect_ns_arg ns_arg = { c, conn, v6, port, 0 }; + int *sock_pool_p, i, s = -1; - if (bitmap_isset(c->tcp.port_to_tap, port)) { + if (bitmap_isset(c->tcp.port_to_tap, port)) + sock_pool_p = v6 ? ns_sock_pool6 : ns_sock_pool4; + else + sock_pool_p = v6 ? init_sock_pool6 : init_sock_pool4; + + for (i = 0; i < TCP_SOCK_POOL_SIZE; i++, sock_pool_p++) { + if ((s = *sock_pool_p) > 0) { + *sock_pool_p = -1; + break; + } + } + + if (s <= 0 && bitmap_isset(c->tcp.port_to_tap, port)) { NS_CALL(tcp_splice_connect_ns, &ns_arg); return ns_arg.ret; } - return tcp_splice_connect(c, conn, v6, port); + return tcp_splice_connect(c, conn, s, v6, port); } /** @@ -2500,10 +2561,6 @@ static void tcp_conn_from_sock(struct ctx *c, union epoll_ref ref, ref_conn.tcp.index = conn - tt; ref_conn.s = conn->sock = s; - sl = sizeof(conn->sndbuf); - if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; - if (ref.tcp.v6) { struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&sa; @@ -2551,19 +2608,24 @@ static void tcp_conn_from_sock(struct ctx *c, union epoll_ref ref, conn->seq_ack_from_tap = conn->seq_to_tap + 1; - conn->tap_window = WINDOW_DEFAULT; - conn->ws_allowed = 1; + conn->wnd_from_tap = WINDOW_DEFAULT; - conn->ts_sock = conn->ts_tap = conn->ts_ack_tap = *now; + conn->ts_sock_act = conn->ts_tap_act = *now; + conn->ts_ack_from_tap = conn->ts_ack_to_tap = *now; - bitmap_set(tcp_act, conn - tt); + tcp_send_to_tap(c, conn, SYN, now); conn->events = ev.events = EPOLLRDHUP; ev.data.u64 = ref_conn.u64; epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->sock, &ev); tcp_tap_state(conn, SOCK_SYN_SENT); - tcp_send_to_tap(c, conn, SYN, NULL, 0); + + sl = sizeof(conn->snd_buf); + if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->snd_buf, &sl)) + conn->snd_buf = WINDOW_DEFAULT; + else + conn->snd_buf /= 2; } /** @@ -2576,11 +2638,13 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref, uint32_t events) { int move_from, move_to, *pipes, eof, never_read; + uint8_t *rcvlowat_set, *rcvlowat_act; + uint64_t *seq_read, *seq_write; struct tcp_splice_conn *conn; struct epoll_event ev; if (ref.tcp.listen) { - int s; + int s, one = 1; if (c->tcp.splice_conn_count >= MAX_SPLICE_CONNS) return; @@ -2588,6 +2652,8 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref, if ((s = accept4(ref.s, NULL, NULL, SOCK_NONBLOCK)) < 0) return; + setsockopt(s, SOL_TCP, TCP_QUICKACK, &one, sizeof(one)); + conn = &ts[c->tcp.splice_conn_count++]; conn->from = s; tcp_splice_state(conn, SPLICE_ACCEPTED); @@ -2614,8 +2680,7 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref, if (conn->state == SPLICE_CONNECT) tcp_splice_connect_finish(c, conn, ref.tcp.v6); - - if (conn->state == SPLICE_ESTABLISHED) + else if (conn->state == SPLICE_ESTABLISHED) epoll_ctl(c->epollfd, EPOLL_CTL_MOD, ref.s, &ev); move_to = ref.s; @@ -2655,12 +2720,25 @@ swap: eof = 0; never_read = 1; + if (move_from == conn->from) { + seq_read = &conn->from_read; + seq_write = &conn->from_written; + rcvlowat_set = splice_rcvlowat_set[0]; + rcvlowat_act = splice_rcvlowat_act[0]; + } else { + seq_read = &conn->to_read; + seq_write = &conn->to_written; + rcvlowat_set = splice_rcvlowat_set[1]; + rcvlowat_act = splice_rcvlowat_act[1]; + } + + while (1) { - int retry_write = 1; + int retry_write = 0, more = 0; ssize_t read, to_write 
= 0, written; retry: - read = splice(move_from, NULL, pipes[1], NULL, PIPE_SIZE, + read = splice(move_from, NULL, pipes[1], NULL, c->tcp.pipe_size, SPLICE_F_MOVE); if (read < 0) { if (errno == EINTR) @@ -2669,35 +2747,46 @@ retry: if (errno != EAGAIN) goto close; - to_write = PIPE_SIZE; + to_write = c->tcp.pipe_size; } else if (!read) { eof = 1; - to_write = PIPE_SIZE; + to_write = c->tcp.pipe_size; } else { never_read = 0; to_write += read; - if (move_from == conn->from) - conn->from_read += read; - else - conn->to_read += read; + if (read >= (long)c->tcp.pipe_size * 90 / 100) + more = SPLICE_F_MORE; + + if (bitmap_isset(rcvlowat_set, conn - ts)) + bitmap_set(rcvlowat_act, conn - ts); } eintr: written = splice(pipes[0], NULL, move_to, NULL, to_write, - SPLICE_F_MOVE); + SPLICE_F_MOVE | more); - if (written > 0) { - if (move_from == conn->from) { - conn->from_written += written; - if (conn->from_read == conn->from_written) - break; - } else { - conn->to_written += written; - if (conn->to_read == conn->to_written) - break; + /* Most common case: skip updating counters. */ + if (read > 0 && read == written) { + if (read >= (long)c->tcp.pipe_size * 10 / 100) + continue; + + if (!bitmap_isset(rcvlowat_set, conn - ts) && + read > (long)c->tcp.pipe_size / 10) { + int lowat = c->tcp.pipe_size / 4; + + setsockopt(move_from, SOL_SOCKET, SO_RCVLOWAT, + &lowat, sizeof(lowat)); + + bitmap_set(rcvlowat_set, conn - ts); + bitmap_set(rcvlowat_act, conn - ts); } + + break; } + *seq_read += read > 0 ? read : 0; + *seq_write += written > 0 ? written : 0; + if (written < 0) { if (errno == EINTR) goto eintr; @@ -2716,9 +2805,9 @@ eintr: ev.data.u64 = ref.u64, epoll_ctl(c->epollfd, EPOLL_CTL_MOD, move_to, &ev); break; - } else if (never_read && written == PIPE_SIZE) { + } else if (never_read && written == (long)(c->tcp.pipe_size)) { goto retry; - } else if (!never_read &&written < to_write) { + } else if (!never_read && written < to_write) { to_write -= written; goto retry; } @@ -2727,11 +2816,10 @@ eintr: break; } - if (conn->state == SPLICE_FIN_BOTH || - (conn->state == SPLICE_FIN_FROM && move_from == conn->from) || - (conn->state == SPLICE_FIN_TO && move_from == conn->to)) { + if (*seq_read == *seq_write) { if (move_from == conn->from && - conn->from_read == conn->from_written) { + (conn->state == SPLICE_FIN_FROM || + conn->state == SPLICE_FIN_BOTH)) { if (!conn->from_fin_sent) { shutdown(conn->to, SHUT_WR); conn->from_fin_sent = 1; @@ -2746,7 +2834,8 @@ eintr: if (conn->to_fin_sent) goto close; } else if (move_from == conn->to && - conn->to_read == conn->to_written) { + (conn->state == SPLICE_FIN_TO || + conn->state == SPLICE_FIN_BOTH)) { if (!conn->to_fin_sent) { shutdown(conn->from, SHUT_WR); conn->to_fin_sent = 1; @@ -2778,7 +2867,9 @@ eintr: return; close: - tcp_splice_destroy(c, conn); + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL); + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL); + conn->state = CLOSED; return; } @@ -2806,39 +2897,42 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, conn = &tt[ref.tcp.index]; + conn->ts_sock_act = *now; + if (events & EPOLLERR) { if (conn->state != CLOSED) tcp_rst(c, conn); + return; } switch (conn->state) { case TAP_SYN_SENT: if (events & EPOLLOUT) - tcp_connect_finish(c, conn); + tcp_connect_finish(c, conn, now); else tcp_rst(c, conn); return; case ESTABLISHED_SOCK_FIN: case ESTABLISHED_SOCK_FIN_SENT: case ESTABLISHED: - tcp_data_from_sock(c, conn, now); if (events & EPOLLRDHUP) { if (conn->state == ESTABLISHED) 
tcp_tap_state(conn, ESTABLISHED_SOCK_FIN); - tcp_data_from_sock(c, conn, now); } + tcp_data_from_sock(c, conn, now); return; case LAST_ACK: - tcp_send_to_tap(c, conn, 0, NULL, 0); - if (conn->seq_ack_to_tap == conn->seq_from_tap + 1) + tcp_send_to_tap(c, conn, 0, now); + if (conn->seq_ack_to_tap == conn->seq_from_tap + 1 || + conn->seq_ack_to_tap == conn->seq_from_tap) tcp_tap_destroy(c, conn); return; case FIN_WAIT_1: if (events & EPOLLIN) tcp_data_from_sock(c, conn, now); if (events & EPOLLRDHUP) { - tcp_send_to_tap(c, conn, FIN | ACK, NULL, 0); + tcp_send_to_tap(c, conn, FIN | ACK, now); tcp_tap_state(conn, FIN_WAIT_1_SOCK_FIN); } return; @@ -2847,11 +2941,13 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, if (events & EPOLLIN) tcp_data_from_sock(c, conn, now); if (events & EPOLLHUP) { - if ((conn->seq_ack_to_tap == conn->seq_from_tap + 1) && - (conn->seq_ack_from_tap == conn->seq_to_tap)) { + if ((conn->seq_ack_to_tap == conn->seq_from_tap + 1 || + conn->seq_ack_to_tap == conn->seq_from_tap) && + (conn->seq_ack_from_tap == conn->seq_to_tap - 1 || + conn->seq_ack_from_tap == conn->seq_to_tap)) { tcp_tap_destroy(c, conn); } else { - tcp_send_to_tap(c, conn, 0, NULL, 0); + tcp_send_to_tap(c, conn, ACK, now); } } return; @@ -2868,6 +2964,43 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, } } +/** + * tcp_set_pipe_size() - Set usable pipe size, probe starting from MAX_PIPE_SIZE + * @c: Execution context + */ +static void tcp_set_pipe_size(struct ctx *c) +{ + int probe_pipe[TCP_SPLICE_PIPE_POOL_SIZE * 2][2], i, j; + + c->tcp.pipe_size = MAX_PIPE_SIZE; + +smaller: + for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE * 2; i++) { + if (pipe(probe_pipe[i])) { + i++; + break; + } + + if (fcntl(probe_pipe[i][0], F_SETPIPE_SZ, c->tcp.pipe_size) < 0) + break; + } + + for (j = i - 1; j >= 0; j--) { + close(probe_pipe[j][0]); + close(probe_pipe[j][1]); + } + + if (i == TCP_SPLICE_PIPE_POOL_SIZE * 2) + return; + + if (!(c->tcp.pipe_size /= 2)) { + c->tcp.pipe_size = MAX_PIPE_SIZE; + return; + } + + goto smaller; +} + /** * tcp_sock_init_ns() - Bind sockets in namespace for inbound connections * @arg: Execution context @@ -2879,6 +3012,7 @@ static int tcp_sock_init_ns(void *arg) union tcp_epoll_ref tref = { .listen = 1, .splice = 1 }; struct ctx *c = (struct ctx *)arg; in_port_t port; + int s; ns_enter(c->pasta_pid); @@ -2890,30 +3024,113 @@ static int tcp_sock_init_ns(void *arg) if (c->v4) { tref.v6 = 0; - sock_l4(c, AF_INET, IPPROTO_TCP, port, BIND_LOOPBACK, - tref.u32); + s = sock_l4(c, AF_INET, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } if (c->v6) { tref.v6 = 1; - sock_l4(c, AF_INET6, IPPROTO_TCP, port, BIND_LOOPBACK, - tref.u32); + s = sock_l4(c, AF_INET6, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } } return 0; } +/** + * tcp_splice_pipe_refill() - Refill pool of pre-opened pipes + * @c: Execution context + */ +static void tcp_splice_pipe_refill(struct ctx *c) +{ + int i; + + for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE; i++) { + if (splice_pipe_pool[i][0][0] > 0) + break; + if (pipe2(splice_pipe_pool[i][0], O_NONBLOCK)) + continue; + if (pipe2(splice_pipe_pool[i][1], O_NONBLOCK)) { + close(splice_pipe_pool[i][1][0]); + close(splice_pipe_pool[i][1][1]); + continue; + } + + fcntl(splice_pipe_pool[i][0][0], F_SETPIPE_SZ, + c->tcp.pipe_size); + fcntl(splice_pipe_pool[i][1][0], F_SETPIPE_SZ, + c->tcp.pipe_size); + } +} + +/** + * struct tcp_sock_refill_arg - Arguments for tcp_sock_refill() + 
* @c: Execution context + * @ns: Set to refill pool of sockets created in namespace + */ +struct tcp_sock_refill_arg { + struct ctx *c; + int ns; +}; + +/** + * tcp_sock_refill() - Refill pool of pre-opened sockets + * @arg: See @tcp_sock_refill_arg + * + * Return: 0 + */ +static int tcp_sock_refill(void *arg) +{ + struct tcp_sock_refill_arg *a = (struct tcp_sock_refill_arg *)arg; + int i, *p4, *p6, one = 1; + + if (a->ns) { + if (ns_enter(a->c->pasta_pid)) + return 0; + p4 = ns_sock_pool4; + p6 = ns_sock_pool6; + } else { + p4 = init_sock_pool4; + p6 = init_sock_pool6; + } + + for (i = 0; a->c->v4 && i < TCP_SOCK_POOL_SIZE; i++, p4++) { + if (*p4 > 0) { + break; + } + *p4 = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + setsockopt(*p4, SOL_TCP, TCP_QUICKACK, &one, sizeof(one)); + tcp_sock_set_bufsize(*p4); + } + + for (i = 0; a->c->v6 && i < TCP_SOCK_POOL_SIZE; i++, p6++) { + if (*p6 > 0) { + break; + } + *p6 = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, + IPPROTO_TCP); + setsockopt(*p6, SOL_TCP, TCP_QUICKACK, &one, sizeof(one)); + tcp_sock_set_bufsize(*p6); + } + + return 0; +} + /** * tcp_sock_init() - Bind sockets for inbound connections, get key for sequence * @c: Execution context * * Return: 0 on success, -1 on failure */ -int tcp_sock_init(struct ctx *c) +int tcp_sock_init(struct ctx *c, struct timespec *now) { + struct tcp_sock_refill_arg refill_arg = { c, 0 }; union tcp_epoll_ref tref = { .listen = 1 }; in_port_t port; + int s; getrandom(&c->tcp.hash_secret, sizeof(c->tcp.hash_secret), GRND_RANDOM); @@ -2926,14 +3143,16 @@ int tcp_sock_init(struct ctx *c) tref.v6 = 0; tref.splice = 0; - sock_l4(c, AF_INET, IPPROTO_TCP, port, - c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY, - tref.u32); + s = sock_l4(c, AF_INET, IPPROTO_TCP, port, + c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY, + tref.u32); + tcp_sock_set_bufsize(s); if (c->mode == MODE_PASTA) { tref.splice = 1; - sock_l4(c, AF_INET, IPPROTO_TCP, port, - BIND_LOOPBACK, tref.u32); + s = sock_l4(c, AF_INET, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } } @@ -2941,14 +3160,16 @@ int tcp_sock_init(struct ctx *c) tref.v6 = 1; tref.splice = 0; - sock_l4(c, AF_INET6, IPPROTO_TCP, port, - c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY, - tref.u32); + s = sock_l4(c, AF_INET6, IPPROTO_TCP, port, + c->mode == MODE_PASTA ? 
BIND_EXT : BIND_ANY, + tref.u32); + tcp_sock_set_bufsize(s); if (c->mode == MODE_PASTA) { tref.splice = 1; - sock_l4(c, AF_INET6, IPPROTO_TCP, port, - BIND_LOOPBACK, tref.u32); + s = sock_l4(c, AF_INET6, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } } } @@ -2959,9 +3180,18 @@ int tcp_sock_init(struct ctx *c) if (c->v6) tcp_sock6_iov_init(); - if (c->mode == MODE_PASTA) + c->tcp.refill_ts = *now; + tcp_sock_refill(&refill_arg); + + if (c->mode == MODE_PASTA) { + tcp_set_pipe_size(c); NS_CALL(tcp_sock_init_ns, c); + refill_arg.ns = 1; + NS_CALL(tcp_sock_refill, &refill_arg); + tcp_splice_pipe_refill(c); + } + return 0; } @@ -2974,72 +3204,72 @@ int tcp_sock_init(struct ctx *c) static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn, struct timespec *ts) { - int ack_tap_ms = timespec_diff_ms(ts, &conn->ts_ack_tap); - int sock_ms = timespec_diff_ms(ts, &conn->ts_sock); - int tap_ms = timespec_diff_ms(ts, &conn->ts_tap); + int ack_from_tap = timespec_diff_ms(ts, &conn->ts_ack_from_tap); + int ack_to_tap = timespec_diff_ms(ts, &conn->ts_ack_to_tap); + int sock_act = timespec_diff_ms(ts, &conn->ts_sock_act); + int tap_act = timespec_diff_ms(ts, &conn->ts_tap_act); + int tap_data_noack; + + if (memcmp(&conn->tap_data_noack, &((struct timespec){ 0, 0 }), + sizeof(struct timespec))) + tap_data_noack = 0; + else + tap_data_noack = timespec_diff_ms(ts, &conn->tap_data_noack); switch (conn->state) { + case CLOSED: + tcp_hash_remove(conn); + tcp_table_tap_compact(c, conn); + break; case SOCK_SYN_SENT: case TAP_SYN_RCVD: - if (ack_tap_ms > SYN_TIMEOUT) + if (ack_from_tap > SYN_TIMEOUT) tcp_rst(c, conn); break; case ESTABLISHED_SOCK_FIN_SENT: - if (ack_tap_ms > FIN_TIMEOUT) { + if (tap_data_noack > FIN_TIMEOUT) { tcp_rst(c, conn); break; } /* Falls through */ case ESTABLISHED: case ESTABLISHED_SOCK_FIN: - if (tap_ms > ACT_TIMEOUT && sock_ms > ACT_TIMEOUT) { + if (tap_act > ACT_TIMEOUT && sock_act > ACT_TIMEOUT) { tcp_rst(c, conn); break; } - if (conn->seq_to_tap == conn->seq_ack_from_tap && - conn->seq_from_tap == conn->seq_ack_to_tap) { - conn->ts_sock = *ts; - break; - } + if (!conn->wnd_to_tap) + tcp_send_to_tap(c, conn, UPDATE_WINDOW, ts); + else if (ack_to_tap > ACK_INTERVAL) + tcp_send_to_tap(c, conn, 0, ts); - if (sock_ms > ACK_INTERVAL) { - if (conn->seq_from_tap > conn->seq_ack_to_tap) - tcp_send_to_tap(c, conn, 0, NULL, 0); - } - - if (sock_ms - ack_tap_ms > ACK_TIMEOUT) { + if (tap_data_noack > ACK_TIMEOUT) { if (conn->seq_ack_from_tap < conn->seq_to_tap) { - if (sock_ms - ack_tap_ms > 10 * ACK_TIMEOUT) { + if (tap_data_noack > LAST_ACK_TIMEOUT) { tcp_rst(c, conn); break; } conn->seq_to_tap = conn->seq_ack_from_tap; - if (sock_ms > ACK_TIMEOUT) - tcp_data_from_sock(c, conn, ts); + tcp_data_from_sock(c, conn, ts); } } - - if (conn->seq_from_tap == conn->seq_ack_to_tap) - conn->ts_sock = *ts; - - if (!conn->tcpi_snd_wnd) - tcp_send_to_tap(c, conn, 0, NULL, 0); - break; case CLOSE_WAIT: case FIN_WAIT_1_SOCK_FIN: - if (sock_ms - ack_tap_ms > FIN_TIMEOUT) + if (tap_data_noack > FIN_TIMEOUT) tcp_rst(c, conn); break; case FIN_WAIT_1: - if (sock_ms > FIN_TIMEOUT) + if (sock_act > FIN_TIMEOUT) tcp_rst(c, conn); break; case LAST_ACK: - if (sock_ms > LAST_ACK_TIMEOUT) + if (sock_act > LAST_ACK_TIMEOUT) + tcp_rst(c, conn); + else if (tap_act > LAST_ACK_TIMEOUT) tcp_rst(c, conn); break; case TAP_SYN_SENT: @@ -3049,7 +3279,6 @@ static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn, case SPLICE_FIN_FROM: case SPLICE_FIN_TO: case SPLICE_FIN_BOTH: - case 
CLOSED: break; } } @@ -3059,19 +3288,53 @@ static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn, * @c: Execution context * @ts: Timestamp from caller */ -void tcp_timer(struct ctx *c, struct timespec *ts) +void tcp_timer(struct ctx *c, struct timespec *now) { - long *word = (long *)tcp_act, tmp; - unsigned int i; - int n; + struct tcp_sock_refill_arg refill_arg = { c, 0 }; + int i; - for (i = 0; i < sizeof(tcp_act) / sizeof(long); i++, word++) { - tmp = *word; - while ((n = ffsl(tmp))) { - int index = i * sizeof(long) * 8 + n - 1; + if (timespec_diff_ms(now, &c->tcp.refill_ts) > REFILL_INTERVAL) { + tcp_sock_refill(&refill_arg); + if (c->mode == MODE_PASTA) { + refill_arg.ns = 1; + if ((c->v4 && ns_sock_pool4[TCP_SOCK_POOL_TSH] <= 0) || + (c->v6 && ns_sock_pool6[TCP_SOCK_POOL_TSH] <= 0)) + NS_CALL(tcp_sock_refill, &refill_arg); - tmp &= ~(1UL << (n - 1)); - tcp_timer_one(c, &tt[index], ts); + tcp_splice_pipe_refill(c); + } + } + + for (i = c->tcp.tap_conn_count - 1; i >= 0; i--) + tcp_timer_one(c, tt + i, now); + + if (c->mode == MODE_PASTA) { + for (i = c->tcp.splice_conn_count - 1; i >= 0; i--) { + if ((ts + i)->state == CLOSED) { + tcp_splice_destroy(c, ts + i); + continue; + } + + if (bitmap_isset(splice_rcvlowat_set[0], i) && + !bitmap_isset(splice_rcvlowat_act[0], i)) { + int lowat = 1; + + setsockopt((ts + i)->from, SOL_SOCKET, + SO_RCVLOWAT, &lowat, sizeof(lowat)); + bitmap_clear(splice_rcvlowat_set[0], i); + } + + if (bitmap_isset(splice_rcvlowat_set[1], i) && + !bitmap_isset(splice_rcvlowat_act[1], i)) { + int lowat = 1; + + setsockopt((ts + i)->to, SOL_SOCKET, + SO_RCVLOWAT, &lowat, sizeof(lowat)); + bitmap_clear(splice_rcvlowat_set[1], i); + } + + bitmap_clear(splice_rcvlowat_act[0], i); + bitmap_clear(splice_rcvlowat_act[1], i); } } } diff --git a/tcp.h b/tcp.h index 359414c..ae983ed 100644 --- a/tcp.h +++ b/tcp.h @@ -11,8 +11,8 @@ struct ctx; void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, struct timespec *now); int tcp_tap_handler(struct ctx *c, int af, void *addr, - struct tap_msg *msg, int count, struct timespec *now); -int tcp_sock_init(struct ctx *c); + struct tap_l4_msg *msg, int count, struct timespec *now); +int tcp_sock_init(struct ctx *c, struct timespec *now); void tcp_timer(struct ctx *c, struct timespec *ts); void tcp_update_l2_buf(unsigned char *eth_d, unsigned char *eth_s, uint32_t *ip_da); @@ -45,6 +45,9 @@ union tcp_epoll_ref { * @port_to_tap: Ports bound host-side, packets to tap or spliced * @port_to_init: Ports bound namespace-side, spliced to init * @timer_run: Timestamp of most recent timer run + * @kernel_snd_wnd: Kernel reports sending window (with commit 8f7baad7f035) + * @pipe_size: Size of pipes for spliced connections + * @refill_ts: Time of last refill operation for pools of sockets/pipes */ struct tcp_ctx { uint64_t hash_secret[2]; @@ -53,6 +56,9 @@ struct tcp_ctx { uint8_t port_to_tap [USHRT_MAX / 8]; uint8_t port_to_init [USHRT_MAX / 8]; struct timespec timer_run; + int kernel_snd_wnd; + size_t pipe_size; + struct timespec refill_ts; }; #endif /* TCP_H */
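
The SEQ_LE()/SEQ_LT()/SEQ_GE()/SEQ_GT() macros introduced by this patch replace the open-coded "difference < MAX_WINDOW" checks used to validate ACK and sequence numbers: with unsigned 32-bit arithmetic the subtraction wraps around, so one sequence number counts as being ahead of another as long as the two are less than MAX_WINDOW apart (64MiB, given MAX_WS of 10). Below is a minimal, standalone sketch, not part of the patch, that copies the macro definitions and checks that the comparisons behave as intended across the 2^32 wrap-around:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_WS		10
#define MAX_WINDOW	(1 << (16 + (MAX_WS)))

/* Same definitions as in the patch: comparisons are meaningful as long as the
 * two sequence numbers are less than MAX_WINDOW apart.
 */
#define SEQ_LE(a, b)	((b) - (a) < MAX_WINDOW)
#define SEQ_LT(a, b)	((b) - (a) - 1 < MAX_WINDOW)
#define SEQ_GE(a, b)	((a) - (b) < MAX_WINDOW)
#define SEQ_GT(a, b)	((a) - (b) - 1 < MAX_WINDOW)

int main(void)
{
	uint32_t ack_old = UINT32_MAX - 1000;	/* just before the wrap */
	uint32_t ack_new = 5000;		/* just after the wrap  */

	assert(SEQ_GT(ack_new, ack_old));	/* newer despite smaller value */
	assert(SEQ_LT(ack_old, ack_new));
	assert(SEQ_GE(ack_new, ack_new) && SEQ_LE(ack_new, ack_new));
	assert(!SEQ_GT(ack_new, ack_new));	/* strict comparison */

	printf("sequence comparisons wrap as expected\n");
	return 0;
}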
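
The other central change is how the window advertised to the tap is derived: conn->wnd_to_tap is clamped to the socket send buffer (conn->snd_buf) and to MAX_WINDOW, and the 16-bit header field is then filled with that value shifted right by the scaling factor sent in the SYN options (conn->ws). The helper below is an illustrative sketch with made-up values, not code from the patch, walking through the same clamping and scaling steps (byte-order conversion with htons() left out):

#include <stdint.h>
#include <stdio.h>

#define MAX_WS		10
#define MAX_WINDOW	(1 << (16 + (MAX_WS)))
#define MIN(a, b)	((a) < (b) ? (a) : (b))

/* Window value for the TCP header, following the same steps as the non-SYN
 * path of tcp_send_to_tap() in the patch: clamp to the send buffer and to
 * MAX_WINDOW, then scale by the factor advertised in the SYN options.
 */
static uint16_t wnd_to_tap_hdr(uint32_t tcpi_snd_wnd, uint32_t snd_buf,
			       uint16_t ws)
{
	uint32_t wnd = MIN(tcpi_snd_wnd, snd_buf);

	wnd = MIN(wnd, MAX_WINDOW);

	return (uint16_t)MIN(wnd >> ws, UINT16_MAX);
}

int main(void)
{
	/* Hypothetical values: 8MiB reported by TCP_INFO, 4MiB send buffer,
	 * scaling factor 7 -- header field carries (4MiB >> 7) = 32768.
	 */
	printf("%u\n", (unsigned)wnd_to_tap_hdr(8 << 20, 4 << 20, 7));
	return 0;
}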