Echo/Queue/
StealingQueue.rs

1//! # StealingQueue
2//!
3//! A generic, priority-aware, work-stealing queue implementation. This module
4//! is self-contained and can be used by any scheduler or application to manage
5//! and distribute tasks of any type that can be prioritized.
6
7#![allow(non_snake_case, non_camel_case_types)]
8
9use std::sync::Arc;
10
11use crossbeam_deque::{Injector, Steal, Stealer, Worker};
12use rand::Rng;
13
/// Contract for items whose scheduling priority can be queried by the queue.
pub trait Prioritized {
	/// Priority value reported by the implementor. It must be copyable and
	/// comparable for equality (`Eq` already implies `PartialEq`).
	type Kind: Eq + Copy;

	/// Returns the priority currently assigned to this item.
	fn Rank(&self) -> Self::Kind;
}
22
/// Priority tiers understood by the generic queue. Each tier selects one
/// position in the queue tuples: `High` is `.0`, `Normal` is `.1`, `Low` is
/// `.2`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Priority {
	/// Highest tier; looked at first at each stage of work-finding.
	High,

	/// Middle tier.
	Normal,

	/// Lowest tier; looked at last at each stage of work-finding.
	Low,
}
33
34/// Holds the queue components that are safe to share across all threads.
35///
36/// This includes global injectors for submitting new tasks from any context and
37/// stealers for taking tasks from other workers' deques, organized by priority
38/// level.
39pub struct Share<TTask> {
40	/// Global, multi-producer queues for each priority level.
41	pub Injector:(Injector<TTask>, Injector<TTask>, Injector<TTask>),
42
43	/// Shared handles for stealing tasks from each worker's local queue.
44	pub Stealer:(Vec<Stealer<TTask>>, Vec<Stealer<TTask>>, Vec<Stealer<TTask>>),
45}
46
47/// A generic, priority-aware, work-stealing queue.
48///
49/// This is the public-facing entry point for submitting tasks. It is generic
50/// over any task type `TTask` that implements the `Prioritized` trait.
51pub struct StealingQueue<TTask:Prioritized<Kind = Priority>> {
52	/// A shared, thread-safe pointer to the queue's shared components.
53	Share:Arc<Share<TTask>>,
54}
55
56/// Contains all necessary components for a single worker thread to operate.
57///
58/// This includes the thread-local `Worker` deques, which are not safe to share,
59
60/// making this `Context` object the sole owner of a worker's private queues.
61pub struct Context<TTask> {
62	/// A unique identifier for the worker, used to avoid self-stealing.
63	pub Identifier:usize,
64
65	/// Thread-local work queues for each priority level.
66	pub Local:(Worker<TTask>, Worker<TTask>, Worker<TTask>),
67
68	/// A reference to the shared components of the entire queue system.
69	pub Share:Arc<Share<TTask>>,
70}
71
72impl<TTask:Prioritized<Kind = Priority>> StealingQueue<TTask> {
73	/// Creates a complete work-stealing queue system.
74	///
75	/// This function initializes all the necessary queues, both shared and
76	/// thread-local, for a given number of workers.
77	///
78	/// # Returns
79	///
80	/// A tuple containing:
81	/// 1. The public-facing `StealingQueue` for submitting new tasks.
82	/// 2. A `Vec` of `Context` objects, one for each worker thread to own.
83	pub fn Create(Count:usize) -> (Self, Vec<Context<TTask>>) {
84		let mut High:Vec<Worker<TTask>> = Vec::with_capacity(Count);
85
86		let mut Normal:Vec<Worker<TTask>> = Vec::with_capacity(Count);
87
88		let mut Low:Vec<Worker<TTask>> = Vec::with_capacity(Count);
89
90		// For each priority level, create a thread-local worker queue and its
91		// corresponding shared stealer.
92		let StealerHigh:Vec<Stealer<TTask>> = (0..Count)
93			.map(|_| {
94				let Worker = Worker::new_fifo();
95
96				let Stealer = Worker.stealer();
97
98				High.push(Worker);
99
100				Stealer
101			})
102			.collect();
103
104		let StealerNormal:Vec<Stealer<TTask>> = (0..Count)
105			.map(|_| {
106				let Worker = Worker::new_fifo();
107
108				let Stealer = Worker.stealer();
109
110				Normal.push(Worker);
111
112				Stealer
113			})
114			.collect();
115
116		let StealerLow:Vec<Stealer<TTask>> = (0..Count)
117			.map(|_| {
118				let Worker = Worker::new_fifo();
119
120				let Stealer = Worker.stealer();
121
122				Low.push(Worker);
123
124				Stealer
125			})
126			.collect();
127
128		// Bundle all shared components into an Arc for safe sharing.
129		let Share = Arc::new(Share {
130			Injector:(Injector::new(), Injector::new(), Injector::new()),
131
132			Stealer:(StealerHigh, StealerNormal, StealerLow),
133		});
134
135		// Create a unique context for each worker, giving it ownership of its
136		// local queues and a reference to the shared components.
137		let mut Contexts = Vec::with_capacity(Count);
138
139		for Identifier in 0..Count {
140			Contexts.push(Context {
141				Identifier,
142
143				Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
144
145				Share:Share.clone(),
146			});
147		}
148
149		let Queue = Self { Share };
150
151		(Queue, Contexts)
152	}
153
154	/// Submits a new task to the appropriate global queue based on its
155	/// priority. This method is thread-safe and can be called from any
156	/// context.
157	pub fn Submit(&self, Task:TTask) {
158		match Task.Rank() {
159			Priority::High => self.Share.Injector.0.push(Task),
160
161			Priority::Normal => self.Share.Injector.1.push(Task),
162
163			Priority::Low => self.Share.Injector.2.push(Task),
164		}
165	}
166}
167
168impl<TTask> Context<TTask> {
169	/// Finds the next available task for the worker to execute.
170	///
171	/// This method implements the complete work-finding logic:
172	/// 1. Check local deques (from high to low priority).
173	/// 2. If local deques are empty, attempt to steal from the system (from
174	///    high to low priority).
175	pub fn Next(&self) -> Option<TTask> {
176		self.Local
177			.0
178			.pop()
179			.or_else(|| self.Local.1.pop())
180			.or_else(|| self.Local.2.pop())
181			.or_else(|| self.Steal(&self.Share.Injector.0, &self.Share.Stealer.0, &self.Local.0))
182			.or_else(|| self.Steal(&self.Share.Injector.1, &self.Share.Stealer.1, &self.Local.1))
183			.or_else(|| self.Steal(&self.Share.Injector.2, &self.Share.Stealer.2, &self.Local.2))
184	}
185
186	/// Attempts to steal a task from a specific priority set.
187	///
188	/// It first tries to steal a batch from the global injector queue for that
189	/// priority. If that fails, it attempts to steal from a randomly chosen
190	/// peer worker to ensure fair distribution and avoid contention hotspots.
191	pub fn Steal<'a>(
192		&self,
193
194		Injector:&'a Injector<TTask>,
195
196		Stealers:&'a [Stealer<TTask>],
197
198		Local:&'a Worker<TTask>,
199	) -> Option<TTask> {
200		// First, try to steal a batch from the global injector.
201		// `steal_batch_and_pop` is efficient: it moves a batch into our local
202		// queue and returns one task immediately if successful.
203		if let Steal::Success(Task) = Injector.steal_batch_and_pop(Local) {
204			return Some(Task);
205		}
206
207		// If the global queue is empty, try stealing from peers.
208		let Count = Stealers.len();
209
210		if Count <= 1 {
211			// Cannot steal if there are no other workers.
212			return None;
213		}
214
215		// Allocation-free random iteration: pick a random starting point.
216		let mut Rng = rand::rng();
217
218		let Start = Rng.random_range(0..Count);
219
220		// Iterate through all peers starting from the random offset.
221		for i in 0..Count {
222			let Index = (Start + i) % Count;
223
224			// Don't steal from ourselves.
225			if Index == self.Identifier {
226				continue;
227			}
228
229			if let Steal::Success(Task) = Stealers[Index].steal_batch_and_pop(Local) {
230				return Some(Task);
231			}
232		}
233
234		None
235	}
236}