blob: 0968768718943a7b0fadbf67c961161d1c38a935 [file] [log] [blame]
/*
 * Copyright (c) 2017 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
Florin Coras6792ec02017-03-13 03:49:51 -070016#include <math.h>
Dave Barach68b0fb02017-02-28 15:15:56 -050017#include <vlib/vlib.h>
18#include <vnet/vnet.h>
Dave Barach68b0fb02017-02-28 15:15:56 -050019#include <vnet/tcp/tcp.h>
Dave Barach68b0fb02017-02-28 15:15:56 -050020#include <vppinfra/elog.h>
Florin Coras6792ec02017-03-13 03:49:51 -070021#include <vnet/session/application.h>
Florin Corase69f4952017-03-07 10:06:24 -080022#include <vnet/session/session_debug.h>
Florin Coras6792ec02017-03-13 03:49:51 -070023#include <vlibmemory/unix_shared_memory_queue.h>
Dave Barach68b0fb02017-02-28 15:15:56 -050024
25vlib_node_registration_t session_queue_node;
26
27typedef struct
28{
29 u32 session_index;
30 u32 server_thread_index;
31} session_queue_trace_t;
32
33/* packet trace format function */
34static u8 *
35format_session_queue_trace (u8 * s, va_list * args)
36{
37 CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
38 CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
39 session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
40
41 s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
42 t->session_index, t->server_thread_index);
43 return s;
44}
45
/*
 * Error/event counters of the session queue node, as an X-macro list.
 * Each entry is _(symbol, description): the symbol becomes the enum value
 * SESSION_QUEUE_ERROR_<symbol> and the description becomes the matching
 * entry of session_queue_error_strings[].
 */
