対称分割ソートを使用する

対称分割ソート (symmetry partition sort, SPSort) は、比例拡張ソート(PESort)の分割を土台にしつつ、整列済み標本を配列の両端へ置いてから未整列部分を分割することで、クイックソートの最悪 O(n²) を避けつつ分割処理を速くする比較ソートである。

PESort は整列済み接頭辞 S を左端に伸ばし、その中央値をピボットに未整列部分 U を分割する。

SPSort も同じ比例拡張の枠組みを使うが、分割に入る直前に S を中央値で左右に分け、左半分 L を左端、右半分 R を右端に置き、中央に U を残す L | U | R の形に整える。 LR はそれぞれピボット未満・以上の要素群として働き、分割ループの番兵(Sentinel)になるため、境界チェックを減らしつつ走査を速くできる。

  1. 接頭辞の初期化: 先頭 1 要素(または少数要素)を整列済み接頭辞 S とみなす。
  2. 比例拡張: |U| > p·|S| のあいだ、SU の先頭 p·|S| 要素をまとめて整列し、結果を新しい S とする。
  3. 対称配置: S の中央値位置で左右に分け、右半分 R を配列右端へ移動して L | U | R にする。
  4. 中央値ピボット: 中央値をピボットとして U を分割する(実装では Lomuto 型や 3 路分割などが用いられる)。
  5. 再帰: 左側(L とピボット未満の部分)・右側(ピボット以上と R)に同じ手順を適用する。十分短い区間は挿入ソートで仕上げる。
procedure symmetry_partition_sort(A, lo, hi, p)
  if hi - lo <= INSERTION_THRESHOLD then
    insertion_sort(A, lo, hi)
    return
  s_end = lo
  while s_end < hi and (hi - s_end) > p * (s_end - lo + 1)
    chunk_end = min(s_end + p * (s_end - lo + 1), hi)
    symmetry_partition_sort(A, lo, chunk_end, p)
    s_end = chunk_end
  median = lo + floor((s_end - lo) / 2)
  r_len = s_end - median
  move A[median + 1 .. s_end] to A[hi - r_len + 1 .. hi]   // L | U | R
  pivot = partition(A, lo, hi, median)
  symmetry_partition_sort(A, lo, pivot - 1, p)
  symmetry_partition_sort(A, pivot + 1, hi, p)

パラメータ p は PESort と同じく「未整列部分 ÷ 整列済み接頭辞」の上限である。p ≈ 16 付近で平均性能がよく、小さくすると最悪ケースが改善しやすい。

Jing-Chao Chen の実装では 適応的な連続区間の検出等値キー領域を持つ分割 も組み合わされ、部分整列済み入力への適応性が高い。

最悪・平均とも O(n log n) 比較、補助領域は再帰スタックで O(log n) とされる。安定ソートではない

以下のデモでは p = 2(小配列でも拡張・対称配置が見えるよう意図的に小さくしている)、小区間閾値 4、分割は Lomuto 型である。

青枠は左端の整列済み標本 L、水色は右端の R、紫は中央値ピボット、オレンジは比較、緑は交換を表す。

比例拡張ソートの記事と並べて読むと、「整列済み標本を左端だけに置くか、両端に置いて番兵にするか」の違いが対比しやすい。

計算時間量および空間計算量を計測する

Size Average time Maximum time Average memory Maximum memory
256 0.000006 0.000375 1662 1668
512 0.000019 0.000136 1666 1672
1024 0.000044 0.000498 1674 1680
2048 0.000099 0.000383 1690 1696
4096 0.000216 0.000486 1722 1728
8192 0.000626 0.003898 1786 1792
16384 0.001399 0.002199 1918 1924
32768 0.003027 0.004453 2177 2184
65536 0.006799 0.013110 2689 2696
131072 0.019849 0.137346 3714 3720
262144 0.045632 0.083470 5762 5768
計測に使用したコードを表示する

set -euo pipefail

WORKDIR="$(mktemp -d)"
trap 'rm -rf "$WORKDIR"' EXIT

cat > "$WORKDIR/Dockerfile" <<'EOF'
FROM rust:1.95.0

WORKDIR /app

RUN mkdir -p src

RUN cat > Cargo.toml <<'CARGO'
[package]
name = "rust-benchmark"
version = "0.1.0"
edition = "2021"

[profile.release]
lto = true
codegen-units = 1
panic = "abort"
CARGO

