ポリフェーズマージソートで配列を並び替える
ポリフェーズマージソートを使用する
ポリフェーズマージソート (PolyPhase Merge Sort) は、複数のテープ(またはファイル)に分散した整列済みランを、フィボナッチ分布に沿って段階的に併合していく外部整列アルゴリズムである。テープ本数が少ない環境でも、各パスでほぼすべてのテープを稼働させ、バランスマージよりパス数を抑えられる場合がある。
本記事のデモとベンチマークでは、3 本の仮想テープを主記憶上のベクタで模擬する。実際の外部整列では置換選択などで初期ランを作り、ラン数がフィボナッチ数でないときはダミーランで分布を調整する。
- 初期ラン生成: 配列を固定長(例: 32 要素)の区間に区切り、各区間を整列してランとする。
- フィボナッチ分布: 3 本テープのうち 1 本を空け、残り 2 本へラン数比を連続するフィボナッチ数(例:
{2, 3},{3, 5})に近づけるよう分配する。 - ポリフェーズ併合: 2 本のソーステープから先頭ランを 1 組ずつ取り出し、空きテープへマージする。
- テープローテーション: 出力テープの役割を循環させ、再び 2 本から 1 本への併合を繰り返す。
- 完了: 全要素が 1 本のテープ上の 1 ランにまとまったら配列へ書き戻す。
procedure create_runs(A, run_size)
split A into chunks of run_size, sort each chunk into a run
procedure distribute_fibonacci(runs, tapes[3])
target = smallest Fibonacci number >= length(runs)
put runs on tape 1 and tape 2 in Fibonacci ratio; tape 0 empty
procedure polyphase_pass(tapes[3])
while tape 1 and tape 2 both have runs
merged = merge(tape1.pop_front(), tape2.pop_front())
tape0.push_back(merged)
rotate tape roles cyclically
procedure polyphase_merge_sort(A)
runs = create_runs(A)
tapes = distribute_fibonacci(runs)
while total run count on all tapes > 1
polyphase_pass(tapes)
copy final run back into A
ラン数が理想的なフィボナッチ分布に一致するとき、3 テープ構成では併合パス数が最小化される。
一般の入力ではダミーラン(空ラン)で分布を補い、テープの待機時間を減らす。時間計算量は通常O(n log n) 程度だが、I/O 待ちとバッファ管理のコストが支配的になる。主記憶シミュレーションではラン用ベクタとマージ用バッファに O(n) の追加空間を要し、マージの比較規則上は安定ソートにできる。
テープドライブが高価だった時代のポリフェーズマージは、限られた I/O チャネルを稼働させ続ける典型例として学ぶ価値がある。
計算時間量および空間計算量を計測する
| Size | Average time | Maximum time | Average memory | Maximum memory |
|---|---|---|---|---|
| 256 | 0.000005 | 0.000084 | 1674 | 1680 |
| 512 | 0.000012 | 0.000105 | 1690 | 1696 |
| 1024 | 0.000025 | 0.000778 | 1714 | 1720 |
| 2048 | 0.000056 | 0.000471 | 1777 | 1784 |
| 4096 | 0.000119 | 0.000597 | 1866 | 1872 |
| 8192 | 0.000285 | 0.000756 | 2047 | 2048 |
| 16384 | 0.000581 | 0.001235 | 2431 | 2432 |
| 32768 | 0.001276 | 0.001561 | 3199 | 3200 |
| 65536 | 0.004290 | 0.019015 | 4680 | 4684 |
| 131072 | 0.009340 | 0.038067 | 7738 | 7744 |
| 262144 | 0.022799 | 0.065756 | 12457 | 12460 |
計測に使用したコードを表示する
set -euo pipefail
WORKDIR="$(mktemp -d)"
trap 'rm -rf "$WORKDIR"' EXIT
cat > "$WORKDIR/Dockerfile" <<'EOF'
FROM rust:1.95.0
WORKDIR /app
RUN mkdir -p src
RUN cat > Cargo.toml <<'CARGO'
[package]
name = "rust-benchmark"
version = "0.1.0"
edition = "2021"
[profile.release]
lto = true
codegen-units = 1
panic = "abort"
CARGO
RUN cat > src/main.rs <<'RUST'
use std::{
env,
process::Command,
time::{Duration, Instant},
};
const MIN_POWER: u32 = 8;
const MAX_POWER: u32 = 18;
const RUNS: usize = 8192;
const NUM_TAPES: usize = 3;
const RUN_SIZE: usize = 32;
fn merge_runs(left: &[usize], right: &[usize]) -> Vec<usize> {
let mut out = Vec::with_capacity(left.len() + right.len());
let (mut i, mut j) = (0, 0);
while i < left.len() && j < right.len() {
if left[i] <= right[j] {
out.push(left[i]);
i += 1;
} else {
out.push(right[j]);
j += 1;
}
}
out.extend_from_slice(&left[i..]);
out.extend_from_slice(&right[j..]);
out
}
fn create_runs(a: &[usize], run_size: usize) -> Vec<Vec<usize>> {
let mut runs = Vec::new();
let mut i = 0;
while i < a.len() {
let end = (i + run_size).min(a.len());
let mut run = a[i..end].to_vec();
run.sort_unstable();
runs.push(run);
i = end;
}
runs
}
fn next_fibonacci_at_least(n: usize) -> (usize, usize) {
let (mut prev, mut curr) = (1usize, 1usize);
while curr < n {
let next = prev + curr;
prev = curr;
curr = next;
}
(prev, curr)
}
fn distribute_fibonacci(runs: Vec<Vec<usize>>) -> [Vec<Vec<usize>>; NUM_TAPES] {
let mut tapes: [Vec<Vec<usize>>; NUM_TAPES] = [vec![], vec![], vec![]];
let n = runs.len();
if n == 0 {
return tapes;
}
if n == 1 {
tapes[1].push(runs.into_iter().next().unwrap());
return tapes;
}
let (fib_prev, fib_target) = next_fibonacci_at_least(n);
let dummies = fib_target - n;
let on_tape2 = fib_prev.saturating_sub(dummies);
let on_tape1 = n - on_tape2;
for (idx, run) in runs.into_iter().enumerate() {
if idx < on_tape1 {
tapes[1].push(run);
} else {
tapes[2].push(run);
}
}
tapes
}
fn count_runs(tapes: &[Vec<Vec<usize>>; NUM_TAPES]) -> usize {
tapes.iter().map(|t| t.len()).sum()
}
fn rotate_tapes(tapes: &mut [Vec<Vec<usize>>; NUM_TAPES]) {
tapes.swap(0, 1);
tapes.swap(1, 2);
}
fn polyphase_pass(tapes: &mut [Vec<Vec<usize>>; NUM_TAPES]) -> bool {
let mut merged = false;
while !tapes[1].is_empty() && !tapes[2].is_empty() {
let left = tapes[1].remove(0);
let right = tapes[2].remove(0);
tapes[0].push(merge_runs(&left, &right));
merged = true;
}
merged
}
fn merge_all_remaining(tapes: &mut [Vec<Vec<usize>>; NUM_TAPES]) -> Vec<usize> {
let mut all: Vec<Vec<usize>> = Vec::new();
for tape in tapes.iter() {
all.extend(tape.iter().cloned());
}
while all.len() > 1 {
let a = all.remove(0);
let b = all.remove(0);
all.push(merge_runs(&a, &b));
}
all.pop().unwrap_or_default()
}
fn polyphase_merge_sort(a: &mut [usize]) {
if a.len() <= 1 {
return;
}
let runs = create_runs(a, RUN_SIZE);
if runs.len() <= 1 {
if let Some(r) = runs.first() {
a.copy_from_slice(r);
}
return;
}
let mut tapes = distribute_fibonacci(runs);
let mut idle = 0usize;
while count_runs(&tapes) > 1 {
if polyphase_pass(&mut tapes) {
rotate_tapes(&mut tapes);
idle = 0;
} else {
idle += 1;
if idle > NUM_TAPES * 4 {
break;
}
rotate_tapes(&mut tapes);
}
}
let result = if count_runs(&tapes) == 1 {
tapes
.iter()
.find_map(|t| t.first().cloned())
.unwrap_or_default()
} else {
merge_all_remaining(&mut tapes)
};
a.copy_from_slice(&result);
}
fn benchmark_sort(array: &mut [usize]) {
polyphase_merge_sort(array);
}
fn shuffled(size: usize, seed: u64) -> Vec<usize> {
let mut v: Vec<usize> = (1..=size).collect();
let mut state = seed;
for i in (1..size).rev() {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
let j = (state as usize) % (i + 1);
v.swap(i, j);
}
v
}
fn memory_usage_kb() -> usize {
let contents = std::fs::read_to_string("/proc/self/status")
.unwrap_or_default();
for line in contents.lines() {
if let Some(rest) = line.strip_prefix("VmHWM:") {
let kb = rest
.split_whitespace()
.next()
.unwrap_or("0")
.parse::<usize>()
.unwrap_or(0);
return kb;
}
}
0
}
fn micros(d: Duration) -> u128 {
d.as_micros()
}
fn run_once(size: usize, seed: usize) -> (u128, usize) {
let expected: Vec<usize> = (1..=size).collect();
let mut array = shuffled(size, seed as u64);
let start = Instant::now();
benchmark_sort(&mut array);
let elapsed = start.elapsed();
if array != expected {
panic!(
"sort failed with seed {} for size {}",
seed,
size
);
}
(micros(elapsed), memory_usage_kb())
}
fn run_child(args: &[String]) {
let size = args[2].parse::<usize>().expect("invalid size");
let seed = args[3].parse::<usize>().expect("invalid seed");
let (elapsed_us, mem) = run_once(size, seed);
println!("{} {}", elapsed_us, mem);
}
fn main() {
let args: Vec<String> = env::args().collect();
if args.get(1).is_some_and(|arg| arg == "--run-once") {
run_child(&args);
return;
}
println!(
"| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
"Size",
"Average time",
"Maximum time",
"Average memory",
"Maximum memory"
);
println!(
"|{:-<11}:|{:-<16}:|{:-<16}:|{:-<16}:|{:-<16}:|",
"",
"",
"",
"",
""
);
for power in MIN_POWER..=MAX_POWER {
let size = 1usize << power;
let mut total_time: u128 = 0;
let mut max_time: u128 = 0;
let mut total_mem: usize = 0;
let mut max_mem: usize = 0;
for seed in 1..=RUNS {
let output = Command::new(env::current_exe().expect("failed to find current executable"))
.arg("--run-once")
.arg(size.to_string())
.arg(seed.to_string())
.output()
.expect("failed to run benchmark child process");
if !output.status.success() {
panic!(
"benchmark child process failed: {}",
String::from_utf8_lossy(&output.stderr)
);
}
let stdout = String::from_utf8(output.stdout)
.expect("child process returned non-UTF-8 output");
let mut fields = stdout.split_whitespace();
let elapsed_us = fields
.next()
.expect("missing elapsed time")
.parse::<u128>()
.expect("invalid elapsed time");
let mem = fields
.next()
.expect("missing memory usage")
.parse::<usize>()
.expect("invalid memory usage");
total_time += elapsed_us;
if elapsed_us > max_time {
max_time = elapsed_us;
}
total_mem += mem;
if mem > max_mem {
max_mem = mem;
}
}
let avg_time = total_time / RUNS as u128;
let avg_mem = total_mem / RUNS;
println!(
"| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
size,
format!("{}.{:06}", avg_time / 1_000_000, avg_time % 1_000_000),
format!("{}.{:06}", max_time / 1_000_000, max_time % 1_000_000),
avg_mem,
max_mem
);
}
}
RUST
RUN cargo build --release
CMD ["./target/release/rust-benchmark"]
EOF
docker build -t rust-benchmark "$WORKDIR"
docker run --rm --init rust-benchmark