Replace SourceBlockEncoderCache with SourceBlockEncodingPlan

Christopher Berner 2020-01-25 21:49:45 -08:00
parent 04786d26fd
commit ec54f3c838
6 changed files with 122 additions and 129 deletions
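
In short: the shared, lock-guarded SourceBlockEncoderCache is removed, and callers that want to reuse the symbol-operation vector now generate an explicit SourceBlockEncodingPlan up front and pass it to SourceBlockEncoder::with_encoding_plan. A minimal usage sketch, using only the calls visible in the diff below (the symbol size and counts are illustrative, not part of this commit):

use raptorq::{SourceBlockEncoder, SourceBlockEncodingPlan};

fn main() {
    let symbol_size: u16 = 1280;
    // Data length must be a multiple of symbol_size; here the block has 10 source symbols.
    let data = vec![0u8; 10 * symbol_size as usize];

    // Build the plan once for blocks that contain exactly 10 source symbols...
    let plan = SourceBlockEncodingPlan::generate(10);
    // ...and reuse it for every block of that shape.
    let encoder = SourceBlockEncoder::with_encoding_plan(1, symbol_size, &data, &plan);
    let repair = encoder.repair_packets(0, 15);

    // Encoding without a plan still works; the old cache parameter is simply gone.
    let encoder = SourceBlockEncoder::new(1, symbol_size, &data);
    let source = encoder.source_packets();

    println!("{} repair packets, {} source packets", repair.len(), source.len());
}

A plan is tied to a source-symbol count, so it can be generated once and reused across every block with that many symbols, instead of being looked up in a shared HashMap behind an RwLock.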

@@ -77,7 +77,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         "encode 10KB",
         Benchmark::new("", move |b| {
             b.iter(|| {
-                let encoder = SourceBlockEncoder::new(1, symbol_size, &encode_data, None);
+                let encoder = SourceBlockEncoder::new(1, symbol_size, &encode_data);
                 return encoder.source_packets();
             })
         })
@@ -89,7 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         "roundtrip 10KB",
         Benchmark::new("", move |b| {
             b.iter(|| {
-                let encoder = SourceBlockEncoder::new(1, symbol_size, &roundtrip_data, None);
+                let encoder = SourceBlockEncoder::new(1, symbol_size, &roundtrip_data);
                 let mut decoder = SourceBlockDecoder::new(1, symbol_size, elements as u64);
                 return decoder.decode(encoder.source_packets());
             })
@@ -102,7 +102,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         "roundtrip repair 10KB",
         Benchmark::new("", move |b| {
             b.iter(|| {
-                let encoder = SourceBlockEncoder::new(1, symbol_size, &repair_data, None);
+                let encoder = SourceBlockEncoder::new(1, symbol_size, &repair_data);
                 let repair_packets = (elements / symbol_size as usize) as u32;
                 let mut decoder = SourceBlockDecoder::new(1, symbol_size, elements as u64);
                 return decoder.decode(encoder.repair_packets(0, repair_packets));

@@ -21,7 +21,7 @@ fn benchmark(symbol_size: u16, overhead: f64) -> u64 {
     }
     let iterations = TARGET_TOTAL_BYTES / elements;
-    let encoder = SourceBlockEncoder::new(1, symbol_size, &data, None);
+    let encoder = SourceBlockEncoder::new(1, symbol_size, &data);
     let elements_and_overhead = (symbol_count as f64 * (1.0 + overhead)) as u32;
     let mut packets =
         encoder.repair_packets(0, (iterations as u32 * elements_and_overhead) as u32);

@@ -1,6 +1,5 @@
 use rand::Rng;
-use raptorq::SourceBlockEncoder;
-use raptorq::SourceBlockEncoderCache;
+use raptorq::{SourceBlockEncoder, SourceBlockEncodingPlan};
 use std::time::Instant;
 const TARGET_TOTAL_BYTES: usize = 128 * 1024 * 1024;
@@ -12,7 +11,7 @@ fn black_box(value: u64) {
     }
 }
-fn benchmark(symbol_size: u16, cache: Option<&SourceBlockEncoderCache>) -> u64 {
+fn benchmark(symbol_size: u16, pre_plan: bool) -> u64 {
     let mut black_box_value = 0;
     for symbol_count in SYMBOL_COUNTS.iter() {
         let elements = symbol_count * symbol_size as usize;
@@ -21,15 +20,20 @@ fn benchmark(symbol_size: u16, cache: Option<&SourceBlockEncoderCache>) -> u64 {
             data[i] = rand::thread_rng().gen();
         }
-        if cache.is_some() {
-            // Create and store the operation vector to measure performance when the cache is in use for all blocks.
-            SourceBlockEncoder::new(1, symbol_size, &data, cache);
-        }
+        let plan = if pre_plan {
+            Some(SourceBlockEncodingPlan::generate(*symbol_count as u16))
+        } else {
+            None
+        };
         let now = Instant::now();
         let iterations = TARGET_TOTAL_BYTES / elements;
         for _ in 0..iterations {
-            let encoder = SourceBlockEncoder::new(1, symbol_size, &data, cache);
+            let encoder = if let Some(ref plan) = plan {
+                SourceBlockEncoder::with_encoding_plan(1, symbol_size, &data, plan)
+            } else {
+                SourceBlockEncoder::new(1, symbol_size, &data)
+            };
             let packets = encoder.repair_packets(0, 1);
             black_box_value += packets[0].data()[0] as u64;
         }
@@ -50,15 +54,11 @@ fn benchmark(symbol_size: u16, cache: Option<&SourceBlockEncoderCache>) -> u64 {
 fn main() {
     let symbol_size = 1280;
     println!(
-        "Symbol size: {} bytes (without operation vectors)",
+        "Symbol size: {} bytes (without pre-built plan)",
         symbol_size
     );
-    black_box(benchmark(symbol_size, None));
+    black_box(benchmark(symbol_size, false));
     println!();
-    let cache = SourceBlockEncoderCache::new();
-    println!(
-        "Symbol size: {} bytes (with operation vectors)",
-        symbol_size
-    );
-    black_box(benchmark(symbol_size, Some(&cache)));
+    println!("Symbol size: {} bytes (with pre-built plan)", symbol_size);
+    black_box(benchmark(symbol_size, true));
 }

@@ -292,11 +292,10 @@ impl SourceBlockDecoder {
 #[cfg(test)]
 mod codec_tests {
-    use crate::Decoder;
     use crate::Encoder;
     use crate::SourceBlockDecoder;
     use crate::SourceBlockEncoder;
-    use crate::SourceBlockEncoderCache;
+    use crate::{Decoder, SourceBlockEncodingPlan};
     use rand::seq::SliceRandom;
     use rand::Rng;
@@ -377,7 +376,7 @@ mod codec_tests {
                 println!("Completed {} symbols", symbol_count)
             }
-            let encoder = SourceBlockEncoder::new(1, symbol_size as u16, &data, None);
+            let encoder = SourceBlockEncoder::new(1, symbol_size as u16, &data);
             let mut decoder = SourceBlockDecoder::new(1, symbol_size as u16, elements as u64);
             decoder.set_sparse_threshold(sparse_threshold);
@@ -395,43 +394,36 @@ mod codec_tests {
     #[test]
     #[ignore]
     fn repair_dense_extended() {
-        repair(99_999, 5000, true, None);
+        repair(99_999, 5000, true, false);
     }
     #[test]
     #[ignore]
     fn repair_sparse_extended() {
-        repair(0, 56403, true, None);
+        repair(0, 56403, true, false);
     }
     #[test]
     fn repair_dense() {
-        repair(99_999, 50, false, None);
+        repair(99_999, 50, false, false);
     }
     #[test]
     fn repair_sparse() {
-        repair(0, 50, false, None);
+        repair(0, 50, false, false);
     }
     #[test]
-    fn repair_dense_cache() {
-        let cache = SourceBlockEncoderCache::new();
-        repair(99_999, 50, false, Some(&cache));
+    fn repair_dense_pre_planned() {
+        repair(99_999, 50, false, true);
     }
     #[test]
-    fn repair_sparse_cache() {
-        let cache = SourceBlockEncoderCache::new();
-        repair(0, 50, false, Some(&cache));
+    fn repair_sparse_pre_planned() {
+        repair(0, 50, false, true);
     }
-    fn repair(
-        sparse_threshold: u32,
-        max_symbols: usize,
-        progress: bool,
-        cache: Option<&SourceBlockEncoderCache>,
-    ) {
+    fn repair(sparse_threshold: u32, max_symbols: usize, progress: bool, pre_plan: bool) {
         let symbol_size = 8;
         for symbol_count in 1..=max_symbols {
             let elements = symbol_size * symbol_count;
@@ -444,7 +436,12 @@ mod codec_tests {
                 println!("[repair] Completed {} symbols", symbol_count)
             }
-            let encoder = SourceBlockEncoder::new(1, 8, &data, cache);
+            let encoder = if pre_plan {
+                let plan = SourceBlockEncodingPlan::generate(symbol_count as u16);
+                SourceBlockEncoder::with_encoding_plan(1, 8, &data, &plan)
+            } else {
+                SourceBlockEncoder::new(1, 8, &data)
+            };
             let mut decoder = SourceBlockDecoder::new(1, 8, elements as u64);
             decoder.set_sparse_threshold(sparse_threshold);

@@ -17,8 +17,6 @@ use crate::systematic_constants::num_pi_symbols;
 use crate::systematic_constants::{calculate_p1, systematic_index};
 use crate::ObjectTransmissionInformation;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
 pub const SPARSE_MATRIX_THRESHOLD: u32 = 250;
@@ -42,46 +40,50 @@ impl Encoder {
         assert_eq!(1, config.sub_blocks());
         // let (tl, ts, nl, ns) = partition((config.symbol_size() / config.alignment() as u16) as u32, config.sub_blocks());
-        let cache = SourceBlockEncoderCache::new();
         let mut data_index = 0;
         let mut blocks = vec![];
-        for i in 0..zl {
-            let offset = kl as usize * config.symbol_size() as usize;
-            blocks.push(SourceBlockEncoder::new(
-                i as u8,
-                config.symbol_size(),
-                &data[data_index..(data_index + offset)],
-                Some(&cache),
-            ));
-            data_index += offset;
-        }
-        for i in 0..zs {
-            let offset = ks as usize * config.symbol_size() as usize;
-            if data_index + offset <= data.len() {
-                blocks.push(SourceBlockEncoder::new(
+        if zl > 0 {
+            let kl_plan = SourceBlockEncodingPlan::generate(kl as u16);
+            for i in 0..zl {
+                let offset = kl as usize * config.symbol_size() as usize;
+                blocks.push(SourceBlockEncoder::with_encoding_plan(
                     i as u8,
                     config.symbol_size(),
                     &data[data_index..(data_index + offset)],
-                    Some(&cache),
-                ));
-            } else {
-                // Should only be possible when Kt * T > F. See third to last paragraph in section 4.4.1.2
-                assert!(kt as usize * config.symbol_size() as usize > data.len());
-                // Zero pad the last symbol
-                let mut padded = Vec::from(&data[data_index..]);
-                padded.extend(vec![
-                    0;
-                    kt as usize * config.symbol_size() as usize - data.len()
-                ]);
-                blocks.push(SourceBlockEncoder::new(
-                    i as u8,
-                    config.symbol_size(),
-                    &padded,
-                    Some(&cache),
+                    &kl_plan,
                 ));
+                data_index += offset;
             }
+        }
+        if zs > 0 {
+            let ks_plan = SourceBlockEncodingPlan::generate(ks as u16);
+            for i in 0..zs {
+                let offset = ks as usize * config.symbol_size() as usize;
+                if data_index + offset <= data.len() {
+                    blocks.push(SourceBlockEncoder::with_encoding_plan(
+                        i as u8,
+                        config.symbol_size(),
+                        &data[data_index..(data_index + offset)],
+                        &ks_plan,
+                    ));
+                } else {
+                    // Should only be possible when Kt * T > F. See third to last paragraph in section 4.4.1.2
+                    assert!(kt as usize * config.symbol_size() as usize > data.len());
+                    // Zero pad the last symbol
+                    let mut padded = Vec::from(&data[data_index..]);
+                    padded.extend(vec![
+                        0;
+                        kt as usize * config.symbol_size() as usize - data.len()
+                    ]);
+                    blocks.push(SourceBlockEncoder::new(
+                        i as u8,
+                        config.symbol_size(),
+                        &padded,
+                    ));
+                }
+                data_index += offset;
+            }
-            data_index += offset;
         }
         Encoder { config, blocks }
@@ -105,15 +107,22 @@ impl Encoder {
     }
 }
-#[derive(Default)]
-pub struct SourceBlockEncoderCache {
-    cache: Arc<RwLock<HashMap<usize, Vec<SymbolOps>>>>,
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct SourceBlockEncodingPlan {
+    operations: Vec<SymbolOps>,
+    source_symbol_count: u16,
 }
-impl SourceBlockEncoderCache {
-    pub fn new() -> SourceBlockEncoderCache {
-        let cache = Arc::new(RwLock::new(HashMap::new()));
-        SourceBlockEncoderCache { cache }
+impl SourceBlockEncodingPlan {
+    // Generates an encoding plan that is valid for any combination of data length and symbol size
+    // where ceil(data_length / symbol_size) = symbol_count
+    pub fn generate(symbol_count: u16) -> SourceBlockEncodingPlan {
+        let symbols = vec![Symbol::new(vec![0]); symbol_count as usize];
+        let (_, ops) = gen_intermediate_symbols(&symbols, 1, SPARSE_MATRIX_THRESHOLD, true);
+        SourceBlockEncodingPlan {
+            operations: ops.unwrap(),
+            source_symbol_count: symbol_count,
+        }
     }
 }
@@ -125,59 +134,46 @@ pub struct SourceBlockEncoder {
 }
 impl SourceBlockEncoder {
-    pub fn new(
-        source_block_id: u8,
-        symbol_size: u16,
-        data: &[u8],
-        cache: Option<&SourceBlockEncoderCache>,
-    ) -> SourceBlockEncoder {
+    pub fn new(source_block_id: u8, symbol_size: u16, data: &[u8]) -> SourceBlockEncoder {
         assert_eq!(data.len() % symbol_size as usize, 0);
         let source_symbols: Vec<Symbol> = data
             .chunks(symbol_size as usize)
             .map(|x| Symbol::new(Vec::from(x)))
             .collect();
-        let intermediate_symbols = match cache {
-            Some(c) => {
-                let key = source_symbols.len();
-                let read_map = c.cache.read().unwrap();
-                let value = read_map.get(&key);
+        let (intermediate_symbols, _) = gen_intermediate_symbols(
+            &source_symbols,
+            symbol_size as usize,
+            SPARSE_MATRIX_THRESHOLD,
+            false,
+        );
-                match value {
-                    None => {
-                        drop(read_map);
-                        let (is, ops_vec) = gen_intermediate_symbols(
-                            &source_symbols,
-                            symbol_size as usize,
-                            SPARSE_MATRIX_THRESHOLD,
-                            true,
-                        );
-                        let mut write_map = c.cache.write().unwrap();
-                        write_map.insert(key, ops_vec.unwrap());
-                        drop(write_map);
-                        is.unwrap()
-                    }
-                    Some(operation_vector) => {
-                        let is = gen_intermediate_symbols_ops_vec(
-                            &source_symbols,
-                            symbol_size as usize,
-                            &(*operation_vector),
-                        );
-                        drop(read_map);
-                        is
-                    }
-                }
-            }
-            None => {
-                let (is, _ops_vec) = gen_intermediate_symbols(
-                    &source_symbols,
-                    symbol_size as usize,
-                    SPARSE_MATRIX_THRESHOLD,
-                    false,
-                );
-                is.unwrap()
-            }
-        };
         SourceBlockEncoder {
             source_block_id,
             source_symbols,
+            intermediate_symbols: intermediate_symbols.unwrap(),
         }
     }
+    pub fn with_encoding_plan(
+        source_block_id: u8,
+        symbol_size: u16,
+        data: &[u8],
+        plan: &SourceBlockEncodingPlan,
+    ) -> SourceBlockEncoder {
+        assert_eq!(data.len() % symbol_size as usize, 0);
+        let source_symbols: Vec<Symbol> = data
+            .chunks(symbol_size as usize)
+            .map(|x| Symbol::new(Vec::from(x)))
+            .collect();
+        // TODO: this could be more lenient and support anything with the same extended symbol count
+        assert_eq!(source_symbols.len(), plan.source_symbol_count as usize);
+        let intermediate_symbols = gen_intermediate_symbols_ops_vec(
+            &source_symbols,
+            symbol_size as usize,
+            &plan.operations,
+        );
+        SourceBlockEncoder {
+            source_block_id,

@@ -27,7 +27,7 @@ pub use crate::decoder::Decoder;
 pub use crate::decoder::SourceBlockDecoder;
 pub use crate::encoder::Encoder;
 pub use crate::encoder::SourceBlockEncoder;
-pub use crate::encoder::SourceBlockEncoderCache;
+pub use crate::encoder::SourceBlockEncodingPlan;
 #[cfg(feature = "benchmarking")]
 pub use crate::constraint_matrix::generate_constraint_matrix;
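
Worth noting from the new doc comment on SourceBlockEncodingPlan::generate: a plan depends only on the number of source symbols, not on the symbol size or data length, so one plan can serve blocks of different symbol sizes as long as ceil(data_length / symbol_size) stays the same. A small hypothetical illustration (the sizes are made up for the sketch):

use raptorq::{SourceBlockEncoder, SourceBlockEncodingPlan};

fn main() {
    // One plan for any block that splits into exactly 4 source symbols.
    let plan = SourceBlockEncodingPlan::generate(4);

    // 4 symbols of 64 bytes each...
    let small = vec![1u8; 4 * 64];
    let _a = SourceBlockEncoder::with_encoding_plan(1, 64, &small, &plan);

    // ...and 4 symbols of 1280 bytes each reuse the same plan.
    let large = vec![2u8; 4 * 1280];
    let _b = SourceBlockEncoder::with_encoding_plan(2, 1280, &large, &plan);
}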