アンシャッフルソートを使用する

アンシャッフルソート (unshuffle sort) は整列済みの山札をいくつかの部分山に切って混ぜ直した状態を「元に戻す」イメージから名付けられた。整列済みに近い入力や、すでに単調な部分列が多いデータで効率が出やすい。

  1. 分布フェーズ: 入力を左から走査し、各値 x について既存のパイル(両端にだけ追加できる整列済み両端キュー)を左から試す。先頭要素 h に対し x <= h なら先頭へ、末尾要素 t に対し x >= t なら末尾へ載せる。どのパイルにも載せられなければ右端に新しいパイルを増やす。
  2. マージフェーズ: 分布でできた複数のパイルを理想マージと呼ばれる逐次マージで 1 本の昇順列へ統合する。各パイル内部は先頭から末尾へ非減少に保たれる。
procedure unshuffle_distribute(elements)
  piles = sequence of piles, initially empty deques
  for each x in elements then
    placed = false
    for each pile p in piles left to right then
      if x <= front(p) then
        push_front x onto p
        placed = true
        break
      else if x >= back(p) then
        push_back x onto p
        placed = true
        break
    if not placed then
      piles.append(deque containing only x)

procedure unshuffle_merge(piles)
  result = contents of piles[0]
  for i from 1 to length(piles) - 1 then
    result = merge_sorted(result, contents of piles[i])
  return result

要素の交換や中間挿入を避け、リンクの付け替えで済むため、もともとは単方向連結リスト向けに設計された。配列上では各パイルを両端キューとして模倣する実装が一般的である。

整列済み入力ではパイルが 1 本にまとまり分布は O(n)、マージも不要に近づく。逆順に近い入力でも先頭への連続載せで 1 パイルに収まりやすい。最悪は大小が交互に現れるような並びで、パイル数が O(n) となり分布だけで O(n²) に達する。

理想マージにより逐次 2 列マージを最適化するが、各パイルが内部で昇順であるため、デモでは各パイル先頭の最小を繰り返し取る k 路マージでも同じ結果が得られる。

ペイシェンスソートとの違い

ペイシェンスソート もパイルへ載せてから統合するが、載せ方とマージの前提が異なる。

観点 アンシャッフルソート ペイシェンスソート
載せ方 先頭または末尾(両端キュー) 山の一番上だけ
載せる条件 x <= 先頭 または x >= 末尾 一番上の値が x より 厳密に大きい
パイル内の順序 先頭から末尾へ非減少 下から上へ厳密増加
マージ 理想マージ(逐次 2 列) 各山の一番上の最小を繰り返し取得
向いている入力 整列済み・逆順・部分整列 増加部分列が短い乱数

ペイシェンスソートは山の個数が LIS 長に対応することが知られている。アンシャッフルソートはエントロピー(パイル数)に比例した O(kN) 型の振る舞いを目指すが、ランダム入力では k が大きくなり、比較回数は O(n log n) 程度に近づく。

計算時間量および空間計算量を計測する

Size Average time Maximum time Average memory Maximum memory
256 0.000011 0.000046 1694 1704
512 0.000021 0.000079 1707 1716
1024 0.000045 0.000132 1737 1748
2048 0.000100 0.000235 1789 1800
4096 0.000252 0.000671 1901 1916
8192 0.000740 0.001788 2043 2060
16384 0.002407 0.003718 2276 2404
32768 0.007125 0.024242 3090 3328
65536 0.021105 0.092899 4602 4808
131072 0.062542 0.137245 7260 8328
262144 0.192843 0.524597 13557 14044
計測に使用したコードを表示する

set -euo pipefail

WORKDIR="$(mktemp -d)"
trap 'rm -rf "$WORKDIR"' EXIT

cat > "$WORKDIR/Dockerfile" <<'EOF'
FROM rust:1.95.0

WORKDIR /app

RUN mkdir -p src

RUN cat > Cargo.toml <<'CARGO'
[package]
name = "rust-benchmark"
version = "0.1.0"
edition = "2021"

[profile.release]
lto = true
codegen-units = 1
panic = "abort"
CARGO

RUN cat > src/main.rs <<'RUST'
use std::{
    env,
    process::Command,
    time::{Duration, Instant},
};
const MIN_POWER: u32 = 8;
const MAX_POWER: u32 = 18;
const RUNS: usize = 8192;


