デカルト木ソートで配列を並び替える
デカルト木ソートを使用する
デカルト木ソート (cartesian tree sort) は、配列からデカルト木 (Cartesian tree) を一度だけ構築し、根(部分木の最小値)を先頭に取り出しつつ左右部分木の結果をマージ (merge) して昇順の並びを得る整列である。
デカルト木は次の2条件を同時に満たす二分木である。
- 中順が元の配列順: 左部分木 → 根 → 右部分木の順に走査すると、元の配列の左から右への並び(添字の昇順)になる。
- ヒープ性: 最小デカルト木では、各親の値は両方の子以下である(最大デカルト木では逆)。
入力配列が決まれば、値がすべて異なるときデカルト木の形は一意に定まる。整列では最小デカルト木を想定する。
素朴に「根を選んで再帰的に部分木を作る」と O(n²) になりうるが、単調スタック (monotonic stack) を使えば各添字はスタックに高々1回入り1回出るため、全体で O(n) 時間に木を構築できる。
添字 i を左から処理するときの典型形は次のとおりである。
- スタック上端より大きい値の添字を、値が
A[i]以下になるまで取り出す(最後に取り出した添字をlastとする)。 - スタックが空でなければ、
A[i]はスタック上端の右の子になる。 lastがあれば、その添字はiの左の子になる。iをスタックに追加する。
procedure build_cartesian_tree(A)
stack = empty stack of indices
for i from 0 to length(A) - 1
last = null
while stack not empty and A[stack.top] > A[i]
last = stack.pop()
if stack not empty
right[stack.top] = i
if last not null
left[i] = last
stack.push(i)
return tree encoded by left[], right[], and stack[0] as root
構築後、各ノードについて根のキーを出力し、左右部分木から得たすでに昇順の列をマージする再帰的取出しで整列する。左右部分木は元配列の連続部分区間に対応するため、再帰の各段階で部分列は昇順に保たれる。
procedure merge(L, R)
// 2 つの昇順列を先頭から比較しながら連結する
procedure extract_sorted(node)
if node is null
return empty list
left = extract_sorted(left[node])
right = extract_sorted(right[node])
return [A[node]] followed by merge(left, right)
素朴なマージを重ねると O(n log n) 時間になるが、スタック構築と組み合わせた技法では O(n) まで落とせる。
比較ソートの Ω(n log n) 下界とは一見矛盾するように見えるが、デカルト木は「値の大小」だけでなく元配列の添字順(中順が元の並びになる制約)も構造に使う。位置情報付きの入力に対する線形時間の木構築であり、比較だけで順序を決め打ちする一般の比較ソートモデルとは前提が異なる。
安定性は等値キーの左右の付け方に依存し、一般的な実装ではスタック上の添字の順序で決まるため不安定である。
クイックソートの分割木や、二分木ソートの「挿入順で形が変わる木」とも対比しやすい。範囲最小クエリ (RMQ) や最長増加部分列 (LIS) など、同じスタック構造が別問題でも登場する。
以下のデモでは、同値の棒が入れ替わらないよう、ヒープ比較を値が異なれば値、等しければ元の位置 id の辞書式順にしている(可視化上の安定化であり、一般のデカルト木ソートの性質を変えるものではない)。
- 構築: 左から添字を処理し、スタックとの比較(オレンジ)とリンク付け(紫)を示す。木自体は内部データとして保持する。
- 取出し: 根優先の再帰取出し+マージで得た昇順に、配列上の棒をスワップ(緑)で並べ替えて視覚化する(実装では取出し結果を別バッファへ書くだけでよい)。
二分木ソートのように平衡木への挿入を繰り返すのではなく、配列と添字順から一度でデカルト木が定まる点が特徴である。
計算時間量および空間計算量を計測する
| Size | Average time | Maximum time | Average memory | Maximum memory |
|---|---|---|---|---|
| 256 | 0.000017 | 0.000588 | 1691 | 1708 |
| 512 | 0.000034 | 0.000408 | 1719 | 1740 |
| 1024 | 0.000068 | 0.000581 | 1770 | 1792 |
| 2048 | 0.000141 | 0.000546 | 1863 | 1892 |
| 4096 | 0.000288 | 0.000746 | 2036 | 2084 |
| 8192 | 0.000601 | 0.001540 | 2175 | 2304 |
| 16384 | 0.001261 | 0.001782 | 2835 | 3072 |
| 32768 | 0.002794 | 0.148427 | 4130 | 4480 |
| 65536 | 0.005823 | 0.016330 | 6594 | 7156 |
| 131072 | 0.011940 | 0.017344 | 11486 | 12660 |
| 262144 | 0.024275 | 0.042929 | 21272 | 23792 |
計測に使用したコードを表示する
set -euo pipefail
WORKDIR="$(mktemp -d)"
trap 'rm -rf "$WORKDIR"' EXIT
cat > "$WORKDIR/Dockerfile" <<'EOF'
FROM rust:1.95.0
WORKDIR /app
RUN mkdir -p src
RUN cat > Cargo.toml <<'CARGO'
[package]
name = "rust-benchmark"
version = "0.1.0"
edition = "2021"
[profile.release]
lto = true
codegen-units = 1
panic = "abort"
CARGO
RUN cat > src/main.rs <<'RUST'
use std::{
env,
process::Command,
time::{Duration, Instant},
};
const MIN_POWER: u32 = 8;
const MAX_POWER: u32 = 18;
const RUNS: usize = 8192;
fn merge_values(left: &[usize], right: &[usize]) -> Vec<usize> {
let mut out = Vec::with_capacity(left.len() + right.len());
let (mut l, mut r) = (0, 0);
while l < left.len() && r < right.len() {
if left[l] <= right[r] {
out.push(left[l]);
l += 1;
} else {
out.push(right[r]);
r += 1;
}
}
out.extend_from_slice(&left[l..]);
out.extend_from_slice(&right[r..]);
out
}
fn cartesian_tree_sort(a: &mut [usize]) {
let n = a.len();
if n <= 1 {
return;
}
let mut left = vec![None; n];
let mut right = vec![None; n];
let mut stack = Vec::new();
for i in 0..n {
let mut last = None;
while stack.last().is_some_and(|&top| a[top] > a[i]) {
last = stack.pop();
}
if let Some(&top) = stack.last() {
right[top] = Some(i);
}
if let Some(last_idx) = last {
left[i] = Some(last_idx);
}
stack.push(i);
}
fn extract(
node: Option<usize>,
a: &[usize],
left: &[Option<usize>],
right: &[Option<usize>],
) -> Vec<usize> {
if let Some(i) = node {
let l = extract(left[i], a, left, right);
let r = extract(right[i], a, left, right);
let merged = merge_values(&l, &r);
let mut out = Vec::with_capacity(merged.len() + 1);
out.push(a[i]);
out.extend(merged);
out
} else {
Vec::new()
}
}
let root = stack.first().copied();
let out = extract(root, a, &left, &right);
a.copy_from_slice(&out);
}
fn benchmark_sort(array: &mut [usize]) {
cartesian_tree_sort(array);
}
fn shuffled(size: usize, seed: u64) -> Vec<usize> {
let mut v: Vec<usize> = (1..=size).collect();
let mut state = seed;
for i in (1..size).rev() {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
let j = (state as usize) % (i + 1);
v.swap(i, j);
}
v
}
fn memory_usage_kb() -> usize {
let contents = std::fs::read_to_string("/proc/self/status")
.unwrap_or_default();
for line in contents.lines() {
if let Some(rest) = line.strip_prefix("VmHWM:") {
let kb = rest
.split_whitespace()
.next()
.unwrap_or("0")
.parse::<usize>()
.unwrap_or(0);
return kb;
}
}
0
}
fn micros(d: Duration) -> u128 {
d.as_micros()
}
fn run_once(size: usize, seed: usize) -> (u128, usize) {
let expected: Vec<usize> = (1..=size).collect();
let mut array = shuffled(size, seed as u64);
let start = Instant::now();
benchmark_sort(&mut array);
let elapsed = start.elapsed();
if array != expected {
panic!(
"sort failed with seed {} for size {}",
seed,
size
);
}
(micros(elapsed), memory_usage_kb())
}
fn run_child(args: &[String]) {
let size = args[2].parse::<usize>().expect("invalid size");
let seed = args[3].parse::<usize>().expect("invalid seed");
let (elapsed_us, mem) = run_once(size, seed);
println!("{} {}", elapsed_us, mem);
}
fn main() {
let args: Vec<String> = env::args().collect();
if args.get(1).is_some_and(|arg| arg == "--run-once") {
run_child(&args);
return;
}
println!(
"| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
"Size",
"Average time",
"Maximum time",
"Average memory",
"Maximum memory"
);
println!(
"|{:-<11}:|{:-<16}:|{:-<16}:|{:-<16}:|{:-<16}:|",
"",
"",
"",
"",
""
);
for power in MIN_POWER..=MAX_POWER {
let size = 1usize << power;
let mut total_time: u128 = 0;
let mut max_time: u128 = 0;
let mut total_mem: usize = 0;
let mut max_mem: usize = 0;
for seed in 1..=RUNS {
let output = Command::new(env::current_exe().expect("failed to find current executable"))
.arg("--run-once")
.arg(size.to_string())
.arg(seed.to_string())
.output()
.expect("failed to run benchmark child process");
if !output.status.success() {
panic!(
"benchmark child process failed: {}",
String::from_utf8_lossy(&output.stderr)
);
}
let stdout = String::from_utf8(output.stdout)
.expect("child process returned non-UTF-8 output");
let mut fields = stdout.split_whitespace();
let elapsed_us = fields
.next()
.expect("missing elapsed time")
.parse::<u128>()
.expect("invalid elapsed time");
let mem = fields
.next()
.expect("missing memory usage")
.parse::<usize>()
.expect("invalid memory usage");
total_time += elapsed_us;
if elapsed_us > max_time {
max_time = elapsed_us;
}
total_mem += mem;
if mem > max_mem {
max_mem = mem;
}
}
let avg_time = total_time / RUNS as u128;
let avg_mem = total_mem / RUNS;
println!(
"| {:>10} | {:>15} | {:>15} | {:>15} | {:>15} |",
size,
format!("{}.{:06}", avg_time / 1_000_000, avg_time % 1_000_000),
format!("{}.{:06}", max_time / 1_000_000, max_time % 1_000_000),
avg_mem,
max_mem
);
}
}
RUST
RUN cargo build --release
CMD ["./target/release/rust-benchmark"]
EOF
docker build -t rust-benchmark "$WORKDIR"
docker run --rm --init rust-benchmark