af_packet: multithreading support
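This patch adds multithreading support for af_packet interfaces.
When worker threads are enabled, the af_packet input node is switched from
interrupt to polling mode, interfaces are distributed across the worker
threads by interface index, and transmit on each interface is serialized
with a per-interface lock.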
Change-Id: Ief5d1117e7ffeaa59dbc2831e583d5d8e8d4fa7a
Signed-off-by: Mohsin KAZMI <sykazmi@cisco.com>
diff --git a/src/vnet/devices/af_packet/af_packet.c b/src/vnet/devices/af_packet/af_packet.c
index 91c3988..e491ba4 100644
--- a/src/vnet/devices/af_packet/af_packet.c
+++ b/src/vnet/devices/af_packet/af_packet.c
@@ -171,6 +171,31 @@
return ret;
}
+/* Put af_packet_input_node into POLLING state on every vlib main. */
+static void
+af_packet_worker_thread_enable (void)
+{
+  /* Worker threads are enabled: switch the input node to polling mode */
+  foreach_vlib_main ((
+    {
+      vlib_node_set_state (this_vlib_main,
+                           af_packet_input_node.index,
+                           VLIB_NODE_STATE_POLLING);
+    }));
+}
+
+/* Put af_packet_input_node back into INTERRUPT state on every vlib main. */
+static void
+af_packet_worker_thread_disable (void)
+{
+  /* Reverses af_packet_worker_thread_enable; used once no interface is left */
+  foreach_vlib_main ((
+    {
+      vlib_node_set_state (this_vlib_main,
+                           af_packet_input_node.index,
+                           VLIB_NODE_STATE_INTERRUPT);
+    }));
+}
+
int
af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
u32 * sw_if_index)
@@ -184,6 +209,7 @@
u8 hw_addr[6];
clib_error_t *error;
vnet_sw_interface_t *sw;
+ vlib_thread_main_t *tm = vlib_get_thread_main ();
vnet_main_t *vnm = vnet_get_main ();
uword *p;
uword if_index;
@@ -226,6 +252,13 @@
apif->next_tx_frame = 0;
apif->next_rx_frame = 0;
+ if (tm->n_vlib_mains > 1)
+ {
+ apif->lockp = clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES,
+ CLIB_CACHE_LINE_BYTES);
+ memset ((void *) apif->lockp, 0, CLIB_CACHE_LINE_BYTES);
+ }
+
{
unix_file_t template = { 0 };
template.read_function = af_packet_fd_read_ready;
@@ -273,6 +306,10 @@
0);
if (sw_if_index)
*sw_if_index = apif->sw_if_index;
+
+ if (tm->n_vlib_mains > 1 && pool_elts (apm->interfaces) == 1)
+ af_packet_worker_thread_enable ();
+
return 0;
error:
@@ -286,6 +323,7 @@
af_packet_delete_if (vlib_main_t * vm, u8 * host_if_name)
{
vnet_main_t *vnm = vnet_get_main ();
+ vlib_thread_main_t *tm = vlib_get_thread_main ();
af_packet_main_t *apm = &af_packet_main;
af_packet_if_t *apif;
uword *p;
@@ -335,6 +373,8 @@
ethernet_delete_interface (vnm, apif->hw_if_index);
pool_put (apm->interfaces, apif);
+ if (tm->n_vlib_mains > 1 && pool_elts (apm->interfaces) == 0)
+ af_packet_worker_thread_disable ();
return 0;
}
@@ -344,9 +384,24 @@
{
af_packet_main_t *apm = &af_packet_main;
vlib_thread_main_t *tm = vlib_get_thread_main ();
+ vlib_thread_registration_t *tr;
+ uword *p;
memset (apm, 0, sizeof (af_packet_main_t));
+ apm->input_cpu_first_index = 0;
+ apm->input_cpu_count = 1;
+
+ /* find out which cpus will be used for input */
+ p = hash_get_mem (tm->thread_registrations_by_name, "workers");
+ tr = p ? (vlib_thread_registration_t *) p[0] : 0;
+
+ if (tr && tr->count > 0)
+ {
+ apm->input_cpu_first_index = tr->first_index;
+ apm->input_cpu_count = tr->count;
+ }
+
mhash_init_vec_string (&apm->if_index_by_host_if_name, sizeof (uword));
vec_validate_aligned (apm->rx_buffers, tm->n_vlib_mains - 1,
diff --git a/src/vnet/devices/af_packet/af_packet.h b/src/vnet/devices/af_packet/af_packet.h
index 19e2523..e00e5cb 100644
--- a/src/vnet/devices/af_packet/af_packet.h
+++ b/src/vnet/devices/af_packet/af_packet.h
@@ -20,6 +20,7 @@
typedef struct
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+ volatile u32 *lockp;
u8 *host_if_name;
int fd;
struct tpacket_req *rx_req;
@@ -50,6 +51,12 @@
/* hash of host interface names */
mhash_t if_index_by_host_if_name;
+
+ /* index of the first thread that runs af_packet input (first "workers" thread, 0 if there are none) */
+ u32 input_cpu_first_index;
+
+ /* number of threads that run af_packet input ("workers" thread count, 1 if there are none) */
+ u32 input_cpu_count;
} af_packet_main_t;
af_packet_main_t af_packet_main;
diff --git a/src/vnet/devices/af_packet/device.c b/src/vnet/devices/af_packet/device.c
index 1fb4000..e3bf9bb 100644
--- a/src/vnet/devices/af_packet/device.c
+++ b/src/vnet/devices/af_packet/device.c
@@ -92,6 +92,12 @@
struct tpacket2_hdr *tph;
u32 frame_not_ready = 0;
+ if (PREDICT_FALSE (apif->lockp != 0))
+ {
+ while (__sync_lock_test_and_set (apif->lockp, 1))
+ ;
+ }
+
while (n_left > 0)
{
u32 len;
@@ -152,6 +158,9 @@
}
}
+ if (PREDICT_FALSE (apif->lockp != 0))
+ *apif->lockp = 0;
+
if (PREDICT_FALSE (frame_not_ready))
vlib_error_count (vm, node->node_index,
AF_PACKET_TX_ERROR_FRAME_NOT_READY, frame_not_ready);
diff --git a/src/vnet/devices/af_packet/node.c b/src/vnet/devices/af_packet/node.c
index 7200432..476ccca 100644
--- a/src/vnet/devices/af_packet/node.c
+++ b/src/vnet/devices/af_packet/node.c
@@ -108,10 +108,9 @@
always_inline uword
af_packet_device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
- vlib_frame_t * frame, u32 device_idx)
+ vlib_frame_t * frame, af_packet_if_t * apif)
{
af_packet_main_t *apm = &af_packet_main;
- af_packet_if_t *apif = pool_elt_at_index (apm->interfaces, device_idx);
struct tpacket2_hdr *tph;
u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
u32 block = 0;
@@ -125,10 +124,10 @@
u32 frame_num = apif->rx_req->tp_frame_nr;
u8 *block_start = apif->rx_ring + block * block_size;
uword n_trace = vlib_get_trace_count (vm, node);
+ u32 cpu_index = os_get_cpu_number ();
u32 n_buffer_bytes = vlib_buffer_free_list_buffer_size (vm,
VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
u32 min_bufs = apif->rx_req->tp_frame_size / n_buffer_bytes;
- int cpu_index = node->cpu_index;
if (apif->per_interface_next_index != ~0)
next_index = apif->per_interface_next_index;
@@ -249,16 +248,18 @@
{
int i;
u32 n_rx_packets = 0;
-
+ u32 cpu_index = os_get_cpu_number ();
af_packet_main_t *apm = &af_packet_main;
+ af_packet_if_t *apif;
- /* *INDENT-OFF* */
- clib_bitmap_foreach (i, apm->pending_input_bitmap,
- ({
- clib_bitmap_set (apm->pending_input_bitmap, i, 0);
- n_rx_packets += af_packet_device_input_fn(vm, node, frame, i);
- }));
- /* *INDENT-ON* */
+ for (i = 0; i < vec_len (apm->interfaces); i++)
+ {
+ apif = vec_elt_at_index (apm->interfaces, i);
+ if (apif->is_admin_up &&
+ (i % apm->input_cpu_count) ==
+ (cpu_index - apm->input_cpu_first_index))
+ n_rx_packets += af_packet_device_input_fn (vm, node, frame, apif);
+ }
return n_rx_packets;
}
@@ -270,6 +271,9 @@
.sibling_of = "device-input",
.format_trace = format_af_packet_input_trace,
.type = VLIB_NODE_TYPE_INPUT,
+ /**
+ * default state is INTERRUPT mode, switch to POLLING if worker threads are enabled
+ */
.state = VLIB_NODE_STATE_INTERRUPT,
.n_errors = AF_PACKET_INPUT_N_ERROR,
.error_strings = af_packet_input_error_strings,