Parallelize repair tests

This commit is contained in:
Christopher Berner 2020-02-02 10:31:12 -08:00
parent f9edd667dc
commit 29131bb4d2
2 changed files with 52 additions and 31 deletions

@ -14,6 +14,7 @@ serde = {version = "1.0.102", features=["std", "derive"]}
criterion = "0.3"
primal = "0.2"
rand = "0.7"
threadpool = "1.7"
[[bench]]
name = "codec_benchmark"

@ -297,6 +297,8 @@ mod codec_tests {
use crate::{Decoder, SourceBlockEncodingPlan};
use rand::seq::SliceRandom;
use rand::Rng;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
#[test]
fn random_erasure_dense() {
@ -423,40 +425,58 @@ mod codec_tests {
}
fn repair(sparse_threshold: u32, max_symbols: usize, progress: bool, pre_plan: bool) {
let symbol_size = 8;
let pool = threadpool::Builder::new().build();
let failed = Arc::new(AtomicU32::new(0));
for symbol_count in 1..=max_symbols {
let elements = symbol_size * symbol_count;
let mut data: Vec<u8> = vec![0; elements];
for i in 0..elements {
data[i] = rand::thread_rng().gen();
}
if progress && symbol_count % 100 == 0 {
println!("[repair] Completed {} symbols", symbol_count)
}
let encoder = if pre_plan {
let plan = SourceBlockEncodingPlan::generate(symbol_count as u16);
SourceBlockEncoder::with_encoding_plan(1, 8, &data, &plan)
} else {
SourceBlockEncoder::new(1, 8, &data)
};
let mut decoder = SourceBlockDecoder::new(1, 8, elements as u64);
decoder.set_sparse_threshold(sparse_threshold);
let mut result = None;
let mut parsed_packets = 0;
// This test can theoretically fail with ~1/256^5 probability
for packet in encoder.repair_packets(0, (elements / symbol_size + 4) as u32) {
if parsed_packets < elements / 8 {
assert_eq!(result, None);
let failed = failed.clone();
pool.execute(move || {
if failed.load(Ordering::SeqCst) != 0 {
return;
}
let success = do_repair(symbol_count, sparse_threshold, pre_plan);
if !success {
failed.store(symbol_count as u32, Ordering::SeqCst);
}
result = decoder.decode(vec![packet]);
parsed_packets += 1;
}
assert_eq!(result.unwrap(), data);
if progress && symbol_count % 100 == 0 {
println!("[repair] Completed {} symbols", symbol_count)
}
})
}
pool.join();
assert_eq!(0, failed.load(Ordering::SeqCst));
}
fn do_repair(symbol_count: usize, sparse_threshold: u32, pre_plan: bool) -> bool {
let symbol_size = 8;
let elements = symbol_size * symbol_count;
let mut data: Vec<u8> = vec![0; elements];
for i in 0..elements {
data[i] = rand::thread_rng().gen();
}
let encoder = if pre_plan {
let plan = SourceBlockEncodingPlan::generate(symbol_count as u16);
SourceBlockEncoder::with_encoding_plan(1, 8, &data, &plan)
} else {
SourceBlockEncoder::new(1, 8, &data)
};
let mut decoder = SourceBlockDecoder::new(1, 8, elements as u64);
decoder.set_sparse_threshold(sparse_threshold);
let mut result = None;
let mut parsed_packets = 0;
// This test can theoretically fail with ~1/256^5 probability
for packet in encoder.repair_packets(0, (elements / symbol_size + 4) as u32) {
if parsed_packets < elements / symbol_size && result.is_some() {
return false;
}
result = decoder.decode(vec![packet]);
parsed_packets += 1;
}
return result.unwrap() == data;
}
}