対称分割ソートで配列を並び替える
対称分割ソートを使用する
対称分割ソート (symmetry partition sort, SPSort) は、比例拡張ソート(PESort)の分割を土台にしつつ、整列済み標本を配列の両端へ置いてから未整列部分を分割することで、クイックソートの最悪 O(n²) を避けつつ分割処理を速くする比較ソートである。
PESort は整列済み接頭辞 S を左端に伸ばし、その中央値をピボットに未整列部分 U を分割する。
SPSort も同じ比例拡張の枠組みを使うが、分割に入る直前に S を中央値で左右に分け、左半分 L を左端、右半分 R を右端に置き、中央に U を残す L | U | R の形に整える。
L と R はそれぞれピボット未満・以上の要素群として働き、分割ループの番兵(Sentinel)になるため、境界チェックを減らしつつ走査を速くできる。
- 接頭辞の初期化: 先頭 1 要素(または少数要素)を整列済み接頭辞
Sとみなす。 - 比例拡張:
|U| > p·|S|のあいだ、SとUの先頭p·|S|要素をまとめて整列し、結果を新しいSとする。 - 対称配置:
Sの中央値位置で左右に分け、右半分Rを配列右端へ移動してL | U | Rにする。 - 中央値ピボット: 中央値をピボットとして
Uを分割する(実装では Lomuto 型や 3 路分割などが用いられる)。 - 再帰: 左側(
Lとピボット未満の部分)・右側(ピボット以上とR)に同じ手順を適用する。十分短い区間は挿入ソートで仕上げる。
procedure symmetry_partition_sort(A, lo, hi, p)
if hi - lo <= INSERTION_THRESHOLD then
insertion_sort(A, lo, hi)
return
s_end = lo
while s_end < hi and (hi - s_end) > p * (s_end - lo + 1)
chunk_end = min(s_end + p * (s_end - lo + 1), hi)
symmetry_partition_sort(A, lo, chunk_end, p)
s_end = chunk_end
median = lo + floor((s_end - lo) / 2)
r_len = s_end - median
move A[median + 1 .. s_end] to A[hi - r_len + 1 .. hi] // L | U | R
pivot = partition(A, lo, hi, median)
symmetry_partition_sort(A, lo, pivot - 1, p)
symmetry_partition_sort(A, pivot + 1, hi, p)
パラメータ p は PESort と同じく「未整列部分 ÷ 整列済み接頭辞」の上限である。p ≈ 16 付近で平均性能がよく、小さくすると最悪ケースが改善しやすい。
Jing-Chao Chen の実装では 適応的な連続区間の検出 や 等値キー領域を持つ分割 も組み合わされ、部分整列済み入力への適応性が高い。
最悪・平均とも O(n log n) 比較、補助領域は再帰スタックで O(log n) とされる。安定ソートではない。
以下のデモでは p = 2(小配列でも拡張・対称配置が見えるよう意図的に小さくしている)、小区間閾値 4、分割は Lomuto 型である。
青枠は左端の整列済み標本 L、水色は右端の R、紫は中央値ピボット、オレンジは比較、緑は交換を表す。
比例拡張ソートの記事と並べて読むと、「整列済み標本を左端だけに置くか、両端に置いて番兵にするか」の違いが対比しやすい。
計算時間量および空間計算量を計測する
| Size | Average time | Maximum time | Average memory | Maximum memory |
|---|---|---|---|---|
| 256 | 0.000006 | 0.000375 | 1662 | 1668 |
| 512 | 0.000019 | 0.000136 | 1666 | 1672 |
| 1024 | 0.000044 | 0.000498 | 1674 | 1680 |
| 2048 | 0.000099 | 0.000383 | 1690 | 1696 |
| 4096 | 0.000216 | 0.000486 | 1722 | 1728 |
| 8192 | 0.000626 | 0.003898 | 1786 | 1792 |
| 16384 | 0.001399 | 0.002199 | 1918 | 1924 |
| 32768 | 0.003027 | 0.004453 | 2177 | 2184 |
| 65536 | 0.006799 | 0.013110 | 2689 | 2696 |
| 131072 | 0.019849 | 0.137346 | 3714 | 3720 |
| 262144 | 0.045632 | 0.083470 | 5762 | 5768 |
計測に使用したコードを表示する
set -euo pipefail
WORKDIR="$(mktemp -d)"
trap 'rm -rf "$WORKDIR"' EXIT
cat > "$WORKDIR/Dockerfile" <<'EOF'
FROM rust:1.95.0
WORKDIR /app
RUN mkdir -p src
RUN cat > Cargo.toml <<'CARGO'
[package]
name = "rust-benchmark"
version = "0.1.0"
edition = "2021"
[profile.release]
lto = true
codegen-units = 1
panic = "abort"
CARGO
RUN cat > src/main.rs <<'RUST'
use std::{
env,
process::Command,
time::{Duration, Instant},
};
const MIN_POWER: u32 = 8;
const MAX_POWER: u32 = 18;
const RUNS: usize = 8192;
fn insertion_sort(a: &mut [usize]) {
for i in 1..a.len() {
let mut j = i;
while j > 0 && a[j - 1] > a[j] {
a.swap(j - 1, j);
j -= 1;
}
}
}
fn partition_at(a: &mut [usize], lo: usize, hi: usize, pivot_idx: usize) -> usize {
a.swap(pivot_idx, hi);
let pivot = a[hi];
let mut i = lo;
for j in lo..hi {
if a[j] < pivot {
a.swap(i, j);
i += 1;
}
}
a.swap(i, hi);
i
}
fn sp_sort_range(a: &mut [usize], lo: usize, hi: usize) {
const P: usize = 16;
if hi <= lo {
return;
}
if hi - lo < 16 {
insertion_sort(&mut a[lo..=hi]);
return;
}
let mut s_end = lo;
while s_end < hi && hi - s_end > P * (s_end - lo + 1) {
let chunk_end = (s_end + P * (s_end - lo + 1)).min(hi);
sp_sort_range(a, lo, chunk_end);
s_end = chunk_end;
}
let median = lo + (s_end - lo) / 2;
let r_len = s_end - median;
let r_start = median + 1;
let r_dest = hi - r_len + 1;
for i in 0..r_len {
let from = r_start + i;
let to = r_dest + i;
if from != to {
a.swap(from, to);
}
}
let pivot = partition_at(a, lo, hi, median);
if pivot > 0 {
sp_sort_range(a, lo, pivot - 1);
}
sp_sort_range(a, pivot + 1, hi);
}
fn symmetry_partition_sort(a: &mut [usize]) {
if let Some(hi) = a.len().checked_sub(1) {
sp_sort_range(a, 0, hi);
}
}
fn benchmark_sort(array: &mut [usize]) {
symmetry_partition_sort(array);
}
fn shuffled(size: usize, seed: u64) -> Vec<usize> {
let mut v: Vec<usize> = (1..=size).collect();
let mut state = seed;
for i in (1..size).rev() {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
let j = (state as usize) % (i + 1);
v.swap(i, j);
}
v
}
fn memory_usage_kb() -> usize {
let contents = std::fs::read_to_string("/proc/self/status")
.unwrap_or_default();
for line in contents.lines() {
if let Some(rest) = line.strip_prefix("VmHWM:") {
let kb = rest
.split_whitespace()
.next()
.unwrap_or("0")
.parse::<usize>()
.unwrap_or(0);
return kb;
}
}
0
}
fn micros(d: Duration) -> u128 {
d.as_micros()
}
fn run_once(size: usize, seed: usize) -> (u128, usize) {
let expected: Vec<usize> = (1..=size).collect();
let mut array = shuffled(size, seed as u64);
let start = Instant::now();
benchmark_sort(&mut array);
let elapsed = start.elapsed();
if array != expected {
panic!(
"sort failed with seed {} for size {}",
seed,
size
);
}
(micros(elapsed), memory_usage_kb())
}
fn run_child(args: &[String]) {
let size = args[2].parse::<usize>().expect("invalid size");
let seed = args[3].parse::<usize>().expect("invalid seed");
let (elapsed_us, mem) = run_once(size, seed);
println!("{} {}", elapsed_us, mem);
}
fn main() {
let args: Vec<String> = env::args().collect();
if args.get(1).is_some_and(|arg| arg == "--run-once") {
run_child(&args);
return;
}
println!(
"| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
"Size",
"Average time",
"Maximum time",
"Average memory",
"Maximum memory"
);
println!(
"|{:-<11}:|{:-<16}:|{:-<16}:|{:-<16}:|{:-<16}:|",
"",
"",
"",
"",
""
);
for power in MIN_POWER..=MAX_POWER {
let size = 1usize << power;
let mut total_time: u128 = 0;
let mut max_time: u128 = 0;
let mut total_mem: usize = 0;
let mut max_mem: usize = 0;
for seed in 1..=RUNS {
let output = Command::new(env::current_exe().expect("failed to find current executable"))
.arg("--run-once")
.arg(size.to_string())
.arg(seed.to_string())
.output()
.expect("failed to run benchmark child process");
if !output.status.success() {
panic!(
"benchmark child process failed: {}",
String::from_utf8_lossy(&output.stderr)
);
}
let stdout = String::from_utf8(output.stdout)
.expect("child process returned non-UTF-8 output");
let mut fields = stdout.split_whitespace();
let elapsed_us = fields
.next()
.expect("missing elapsed time")
.parse::<u128>()
.expect("invalid elapsed time");
let mem = fields
.next()
.expect("missing memory usage")
.parse::<usize>()
.expect("invalid memory usage");
total_time += elapsed_us;
if elapsed_us > max_time {
max_time = elapsed_us;
}
total_mem += mem;
if mem > max_mem {
max_mem = mem;
}
}
let avg_time = total_time / RUNS as u128;
let avg_mem = total_mem / RUNS;
println!(
"| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
size,
format!("{}.{:06}", avg_time / 1_000_000, avg_time % 1_000_000),
format!("{}.{:06}", max_time / 1_000_000, max_time % 1_000_000),
avg_mem,
max_mem
);
}
}
RUST
RUN cargo build --release
CMD ["./target/release/rust-benchmark"]
EOF
docker build -t rust-benchmark "$WORKDIR"
docker run --rm --init rust-benchmark