#define foreach_session_queue_error		\
_(TX, "Packets transmitted")			\
_(TIMER, "Timer events")			\
_(NO_BUFFER, "Out of buffers")
Dave Barach68b0fb02017-02-28 15:15:56 -050052
53typedef enum
54{
55#define _(sym,str) SESSION_QUEUE_ERROR_##sym,
56 foreach_session_queue_error
57#undef _
58 SESSION_QUEUE_N_ERROR,
59} session_queue_error_t;
60
61static char *session_queue_error_strings[] = {
62#define _(sym,string) string,
63 foreach_session_queue_error
64#undef _
65};
66
67static u32 session_type_to_next[] = {
68 SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT,
69 SESSION_QUEUE_NEXT_IP4_LOOKUP,
70 SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT,
71 SESSION_QUEUE_NEXT_IP6_LOOKUP,
72};
73
Florin Corasf6d68ed2017-05-07 19:12:02 -070074always_inline void
75session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
76 u8 thread_index, svm_fifo_t * fifo,
77 vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
Florin Corasb2215d62017-08-01 16:56:58 -070078 u32 left_from_seg, u32 * left_to_snd0,
Florin Coras1f152cd2017-08-18 19:28:03 -070079 u16 * n_bufs, u32 * tx_offset, u16 deq_per_buf,
Florin Corasb2215d62017-08-01 16:56:58 -070080 u8 peek_data)
Florin Corasf6d68ed2017-05-07 19:12:02 -070081{
82 vlib_buffer_t *chain_b0, *prev_b0;
Florin Corasb2215d62017-08-01 16:56:58 -070083 u32 chain_bi0, to_deq;
Florin Corasf6d68ed2017-05-07 19:12:02 -070084 u16 len_to_deq0, n_bytes_read;
85 u8 *data0, j;
86
Florin Corasb2215d62017-08-01 16:56:58 -070087 b0->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
88 b0->total_length_not_including_first_buffer = 0;
89
Florin Corasf6d68ed2017-05-07 19:12:02 -070090 chain_bi0 = bi0;
91 chain_b0 = b0;
Florin Corasb2215d62017-08-01 16:56:58 -070092 to_deq = left_from_seg;
Florin Corasf6d68ed2017-05-07 19:12:02 -070093 for (j = 1; j < n_bufs_per_seg; j++)
94 {
95 prev_b0 = chain_b0;
Florin Corasb2215d62017-08-01 16:56:58 -070096 len_to_deq0 = clib_min (to_deq, deq_per_buf);
Florin Corasf6d68ed2017-05-07 19:12:02 -070097
98 *n_bufs -= 1;
99 chain_bi0 = smm->tx_buffers[thread_index][*n_bufs];
100 _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
101
102 chain_b0 = vlib_get_buffer (vm, chain_bi0);
103 chain_b0->current_data = 0;
104 data0 = vlib_buffer_get_current (chain_b0);
105 if (peek_data)
106 {
Florin Coras1f152cd2017-08-18 19:28:03 -0700107 n_bytes_read = svm_fifo_peek (fifo, *tx_offset, len_to_deq0, data0);
108 *tx_offset += n_bytes_read;
Florin Corasf6d68ed2017-05-07 19:12:02 -0700109 }
110 else
111 {
112 n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
113 }
114 ASSERT (n_bytes_read == len_to_deq0);
115 chain_b0->current_length = n_bytes_read;
116 b0->total_length_not_including_first_buffer += chain_b0->current_length;
117
118 /* update previous buffer */
119 prev_b0->next_buffer = chain_bi0;
120 prev_b0->flags |= VLIB_BUFFER_NEXT_PRESENT;
121
122 /* update current buffer */
123 chain_b0->next_buffer = 0;
124
Florin Corasb2215d62017-08-01 16:56:58 -0700125 to_deq -= n_bytes_read;
126 if (to_deq == 0)
Florin Corasf6d68ed2017-05-07 19:12:02 -0700127 break;
128 }
Florin Coras1f152cd2017-08-18 19:28:03 -0700129 ASSERT (to_deq == 0
130 && b0->total_length_not_including_first_buffer == left_from_seg);
Florin Corasb2215d62017-08-01 16:56:58 -0700131 *left_to_snd0 -= left_from_seg;
Florin Corasf6d68ed2017-05-07 19:12:02 -0700132}
133
Dave Barach68b0fb02017-02-28 15:15:56 -0500134always_inline int
Florin Corasd79b41e2017-03-04 05:37:52 -0800135session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
136 session_manager_main_t * smm,
137 session_fifo_event_t * e0,
138 stream_session_t * s0, u32 thread_index,
139 int *n_tx_packets, u8 peek_data)
Dave Barach68b0fb02017-02-28 15:15:56 -0500140{
141 u32 n_trace = vlib_get_trace_count (vm, node);
Florin Corasf6d68ed2017-05-07 19:12:02 -0700142 u32 left_to_snd0, max_len_to_snd0, len_to_deq0, snd_space0;
Florin Corase87216f2017-08-17 16:59:22 -0700143 u32 n_bufs_per_evt, n_frames_per_evt, n_bufs_per_frame;
Dave Barach68b0fb02017-02-28 15:15:56 -0500144 transport_connection_t *tc0;
145 transport_proto_vft_t *transport_vft;
146 u32 next_index, next0, *to_next, n_left_to_next, bi0;
147 vlib_buffer_t *b0;
Florin Coras1f152cd2017-08-18 19:28:03 -0700148 u32 tx_offset = 0, max_dequeue0, n_bytes_per_seg, left_for_seg;
Florin Corasf6d68ed2017-05-07 19:12:02 -0700149 u16 snd_mss0, n_bufs_per_seg, n_bufs;
Dave Barach68b0fb02017-02-28 15:15:56 -0500150 u8 *data0;
Florin Coras6792ec02017-03-13 03:49:51 -0700151 int i, n_bytes_read;
Florin Corase87216f2017-08-17 16:59:22 -0700152 u32 n_bytes_per_buf, deq_per_buf, deq_per_first_buf;
Florin Coras93992a92017-05-24 18:03:56 -0700153 u32 buffers_allocated, buffers_allocated_this_call;
Dave Barach68b0fb02017-02-28 15:15:56 -0500154
155 next_index = next0 = session_type_to_next[s0->session_type];
156
157 transport_vft = session_get_transport_vft (s0->session_type);
158 tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
159
160 /* Make sure we have space to send and there's something to dequeue */
Dave Barach68b0fb02017-02-28 15:15:56 -0500161 snd_mss0 = transport_vft->send_mss (tc0);
Florin Corasc8343412017-05-04 14:25:50 -0700162 snd_space0 = transport_vft->send_space (tc0);
Dave Barach68b0fb02017-02-28 15:15:56 -0500163
Florin Corase04c2992017-03-01 08:17:34 -0800164 /* Can't make any progress */
Florin Coras6792ec02017-03-13 03:49:51 -0700165 if (snd_space0 == 0 || snd_mss0 == 0)
Florin Corase04c2992017-03-01 08:17:34 -0800166 {
Dave Barachacd2a6a2017-05-16 17:41:34 -0400167 vec_add1 (smm->pending_event_vector[thread_index], *e0);
Florin Corase04c2992017-03-01 08:17:34 -0800168 return 0;
169 }
Dave Barach68b0fb02017-02-28 15:15:56 -0500170
Dave Barach68b0fb02017-02-28 15:15:56 -0500171 if (peek_data)
172 {
173 /* Offset in rx fifo from where to peek data */
Florin Coras1f152cd2017-08-18 19:28:03 -0700174 tx_offset = transport_vft->tx_fifo_offset (tc0);
Dave Barach68b0fb02017-02-28 15:15:56 -0500175 }
176
Florin Coras6792ec02017-03-13 03:49:51 -0700177 /* Check how much we can pull. If buffering, subtract the offset */
Florin Coras1f152cd2017-08-18 19:28:03 -0700178 max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - tx_offset;
Florin Coras6792ec02017-03-13 03:49:51 -0700179
Florin Coras6792ec02017-03-13 03:49:51 -0700180 /* Nothing to read return */
181 if (max_dequeue0 == 0)
Florin Corasf03a59a2017-06-09 21:07:32 -0700182 {
183 svm_fifo_unset_event (s0->server_tx_fifo);
184 return 0;
185 }
Florin Coras6792ec02017-03-13 03:49:51 -0700186
187 /* Ensure we're not writing more than transport window allows */
Florin Coras3e350af2017-03-30 02:54:28 -0700188 if (max_dequeue0 < snd_space0)
189 {
190 /* Constrained by tx queue. Try to send only fully formed segments */
191 max_len_to_snd0 = (max_dequeue0 > snd_mss0) ?
192 max_dequeue0 - max_dequeue0 % snd_mss0 : max_dequeue0;
193 /* TODO Nagle ? */
194 }
195 else
196 {
Florin Coras1f152cd2017-08-18 19:28:03 -0700197 /* Expectation is that snd_space0 is already a multiple of snd_mss */
Florin Coras3e350af2017-03-30 02:54:28 -0700198 max_len_to_snd0 = snd_space0;
199 }
Dave Barach68b0fb02017-02-28 15:15:56 -0500200
Florin Coras93992a92017-05-24 18:03:56 -0700201 n_bytes_per_buf = vlib_buffer_free_list_buffer_size
202 (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
Florin Corase87216f2017-08-17 16:59:22 -0700203 ASSERT (n_bytes_per_buf > MAX_HDRS_LEN);
Florin Corasf6d68ed2017-05-07 19:12:02 -0700204 n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0;
205 n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf);
Florin Corase87216f2017-08-17 16:59:22 -0700206 n_bufs_per_evt = ceil ((double) max_len_to_snd0 / n_bytes_per_seg);
Florin Corasf6d68ed2017-05-07 19:12:02 -0700207 n_frames_per_evt = ceil ((double) n_bufs_per_evt / VLIB_FRAME_SIZE);
Florin Corase87216f2017-08-17 16:59:22 -0700208 n_bufs_per_frame = n_bufs_per_seg * VLIB_FRAME_SIZE;
Florin Corasf6d68ed2017-05-07 19:12:02 -0700209
210 deq_per_buf = clib_min (snd_mss0, n_bytes_per_buf);
Florin Corase87216f2017-08-17 16:59:22 -0700211 deq_per_first_buf = clib_min (snd_mss0, n_bytes_per_buf - MAX_HDRS_LEN);
Dave Barach68b0fb02017-02-28 15:15:56 -0500212
213 n_bufs = vec_len (smm->tx_buffers[thread_index]);
214 left_to_snd0 = max_len_to_snd0;
215 for (i = 0; i < n_frames_per_evt; i++)
216 {
217 /* Make sure we have at least one full frame of buffers ready */
Florin Corase87216f2017-08-17 16:59:22 -0700218 if (PREDICT_FALSE (n_bufs < n_bufs_per_frame))
Dave Barach68b0fb02017-02-28 15:15:56 -0500219 {
220 vec_validate (smm->tx_buffers[thread_index],
Florin Corase87216f2017-08-17 16:59:22 -0700221 n_bufs + n_bufs_per_frame - 1);
Florin Coras93992a92017-05-24 18:03:56 -0700222 buffers_allocated = 0;
223 do
Dave Barach68b0fb02017-02-28 15:15:56 -0500224 {
Florin Corase87216f2017-08-17 16:59:22 -0700225 buffers_allocated_this_call = vlib_buffer_alloc (vm,
226 &smm->tx_buffers
227 [thread_index]
228 [n_bufs +
229 buffers_allocated],
230 n_bufs_per_frame
231 -
232 buffers_allocated);
Florin Coras93992a92017-05-24 18:03:56 -0700233 buffers_allocated += buffers_allocated_this_call;
Dave Barach68b0fb02017-02-28 15:15:56 -0500234 }
Florin Coras93992a92017-05-24 18:03:56 -0700235 while (buffers_allocated_this_call > 0
Florin Corase87216f2017-08-17 16:59:22 -0700236 && ((buffers_allocated + n_bufs < n_bufs_per_frame)));
Florin Coras93992a92017-05-24 18:03:56 -0700237
238 n_bufs += buffers_allocated;
Dave Barach68b0fb02017-02-28 15:15:56 -0500239 _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
Florin Coras93992a92017-05-24 18:03:56 -0700240
Florin Corase87216f2017-08-17 16:59:22 -0700241 if (PREDICT_FALSE (n_bufs < n_bufs_per_frame))
Florin Coras93992a92017-05-24 18:03:56 -0700242 {
243 vec_add1 (smm->pending_event_vector[thread_index], *e0);
244 return -1;
245 }
Florin Corase87216f2017-08-17 16:59:22 -0700246 ASSERT (n_bufs >= n_bufs_per_frame);
Dave Barach68b0fb02017-02-28 15:15:56 -0500247 }
Florin Coras93992a92017-05-24 18:03:56 -0700248 /* Allow enqueuing of a new event */
249 svm_fifo_unset_event (s0->server_tx_fifo);
Dave Barach68b0fb02017-02-28 15:15:56 -0500250
251 vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
Florin Corase87216f2017-08-17 16:59:22 -0700252 while (left_to_snd0 && n_left_to_next)
Dave Barach68b0fb02017-02-28 15:15:56 -0500253 {
Florin Corasf6d68ed2017-05-07 19:12:02 -0700254 /*
255 * Handle first buffer in chain separately
256 */
257
Dave Barach68b0fb02017-02-28 15:15:56 -0500258 /* Get free buffer */
Florin Coras93992a92017-05-24 18:03:56 -0700259 ASSERT (n_bufs >= 1);
Florin Corasf6d68ed2017-05-07 19:12:02 -0700260 bi0 = smm->tx_buffers[thread_index][--n_bufs];
Dave Barach68b0fb02017-02-28 15:15:56 -0500261 _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
262
Florin Corasf6359c82017-06-19 12:26:09 -0400263 /* usual speculation, or the enqueue_x1 macro will barf */
264 to_next[0] = bi0;
265 to_next += 1;
266 n_left_to_next -= 1;
267
Dave Barach68b0fb02017-02-28 15:15:56 -0500268 b0 = vlib_get_buffer (vm, bi0);
269 b0->error = 0;
Florin Coras4eeeaaf2017-09-05 14:03:37 -0400270 b0->flags |= VNET_BUFFER_F_LOCALLY_ORIGINATED;
Dave Barach68b0fb02017-02-28 15:15:56 -0500271 b0->current_data = 0;
Florin Corasf6d68ed2017-05-07 19:12:02 -0700272 b0->total_length_not_including_first_buffer = 0;
Dave Barach68b0fb02017-02-28 15:15:56 -0500273
Florin Corase87216f2017-08-17 16:59:22 -0700274 len_to_deq0 = clib_min (left_to_snd0, deq_per_first_buf);
Florin Corasf6d68ed2017-05-07 19:12:02 -0700275 data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
276 if (peek_data)
277 {
Florin Coras1f152cd2017-08-18 19:28:03 -0700278 n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, tx_offset,
Florin Corasf6d68ed2017-05-07 19:12:02 -0700279 len_to_deq0, data0);
280 /* Keep track of progress locally, transport is also supposed to
281 * increment it independently when pushing the header */
Florin Coras1f152cd2017-08-18 19:28:03 -0700282 tx_offset += n_bytes_read;
Florin Corasf6d68ed2017-05-07 19:12:02 -0700283 }
284 else
285 {
286 n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
287 len_to_deq0, data0);
288 }
289
290 if (n_bytes_read <= 0)
291 goto dequeue_fail;
292
293 b0->current_length = n_bytes_read;
294
295 left_to_snd0 -= n_bytes_read;
296 *n_tx_packets = *n_tx_packets + 1;
297
298 /*
299 * Fill in the remaining buffers in the chain, if any
300 */
Florin Corase87216f2017-08-17 16:59:22 -0700301 if (PREDICT_FALSE (n_bufs_per_seg > 1 && left_to_snd0))
Florin Corasb2215d62017-08-01 16:56:58 -0700302 {
Florin Corasb2215d62017-08-01 16:56:58 -0700303 left_for_seg = clib_min (snd_mss0 - n_bytes_read, left_to_snd0);
304 session_tx_fifo_chain_tail (smm, vm, thread_index,
305 s0->server_tx_fifo, b0, bi0,
306 n_bufs_per_seg, left_for_seg,
Florin Coras1f152cd2017-08-18 19:28:03 -0700307 &left_to_snd0, &n_bufs, &tx_offset,
Florin Corasb2215d62017-08-01 16:56:58 -0700308 deq_per_buf, peek_data);
309 }
Florin Corasf6d68ed2017-05-07 19:12:02 -0700310
311 /* Ask transport to push header after current_length and
312 * total_length_not_including_first_buffer are updated */
313 transport_vft->push_header (tc0, b0);
314
315 /* *INDENT-OFF* */
316 SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
317 ed->data[0] = e0->event_id;
318 ed->data[1] = max_dequeue0;
319 ed->data[2] = len_to_deq0;
320 ed->data[3] = left_to_snd0;
321 }));
322 /* *INDENT-ON* */
323
Dave Barach68b0fb02017-02-28 15:15:56 -0500324 VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
Florin Coras4eeeaaf2017-09-05 14:03:37 -0400325 if (VLIB_BUFFER_TRACE_TRAJECTORY)
326 b0->pre_data[1] = 3;
327
Dave Barach68b0fb02017-02-28 15:15:56 -0500328 if (PREDICT_FALSE (n_trace > 0))
329 {
330 session_queue_trace_t *t0;
331 vlib_trace_buffer (vm, node, next_index, b0,
332 1 /* follow_chain */ );
333 vlib_set_trace_count (vm, node, --n_trace);
334 t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
335 t0->session_index = s0->session_index;
336 t0->server_thread_index = s0->thread_index;
337 }
338
Dave Barach68b0fb02017-02-28 15:15:56 -0500339 vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
340 to_next, n_left_to_next,
341 bi0, next0);
342 }
343 vlib_put_next_frame (vm, node, next_index, n_left_to_next);
344 }
345
Florin Coras6792ec02017-03-13 03:49:51 -0700346 /* If we couldn't dequeue all bytes mark as partially read */
347 if (max_len_to_snd0 < max_dequeue0)
Dave Barach68b0fb02017-02-28 15:15:56 -0500348 {
Florin Coras6792ec02017-03-13 03:49:51 -0700349 /* If we don't already have new event */
350 if (svm_fifo_set_event (s0->server_tx_fifo))
351 {
Dave Barachacd2a6a2017-05-16 17:41:34 -0400352 vec_add1 (smm->pending_event_vector[thread_index], *e0);
Florin Coras6792ec02017-03-13 03:49:51 -0700353 }
Dave Barach68b0fb02017-02-28 15:15:56 -0500354 }
355 return 0;
356
357dequeue_fail:
Florin Coras6792ec02017-03-13 03:49:51 -0700358 /*
359 * Can't read from fifo. If we don't already have an event, save as partially
360 * read, return buff to free list and return
361 */
362 clib_warning ("dequeue fail");
Dave Barach68b0fb02017-02-28 15:15:56 -0500363
Florin Coras6792ec02017-03-13 03:49:51 -0700364 if (svm_fifo_set_event (s0->server_tx_fifo))
365 {
Dave Barachacd2a6a2017-05-16 17:41:34 -0400366 vec_add1 (smm->pending_event_vector[thread_index], *e0);
Florin Coras6792ec02017-03-13 03:49:51 -0700367 }
368 vlib_put_next_frame (vm, node, next_index, n_left_to_next + 1);
Dave Barach68b0fb02017-02-28 15:15:56 -0500369 _vec_len (smm->tx_buffers[thread_index]) += 1;
370
Dave Barach68b0fb02017-02-28 15:15:56 -0500371 return 0;
372}
373
374int
Florin Corasd79b41e2017-03-04 05:37:52 -0800375session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
376 session_manager_main_t * smm,
377 session_fifo_event_t * e0,
378 stream_session_t * s0, u32 thread_index,
379 int *n_tx_pkts)
Dave Barach68b0fb02017-02-28 15:15:56 -0500380{
Florin Corasd79b41e2017-03-04 05:37:52 -0800381 return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
382 n_tx_pkts, 1);
Dave Barach68b0fb02017-02-28 15:15:56 -0500383}
384
385int
Florin Corasd79b41e2017-03-04 05:37:52 -0800386session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
387 session_manager_main_t * smm,
388 session_fifo_event_t * e0,
389 stream_session_t * s0, u32 thread_index,
390 int *n_tx_pkts)
Dave Barach68b0fb02017-02-28 15:15:56 -0500391{
Florin Corasd79b41e2017-03-04 05:37:52 -0800392 return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
393 n_tx_pkts, 0);
Dave Barach68b0fb02017-02-28 15:15:56 -0500394}
395
Dave Barach2c25a622017-06-26 11:35:07 -0400396always_inline stream_session_t *
397session_event_get_session (session_fifo_event_t * e, u8 thread_index)
Florin Corasa5464812017-04-19 13:00:05 -0700398{
Dave Barach2c25a622017-06-26 11:35:07 -0400399 return stream_session_get_if_valid (e->fifo->master_session_index,
400 thread_index);
Florin Corasa5464812017-04-19 13:00:05 -0700401}
402
Dave Barachacd2a6a2017-05-16 17:41:34 -0400403void
404dump_thread_0_event_queue (void)
405{
406 session_manager_main_t *smm = vnet_get_session_manager_main ();
407 vlib_main_t *vm = &vlib_global_main;
408 u32 my_thread_index = vm->thread_index;
409 session_fifo_event_t _e, *e = &_e;
410 stream_session_t *s0;
411 int i, index;
412 i8 *headp;
413
414 unix_shared_memory_queue_t *q;
415 q = smm->vpp_event_queues[my_thread_index];
416
417 index = q->head;
418
419 for (i = 0; i < q->cursize; i++)
420 {
421 headp = (i8 *) (&q->data[0] + q->elsize * index);
422 clib_memcpy (e, headp, q->elsize);
423
424 switch (e->event_type)
425 {
426 case FIFO_EVENT_APP_TX:
427 s0 = session_event_get_session (e, my_thread_index);
428 fformat (stdout, "[%04d] TX session %d\n", i, s0->session_index);
429 break;
430
431 case FIFO_EVENT_DISCONNECT:
432 s0 = stream_session_get_from_handle (e->session_handle);
433 fformat (stdout, "[%04d] disconnect session %d\n", i,
434 s0->session_index);
435 break;
436
437 case FIFO_EVENT_BUILTIN_RX:
438 s0 = session_event_get_session (e, my_thread_index);
439 fformat (stdout, "[%04d] builtin_rx %d\n", i, s0->session_index);
440 break;
441
442 case FIFO_EVENT_RPC:
443 fformat (stdout, "[%04d] RPC call %llx with %llx\n",
444 i, (u64) (e->rpc_args.fp), (u64) (e->rpc_args.arg));
445 break;
446
447 default:
448 fformat (stdout, "[%04d] unhandled event type %d\n",
449 i, e->event_type);
450 break;
451 }
452
453 index++;
454
455 if (index == q->maxsize)
456 index = 0;
457 }
458}
459
Florin Coras6534b7a2017-07-18 05:38:03 -0400460static u8
461session_node_cmp_event (session_fifo_event_t * e, svm_fifo_t * f)
462{
463 stream_session_t *s;
464 switch (e->event_type)
465 {
466 case FIFO_EVENT_APP_RX:
467 case FIFO_EVENT_APP_TX:
468 case FIFO_EVENT_BUILTIN_RX:
469 if (e->fifo == f)
470 return 1;
471 break;
472 case FIFO_EVENT_DISCONNECT:
473 break;
474 case FIFO_EVENT_RPC:
475 s = stream_session_get_from_handle (e->session_handle);
476 if (!s)
477 {
478 clib_warning ("session has event but doesn't exist!");
479 break;
480 }
481 if (s->server_rx_fifo == f || s->server_tx_fifo == f)
482 return 1;
483 break;
484 default:
485 break;
486 }
487 return 0;
488}
489
490u8
491session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
492{
493 session_manager_main_t *smm = vnet_get_session_manager_main ();
494 unix_shared_memory_queue_t *q;
495 session_fifo_event_t *pending_event_vector, *evt;
496 int i, index, found = 0;
497 i8 *headp;
498 u8 thread_index;
499
500 ASSERT (e);
501 thread_index = f->master_thread_index;
502 /*
503 * Search evt queue
504 */
505 q = smm->vpp_event_queues[thread_index];
506 index = q->head;
507 for (i = 0; i < q->cursize; i++)
508 {
509 headp = (i8 *) (&q->data[0] + q->elsize * index);
510 clib_memcpy (e, headp, q->elsize);
511 found = session_node_cmp_event (e, f);
512 if (found)
513 break;
514 if (++index == q->maxsize)
515 index = 0;
516 }
517 /*
518 * Search pending events vector
519 */
520 pending_event_vector = smm->pending_event_vector[thread_index];
521 vec_foreach (evt, pending_event_vector)
522 {
523 found = session_node_cmp_event (evt, f);
524 if (found)
525 {
526 clib_memcpy (e, evt, sizeof (*evt));
527 break;
528 }
529 }
530 return found;
531}
532
Dave Barach68b0fb02017-02-28 15:15:56 -0500533static uword
534session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
535 vlib_frame_t * frame)
536{
537 session_manager_main_t *smm = vnet_get_session_manager_main ();
Dave Barachacd2a6a2017-05-16 17:41:34 -0400538 session_fifo_event_t *my_pending_event_vector, *e;
539 session_fifo_event_t *my_fifo_events;
Florin Corase04c2992017-03-01 08:17:34 -0800540 u32 n_to_dequeue, n_events;
Dave Barach68b0fb02017-02-28 15:15:56 -0500541 unix_shared_memory_queue_t *q;
Florin Coras6792ec02017-03-13 03:49:51 -0700542 application_t *app;
Dave Barach68b0fb02017-02-28 15:15:56 -0500543 int n_tx_packets = 0;
Damjan Marion586afd72017-04-05 19:18:20 +0200544 u32 my_thread_index = vm->thread_index;
Dave Barach68b0fb02017-02-28 15:15:56 -0500545 int i, rv;
Florin Coras3e350af2017-03-30 02:54:28 -0700546 f64 now = vlib_time_now (vm);
Dave Barache5f1d272017-05-10 13:34:04 -0400547 void (*fp) (void *);
Florin Coras3e350af2017-03-30 02:54:28 -0700548
549 SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, my_thread_index);
Dave Barach68b0fb02017-02-28 15:15:56 -0500550
551 /*
552 * Update TCP time
553 */
Florin Coras3e350af2017-03-30 02:54:28 -0700554 tcp_update_time (now, my_thread_index);
Dave Barach68b0fb02017-02-28 15:15:56 -0500555
556 /*
557 * Get vpp queue events
558 */
559 q = smm->vpp_event_queues[my_thread_index];
560 if (PREDICT_FALSE (q == 0))
561 return 0;
562
Dave Barachacd2a6a2017-05-16 17:41:34 -0400563 my_fifo_events = smm->free_event_vector[my_thread_index];
564
Dave Barach68b0fb02017-02-28 15:15:56 -0500565 /* min number of events we can dequeue without blocking */
566 n_to_dequeue = q->cursize;
Dave Barachacd2a6a2017-05-16 17:41:34 -0400567 my_pending_event_vector = smm->pending_event_vector[my_thread_index];
Dave Barach68b0fb02017-02-28 15:15:56 -0500568
Dave Barachacd2a6a2017-05-16 17:41:34 -0400569 if (n_to_dequeue == 0 && vec_len (my_pending_event_vector) == 0)
Florin Corase04c2992017-03-01 08:17:34 -0800570 return 0;
571
Florin Coras6792ec02017-03-13 03:49:51 -0700572 SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
573
Florin Corase04c2992017-03-01 08:17:34 -0800574 /*
575 * If we didn't manage to process previous events try going
Dave Barach68b0fb02017-02-28 15:15:56 -0500576 * over them again without dequeuing new ones.
Florin Corase04c2992017-03-01 08:17:34 -0800577 */
578 /* XXX: Block senders to sessions that can't keep up */
Dave Barachacd2a6a2017-05-16 17:41:34 -0400579 if (0 && vec_len (my_pending_event_vector) >= 100)
Florin Coras6792ec02017-03-13 03:49:51 -0700580 {
581 clib_warning ("too many fifo events unsolved");
582 goto skip_dequeue;
583 }
Dave Barach68b0fb02017-02-28 15:15:56 -0500584
585 /* See you in the next life, don't be late */
586 if (pthread_mutex_trylock (&q->mutex))
587 return 0;
588
589 for (i = 0; i < n_to_dequeue; i++)
590 {
591 vec_add2 (my_fifo_events, e, 1);
592 unix_shared_memory_queue_sub_raw (q, (u8 *) e);
593 }
594
595 /* The other side of the connection is not polling */
596 if (q->cursize < (q->maxsize / 8))
597 (void) pthread_cond_broadcast (&q->condvar);
598 pthread_mutex_unlock (&q->mutex);
599
Dave Barachacd2a6a2017-05-16 17:41:34 -0400600 vec_append (my_fifo_events, my_pending_event_vector);
601
602 _vec_len (my_pending_event_vector) = 0;
603 smm->pending_event_vector[my_thread_index] = my_pending_event_vector;
Dave Barach68b0fb02017-02-28 15:15:56 -0500604
605skip_dequeue:
Florin Corase04c2992017-03-01 08:17:34 -0800606 n_events = vec_len (my_fifo_events);
607 for (i = 0; i < n_events; i++)
Dave Barach68b0fb02017-02-28 15:15:56 -0500608 {
Florin Corasa5464812017-04-19 13:00:05 -0700609 stream_session_t *s0; /* $$$ prefetch 1 ahead maybe */
Dave Barach68b0fb02017-02-28 15:15:56 -0500610 session_fifo_event_t *e0;
611
612 e0 = &my_fifo_events[i];
Dave Barach68b0fb02017-02-28 15:15:56 -0500613
614 switch (e0->event_type)
615 {
Florin Corasa5464812017-04-19 13:00:05 -0700616 case FIFO_EVENT_APP_TX:
617 s0 = session_event_get_session (e0, my_thread_index);
618
619 if (CLIB_DEBUG && !s0)
620 {
621 clib_warning ("It's dead, Jim!");
622 continue;
623 }
Florin Corasb2215d62017-08-01 16:56:58 -0700624 /* Can retransmit for closed sessions but can't do anything if
625 * session is not ready or closed */
626 if (PREDICT_FALSE (s0->session_state < SESSION_STATE_READY))
Florin Corasa5464812017-04-19 13:00:05 -0700627 continue;
Dave Barach68b0fb02017-02-28 15:15:56 -0500628 /* Spray packets in per session type frames, since they go to
629 * different nodes */
Florin Corase69f4952017-03-07 10:06:24 -0800630 rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
Dave Barach68b0fb02017-02-28 15:15:56 -0500631 my_thread_index,
632 &n_tx_packets);
Florin Coras6792ec02017-03-13 03:49:51 -0700633 /* Out of buffers */
Florin Coras93992a92017-05-24 18:03:56 -0700634 if (PREDICT_FALSE (rv < 0))
Dave Barachacd2a6a2017-05-16 17:41:34 -0400635 {
Florin Coras93992a92017-05-24 18:03:56 -0700636 vlib_node_increment_counter (vm, node->node_index,
637 SESSION_QUEUE_ERROR_NO_BUFFER, 1);
Dave Barachacd2a6a2017-05-16 17:41:34 -0400638 continue;
639 }
Dave Barach68b0fb02017-02-28 15:15:56 -0500640 break;
Florin Corasa5464812017-04-19 13:00:05 -0700641 case FIFO_EVENT_DISCONNECT:
642 s0 = stream_session_get_from_handle (e0->session_handle);
Florin Coras6792ec02017-03-13 03:49:51 -0700643 stream_session_disconnect (s0);
644 break;
645 case FIFO_EVENT_BUILTIN_RX:
Florin Corasa5464812017-04-19 13:00:05 -0700646 s0 = session_event_get_session (e0, my_thread_index);
Florin Corasc87c91d2017-08-16 19:55:49 -0700647 if (PREDICT_FALSE (!s0))
648 continue;
Florin Coras6792ec02017-03-13 03:49:51 -0700649 svm_fifo_unset_event (s0->server_rx_fifo);
Florin Coras6792ec02017-03-13 03:49:51 -0700650 app = application_get (s0->app_index);
651 app->cb_fns.builtin_server_rx_callback (s0);
652 break;
Dave Barache5f1d272017-05-10 13:34:04 -0400653 case FIFO_EVENT_RPC:
654 fp = e0->rpc_args.fp;
655 (*fp) (e0->rpc_args.arg);
656 break;
657
Dave Barach68b0fb02017-02-28 15:15:56 -0500658 default:
659 clib_warning ("unhandled event type %d", e0->event_type);
660 }
661 }
662
Dave Barachacd2a6a2017-05-16 17:41:34 -0400663 _vec_len (my_fifo_events) = 0;
664 smm->free_event_vector[my_thread_index] = my_fifo_events;
Dave Barach68b0fb02017-02-28 15:15:56 -0500665
666 vlib_node_increment_counter (vm, session_queue_node.index,
667 SESSION_QUEUE_ERROR_TX, n_tx_packets);
668
Florin Coras6792ec02017-03-13 03:49:51 -0700669 SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 1);
670
Dave Barach68b0fb02017-02-28 15:15:56 -0500671 return n_tx_packets;
672}
673
674/* *INDENT-OFF* */
675VLIB_REGISTER_NODE (session_queue_node) =
676{
677 .function = session_queue_node_fn,
678 .name = "session-queue",
679 .format_trace = format_session_queue_trace,
680 .type = VLIB_NODE_TYPE_INPUT,
681 .n_errors = ARRAY_LEN (session_queue_error_strings),
682 .error_strings = session_queue_error_strings,
683 .n_next_nodes = SESSION_QUEUE_N_NEXT,
Florin Corase04c2992017-03-01 08:17:34 -0800684 .state = VLIB_NODE_STATE_DISABLED,
Dave Barach68b0fb02017-02-28 15:15:56 -0500685 .next_nodes =
686 {
687 [SESSION_QUEUE_NEXT_DROP] = "error-drop",
688 [SESSION_QUEUE_NEXT_IP4_LOOKUP] = "ip4-lookup",
689 [SESSION_QUEUE_NEXT_IP6_LOOKUP] = "ip6-lookup",
690 [SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT] = "tcp4-output",
691 [SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT] = "tcp6-output",
692 },
693};
694/* *INDENT-ON* */
695
/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */