diff --git a/flow.c b/flow.c
index d6650fc..5e94a7a 100644
--- a/flow.c
+++ b/flow.c
@@ -26,7 +26,59 @@ static_assert(ARRAY_SIZE(flow_type_str) == FLOW_NUM_TYPES,
               "flow_type_str[] doesn't match enum flow_type");
 
 /* Global Flow Table */
-unsigned flow_count;
+
+/**
+ * DOC: Theory of Operation - allocating and freeing flow entries
+ *
+ * Flows are entries in flowtab[]. We need to routinely scan the whole table to
+ * perform deferred bookkeeping tasks on active entries, and sparse empty slots
+ * waste time and worsen data locality. But keeping the table fully compact by
+ * moving entries on deletion is fiddly: it requires updating hash tables, and
+ * the epoll references to flows. Instead, we implement the compromise
+ * described below.
+ *
+ * Free clusters
+ *    A "free cluster" is a contiguous set of unused (FLOW_TYPE_NONE) entries
+ *    in flowtab[]. The first entry in each cluster contains metadata ('free'
+ *    field in union flow), specifically the number of entries in the cluster
+ *    (free.n), and the index of the next free cluster (free.next). The
+ *    entries in the cluster other than the first should have n == next == 0.
+ *
+ * Free cluster list
+ *    flow_first_free gives the index of the first (lowest index) free
+ *    cluster. Each free cluster has the index of the next free cluster, or
+ *    FLOW_MAX if it is the last free cluster. Together these form a linked
+ *    list of free clusters, in strictly increasing order of index.
+ *
+ * Allocating
+ *    We always allocate a new flow into the lowest available index, i.e. the
+ *    first entry of the first free cluster, that is, at index
+ *    flow_first_free. We update flow_first_free and the free cluster to
+ *    maintain the invariants above (so the free cluster list is still in
+ *    strictly increasing order).
+ *
+ * Freeing
+ *    It's not possible to maintain the invariants above if we allow freeing
+ *    of any entry at any time. So we only allow freeing in two cases.
+ *
+ *    1) flow_alloc_cancel() will free the most recent allocation. We can
+ *    maintain the invariants because we know that allocation was made in the
+ *    lowest available slot, and so will become the lowest index free slot
+ *    again after cancellation.
+ *
+ *    2) Flows can be freed by returning true from the flow type specific
+ *    deferred or timer function. These are called from flow_defer_handler(),
+ *    which is already scanning the whole table in index order. We can use
+ *    that scan to rebuild the free cluster list, either merging freed entries
+ *    into existing free clusters or creating new free clusters in the list
+ *    for them.
+ *
+ * Scanning the table
+ *    Theoretically, scanning the table requires FLOW_MAX iterations. However,
+ *    when we encounter the start of a free cluster, we can immediately skip
+ *    past it, meaning that in practice we only need (number of active
+ *    connections) + (number of free clusters) iterations.
+ */
+
+unsigned flow_first_free;
 union flow flowtab[FLOW_MAX];
 
 /* Last time the flow timers ran */
@@ -57,10 +109,35 @@ void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
  */
 union flow *flow_alloc(void)
 {
-        if (flow_count >= FLOW_MAX)
+        union flow *flow = &flowtab[flow_first_free];
+
+        if (flow_first_free >= FLOW_MAX)
                 return NULL;
 
-        return &flowtab[flow_count++];
+        ASSERT(flow->f.type == FLOW_TYPE_NONE);
+        ASSERT(flow->free.n >= 1);
+        ASSERT(flow_first_free + flow->free.n <= FLOW_MAX);
+
+        if (flow->free.n > 1) {
+                union flow *next;
+
+                /* Use one entry from the cluster */
+                ASSERT(flow_first_free <= FLOW_MAX - 2);
+                next = &flowtab[++flow_first_free];
+
+                ASSERT(FLOW_IDX(next) < FLOW_MAX);
+                ASSERT(next->f.type == FLOW_TYPE_NONE);
+                ASSERT(next->free.n == 0);
+
+                next->free.n = flow->free.n - 1;
+                next->free.next = flow->free.next;
+        } else {
+                /* Use the entire cluster */
+                flow_first_free = flow->free.next;
+        }
+
+        memset(flow, 0, sizeof(*flow));
+        return flow;
 }
 
 /**
@@ -71,48 +148,15 @@ union flow *flow_alloc(void)
  */
 void flow_alloc_cancel(union flow *flow)
 {
-        ASSERT(FLOW_IDX(flow) == flow_count - 1);
-        memset(flow, 0, sizeof(*flow));
-        flow_count--;
-}
+        ASSERT(flow_first_free > FLOW_IDX(flow));
 
-/**
- * flow_table_compact() - Perform compaction on flow table
- * @c:		Execution context
- * @hole:	Pointer to recently closed flow
- */
-static void flow_table_compact(const struct ctx *c, union flow *hole)
-{
-        union flow *from;
-
-        if (FLOW_IDX(hole) == --flow_count) {
-                debug("flow: table compaction: maximum index was %u (%p)",
-                      FLOW_IDX(hole), (void *)hole);
-                memset(hole, 0, sizeof(*hole));
-                return;
-        }
-
-        from = flowtab + flow_count;
-        memcpy(hole, from, sizeof(*hole));
-
-        switch (from->f.type) {
-        case FLOW_TCP:
-                tcp_tap_conn_update(c, &from->tcp, &hole->tcp);
-                break;
-        case FLOW_TCP_SPLICE:
-                tcp_splice_conn_update(c, &hole->tcp_splice);
-                break;
-        default:
-                die("Unexpected %s in tcp_table_compact()",
-                    FLOW_TYPE(&from->f));
-        }
-
-        debug("flow: table compaction (%s): old index %u, new index %u, "
-              "from: %p, to: %p",
-              FLOW_TYPE(&from->f), FLOW_IDX(from), FLOW_IDX(hole),
-              (void *)from, (void *)hole);
-
-        memset(from, 0, sizeof(*from));
+        flow->f.type = FLOW_TYPE_NONE;
+        /* Put it back in a length 1 free cluster, don't attempt to fully
+         * reverse flow_alloc()'s steps. This will get folded together the
+         * next time flow_defer_handler() runs anyway */
+        flow->free.n = 1;
+        flow->free.next = flow_first_free;
+        flow_first_free = FLOW_IDX(flow);
 }
 
 /**
@@ -122,18 +166,46 @@ static void flow_table_compact(const struct ctx *c, union flow *hole)
  */
 void flow_defer_handler(const struct ctx *c, const struct timespec *now)
 {
+        struct flow_free_cluster *free_head = NULL;
+        unsigned *last_next = &flow_first_free;
         bool timer = false;
-        union flow *flow;
+        unsigned idx;
 
         if (timespec_diff_ms(now, &flow_timer_run) >= FLOW_TIMER_INTERVAL) {
                 timer = true;
                 flow_timer_run = *now;
         }
 
-        for (flow = flowtab + flow_count - 1; flow >= flowtab; flow--) {
+        for (idx = 0; idx < FLOW_MAX; idx++) {
+                union flow *flow = &flowtab[idx];
                 bool closed = false;
 
+                if (flow->f.type == FLOW_TYPE_NONE) {
+                        unsigned skip = flow->free.n;
+
+                        /* First entry of a free cluster must have n >= 1 */
+                        ASSERT(skip);
+
+                        if (free_head) {
+                                /* Merge into preceding free cluster */
+                                free_head->n += flow->free.n;
+                                flow->free.n = flow->free.next = 0;
+                        } else {
+                                /* New free cluster, add to chain */
+                                free_head = &flow->free;
+                                *last_next = idx;
+                                last_next = &free_head->next;
+                        }
+
+                        /* Skip remaining empty entries */
+                        idx += skip - 1;
+                        continue;
+                }
+
                 switch (flow->f.type) {
+                case FLOW_TYPE_NONE:
+                        ASSERT(false);
+                        break;
                 case FLOW_TCP:
                         closed = tcp_flow_defer(flow);
                         break;
@@ -147,7 +219,35 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
                         ;
                 }
 
-                if (closed)
-                        flow_table_compact(c, flow);
+                if (closed) {
+                        flow->f.type = FLOW_TYPE_NONE;
+
+                        if (free_head) {
+                                /* Add slot to current free cluster */
+                                ASSERT(idx == FLOW_IDX(free_head) + free_head->n);
+                                free_head->n++;
+                                flow->free.n = flow->free.next = 0;
+                        } else {
+                                /* Create new free cluster */
+                                free_head = &flow->free;
+                                free_head->n = 1;
+                                *last_next = idx;
+                                last_next = &free_head->next;
+                        }
+                } else {
+                        free_head = NULL;
+                }
         }
+
+        *last_next = FLOW_MAX;
+}
+
+/**
+ * flow_init() - Initialise flow related data structures
+ */
+void flow_init(void)
+{
+        /* Initial state is a single free cluster containing the whole table */
+        flowtab[0].free.n = FLOW_MAX;
+        flowtab[0].free.next = FLOW_MAX;
 }
diff --git a/flow.h b/flow.h
index 8064f0e..48a0ab4 100644
--- a/flow.h
+++ b/flow.h
@@ -68,6 +68,7 @@ static inline bool flow_sidx_eq(flow_sidx_t a, flow_sidx_t b)
 
 union flow;
 
+void flow_init(void);
 void flow_defer_handler(const struct ctx *c, const struct timespec *now);
 
 void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
diff --git a/flow_table.h b/flow_table.h
index 2773a2b..eecf884 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -9,6 +9,19 @@
 
 #include "tcp_conn.h"
 
+/**
+ * struct flow_free_cluster - Information about a cluster of free entries
+ * @f:		Generic flow information
+ * @n:		Number of entries in the free cluster (including this one)
+ * @next:	Index of next free cluster
+ */
+struct flow_free_cluster {
+        /* Must be first element */
+        struct flow_common f;
+        unsigned n;
+        unsigned next;
+};
+
 /**
  * union flow - Descriptor for a logical packet flow (e.g. connection)
  * @f:		Fields common between all variants
@@ -17,12 +30,13 @@
  */
 union flow {
         struct flow_common f;
+        struct flow_free_cluster free;
         struct tcp_tap_conn tcp;
         struct tcp_splice_conn tcp_splice;
 };
 
 /* Global Flow Table */
-extern unsigned flow_count;
+extern unsigned flow_first_free;
 extern union flow flowtab[];
 
diff --git a/passt.c b/passt.c
index 71bea8f..d315438 100644
--- a/passt.c
+++ b/passt.c
@@ -285,6 +285,8 @@ int main(int argc, char **argv)
 
         clock_gettime(CLOCK_MONOTONIC, &now);
 
+        flow_init();
+
         if ((!c.no_udp && udp_init(&c)) || (!c.no_tcp && tcp_init(&c)))
                 exit(EXIT_FAILURE);
 
diff --git a/tcp.c b/tcp.c
index ee2c3af..905d26f 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1251,29 +1251,6 @@ static void tcp_hash_remove(const struct ctx *c,
         tc_hash[b] = FLOW_SIDX_NONE;
 }
 
-/**
- * tcp_tap_conn_update() - Update tcp_tap_conn when being moved in the table
- * @c:		Execution context
- * @old:	Old location of tcp_tap_conn
- * @new:	New location of tcp_tap_conn
- */
-void tcp_tap_conn_update(const struct ctx *c, struct tcp_tap_conn *old,
-                         struct tcp_tap_conn *new)
-
-{
-        unsigned b = tcp_hash_probe(c, old);
-
-        if (!flow_at_sidx(tc_hash[b]))
-                return; /* Not in hash table, nothing to update */
-
-        tc_hash[b] = FLOW_SIDX(new, TAPSIDE);
-
-        debug("TCP: hash table update: old index %u, new index %u, sock %i, "
-              "bucket: %u", FLOW_IDX(old), FLOW_IDX(new), new->sock, b);
-
-        tcp_epoll_ctl(c, new);
-}
-
 /**
  * tcp_hash_lookup() - Look up connection given remote address and ports
  * @c:		Execution context
diff --git a/tcp_conn.h b/tcp_conn.h
index 636224e..a5f5cfe 100644
--- a/tcp_conn.h
+++ b/tcp_conn.h
@@ -155,9 +155,6 @@ struct tcp_splice_conn {
 extern int init_sock_pool4	[TCP_SOCK_POOL_SIZE];
 extern int init_sock_pool6	[TCP_SOCK_POOL_SIZE];
 
-void tcp_tap_conn_update(const struct ctx *c, struct tcp_tap_conn *old,
-                         struct tcp_tap_conn *new);
-void tcp_splice_conn_update(const struct ctx *c, struct tcp_splice_conn *new);
 bool tcp_flow_defer(union flow *flow);
 bool tcp_splice_flow_defer(union flow *flow);
 void tcp_splice_timer(const struct ctx *c, union flow *flow);
diff --git a/tcp_splice.c b/tcp_splice.c
index daef7de..26d3206 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -231,17 +231,6 @@ static void conn_event_do(const struct ctx *c, struct tcp_splice_conn *conn,
         } while (0)
 
-/**
- * tcp_splice_conn_update() - Update tcp_splice_conn when being moved in the table
- * @c:		Execution context
- * @new:	New location of tcp_splice_conn
- */
-void tcp_splice_conn_update(const struct ctx *c, struct tcp_splice_conn *new)
-{
-        if (tcp_splice_epoll_ctl(c, new))
-                conn_flag(c, new, CLOSING);
-}
-
 /**
  * tcp_splice_flow_defer() - Deferred per-flow handling (clean up closed)
  * @flow:	Flow table entry for this connection
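
Editor's note, separate from the patch itself: below is a minimal standalone sketch of the allocation and cancellation scheme described in the "Theory of Operation" comment above. All names (TOY_MAX, struct toy_flow, toy_init(), toy_alloc(), toy_alloc_cancel()) are hypothetical stand-ins for FLOW_MAX, union flow, flow_init(), flow_alloc() and flow_alloc_cancel(); it models only the free-list bookkeeping, not the real flow types.

/* Hedged sketch of the free-cluster allocator; toy_* names are
 * hypothetical stand-ins, not the passt API */
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define TOY_MAX 8

struct toy_flow {
        int type;       /* 0 means the entry is free */
        unsigned n;     /* cluster head only: length of the free cluster */
        unsigned next;  /* cluster head only: index of the next cluster */
};

static struct toy_flow tab[TOY_MAX];
static unsigned first_free;

static void toy_init(void)
{
        /* Initial state: one free cluster spanning the whole table */
        tab[0].n = TOY_MAX;
        tab[0].next = TOY_MAX;  /* TOY_MAX terminates the list */
}

static struct toy_flow *toy_alloc(void)
{
        struct toy_flow *flow = &tab[first_free];

        if (first_free >= TOY_MAX)
                return NULL;    /* table full */

        assert(flow->type == 0 && flow->n >= 1);

        if (flow->n > 1) {
                /* Take the cluster's head entry; the entry after it
                 * becomes the new, one-shorter cluster head */
                tab[first_free + 1].n = flow->n - 1;
                tab[first_free + 1].next = flow->next;
                first_free++;
        } else {
                /* The cluster is used up: unlink it */
                first_free = flow->next;
        }

        memset(flow, 0, sizeof(*flow));
        flow->type = 1;         /* now in use */
        return flow;
}

static void toy_alloc_cancel(struct toy_flow *flow)
{
        unsigned idx = (unsigned)(flow - tab);

        /* Only the most recent allocation may be cancelled, so idx is
         * again the lowest free index: push a length-1 cluster onto the
         * head of the list, keeping it sorted by index */
        assert(first_free > idx);
        flow->type = 0;
        flow->n = 1;
        flow->next = first_free;
        first_free = idx;
}

int main(void)
{
        struct toy_flow *flow;

        toy_init();
        toy_alloc();            /* takes index 0 */
        flow = toy_alloc();     /* takes index 1 */
        toy_alloc_cancel(flow); /* puts index 1 back */
        printf("first free index: %u\n", first_free);   /* prints 1 */
        return 0;
}

Allocation always takes the head of the lowest free cluster, so the free list stays sorted without any searching, and cancellation only ever has to handle the just-allocated entry, which is guaranteed to sit below flow_first_free.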
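
The rebuild pass in flow_defer_handler() can be sketched the same way. The merge logic below mirrors the patch: free clusters are relinked in index order as they are encountered, freshly closed entries are absorbed into a cluster currently being grown, and a whole run of free entries costs a single iteration. toy_flow_defer() is a hypothetical stand-in for the per-type deferred handlers (tcp_flow_defer(), tcp_splice_flow_defer()), as are the other toy_* names.

/* Hedged sketch of the deferred scan that rebuilds the free-cluster
 * list; toy_* names are hypothetical stand-ins, not the passt API */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

#define TOY_MAX 8

/* type 0 marks a free entry; n/next are only meaningful in a cluster head */
struct toy_flow { int type; unsigned n, next; };

static struct toy_flow tab[TOY_MAX];
static unsigned first_free;

/* Stand-in for the per-type deferred handlers deciding whether an
 * active entry is closed and should be freed */
static bool toy_flow_defer(const struct toy_flow *flow)
{
        return flow->type == 2; /* treat type 2 as "closing" */
}

static void toy_defer_scan(void)
{
        struct toy_flow *free_head = NULL;      /* cluster being grown */
        unsigned *last_next = &first_free;      /* link field to fill next */
        unsigned idx;

        for (idx = 0; idx < TOY_MAX; idx++) {
                struct toy_flow *flow = &tab[idx];

                if (flow->type == 0) {
                        unsigned skip = flow->n;

                        assert(skip >= 1);      /* must be a cluster head */
                        if (free_head) {
                                /* Contiguous with previous cluster: merge */
                                free_head->n += flow->n;
                                flow->n = flow->next = 0;
                        } else {
                                /* New cluster: link it into the chain */
                                free_head = flow;
                                *last_next = idx;
                                last_next = &free_head->next;
                        }
                        idx += skip - 1;        /* skip the rest in O(1) */
                        continue;
                }

                if (toy_flow_defer(flow)) {
                        flow->type = 0;
                        if (free_head) {
                                free_head->n++; /* extend current cluster */
                                flow->n = flow->next = 0;
                        } else {
                                free_head = flow; /* start a new cluster */
                                free_head->n = 1;
                                *last_next = idx;
                                last_next = &free_head->next;
                        }
                } else {
                        free_head = NULL;       /* active entry ends the run */
                }
        }
        *last_next = TOY_MAX;   /* terminate the free list */
}

int main(void)
{
        unsigned i;

        /* Active entries at 0 and 3, a closing entry at 2, rest free */
        tab[0].type = 1;
        tab[1].n = 1;
        tab[1].next = 4;
        tab[2].type = 2;
        tab[3].type = 1;
        tab[4].n = 4;
        tab[4].next = TOY_MAX;
        first_free = 1;

        toy_defer_scan();
        /* Entry 2 is freed and merged: clusters are now {1,2} and {4..7} */
        for (i = first_free; i < TOY_MAX; i = tab[i].next)
                printf("cluster at %u, length %u\n", i, tab[i].n);
        return 0;
}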
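
Finally, a note on the "Must be first element" comment in struct flow_free_cluster: every variant in union flow starts with struct flow_common, so f.type can be read through any variant to classify a slot before its concrete type is known. A small hypothetical illustration (the toy_* names below are not the passt API):

/* Hedged sketch of the common-header layout rule; toy_* names are
 * hypothetical stand-ins for flow_common, flow_free_cluster, etc. */
#include <assert.h>
#include <stddef.h>

struct toy_common { int type; };

struct toy_free_cluster {
        struct toy_common f;    /* must be the first member */
        unsigned n, next;
};

struct toy_conn {
        struct toy_common f;    /* must be the first member */
        int sock;
};

union toy_flow {
        struct toy_common f;
        struct toy_free_cluster free;
        struct toy_conn conn;
};

int main(void)
{
        union toy_flow slot = { .free = { .f.type = 0, .n = 1, .next = 0 } };

        /* The common header sits at offset 0 in every variant, so
         * slot.f.type identifies the slot regardless of which variant
         * is currently stored in it */
        static_assert(offsetof(struct toy_free_cluster, f) == 0, "f first");
        static_assert(offsetof(struct toy_conn, f) == 0, "f first");
        assert(slot.f.type == 0);
        return 0;
}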