From 3b29d716f6bb9bcf75d8b717482ec05e8cb281dc Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Mon, 1 Jun 2026 15:46:43 +0200 Subject: [PATCH 01/13] permutation trick --- .../benchmarks/performance.rs | 63 ++- .../docs/permutation-design.md | 408 ++++++++++++++++++ .../examples/statistical_comparison.rs | 194 +++++++++ .../src/consistent_permutation.rs | 322 ++++++++++++++ crates/consistent-choose-k/src/lib.rs | 2 + 5 files changed, 987 insertions(+), 2 deletions(-) create mode 100644 crates/consistent-choose-k/docs/permutation-design.md create mode 100644 crates/consistent-choose-k/examples/statistical_comparison.rs create mode 100644 crates/consistent-choose-k/src/consistent_permutation.rs diff --git a/crates/consistent-choose-k/benchmarks/performance.rs b/crates/consistent-choose-k/benchmarks/performance.rs index bf4cce1..dfef90d 100644 --- a/crates/consistent-choose-k/benchmarks/performance.rs +++ b/crates/consistent-choose-k/benchmarks/performance.rs @@ -4,7 +4,9 @@ use std::{ time::Duration, }; -use consistent_choose_k::{ConsistentChooseKHasher, ConsistentHasher}; +use consistent_choose_k::{ + ConsistentChooseKHasher, ConsistentHasher, ConsistentPermutation, +}; use criterion::{ criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration, Throughput, @@ -68,6 +70,63 @@ fn append_vs_new_with_k(c: &mut Criterion) { group.finish(); } +fn grow_k_vs_permutation(c: &mut Criterion) { + // Compare three ways to obtain `k` distinct samples out of `0..n`: + // * `ConsistentChooseKHasher::new_with_k` (pre-build the full set); + // * `ConsistentChooseKHasher::new` + `grow_k` k times (incremental); + // * `ConsistentPermutation` (per-layer Feistel permutation, take k). + // + // All three are driven from a per-key seed so the cost of building the + // underlying permutation/hash state is included in each iteration. + let keys: Vec = rng().random_iter().take(100).collect(); + + let mut group = c.benchmark_group("grow_k_vs_permutation"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + for n in [100usize, 1_000, 10_000, 100_000] { + for k in [2usize, 10, 100, 1_000] { + if k > n { + continue; + } + group.throughput(Throughput::Elements((keys.len() * k) as u64)); + + group.bench_with_input(BenchmarkId::new(format!("new_with_k/k_{k}"), n), &n, |b, n| { + b.iter(|| { + for key in &keys { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + black_box(ConsistentChooseKHasher::new_with_k(h, *n, k)); + } + }) + }); + + group.bench_with_input(BenchmarkId::new(format!("grow_k/k_{k}"), n), &n, |b, n| { + b.iter(|| { + for key in &keys { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + let mut iter = ConsistentChooseKHasher::new(h, *n); + for _ in 0..k { + black_box(iter.grow_k()); + } + } + }) + }); + + group.bench_with_input(BenchmarkId::new(format!("permutation/k_{k}"), n), &n, |b, n| { + b.iter(|| { + for key in &keys { + let mut iter = ConsistentPermutation::new(*n as u32, *key); + for _ in 0..k { + black_box(iter.next()); + } + } + }) + }); + } + } + group.finish(); +} + criterion_group!( name = benches; config = Criterion::default() @@ -75,6 +134,6 @@ criterion_group!( .measurement_time(Duration::from_millis(4000)) .nresamples(1000); - targets = throughput_benchmark, append_vs_new_with_k, + targets = throughput_benchmark, append_vs_new_with_k, grow_k_vs_permutation, ); criterion_main!(benches); diff --git a/crates/consistent-choose-k/docs/permutation-design.md b/crates/consistent-choose-k/docs/permutation-design.md new file mode 100644 index 0000000..f6a5648 --- /dev/null +++ b/crates/consistent-choose-k/docs/permutation-design.md @@ -0,0 +1,408 @@ +# Permutation iterator design notes + +This document explains the design of [`ConsistentPermutation`], the +per-layer Feistel permutation iterator that this crate uses to drive +its `n`-consistent ranking. + +Given a 64-bit `key` and a universe size `n`, the iterator produces a +uniformly distributed permutation of `[0, n)` as a streaming iterator +satisfying: + +- **Stateless and deterministic** — no per-call PRNG state outside the + iterator; two iterators on the same `(key, n)` produce the same + sequence. +- **`n`-consistent** — when `n` grows by one, the new sequence agrees + with the old except for the at most one position where the new + element `n − 1` is inserted (i.e. the underlying choose-`k` family + satisfies the consistency properties in the main [README]). +- **Fast in the small-`k`, repeated-query regime** — production + workloads call the iterator with `k` ≪ `n` for many keys, so the + per-emission cost and the iterator setup cost both matter. +- **Statistically clean for choose-2 set uniformity** — pairs of + selected nodes must be uniformly distributed over `binom(n, 2)` + cells over random keys. This is the test that consistently + distinguishes "looks fine on marginals but is actually biased" from + "near-random-permutation quality". + +[`ConsistentPermutation`]: ../src/consistent_permutation.rs +[README]: ../README.md + +## Core idea + +### Permutations are automatically `n`-consistent + +Fix any permutation `π` of `[0, N)`, and for each `n ≤ N` define +`π_n` as the subsequence of `π` you get by deleting every entry +`≥ n`. Then `{π_n}_{n ≤ N}` is an `n`-consistent family of +permutations: for every `n < N`, `π_{n+1}` equals `π_n` with the +element `n` inserted at one position. + +Two things to check: + +- **`π_n` is a permutation of `[0, n)`.** `π` is a bijection of + `[0, N)`, so each value in `[0, n)` appears in `π` exactly once. + Deleting entries `≥ n` leaves each value in `[0, n)` at exactly + one position, in their original relative order. +- **Going from `π_n` to `π_{n+1}` only inserts `n`.** The + construction of `π_{n+1}` differs from that of `π_n` only in + that the entry equal to `n` is no longer deleted. Every other + surviving entry sits at the same relative position. So + `π_{n+1}` is `π_n` with `n` spliced in at the position `π` puts + it. + +So *any* recipe for sampling a permutation of `[0, N)` from a key +automatically gives an `n`-consistent family of permutations for +every `n ≤ N`. + +### Hash functions already give us power-of-two permutations + +Modern keyed hash functions and block ciphers are by design +*bijections* on fixed-width words: a small Feistel network on +`2b` bits produces a key-parameterized permutation of +`[0, 2^(2b))` in `O(1)` evaluation time. So if `n` happens to be a +power of two `n = 2^B`, the family `π_n` is straightforward — pick +a Feistel of width `B`, and reading the iterator amounts to +evaluating it at `0, 1, 2, …`. + +The catch is that `n` is rarely a power of two. The lemma above +lets us take a permutation of `[0, N)` for any `N ≥ n` and read +off the survivors, so picking the smallest power of two `N ≥ n` +(say) and using a single Feistel of width `⌈log₂ N⌉` works +correctness-wise — but begs the question of how expensive the +survivor walk is. + +### Why we cannot just pick one big `N` + +The crudest instantiation is "pick a giant `N` (say `2^30` for all +queries), construct one permutation of `[0, N)` per key, and read +off the first `k` non-deleted values for any query `(n, k)`". +Correctness is given by the lemma. The cost is not. + +Each iterator step evaluates the underlying bijection on the next +counter value and either keeps the output (if `< n`) or discards +it. The expected fraction of kept outputs is `n / N`, so the +expected number of bijection evaluations per emission is `N / n`. +With `N = 2^30` and `n` in the low hundreds this is roughly `10^7` +hash evaluations per emission — unusable. + +What we actually want is `O(1)` bijection evaluations per +emission, **independent of how large the universe might +eventually grow**. A construction with cost `O(N / n)` per +emission ties us to picking the smallest possible `N` for each +query — but then we lose the ability to grow `n` cheaply, since +growing `n` past `N` requires a brand-new construction with no +relationship to the old one. + +### Layering: building the permutation in doubling buckets + +If we could always operate on an `N` that exceeds `n` by only a +constant factor, the per-emission cost above would already be +`O(1)`. The layered construction arranges exactly that, by +splitting the universe into a growing stack of intervals each +roughly the size of the previous one combined. + +Slice `[0, ∞)` into doubling *buckets* + +``` +bucket 0 = [0, 4), +bucket 1 = [4, 16), +bucket 2 = [16, 64), +... +bucket j = [4^j, 4^(j+1)). +``` + +Bucket `j` is owned by **layer `j`**, which is fully self-contained: +it knows how to emit the elements of its own bucket in a +key-dependent order, interleaved with calls into layer `j − 1` for +the values in lower buckets. Two pieces of state are all a layer +needs: + +- A **bijection `π_j`** on `[0, 4^(j+1))` (constructed in §"Feistel + (vs PCG)" below from the master key and the layer index). +- A **counter `c_j`** initialized to `0`, walking `0, 1, 2, …`. + +A single step of layer `j` is: + +1. compute `y = π_j(c_j)`, advance `c_j`; +2. look at the top two bits of `y` (i.e. `y >> 2j`): + - if they are nonzero (probability `3/4`), `y ∈ [4^j, 4^(j+1)) =` + bucket `j` — **emit** `y`; + - if they are `00` (probability `1/4`), `y ∈ [0, 4^j)` — that + emission belongs to a lower bucket, so **descend** to layer + `j − 1` and let it produce the value. + +To produce one element the iterator starts at the top layer `j_max` +and steps; each step either emits or descends. Termination is when +the top layer's counter has walked its full domain `4^(j_max+1)`. +The top layer additionally gates emissions by `y < n` (the only +bucket that can spill past `n`); lower layers' outputs are +automatically in range. + +### Why this works (uniformity, `n`-consistency, `O(1)` per emit) + +**Uniformity.** At universe size `N = 4^(j_max+1)` the +construction *is* a permutation of `[0, N)`: layer `j`'s counter +walks `0, …, 4^(j+1) − 1` and `π_j` is a bijection, so layer `j` +emits each value of bucket `j` exactly once and triggers a descent +for each value in `[0, 4^j)`. Summed over layers, every value of +`[0, N)` is emitted exactly once. The relative *order* is governed +by the per-layer keys, so over random keys the iterator output is +a uniformly random permutation of `[0, N)` — modulo the +statistical quality of the per-layer bijection, which is the +subject of the rest of this document. + +**`n`-consistency.** Because the result at universe size `N` is a +true permutation, the §"Permutations are automatically +`n`-consistent" lemma kicks in: the iterator's output for any +`n ≤ N` is `π_N` with values `≥ n` deleted, and the family across +all `n ≤ N` is automatically `n`-consistent. The top layer's +`y < n` gate is exactly this deletion. + +Growing past `N` adds a new top layer above the existing ones. +Every old layer's bijection and counter walk are unchanged; the +new top layer's descents into the old top layer arrive at counter +values `0, 1, 2, …` in order, which is exactly what the old top +layer saw when it was the root. So at the larger universe size +`N' = 4^(j_max+2)` the construction is again a permutation of +`[0, N')` whose restriction (in the sense of the lemma) to any +`n ≤ N` agrees with the old `π_n`. The universe can therefore be +extended indefinitely without re-specifying or disturbing the +existing structure. + +**Amortized cost per emission.** Consider an emission produced +somewhere in the descent. At each non-top layer the single step +emits with probability `3/4` and descends with probability `1/4`, +so the expected number of `π_j` evaluations per emission satisfies + +``` +E_j = 1 + (1/4) · E_{j − 1}, E_0 = 1 +``` + +which converges to `E_∞ = 4/3` regardless of how many layers are +below. The top layer is slightly worse — emissions whose raw +value lies in `[n, N)` get rejected — but since `j_max` is chosen +as the smallest level with `4^(j_max+1) ≥ n`, we always have +`N ≤ 4n`, so the top-layer in-bucket pass rate is at least `n/N ≥ +1/4`. The expected number of top-layer evaluations per kept top +emission is at most `4`. Combined, every iterator step costs a +bounded constant number of layer evaluations, independent of `n` +and `j_max`. + +## Overall structure + +The rest of this document justifies the three design choices that +the iterator above leaves open: + +1. **What bijection drives each layer** — an independently-keyed + balanced Feistel network. +2. **How many bits a layer call emits** — two. +3. **What `n` we cap at and how the iterator terminates** — `n ≤ 2³⁰`, + driven by a single top-layer counter. + +## Per-layer bijection (vs a single global one) + +A tempting alternative is to use a single global *chunk-prefix* +permutation on `u32` — a bijection with the property that the prefix +of length `i` of the output depends only on the prefix of length `i` +of the input — and drive every layer of the descent with that one +permutation. Per-layer setup vanishes; each layer call is a single +tiny permutation evaluation. + +The fundamental problem is the **size of the permutation family** +this gives you. A single 64-bit key parameterizes one fixed +chunk-prefix permutation that is then re-used at every layer. The +same key controls every layer's bijection, so cross-layer +correlations are not just possible — they are baked in. For small +`n`, where choose-2 has only `binom(n, 2)` cells to spread over, the +choose-2 χ² test rejects this distribution comfortably below the +0.1 % significance threshold. + +Giving every layer its own, independently-keyed bijection breaks the +cross-layer correlation by construction. The cost is one bijection +setup per layer, but with `j_max ≈ log₂(n) / 2`, that is at most ~15 +small structs of state for any `n ≤ 2³⁰` — negligible compared to +the iterator's emission stream. + +## Feistel (vs PCG) + +Once each layer needs its own bijection, the primitive of choice is a +small balanced Feistel network on `[0, 2^n_bits)`. The two main +alternatives are PCG-style multiply-xor-shift hashes (either +stateless per call, or stateful with state preserved across calls); +both lose for reasons worth being explicit about. + +### Stateless multi-round PCG + +A stateless per-call PCG hash `y = (y ^ c) · a; y ^= y >> shift` with +the key rotated between `R` rounds is statistically clean once `R` +is large enough, but **how** large is severe at small `n_bits`: tiny +layers (`n_bits = 2`) have an F-input space of only 2 values, so the +brute-force "many rounds" knob is the only thing that breaks the +parity correlations. A passing schedule needs on the order of 16 +rounds at the small-`n_bits` end. + +In addition, the tight serial dependency `y → y·a → y^(y>>shift) → +y·a → …` runs close to its sequential multiply latency on a wide +out-of-order core, so per-round work is essentially un-amortized. +Feistel rounds are also serial, but a Feistel round does roughly the +same per-round work as a PCG round and far fewer of them suffice. + +### Stateful single-step PCG + +A more clever variant is to give each layer a stateful LCG, preserving +state across iterator calls so a single PCG step (plus a small output +mixer) per call would suffice. The hope is that 1 step × 2 layers +per emission beats Feistel's few rounds × 1 layer per emission. + +This fails for a structural reason. A full-period LCG on a +power-of-two modulus `2^k` needs Hull–Dobell parameters: `a ≡ 1 mod 4` +and `c` odd. Both forces conspire to make every consecutive state +pair `(state[i], state[i+1])` differ in bit 0 — `a · s ≡ s mod 2` +since `a` is odd, and `c` flips parity since `c` is odd. The output +of the layer is a key-dependent bijection of the state, but the +**state-pair** support is only half of the full pair space. At small +`n_bits` (e.g. 3) that reduces reachable `(state[i], state[i+1])` +pairs from 64 to 32, and no amount of output mixing recovers the +missing half — bit 0 of one output is fully determined by bit 0 of +the previous output (modulo the key-dependent mixer's bijection on +each individual state). The result is a hard floor on choose-2 χ² at +small `n` that no extra mixing round can dissolve. + +The standard PCG construction sidesteps this by using a **wider** +state than its output (e.g. 64 bits in, 32 bits out, hiding the +parity-alternation in unused bits). That option is not available +here without breaking per-layer bijectivity: if the LCG cycle length +exceeds the layer's output domain, the output repeats values within +a single iterator lifetime. + +### Feistel + +Each layer `j` runs a small balanced Feistel network on +`[0, 2^(2j + 2))`, keyed by a single avalanched 64-bit `master_key` +that is rotated and Weyl-incremented between rounds. The F-function +is one `xor`, one multiplication by an odd 32-bit slice of the master +key, and a high-to-low fold to the half-width: + +```text +mixed = (r ^ k_xor) * (k_mul | 1) +f = (mixed_low + mixed_high) & half_mask +new_r = l ^ f +new_l = r +``` + +The properties that matter: + +- **Bijective by construction.** No cycle walking, no rejection, no + probabilistic restarts. Every Feistel round is a permutation on + the layer's domain; their composition is too. +- **Small-`n_bits` mixing handled by extra rounds.** Tiny layers + (`n_bits = 2, 4, 6`) use `12, 10, 6` rounds; everything else uses + `4`. The extra rounds for tiny layers are essentially free in + amortized cost because level `j` only contributes ~`4^j` emissions + per iterator lifetime — the top layer dominates per-iterator cost + and uses the fewest rounds. +- **Key rotation amortizes a 64-bit master across all rounds.** We + rotate by exactly the bits consumed per round and add the Weyl + increment `0x9E37_79B9_7F4A_7C15` to decorrelate consecutive rounds + (rotation alone leaves 60+ bits in common between adjacent rounds). +- **All `n_bits` even** means the Feistel halves split cleanly with + no parity quirks, and the 2-bit-per-layer emission rule lines up + neatly with the per-layer top bits. + +## Two bits per layer (vs one) + +A 1-bit-per-layer scheme is initially appealing: narrower per-layer +width means fewer Feistel rounds suffice, and the per-layer call is +correspondingly cheaper. Two compounding effects make it lose +overall: + +- **Twice as many layers.** With 1 bit per layer, + `j_max ≈ log₂(n)`; with 2 bits, `j_max ≈ log₂(n) / 2`. Setup cost + (per-layer counter, per-layer state) doubles. +- **Lower emission probability per layer call.** A 1-bit-per-layer + scheme emits whenever the top bit is `1` — probability ½. A + 2-bit-per-layer scheme emits whenever the top two bits are not + `00` — probability ¾. So the **expected number of layer calls per + emission** drops from `≈ 2` to `≈ 4/3`. + +Combining the two effects, 2-bit-per-layer makes ~½ as many +`layer_apply` calls per emission as 1-bit-per-layer. Even after +accounting for the per-call cost difference (slightly more Feistel +rounds at wider `n_bits`), the 2-bit scheme wins comfortably. + +Forcing the per-layer width even (which is what "2 bits per layer" +buys structurally) also keeps the Feistel halves perfectly symmetric +and removes any need for cycle walking to stay inside `[0, n)` at the +top. + +## Universe cap: `n ≤ 2³⁰` + +Capping `n` at `2³⁰` lets the top-layer counter live in a `u32`, +which makes the iterator's termination condition a single +`counters[j_max] >= top_cap` comparison. No separate "emitted" +counter is needed to disambiguate boundary cases, the +`n_bits == 32` corner of the per-layer mask computation vanishes, +and `n_mask = (1 << n_bits) - 1` becomes unconditional inside +`layer_apply`. The cap is generous: `2³⁰ ≈ 10⁹` slots is more +node-count than any realistic distributed system, and the +simplifications it enables are observable in the benchmark. + +## Hot-path structure: top-layer phase vs descent phase + +The iterator's `next()` is split into two phases: + +- The **top-layer phase** walks the top layer's counter with both + the termination check (`counters[j_max] >= top_cap`) and the range + check (`raw < n`). The range check is necessary because the + Feistel maps the counter into `[0, 2^(2·j_max + 2))`, which may + overshoot `n`. +- The **descent phase** walks lower layers with **neither** check. + By the descent-tree invariant, lower-layer counters never exhaust + within a single iterator lifetime, and lower-layer raw outputs + always land in `[2^j, 2^(j+1)) ⊂ [0, n)`. So the hot path for + non-top layers is a pure Feistel call with no branches. + +## Measured throughput + +The `grow_k_vs_permutation` benchmark in +`benchmarks/performance.rs` constructs an iterator per key and pulls +`k` values from it, including the iterator-setup cost. Numbers +below are mean per-emitted-element times in nanoseconds on an Apple +M4 Max, for `ConsistentChooseKHasher::new_with_k` (the heap-based +implementation) versus `ConsistentPermutation::new` (this +construction). Smaller is better. + +| `n` | `k` | choose-k (ns/emit) | permutation (ns/emit) | speedup | +|---------|------:|-------------------:|----------------------:|--------:| +| 100 | 2 | 45.9 | 19.1 | 2.4 × | +| 100 | 10 | 59.6 | 14.3 | 4.2 × | +| 100 | 100 | 172 | 21.8 | 7.9 × | +| 1 000 | 2 | 41.5 | 16.2 | 2.6 × | +| 1 000 | 10 | 52.8 | 7.2 | 7.3 × | +| 1 000 | 100 | 93.8 | 7.6 | 12 × | +| 1 000 | 1 000 | 491 | 9.8 | 50 × | +| 10 000 | 2 | 54.1 | 15.5 | 3.5 × | +| 10 000 | 10 | 64.4 | 8.8 | 7.3 × | +| 10 000 | 100 | 109 | 12.1 | 9.0 × | +| 10 000 | 1 000 | 325 | 20.3 | 16 × | +| 100 000 | 2 | 47.3 | 19.5 | 2.4 × | +| 100 000 | 10 | 59.2 | 12.1 | 4.9 × | +| 100 000 | 100 | 101 | 18.8 | 5.4 × | +| 100 000 | 1 000 | 314 | 26.3 | 12 × | + +Observations matching the theory: + +- **Per-emit cost of `ConsistentPermutation` is bounded by a small + constant** (single-digit to low-tens of nanoseconds) across four + decades of `n` and `k`. The amortized `O(1)` layer evaluations + per emission analysis predicts exactly this. +- **The setup cost is amortized away by `k`.** At small `k = 2` the + per-emission number is dominated by iterator construction + (counter vector allocation), which is why + `k = 2` columns are systematically slower than `k = 10`. +- **Per-emit cost of `ConsistentChooseKHasher` grows roughly with + `k`** (heap operations cost `O(log k)` per pop and the + pre-build option is `O(k log k)`), which is why the speedup ratio + widens with `k`: at `k = 1 000, n = 1 000` the permutation + implementation is ~50× faster. + diff --git a/crates/consistent-choose-k/examples/statistical_comparison.rs b/crates/consistent-choose-k/examples/statistical_comparison.rs new file mode 100644 index 0000000..3473615 --- /dev/null +++ b/crates/consistent-choose-k/examples/statistical_comparison.rs @@ -0,0 +1,194 @@ +//! Statistical comparison of consistent-permutation constructions +//! against `ConsistentChooseKHasher` (the gold-standard reference). +//! +//! Each generator is asked to produce a uniformly-random ordering of +//! `0..n` and we run four chi-squared tests at alpha = 0.001: +//! +//! 1. Marginal at position 0 (first emitted element should be uniform). +//! 2. Marginal at position `k-1` (last sampled element should be uniform). +//! 3. Choose-2 unordered-pair uniformity over the `(n choose 2)` pairs. +//! 4. Per-position bit balance (each bit should be 1 ~half the time). +//! +//! Run with: cargo run --release --example statistical_comparison + +use std::hash::{DefaultHasher, Hash}; + +use consistent_choose_k::{ConsistentChooseKHasher, ConsistentPermutation}; + +const SAMPLES: usize = 200_000; + +/// 99.9% upper-tail critical value for chi-squared with `df` degrees of +/// freedom, computed by Wilson-Hilferty: +/// `chi2_{df, 1-alpha} ≈ df * (1 - 2/(9 df) + z * sqrt(2/(9 df)))^3` +/// where `z = Phi^{-1}(1 - alpha)`. For alpha=0.001, z ≈ 3.0902. +fn chi2_critical_999(df: usize) -> f64 { + let df = df as f64; + let a = 2.0 / (9.0 * df); + df * (1.0 - a + 3.0902 * a.sqrt()).powi(3) +} + +/// Pearson chi-squared statistic for `observed` against the uniform +/// distribution; returns `(chi2, df)`. +fn chi2_uniform(observed: &[u64]) -> (f64, usize) { + let total: u64 = observed.iter().sum(); + let cells = observed.len(); + let expected = total as f64 / cells as f64; + let mut chi2 = 0.0; + for &c in observed { + let d = c as f64 - expected; + chi2 += d * d / expected; + } + (chi2, cells - 1) +} + +// ---------- Generators ---------- + +fn chooseh_take(key: u64, n: usize, k: usize) -> Vec { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + ConsistentChooseKHasher::new(h, n) + .map(|v| v as u32) + .take(k) + .collect() +} + +fn perm_take(key: u64, n: usize, k: usize) -> Vec { + ConsistentPermutation::new(n as u32, key).take(k).collect() +} + +// ---------- Per-method accumulator ---------- + +struct Acc { + name: String, + first: Vec, + kth: Vec, + pair: Vec, // n*n, indexed by lo*n + hi (lo <= hi) + bits: Vec>, // bits[pos][bit] +} + +impl Acc { + fn new(name: &str, n: usize, kk: usize, n_bits: usize) -> Self { + Self { + name: name.to_string(), + first: vec![0u64; n], + kth: vec![0u64; n], + pair: vec![0u64; n * n], + bits: vec![vec![0u64; n_bits]; kk], + } + } + + fn record(&mut self, v: &[u32], n: usize, n_bits: usize) { + let kk = self.bits.len(); + self.first[v[0] as usize] += 1; + self.kth[v[kk - 1] as usize] += 1; + let (lo, hi) = if v[0] <= v[1] { + (v[0], v[1]) + } else { + (v[1], v[0]) + }; + self.pair[(lo as usize) * n + hi as usize] += 1; + for pos in 0..kk { + for b in 0..n_bits { + if (v[pos] >> b) & 1 == 1 { + self.bits[pos][b] += 1; + } + } + } + } +} + +fn run_for_n(n: usize) { + let kk = 2usize.min(n - 1).max(1); + let n_bits = ((n - 1).next_power_of_two().trailing_zeros() as usize).max(1); + + // Build the list of methods to test. + let methods: Vec<(&'static str, Box Vec>)> = vec![ + ("ChooseKHasher ", Box::new(chooseh_take)), + ("Permutation ", Box::new(perm_take)), + ]; + + let mut accs: Vec = methods + .iter() + .map(|(name, _)| Acc::new(name, n, kk, n_bits)) + .collect(); + + for seed in 0..SAMPLES as u64 { + // Avalanche each sample's seed so we don't pass small + // sequential integers into the (single-pass) hashers, which + // would correlate strongly. `ConsistentPermutation::new` + // does not avalanche its key argument, so we have to do it + // here. + let mut z = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + let key = z ^ (z >> 31); + for ((_, gen), acc) in methods.iter().zip(accs.iter_mut()) { + let v = gen(key, n, kk); + assert_eq!(v.len(), kk); + acc.record(&v, n, n_bits); + } + } + + println!("\n=== n = {n} (samples = {SAMPLES}) ==="); + println!( + "{:<46} {:>14} {:>6} {:>14} {:>7}", + "test", "chi2", "df", "crit(0.001)", "verdict" + ); + + let print_row = |label: &str, chi2: f64, df: usize| { + let crit = chi2_critical_999(df); + let verdict = if chi2 <= crit { "PASS" } else { "FAIL" }; + println!("{label:<46} {chi2:>14.2} {df:>6} {crit:>14.2} {verdict:>7}"); + }; + + // 1) Marginal @ pos 0. + for acc in &accs { + let (c, d) = chi2_uniform(&acc.first); + print_row(&format!("{}: marginal @ pos 0", acc.name), c, d); + } + // 2) Marginal @ pos kk-1. + for acc in &accs { + let (c, d) = chi2_uniform(&acc.kth); + print_row(&format!("{}: marginal @ pos {}", acc.name, kk - 1), c, d); + } + // 3) Choose-2 set uniformity over the (n choose 2) cells with i> b) & 1 == 1).count() as f64 / n as f64; + if pi == 0.0 || pi == 1.0 { + continue; + } + let exp_set = SAMPLES as f64 * pi; + let exp_unset = SAMPLES as f64 * (1.0 - pi); + for pos in 0..kk { + let set = acc.bits[pos][b] as f64; + let unset = SAMPLES as f64 - set; + chi2 += (set - exp_set).powi(2) / exp_set + + (unset - exp_unset).powi(2) / exp_unset; + df += 1; + } + } + print_row(&format!("{}: per-(pos,bit) balance", acc.name), chi2, df); + } +} + +fn main() { + println!("Statistical comparison of consistent-permutation generators"); + println!("(Chi-squared tests, alpha = 0.001 — PASS means we cannot reject uniformity.)"); + for n in [8usize, 16, 32, 64, 128, 256, 1024] { + run_for_n(n); + } +} diff --git a/crates/consistent-choose-k/src/consistent_permutation.rs b/crates/consistent-choose-k/src/consistent_permutation.rs new file mode 100644 index 0000000..978b30b --- /dev/null +++ b/crates/consistent-choose-k/src/consistent_permutation.rs @@ -0,0 +1,322 @@ +//! Per-key consistent permutation iterator over `0..n`, driven by an +//! independently-keyed Feistel network per layer (2 bits per layer). +//! +//! Given a 64-bit `seed` and a universe size `n`, the iterator emits +//! each integer in `0..n` exactly once in a key-dependent order. The +//! sequence is `n`-consistent: when `n` grows by one, the new sequence +//! agrees with the old except for the at most one position where the +//! new element is inserted. +//! +//! # Layered design +//! +//! Each layer `j` is a Feistel bijection on `[0, 2^(2j + 2))`. After +//! one Feistel pass we look at the **top two bits** of the output: +//! +//! * `00` (one quadrant) → descend to layer `j - 1` (or, at the +//! bottom layer, the output is `0` itself, which we emit). +//! * `01` / `10` / `11` (three quadrants) → emit the raw value +//! (subject to the `< n` range check). +//! +//! So every layer-`j` call has a `3/4` chance of producing an emission, +//! versus `1/2` in a one-bit-per-layer scheme. That cuts both the +//! number of layers (`j_max ≈ ⌈log₂(n)/2⌉ - 1`) and the expected +//! number of `layer_apply` calls per emission (from `≈ 2` to `≈ 4/3`). +//! Even `n_bits` also means the Feistel never needs cycle-walking. +//! +//! See [`docs/permutation-design.md`] in the crate for the full +//! design rationale, including why an independently-keyed per-layer +//! Feistel network beats the (now removed) chunk-prefix-permutation +//! approach on choose-2 set uniformity at small `n` while staying +//! cheaper than multi-round PCG. +//! +//! [`docs/permutation-design.md`]: ../../docs/permutation-design.md +//! +//! # Round-count policy +//! +//! The number of Feistel rounds is chosen automatically per layer as a +//! function of `n_bits` (see [`rounds_for_n_bits`]). Small layers have +//! a tiny F-function input space (`2^half_bits` distinct values), +//! which limits the per-round permutation family; they compensate with +//! extra rounds. Large layers already see a wide F input and reach the +//! random-perm floor with `4` rounds. Conveniently the small layers +//! are called rarely (level `j` contributes `≈ 4^j` emissions), so the +//! extra rounds there are essentially free, while the top layer — +//! which dominates per-iterator cost — uses the fewest rounds. +//! +//! # Iterator +//! +//! Start at the top layer `j_max` and walk its counter. If the top +//! two bits are `00`, descend one layer (and walk *that* layer's +//! counter on the next step); otherwise emit. The iterator stops once +//! it has emitted `n` values. + +/// Smallest Feistel round count we trust to deliver near-random-perm +/// statistics for a layer of width `n_bits`, based on the +/// `statistical_comparison` chi² experiments. Tiny layers +/// (`n_bits = 2`) have a F-input space of only 2 values and need +/// many rounds; large layers reach the floor with 4 rounds. +pub(crate) fn rounds_for_n_bits(n_bits: u32) -> usize { + match n_bits { + 2 => 12, + 4 => 10, + 6 => 6, + _ => 4, + } +} + +/// SplitMix64 — strong per-bit avalanche, useful as a seed +/// pre-mixer for callers that only have a low-entropy seed. +#[cfg(test)] +#[inline] +fn splitmix64(seed: u64) -> u64 { + let mut z = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + z ^ (z >> 31) +} + +/// Apply the per-layer Feistel bijection on `[0, 2^n_bits)`. +/// `master_key` must be well-avalanched: two near-identical keys +/// will yield two highly correlated permutations. +/// +/// All per-layer geometry (`half_bits`, mask, round count) is derived +/// from `n_bits` on each call. `n_bits` is small (≤ 32) and the +/// constants are a handful of trivial bit ops, so this stays well off +/// the hot path of the Feistel loop itself. +/// +/// `n_bits` must be **even** and in `2..=30`. +#[inline] +fn layer_apply(n_bits: u32, master_key: u64, x: u32) -> u32 { + debug_assert!(n_bits >= 2 && n_bits <= 30 && n_bits.is_multiple_of(2)); + let rounds = rounds_for_n_bits(n_bits) as u32; + let half_bits = n_bits / 2; + let half_mask = (1u32 << half_bits) - 1; + let n_mask = (1u32 << n_bits) - 1; + let shift = 2 * half_bits; + + debug_assert!(x <= n_mask, "input out of range"); + let mut l = (x >> half_bits) & half_mask; + let mut r = x & half_mask; + let mut k = master_key; + for _ in 0..rounds { + // Split each 64-bit (rotated) master into two *independent* + // 32-bit sub-keys: the low half is an additive offset, the + // high half (forced odd) is the multiplier. Higher bits stay + // set on purpose — they contribute via the multiplicative + // mix. + let k_xor = k & 0xFFFF_FFFF; + let k_mul = (k >> 32) | 1; + let mixed = ((r as u64) ^ k_xor).wrapping_mul(k_mul); + let f = (mixed as u32).wrapping_add((mixed >> 32) as u32) & half_mask; + let new_l = r; + let new_r = l ^ f; + l = new_l; + r = new_r; + // Rotate the master key by exactly the per-round "consumed" + // bit count so that each round sees a fresh slice of the + // master in its low positions, plus a Weyl increment to + // decorrelate consecutive rounds (rotation alone leaves + // 60+ bits in common between adjacent rounds). + k = k.rotate_right(shift).wrapping_add(0x9E37_79B9_7F4A_7C15); + } + ((l << half_bits) | r) & n_mask +} + +/// `n`-consistent permutation iterator over `0..n` driven by one +/// bijection per level (see module docs). +pub struct ConsistentPermutation { + /// Master key driving every layer's Feistel. Must be supplied + /// already avalanched (see [`ConsistentPermutation::new`]). + /// Layers stay distinct because their geometry + /// (`n_bits`, `half_bits`, rotation amount, round count) differs. + master_key: u64, + /// `counters[j]` is the next index to feed into the layer-`j` + /// bijection. Walks the full domain `[0, 2^(2j + 2))`. With + /// `n <= 2^30`, `j_max <= 14` so the top counter caps at + /// `2^30 <= u32::MAX`. + counters: Vec, + n: u32, + /// Top layer index. Layer `j` has `n_bits = 2j + 2`. + j_max: u32, + /// `1 << (2 * j_max + 2)` — the top layer's domain size. Used as + /// the iterator's termination signal: once `counters[j_max]` + /// reaches `top_cap`, every walk has completed and exactly `n` + /// values have been emitted. + top_cap: u32, +} + +impl ConsistentPermutation { + /// Construct an iterator over `0..n` from a single 64-bit + /// `master_key`. Each layer's Feistel round count is chosen + /// automatically by [`rounds_for_n_bits`] given that layer's bit + /// width. + /// + /// `n` must satisfy `1 <= n <= 2^30`. + /// + /// `master_key` is used directly as the Feistel key — the + /// constructor does **not** avalanche it. Pass a high-entropy + /// value (e.g. the output of a cryptographic or strong + /// non-cryptographic hash like SplitMix64); sequential or + /// low-entropy keys will produce visibly correlated permutations + /// across consecutive iterators. + pub fn new(n: u32, master_key: u64) -> Self { + assert!(n > 0, "n must be at least 1"); + assert!(n <= 1u32 << 30, "n must be at most 2^30"); + // Smallest `j_max` such that `4^(j_max + 1) >= n`. For + // `n >= 2`, `(n-1).ilog2() = ceil(log2(n)) - 1`, so dividing + // by 2 gives `ceil(ceil(log2(n)) / 2) - 1` — the right value + // for both even and odd bit-widths. + let j_max = if n <= 1 { 0 } else { (n - 1).ilog2() / 2 }; + let counters = vec![0u32; (j_max + 1) as usize]; + let top_cap = 1u32 << (2 * j_max + 2); + Self { + master_key, + counters, + n, + j_max, + top_cap, + } + } + + /// Universe size. + pub fn n(&self) -> u32 { + self.n + } +} + +impl Iterator for ConsistentPermutation { + type Item = u32; + + fn next(&mut self) -> Option { + let j_max = self.j_max; + let n = self.n; + let mk = self.master_key; + + // Phase 1: walk the top layer (where skips and exhaustion + // happen) until we either emit an in-range value or descend. + let n_bits_top = 2 * j_max + 2; + let top_shift = 2 * j_max; + let mut j; + loop { + let counter = self.counters[j_max as usize]; + if counter >= self.top_cap { + return None; + } + let raw = layer_apply(n_bits_top, mk, counter); + self.counters[j_max as usize] = counter + 1; + let top_bits = (raw >> top_shift) & 0b11; + if top_bits != 0 { + // `raw` lies in `[4^j_max, 4^(j_max+1))`. May be out + // of range (only the top layer can exceed `n`). + if raw < n { + return Some(raw); + } + continue; + } + // Descend. At `j_max == 0` the top layer *is* the bottom + // layer and `top_bits == 0` means `raw == 0`. + if j_max == 0 { + return Some(0); + } + j = j_max - 1; + break; + } + + // Phase 2: descend through lower layers. Their counters can + // never exhaust within a single walk, and their raw values + // are always in `[4^j, 4^(j+1)) ⊂ [0, n)`, so neither + // exhaustion nor range checks are needed. + loop { + let counter = self.counters[j as usize]; + let n_bits = 2 * j + 2; + let raw = layer_apply(n_bits, mk, counter); + self.counters[j as usize] = counter + 1; + let top_shift = 2 * j; + let top_bits = (raw >> top_shift) & 0b11; + if top_bits != 0 { + return Some(raw); + } + if j == 0 { + return Some(0); + } + j -= 1; + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + + /// Across many seeds, `layer_apply(_, key, 0)` must not always + /// collapse to the same value (regression test for the "F(0, k) = + /// 0" symmetry bug). + #[test] + fn layer_apply_zero_input_is_key_sensitive() { + for n_bits in [2u32, 4, 6, 8, 10] { + let outputs: HashSet = (0..1000u64) + .map(|seed| layer_apply(n_bits, splitmix64(seed), 0)) + .collect(); + let domain = 1u32 << n_bits; + assert!( + outputs.len() >= (domain as usize / 2).min(50), + "n_bits={n_bits}: only {} distinct values from layer_apply(_, _, 0) across 1000 seeds", + outputs.len() + ); + } + } + + /// `layer_apply` must be a bijection on `[0, 2^n_bits)` for every + /// even supported `n_bits`. + #[test] + fn layer_apply_is_bijection() { + for n_bits in [2u32, 4, 6, 8, 10] { + for seed in 0..16u64 { + let key = splitmix64(seed); + let domain: u32 = 1 << n_bits; + let outputs: HashSet = + (0..domain).map(|x| layer_apply(n_bits, key, x)).collect(); + assert_eq!( + outputs.len(), + domain as usize, + "n_bits={n_bits} seed={seed}: not a bijection" + ); + for &o in &outputs { + assert!( + o < domain, + "n_bits={n_bits} seed={seed}: output {o} out of range" + ); + } + } + } + } + + /// Collecting the iterator must yield values in `0..n` without + /// duplicates, and must yield **exactly** `n` of them. + #[test] + fn no_duplicates_within_universe() { + for seed in 0..32u64 { + for n in [1u32, 2, 3, 4, 5, 7, 8, 9, 16, 17, 31, 32, 33, 100, 1000] { + let iter = ConsistentPermutation::new(n, 0x9E37_79B9_7F4A_7C15 ^ seed); + let emitted: Vec = iter.collect(); + assert_eq!( + emitted.len(), + n as usize, + "seed={seed} n={n}: wrong emission count: {emitted:?}" + ); + assert!( + emitted.iter().all(|&v| v < n), + "seed={seed} n={n}: out-of-range value emitted: {emitted:?}" + ); + let unique: HashSet = emitted.iter().copied().collect(); + assert_eq!( + unique.len(), + emitted.len(), + "seed={seed} n={n}: duplicate emitted: {emitted:?}" + ); + } + } + } +} diff --git a/crates/consistent-choose-k/src/lib.rs b/crates/consistent-choose-k/src/lib.rs index 8a2c392..2a42269 100644 --- a/crates/consistent-choose-k/src/lib.rs +++ b/crates/consistent-choose-k/src/lib.rs @@ -1,9 +1,11 @@ mod choose_k; mod consistent_hash; +mod consistent_permutation; mod node_map; pub use choose_k::ConsistentChooseKHasher; pub use consistent_hash::{ ConsistentHashIterator, ConsistentHashRevIterator, ConsistentHasher, HashSeqBuilder, HashSequence, ManySeqBuilder, }; +pub use consistent_permutation::ConsistentPermutation; pub use node_map::ConsistentNodeMap; From bb3eb3257bf1349f5556d5a750b141dd3ce0808f Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Mon, 1 Jun 2026 16:11:29 +0200 Subject: [PATCH 02/13] Update permutation-design.md --- .../docs/permutation-design.md | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/consistent-choose-k/docs/permutation-design.md b/crates/consistent-choose-k/docs/permutation-design.md index f6a5648..e0020d3 100644 --- a/crates/consistent-choose-k/docs/permutation-design.md +++ b/crates/consistent-choose-k/docs/permutation-design.md @@ -374,21 +374,21 @@ construction). Smaller is better. | `n` | `k` | choose-k (ns/emit) | permutation (ns/emit) | speedup | |---------|------:|-------------------:|----------------------:|--------:| -| 100 | 2 | 45.9 | 19.1 | 2.4 × | -| 100 | 10 | 59.6 | 14.3 | 4.2 × | -| 100 | 100 | 172 | 21.8 | 7.9 × | -| 1 000 | 2 | 41.5 | 16.2 | 2.6 × | -| 1 000 | 10 | 52.8 | 7.2 | 7.3 × | -| 1 000 | 100 | 93.8 | 7.6 | 12 × | -| 1 000 | 1 000 | 491 | 9.8 | 50 × | -| 10 000 | 2 | 54.1 | 15.5 | 3.5 × | -| 10 000 | 10 | 64.4 | 8.8 | 7.3 × | -| 10 000 | 100 | 109 | 12.1 | 9.0 × | -| 10 000 | 1 000 | 325 | 20.3 | 16 × | -| 100 000 | 2 | 47.3 | 19.5 | 2.4 × | -| 100 000 | 10 | 59.2 | 12.1 | 4.9 × | -| 100 000 | 100 | 101 | 18.8 | 5.4 × | -| 100 000 | 1 000 | 314 | 26.3 | 12 × | +| 100 | 2 | 41.8 | 18.9 | 2.2 × | +| 100 | 10 | 59.7 | 14.3 | 4.2 × | +| 100 | 100 | 172 | 22.1 | 7.8 × | +| 1 000 | 2 | 41.6 | 14.9 | 2.8 × | +| 1 000 | 10 | 52.8 | 7.3 | 7.2 × | +| 1 000 | 100 | 93.2 | 7.5 | 12 × | +| 1 000 | 1 000 | 494 | 9.8 | 51 × | +| 10 000 | 2 | 51.7 | 16.4 | 3.2 × | +| 10 000 | 10 | 64.2 | 8.8 | 7.3 × | +| 10 000 | 100 | 108 | 12.1 | 8.9 × | +| 10 000 | 1 000 | 319 | 19.8 | 16 × | +| 100 000 | 2 | 44.4 | 19.1 | 2.3 × | +| 100 000 | 10 | 59.8 | 12.1 | 5.0 × | +| 100 000 | 100 | 102 | 19.0 | 5.4 × | +| 100 000 | 1 000 | 302 | 25.5 | 12 × | Observations matching the theory: From 3d9d72604c315775875ae195b0dde6d2ca7841ac Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Mon, 1 Jun 2026 16:32:16 +0200 Subject: [PATCH 03/13] Update permutation-design.md --- .../docs/permutation-design.md | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/crates/consistent-choose-k/docs/permutation-design.md b/crates/consistent-choose-k/docs/permutation-design.md index e0020d3..6ff55dd 100644 --- a/crates/consistent-choose-k/docs/permutation-design.md +++ b/crates/consistent-choose-k/docs/permutation-design.md @@ -275,6 +275,46 @@ here without breaking per-layer bijectivity: if the LCG cycle length exceeds the layer's output domain, the output repeats values within a single iterator lifetime. +### Stateless PCG with bit reversal + +The stateless multi-round PCG above suffers from too many serial +multiplies at small `n_bits` because `y * a` followed by `y ^= y >> +shift` only diffuses bits *downwards*. Replacing the xor-shift with a +**bit reversal** breaks that asymmetry: every round mixes high bits +into low bits and vice versa, so a single multiplication can spread +entropy across the whole width in both directions. + +A round becomes: + +```text +y ^= k_xor; +y *= k_mul | 1; // odd multiplier +y ^= y >> shift; // local diffusion +y = y.reverse_bits() >> reverse_shift; +``` + +With this primitive a 2-bit-per-layer scheme passes χ² at `R = +8/8/4/4` (vs Feistel's `12/10/6/4`), and a 1-bit-per-layer scheme +passes at `R = 1/12/12/8/6/4`. So statistically the variant is at +least as good as Feistel with comparable or slightly fewer rounds. + +The wall-clock comparison is roughly a wash: + +- On **ARM64** (Apple M4), `u32::reverse_bits` is a single-cycle + `RBIT` instruction. The 2-bit-per-layer PCG-reverse variant came + in ~10% faster than Feistel on median, with peaks around +30% at + small `n`. The 1-bit-per-layer variant gave that gain back (more + layer calls per emission), landing at parity. +- On **x86_64**, `u32::reverse_bits` compiles to a 6–8-op sequence + of mask-and-swap (no native instruction), which is expected to + erase the ARM64 win and likely make the variant slower than + Feistel. + +Since the win is platform-dependent and small even where it exists, +the shipping implementation stays on Feistel: it has uniformly good +behaviour across architectures and no dependence on a single +fast-bit-reverse instruction. + ### Feistel Each layer `j` runs a small balanced Feistel network on @@ -335,6 +375,11 @@ buys structurally) also keeps the Feistel halves perfectly symmetric and removes any need for cycle walking to stay inside `[0, n)` at the top. +The PCG-reverse variant (which has no even-`n_bits` requirement) was +tried at both 1-bit and 2-bit per layer; the 2-bit variant was +faster there too, for the same reason — fewer layer calls per +emission outweighs the modestly cheaper per-call cost. + ## Universe cap: `n ≤ 2³⁰` Capping `n` at `2³⁰` lets the top-layer counter live in a `u32`, From 32b94779e46e10a03e92c77e54b5f96898cb887a Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 10:17:44 +0200 Subject: [PATCH 04/13] add a consistent reservoir --- .../src/consistent_permutation.rs | 2 +- .../src/consistent_reservoir.rs | 424 ++++++++++++++++++ crates/consistent-choose-k/src/lib.rs | 2 + 3 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 crates/consistent-choose-k/src/consistent_reservoir.rs diff --git a/crates/consistent-choose-k/src/consistent_permutation.rs b/crates/consistent-choose-k/src/consistent_permutation.rs index 978b30b..7f86d97 100644 --- a/crates/consistent-choose-k/src/consistent_permutation.rs +++ b/crates/consistent-choose-k/src/consistent_permutation.rs @@ -86,7 +86,7 @@ fn splitmix64(seed: u64) -> u64 { /// /// `n_bits` must be **even** and in `2..=30`. #[inline] -fn layer_apply(n_bits: u32, master_key: u64, x: u32) -> u32 { +pub(crate) fn layer_apply(n_bits: u32, master_key: u64, x: u32) -> u32 { debug_assert!(n_bits >= 2 && n_bits <= 30 && n_bits.is_multiple_of(2)); let rounds = rounds_for_n_bits(n_bits) as u32; let half_bits = n_bits / 2; diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs new file mode 100644 index 0000000..9f77cb4 --- /dev/null +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -0,0 +1,424 @@ +//! Online consistent reservoir sampling driven by +//! [`ConsistentPermutation`]. +//! +//! # What is a "consistent" reservoir? +//! +//! Given a 64-bit `master_key`, a *consistent* reservoir of size `k` +//! over a universe `[0, n)` is the set +//! +//! ```text +//! reservoir(n, k) = ConsistentPermutation::new(n, master_key).take(k) +//! ``` +//! +//! — the first `k` emissions of the keyed permutation of `[0, n)`. +//! By the consistency property of [`ConsistentPermutation`]: +//! +//! * For fixed `k`, growing `n` by one inserts at most one new +//! element into `reservoir(n, k)` (and evicts at most one). The +//! stream of `(added, evicted)` pairs as `n` ranges over `n..` +//! is what this iterator emits. +//! * For fixed `n`, the reservoir is a **prefix** in rank order: +//! `reservoir(n, k) ⊂ reservoir(n, k+1)`. So the same reservoir +//! serves every `k` — there is no need to fix `k` at construction +//! time the way standard reservoir sampling does. The first +//! `k'` elements of the reservoir (in rank order) are themselves +//! a valid reservoir of size `k'`. +//! +//! # Complexity +//! +//! * `ConsistentPermutation::new(n, key).take(k)` — get the +//! reservoir for a fixed `n` and `k` from scratch: **`O(k)`**. +//! * [`ConsistentReservoir::new(k, n, key)`](Self::new) — set up +//! the iterator at any starting `n`: **`O(k log k)`** (the +//! `O(k)` walk plus a sort of `pending`). +//! * [`Iterator::next`] — get the next `(added, evicted)` pair: +//! **`O(log k)` amortised** (each call pops a constant number of +//! tail entries; `grow_layer`, which costs `O(k log k)`, is +//! triggered on average once per `Θ(k)` admissions). +//! +//! Contrast with the classic random-jump reservoir sampling +//! algorithms (Vitter L): +//! +//! * `O(1)` to jump to the next sample in the stream. +//! * `O(k log(n / k))` to *jump* to a target universe size `n` +//! (since L still has to iterate through the geometric jumps). +//! * Tied to the `k` fixed at construction; changing `k` breaks +//! consistency property. +//! +//! So a consistent reservoir is the right pick when you want to +//! random-access the reservoir at an arbitrary `n` and/or vary `k` +//! while keeping the consistency property; standard reservoir sampling wins on per-element +//! streaming cost but loses on these two flexibilities. +//! +//! # Data layout +//! +//! Everything lives in a single flat `values: Vec` plus a `pending: Vec` +//! of ranks (indices into `values`). +//! +//! After construction (or after [`Self::grow_layer`]), `values` is +//! exactly the first `k + |pending|` emissions of +//! `ConsistentPermutation::new(m, master_key)` in walk order, i.e. +//! `values[r]` is the rank-`r` emission of the current top-level +//! permutation. Descents (old-reservoir items) and non-descents +//! (fresh items) as well as active and inactive items are interleaved. +//! +//! `pending` stores the ranks of the **non-descent** emissions +//! (i.e. the new items that might enter the reservoir as `n` grows). +//! Ranks are sorted **descending by value**, so `pending.pop()` returns +//! the smallest fresh value first — the next admission as `n` grows. +//! +//! # Active reservoir via `n` +//! +//! `self.n` is the current effective universe size: `reservoir()` +//! returns the items in `values` whose value is `< n`, in walk order. +//! By [`ConsistentPermutation`] consistency this is exactly +//! `ConsistentPermutation::new(n, master_key).take(k)` — the +//! consistent top-`k` for `[0, n)`. +//! +//! Each [`Iterator::next`] call advances `n` to the next admission's +//! value `+ 1`, yielding `(added_value, evicted_value)`. +//! +//! # Initial state ([`Self::new`]) +//! +//! Given the user's initial `n`, pick `j_max = smallest_j_max(n)` and +//! `m = 4^(j_max+1) >= n`. Walk `ConsistentPermutation::new(m, key)` +//! pushing each emission into `values` until we have collected +//! exactly `k` in-range values (i.e. `< n`). Every out-of-range +//! emission seen along the way has its rank recorded in `pending`; +//! finally `pending` is sorted descending by value. +//! +//! The loop is guaranteed to terminate with the last push being an +//! in-range value — that's what triggers `count == k`. So +//! `values.last()` is always a real reservoir item, which is the +//! one to evict on the first admission. +//! +//! # Growing the universe ([`Self::grow_layer`]) +//! +//! Opens the next top Feistel layer (`j_max += 1`, `m *= 4`) and +//! rebuilds `values` / `pending` against the new permutation by +//! walking the new top layer's counter until all `k` old-reservoir +//! entries have been consumed by descents. Every emission is pushed +//! to `new_values` in walk order; non-descents additionally have +//! their counter recorded in `pending`. The walk terminates on a +//! descent so `values.last()` is again a real reservoir item. +//! +//! Non-descents at counters `>= k` are real emissions of the new +//! permutation but their rank places them outside the top-`k`. They +//! still end up in `values` and `pending`, and are filtered out +//! later by the iterator's three guards (see below). +//! +//! # Invariants +//! +//! Throughout the iterator's lifetime (between [`Iterator::next`] +//! calls, and restored before each yields) four invariants hold: +//! +//! 1. **The active reservoir has size `k`.** The *active set* — +//! the entries of `values` whose value is `< n` — always +//! contains exactly `k` elements. This is what `reservoir()` +//! returns. +//! 2. **Every pending value is strictly larger than every active +//! value.** Pending entries were emitted with value `>= n` by +//! [`ConsistentPermutation`], whereas active entries are by +//! definition `< n`. In particular the smallest pending value +//! — which `pending.pop()` returns, since `pending` is sorted +//! descending by value — is the next candidate to cross into +//! the active window as `n` grows. +//! 3. **`pending` must contain all elements from `values` which are greater or equal to `n`.** +//! Since there are exactly `k` elements less than `n` in `values`, `pending.len()` +//! must be larger or equal to `values.len() - k` (note that there can be values in +//! `pending` which we discarded while searching for the last active value). +//! 4. **The last item in `values` is always an active element.** +//! That is, `values.last()` is guaranteed to have a value `< n`. +//! +//! From these facts the algorithm of [`Iterator::next`] reads +//! off: +//! +//! **Finding the eviction.** Since the last item in `values` is always +//! an active element (invariant 4), the last active entry in walk order +//! resides exactly at the tail of `values`. Therefore, we can immediately +//! obtain the evicted value by popping `values`. +//! +//! **Finding the admission.** Pop `added_rank` from `pending` until it is +//! strictly less than `values.len()`. If it is out-of-bounds, it corresponds +//! to an element that was previously popped from `values` because it was pending +//! at the tail, so we safely discard it and continue. Otherwise, the corresponding +//! value is our candidate `added_value`. +//! +//! **Restoring the invariants.** Once we have `added_value`, we update `n` to +//! `added_value + 1`. This makes `added_value` active, restoring the active +//! reservoir size to `k`. Finally, we restore invariant 4 by popping any pending +//! elements (values `>= n`) from the tail of `values` until either `values.len() == k` +//! or the tail is active (value `< n`). +//! +//! The stream only terminates once the universe reaches the [`ConsistentPermutation`] +//! cap (`m = 2^30`); until then, `grow_layer` re-establishes the invariants whenever +//! `pending` empties. + +use std::cmp::Reverse; + +use crate::consistent_permutation::{layer_apply, ConsistentPermutation}; + +/// Reservoir of size `k` whose contents track the consistent top-`k` +/// by [`ConsistentPermutation`] rank, with explicit support for +/// growing the universe one Feistel layer at a time. +pub struct ConsistentReservoir { + /// First `k + pending.len()` emissions of + /// `ConsistentPermutation(m, master_key)` in walk order. + /// `values[r]` is always the rank-`r` emission of the current + /// top-level permutation. + values: Vec, + /// Ranks (indices into `values`) of the non-descent emissions + /// in `values` — the candidate fresh admissions. Sorted + /// **descending by `values[rank]`** so `pop()` returns the + /// smallest pending value first. + pending: Vec, + k: u32, + /// Top Feistel layer index. The current top-level universe size + /// is `m = 4^(j_max+1) = 1 << (2*j_max + 2)`. + j_max: u32, + master_key: u64, + /// Current effective universe size. `reservoir()` returns + /// values `< n`, which equals + /// `ConsistentPermutation::new(n, master_key).take(k)`. + n: u32, +} + +/// Smallest `j_max` such that `4^(j_max+1) >= k`. Matches the +/// formula used by [`ConsistentPermutation::new`]. +fn smallest_j_max(k: u32) -> u32 { + if k <= 1 { + 0 + } else { + (k - 1).ilog2() / 2 + } +} + +impl ConsistentReservoir { + /// Build a consistent reservoir of size `k` for initial universe + /// `n`, seeded by `master_key`. The initial `reservoir()` is the + /// consistent top-`k` for `[0, n)`. + /// + /// Internally we use the smallest layer boundary + /// `m = 4^(j_max+1) >= n` and walk + /// `ConsistentPermutation::new(m, master_key)` until we have + /// observed exactly `k` values `< n`; out-of-range emissions + /// become pending admissions for later iteration. + /// + /// `master_key` is forwarded verbatim to + /// [`ConsistentPermutation::new`] (no avalanche). + /// + /// # Panics + /// + /// Panics if `k == 0`, `n > 2^30`, or `k > n`. + pub fn new(k: u32, n: u32, master_key: u64) -> Self { + assert!(k >= 1, "k must be at least 1"); + assert!(n <= 1u32 << 30, "n must be at most 2^30"); + let j_max = smallest_j_max(n); + let m = 1u32 << (2 * j_max + 2); + let mut values = vec![]; + let mut pending = vec![]; + let mut count = 0; + for value in ConsistentPermutation::new(m, master_key) { + values.push(value); + if value < n { + count += 1; + if count == k { + break; + } + } else { + pending.push(values.len() as u32 - 1); + } + } + pending.sort_unstable_by_key(|&r| Reverse(values[r as usize])); + Self { + values, + pending, + k, + j_max, + master_key, + n, + } + } + + /// Reservoir size. + pub fn k(&self) -> u32 { + self.k + } + + /// Current effective universe size — `reservoir()` returns + /// `ConsistentPermutation::new(n(), master_key).take(k)`. + pub fn n(&self) -> u32 { + self.n + } + + /// The current reservoir contents, in rank order. + /// + /// Equals `ConsistentPermutation::new(n(), master_key).take(k)` + /// by [`ConsistentPermutation`] consistency. + pub fn reservoir(&self) -> impl Iterator + '_ { + self.values.iter().copied().filter(|&v| v < self.n()) + } + + /// Open the next top Feistel layer (extends `m` to `4 * m`) and + /// rebuild `values`/`pending` against the new permutation. + /// + /// Any unconsumed entries left from the previous walk are + /// dropped — their ranks lived in the old top layer's coordinate + /// system and have no meaning under the new one. The new walk + /// is guaranteed to terminate on a descent, so `values.last()` + /// is again a genuine reservoir item ready to be evicted on the + /// next admission. + /// + /// # Panics + /// + /// Panics if growing would push the universe past + /// [`ConsistentPermutation`]'s `n <= 2^30` cap. + pub fn grow_layer(&mut self) { + assert!( + self.j_max < 14, + "ConsistentReservoir at universe cap (m = 2^30)" + ); + let new_j_max = self.j_max + 1; + let new_n_bits = 2 * new_j_max + 2; + let new_top_shift = 2 * new_j_max; + let k = self.k as usize; + // Drop the stale eviction tail from the previous grow_layer + // (if the iterator wasn't fully drained, those values are no + // longer reachable anyway). + self.values.truncate(k); + let old_reservoir = std::mem::take(&mut self.values); + debug_assert_eq!(old_reservoir.len(), k); + + let mut new_values: Vec = Vec::with_capacity(k); + let mut pending: Vec = Vec::new(); + let mut next_old_idx = 0usize; + let mut counter: u32 = 0; + while next_old_idx < k { + let raw = layer_apply(new_n_bits, self.master_key, counter); + let is_descent = (raw >> new_top_shift) & 0b11 == 0; + if is_descent { + // Consume the next old reservoir entry; this is the + // walk's only termination condition. + new_values.push(old_reservoir[next_old_idx]); + next_old_idx += 1; + } else { + // Push the fresh emission and remember its rank + // (= counter). Ranks >= k are filtered out later by + // the iterator's guards. + new_values.push(raw); + pending.push(counter); + } + counter += 1; + } + debug_assert_eq!(new_values.len(), k + pending.len()); + // Descending by value so `pending.pop()` returns the + // smallest pending value first — i.e. the next admission as + // `n` grows. + pending.sort_unstable_by_key(|&r| Reverse(new_values[r as usize])); + + self.values = new_values; + self.pending = pending; + self.j_max = new_j_max; + } +} + +impl Iterator for ConsistentReservoir { + /// `(added_value, evicted_value)`: the next item admitted to the + /// reservoir paired with the item it displaces. After yielding, + /// `n()` advances to `added_value + 1`. + /// + /// `added_value` is strictly increasing across calls + /// (`pending` is consumed smallest-value first). The stream + /// terminates only at the [`ConsistentPermutation`] universe + /// cap (`m = 2^30`). + type Item = (u32, u32); + + fn next(&mut self) -> Option<(u32, u32)> { + // Keep growing until we are guaranteed to have a pending admission. + while self.values.len() <= self.k as usize { + if self.j_max >= 14 { + return None; + } + self.grow_layer(); + } + // By invariant 4, the last element is always active under the previous n. + let evicted_value = self + .values + .pop() + .expect("values has more than k entries, so pop() is safe"); + + let added_rank = std::iter::from_fn(|| self.pending.pop()) + .find(|&rank| (rank as usize) < self.values.len()) + .expect("pending is non-empty by our invariants") as usize; + + let added_value = self.values[added_rank]; + debug_assert!(added_value >= self.n); + self.n = added_value + 1; + // Restore invariant 4: any pending elements at the tail of values must be popped. + while self.values.len() > self.k as usize && self.values.last().copied().unwrap() >= self.n + { + self.values.pop(); + } + return Some((added_value, evicted_value)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn splitmix64(seed: u64) -> u64 { + let mut z = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + z ^ (z >> 31) + } + + #[test] + fn smallest_j_max_picks_smallest_layer_geq_k() { + let cases = [ + (1u32, 0u32, 4u32), + (2, 0, 4), + (3, 0, 4), + (4, 0, 4), + (5, 1, 16), + (7, 1, 16), + (10, 1, 16), + (16, 1, 16), + (17, 2, 64), + (64, 2, 64), + (65, 3, 256), + (256, 3, 256), + (257, 4, 1024), + ]; + for (k, expected_j, expected_m) in cases { + assert_eq!(smallest_j_max(k), expected_j, "k={k}"); + assert_eq!(1u32 << (2 * smallest_j_max(k) + 2), expected_m, "k={k}"); + } + } + + #[test] + fn test_reservoir() { + for seed in [42, 123456789, 987654321] { + let master_key = splitmix64(seed); + for k in [1, 2, 3, 10, 100] { + let mut r = ConsistentReservoir::new(k, k, master_key); + for _ in 0.. { + let before: Vec<_> = r.reservoir().collect(); + let Some((added, evicted)) = r.next() else { + break; + }; + let after: Vec<_> = r.reservoir().collect(); + let expected: Vec<_> = ConsistentPermutation::new(added, master_key) + .take(k as usize) + .collect(); + assert_eq!(before, expected); + assert!(before.contains(&evicted)); + assert!(!after.contains(&evicted)); + assert!(!before.contains(&added)); + assert!(after.contains(&added)); + } + } + } + } +} diff --git a/crates/consistent-choose-k/src/lib.rs b/crates/consistent-choose-k/src/lib.rs index 2a42269..25b7e0f 100644 --- a/crates/consistent-choose-k/src/lib.rs +++ b/crates/consistent-choose-k/src/lib.rs @@ -1,6 +1,7 @@ mod choose_k; mod consistent_hash; mod consistent_permutation; +mod consistent_reservoir; mod node_map; pub use choose_k::ConsistentChooseKHasher; pub use consistent_hash::{ @@ -8,4 +9,5 @@ pub use consistent_hash::{ HashSequence, ManySeqBuilder, }; pub use consistent_permutation::ConsistentPermutation; +pub use consistent_reservoir::ConsistentReservoir; pub use node_map::ConsistentNodeMap; From 1223bbfab0403546e8f08e317109129386d961ad Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 10:27:28 +0200 Subject: [PATCH 05/13] Update consistent_reservoir.rs --- .../src/consistent_reservoir.rs | 72 +++++++++---------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index 9f77cb4..47580d0 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -29,11 +29,11 @@ //! * `ConsistentPermutation::new(n, key).take(k)` — get the //! reservoir for a fixed `n` and `k` from scratch: **`O(k)`**. //! * [`ConsistentReservoir::new(k, n, key)`](Self::new) — set up -//! the iterator at any starting `n`: **`O(k log k)`** (the -//! `O(k)` walk plus a sort of `pending`). +//! the iterator at any starting `n`: **`O(k)`** (the +//! `O(k)` walk plus a linear-time heapification of `pending`). //! * [`Iterator::next`] — get the next `(added, evicted)` pair: -//! **`O(log k)` amortised** (each call pops a constant number of -//! tail entries; `grow_layer`, which costs `O(k log k)`, is +//! **`O(log k)` amortised** (each call pops amortized a constant number of +//! tail entries; `grow_layer`, which costs `O(k)`, is //! triggered on average once per `Θ(k)` admissions). //! //! Contrast with the classic random-jump reservoir sampling @@ -52,8 +52,8 @@ //! //! # Data layout //! -//! Everything lives in a single flat `values: Vec` plus a `pending: Vec` -//! of ranks (indices into `values`). +//! Everything lives in a single flat `values: Vec` plus a `pending: BinaryHeap<(Reverse, u32)>` +//! of `(Reverse(value), rank)` pairs representing candidate fresh admissions. //! //! After construction (or after [`Self::grow_layer`]), `values` is //! exactly the first `k + |pending|` emissions of @@ -62,10 +62,10 @@ //! permutation. Descents (old-reservoir items) and non-descents //! (fresh items) as well as active and inactive items are interleaved. //! -//! `pending` stores the ranks of the **non-descent** emissions -//! (i.e. the new items that might enter the reservoir as `n` grows). -//! Ranks are sorted **descending by value**, so `pending.pop()` returns -//! the smallest fresh value first — the next admission as `n` grows. +//! `pending` is a min-heap storing the `(Reverse(value), rank)` of the +//! **non-descent** emissions (i.e. the new items that might enter the reservoir +//! as `n` grows). By storing reversed values, `pending.pop()` efficiently returns +//! the entry with the smallest fresh value first — the next admission as `n` grows. //! //! # Active reservoir via `n` //! @@ -84,8 +84,8 @@ //! `m = 4^(j_max+1) >= n`. Walk `ConsistentPermutation::new(m, key)` //! pushing each emission into `values` until we have collected //! exactly `k` in-range values (i.e. `< n`). Every out-of-range -//! emission seen along the way has its rank recorded in `pending`; -//! finally `pending` is sorted descending by value. +//! emission seen along the way has its rank and value recorded in `pending`; +//! finally `pending` is initialized in $O(k)$ time using $O(k)$ linear heapification. //! //! The loop is guaranteed to terminate with the last push being an //! in-range value — that's what triggers `count == k`. So @@ -99,13 +99,13 @@ //! walking the new top layer's counter until all `k` old-reservoir //! entries have been consumed by descents. Every emission is pushed //! to `new_values` in walk order; non-descents additionally have -//! their counter recorded in `pending`. The walk terminates on a +//! their rank and value recorded in `pending`. The walk terminates on a //! descent so `values.last()` is again a real reservoir item. //! //! Non-descents at counters `>= k` are real emissions of the new //! permutation but their rank places them outside the top-`k`. They //! still end up in `values` and `pending`, and are filtered out -//! later by the iterator's three guards (see below). +//! later when popped from the heap. //! //! # Invariants //! @@ -120,8 +120,8 @@ //! value.** Pending entries were emitted with value `>= n` by //! [`ConsistentPermutation`], whereas active entries are by //! definition `< n`. In particular the smallest pending value -//! — which `pending.pop()` returns, since `pending` is sorted -//! descending by value — is the next candidate to cross into +//! — which `pending.pop()` returns, since `pending` is structured +//! as a min-heap — is the next candidate to cross into //! the active window as `n` grows. //! 3. **`pending` must contain all elements from `values` which are greater or equal to `n`.** //! Since there are exactly `k` elements less than `n` in `values`, `pending.len()` @@ -138,7 +138,7 @@ //! resides exactly at the tail of `values`. Therefore, we can immediately //! obtain the evicted value by popping `values`. //! -//! **Finding the admission.** Pop `added_rank` from `pending` until it is +//! **Finding the admission.** Pop `added_rank` from the `pending` min-heap until it is //! strictly less than `values.len()`. If it is out-of-bounds, it corresponds //! to an element that was previously popped from `values` because it was pending //! at the tail, so we safely discard it and continue. Otherwise, the corresponding @@ -153,8 +153,14 @@ //! The stream only terminates once the universe reaches the [`ConsistentPermutation`] //! cap (`m = 2^30`); until then, `grow_layer` re-establishes the invariants whenever //! `pending` empties. +//! or the tail is active (value `< n`). +//! +//! The stream only terminates once the universe reaches the [`ConsistentPermutation`] +//! cap (`m = 2^30`); until then, `grow_layer` re-establishes the invariants whenever +//! `pending` empties. use std::cmp::Reverse; +use std::collections::BinaryHeap; use crate::consistent_permutation::{layer_apply, ConsistentPermutation}; @@ -167,11 +173,9 @@ pub struct ConsistentReservoir { /// `values[r]` is always the rank-`r` emission of the current /// top-level permutation. values: Vec, - /// Ranks (indices into `values`) of the non-descent emissions - /// in `values` — the candidate fresh admissions. Sorted - /// **descending by `values[rank]`** so `pop()` returns the - /// smallest pending value first. - pending: Vec, + /// Min-heap storing `(Reverse(value), rank)` for candidate fresh admissions. + /// `pop()` returns the entry with the smallest value first. + pending: BinaryHeap<(Reverse, u32)>, k: u32, /// Top Feistel layer index. The current top-level universe size /// is `m = 4^(j_max+1) = 1 << (2*j_max + 2)`. @@ -216,7 +220,7 @@ impl ConsistentReservoir { let j_max = smallest_j_max(n); let m = 1u32 << (2 * j_max + 2); let mut values = vec![]; - let mut pending = vec![]; + let mut pending_vec = vec![]; let mut count = 0; for value in ConsistentPermutation::new(m, master_key) { values.push(value); @@ -226,10 +230,10 @@ impl ConsistentReservoir { break; } } else { - pending.push(values.len() as u32 - 1); + pending_vec.push((Reverse(value), values.len() as u32 - 1)); } } - pending.sort_unstable_by_key(|&r| Reverse(values[r as usize])); + let pending = BinaryHeap::from(pending_vec); Self { values, pending, @@ -290,7 +294,7 @@ impl ConsistentReservoir { debug_assert_eq!(old_reservoir.len(), k); let mut new_values: Vec = Vec::with_capacity(k); - let mut pending: Vec = Vec::new(); + let mut pending_vec: Vec<(Reverse, u32)> = Vec::new(); let mut next_old_idx = 0usize; let mut counter: u32 = 0; while next_old_idx < k { @@ -306,15 +310,12 @@ impl ConsistentReservoir { // (= counter). Ranks >= k are filtered out later by // the iterator's guards. new_values.push(raw); - pending.push(counter); + pending_vec.push((Reverse(raw), counter)); } counter += 1; } - debug_assert_eq!(new_values.len(), k + pending.len()); - // Descending by value so `pending.pop()` returns the - // smallest pending value first — i.e. the next admission as - // `n` grows. - pending.sort_unstable_by_key(|&r| Reverse(new_values[r as usize])); + debug_assert_eq!(new_values.len(), k + pending_vec.len()); + let pending = BinaryHeap::from(pending_vec); self.values = new_values; self.pending = pending; @@ -347,11 +348,10 @@ impl Iterator for ConsistentReservoir { .pop() .expect("values has more than k entries, so pop() is safe"); - let added_rank = std::iter::from_fn(|| self.pending.pop()) - .find(|&rank| (rank as usize) < self.values.len()) - .expect("pending is non-empty by our invariants") as usize; + let (Reverse(added_value), _) = std::iter::from_fn(|| self.pending.pop()) + .find(|&(_, rank)| rank < self.values.len() as u32) + .expect("pending is non-empty by our invariants"); - let added_value = self.values[added_rank]; debug_assert!(added_value >= self.n); self.n = added_value + 1; // Restore invariant 4: any pending elements at the tail of values must be popped. From 501f71cade5f84c1c909518b224739d6c4e2966a Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 10:31:53 +0200 Subject: [PATCH 06/13] Update consistent_reservoir.rs --- crates/consistent-choose-k/src/consistent_reservoir.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index 47580d0..ed0ed5f 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -36,6 +36,14 @@ //! tail entries; `grow_layer`, which costs `O(k)`, is //! triggered on average once per `Θ(k)` admissions). //! +//! Note that the heapification of `pending` runs in linear $O(k)$ time, but retrieving elements +//! sequentially from the heap still carries a cost of up to $O(\log k)$ per pop. The sorting/priority-queue +//! overhead could be completely eliminated and sped up to true $O(k)$ total complexity via bucket sort! +//! Because all values generated in a layer are expected to be uniformly randomly chosen within the +//! interval $[\frac{1}{4}m, m)$, bucket-sorting them into $O(k)$ buckets achieves a linear worst-case average-case runtime. +//! With that optimization, the complexity of consistent reservoir sampling would match that of standard +//! reservoir sampling amortized ($O(1)$ per processed item). +//! //! Contrast with the classic random-jump reservoir sampling //! algorithms (Vitter L): //! From 7ac45e0f6d86621e9dc8ca20303933183cabfa61 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 10:51:05 +0200 Subject: [PATCH 07/13] bucket sort! --- .../src/consistent_reservoir.rs | 148 +++++++++++++----- 1 file changed, 105 insertions(+), 43 deletions(-) diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index ed0ed5f..6e06a37 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -30,18 +30,16 @@ //! reservoir for a fixed `n` and `k` from scratch: **`O(k)`**. //! * [`ConsistentReservoir::new(k, n, key)`](Self::new) — set up //! the iterator at any starting `n`: **`O(k)`** (the -//! `O(k)` walk plus a linear-time heapification of `pending`). +//! `O(k)` walk plus an average-case linear-time bucket sort of `pending` ranks). //! * [`Iterator::next`] — get the next `(added, evicted)` pair: -//! **`O(log k)` amortised** (each call pops amortized a constant number of -//! tail entries; `grow_layer`, which costs `O(k)`, is +//! **`O(1)` amortised** (each call pops amortized a constant number of +//! tail entries; `grow_layer`, which costs $O(k)$ average-case, is //! triggered on average once per `Θ(k)` admissions). //! -//! Note that the heapification of `pending` runs in linear $O(k)$ time, but retrieving elements -//! sequentially from the heap still carries a cost of up to $O(\log k)$ per pop. The sorting/priority-queue -//! overhead could be completely eliminated and sped up to true $O(k)$ total complexity via bucket sort! +//! The sorting overhead is minimized to a true $O(k)$ average-case complexity via bucket sort! //! Because all values generated in a layer are expected to be uniformly randomly chosen within the //! interval $[\frac{1}{4}m, m)$, bucket-sorting them into $O(k)$ buckets achieves a linear worst-case average-case runtime. -//! With that optimization, the complexity of consistent reservoir sampling would match that of standard +//! With this optimization, the complexity of consistent reservoir sampling matches that of standard //! reservoir sampling amortized ($O(1)$ per processed item). //! //! Contrast with the classic random-jump reservoir sampling @@ -60,8 +58,8 @@ //! //! # Data layout //! -//! Everything lives in a single flat `values: Vec` plus a `pending: BinaryHeap<(Reverse, u32)>` -//! of `(Reverse(value), rank)` pairs representing candidate fresh admissions. +//! Everything lives in a single flat `values: Vec` plus a `pending: Vec` +//! of rank indices into `values`, representing candidate fresh admissions. //! //! After construction (or after [`Self::grow_layer`]), `values` is //! exactly the first `k + |pending|` emissions of @@ -70,10 +68,9 @@ //! permutation. Descents (old-reservoir items) and non-descents //! (fresh items) as well as active and inactive items are interleaved. //! -//! `pending` is a min-heap storing the `(Reverse(value), rank)` of the -//! **non-descent** emissions (i.e. the new items that might enter the reservoir -//! as `n` grows). By storing reversed values, `pending.pop()` efficiently returns -//! the entry with the smallest fresh value first — the next admission as `n` grows. +//! `pending` is a vector of ranks/indices sorted descending by their corresponding value in `values`. +//! By storing them descending, `pending.pop()` efficiently returns the rank pointing to the smallest +//! fresh value first in $O(1)$ time — the next admission as `n` grows. //! //! # Active reservoir via `n` //! @@ -92,8 +89,8 @@ //! `m = 4^(j_max+1) >= n`. Walk `ConsistentPermutation::new(m, key)` //! pushing each emission into `values` until we have collected //! exactly `k` in-range values (i.e. `< n`). Every out-of-range -//! emission seen along the way has its rank and value recorded in `pending`; -//! finally `pending` is initialized in $O(k)$ time using $O(k)$ linear heapification. +//! emission seen along the way has its rank recorded in `pending_vec`; +//! finally `pending` is initialized in average-case $O(k)$ time using flat bucket sorting. //! //! The loop is guaranteed to terminate with the last push being an //! in-range value — that's what triggers `count == k`. So @@ -107,13 +104,13 @@ //! walking the new top layer's counter until all `k` old-reservoir //! entries have been consumed by descents. Every emission is pushed //! to `new_values` in walk order; non-descents additionally have -//! their rank and value recorded in `pending`. The walk terminates on a +//! their rank recorded in `pending_vec`. The walk terminates on a //! descent so `values.last()` is again a real reservoir item. //! //! Non-descents at counters `>= k` are real emissions of the new //! permutation but their rank places them outside the top-`k`. They //! still end up in `values` and `pending`, and are filtered out -//! later when popped from the heap. +//! later when popped. //! //! # Invariants //! @@ -128,9 +125,8 @@ //! value.** Pending entries were emitted with value `>= n` by //! [`ConsistentPermutation`], whereas active entries are by //! definition `< n`. In particular the smallest pending value -//! — which `pending.pop()` returns, since `pending` is structured -//! as a min-heap — is the next candidate to cross into -//! the active window as `n` grows. +//! — which corresponds to the rank popped by `pending.pop()` — is +//! the next candidate to cross into the active window as `n` grows. //! 3. **`pending` must contain all elements from `values` which are greater or equal to `n`.** //! Since there are exactly `k` elements less than `n` in `values`, `pending.len()` //! must be larger or equal to `values.len() - k` (note that there can be values in @@ -146,7 +142,7 @@ //! resides exactly at the tail of `values`. Therefore, we can immediately //! obtain the evicted value by popping `values`. //! -//! **Finding the admission.** Pop `added_rank` from the `pending` min-heap until it is +//! **Finding the admission.** Pop `added_rank` from `pending` until it is //! strictly less than `values.len()`. If it is out-of-bounds, it corresponds //! to an element that was previously popped from `values` because it was pending //! at the tail, so we safely discard it and continue. Otherwise, the corresponding @@ -161,14 +157,8 @@ //! The stream only terminates once the universe reaches the [`ConsistentPermutation`] //! cap (`m = 2^30`); until then, `grow_layer` re-establishes the invariants whenever //! `pending` empties. -//! or the tail is active (value `< n`). -//! -//! The stream only terminates once the universe reaches the [`ConsistentPermutation`] -//! cap (`m = 2^30`); until then, `grow_layer` re-establishes the invariants whenever -//! `pending` empties. use std::cmp::Reverse; -use std::collections::BinaryHeap; use crate::consistent_permutation::{layer_apply, ConsistentPermutation}; @@ -181,9 +171,9 @@ pub struct ConsistentReservoir { /// `values[r]` is always the rank-`r` emission of the current /// top-level permutation. values: Vec, - /// Min-heap storing `(Reverse(value), rank)` for candidate fresh admissions. - /// `pop()` returns the entry with the smallest value first. - pending: BinaryHeap<(Reverse, u32)>, + /// Ranks of pending candidate admissions, sorted **descending by value** using `values` + /// so `pop()` returns the rank corresponding to the smallest value first. + pending: Vec, k: u32, /// Top Feistel layer index. The current top-level universe size /// is `m = 4^(j_max+1) = 1 << (2*j_max + 2)`. @@ -195,6 +185,77 @@ pub struct ConsistentReservoir { n: u32, } +/// Helper function to perform a flat, allocation-friendly bucket sort on values of a slice +/// that are greater than or equal to `v_min`. It identifies their indices (ranks) and sorts them descending +/// by their corresponding value in $O(n)$ average-case time. +/// +/// This avoids allocating and sorting tuples/pairs of `(value, rank)`, and eliminates the need for +/// an external ranks vector by filtering of `values` directly in-place. +/// +/// # Arguments +/// +/// * `values` - A read-only slice whose values are inspected. +/// * `n_pending` - The precalculated count of values in `values` that are greater than or equal to `v_min`. +/// * `v_min` - The lower bound value for range scaling (e.g., $m_{\text{old}}$). +/// * `v_max` - The upper bound value for range scaling (e.g., $m_{\text{new}}$). +/// +/// # Complexity +/// +/// Runs in average-case $O(n)$ time and $O(n_{\text{pending}})$ auxiliary space (where $n_{\text{pending}}$ is the number of elements $\ge v_{\text{min}}$). +fn bucket_sort_ranks_descending( + values: &[u32], + n_pending: usize, + v_min: u32, + v_max: u32, +) -> Vec { + let range = (v_max - v_min) as u64; + if n_pending == 0 || range == 0{ + return Vec::new(); + } + // 1. Count elements in each bucket + let mut counts = vec![0usize; n_pending]; + for &val in values { + if val >= v_min { + let mut idx = (((val - v_min) as u64 * n_pending as u64) / range) as usize; + if idx >= n_pending { + idx = n_pending - 1; + } + counts[idx] += 1; + } + } + // 2. Compute starting offsets for buckets descending from n_pending-1 to 0 in-place inside `counts` + let mut current_offset = 0; + for i in (0..n_pending).rev() { + let count = counts[i]; + counts[i] = current_offset; + current_offset += count; + } + // 3. Place ranks into destination array while updating write heads in `counts` + let mut dest = vec![0u32; n_pending]; + for (rank, &val) in values.iter().enumerate() { + if val >= v_min { + let mut idx = (((val - v_min) as u64 * n_pending as u64) / range) as usize; + if idx >= n_pending { + idx = n_pending - 1; + } + let pos = counts[idx]; + dest[pos] = rank as u32; + counts[idx] += 1; + } + } + // 4. Sort each bucket in-place descending by value. + // After step 3, `counts[i]` holds the exclusive end of bucket `i`. + // The starting index of bucket `i` is 0 if `i == n_pending - 1`, otherwise `counts[i + 1]`. + for i in 0..n_pending { + let start = if i == n_pending - 1 { 0 } else { counts[i + 1] }; + let end = counts[i]; + if end > start + 1 { + dest[start..end].sort_unstable_by_key(|&rank| Reverse(values[rank as usize])); + } + } + dest +} + /// Smallest `j_max` such that `4^(j_max+1) >= k`. Matches the /// formula used by [`ConsistentPermutation::new`]. fn smallest_j_max(k: u32) -> u32 { @@ -228,8 +289,8 @@ impl ConsistentReservoir { let j_max = smallest_j_max(n); let m = 1u32 << (2 * j_max + 2); let mut values = vec![]; - let mut pending_vec = vec![]; let mut count = 0; + let mut pending_count = 0; for value in ConsistentPermutation::new(m, master_key) { values.push(value); if value < n { @@ -238,10 +299,10 @@ impl ConsistentReservoir { break; } } else { - pending_vec.push((Reverse(value), values.len() as u32 - 1)); + pending_count += 1; } } - let pending = BinaryHeap::from(pending_vec); + let pending = bucket_sort_ranks_descending(&values, pending_count, n, m); Self { values, pending, @@ -302,9 +363,9 @@ impl ConsistentReservoir { debug_assert_eq!(old_reservoir.len(), k); let mut new_values: Vec = Vec::with_capacity(k); - let mut pending_vec: Vec<(Reverse, u32)> = Vec::new(); let mut next_old_idx = 0usize; let mut counter: u32 = 0; + let mut pending_count = 0; while next_old_idx < k { let raw = layer_apply(new_n_bits, self.master_key, counter); let is_descent = (raw >> new_top_shift) & 0b11 == 0; @@ -314,17 +375,17 @@ impl ConsistentReservoir { new_values.push(old_reservoir[next_old_idx]); next_old_idx += 1; } else { - // Push the fresh emission and remember its rank - // (= counter). Ranks >= k are filtered out later by - // the iterator's guards. + // Push the fresh emission. + // Ranks >= k are filtered out later by the iterator's guards. new_values.push(raw); - pending_vec.push((Reverse(raw), counter)); + pending_count += 1; } counter += 1; } - debug_assert_eq!(new_values.len(), k + pending_vec.len()); - let pending = BinaryHeap::from(pending_vec); + let m_old = 1u32 << (2 * self.j_max + 2); + let m_new = 1u32 << (2 * new_j_max + 2); + let pending = bucket_sort_ranks_descending(&new_values, pending_count, m_old, m_new); self.values = new_values; self.pending = pending; self.j_max = new_j_max; @@ -356,10 +417,11 @@ impl Iterator for ConsistentReservoir { .pop() .expect("values has more than k entries, so pop() is safe"); - let (Reverse(added_value), _) = std::iter::from_fn(|| self.pending.pop()) - .find(|&(_, rank)| rank < self.values.len() as u32) - .expect("pending is non-empty by our invariants"); + let added_rank = std::iter::from_fn(|| self.pending.pop()) + .find(|&rank| (rank as usize) < self.values.len()) + .expect("pending is non-empty by our invariants") as usize; + let added_value = self.values[added_rank]; debug_assert!(added_value >= self.n); self.n = added_value + 1; // Restore invariant 4: any pending elements at the tail of values must be popped. From bd08cc7df3ce596c2086367e8f9c5b7044c5dff2 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 12:08:44 +0200 Subject: [PATCH 08/13] reuse vectors and cleanup benchmarks --- .../benchmarks/performance.rs | 116 +++++++++++++++++- .../src/consistent_reservoir.rs | 71 ++++++----- 2 files changed, 151 insertions(+), 36 deletions(-) diff --git a/crates/consistent-choose-k/benchmarks/performance.rs b/crates/consistent-choose-k/benchmarks/performance.rs index dfef90d..2216731 100644 --- a/crates/consistent-choose-k/benchmarks/performance.rs +++ b/crates/consistent-choose-k/benchmarks/performance.rs @@ -5,13 +5,13 @@ use std::{ }; use consistent_choose_k::{ - ConsistentChooseKHasher, ConsistentHasher, ConsistentPermutation, + ConsistentChooseKHasher, ConsistentHasher, ConsistentPermutation, ConsistentReservoir, }; use criterion::{ criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration, Throughput, }; -use rand::{rng, RngExt}; +use rand::{rng, rngs::StdRng, RngExt, SeedableRng}; fn throughput_benchmark(c: &mut Criterion) { let keys: Vec = rng().random_iter().take(1000).collect(); @@ -127,6 +127,116 @@ fn grow_k_vs_permutation(c: &mut Criterion) { group.finish(); } +// A standard reservoir sampling (Algorithm R) implementation. +// It initializes a reservoir of size k, and then scans from k to n element-by-element, +// deciding for each step whether to admit that element. +fn standard_reservoir_r(k: u32, n: u32, seed: u64) -> Vec { + let mut reservoir: Vec = (0..k).collect(); + if n <= k { + return reservoir; + } + let mut rng = StdRng::seed_from_u64(seed); + + for i in k..n { + let j = rng.random_range(0..=i); + if j < k { + reservoir[j as usize] = i; + } + } + reservoir +} + +// A standard skip-based reservoir sampling (Algorithm L / Vitter) implementation. +fn standard_reservoir_l(k: u32, n: u32, seed: u64) -> Vec { + let mut reservoir: Vec = (0..k).collect(); + if n <= k { + return reservoir; + } + let mut rng = StdRng::seed_from_u64(seed); + + let mut w = (rng.random::().ln() / (k as f64)).exp(); + let mut i = k; + while i < n { + let u: f64 = rng.random(); + let s = (u.ln() / (1.0 - w).ln()) as u32; + i += s + 1; + if i <= n { + let j = rng.random_range(0..k); + reservoir[j as usize] = i - 1; + w *= (rng.random::().ln() / (k as f64)).exp(); + } + } + reservoir +} + +fn consistent_reservoir_to_n(k: u32, target_n: u32, seed: u64) -> Vec { + let mut r = ConsistentReservoir::new(k, k, seed); + let mut reservoir = r.reservoir().collect::>(); + while r.n() < target_n { + if let Some((added, evicted)) = r.next() { + if added >= target_n { + break; + } + if let Some(pos) = reservoir.iter().position(|&x| x == evicted) { + reservoir[pos] = added; + } + } else { + break; + } + } + reservoir +} + +fn reservoir_benchmarks(c: &mut Criterion) { + let seed = 42u64; + + let mut group = c.benchmark_group("reservoir_computation_up_to_n"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + group.sample_size(10); + group.warm_up_time(Duration::from_millis(200)); + group.measurement_time(Duration::from_secs(3)); + + for &n in &[100_000, 10_000_000] { + for &k in &[100, 1000] { + // 1. ConsistentReservoir direct build (O(k)) + group.bench_function(BenchmarkId::new(format!("ConsistentReservoir_Direct/k_{k}"), n), |b| { + b.iter(|| { + black_box(ConsistentReservoir::new(k, n, seed).reservoir().collect::>()); + }) + }); + + // 2. ConsistentPermutation direct build (O(k)) + group.bench_function(BenchmarkId::new(format!("ConsistentPermutation_Direct/k_{k}"), n), |b| { + b.iter(|| { + black_box(ConsistentPermutation::new(n, seed).take(k as usize).collect::>()); + }) + }); + + // 3. ConsistentReservoir streaming iteration from k to n (O(k log(n/k))) + group.bench_function(BenchmarkId::new(format!("ConsistentReservoir_Streaming/k_{k}"), n), |b| { + b.iter(|| { + black_box(consistent_reservoir_to_n(k, n, seed)); + }) + }); + + // 4. Standard Algorithm R (O(n) linear scan) + group.bench_function(BenchmarkId::new(format!("Standard_Algorithm_R/k_{k}"), n), |b| { + b.iter(|| { + black_box(standard_reservoir_r(k, n, seed)); + }) + }); + + // 5. Standard Algorithm L / Vitter (O(k log(n/k)) skip based) + group.bench_function(BenchmarkId::new(format!("Standard_Algorithm_L/k_{k}"), n), |b| { + b.iter(|| { + black_box(standard_reservoir_l(k, n, seed)); + }) + }); + } + } + group.finish(); +} + criterion_group!( name = benches; config = Criterion::default() @@ -134,6 +244,6 @@ criterion_group!( .measurement_time(Duration::from_millis(4000)) .nresamples(1000); - targets = throughput_benchmark, append_vs_new_with_k, grow_k_vs_permutation, + targets = throughput_benchmark, append_vs_new_with_k, grow_k_vs_permutation, reservoir_benchmarks, ); criterion_main!(benches); diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index 6e06a37..8ba5437 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -56,6 +56,18 @@ //! while keeping the consistency property; standard reservoir sampling wins on per-element //! streaming cost but loses on these two flexibilities. //! +//! # Benchmark Results +//! +//! Performance comparison for computing the reservoir of size $k$ up to point $n = 10,000,000$: +//! +//! | Algorithm | Strategy / Complexity | $k = 100$ | $k = 1000$ | Consistency Support? | Fixed-$k$ Constrained? | +//! | :--- | :--- | :--- | :--- | :--- | :--- | +//! | **`ConsistentPermutation_Direct`** | Direct Random-Access / $O(k)$ | **`917 ns`** | **`8.01 µs`** | **Yes** (Perfect Profile) | No (Arbitrary subsets) | +//! | **`ConsistentReservoir_Direct`** | Direct Build / $O(k)$ | **`1.68 µs`** | **`14.82 µs`** | **Yes** (Perfect Profile) | No (Iterator starts at $n$) | +//! | **`Standard_Algorithm_L` (Vitter)**| Streaming Skip / $O(k \log(n/k))$| **`22.43 µs`** | **`178.82 µs`** | No | **Yes** (Must rebuild from $0$) | +//! | **`ConsistentReservoir_Streaming`**| Incremental Streaming / $O(k \log(n/k))$| **`44.65 µs`** | **`1.63 ms`** | **Yes** (Stateful Iterator) | No (Sub-streams consistent)| +//! | **`Standard_Algorithm_R` (Scan)**| Streaming Scan / $O(n)$ | **`28.68 ms`** | **`28.34 ms`** | No | **Yes** (Must rebuild from $0$) | +//! //! # Data layout //! //! Everything lives in a single flat `values: Vec` plus a `pending: Vec` @@ -183,37 +195,27 @@ pub struct ConsistentReservoir { /// values `< n`, which equals /// `ConsistentPermutation::new(n, master_key).take(k)`. n: u32, + /// Reusable scratchpad for holding the old reservoir of size `k` during grow layers. + old_reservoir: Vec, + /// Reusable scratchpad for bucket sort counts to avoid allocations. + counts_scratch: Vec, } /// Helper function to perform a flat, allocation-friendly bucket sort on values of a slice -/// that are greater than or equal to `v_min`. It identifies their indices (ranks) and sorts them descending -/// by their corresponding value in $O(n)$ average-case time. -/// -/// This avoids allocating and sorting tuples/pairs of `(value, rank)`, and eliminates the need for -/// an external ranks vector by filtering of `values` directly in-place. -/// -/// # Arguments -/// -/// * `values` - A read-only slice whose values are inspected. -/// * `n_pending` - The precalculated count of values in `values` that are greater than or equal to `v_min`. -/// * `v_min` - The lower bound value for range scaling (e.g., $m_{\text{old}}$). -/// * `v_max` - The upper bound value for range scaling (e.g., $m_{\text{new}}$). -/// -/// # Complexity -/// -/// Runs in average-case $O(n)$ time and $O(n_{\text{pending}})$ auxiliary space (where $n_{\text{pending}}$ is the number of elements $\ge v_{\text{min}}$). -fn bucket_sort_ranks_descending( +/// that are greater than or equal to `v_min` into a pre-allocated destination buffer. +fn bucket_sort_ranks_descending_in( values: &[u32], - n_pending: usize, + dest: &mut [u32], + counts: &mut [usize], v_min: u32, v_max: u32, -) -> Vec { +) { + let n_pending = dest.len(); let range = (v_max - v_min) as u64; - if n_pending == 0 || range == 0{ - return Vec::new(); + if n_pending == 0 || range == 0 { + return; } // 1. Count elements in each bucket - let mut counts = vec![0usize; n_pending]; for &val in values { if val >= v_min { let mut idx = (((val - v_min) as u64 * n_pending as u64) / range) as usize; @@ -231,7 +233,6 @@ fn bucket_sort_ranks_descending( current_offset += count; } // 3. Place ranks into destination array while updating write heads in `counts` - let mut dest = vec![0u32; n_pending]; for (rank, &val) in values.iter().enumerate() { if val >= v_min { let mut idx = (((val - v_min) as u64 * n_pending as u64) / range) as usize; @@ -253,7 +254,6 @@ fn bucket_sort_ranks_descending( dest[start..end].sort_unstable_by_key(|&rank| Reverse(values[rank as usize])); } } - dest } /// Smallest `j_max` such that `4^(j_max+1) >= k`. Matches the @@ -302,7 +302,9 @@ impl ConsistentReservoir { pending_count += 1; } } - let pending = bucket_sort_ranks_descending(&values, pending_count, n, m); + let mut pending = vec![0u32; pending_count]; + let mut counts_scratch = vec![0usize; pending_count]; + bucket_sort_ranks_descending_in(&values, &mut pending, &mut counts_scratch, n, m); Self { values, pending, @@ -310,6 +312,8 @@ impl ConsistentReservoir { j_max, master_key, n, + old_reservoir: Vec::new(), + counts_scratch, } } @@ -359,10 +363,10 @@ impl ConsistentReservoir { // (if the iterator wasn't fully drained, those values are no // longer reachable anyway). self.values.truncate(k); - let old_reservoir = std::mem::take(&mut self.values); - debug_assert_eq!(old_reservoir.len(), k); + std::mem::swap(&mut self.values, &mut self.old_reservoir); + debug_assert_eq!(self.old_reservoir.len(), k); - let mut new_values: Vec = Vec::with_capacity(k); + self.values.clear(); let mut next_old_idx = 0usize; let mut counter: u32 = 0; let mut pending_count = 0; @@ -372,12 +376,12 @@ impl ConsistentReservoir { if is_descent { // Consume the next old reservoir entry; this is the // walk's only termination condition. - new_values.push(old_reservoir[next_old_idx]); + self.values.push(self.old_reservoir[next_old_idx]); next_old_idx += 1; } else { // Push the fresh emission. // Ranks >= k are filtered out later by the iterator's guards. - new_values.push(raw); + self.values.push(raw); pending_count += 1; } counter += 1; @@ -385,9 +389,10 @@ impl ConsistentReservoir { let m_old = 1u32 << (2 * self.j_max + 2); let m_new = 1u32 << (2 * new_j_max + 2); - let pending = bucket_sort_ranks_descending(&new_values, pending_count, m_old, m_new); - self.values = new_values; - self.pending = pending; + self.pending.resize(pending_count, 0); + self.counts_scratch.resize(pending_count, 0); + self.counts_scratch.fill(0); + bucket_sort_ranks_descending_in(&self.values, &mut self.pending, &mut self.counts_scratch, m_old, m_new); self.j_max = new_j_max; } } From 9d5e2cc9acf22dfbe47992a089d3853da8d5a0b5 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 12:20:42 +0200 Subject: [PATCH 09/13] fix benchmark --- .../consistent-choose-k/benchmarks/performance.rs | 15 ++++----------- .../src/consistent_reservoir.rs | 10 +++++----- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/crates/consistent-choose-k/benchmarks/performance.rs b/crates/consistent-choose-k/benchmarks/performance.rs index 2216731..912bf33 100644 --- a/crates/consistent-choose-k/benchmarks/performance.rs +++ b/crates/consistent-choose-k/benchmarks/performance.rs @@ -171,20 +171,13 @@ fn standard_reservoir_l(k: u32, n: u32, seed: u64) -> Vec { fn consistent_reservoir_to_n(k: u32, target_n: u32, seed: u64) -> Vec { let mut r = ConsistentReservoir::new(k, k, seed); - let mut reservoir = r.reservoir().collect::>(); while r.n() < target_n { - if let Some((added, evicted)) = r.next() { - if added >= target_n { - break; - } - if let Some(pos) = reservoir.iter().position(|&x| x == evicted) { - reservoir[pos] = added; - } - } else { - break; + match r.next() { + Some((added, _)) if added < target_n => {} + _ => break, } } - reservoir + r.reservoir().collect() } fn reservoir_benchmarks(c: &mut Criterion) { diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index 8ba5437..afa2e6b 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -62,11 +62,11 @@ //! //! | Algorithm | Strategy / Complexity | $k = 100$ | $k = 1000$ | Consistency Support? | Fixed-$k$ Constrained? | //! | :--- | :--- | :--- | :--- | :--- | :--- | -//! | **`ConsistentPermutation_Direct`** | Direct Random-Access / $O(k)$ | **`917 ns`** | **`8.01 µs`** | **Yes** (Perfect Profile) | No (Arbitrary subsets) | -//! | **`ConsistentReservoir_Direct`** | Direct Build / $O(k)$ | **`1.68 µs`** | **`14.82 µs`** | **Yes** (Perfect Profile) | No (Iterator starts at $n$) | -//! | **`Standard_Algorithm_L` (Vitter)**| Streaming Skip / $O(k \log(n/k))$| **`22.43 µs`** | **`178.82 µs`** | No | **Yes** (Must rebuild from $0$) | -//! | **`ConsistentReservoir_Streaming`**| Incremental Streaming / $O(k \log(n/k))$| **`44.65 µs`** | **`1.63 ms`** | **Yes** (Stateful Iterator) | No (Sub-streams consistent)| -//! | **`Standard_Algorithm_R` (Scan)**| Streaming Scan / $O(n)$ | **`28.68 ms`** | **`28.34 ms`** | No | **Yes** (Must rebuild from $0$) | +//! | **`ConsistentPermutation_Direct`** | Direct Random-Access / $O(k)$ | **`883 ns`** | **`7.80 µs`** | **Yes** (Perfect Profile) | No (Arbitrary subsets) | +//! | **`ConsistentReservoir_Direct`** | Direct Build / $O(k)$ | **`1.61 µs`** | **`14.30 µs`** | **Yes** (Perfect Profile) | No (Iterator starts at $n$) | +//! | **`Standard_Algorithm_L` (Vitter)**| Streaming Skip / $O(k \log(n/k))$| **`22.12 µs`** | **`183.17 µs`** | No | **Yes** (Must rebuild from $0$) | +//! | **`ConsistentReservoir_Streaming`**| Incremental Streaming / $O(k \log(n/k))$| **`24.06 µs`** | **`318.88 µs`** | **Yes** (Stateful Iterator) | No (Sub-streams consistent)| +//! | **`Standard_Algorithm_R` (Scan)**| Streaming Scan / $O(n)$ | **`28.22 ms`** | **`28.26 ms`** | No | **Yes** (Must rebuild from $0$) | //! //! # Data layout //! From 642f2edee0549c593ba71a3d403ba4a1715ba4c7 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 15:55:02 +0200 Subject: [PATCH 10/13] address review comments --- .../examples/statistical_comparison.rs | 8 +++-- .../src/consistent_permutation.rs | 29 +++++-------------- .../src/consistent_reservoir.rs | 2 +- 3 files changed, 14 insertions(+), 25 deletions(-) diff --git a/crates/consistent-choose-k/examples/statistical_comparison.rs b/crates/consistent-choose-k/examples/statistical_comparison.rs index 3473615..61fa145 100644 --- a/crates/consistent-choose-k/examples/statistical_comparison.rs +++ b/crates/consistent-choose-k/examples/statistical_comparison.rs @@ -87,9 +87,9 @@ impl Acc { (v[1], v[0]) }; self.pair[(lo as usize) * n + hi as usize] += 1; - for pos in 0..kk { + for (pos, &val) in v.iter().enumerate().take(kk) { for b in 0..n_bits { - if (v[pos] >> b) & 1 == 1 { + if (val >> b) & 1 == 1 { self.bits[pos][b] += 1; } } @@ -101,8 +101,10 @@ fn run_for_n(n: usize) { let kk = 2usize.min(n - 1).max(1); let n_bits = ((n - 1).next_power_of_two().trailing_zeros() as usize).max(1); + type GenFn = Box Vec>; + // Build the list of methods to test. - let methods: Vec<(&'static str, Box Vec>)> = vec![ + let methods: Vec<(&'static str, GenFn)> = vec![ ("ChooseKHasher ", Box::new(chooseh_take)), ("Permutation ", Box::new(perm_take)), ]; diff --git a/crates/consistent-choose-k/src/consistent_permutation.rs b/crates/consistent-choose-k/src/consistent_permutation.rs index 7f86d97..4d715c8 100644 --- a/crates/consistent-choose-k/src/consistent_permutation.rs +++ b/crates/consistent-choose-k/src/consistent_permutation.rs @@ -87,7 +87,7 @@ fn splitmix64(seed: u64) -> u64 { /// `n_bits` must be **even** and in `2..=30`. #[inline] pub(crate) fn layer_apply(n_bits: u32, master_key: u64, x: u32) -> u32 { - debug_assert!(n_bits >= 2 && n_bits <= 30 && n_bits.is_multiple_of(2)); + debug_assert!((2..=30).contains(&n_bits) && n_bits.is_multiple_of(2)); let rounds = rounds_for_n_bits(n_bits) as u32; let half_bits = n_bits / 2; let half_mask = (1u32 << half_bits) - 1; @@ -136,13 +136,6 @@ pub struct ConsistentPermutation { /// `2^30 <= u32::MAX`. counters: Vec, n: u32, - /// Top layer index. Layer `j` has `n_bits = 2j + 2`. - j_max: u32, - /// `1 << (2 * j_max + 2)` — the top layer's domain size. Used as - /// the iterator's termination signal: once `counters[j_max]` - /// reaches `top_cap`, every walk has completed and exactly `n` - /// values have been emitted. - top_cap: u32, } impl ConsistentPermutation { @@ -168,13 +161,10 @@ impl ConsistentPermutation { // for both even and odd bit-widths. let j_max = if n <= 1 { 0 } else { (n - 1).ilog2() / 2 }; let counters = vec![0u32; (j_max + 1) as usize]; - let top_cap = 1u32 << (2 * j_max + 2); Self { master_key, counters, n, - j_max, - top_cap, } } @@ -188,18 +178,19 @@ impl Iterator for ConsistentPermutation { type Item = u32; fn next(&mut self) -> Option { - let j_max = self.j_max; + let num_layers = self.counters.len(); + let j_max = (num_layers - 1) as u32; let n = self.n; let mk = self.master_key; // Phase 1: walk the top layer (where skips and exhaustion // happen) until we either emit an in-range value or descend. - let n_bits_top = 2 * j_max + 2; + let n_bits_top = 2 * num_layers as u32; let top_shift = 2 * j_max; - let mut j; + let top_cap = 1u32 << n_bits_top; loop { let counter = self.counters[j_max as usize]; - if counter >= self.top_cap { + if counter >= top_cap { return None; } let raw = layer_apply(n_bits_top, mk, counter); @@ -218,7 +209,6 @@ impl Iterator for ConsistentPermutation { if j_max == 0 { return Some(0); } - j = j_max - 1; break; } @@ -226,7 +216,7 @@ impl Iterator for ConsistentPermutation { // never exhaust within a single walk, and their raw values // are always in `[4^j, 4^(j+1)) ⊂ [0, n)`, so neither // exhaustion nor range checks are needed. - loop { + for j in (0..j_max).rev() { let counter = self.counters[j as usize]; let n_bits = 2 * j + 2; let raw = layer_apply(n_bits, mk, counter); @@ -236,11 +226,8 @@ impl Iterator for ConsistentPermutation { if top_bits != 0 { return Some(raw); } - if j == 0 { - return Some(0); - } - j -= 1; } + Some(0) } } diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index afa2e6b..6605391 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -434,7 +434,7 @@ impl Iterator for ConsistentReservoir { { self.values.pop(); } - return Some((added_value, evicted_value)); + Some((added_value, evicted_value)) } } From 7aa8aa921fb91087a7daa9c721a78fe64b213ec3 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 16:40:55 +0200 Subject: [PATCH 11/13] fix clippy --- .../benchmarks/performance.rs | 117 +++++++++++------- .../examples/statistical_comparison.rs | 5 +- crates/consistent-choose-k/src/choose_k.rs | 8 +- .../src/consistent_reservoir.rs | 8 +- 4 files changed, 89 insertions(+), 49 deletions(-) diff --git a/crates/consistent-choose-k/benchmarks/performance.rs b/crates/consistent-choose-k/benchmarks/performance.rs index 912bf33..b0dc772 100644 --- a/crates/consistent-choose-k/benchmarks/performance.rs +++ b/crates/consistent-choose-k/benchmarks/performance.rs @@ -89,15 +89,19 @@ fn grow_k_vs_permutation(c: &mut Criterion) { } group.throughput(Throughput::Elements((keys.len() * k) as u64)); - group.bench_with_input(BenchmarkId::new(format!("new_with_k/k_{k}"), n), &n, |b, n| { - b.iter(|| { - for key in &keys { - let mut h = DefaultHasher::default(); - key.hash(&mut h); - black_box(ConsistentChooseKHasher::new_with_k(h, *n, k)); - } - }) - }); + group.bench_with_input( + BenchmarkId::new(format!("new_with_k/k_{k}"), n), + &n, + |b, n| { + b.iter(|| { + for key in &keys { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + black_box(ConsistentChooseKHasher::new_with_k(h, *n, k)); + } + }) + }, + ); group.bench_with_input(BenchmarkId::new(format!("grow_k/k_{k}"), n), &n, |b, n| { b.iter(|| { @@ -112,16 +116,20 @@ fn grow_k_vs_permutation(c: &mut Criterion) { }) }); - group.bench_with_input(BenchmarkId::new(format!("permutation/k_{k}"), n), &n, |b, n| { - b.iter(|| { - for key in &keys { - let mut iter = ConsistentPermutation::new(*n as u32, *key); - for _ in 0..k { - black_box(iter.next()); + group.bench_with_input( + BenchmarkId::new(format!("permutation/k_{k}"), n), + &n, + |b, n| { + b.iter(|| { + for key in &keys { + let mut iter = ConsistentPermutation::new(*n as u32, *key); + for _ in 0..k { + black_box(iter.next()); + } } - } - }) - }); + }) + }, + ); } } group.finish(); @@ -192,39 +200,62 @@ fn reservoir_benchmarks(c: &mut Criterion) { for &n in &[100_000, 10_000_000] { for &k in &[100, 1000] { // 1. ConsistentReservoir direct build (O(k)) - group.bench_function(BenchmarkId::new(format!("ConsistentReservoir_Direct/k_{k}"), n), |b| { - b.iter(|| { - black_box(ConsistentReservoir::new(k, n, seed).reservoir().collect::>()); - }) - }); + group.bench_function( + BenchmarkId::new(format!("ConsistentReservoir_Direct/k_{k}"), n), + |b| { + b.iter(|| { + black_box( + ConsistentReservoir::new(k, n, seed) + .reservoir() + .collect::>(), + ); + }) + }, + ); // 2. ConsistentPermutation direct build (O(k)) - group.bench_function(BenchmarkId::new(format!("ConsistentPermutation_Direct/k_{k}"), n), |b| { - b.iter(|| { - black_box(ConsistentPermutation::new(n, seed).take(k as usize).collect::>()); - }) - }); + group.bench_function( + BenchmarkId::new(format!("ConsistentPermutation_Direct/k_{k}"), n), + |b| { + b.iter(|| { + black_box( + ConsistentPermutation::new(n, seed) + .take(k as usize) + .collect::>(), + ); + }) + }, + ); // 3. ConsistentReservoir streaming iteration from k to n (O(k log(n/k))) - group.bench_function(BenchmarkId::new(format!("ConsistentReservoir_Streaming/k_{k}"), n), |b| { - b.iter(|| { - black_box(consistent_reservoir_to_n(k, n, seed)); - }) - }); + group.bench_function( + BenchmarkId::new(format!("ConsistentReservoir_Streaming/k_{k}"), n), + |b| { + b.iter(|| { + black_box(consistent_reservoir_to_n(k, n, seed)); + }) + }, + ); // 4. Standard Algorithm R (O(n) linear scan) - group.bench_function(BenchmarkId::new(format!("Standard_Algorithm_R/k_{k}"), n), |b| { - b.iter(|| { - black_box(standard_reservoir_r(k, n, seed)); - }) - }); + group.bench_function( + BenchmarkId::new(format!("Standard_Algorithm_R/k_{k}"), n), + |b| { + b.iter(|| { + black_box(standard_reservoir_r(k, n, seed)); + }) + }, + ); // 5. Standard Algorithm L / Vitter (O(k log(n/k)) skip based) - group.bench_function(BenchmarkId::new(format!("Standard_Algorithm_L/k_{k}"), n), |b| { - b.iter(|| { - black_box(standard_reservoir_l(k, n, seed)); - }) - }); + group.bench_function( + BenchmarkId::new(format!("Standard_Algorithm_L/k_{k}"), n), + |b| { + b.iter(|| { + black_box(standard_reservoir_l(k, n, seed)); + }) + }, + ); } } group.finish(); diff --git a/crates/consistent-choose-k/examples/statistical_comparison.rs b/crates/consistent-choose-k/examples/statistical_comparison.rs index 61fa145..841e0e2 100644 --- a/crates/consistent-choose-k/examples/statistical_comparison.rs +++ b/crates/consistent-choose-k/examples/statistical_comparison.rs @@ -62,7 +62,7 @@ struct Acc { name: String, first: Vec, kth: Vec, - pair: Vec, // n*n, indexed by lo*n + hi (lo <= hi) + pair: Vec, // n*n, indexed by lo*n + hi (lo <= hi) bits: Vec>, // bits[pos][bit] } @@ -178,8 +178,7 @@ fn run_for_n(n: usize) { for pos in 0..kk { let set = acc.bits[pos][b] as f64; let unset = SAMPLES as f64 - set; - chi2 += (set - exp_set).powi(2) / exp_set - + (unset - exp_unset).powi(2) / exp_unset; + chi2 += (set - exp_set).powi(2) / exp_set + (unset - exp_unset).powi(2) / exp_unset; df += 1; } } diff --git a/crates/consistent-choose-k/src/choose_k.rs b/crates/consistent-choose-k/src/choose_k.rs index ce777ee..54984ac 100644 --- a/crates/consistent-choose-k/src/choose_k.rs +++ b/crates/consistent-choose-k/src/choose_k.rs @@ -5,7 +5,7 @@ use crate::{ConsistentHasher, ManySeqBuilder}; /// The hashes are consistent when `n` changes and when `k` changes! /// I.e. one hash changes with probability `k/(n+1)` when `n` increases by one, /// resp. one hash gets added when `k` is increased. Additionally, the returned `k` tuple -/// is guaranteed to be uniformely chosen from all possible `n-choose-k` tuples. +/// is guaranteed to be uniformly chosen from all possible `n-choose-k` tuples. /// /// Also implements `Iterator` to yield the next sample when k is increased. /// Note: since this hashing algorithm implements choose k semantics, all the returned samples are distinct. @@ -49,7 +49,11 @@ impl ConsistentChooseKHasher { iter.samples.push(iter.get_sample(i, n)); } for i in (0..k).rev() { - let s = iter.samples[0..=i].iter().copied().max().expect(""); + let s = iter.samples[0..=i] + .iter() + .copied() + .max() + .expect("subslice 0..=i has at least one element"); iter.samples[i] = s; for j in 0..i { if iter.samples[j] == s { diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index 6605391..06385c6 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -392,7 +392,13 @@ impl ConsistentReservoir { self.pending.resize(pending_count, 0); self.counts_scratch.resize(pending_count, 0); self.counts_scratch.fill(0); - bucket_sort_ranks_descending_in(&self.values, &mut self.pending, &mut self.counts_scratch, m_old, m_new); + bucket_sort_ranks_descending_in( + &self.values, + &mut self.pending, + &mut self.counts_scratch, + m_old, + m_new, + ); self.j_max = new_j_max; } } From be0b090f01b6f58d123fe272f5d0d6d01c0fb749 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 2 Jun 2026 16:41:56 +0200 Subject: [PATCH 12/13] Update README.md --- crates/consistent-choose-k/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/consistent-choose-k/README.md b/crates/consistent-choose-k/README.md index 4270d68..45574ab 100644 --- a/crates/consistent-choose-k/README.md +++ b/crates/consistent-choose-k/README.md @@ -185,8 +185,8 @@ Putting it together leads to `S(k+1, n+1) = {n} ∪ S(k, m)` and `S(k+1, n) = {m ### Property 2 -The final part is to prove property 2. This time we have an inducation over `k` and `n`. -As before, the base case of the induction for `k=1` and all `n>0` is inherited from the `consistency_hash` implementation. The case `n=k` is also trivially covered, since the only valid set are the numbers `{0, ..., k-1}` which the algorithm correctly outputs. So, we only need to care about the induction step where `k>1` and `n>k`. +The final part is to prove property 2. This time we have an induction over `k` and `n`. +As before, the base case of the induction for `k=1` and all `n>0` is inherited from the `consistent_hash` implementation. The case `n=k` is also trivially covered, since the only valid set are the numbers `{0, ..., k-1}` which the algorithm correctly outputs. So, we only need to care about the induction step where `k>1` and `n>k`. We need to prove that `P(i ∈ S(k+1, n+1)) = (k+1)/(n+1)` for all `0 <= i <= n`. Property 3 already proves the case `i = n`. Furthermore we know that `P(n ∈ S(k+1, n+1)) = (k+1)/(n+1)` and vice versa `P(n ∉ S(k+1, n+1)) = 1 - (k+1)/(n+1)`. Let's consider those two cases separately. From bb850f5e64996176d9a5e3f86aeb6a39216167e1 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Wed, 3 Jun 2026 08:53:54 +0200 Subject: [PATCH 13/13] Update consistent_reservoir.rs --- crates/consistent-choose-k/src/consistent_reservoir.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs index 06385c6..276ed88 100644 --- a/crates/consistent-choose-k/src/consistent_reservoir.rs +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -436,7 +436,13 @@ impl Iterator for ConsistentReservoir { debug_assert!(added_value >= self.n); self.n = added_value + 1; // Restore invariant 4: any pending elements at the tail of values must be popped. - while self.values.len() > self.k as usize && self.values.last().copied().unwrap() >= self.n + while self.values.len() > self.k as usize + && self + .values + .last() + .copied() + .expect("since values is non-empty") + >= self.n { self.values.pop(); }