diff --git a/crates/consistent-choose-k/README.md b/crates/consistent-choose-k/README.md index 4270d68..45574ab 100644 --- a/crates/consistent-choose-k/README.md +++ b/crates/consistent-choose-k/README.md @@ -185,8 +185,8 @@ Putting it together leads to `S(k+1, n+1) = {n} ∪ S(k, m)` and `S(k+1, n) = {m ### Property 2 -The final part is to prove property 2. This time we have an inducation over `k` and `n`. -As before, the base case of the induction for `k=1` and all `n>0` is inherited from the `consistency_hash` implementation. The case `n=k` is also trivially covered, since the only valid set are the numbers `{0, ..., k-1}` which the algorithm correctly outputs. So, we only need to care about the induction step where `k>1` and `n>k`. +The final part is to prove property 2. This time we have an induction over `k` and `n`. +As before, the base case of the induction for `k=1` and all `n>0` is inherited from the `consistent_hash` implementation. The case `n=k` is also trivially covered, since the only valid set are the numbers `{0, ..., k-1}` which the algorithm correctly outputs. So, we only need to care about the induction step where `k>1` and `n>k`. We need to prove that `P(i ∈ S(k+1, n+1)) = (k+1)/(n+1)` for all `0 <= i <= n`. Property 3 already proves the case `i = n`. Furthermore we know that `P(n ∈ S(k+1, n+1)) = (k+1)/(n+1)` and vice versa `P(n ∉ S(k+1, n+1)) = 1 - (k+1)/(n+1)`. Let's consider those two cases separately. diff --git a/crates/consistent-choose-k/benchmarks/performance.rs b/crates/consistent-choose-k/benchmarks/performance.rs index bf4cce1..b0dc772 100644 --- a/crates/consistent-choose-k/benchmarks/performance.rs +++ b/crates/consistent-choose-k/benchmarks/performance.rs @@ -4,12 +4,14 @@ use std::{ time::Duration, }; -use consistent_choose_k::{ConsistentChooseKHasher, ConsistentHasher}; +use consistent_choose_k::{ + ConsistentChooseKHasher, ConsistentHasher, ConsistentPermutation, ConsistentReservoir, +}; use criterion::{ criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration, Throughput, }; -use rand::{rng, RngExt}; +use rand::{rng, rngs::StdRng, RngExt, SeedableRng}; fn throughput_benchmark(c: &mut Criterion) { let keys: Vec = rng().random_iter().take(1000).collect(); @@ -68,6 +70,197 @@ fn append_vs_new_with_k(c: &mut Criterion) { group.finish(); } +fn grow_k_vs_permutation(c: &mut Criterion) { + // Compare three ways to obtain `k` distinct samples out of `0..n`: + // * `ConsistentChooseKHasher::new_with_k` (pre-build the full set); + // * `ConsistentChooseKHasher::new` + `grow_k` k times (incremental); + // * `ConsistentPermutation` (per-layer Feistel permutation, take k). + // + // All three are driven from a per-key seed so the cost of building the + // underlying permutation/hash state is included in each iteration. + let keys: Vec = rng().random_iter().take(100).collect(); + + let mut group = c.benchmark_group("grow_k_vs_permutation"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + for n in [100usize, 1_000, 10_000, 100_000] { + for k in [2usize, 10, 100, 1_000] { + if k > n { + continue; + } + group.throughput(Throughput::Elements((keys.len() * k) as u64)); + + group.bench_with_input( + BenchmarkId::new(format!("new_with_k/k_{k}"), n), + &n, + |b, n| { + b.iter(|| { + for key in &keys { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + black_box(ConsistentChooseKHasher::new_with_k(h, *n, k)); + } + }) + }, + ); + + group.bench_with_input(BenchmarkId::new(format!("grow_k/k_{k}"), n), &n, |b, n| { + b.iter(|| { + for key in &keys { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + let mut iter = ConsistentChooseKHasher::new(h, *n); + for _ in 0..k { + black_box(iter.grow_k()); + } + } + }) + }); + + group.bench_with_input( + BenchmarkId::new(format!("permutation/k_{k}"), n), + &n, + |b, n| { + b.iter(|| { + for key in &keys { + let mut iter = ConsistentPermutation::new(*n as u32, *key); + for _ in 0..k { + black_box(iter.next()); + } + } + }) + }, + ); + } + } + group.finish(); +} + +// A standard reservoir sampling (Algorithm R) implementation. +// It initializes a reservoir of size k, and then scans from k to n element-by-element, +// deciding for each step whether to admit that element. +fn standard_reservoir_r(k: u32, n: u32, seed: u64) -> Vec { + let mut reservoir: Vec = (0..k).collect(); + if n <= k { + return reservoir; + } + let mut rng = StdRng::seed_from_u64(seed); + + for i in k..n { + let j = rng.random_range(0..=i); + if j < k { + reservoir[j as usize] = i; + } + } + reservoir +} + +// A standard skip-based reservoir sampling (Algorithm L / Vitter) implementation. +fn standard_reservoir_l(k: u32, n: u32, seed: u64) -> Vec { + let mut reservoir: Vec = (0..k).collect(); + if n <= k { + return reservoir; + } + let mut rng = StdRng::seed_from_u64(seed); + + let mut w = (rng.random::().ln() / (k as f64)).exp(); + let mut i = k; + while i < n { + let u: f64 = rng.random(); + let s = (u.ln() / (1.0 - w).ln()) as u32; + i += s + 1; + if i <= n { + let j = rng.random_range(0..k); + reservoir[j as usize] = i - 1; + w *= (rng.random::().ln() / (k as f64)).exp(); + } + } + reservoir +} + +fn consistent_reservoir_to_n(k: u32, target_n: u32, seed: u64) -> Vec { + let mut r = ConsistentReservoir::new(k, k, seed); + while r.n() < target_n { + match r.next() { + Some((added, _)) if added < target_n => {} + _ => break, + } + } + r.reservoir().collect() +} + +fn reservoir_benchmarks(c: &mut Criterion) { + let seed = 42u64; + + let mut group = c.benchmark_group("reservoir_computation_up_to_n"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + group.sample_size(10); + group.warm_up_time(Duration::from_millis(200)); + group.measurement_time(Duration::from_secs(3)); + + for &n in &[100_000, 10_000_000] { + for &k in &[100, 1000] { + // 1. ConsistentReservoir direct build (O(k)) + group.bench_function( + BenchmarkId::new(format!("ConsistentReservoir_Direct/k_{k}"), n), + |b| { + b.iter(|| { + black_box( + ConsistentReservoir::new(k, n, seed) + .reservoir() + .collect::>(), + ); + }) + }, + ); + + // 2. ConsistentPermutation direct build (O(k)) + group.bench_function( + BenchmarkId::new(format!("ConsistentPermutation_Direct/k_{k}"), n), + |b| { + b.iter(|| { + black_box( + ConsistentPermutation::new(n, seed) + .take(k as usize) + .collect::>(), + ); + }) + }, + ); + + // 3. ConsistentReservoir streaming iteration from k to n (O(k log(n/k))) + group.bench_function( + BenchmarkId::new(format!("ConsistentReservoir_Streaming/k_{k}"), n), + |b| { + b.iter(|| { + black_box(consistent_reservoir_to_n(k, n, seed)); + }) + }, + ); + + // 4. Standard Algorithm R (O(n) linear scan) + group.bench_function( + BenchmarkId::new(format!("Standard_Algorithm_R/k_{k}"), n), + |b| { + b.iter(|| { + black_box(standard_reservoir_r(k, n, seed)); + }) + }, + ); + + // 5. Standard Algorithm L / Vitter (O(k log(n/k)) skip based) + group.bench_function( + BenchmarkId::new(format!("Standard_Algorithm_L/k_{k}"), n), + |b| { + b.iter(|| { + black_box(standard_reservoir_l(k, n, seed)); + }) + }, + ); + } + } + group.finish(); +} + criterion_group!( name = benches; config = Criterion::default() @@ -75,6 +268,6 @@ criterion_group!( .measurement_time(Duration::from_millis(4000)) .nresamples(1000); - targets = throughput_benchmark, append_vs_new_with_k, + targets = throughput_benchmark, append_vs_new_with_k, grow_k_vs_permutation, reservoir_benchmarks, ); criterion_main!(benches); diff --git a/crates/consistent-choose-k/docs/permutation-design.md b/crates/consistent-choose-k/docs/permutation-design.md new file mode 100644 index 0000000..6ff55dd --- /dev/null +++ b/crates/consistent-choose-k/docs/permutation-design.md @@ -0,0 +1,453 @@ +# Permutation iterator design notes + +This document explains the design of [`ConsistentPermutation`], the +per-layer Feistel permutation iterator that this crate uses to drive +its `n`-consistent ranking. + +Given a 64-bit `key` and a universe size `n`, the iterator produces a +uniformly distributed permutation of `[0, n)` as a streaming iterator +satisfying: + +- **Stateless and deterministic** — no per-call PRNG state outside the + iterator; two iterators on the same `(key, n)` produce the same + sequence. +- **`n`-consistent** — when `n` grows by one, the new sequence agrees + with the old except for the at most one position where the new + element `n − 1` is inserted (i.e. the underlying choose-`k` family + satisfies the consistency properties in the main [README]). +- **Fast in the small-`k`, repeated-query regime** — production + workloads call the iterator with `k` ≪ `n` for many keys, so the + per-emission cost and the iterator setup cost both matter. +- **Statistically clean for choose-2 set uniformity** — pairs of + selected nodes must be uniformly distributed over `binom(n, 2)` + cells over random keys. This is the test that consistently + distinguishes "looks fine on marginals but is actually biased" from + "near-random-permutation quality". + +[`ConsistentPermutation`]: ../src/consistent_permutation.rs +[README]: ../README.md + +## Core idea + +### Permutations are automatically `n`-consistent + +Fix any permutation `π` of `[0, N)`, and for each `n ≤ N` define +`π_n` as the subsequence of `π` you get by deleting every entry +`≥ n`. Then `{π_n}_{n ≤ N}` is an `n`-consistent family of +permutations: for every `n < N`, `π_{n+1}` equals `π_n` with the +element `n` inserted at one position. + +Two things to check: + +- **`π_n` is a permutation of `[0, n)`.** `π` is a bijection of + `[0, N)`, so each value in `[0, n)` appears in `π` exactly once. + Deleting entries `≥ n` leaves each value in `[0, n)` at exactly + one position, in their original relative order. +- **Going from `π_n` to `π_{n+1}` only inserts `n`.** The + construction of `π_{n+1}` differs from that of `π_n` only in + that the entry equal to `n` is no longer deleted. Every other + surviving entry sits at the same relative position. So + `π_{n+1}` is `π_n` with `n` spliced in at the position `π` puts + it. + +So *any* recipe for sampling a permutation of `[0, N)` from a key +automatically gives an `n`-consistent family of permutations for +every `n ≤ N`. + +### Hash functions already give us power-of-two permutations + +Modern keyed hash functions and block ciphers are by design +*bijections* on fixed-width words: a small Feistel network on +`2b` bits produces a key-parameterized permutation of +`[0, 2^(2b))` in `O(1)` evaluation time. So if `n` happens to be a +power of two `n = 2^B`, the family `π_n` is straightforward — pick +a Feistel of width `B`, and reading the iterator amounts to +evaluating it at `0, 1, 2, …`. + +The catch is that `n` is rarely a power of two. The lemma above +lets us take a permutation of `[0, N)` for any `N ≥ n` and read +off the survivors, so picking the smallest power of two `N ≥ n` +(say) and using a single Feistel of width `⌈log₂ N⌉` works +correctness-wise — but begs the question of how expensive the +survivor walk is. + +### Why we cannot just pick one big `N` + +The crudest instantiation is "pick a giant `N` (say `2^30` for all +queries), construct one permutation of `[0, N)` per key, and read +off the first `k` non-deleted values for any query `(n, k)`". +Correctness is given by the lemma. The cost is not. + +Each iterator step evaluates the underlying bijection on the next +counter value and either keeps the output (if `< n`) or discards +it. The expected fraction of kept outputs is `n / N`, so the +expected number of bijection evaluations per emission is `N / n`. +With `N = 2^30` and `n` in the low hundreds this is roughly `10^7` +hash evaluations per emission — unusable. + +What we actually want is `O(1)` bijection evaluations per +emission, **independent of how large the universe might +eventually grow**. A construction with cost `O(N / n)` per +emission ties us to picking the smallest possible `N` for each +query — but then we lose the ability to grow `n` cheaply, since +growing `n` past `N` requires a brand-new construction with no +relationship to the old one. + +### Layering: building the permutation in doubling buckets + +If we could always operate on an `N` that exceeds `n` by only a +constant factor, the per-emission cost above would already be +`O(1)`. The layered construction arranges exactly that, by +splitting the universe into a growing stack of intervals each +roughly the size of the previous one combined. + +Slice `[0, ∞)` into doubling *buckets* + +``` +bucket 0 = [0, 4), +bucket 1 = [4, 16), +bucket 2 = [16, 64), +... +bucket j = [4^j, 4^(j+1)). +``` + +Bucket `j` is owned by **layer `j`**, which is fully self-contained: +it knows how to emit the elements of its own bucket in a +key-dependent order, interleaved with calls into layer `j − 1` for +the values in lower buckets. Two pieces of state are all a layer +needs: + +- A **bijection `π_j`** on `[0, 4^(j+1))` (constructed in §"Feistel + (vs PCG)" below from the master key and the layer index). +- A **counter `c_j`** initialized to `0`, walking `0, 1, 2, …`. + +A single step of layer `j` is: + +1. compute `y = π_j(c_j)`, advance `c_j`; +2. look at the top two bits of `y` (i.e. `y >> 2j`): + - if they are nonzero (probability `3/4`), `y ∈ [4^j, 4^(j+1)) =` + bucket `j` — **emit** `y`; + - if they are `00` (probability `1/4`), `y ∈ [0, 4^j)` — that + emission belongs to a lower bucket, so **descend** to layer + `j − 1` and let it produce the value. + +To produce one element the iterator starts at the top layer `j_max` +and steps; each step either emits or descends. Termination is when +the top layer's counter has walked its full domain `4^(j_max+1)`. +The top layer additionally gates emissions by `y < n` (the only +bucket that can spill past `n`); lower layers' outputs are +automatically in range. + +### Why this works (uniformity, `n`-consistency, `O(1)` per emit) + +**Uniformity.** At universe size `N = 4^(j_max+1)` the +construction *is* a permutation of `[0, N)`: layer `j`'s counter +walks `0, …, 4^(j+1) − 1` and `π_j` is a bijection, so layer `j` +emits each value of bucket `j` exactly once and triggers a descent +for each value in `[0, 4^j)`. Summed over layers, every value of +`[0, N)` is emitted exactly once. The relative *order* is governed +by the per-layer keys, so over random keys the iterator output is +a uniformly random permutation of `[0, N)` — modulo the +statistical quality of the per-layer bijection, which is the +subject of the rest of this document. + +**`n`-consistency.** Because the result at universe size `N` is a +true permutation, the §"Permutations are automatically +`n`-consistent" lemma kicks in: the iterator's output for any +`n ≤ N` is `π_N` with values `≥ n` deleted, and the family across +all `n ≤ N` is automatically `n`-consistent. The top layer's +`y < n` gate is exactly this deletion. + +Growing past `N` adds a new top layer above the existing ones. +Every old layer's bijection and counter walk are unchanged; the +new top layer's descents into the old top layer arrive at counter +values `0, 1, 2, …` in order, which is exactly what the old top +layer saw when it was the root. So at the larger universe size +`N' = 4^(j_max+2)` the construction is again a permutation of +`[0, N')` whose restriction (in the sense of the lemma) to any +`n ≤ N` agrees with the old `π_n`. The universe can therefore be +extended indefinitely without re-specifying or disturbing the +existing structure. + +**Amortized cost per emission.** Consider an emission produced +somewhere in the descent. At each non-top layer the single step +emits with probability `3/4` and descends with probability `1/4`, +so the expected number of `π_j` evaluations per emission satisfies + +``` +E_j = 1 + (1/4) · E_{j − 1}, E_0 = 1 +``` + +which converges to `E_∞ = 4/3` regardless of how many layers are +below. The top layer is slightly worse — emissions whose raw +value lies in `[n, N)` get rejected — but since `j_max` is chosen +as the smallest level with `4^(j_max+1) ≥ n`, we always have +`N ≤ 4n`, so the top-layer in-bucket pass rate is at least `n/N ≥ +1/4`. The expected number of top-layer evaluations per kept top +emission is at most `4`. Combined, every iterator step costs a +bounded constant number of layer evaluations, independent of `n` +and `j_max`. + +## Overall structure + +The rest of this document justifies the three design choices that +the iterator above leaves open: + +1. **What bijection drives each layer** — an independently-keyed + balanced Feistel network. +2. **How many bits a layer call emits** — two. +3. **What `n` we cap at and how the iterator terminates** — `n ≤ 2³⁰`, + driven by a single top-layer counter. + +## Per-layer bijection (vs a single global one) + +A tempting alternative is to use a single global *chunk-prefix* +permutation on `u32` — a bijection with the property that the prefix +of length `i` of the output depends only on the prefix of length `i` +of the input — and drive every layer of the descent with that one +permutation. Per-layer setup vanishes; each layer call is a single +tiny permutation evaluation. + +The fundamental problem is the **size of the permutation family** +this gives you. A single 64-bit key parameterizes one fixed +chunk-prefix permutation that is then re-used at every layer. The +same key controls every layer's bijection, so cross-layer +correlations are not just possible — they are baked in. For small +`n`, where choose-2 has only `binom(n, 2)` cells to spread over, the +choose-2 χ² test rejects this distribution comfortably below the +0.1 % significance threshold. + +Giving every layer its own, independently-keyed bijection breaks the +cross-layer correlation by construction. The cost is one bijection +setup per layer, but with `j_max ≈ log₂(n) / 2`, that is at most ~15 +small structs of state for any `n ≤ 2³⁰` — negligible compared to +the iterator's emission stream. + +## Feistel (vs PCG) + +Once each layer needs its own bijection, the primitive of choice is a +small balanced Feistel network on `[0, 2^n_bits)`. The two main +alternatives are PCG-style multiply-xor-shift hashes (either +stateless per call, or stateful with state preserved across calls); +both lose for reasons worth being explicit about. + +### Stateless multi-round PCG + +A stateless per-call PCG hash `y = (y ^ c) · a; y ^= y >> shift` with +the key rotated between `R` rounds is statistically clean once `R` +is large enough, but **how** large is severe at small `n_bits`: tiny +layers (`n_bits = 2`) have an F-input space of only 2 values, so the +brute-force "many rounds" knob is the only thing that breaks the +parity correlations. A passing schedule needs on the order of 16 +rounds at the small-`n_bits` end. + +In addition, the tight serial dependency `y → y·a → y^(y>>shift) → +y·a → …` runs close to its sequential multiply latency on a wide +out-of-order core, so per-round work is essentially un-amortized. +Feistel rounds are also serial, but a Feistel round does roughly the +same per-round work as a PCG round and far fewer of them suffice. + +### Stateful single-step PCG + +A more clever variant is to give each layer a stateful LCG, preserving +state across iterator calls so a single PCG step (plus a small output +mixer) per call would suffice. The hope is that 1 step × 2 layers +per emission beats Feistel's few rounds × 1 layer per emission. + +This fails for a structural reason. A full-period LCG on a +power-of-two modulus `2^k` needs Hull–Dobell parameters: `a ≡ 1 mod 4` +and `c` odd. Both forces conspire to make every consecutive state +pair `(state[i], state[i+1])` differ in bit 0 — `a · s ≡ s mod 2` +since `a` is odd, and `c` flips parity since `c` is odd. The output +of the layer is a key-dependent bijection of the state, but the +**state-pair** support is only half of the full pair space. At small +`n_bits` (e.g. 3) that reduces reachable `(state[i], state[i+1])` +pairs from 64 to 32, and no amount of output mixing recovers the +missing half — bit 0 of one output is fully determined by bit 0 of +the previous output (modulo the key-dependent mixer's bijection on +each individual state). The result is a hard floor on choose-2 χ² at +small `n` that no extra mixing round can dissolve. + +The standard PCG construction sidesteps this by using a **wider** +state than its output (e.g. 64 bits in, 32 bits out, hiding the +parity-alternation in unused bits). That option is not available +here without breaking per-layer bijectivity: if the LCG cycle length +exceeds the layer's output domain, the output repeats values within +a single iterator lifetime. + +### Stateless PCG with bit reversal + +The stateless multi-round PCG above suffers from too many serial +multiplies at small `n_bits` because `y * a` followed by `y ^= y >> +shift` only diffuses bits *downwards*. Replacing the xor-shift with a +**bit reversal** breaks that asymmetry: every round mixes high bits +into low bits and vice versa, so a single multiplication can spread +entropy across the whole width in both directions. + +A round becomes: + +```text +y ^= k_xor; +y *= k_mul | 1; // odd multiplier +y ^= y >> shift; // local diffusion +y = y.reverse_bits() >> reverse_shift; +``` + +With this primitive a 2-bit-per-layer scheme passes χ² at `R = +8/8/4/4` (vs Feistel's `12/10/6/4`), and a 1-bit-per-layer scheme +passes at `R = 1/12/12/8/6/4`. So statistically the variant is at +least as good as Feistel with comparable or slightly fewer rounds. + +The wall-clock comparison is roughly a wash: + +- On **ARM64** (Apple M4), `u32::reverse_bits` is a single-cycle + `RBIT` instruction. The 2-bit-per-layer PCG-reverse variant came + in ~10% faster than Feistel on median, with peaks around +30% at + small `n`. The 1-bit-per-layer variant gave that gain back (more + layer calls per emission), landing at parity. +- On **x86_64**, `u32::reverse_bits` compiles to a 6–8-op sequence + of mask-and-swap (no native instruction), which is expected to + erase the ARM64 win and likely make the variant slower than + Feistel. + +Since the win is platform-dependent and small even where it exists, +the shipping implementation stays on Feistel: it has uniformly good +behaviour across architectures and no dependence on a single +fast-bit-reverse instruction. + +### Feistel + +Each layer `j` runs a small balanced Feistel network on +`[0, 2^(2j + 2))`, keyed by a single avalanched 64-bit `master_key` +that is rotated and Weyl-incremented between rounds. The F-function +is one `xor`, one multiplication by an odd 32-bit slice of the master +key, and a high-to-low fold to the half-width: + +```text +mixed = (r ^ k_xor) * (k_mul | 1) +f = (mixed_low + mixed_high) & half_mask +new_r = l ^ f +new_l = r +``` + +The properties that matter: + +- **Bijective by construction.** No cycle walking, no rejection, no + probabilistic restarts. Every Feistel round is a permutation on + the layer's domain; their composition is too. +- **Small-`n_bits` mixing handled by extra rounds.** Tiny layers + (`n_bits = 2, 4, 6`) use `12, 10, 6` rounds; everything else uses + `4`. The extra rounds for tiny layers are essentially free in + amortized cost because level `j` only contributes ~`4^j` emissions + per iterator lifetime — the top layer dominates per-iterator cost + and uses the fewest rounds. +- **Key rotation amortizes a 64-bit master across all rounds.** We + rotate by exactly the bits consumed per round and add the Weyl + increment `0x9E37_79B9_7F4A_7C15` to decorrelate consecutive rounds + (rotation alone leaves 60+ bits in common between adjacent rounds). +- **All `n_bits` even** means the Feistel halves split cleanly with + no parity quirks, and the 2-bit-per-layer emission rule lines up + neatly with the per-layer top bits. + +## Two bits per layer (vs one) + +A 1-bit-per-layer scheme is initially appealing: narrower per-layer +width means fewer Feistel rounds suffice, and the per-layer call is +correspondingly cheaper. Two compounding effects make it lose +overall: + +- **Twice as many layers.** With 1 bit per layer, + `j_max ≈ log₂(n)`; with 2 bits, `j_max ≈ log₂(n) / 2`. Setup cost + (per-layer counter, per-layer state) doubles. +- **Lower emission probability per layer call.** A 1-bit-per-layer + scheme emits whenever the top bit is `1` — probability ½. A + 2-bit-per-layer scheme emits whenever the top two bits are not + `00` — probability ¾. So the **expected number of layer calls per + emission** drops from `≈ 2` to `≈ 4/3`. + +Combining the two effects, 2-bit-per-layer makes ~½ as many +`layer_apply` calls per emission as 1-bit-per-layer. Even after +accounting for the per-call cost difference (slightly more Feistel +rounds at wider `n_bits`), the 2-bit scheme wins comfortably. + +Forcing the per-layer width even (which is what "2 bits per layer" +buys structurally) also keeps the Feistel halves perfectly symmetric +and removes any need for cycle walking to stay inside `[0, n)` at the +top. + +The PCG-reverse variant (which has no even-`n_bits` requirement) was +tried at both 1-bit and 2-bit per layer; the 2-bit variant was +faster there too, for the same reason — fewer layer calls per +emission outweighs the modestly cheaper per-call cost. + +## Universe cap: `n ≤ 2³⁰` + +Capping `n` at `2³⁰` lets the top-layer counter live in a `u32`, +which makes the iterator's termination condition a single +`counters[j_max] >= top_cap` comparison. No separate "emitted" +counter is needed to disambiguate boundary cases, the +`n_bits == 32` corner of the per-layer mask computation vanishes, +and `n_mask = (1 << n_bits) - 1` becomes unconditional inside +`layer_apply`. The cap is generous: `2³⁰ ≈ 10⁹` slots is more +node-count than any realistic distributed system, and the +simplifications it enables are observable in the benchmark. + +## Hot-path structure: top-layer phase vs descent phase + +The iterator's `next()` is split into two phases: + +- The **top-layer phase** walks the top layer's counter with both + the termination check (`counters[j_max] >= top_cap`) and the range + check (`raw < n`). The range check is necessary because the + Feistel maps the counter into `[0, 2^(2·j_max + 2))`, which may + overshoot `n`. +- The **descent phase** walks lower layers with **neither** check. + By the descent-tree invariant, lower-layer counters never exhaust + within a single iterator lifetime, and lower-layer raw outputs + always land in `[2^j, 2^(j+1)) ⊂ [0, n)`. So the hot path for + non-top layers is a pure Feistel call with no branches. + +## Measured throughput + +The `grow_k_vs_permutation` benchmark in +`benchmarks/performance.rs` constructs an iterator per key and pulls +`k` values from it, including the iterator-setup cost. Numbers +below are mean per-emitted-element times in nanoseconds on an Apple +M4 Max, for `ConsistentChooseKHasher::new_with_k` (the heap-based +implementation) versus `ConsistentPermutation::new` (this +construction). Smaller is better. + +| `n` | `k` | choose-k (ns/emit) | permutation (ns/emit) | speedup | +|---------|------:|-------------------:|----------------------:|--------:| +| 100 | 2 | 41.8 | 18.9 | 2.2 × | +| 100 | 10 | 59.7 | 14.3 | 4.2 × | +| 100 | 100 | 172 | 22.1 | 7.8 × | +| 1 000 | 2 | 41.6 | 14.9 | 2.8 × | +| 1 000 | 10 | 52.8 | 7.3 | 7.2 × | +| 1 000 | 100 | 93.2 | 7.5 | 12 × | +| 1 000 | 1 000 | 494 | 9.8 | 51 × | +| 10 000 | 2 | 51.7 | 16.4 | 3.2 × | +| 10 000 | 10 | 64.2 | 8.8 | 7.3 × | +| 10 000 | 100 | 108 | 12.1 | 8.9 × | +| 10 000 | 1 000 | 319 | 19.8 | 16 × | +| 100 000 | 2 | 44.4 | 19.1 | 2.3 × | +| 100 000 | 10 | 59.8 | 12.1 | 5.0 × | +| 100 000 | 100 | 102 | 19.0 | 5.4 × | +| 100 000 | 1 000 | 302 | 25.5 | 12 × | + +Observations matching the theory: + +- **Per-emit cost of `ConsistentPermutation` is bounded by a small + constant** (single-digit to low-tens of nanoseconds) across four + decades of `n` and `k`. The amortized `O(1)` layer evaluations + per emission analysis predicts exactly this. +- **The setup cost is amortized away by `k`.** At small `k = 2` the + per-emission number is dominated by iterator construction + (counter vector allocation), which is why + `k = 2` columns are systematically slower than `k = 10`. +- **Per-emit cost of `ConsistentChooseKHasher` grows roughly with + `k`** (heap operations cost `O(log k)` per pop and the + pre-build option is `O(k log k)`), which is why the speedup ratio + widens with `k`: at `k = 1 000, n = 1 000` the permutation + implementation is ~50× faster. + diff --git a/crates/consistent-choose-k/examples/statistical_comparison.rs b/crates/consistent-choose-k/examples/statistical_comparison.rs new file mode 100644 index 0000000..841e0e2 --- /dev/null +++ b/crates/consistent-choose-k/examples/statistical_comparison.rs @@ -0,0 +1,195 @@ +//! Statistical comparison of consistent-permutation constructions +//! against `ConsistentChooseKHasher` (the gold-standard reference). +//! +//! Each generator is asked to produce a uniformly-random ordering of +//! `0..n` and we run four chi-squared tests at alpha = 0.001: +//! +//! 1. Marginal at position 0 (first emitted element should be uniform). +//! 2. Marginal at position `k-1` (last sampled element should be uniform). +//! 3. Choose-2 unordered-pair uniformity over the `(n choose 2)` pairs. +//! 4. Per-position bit balance (each bit should be 1 ~half the time). +//! +//! Run with: cargo run --release --example statistical_comparison + +use std::hash::{DefaultHasher, Hash}; + +use consistent_choose_k::{ConsistentChooseKHasher, ConsistentPermutation}; + +const SAMPLES: usize = 200_000; + +/// 99.9% upper-tail critical value for chi-squared with `df` degrees of +/// freedom, computed by Wilson-Hilferty: +/// `chi2_{df, 1-alpha} ≈ df * (1 - 2/(9 df) + z * sqrt(2/(9 df)))^3` +/// where `z = Phi^{-1}(1 - alpha)`. For alpha=0.001, z ≈ 3.0902. +fn chi2_critical_999(df: usize) -> f64 { + let df = df as f64; + let a = 2.0 / (9.0 * df); + df * (1.0 - a + 3.0902 * a.sqrt()).powi(3) +} + +/// Pearson chi-squared statistic for `observed` against the uniform +/// distribution; returns `(chi2, df)`. +fn chi2_uniform(observed: &[u64]) -> (f64, usize) { + let total: u64 = observed.iter().sum(); + let cells = observed.len(); + let expected = total as f64 / cells as f64; + let mut chi2 = 0.0; + for &c in observed { + let d = c as f64 - expected; + chi2 += d * d / expected; + } + (chi2, cells - 1) +} + +// ---------- Generators ---------- + +fn chooseh_take(key: u64, n: usize, k: usize) -> Vec { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + ConsistentChooseKHasher::new(h, n) + .map(|v| v as u32) + .take(k) + .collect() +} + +fn perm_take(key: u64, n: usize, k: usize) -> Vec { + ConsistentPermutation::new(n as u32, key).take(k).collect() +} + +// ---------- Per-method accumulator ---------- + +struct Acc { + name: String, + first: Vec, + kth: Vec, + pair: Vec, // n*n, indexed by lo*n + hi (lo <= hi) + bits: Vec>, // bits[pos][bit] +} + +impl Acc { + fn new(name: &str, n: usize, kk: usize, n_bits: usize) -> Self { + Self { + name: name.to_string(), + first: vec![0u64; n], + kth: vec![0u64; n], + pair: vec![0u64; n * n], + bits: vec![vec![0u64; n_bits]; kk], + } + } + + fn record(&mut self, v: &[u32], n: usize, n_bits: usize) { + let kk = self.bits.len(); + self.first[v[0] as usize] += 1; + self.kth[v[kk - 1] as usize] += 1; + let (lo, hi) = if v[0] <= v[1] { + (v[0], v[1]) + } else { + (v[1], v[0]) + }; + self.pair[(lo as usize) * n + hi as usize] += 1; + for (pos, &val) in v.iter().enumerate().take(kk) { + for b in 0..n_bits { + if (val >> b) & 1 == 1 { + self.bits[pos][b] += 1; + } + } + } + } +} + +fn run_for_n(n: usize) { + let kk = 2usize.min(n - 1).max(1); + let n_bits = ((n - 1).next_power_of_two().trailing_zeros() as usize).max(1); + + type GenFn = Box Vec>; + + // Build the list of methods to test. + let methods: Vec<(&'static str, GenFn)> = vec![ + ("ChooseKHasher ", Box::new(chooseh_take)), + ("Permutation ", Box::new(perm_take)), + ]; + + let mut accs: Vec = methods + .iter() + .map(|(name, _)| Acc::new(name, n, kk, n_bits)) + .collect(); + + for seed in 0..SAMPLES as u64 { + // Avalanche each sample's seed so we don't pass small + // sequential integers into the (single-pass) hashers, which + // would correlate strongly. `ConsistentPermutation::new` + // does not avalanche its key argument, so we have to do it + // here. + let mut z = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + let key = z ^ (z >> 31); + for ((_, gen), acc) in methods.iter().zip(accs.iter_mut()) { + let v = gen(key, n, kk); + assert_eq!(v.len(), kk); + acc.record(&v, n, n_bits); + } + } + + println!("\n=== n = {n} (samples = {SAMPLES}) ==="); + println!( + "{:<46} {:>14} {:>6} {:>14} {:>7}", + "test", "chi2", "df", "crit(0.001)", "verdict" + ); + + let print_row = |label: &str, chi2: f64, df: usize| { + let crit = chi2_critical_999(df); + let verdict = if chi2 <= crit { "PASS" } else { "FAIL" }; + println!("{label:<46} {chi2:>14.2} {df:>6} {crit:>14.2} {verdict:>7}"); + }; + + // 1) Marginal @ pos 0. + for acc in &accs { + let (c, d) = chi2_uniform(&acc.first); + print_row(&format!("{}: marginal @ pos 0", acc.name), c, d); + } + // 2) Marginal @ pos kk-1. + for acc in &accs { + let (c, d) = chi2_uniform(&acc.kth); + print_row(&format!("{}: marginal @ pos {}", acc.name, kk - 1), c, d); + } + // 3) Choose-2 set uniformity over the (n choose 2) cells with i> b) & 1 == 1).count() as f64 / n as f64; + if pi == 0.0 || pi == 1.0 { + continue; + } + let exp_set = SAMPLES as f64 * pi; + let exp_unset = SAMPLES as f64 * (1.0 - pi); + for pos in 0..kk { + let set = acc.bits[pos][b] as f64; + let unset = SAMPLES as f64 - set; + chi2 += (set - exp_set).powi(2) / exp_set + (unset - exp_unset).powi(2) / exp_unset; + df += 1; + } + } + print_row(&format!("{}: per-(pos,bit) balance", acc.name), chi2, df); + } +} + +fn main() { + println!("Statistical comparison of consistent-permutation generators"); + println!("(Chi-squared tests, alpha = 0.001 — PASS means we cannot reject uniformity.)"); + for n in [8usize, 16, 32, 64, 128, 256, 1024] { + run_for_n(n); + } +} diff --git a/crates/consistent-choose-k/src/choose_k.rs b/crates/consistent-choose-k/src/choose_k.rs index ce777ee..54984ac 100644 --- a/crates/consistent-choose-k/src/choose_k.rs +++ b/crates/consistent-choose-k/src/choose_k.rs @@ -5,7 +5,7 @@ use crate::{ConsistentHasher, ManySeqBuilder}; /// The hashes are consistent when `n` changes and when `k` changes! /// I.e. one hash changes with probability `k/(n+1)` when `n` increases by one, /// resp. one hash gets added when `k` is increased. Additionally, the returned `k` tuple -/// is guaranteed to be uniformely chosen from all possible `n-choose-k` tuples. +/// is guaranteed to be uniformly chosen from all possible `n-choose-k` tuples. /// /// Also implements `Iterator` to yield the next sample when k is increased. /// Note: since this hashing algorithm implements choose k semantics, all the returned samples are distinct. @@ -49,7 +49,11 @@ impl ConsistentChooseKHasher { iter.samples.push(iter.get_sample(i, n)); } for i in (0..k).rev() { - let s = iter.samples[0..=i].iter().copied().max().expect(""); + let s = iter.samples[0..=i] + .iter() + .copied() + .max() + .expect("subslice 0..=i has at least one element"); iter.samples[i] = s; for j in 0..i { if iter.samples[j] == s { diff --git a/crates/consistent-choose-k/src/consistent_permutation.rs b/crates/consistent-choose-k/src/consistent_permutation.rs new file mode 100644 index 0000000..4d715c8 --- /dev/null +++ b/crates/consistent-choose-k/src/consistent_permutation.rs @@ -0,0 +1,309 @@ +//! Per-key consistent permutation iterator over `0..n`, driven by an +//! independently-keyed Feistel network per layer (2 bits per layer). +//! +//! Given a 64-bit `seed` and a universe size `n`, the iterator emits +//! each integer in `0..n` exactly once in a key-dependent order. The +//! sequence is `n`-consistent: when `n` grows by one, the new sequence +//! agrees with the old except for the at most one position where the +//! new element is inserted. +//! +//! # Layered design +//! +//! Each layer `j` is a Feistel bijection on `[0, 2^(2j + 2))`. After +//! one Feistel pass we look at the **top two bits** of the output: +//! +//! * `00` (one quadrant) → descend to layer `j - 1` (or, at the +//! bottom layer, the output is `0` itself, which we emit). +//! * `01` / `10` / `11` (three quadrants) → emit the raw value +//! (subject to the `< n` range check). +//! +//! So every layer-`j` call has a `3/4` chance of producing an emission, +//! versus `1/2` in a one-bit-per-layer scheme. That cuts both the +//! number of layers (`j_max ≈ ⌈log₂(n)/2⌉ - 1`) and the expected +//! number of `layer_apply` calls per emission (from `≈ 2` to `≈ 4/3`). +//! Even `n_bits` also means the Feistel never needs cycle-walking. +//! +//! See [`docs/permutation-design.md`] in the crate for the full +//! design rationale, including why an independently-keyed per-layer +//! Feistel network beats the (now removed) chunk-prefix-permutation +//! approach on choose-2 set uniformity at small `n` while staying +//! cheaper than multi-round PCG. +//! +//! [`docs/permutation-design.md`]: ../../docs/permutation-design.md +//! +//! # Round-count policy +//! +//! The number of Feistel rounds is chosen automatically per layer as a +//! function of `n_bits` (see [`rounds_for_n_bits`]). Small layers have +//! a tiny F-function input space (`2^half_bits` distinct values), +//! which limits the per-round permutation family; they compensate with +//! extra rounds. Large layers already see a wide F input and reach the +//! random-perm floor with `4` rounds. Conveniently the small layers +//! are called rarely (level `j` contributes `≈ 4^j` emissions), so the +//! extra rounds there are essentially free, while the top layer — +//! which dominates per-iterator cost — uses the fewest rounds. +//! +//! # Iterator +//! +//! Start at the top layer `j_max` and walk its counter. If the top +//! two bits are `00`, descend one layer (and walk *that* layer's +//! counter on the next step); otherwise emit. The iterator stops once +//! it has emitted `n` values. + +/// Smallest Feistel round count we trust to deliver near-random-perm +/// statistics for a layer of width `n_bits`, based on the +/// `statistical_comparison` chi² experiments. Tiny layers +/// (`n_bits = 2`) have a F-input space of only 2 values and need +/// many rounds; large layers reach the floor with 4 rounds. +pub(crate) fn rounds_for_n_bits(n_bits: u32) -> usize { + match n_bits { + 2 => 12, + 4 => 10, + 6 => 6, + _ => 4, + } +} + +/// SplitMix64 — strong per-bit avalanche, useful as a seed +/// pre-mixer for callers that only have a low-entropy seed. +#[cfg(test)] +#[inline] +fn splitmix64(seed: u64) -> u64 { + let mut z = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + z ^ (z >> 31) +} + +/// Apply the per-layer Feistel bijection on `[0, 2^n_bits)`. +/// `master_key` must be well-avalanched: two near-identical keys +/// will yield two highly correlated permutations. +/// +/// All per-layer geometry (`half_bits`, mask, round count) is derived +/// from `n_bits` on each call. `n_bits` is small (≤ 32) and the +/// constants are a handful of trivial bit ops, so this stays well off +/// the hot path of the Feistel loop itself. +/// +/// `n_bits` must be **even** and in `2..=30`. +#[inline] +pub(crate) fn layer_apply(n_bits: u32, master_key: u64, x: u32) -> u32 { + debug_assert!((2..=30).contains(&n_bits) && n_bits.is_multiple_of(2)); + let rounds = rounds_for_n_bits(n_bits) as u32; + let half_bits = n_bits / 2; + let half_mask = (1u32 << half_bits) - 1; + let n_mask = (1u32 << n_bits) - 1; + let shift = 2 * half_bits; + + debug_assert!(x <= n_mask, "input out of range"); + let mut l = (x >> half_bits) & half_mask; + let mut r = x & half_mask; + let mut k = master_key; + for _ in 0..rounds { + // Split each 64-bit (rotated) master into two *independent* + // 32-bit sub-keys: the low half is an additive offset, the + // high half (forced odd) is the multiplier. Higher bits stay + // set on purpose — they contribute via the multiplicative + // mix. + let k_xor = k & 0xFFFF_FFFF; + let k_mul = (k >> 32) | 1; + let mixed = ((r as u64) ^ k_xor).wrapping_mul(k_mul); + let f = (mixed as u32).wrapping_add((mixed >> 32) as u32) & half_mask; + let new_l = r; + let new_r = l ^ f; + l = new_l; + r = new_r; + // Rotate the master key by exactly the per-round "consumed" + // bit count so that each round sees a fresh slice of the + // master in its low positions, plus a Weyl increment to + // decorrelate consecutive rounds (rotation alone leaves + // 60+ bits in common between adjacent rounds). + k = k.rotate_right(shift).wrapping_add(0x9E37_79B9_7F4A_7C15); + } + ((l << half_bits) | r) & n_mask +} + +/// `n`-consistent permutation iterator over `0..n` driven by one +/// bijection per level (see module docs). +pub struct ConsistentPermutation { + /// Master key driving every layer's Feistel. Must be supplied + /// already avalanched (see [`ConsistentPermutation::new`]). + /// Layers stay distinct because their geometry + /// (`n_bits`, `half_bits`, rotation amount, round count) differs. + master_key: u64, + /// `counters[j]` is the next index to feed into the layer-`j` + /// bijection. Walks the full domain `[0, 2^(2j + 2))`. With + /// `n <= 2^30`, `j_max <= 14` so the top counter caps at + /// `2^30 <= u32::MAX`. + counters: Vec, + n: u32, +} + +impl ConsistentPermutation { + /// Construct an iterator over `0..n` from a single 64-bit + /// `master_key`. Each layer's Feistel round count is chosen + /// automatically by [`rounds_for_n_bits`] given that layer's bit + /// width. + /// + /// `n` must satisfy `1 <= n <= 2^30`. + /// + /// `master_key` is used directly as the Feistel key — the + /// constructor does **not** avalanche it. Pass a high-entropy + /// value (e.g. the output of a cryptographic or strong + /// non-cryptographic hash like SplitMix64); sequential or + /// low-entropy keys will produce visibly correlated permutations + /// across consecutive iterators. + pub fn new(n: u32, master_key: u64) -> Self { + assert!(n > 0, "n must be at least 1"); + assert!(n <= 1u32 << 30, "n must be at most 2^30"); + // Smallest `j_max` such that `4^(j_max + 1) >= n`. For + // `n >= 2`, `(n-1).ilog2() = ceil(log2(n)) - 1`, so dividing + // by 2 gives `ceil(ceil(log2(n)) / 2) - 1` — the right value + // for both even and odd bit-widths. + let j_max = if n <= 1 { 0 } else { (n - 1).ilog2() / 2 }; + let counters = vec![0u32; (j_max + 1) as usize]; + Self { + master_key, + counters, + n, + } + } + + /// Universe size. + pub fn n(&self) -> u32 { + self.n + } +} + +impl Iterator for ConsistentPermutation { + type Item = u32; + + fn next(&mut self) -> Option { + let num_layers = self.counters.len(); + let j_max = (num_layers - 1) as u32; + let n = self.n; + let mk = self.master_key; + + // Phase 1: walk the top layer (where skips and exhaustion + // happen) until we either emit an in-range value or descend. + let n_bits_top = 2 * num_layers as u32; + let top_shift = 2 * j_max; + let top_cap = 1u32 << n_bits_top; + loop { + let counter = self.counters[j_max as usize]; + if counter >= top_cap { + return None; + } + let raw = layer_apply(n_bits_top, mk, counter); + self.counters[j_max as usize] = counter + 1; + let top_bits = (raw >> top_shift) & 0b11; + if top_bits != 0 { + // `raw` lies in `[4^j_max, 4^(j_max+1))`. May be out + // of range (only the top layer can exceed `n`). + if raw < n { + return Some(raw); + } + continue; + } + // Descend. At `j_max == 0` the top layer *is* the bottom + // layer and `top_bits == 0` means `raw == 0`. + if j_max == 0 { + return Some(0); + } + break; + } + + // Phase 2: descend through lower layers. Their counters can + // never exhaust within a single walk, and their raw values + // are always in `[4^j, 4^(j+1)) ⊂ [0, n)`, so neither + // exhaustion nor range checks are needed. + for j in (0..j_max).rev() { + let counter = self.counters[j as usize]; + let n_bits = 2 * j + 2; + let raw = layer_apply(n_bits, mk, counter); + self.counters[j as usize] = counter + 1; + let top_shift = 2 * j; + let top_bits = (raw >> top_shift) & 0b11; + if top_bits != 0 { + return Some(raw); + } + } + Some(0) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + + /// Across many seeds, `layer_apply(_, key, 0)` must not always + /// collapse to the same value (regression test for the "F(0, k) = + /// 0" symmetry bug). + #[test] + fn layer_apply_zero_input_is_key_sensitive() { + for n_bits in [2u32, 4, 6, 8, 10] { + let outputs: HashSet = (0..1000u64) + .map(|seed| layer_apply(n_bits, splitmix64(seed), 0)) + .collect(); + let domain = 1u32 << n_bits; + assert!( + outputs.len() >= (domain as usize / 2).min(50), + "n_bits={n_bits}: only {} distinct values from layer_apply(_, _, 0) across 1000 seeds", + outputs.len() + ); + } + } + + /// `layer_apply` must be a bijection on `[0, 2^n_bits)` for every + /// even supported `n_bits`. + #[test] + fn layer_apply_is_bijection() { + for n_bits in [2u32, 4, 6, 8, 10] { + for seed in 0..16u64 { + let key = splitmix64(seed); + let domain: u32 = 1 << n_bits; + let outputs: HashSet = + (0..domain).map(|x| layer_apply(n_bits, key, x)).collect(); + assert_eq!( + outputs.len(), + domain as usize, + "n_bits={n_bits} seed={seed}: not a bijection" + ); + for &o in &outputs { + assert!( + o < domain, + "n_bits={n_bits} seed={seed}: output {o} out of range" + ); + } + } + } + } + + /// Collecting the iterator must yield values in `0..n` without + /// duplicates, and must yield **exactly** `n` of them. + #[test] + fn no_duplicates_within_universe() { + for seed in 0..32u64 { + for n in [1u32, 2, 3, 4, 5, 7, 8, 9, 16, 17, 31, 32, 33, 100, 1000] { + let iter = ConsistentPermutation::new(n, 0x9E37_79B9_7F4A_7C15 ^ seed); + let emitted: Vec = iter.collect(); + assert_eq!( + emitted.len(), + n as usize, + "seed={seed} n={n}: wrong emission count: {emitted:?}" + ); + assert!( + emitted.iter().all(|&v| v < n), + "seed={seed} n={n}: out-of-range value emitted: {emitted:?}" + ); + let unique: HashSet = emitted.iter().copied().collect(); + assert_eq!( + unique.len(), + emitted.len(), + "seed={seed} n={n}: duplicate emitted: {emitted:?}" + ); + } + } + } +} diff --git a/crates/consistent-choose-k/src/consistent_reservoir.rs b/crates/consistent-choose-k/src/consistent_reservoir.rs new file mode 100644 index 0000000..06385c6 --- /dev/null +++ b/crates/consistent-choose-k/src/consistent_reservoir.rs @@ -0,0 +1,505 @@ +//! Online consistent reservoir sampling driven by +//! [`ConsistentPermutation`]. +//! +//! # What is a "consistent" reservoir? +//! +//! Given a 64-bit `master_key`, a *consistent* reservoir of size `k` +//! over a universe `[0, n)` is the set +//! +//! ```text +//! reservoir(n, k) = ConsistentPermutation::new(n, master_key).take(k) +//! ``` +//! +//! — the first `k` emissions of the keyed permutation of `[0, n)`. +//! By the consistency property of [`ConsistentPermutation`]: +//! +//! * For fixed `k`, growing `n` by one inserts at most one new +//! element into `reservoir(n, k)` (and evicts at most one). The +//! stream of `(added, evicted)` pairs as `n` ranges over `n..` +//! is what this iterator emits. +//! * For fixed `n`, the reservoir is a **prefix** in rank order: +//! `reservoir(n, k) ⊂ reservoir(n, k+1)`. So the same reservoir +//! serves every `k` — there is no need to fix `k` at construction +//! time the way standard reservoir sampling does. The first +//! `k'` elements of the reservoir (in rank order) are themselves +//! a valid reservoir of size `k'`. +//! +//! # Complexity +//! +//! * `ConsistentPermutation::new(n, key).take(k)` — get the +//! reservoir for a fixed `n` and `k` from scratch: **`O(k)`**. +//! * [`ConsistentReservoir::new(k, n, key)`](Self::new) — set up +//! the iterator at any starting `n`: **`O(k)`** (the +//! `O(k)` walk plus an average-case linear-time bucket sort of `pending` ranks). +//! * [`Iterator::next`] — get the next `(added, evicted)` pair: +//! **`O(1)` amortised** (each call pops amortized a constant number of +//! tail entries; `grow_layer`, which costs $O(k)$ average-case, is +//! triggered on average once per `Θ(k)` admissions). +//! +//! The sorting overhead is minimized to a true $O(k)$ average-case complexity via bucket sort! +//! Because all values generated in a layer are expected to be uniformly randomly chosen within the +//! interval $[\frac{1}{4}m, m)$, bucket-sorting them into $O(k)$ buckets achieves a linear worst-case average-case runtime. +//! With this optimization, the complexity of consistent reservoir sampling matches that of standard +//! reservoir sampling amortized ($O(1)$ per processed item). +//! +//! Contrast with the classic random-jump reservoir sampling +//! algorithms (Vitter L): +//! +//! * `O(1)` to jump to the next sample in the stream. +//! * `O(k log(n / k))` to *jump* to a target universe size `n` +//! (since L still has to iterate through the geometric jumps). +//! * Tied to the `k` fixed at construction; changing `k` breaks +//! consistency property. +//! +//! So a consistent reservoir is the right pick when you want to +//! random-access the reservoir at an arbitrary `n` and/or vary `k` +//! while keeping the consistency property; standard reservoir sampling wins on per-element +//! streaming cost but loses on these two flexibilities. +//! +//! # Benchmark Results +//! +//! Performance comparison for computing the reservoir of size $k$ up to point $n = 10,000,000$: +//! +//! | Algorithm | Strategy / Complexity | $k = 100$ | $k = 1000$ | Consistency Support? | Fixed-$k$ Constrained? | +//! | :--- | :--- | :--- | :--- | :--- | :--- | +//! | **`ConsistentPermutation_Direct`** | Direct Random-Access / $O(k)$ | **`883 ns`** | **`7.80 µs`** | **Yes** (Perfect Profile) | No (Arbitrary subsets) | +//! | **`ConsistentReservoir_Direct`** | Direct Build / $O(k)$ | **`1.61 µs`** | **`14.30 µs`** | **Yes** (Perfect Profile) | No (Iterator starts at $n$) | +//! | **`Standard_Algorithm_L` (Vitter)**| Streaming Skip / $O(k \log(n/k))$| **`22.12 µs`** | **`183.17 µs`** | No | **Yes** (Must rebuild from $0$) | +//! | **`ConsistentReservoir_Streaming`**| Incremental Streaming / $O(k \log(n/k))$| **`24.06 µs`** | **`318.88 µs`** | **Yes** (Stateful Iterator) | No (Sub-streams consistent)| +//! | **`Standard_Algorithm_R` (Scan)**| Streaming Scan / $O(n)$ | **`28.22 ms`** | **`28.26 ms`** | No | **Yes** (Must rebuild from $0$) | +//! +//! # Data layout +//! +//! Everything lives in a single flat `values: Vec` plus a `pending: Vec` +//! of rank indices into `values`, representing candidate fresh admissions. +//! +//! After construction (or after [`Self::grow_layer`]), `values` is +//! exactly the first `k + |pending|` emissions of +//! `ConsistentPermutation::new(m, master_key)` in walk order, i.e. +//! `values[r]` is the rank-`r` emission of the current top-level +//! permutation. Descents (old-reservoir items) and non-descents +//! (fresh items) as well as active and inactive items are interleaved. +//! +//! `pending` is a vector of ranks/indices sorted descending by their corresponding value in `values`. +//! By storing them descending, `pending.pop()` efficiently returns the rank pointing to the smallest +//! fresh value first in $O(1)$ time — the next admission as `n` grows. +//! +//! # Active reservoir via `n` +//! +//! `self.n` is the current effective universe size: `reservoir()` +//! returns the items in `values` whose value is `< n`, in walk order. +//! By [`ConsistentPermutation`] consistency this is exactly +//! `ConsistentPermutation::new(n, master_key).take(k)` — the +//! consistent top-`k` for `[0, n)`. +//! +//! Each [`Iterator::next`] call advances `n` to the next admission's +//! value `+ 1`, yielding `(added_value, evicted_value)`. +//! +//! # Initial state ([`Self::new`]) +//! +//! Given the user's initial `n`, pick `j_max = smallest_j_max(n)` and +//! `m = 4^(j_max+1) >= n`. Walk `ConsistentPermutation::new(m, key)` +//! pushing each emission into `values` until we have collected +//! exactly `k` in-range values (i.e. `< n`). Every out-of-range +//! emission seen along the way has its rank recorded in `pending_vec`; +//! finally `pending` is initialized in average-case $O(k)$ time using flat bucket sorting. +//! +//! The loop is guaranteed to terminate with the last push being an +//! in-range value — that's what triggers `count == k`. So +//! `values.last()` is always a real reservoir item, which is the +//! one to evict on the first admission. +//! +//! # Growing the universe ([`Self::grow_layer`]) +//! +//! Opens the next top Feistel layer (`j_max += 1`, `m *= 4`) and +//! rebuilds `values` / `pending` against the new permutation by +//! walking the new top layer's counter until all `k` old-reservoir +//! entries have been consumed by descents. Every emission is pushed +//! to `new_values` in walk order; non-descents additionally have +//! their rank recorded in `pending_vec`. The walk terminates on a +//! descent so `values.last()` is again a real reservoir item. +//! +//! Non-descents at counters `>= k` are real emissions of the new +//! permutation but their rank places them outside the top-`k`. They +//! still end up in `values` and `pending`, and are filtered out +//! later when popped. +//! +//! # Invariants +//! +//! Throughout the iterator's lifetime (between [`Iterator::next`] +//! calls, and restored before each yields) four invariants hold: +//! +//! 1. **The active reservoir has size `k`.** The *active set* — +//! the entries of `values` whose value is `< n` — always +//! contains exactly `k` elements. This is what `reservoir()` +//! returns. +//! 2. **Every pending value is strictly larger than every active +//! value.** Pending entries were emitted with value `>= n` by +//! [`ConsistentPermutation`], whereas active entries are by +//! definition `< n`. In particular the smallest pending value +//! — which corresponds to the rank popped by `pending.pop()` — is +//! the next candidate to cross into the active window as `n` grows. +//! 3. **`pending` must contain all elements from `values` which are greater or equal to `n`.** +//! Since there are exactly `k` elements less than `n` in `values`, `pending.len()` +//! must be larger or equal to `values.len() - k` (note that there can be values in +//! `pending` which we discarded while searching for the last active value). +//! 4. **The last item in `values` is always an active element.** +//! That is, `values.last()` is guaranteed to have a value `< n`. +//! +//! From these facts the algorithm of [`Iterator::next`] reads +//! off: +//! +//! **Finding the eviction.** Since the last item in `values` is always +//! an active element (invariant 4), the last active entry in walk order +//! resides exactly at the tail of `values`. Therefore, we can immediately +//! obtain the evicted value by popping `values`. +//! +//! **Finding the admission.** Pop `added_rank` from `pending` until it is +//! strictly less than `values.len()`. If it is out-of-bounds, it corresponds +//! to an element that was previously popped from `values` because it was pending +//! at the tail, so we safely discard it and continue. Otherwise, the corresponding +//! value is our candidate `added_value`. +//! +//! **Restoring the invariants.** Once we have `added_value`, we update `n` to +//! `added_value + 1`. This makes `added_value` active, restoring the active +//! reservoir size to `k`. Finally, we restore invariant 4 by popping any pending +//! elements (values `>= n`) from the tail of `values` until either `values.len() == k` +//! or the tail is active (value `< n`). +//! +//! The stream only terminates once the universe reaches the [`ConsistentPermutation`] +//! cap (`m = 2^30`); until then, `grow_layer` re-establishes the invariants whenever +//! `pending` empties. + +use std::cmp::Reverse; + +use crate::consistent_permutation::{layer_apply, ConsistentPermutation}; + +/// Reservoir of size `k` whose contents track the consistent top-`k` +/// by [`ConsistentPermutation`] rank, with explicit support for +/// growing the universe one Feistel layer at a time. +pub struct ConsistentReservoir { + /// First `k + pending.len()` emissions of + /// `ConsistentPermutation(m, master_key)` in walk order. + /// `values[r]` is always the rank-`r` emission of the current + /// top-level permutation. + values: Vec, + /// Ranks of pending candidate admissions, sorted **descending by value** using `values` + /// so `pop()` returns the rank corresponding to the smallest value first. + pending: Vec, + k: u32, + /// Top Feistel layer index. The current top-level universe size + /// is `m = 4^(j_max+1) = 1 << (2*j_max + 2)`. + j_max: u32, + master_key: u64, + /// Current effective universe size. `reservoir()` returns + /// values `< n`, which equals + /// `ConsistentPermutation::new(n, master_key).take(k)`. + n: u32, + /// Reusable scratchpad for holding the old reservoir of size `k` during grow layers. + old_reservoir: Vec, + /// Reusable scratchpad for bucket sort counts to avoid allocations. + counts_scratch: Vec, +} + +/// Helper function to perform a flat, allocation-friendly bucket sort on values of a slice +/// that are greater than or equal to `v_min` into a pre-allocated destination buffer. +fn bucket_sort_ranks_descending_in( + values: &[u32], + dest: &mut [u32], + counts: &mut [usize], + v_min: u32, + v_max: u32, +) { + let n_pending = dest.len(); + let range = (v_max - v_min) as u64; + if n_pending == 0 || range == 0 { + return; + } + // 1. Count elements in each bucket + for &val in values { + if val >= v_min { + let mut idx = (((val - v_min) as u64 * n_pending as u64) / range) as usize; + if idx >= n_pending { + idx = n_pending - 1; + } + counts[idx] += 1; + } + } + // 2. Compute starting offsets for buckets descending from n_pending-1 to 0 in-place inside `counts` + let mut current_offset = 0; + for i in (0..n_pending).rev() { + let count = counts[i]; + counts[i] = current_offset; + current_offset += count; + } + // 3. Place ranks into destination array while updating write heads in `counts` + for (rank, &val) in values.iter().enumerate() { + if val >= v_min { + let mut idx = (((val - v_min) as u64 * n_pending as u64) / range) as usize; + if idx >= n_pending { + idx = n_pending - 1; + } + let pos = counts[idx]; + dest[pos] = rank as u32; + counts[idx] += 1; + } + } + // 4. Sort each bucket in-place descending by value. + // After step 3, `counts[i]` holds the exclusive end of bucket `i`. + // The starting index of bucket `i` is 0 if `i == n_pending - 1`, otherwise `counts[i + 1]`. + for i in 0..n_pending { + let start = if i == n_pending - 1 { 0 } else { counts[i + 1] }; + let end = counts[i]; + if end > start + 1 { + dest[start..end].sort_unstable_by_key(|&rank| Reverse(values[rank as usize])); + } + } +} + +/// Smallest `j_max` such that `4^(j_max+1) >= k`. Matches the +/// formula used by [`ConsistentPermutation::new`]. +fn smallest_j_max(k: u32) -> u32 { + if k <= 1 { + 0 + } else { + (k - 1).ilog2() / 2 + } +} + +impl ConsistentReservoir { + /// Build a consistent reservoir of size `k` for initial universe + /// `n`, seeded by `master_key`. The initial `reservoir()` is the + /// consistent top-`k` for `[0, n)`. + /// + /// Internally we use the smallest layer boundary + /// `m = 4^(j_max+1) >= n` and walk + /// `ConsistentPermutation::new(m, master_key)` until we have + /// observed exactly `k` values `< n`; out-of-range emissions + /// become pending admissions for later iteration. + /// + /// `master_key` is forwarded verbatim to + /// [`ConsistentPermutation::new`] (no avalanche). + /// + /// # Panics + /// + /// Panics if `k == 0`, `n > 2^30`, or `k > n`. + pub fn new(k: u32, n: u32, master_key: u64) -> Self { + assert!(k >= 1, "k must be at least 1"); + assert!(n <= 1u32 << 30, "n must be at most 2^30"); + let j_max = smallest_j_max(n); + let m = 1u32 << (2 * j_max + 2); + let mut values = vec![]; + let mut count = 0; + let mut pending_count = 0; + for value in ConsistentPermutation::new(m, master_key) { + values.push(value); + if value < n { + count += 1; + if count == k { + break; + } + } else { + pending_count += 1; + } + } + let mut pending = vec![0u32; pending_count]; + let mut counts_scratch = vec![0usize; pending_count]; + bucket_sort_ranks_descending_in(&values, &mut pending, &mut counts_scratch, n, m); + Self { + values, + pending, + k, + j_max, + master_key, + n, + old_reservoir: Vec::new(), + counts_scratch, + } + } + + /// Reservoir size. + pub fn k(&self) -> u32 { + self.k + } + + /// Current effective universe size — `reservoir()` returns + /// `ConsistentPermutation::new(n(), master_key).take(k)`. + pub fn n(&self) -> u32 { + self.n + } + + /// The current reservoir contents, in rank order. + /// + /// Equals `ConsistentPermutation::new(n(), master_key).take(k)` + /// by [`ConsistentPermutation`] consistency. + pub fn reservoir(&self) -> impl Iterator + '_ { + self.values.iter().copied().filter(|&v| v < self.n()) + } + + /// Open the next top Feistel layer (extends `m` to `4 * m`) and + /// rebuild `values`/`pending` against the new permutation. + /// + /// Any unconsumed entries left from the previous walk are + /// dropped — their ranks lived in the old top layer's coordinate + /// system and have no meaning under the new one. The new walk + /// is guaranteed to terminate on a descent, so `values.last()` + /// is again a genuine reservoir item ready to be evicted on the + /// next admission. + /// + /// # Panics + /// + /// Panics if growing would push the universe past + /// [`ConsistentPermutation`]'s `n <= 2^30` cap. + pub fn grow_layer(&mut self) { + assert!( + self.j_max < 14, + "ConsistentReservoir at universe cap (m = 2^30)" + ); + let new_j_max = self.j_max + 1; + let new_n_bits = 2 * new_j_max + 2; + let new_top_shift = 2 * new_j_max; + let k = self.k as usize; + // Drop the stale eviction tail from the previous grow_layer + // (if the iterator wasn't fully drained, those values are no + // longer reachable anyway). + self.values.truncate(k); + std::mem::swap(&mut self.values, &mut self.old_reservoir); + debug_assert_eq!(self.old_reservoir.len(), k); + + self.values.clear(); + let mut next_old_idx = 0usize; + let mut counter: u32 = 0; + let mut pending_count = 0; + while next_old_idx < k { + let raw = layer_apply(new_n_bits, self.master_key, counter); + let is_descent = (raw >> new_top_shift) & 0b11 == 0; + if is_descent { + // Consume the next old reservoir entry; this is the + // walk's only termination condition. + self.values.push(self.old_reservoir[next_old_idx]); + next_old_idx += 1; + } else { + // Push the fresh emission. + // Ranks >= k are filtered out later by the iterator's guards. + self.values.push(raw); + pending_count += 1; + } + counter += 1; + } + + let m_old = 1u32 << (2 * self.j_max + 2); + let m_new = 1u32 << (2 * new_j_max + 2); + self.pending.resize(pending_count, 0); + self.counts_scratch.resize(pending_count, 0); + self.counts_scratch.fill(0); + bucket_sort_ranks_descending_in( + &self.values, + &mut self.pending, + &mut self.counts_scratch, + m_old, + m_new, + ); + self.j_max = new_j_max; + } +} + +impl Iterator for ConsistentReservoir { + /// `(added_value, evicted_value)`: the next item admitted to the + /// reservoir paired with the item it displaces. After yielding, + /// `n()` advances to `added_value + 1`. + /// + /// `added_value` is strictly increasing across calls + /// (`pending` is consumed smallest-value first). The stream + /// terminates only at the [`ConsistentPermutation`] universe + /// cap (`m = 2^30`). + type Item = (u32, u32); + + fn next(&mut self) -> Option<(u32, u32)> { + // Keep growing until we are guaranteed to have a pending admission. + while self.values.len() <= self.k as usize { + if self.j_max >= 14 { + return None; + } + self.grow_layer(); + } + // By invariant 4, the last element is always active under the previous n. + let evicted_value = self + .values + .pop() + .expect("values has more than k entries, so pop() is safe"); + + let added_rank = std::iter::from_fn(|| self.pending.pop()) + .find(|&rank| (rank as usize) < self.values.len()) + .expect("pending is non-empty by our invariants") as usize; + + let added_value = self.values[added_rank]; + debug_assert!(added_value >= self.n); + self.n = added_value + 1; + // Restore invariant 4: any pending elements at the tail of values must be popped. + while self.values.len() > self.k as usize && self.values.last().copied().unwrap() >= self.n + { + self.values.pop(); + } + Some((added_value, evicted_value)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn splitmix64(seed: u64) -> u64 { + let mut z = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + z ^ (z >> 31) + } + + #[test] + fn smallest_j_max_picks_smallest_layer_geq_k() { + let cases = [ + (1u32, 0u32, 4u32), + (2, 0, 4), + (3, 0, 4), + (4, 0, 4), + (5, 1, 16), + (7, 1, 16), + (10, 1, 16), + (16, 1, 16), + (17, 2, 64), + (64, 2, 64), + (65, 3, 256), + (256, 3, 256), + (257, 4, 1024), + ]; + for (k, expected_j, expected_m) in cases { + assert_eq!(smallest_j_max(k), expected_j, "k={k}"); + assert_eq!(1u32 << (2 * smallest_j_max(k) + 2), expected_m, "k={k}"); + } + } + + #[test] + fn test_reservoir() { + for seed in [42, 123456789, 987654321] { + let master_key = splitmix64(seed); + for k in [1, 2, 3, 10, 100] { + let mut r = ConsistentReservoir::new(k, k, master_key); + for _ in 0.. { + let before: Vec<_> = r.reservoir().collect(); + let Some((added, evicted)) = r.next() else { + break; + }; + let after: Vec<_> = r.reservoir().collect(); + let expected: Vec<_> = ConsistentPermutation::new(added, master_key) + .take(k as usize) + .collect(); + assert_eq!(before, expected); + assert!(before.contains(&evicted)); + assert!(!after.contains(&evicted)); + assert!(!before.contains(&added)); + assert!(after.contains(&added)); + } + } + } + } +} diff --git a/crates/consistent-choose-k/src/lib.rs b/crates/consistent-choose-k/src/lib.rs index 8a2c392..25b7e0f 100644 --- a/crates/consistent-choose-k/src/lib.rs +++ b/crates/consistent-choose-k/src/lib.rs @@ -1,9 +1,13 @@ mod choose_k; mod consistent_hash; +mod consistent_permutation; +mod consistent_reservoir; mod node_map; pub use choose_k::ConsistentChooseKHasher; pub use consistent_hash::{ ConsistentHashIterator, ConsistentHashRevIterator, ConsistentHasher, HashSeqBuilder, HashSequence, ManySeqBuilder, }; +pub use consistent_permutation::ConsistentPermutation; +pub use consistent_reservoir::ConsistentReservoir; pub use node_map::ConsistentNodeMap;