デカルト木ソートを使用する

デカルト木ソート (cartesian tree sort) は、配列からデカルト木 (Cartesian tree) を一度だけ構築し、根(部分木の最小値)を先頭に取り出しつつ左右部分木の結果をマージ (merge) して昇順の並びを得る整列である。

デカルト木は次の2条件を同時に満たす二分木である。

  1. 中順が元の配列順: 左部分木 → 根 → 右部分木の順に走査すると、元の配列の左から右への並び(添字の昇順)になる。
  2. ヒープ性: 最小デカルト木では、各親の値は両方の子以下である(最大デカルト木では逆)。

入力配列が決まれば、値がすべて異なるときデカルト木の形は一意に定まる。整列では最小デカルト木を想定する。

素朴に「根を選んで再帰的に部分木を作る」と O(n²) になりうるが、単調スタック (monotonic stack) を使えば各添字はスタックに高々1回入り1回出るため、全体で O(n) 時間に木を構築できる。

添字 i を左から処理するときの典型形は次のとおりである。

  1. スタック上端より大きい値の添字を、値が A[i] 以下になるまで取り出す(最後に取り出した添字を last とする)。
  2. スタックが空でなければ、A[i] はスタック上端の右の子になる。
  3. last があれば、その添字は i の左の子になる。
  4. i をスタックに追加する。
procedure build_cartesian_tree(A)
  stack = empty stack of indices
  for i from 0 to length(A) - 1
    last = null
    while stack not empty and A[stack.top] > A[i]
      last = stack.pop()
    if stack not empty
      right[stack.top] = i
    if last not null
      left[i] = last
    stack.push(i)
  return tree encoded by left[], right[], and stack[0] as root

構築後、各ノードについて根のキーを出力し、左右部分木から得た すでに昇順の列 をマージする再帰的取出しで整列する。左右部分木は元配列の連続部分区間に対応するため、再帰の各段階で部分列は昇順に保たれる。

procedure merge(L, R)
  // 2 つの昇順列を先頭から比較しながら連結する

procedure extract_sorted(node)
  if node is null
    return empty list
  left = extract_sorted(left[node])
  right = extract_sorted(right[node])
  return [A[node]] followed by merge(left, right)

素朴なマージを重ねると O(n log n) 時間になるが、スタック構築と組み合わせた技法では O(n) まで落とせる(ここでは取出しの意味論だけ示す)。

比較ソートの Ω(n log n) 下界 とは一見矛盾するように見えるが、デカルト木は「値の大小」だけでなく 元配列の添字順(中順が元の並びになる制約) も構造に使う。位置情報付きの入力に対する 線形時間の木構築 であり、比較だけで順序を決め打ちする一般の比較ソートモデルとは前提が異なる。

安定性は等値キーの左右の付け方に依存し、一般的な実装ではスタック上の添字の順序で決まるため、不安定 である。

クイックソートの分割木や、二分木ソートの「挿入順で形が変わる木」とも対比しやすい。範囲最小クエリ (RMQ) や最長増加部分列 (LIS) など、同じスタック構造が別問題でも登場する。

以下のデモでは、同値の棒が入れ替わらないよう、ヒープ比較を 値が異なれば値、等しければ元の位置 id の辞書式順にしている(可視化上の安定化であり、一般のデカルト木ソートの性質を変えるものではない)。

  1. 構築: 左から添字を処理し、スタックとの比較(オレンジ)とリンク付け(紫)を示す。木自体は内部データとして保持する。
  2. 取出し: 根優先の再帰取出し+マージで得た昇順に、配列上の棒を スワップ(緑) で並べ替えて視覚化する(実装では取出し結果を別バッファへ書くだけでよい)。

二分木ソートのように 平衡木への挿入 を繰り返すのではなく、配列と添字順から 一度で デカルト木が定まる点が特徴である。

計算時間量および空間計算量を計測する

Size Average time Maximum time Average memory Maximum memory
256 0.000017 0.000588 1691 1708
512 0.000034 0.000408 1719 1740
1024 0.000068 0.000581 1770 1792
2048 0.000141 0.000546 1863 1892
4096 0.000288 0.000746 2036 2084
8192 0.000601 0.001540 2175 2304
16384 0.001261 0.001782 2835 3072
32768 0.002794 0.148427 4130 4480
65536 0.005823 0.016330 6594 7156
131072 0.011940 0.017344 11486 12660
262144 0.024275 0.042929 21272 23792
計測に使用したコードを表示する

set -euo pipefail

WORKDIR="$(mktemp -d)"
trap 'rm -rf "$WORKDIR"' EXIT

cat > "$WORKDIR/Dockerfile" <<'EOF'
FROM rust:1.95.0

WORKDIR /app

RUN mkdir -p src

RUN cat > Cargo.toml <<'CARGO'
[package]
name = "rust-benchmark"
version = "0.1.0"
edition = "2021"

[profile.release]
lto = true
codegen-units = 1
panic = "abort"
CARGO