fn merge_two_sorted(left: &[usize], right: &[usize]) -> Vec<usize> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let mut i = 0;
    let mut j = 0;
    while i < left.len() && j < right.len() {
        if left[i] <= right[j] {
            out.push(left[i]);
            i += 1;
        } else {
            out.push(right[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&left[i..]);
    out.extend_from_slice(&right[j..]);
    out
}

fn unshuffle_sort(a: &mut [usize]) {
    let mut piles: Vec<Vec<usize>> = Vec::new();

    for &x in a.iter() {
        let mut placed = false;
        for pile in piles.iter_mut() {
            let front = pile[0];
            let back = *pile.last().unwrap();
            if x <= front {
                pile.insert(0, x);
                placed = true;
                break;
            } else if x >= back {
                pile.push(x);
                placed = true;
                break;
            }
        }
        if !placed {
            piles.push(vec![x]);
        }
    }

    if piles.is_empty() {
        return;
    }

    let mut merged = piles[0].clone();
    for pile in piles.into_iter().skip(1) {
        merged = merge_two_sorted(&merged, &pile);
    }

    a.copy_from_slice(&merged);
}


fn benchmark_sort(array: &mut [usize]) {

    unshuffle_sort(array);

}

fn shuffled(size: usize, seed: u64) -> Vec<usize> {
    let mut v: Vec<usize> = (1..=size).collect();

    let mut state = seed;

    for i in (1..size).rev() {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;

        let j = (state as usize) % (i + 1);

        v.swap(i, j);
    }

    v
}

fn memory_usage_kb() -> usize {
    let contents = std::fs::read_to_string("/proc/self/status")
        .unwrap_or_default();

    for line in contents.lines() {
        if let Some(rest) = line.strip_prefix("VmHWM:") {
            let kb = rest
                .split_whitespace()
                .next()
                .unwrap_or("0")
                .parse::<usize>()
                .unwrap_or(0);

            return kb;
        }
    }

    0
}

fn micros(d: Duration) -> u128 {
    d.as_micros()
}

fn run_once(size: usize, seed: usize) -> (u128, usize) {
    let expected: Vec<usize> = (1..=size).collect();
    let mut array = shuffled(size, seed as u64);

    let start = Instant::now();

    benchmark_sort(&mut array);

    let elapsed = start.elapsed();

    if array != expected {
        panic!(
            "sort failed with seed {} for size {}",
            seed,
            size
        );
    }

    (micros(elapsed), memory_usage_kb())
}

fn run_child(args: &[String]) {
    let size = args[2].parse::<usize>().expect("invalid size");
    let seed = args[3].parse::<usize>().expect("invalid seed");
    let (elapsed_us, mem) = run_once(size, seed);
    println!("{} {}", elapsed_us, mem);
}

fn main() {
    let args: Vec<String> = env::args().collect();
    if args.get(1).is_some_and(|arg| arg == "--run-once") {
        run_child(&args);
        return;
    }

    println!(
        "| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
        "Size",
        "Average time",
        "Maximum time",
        "Average memory",
        "Maximum memory"
    );

    println!(
        "|{:-<11}:|{:-<16}:|{:-<16}:|{:-<16}:|{:-<16}:|",
        "",
        "",
        "",
        "",
        ""
    );

    for power in MIN_POWER..=MAX_POWER {
        let size = 1usize << power;

        let mut total_time: u128 = 0;
        let mut max_time: u128 = 0;

        let mut total_mem: usize = 0;
        let mut max_mem: usize = 0;

        for seed in 1..=RUNS {
            let output = Command::new(env::current_exe().expect("failed to find current executable"))
                .arg("--run-once")
                .arg(size.to_string())
                .arg(seed.to_string())
                .output()
                .expect("failed to run benchmark child process");

            if !output.status.success() {
                panic!(
                    "benchmark child process failed: {}",
                    String::from_utf8_lossy(&output.stderr)
                );
            }

            let stdout = String::from_utf8(output.stdout)
                .expect("child process returned non-UTF-8 output");
            let mut fields = stdout.split_whitespace();
            let elapsed_us = fields
                .next()
                .expect("missing elapsed time")
                .parse::<u128>()
                .expect("invalid elapsed time");
            let mem = fields
                .next()
                .expect("missing memory usage")
                .parse::<usize>()
                .expect("invalid memory usage");

            total_time += elapsed_us;

            if elapsed_us > max_time {
                max_time = elapsed_us;
            }

            total_mem += mem;

            if mem > max_mem {
                max_mem = mem;
            }
        }

        let avg_time = total_time / RUNS as u128;
        let avg_mem = total_mem / RUNS;

        println!(
            "| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
            size,
            format!("{}.{:06}", avg_time / 1_000_000, avg_time % 1_000_000),
            format!("{}.{:06}", max_time / 1_000_000, max_time % 1_000_000),
            avg_mem,
            max_mem
        );
    }
}
RUST

RUN cargo build --release

CMD ["./target/release/rust-benchmark"]
EOF

docker build -t rust-benchmark "$WORKDIR"
docker run --rm --init rust-benchmark