From 875c1737e5c3d417ecd6d2090e1fa955775d2ac7 Mon Sep 17 00:00:00 2001 From: Simon Gardling Date: Wed, 5 Mar 2025 10:25:38 -0500 Subject: [PATCH] elo: add live leaderboard --- Cargo.lock | 10 +++++++++ Cargo.toml | 1 + src/elo.rs | 65 ++++++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f97a5d2..7deb177 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,6 +172,15 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -415,6 +424,7 @@ dependencies = [ "arrayvec", "const_fn", "criterion", + "crossbeam-channel", "either", "indicatif", "nohash-hasher", diff --git a/Cargo.toml b/Cargo.toml index 7ab808c..c647c28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ debug = true [dependencies] arrayvec = "0.7" const_fn = "0.4" +crossbeam-channel = "0.5.14" either = "1.13" indicatif = { version = "0.17", features = [ "rayon" ] } nohash-hasher = "0.2" diff --git a/src/elo.rs b/src/elo.rs index 0c98e51..75a9512 100644 --- a/src/elo.rs +++ b/src/elo.rs @@ -118,6 +118,7 @@ impl PlayerArena { players: players .into_iter() .zip([EloRating::new()].into_iter().cycle()) + // flatten tuple .map(|((a, b), c)| (a, b, c)) .collect(), } @@ -137,21 +138,53 @@ impl PlayerArena { // shuffle for consistency created_pairs.shuffle(&mut rand::rng()); - // after the agents are created, we can multithread the games being played - created_pairs - .into_par_iter() - .progress_with_style( - ProgressStyle::with_template("[{elapsed_precise}] {pos:>7}/{len:7} ETA: {eta}") - .expect("invalid ProgressStyle"), - ) - .map(|((i, j), (p1, p2))| (i, j, Self::play_two_inner(p1, p2))) - // TODO! is there some way in rayon to allow `self.process_outcome` to be run on the main thread - // as soon as Self::play_two_inner completes? This would allow maybe a live leaderboard to be displayed - // while players are playing - .collect::>() - // collect and process the outcomes of all the games - .into_iter() - .for_each(|(i, j, o)| self.process_outcome(i, j, &o)); + let num = created_pairs.len(); + + let (sender, receiver) = crossbeam_channel::unbounded(); + + // Spawn parallel processing in a dedicated thread + let processing_thread = { + let sender = sender.clone(); + std::thread::spawn(move || { + created_pairs + .into_par_iter() + .progress_with_style( + ProgressStyle::with_template( + "[{elapsed_precise}] {pos:>7}/{len:7} ETA: {eta}", + ) + .expect("invalid ProgressStyle"), + ) + .map(|((i, j), (p1, p2))| (i, j, Self::play_two_inner(p1, p2))) + .for_each(|(i, j, o)| { + sender.send((i, j, o)).expect("Failed to send result"); + }); + }) + }; + + // Immediately drop our copy of the sender so the channel closes properly + drop(sender); + + // Process results on main thread as they arrive + let mut received_num = 0; + while let Ok((i, j, o)) = receiver.recv() { + self.process_outcome(i, j, &o); + received_num += 1; + + // print the leaderboard every 5 steps + if received_num % 5 == 0 { + println!("{}", self); + } + + // break if all pairs were recieved + if received_num == num { + break; + } + } + + // Ensure parallel thread completes + processing_thread + .join() + .expect("Processing thread panicked"); } fn prop_arena(&mut self, n: usize) { @@ -191,7 +224,7 @@ impl PlayerArena { player_1, player_2, false, - Board::random(rand::random_range(3..8)), + Board::random(rand::random_range(3..=7)), ) .loop_until_result();