RUN cat > src/main.rs <<'RUST'
use std::{
    env,
    process::Command,
    time::{Duration, Instant},
};
const MIN_POWER: u32 = 8;
const MAX_POWER: u32 = 18;
const RUNS: usize = 8192;
fn merge_values(left: &[usize], right: &[usize]) -> Vec<usize> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let (mut l, mut r) = (0, 0);
    while l < left.len() && r < right.len() {
        if left[l] <= right[r] {
            out.push(left[l]);
            l += 1;
        } else {
            out.push(right[r]);
            r += 1;
        }
    }
    out.extend_from_slice(&left[l..]);
    out.extend_from_slice(&right[r..]);
    out
}
fn cartesian_tree_sort(a: &mut [usize]) {
    let n = a.len();
    if n <= 1 {
        return;
    }
    let mut left = vec![None; n];
    let mut right = vec![None; n];
    let mut stack = Vec::new();
    for i in 0..n {
        let mut last = None;
        while stack.last().is_some_and(|&top| a[top] > a[i]) {
            last = stack.pop();
        }
        if let Some(&top) = stack.last() {
            right[top] = Some(i);
        }
        if let Some(last_idx) = last {
            left[i] = Some(last_idx);
        }
        stack.push(i);
    }
    fn extract(
        node: Option<usize>,
        a: &[usize],
        left: &[Option<usize>],
        right: &[Option<usize>],
    ) -> Vec<usize> {
        if let Some(i) = node {
            let l = extract(left[i], a, left, right);
            let r = extract(right[i], a, left, right);
            let merged = merge_values(&l, &r);
            let mut out = Vec::with_capacity(merged.len() + 1);
            out.push(a[i]);
            out.extend(merged);
            out
        } else {
            Vec::new()
        }
    }
    let root = stack.first().copied();
    let out = extract(root, a, &left, &right);
    a.copy_from_slice(&out);
}

fn benchmark_sort(array: &mut [usize]) {

    cartesian_tree_sort(array);

}

fn shuffled(size: usize, seed: u64) -> Vec<usize> {
    let mut v: Vec<usize> = (1..=size).collect();

    let mut state = seed;

    for i in (1..size).rev() {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;

        let j = (state as usize) % (i + 1);

        v.swap(i, j);
    }

    v
}

fn memory_usage_kb() -> usize {
    let contents = std::fs::read_to_string("/proc/self/status")
        .unwrap_or_default();

    for line in contents.lines() {
        if let Some(rest) = line.strip_prefix("VmHWM:") {
            let kb = rest
                .split_whitespace()
                .next()
                .unwrap_or("0")
                .parse::<usize>()
                .unwrap_or(0);

            return kb;
        }
    }

    0
}

fn micros(d: Duration) -> u128 {
    d.as_micros()
}

fn run_once(size: usize, seed: usize) -> (u128, usize) {
    let expected: Vec<usize> = (1..=size).collect();
    let mut array = shuffled(size, seed as u64);

    let start = Instant::now();

    benchmark_sort(&mut array);

    let elapsed = start.elapsed();

    if array != expected {
        panic!(
            "sort failed with seed {} for size {}",
            seed,
            size
        );
    }

    (micros(elapsed), memory_usage_kb())
}

fn run_child(args: &[String]) {
    let size = args[2].parse::<usize>().expect("invalid size");
    let seed = args[3].parse::<usize>().expect("invalid seed");
    let (elapsed_us, mem) = run_once(size, seed);
    println!("{} {}", elapsed_us, mem);
}

fn main() {
    let args: Vec<String> = env::args().collect();
    if args.get(1).is_some_and(|arg| arg == "--run-once") {
        run_child(&args);
        return;
    }

    println!(
        "| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
        "Size",
        "Average time",
        "Maximum time",
        "Average memory",
        "Maximum memory"
    );

    println!(
        "|{:-<11}:|{:-<16}:|{:-<16}:|{:-<16}:|{:-<16}:|",
        "",
        "",
        "",
        "",
        ""
    );

    for power in MIN_POWER..=MAX_POWER {
        let size = 1usize << power;

        let mut total_time: u128 = 0;
        let mut max_time: u128 = 0;

        let mut total_mem: usize = 0;
        let mut max_mem: usize = 0;

        for seed in 1..=RUNS {
            let output = Command::new(env::current_exe().expect("failed to find current executable"))
                .arg("--run-once")
                .arg(size.to_string())
                .arg(seed.to_string())
                .output()
                .expect("failed to run benchmark child process");

            if !output.status.success() {
                panic!(
                    "benchmark child process failed: {}",
                    String::from_utf8_lossy(&output.stderr)
                );
            }

            let stdout = String::from_utf8(output.stdout)
                .expect("child process returned non-UTF-8 output");
            let mut fields = stdout.split_whitespace();
            let elapsed_us = fields
                .next()
                .expect("missing elapsed time")
                .parse::<u128>()
                .expect("invalid elapsed time");
            let mem = fields
                .next()
                .expect("missing memory usage")
                .parse::<usize>()
                .expect("invalid memory usage");

            total_time += elapsed_us;

            if elapsed_us > max_time {
                max_time = elapsed_us;
            }

            total_mem += mem;

            if mem > max_mem {
                max_mem = mem;
            }
        }

        let avg_time = total_time / RUNS as u128;
        let avg_mem = total_mem / RUNS;

        println!(
            "| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
            size,
            format!("{}.{:06}", avg_time / 1_000_000, avg_time % 1_000_000),
            format!("{}.{:06}", max_time / 1_000_000, max_time % 1_000_000),
            avg_mem,
            max_mem
        );
    }
}
RUST

RUN cargo build --release

CMD ["./target/release/rust-benchmark"]
EOF

docker build -t rust-benchmark "$WORKDIR"
docker run --rm --init rust-benchmark