RUN cat > src/main.rs <<'RUST'
use std::{
    env,
    process::Command,
    time::{Duration, Instant},
};
const MIN_POWER: u32 = 8;
const MAX_POWER: u32 = 18;
const RUNS: usize = 8192;
fn insertion_sort(a: &mut [usize]) {
    for i in 1..a.len() {
        let mut j = i;
        while j > 0 && a[j - 1] > a[j] {
            a.swap(j - 1, j);
            j -= 1;
        }
    }
}
fn partition_at(a: &mut [usize], lo: usize, hi: usize, pivot_idx: usize) -> usize {
    a.swap(pivot_idx, hi);
    let pivot = a[hi];
    let mut i = lo;
    for j in lo..hi {
        if a[j] < pivot {
            a.swap(i, j);
            i += 1;
        }
    }
    a.swap(i, hi);
    i
}
fn sp_sort_range(a: &mut [usize], lo: usize, hi: usize) {
    const P: usize = 16;
    if hi <= lo {
        return;
    }
    if hi - lo < 16 {
        insertion_sort(&mut a[lo..=hi]);
        return;
    }
    let mut s_end = lo;
    while s_end < hi && hi - s_end > P * (s_end - lo + 1) {
        let chunk_end = (s_end + P * (s_end - lo + 1)).min(hi);
        sp_sort_range(a, lo, chunk_end);
        s_end = chunk_end;
    }
    let median = lo + (s_end - lo) / 2;
    let r_len = s_end - median;
    let r_start = median + 1;
    let r_dest = hi - r_len + 1;
    for i in 0..r_len {
        let from = r_start + i;
        let to = r_dest + i;
        if from != to {
            a.swap(from, to);
        }
    }
    let pivot = partition_at(a, lo, hi, median);
    if pivot > 0 {
        sp_sort_range(a, lo, pivot - 1);
    }
    sp_sort_range(a, pivot + 1, hi);
}

fn symmetry_partition_sort(a: &mut [usize]) {
    if let Some(hi) = a.len().checked_sub(1) {
        sp_sort_range(a, 0, hi);
    }
}

fn benchmark_sort(array: &mut [usize]) {

    symmetry_partition_sort(array);

}

fn shuffled(size: usize, seed: u64) -> Vec<usize> {
    let mut v: Vec<usize> = (1..=size).collect();

    let mut state = seed;

    for i in (1..size).rev() {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;

        let j = (state as usize) % (i + 1);

        v.swap(i, j);
    }

    v
}

fn memory_usage_kb() -> usize {
    let contents = std::fs::read_to_string("/proc/self/status")
        .unwrap_or_default();

    for line in contents.lines() {
        if let Some(rest) = line.strip_prefix("VmHWM:") {
            let kb = rest
                .split_whitespace()
                .next()
                .unwrap_or("0")
                .parse::<usize>()
                .unwrap_or(0);

            return kb;
        }
    }

    0
}

fn micros(d: Duration) -> u128 {
    d.as_micros()
}

fn run_once(size: usize, seed: usize) -> (u128, usize) {
    let expected: Vec<usize> = (1..=size).collect();
    let mut array = shuffled(size, seed as u64);

    let start = Instant::now();

    benchmark_sort(&mut array);

    let elapsed = start.elapsed();

    if array != expected {
        panic!(
            "sort failed with seed {} for size {}",
            seed,
            size
        );
    }

    (micros(elapsed), memory_usage_kb())
}

fn run_child(args: &[String]) {
    let size = args[2].parse::<usize>().expect("invalid size");
    let seed = args[3].parse::<usize>().expect("invalid seed");
    let (elapsed_us, mem) = run_once(size, seed);
    println!("{} {}", elapsed_us, mem);
}

fn main() {
    let args: Vec<String> = env::args().collect();
    if args.get(1).is_some_and(|arg| arg == "--run-once") {
        run_child(&args);
        return;
    }

    println!(
        "| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
        "Size",
        "Average time",
        "Maximum time",
        "Average memory",
        "Maximum memory"
    );

    println!(
        "|{:-<11}:|{:-<16}:|{:-<16}:|{:-<16}:|{:-<16}:|",
        "",
        "",
        "",
        "",
        ""
    );

    for power in MIN_POWER..=MAX_POWER {
        let size = 1usize << power;

        let mut total_time: u128 = 0;
        let mut max_time: u128 = 0;

        let mut total_mem: usize = 0;
        let mut max_mem: usize = 0;

        for seed in 1..=RUNS {
            let output = Command::new(env::current_exe().expect("failed to find current executable"))
                .arg("--run-once")
                .arg(size.to_string())
                .arg(seed.to_string())
                .output()
                .expect("failed to run benchmark child process");

            if !output.status.success() {
                panic!(
                    "benchmark child process failed: {}",
                    String::from_utf8_lossy(&output.stderr)
                );
            }

            let stdout = String::from_utf8(output.stdout)
                .expect("child process returned non-UTF-8 output");
            let mut fields = stdout.split_whitespace();
            let elapsed_us = fields
                .next()
                .expect("missing elapsed time")
                .parse::<u128>()
                .expect("invalid elapsed time");
            let mem = fields
                .next()
                .expect("missing memory usage")
                .parse::<usize>()
                .expect("invalid memory usage");

            total_time += elapsed_us;

            if elapsed_us > max_time {
                max_time = elapsed_us;
            }

            total_mem += mem;

            if mem > max_mem {
                max_mem = mem;
            }
        }

        let avg_time = total_time / RUNS as u128;
        let avg_mem = total_mem / RUNS;

        println!(
            "| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
            size,
            format!("{}.{:06}", avg_time / 1_000_000, avg_time % 1_000_000),
            format!("{}.{:06}", max_time / 1_000_000, max_time % 1_000_000),
            avg_mem,
            max_mem
        );
    }
}
RUST

RUN cargo build --release

CMD ["./target/release/rust-benchmark"]
EOF

docker build -t rust-benchmark "$WORKDIR"
docker run --rm --init rust-benchmark