QUIC: Add multi-stream support to internal test apps
Change-Id: Iab07697ef482529e62c11433cffa1f8f894e5bb7
Signed-off-by: Aloys Augustin <aloaugus@cisco.com>
Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
diff --git a/src/vnet/session-apps/echo_client.c b/src/vnet/session-apps/echo_client.c
index 39f464d..8366c56 100644
--- a/src/vnet/session-apps/echo_client.c
+++ b/src/vnet/session-apps/echo_client.c
@@ -23,6 +23,9 @@
echo_client_main_t echo_client_main;
#define ECHO_CLIENT_DBG (0)
+#define DBG(_fmt, _args...) \
+ if (ECHO_CLIENT_DBG) \
+ clib_warning (_fmt, ##_args)
static void
signal_evt_to_cli_i (int *code)
@@ -351,12 +354,117 @@
vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains);
vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains);
+ vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains);
vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains);
return 0;
}
static int
+quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
+ session_t * s, u8 is_fail)
+{
+ echo_client_main_t *ecm = &echo_client_main;
+ vnet_connect_args_t a;
+ int rv;
+ u8 thread_index = vlib_get_thread_index ();
+ session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
+
+ DBG ("QUIC Connection handle %d", session_handle (s));
+
+ a.uri = (char *) ecm->connect_uri;
+ parse_uri (a.uri, &sep);
+ sep.transport_opts = session_handle (s);
+ sep.port = 0;
+ clib_memset (&a, 0, sizeof (a));
+ a.app_index = ecm->app_index;
+ a.api_context = -1 - api_context;
+ clib_memcpy (&a.sep_ext, &sep, sizeof (sep));
+
+ if ((rv = vnet_connect (&a)))
+ {
+ clib_error ("Session opening failed: %d", rv);
+ return -1;
+ }
+ vec_add1 (ecm->quic_session_index_by_thread[thread_index],
+ session_handle (s));
+ return 0;
+}
+
+static int
+quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
+ session_t * s, u8 is_fail)
+{
+ echo_client_main_t *ecm = &echo_client_main;
+ eclient_session_t *session;
+ u32 session_index;
+ u8 thread_index;
+
+ if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
+ return -1;
+
+ if (is_fail)
+ {
+ clib_warning ("connection %d failed!", api_context);
+ ecm->run_test = ECHO_CLIENTS_EXITING;
+ signal_evt_to_cli (-1);
+ return 0;
+ }
+
+ if (!(s->flags & SESSION_F_QUIC_STREAM))
+ return quic_echo_clients_qsession_connected_callback (app_index,
+ api_context, s,
+ is_fail);
+ DBG ("STREAM Connection callback %d", api_context);
+
+ thread_index = s->thread_index;
+ ASSERT (thread_index == vlib_get_thread_index ()
+ || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
+
+ if (!ecm->vpp_event_queue[thread_index])
+ ecm->vpp_event_queue[thread_index] =
+ session_main_get_vpp_event_queue (thread_index);
+
+ /*
+ * Setup session
+ */
+ clib_spinlock_lock_if_init (&ecm->sessions_lock);
+ pool_get (ecm->sessions, session);
+ clib_spinlock_unlock_if_init (&ecm->sessions_lock);
+
+ clib_memset (session, 0, sizeof (*session));
+ session_index = session - ecm->sessions;
+ session->bytes_to_send = ecm->bytes_to_send;
+ session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
+ session->data.rx_fifo = s->rx_fifo;
+ session->data.rx_fifo->client_session_index = session_index;
+ session->data.tx_fifo = s->tx_fifo;
+ session->data.tx_fifo->client_session_index = session_index;
+ session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
+ session->vpp_session_handle = session_handle (s);
+
+ if (ecm->is_dgram)
+ {
+ transport_connection_t *tc;
+ tc = session_get_transport (s);
+ clib_memcpy_fast (&session->data.transport, tc,
+ sizeof (session->data.transport));
+ session->data.is_dgram = 1;
+ }
+
+ vec_add1 (ecm->connection_index_by_thread[thread_index], session_index);
+ clib_atomic_fetch_add (&ecm->ready_connections, 1);
+ if (ecm->ready_connections == ecm->expected_connections)
+ {
+ ecm->run_test = ECHO_CLIENTS_RUNNING;
+ /* Signal the CLI process that the action is starting... */
+ signal_evt_to_cli (1);
+ }
+
+ return 0;
+}
+
+static int
echo_clients_session_connected_callback (u32 app_index, u32 api_context,
session_t * s, u8 is_fail)
{
@@ -519,6 +627,9 @@
clib_memset (options, 0, sizeof (options));
a->api_client_index = ecm->my_client_index;
+ if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
+ echo_clients.session_connected_callback =
+ quic_echo_clients_session_connected_callback;
a->session_cb_vft = &echo_clients;
prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
@@ -597,12 +708,12 @@
int i, rv;
clib_memset (a, 0, sizeof (*a));
+
for (i = 0; i < n_clients; i++)
{
a->uri = (char *) ecm->connect_uri;
a->api_context = i;
a->app_index = ecm->app_index;
-
if ((rv = vnet_connect_uri (a)))
return clib_error_return (0, "connect returned: %d", rv);
@@ -637,6 +748,8 @@
clib_error_t *error = 0;
u8 *appns_id = 0;
int i;
+ session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
+ int rv;
ecm->bytes_to_send = 8192;
ecm->no_return = 0;
@@ -738,8 +851,10 @@
ecm->connect_uri = format (0, "%s%c", default_uri, 0);
}
- if (ecm->connect_uri[0] == 'u' && ecm->connect_uri[3] != 'c')
- ecm->is_dgram = 1;
+ if ((rv = parse_uri ((char *) ecm->connect_uri, &sep)))
+ return clib_error_return (0, "Uri parse error: %d", rv);
+ ecm->transport_proto = sep.transport_proto;
+ ecm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP);
#if ECHO_CLIENT_PTHREAD
echo_clients_start_tx_pthread ();
@@ -858,6 +973,7 @@
{
vec_reset_length (ecm->connection_index_by_thread[i]);
vec_reset_length (ecm->connections_this_batch_by_thread[i]);
+ vec_reset_length (ecm->quic_session_index_by_thread[i]);
}
pool_free (ecm->sessions);
diff --git a/src/vnet/session-apps/echo_client.h b/src/vnet/session-apps/echo_client.h
index b183ed7..81ffcae 100644
--- a/src/vnet/session-apps/echo_client.h
+++ b/src/vnet/session-apps/echo_client.h
@@ -74,6 +74,7 @@
clib_spinlock_t sessions_lock;
u8 **rx_buf; /**< intermediate rx buffers */
u8 *connect_test_data; /**< Pre-computed test data */
+ u32 **quic_session_index_by_thread;
u32 **connection_index_by_thread;
u32 **connections_this_batch_by_thread; /**< active connection batch */
pthread_t client_thread_handle;
@@ -101,6 +102,7 @@
u8 no_output;
u8 test_bytes;
u8 test_failed;
+ u8 transport_proto;
vlib_main_t *vlib_main;
} echo_client_main_t;
diff --git a/src/vnet/session-apps/echo_server.c b/src/vnet/session-apps/echo_server.c
index 4249ed8..7459d03 100644
--- a/src/vnet/session-apps/echo_server.c
+++ b/src/vnet/session-apps/echo_server.c
@@ -19,6 +19,11 @@
#include <vnet/session/application_interface.h>
#include <vnet/session/session.h>
+#define ECHO_SERVER_DBG (0)
+#define DBG(_fmt, _args...) \
+ if (ECHO_SERVER_DBG) \
+ clib_warning (_fmt, ##_args)
+
typedef struct
{
/*
@@ -49,6 +54,7 @@
u8 **rx_buf; /**< Per-thread RX buffer */
u64 byte_index;
u32 **rx_retries;
+ u8 transport_proto;
vlib_main_t *vlib_main;
} echo_server_main_t;
@@ -56,10 +62,34 @@
echo_server_main_t echo_server_main;
int
+quic_echo_server_qsession_accept_callback (session_t * s)
+{
+ DBG ("QSession %u accept w/opaque %d", s->session_index, s->opaque);
+ return 0;
+}
+
+int
+quic_echo_server_session_accept_callback (session_t * s)
+{
+ echo_server_main_t *esm = &echo_server_main;
+ if (!(s->flags & SESSION_F_QUIC_STREAM))
+ return quic_echo_server_qsession_accept_callback (s);
+ DBG ("SSESSION %u accept w/opaque %d", s->session_index, s->opaque);
+
+ esm->vpp_queue[s->thread_index] =
+ session_main_get_vpp_event_queue (s->thread_index);
+ s->session_state = SESSION_STATE_READY;
+ esm->byte_index = 0;
+ ASSERT (vec_len (esm->rx_retries) > s->thread_index);
+ vec_validate (esm->rx_retries[s->thread_index], s->session_index);
+ esm->rx_retries[s->thread_index][s->session_index] = 0;
+ return 0;
+}
+
+int
echo_server_session_accept_callback (session_t * s)
{
echo_server_main_t *esm = &echo_server_main;
-
esm->vpp_queue[s->thread_index] =
session_main_get_vpp_event_queue (s->thread_index);
s->session_state = SESSION_STATE_READY;
@@ -304,6 +334,9 @@
else
echo_server_session_cb_vft.builtin_app_rx_callback =
echo_server_rx_callback;
+ if (esm->transport_proto == TRANSPORT_PROTO_QUIC)
+ echo_server_session_cb_vft.session_accept_callback =
+ quic_echo_server_session_accept_callback;
if (esm->private_segment_size)
segment_size = esm->private_segment_size;
@@ -426,6 +459,7 @@
u64 tmp, appns_flags = 0, appns_secret = 0;
char *default_uri = "tcp://0.0.0.0/1234";
int rv, is_stop = 0;
+ session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
esm->no_echo = 0;
esm->fifo_size = 64 << 10;
@@ -434,7 +468,6 @@
esm->private_segment_count = 0;
esm->private_segment_size = 0;
esm->tls_engine = TLS_ENGINE_OPENSSL;
- esm->is_dgram = 0;
vec_free (esm->server_uri);
while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -503,8 +536,11 @@
clib_warning ("No uri provided! Using default: %s", default_uri);
esm->server_uri = (char *) format (0, "%s%c", default_uri, 0);
}
- if (esm->server_uri[0] == 'u' && esm->server_uri[3] != 'c')
- esm->is_dgram = 1;
+
+ if ((rv = parse_uri ((char *) esm->server_uri, &sep)))
+ return clib_error_return (0, "Uri parse error: %d", rv);
+ esm->transport_proto = sep.transport_proto;
+ esm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP);
rv = echo_server_create (vm, appns_id, appns_flags, appns_secret);
vec_free (appns_id);
diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c
index ae00292..2bd3ceb 100644
--- a/src/vnet/session/application_interface.c
+++ b/src/vnet/session/application_interface.c
@@ -78,6 +78,14 @@
sep->is_ip4 = 0;
return 1;
}
+ else if (unformat (input, "%U://session/%u", unformat_transport_proto,
+ &transport_proto, &sep->transport_opts))
+ {
+ sep->transport_proto = transport_proto;
+ sep->is_ip4 = 1;
+ sep->ip.ip4.as_u32 = 1; /* ip need to be non zero in vnet */
+ return 1;
+ }
return 0;
}
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index b49744c..f6091c5 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -211,6 +211,7 @@
#undef _
} session_fd_flag_t;
+int parse_uri (char *uri, session_endpoint_cfg_t * sep);
int vnet_bind_uri (vnet_listen_args_t *);
int vnet_unbind_uri (vnet_unlisten_args_t * a);
int vnet_connect_uri (vnet_connect_args_t * a);
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 32a13cf..b392439 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -133,6 +133,7 @@
{
SESSION_F_RX_EVT = 1,
SESSION_F_PROXY = (1 << 1),
+ SESSION_F_QUIC_STREAM = (1 << 2),
} session_flags_t;
typedef struct session_