/*
 * Queued spinlock
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * (C) Copyright 2013-2015 Hewlett-Packard Development Company, L.P.
 * (C) Copyright 2013-2014 Red Hat, Inc.
 * (C) Copyright 2015 Intel Corp.
 *
 * Authors: Waiman Long <waiman.long@hp.com>
 *          Peter Zijlstra <peterz@infradead.org>
 */

#ifndef _GEN_PV_LOCK_SLOWPATH

#include <linux/smp.h>
#include <linux/bug.h>
#include <linux/cpumask.h>
#include <linux/percpu.h>
#include <linux/hardirq.h>
#include <linux/mutex.h>
#include <asm/byteorder.h>
#include <asm/qspinlock.h>

/*
 * The basic principle of a queue-based spinlock can best be understood
 * by studying a classic queue-based spinlock implementation called the
 * MCS lock. The paper below provides a good description of this kind
 * of lock.
 *
 * http://www.cise.ufl.edu/tr/DOC/REP-1992-71.pdf
 *
 * This queued spinlock implementation is based on the MCS lock; however,
 * to make it fit the 4 bytes we assume spinlock_t to be, and to preserve
 * its existing API, we must modify it.
 *
 * In particular, where the traditional MCS lock consists of a tail pointer
 * (8 bytes) and needs the next pointer (another 8 bytes) of its own node to
 * unlock the next pending waiter (next->locked), we compress both of these,
 * {tail, next->locked}, into a single u32 value.
 *
 * A spinlock disables recursion in its own context, and there is a limit
 * to the contexts that can nest: task, softirq, hardirq and nmi. As there
 * are at most 4 nesting levels, the nesting level can be encoded by a 2-bit
 * number, and the tail is the combination of this 2-bit nesting level with
 * the cpu number. With one byte for the lock value and 3 bytes for the
 * tail, only a 32-bit word is needed. Even though we only need 1 bit for
 * the lock, we extend it to a full byte to achieve better performance on
 * architectures that support atomic byte writes.
 *
 * We also change the first spinner to spin on the lock bit instead of its
 * node, thereby avoiding the need to carry a node from lock to unlock and
 * preserving the existing lock API. This also makes the unlock code simpler
 * and faster.
 *
 * N.B. The current implementation only supports architectures that allow
 * atomic operations on smaller 8-bit and 16-bit data types.
 */
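
/*
 * Illustration (not part of the algorithm itself): if a task on CPU 2 is
 * queueing in the slowpath and a hardirq arrives whose handler contends on
 * a different qspinlock, the task's per-CPU node is used at nesting index 0
 * and the hardirq's node at nesting index 1; both the index and the CPU
 * number are packed into the tail by encode_tail() below.
 */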

#include "mcs_spinlock.h"

#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define MAX_NODES	8
#else
#define MAX_NODES	4
#endif

/*
 * Per-CPU queue node structures; we can never have more than 4 nested
 * contexts: task, softirq, hardirq, nmi.
 *
 * Exactly fits one 64-byte cacheline on a 64-bit architecture.
 *
 * PV doubles the storage and uses the second cacheline for PV state.
 */
static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]);

/*
 * We must be able to distinguish between no-tail and the tail at 0:0,
 * therefore increment the cpu number by one.
 */

static inline u32 encode_tail(int cpu, int idx)
{
	u32 tail;

#ifdef CONFIG_DEBUG_SPINLOCK
	BUG_ON(idx > 3);
#endif
	tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;
	tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */

	return tail;
}

static inline struct mcs_spinlock *decode_tail(u32 tail)
{
	int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
	int idx = (tail & _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;

	return per_cpu_ptr(&mcs_nodes[idx], cpu);
}
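
/*
 * Worked example (illustrative only): on CPU 5 at nesting index 2,
 *
 *	tail = (5 + 1) << _Q_TAIL_CPU_OFFSET | 2 << _Q_TAIL_IDX_OFFSET
 *
 * and decode_tail(tail) maps back to &mcs_nodes[2] of CPU 5.
 */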

#define _Q_LOCKED_PENDING_MASK (_Q_LOCKED_MASK | _Q_PENDING_MASK)

/*
 * By using the whole 2nd least significant byte for the pending bit, we
 * can allow better optimization of the lock acquisition for the pending
 * bit holder.
 *
 * This internal structure is also used by the set_locked function which
 * is not restricted to _Q_PENDING_BITS == 8.
 */
struct __qspinlock {
	union {
		atomic_t val;
#ifdef __LITTLE_ENDIAN
		struct {
			u8	locked;
			u8	pending;
		};
		struct {
			u16	locked_pending;
			u16	tail;
		};
#else
		struct {
			u16	tail;
			u16	locked_pending;
		};
		struct {
			u8	reserved[2];
			u8	pending;
			u8	locked;
		};
#endif
	};
};
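
/*
 * Byte-layout illustration (little-endian, assuming _Q_PENDING_BITS == 8):
 * the least significant byte of ->val is ->locked and the next byte is
 * ->pending, so a lock word of 0x00000101 means "locked and pending set,
 * no queued waiters", while a non-zero ->tail (the upper 16 bits) means at
 * least one CPU is queued.
 */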

#if _Q_PENDING_BITS == 8
/**
 * clear_pending_set_locked - take ownership and clear the pending bit.
 * @lock: Pointer to queued spinlock structure
 *
 * *,1,0 -> *,0,1
 *
 * Lock stealing is not allowed if this function is used.
 */
static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
{
	struct __qspinlock *l = (void *)lock;

	WRITE_ONCE(l->locked_pending, _Q_LOCKED_VAL);
}

/*
 * xchg_tail - Put in the new queue tail code word & retrieve previous one
 * @lock : Pointer to queued spinlock structure
 * @tail : The new queue tail code word
 * Return: The previous queue tail code word
 *
 * xchg(lock, tail)
 *
 * p,*,* -> n,*,* ; prev = xchg(lock, node)
 */
static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{
	struct __qspinlock *l = (void *)lock;

	return (u32)xchg(&l->tail, tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
}
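
/*
 * Note: the 16-bit xchg() above swaps only the tail halfword, so the
 * locked and pending bytes are left untouched without needing a
 * read-modify-write loop on the whole word.
 */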

#else /* _Q_PENDING_BITS == 8 */

/**
 * clear_pending_set_locked - take ownership and clear the pending bit.
 * @lock: Pointer to queued spinlock structure
 *
 * *,1,0 -> *,0,1
 */
static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
{
	atomic_add(-_Q_PENDING_VAL + _Q_LOCKED_VAL, &lock->val);
}

/**
 * xchg_tail - Put in the new queue tail code word & retrieve previous one
 * @lock : Pointer to queued spinlock structure
 * @tail : The new queue tail code word
 * Return: The previous queue tail code word
 *
 * xchg(lock, tail)
 *
 * p,*,* -> n,*,* ; prev = xchg(lock, node)
 */
static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{
	u32 old, new, val = atomic_read(&lock->val);

	for (;;) {
		new = (val & _Q_LOCKED_PENDING_MASK) | tail;
		old = atomic_cmpxchg(&lock->val, val, new);
		if (old == val)
			break;

		val = old;
	}
	return old;
}
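
/*
 * Note: with fewer pending bits there is no naturally aligned tail field
 * to xchg(), so the loop above rewrites only the tail bits while carrying
 * the current locked and pending bits forward, retrying if another CPU
 * changed the word in the meantime.
 */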
#endif /* _Q_PENDING_BITS == 8 */

/**
 * set_locked - Set the lock bit and own the lock
 * @lock: Pointer to queued spinlock structure
 *
 * *,*,0 -> *,0,1
 */
static __always_inline void set_locked(struct qspinlock *lock)
{
	struct __qspinlock *l = (void *)lock;

	WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
}

/*
 * Generate the native code for queued_spin_unlock_slowpath(); provide NOPs for
 * all the PV callbacks.
 */

static __always_inline void __pv_init_node(struct mcs_spinlock *node) { }
static __always_inline void __pv_wait_node(struct mcs_spinlock *node) { }
static __always_inline void __pv_kick_node(struct qspinlock *lock,
					   struct mcs_spinlock *node) { }
static __always_inline void __pv_wait_head(struct qspinlock *lock,
					   struct mcs_spinlock *node) { }

#define pv_enabled()		false

#define pv_init_node		__pv_init_node
#define pv_wait_node		__pv_wait_node
#define pv_kick_node		__pv_kick_node
#define pv_wait_head		__pv_wait_head

#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define queued_spin_lock_slowpath	native_queued_spin_lock_slowpath
#endif

/*
 * queued_spin_lock_slowpath() can (load-)ACQUIRE the lock before
 * issuing an _unordered_ store to set _Q_LOCKED_VAL.
 *
 * This means that the store can be delayed, but no later than the
 * store-release from the unlock. This means that simply observing
 * _Q_LOCKED_VAL is not sufficient to determine if the lock is acquired.
 *
 * There are two paths that can issue the unordered store:
 *
 *  (1) clear_pending_set_locked():	*,1,0 -> *,0,1
 *
 *  (2) set_locked():			t,0,0 -> t,0,1 ; t != 0
 *      atomic_cmpxchg_relaxed():	t,0,0 -> 0,0,1
 *
 * However, in both cases we have other !0 state we've set before to queue
 * ourselves:
 *
 * For (1) we have the atomic_cmpxchg_acquire() that set _Q_PENDING_VAL; our
 * load is constrained by that ACQUIRE to not pass before that, and thus must
 * observe the store.
 *
 * For (2) we have a more interesting scenario. We enqueue ourselves using
 * xchg_tail(), which ends up being a RELEASE. This in itself is not
 * sufficient, however that is followed by an smp_cond_acquire() on the same
 * word, giving a RELEASE->ACQUIRE ordering. This again constrains our load and
 * guarantees we must observe that store.
 *
 * Therefore both cases have other !0 state that is observable before the
 * unordered locked byte store comes through. This means we can use that to
 * wait for the lock store, and then wait for an unlock.
 */
#ifndef queued_spin_unlock_wait
void queued_spin_unlock_wait(struct qspinlock *lock)
{
	u32 val;

	for (;;) {
		val = atomic_read(&lock->val);

		if (!val) /* not locked, we're done */
			goto done;

		if (val & _Q_LOCKED_MASK) /* locked, go wait for unlock */
			break;

		/* not locked, but pending, wait until we observe the lock */
		cpu_relax();
	}

	/* any unlock is good */
	while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
		cpu_relax();

done:
	smp_rmb(); /* CTRL + RMB -> ACQUIRE */
}
EXPORT_SYMBOL(queued_spin_unlock_wait);
#endif

#endif /* _GEN_PV_LOCK_SLOWPATH */

/**
 * queued_spin_lock_slowpath - acquire the queued spinlock
 * @lock: Pointer to queued spinlock structure
 * @val: Current value of the queued spinlock 32-bit word
 *
 * (queue tail, pending bit, lock value)
 *
 *              fast     :    slow                                  :    unlock
 *                       :                                          :
 * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
 *                       :       | ^--------.------.             /  :
 *                       :       v           \      \            |  :
 * pending               :    (0,1,1) +--> (0,1,0)   \            |  :
 *                       :       | ^--'              |            |  :
 *                       :       v                   |            |  :
 * uncontended           :    (n,x,y) +--> (n,0,0) --'             |  :
 *   queue               :       | ^--'                            |  :
 *                       :       v                                 |  :
 * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
 *   queue               :         ^--'                             :
 */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
	struct mcs_spinlock *prev, *next, *node;
	u32 new, old, tail;
	int idx;

	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

	if (pv_enabled())
		goto queue;

	if (virt_spin_lock(lock))
		return;

	/*
	 * wait for in-progress pending->locked hand-overs
	 *
	 * 0,1,0 -> 0,0,1
	 */
	if (val == _Q_PENDING_VAL) {
		while ((val = atomic_read(&lock->val)) == _Q_PENDING_VAL)
			cpu_relax();
	}

	/*
	 * trylock || pending
	 *
	 * 0,0,0 -> 0,0,1 ; trylock
	 * 0,0,1 -> 0,1,1 ; pending
	 */
	for (;;) {
		/*
		 * If we observe any contention; queue.
		 */
		if (val & ~_Q_LOCKED_MASK)
			goto queue;

		new = _Q_LOCKED_VAL;
		if (val == new)
			new |= _Q_PENDING_VAL;

		old = atomic_cmpxchg(&lock->val, val, new);
		if (old == val)
			break;

		val = old;
	}

	/*
	 * we won the trylock
	 */
	if (new == _Q_LOCKED_VAL)
		return;

	/*
	 * we're pending, wait for the owner to go away.
	 *
	 * *,1,1 -> *,1,0
	 *
	 * this wait loop must be a load-acquire such that we match the
	 * store-release that clears the locked bit and create lock
	 * sequentiality; this is because not all clear_pending_set_locked()
	 * implementations imply full barriers.
	 */
	while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK)
		cpu_relax();

	/*
	 * take ownership and clear the pending bit.
	 *
	 * *,1,0 -> *,0,1
	 */
	clear_pending_set_locked(lock);
	return;

	/*
	 * End of pending bit optimistic spinning and beginning of MCS
	 * queuing.
	 */
queue:
	node = this_cpu_ptr(&mcs_nodes[0]);
	idx = node->count++;
	tail = encode_tail(smp_processor_id(), idx);

	node += idx;
	node->locked = 0;
	node->next = NULL;
	pv_init_node(node);

	/*
	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
	 * attempt the trylock once more in the hope someone let go while we
	 * weren't watching.
	 */
	if (queued_spin_trylock(lock))
		goto release;

	/*
	 * We have already touched the queueing cacheline; don't bother with
	 * pending stuff.
	 *
	 * p,*,* -> n,*,*
	 */
	old = xchg_tail(lock, tail);

	/*
	 * if there was a previous node; link it and wait until reaching the
	 * head of the waitqueue.
	 */
	if (old & _Q_TAIL_MASK) {
		prev = decode_tail(old);
		WRITE_ONCE(prev->next, node);

		pv_wait_node(node);
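		/*
		 * Spin on our own node's ->locked; our predecessor sets it
		 * via arch_mcs_spin_unlock_contended() once it has claimed
		 * the lock and we become the new queue head.
		 */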
		arch_mcs_spin_lock_contended(&node->locked);
	}

	/*
	 * we're at the head of the waitqueue, wait for the owner & pending to
	 * go away.
	 *
	 * *,x,y -> *,0,0
	 *
	 * this wait loop must use a load-acquire such that we match the
	 * store-release that clears the locked bit and create lock
	 * sequentiality; this is because the set_locked() function below
	 * does not imply a full barrier.
	 */
	pv_wait_head(lock, node);
	while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK)
		cpu_relax();

	/*
	 * claim the lock:
	 *
	 * n,0,0 -> 0,0,1 : lock, uncontended
	 * *,0,0 -> *,0,1 : lock, contended
	 *
	 * If the queue head is the only one in the queue (lock value == tail),
	 * clear the tail code and grab the lock. Otherwise, we only need
	 * to grab the lock.
	 */
	for (;;) {
		if (val != tail) {
			set_locked(lock);
			break;
		}
		old = atomic_cmpxchg(&lock->val, val, _Q_LOCKED_VAL);
		if (old == val)
			goto release;	/* No contention */

		val = old;
	}

	/*
	 * contended path; wait for next, release.
	 *
	 * The successor has already swung the tail to itself via xchg_tail()
	 * but may not yet have linked prev->next, so ->next can still be
	 * NULL here; spin until the link appears.
	 */
	while (!(next = READ_ONCE(node->next)))
		cpu_relax();

	arch_mcs_spin_unlock_contended(&next->locked);
	pv_kick_node(lock, next);

release:
	/*
	 * release the node
	 */
	this_cpu_dec(mcs_nodes[0].count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);

/*
 * Generate the paravirt code for queued_spin_unlock_slowpath().
 */
#if !defined(_GEN_PV_LOCK_SLOWPATH) && defined(CONFIG_PARAVIRT_SPINLOCKS)
#define _GEN_PV_LOCK_SLOWPATH

#undef  pv_enabled
#define pv_enabled()	true

#undef pv_init_node
#undef pv_wait_node
#undef pv_kick_node
#undef pv_wait_head

#undef  queued_spin_lock_slowpath
#define queued_spin_lock_slowpath	__pv_queued_spin_lock_slowpath

#include "qspinlock_paravirt.h"
#include "qspinlock.c"

#endif