diff --git a/benches/codec_benchmark.rs b/benches/codec_benchmark.rs index 1440951..e45b304 100644 --- a/benches/codec_benchmark.rs +++ b/benches/codec_benchmark.rs @@ -77,7 +77,7 @@ fn criterion_benchmark(c: &mut Criterion) { "encode 10KB", Benchmark::new("", move |b| { b.iter(|| { - let encoder = SourceBlockEncoder::new(1, symbol_size, &encode_data, None); + let encoder = SourceBlockEncoder::new(1, symbol_size, &encode_data); return encoder.source_packets(); }) }) @@ -89,7 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) { "roundtrip 10KB", Benchmark::new("", move |b| { b.iter(|| { - let encoder = SourceBlockEncoder::new(1, symbol_size, &roundtrip_data, None); + let encoder = SourceBlockEncoder::new(1, symbol_size, &roundtrip_data); let mut decoder = SourceBlockDecoder::new(1, symbol_size, elements as u64); return decoder.decode(encoder.source_packets()); }) @@ -102,7 +102,7 @@ fn criterion_benchmark(c: &mut Criterion) { "roundtrip repair 10KB", Benchmark::new("", move |b| { b.iter(|| { - let encoder = SourceBlockEncoder::new(1, symbol_size, &repair_data, None); + let encoder = SourceBlockEncoder::new(1, symbol_size, &repair_data); let repair_packets = (elements / symbol_size as usize) as u32; let mut decoder = SourceBlockDecoder::new(1, symbol_size, elements as u64); return decoder.decode(encoder.repair_packets(0, repair_packets)); diff --git a/benches/decode_benchmark.rs b/benches/decode_benchmark.rs index e077bd1..084f4cb 100644 --- a/benches/decode_benchmark.rs +++ b/benches/decode_benchmark.rs @@ -21,7 +21,7 @@ fn benchmark(symbol_size: u16, overhead: f64) -> u64 { } let iterations = TARGET_TOTAL_BYTES / elements; - let encoder = SourceBlockEncoder::new(1, symbol_size, &data, None); + let encoder = SourceBlockEncoder::new(1, symbol_size, &data); let elements_and_overhead = (symbol_count as f64 * (1.0 + overhead)) as u32; let mut packets = encoder.repair_packets(0, (iterations as u32 * elements_and_overhead) as u32); diff --git a/benches/encode_benchmark.rs 
b/benches/encode_benchmark.rs index d4ae726..03b9139 100644 --- a/benches/encode_benchmark.rs +++ b/benches/encode_benchmark.rs @@ -1,6 +1,5 @@ use rand::Rng; -use raptorq::SourceBlockEncoder; -use raptorq::SourceBlockEncoderCache; +use raptorq::{SourceBlockEncoder, SourceBlockEncodingPlan}; use std::time::Instant; const TARGET_TOTAL_BYTES: usize = 128 * 1024 * 1024; @@ -12,7 +11,7 @@ fn black_box(value: u64) { } } -fn benchmark(symbol_size: u16, cache: Option<&SourceBlockEncoderCache>) -> u64 { +fn benchmark(symbol_size: u16, pre_plan: bool) -> u64 { let mut black_box_value = 0; for symbol_count in SYMBOL_COUNTS.iter() { let elements = symbol_count * symbol_size as usize; @@ -21,15 +20,20 @@ fn benchmark(symbol_size: u16, cache: Option<&SourceBlockEncoderCache>) -> u64 { data[i] = rand::thread_rng().gen(); } - if cache.is_some() { - // Create and store the operation vector to measure performance when the cache is in use for all blocks. - SourceBlockEncoder::new(1, symbol_size, &data, cache); - } + let plan = if pre_plan { + Some(SourceBlockEncodingPlan::generate(*symbol_count as u16)) + } else { + None + }; let now = Instant::now(); let iterations = TARGET_TOTAL_BYTES / elements; for _ in 0..iterations { - let encoder = SourceBlockEncoder::new(1, symbol_size, &data, cache); + let encoder = if let Some(ref plan) = plan { + SourceBlockEncoder::with_encoding_plan(1, symbol_size, &data, plan) + } else { + SourceBlockEncoder::new(1, symbol_size, &data) + }; let packets = encoder.repair_packets(0, 1); black_box_value += packets[0].data()[0] as u64; } @@ -50,15 +54,11 @@ fn benchmark(symbol_size: u16, cache: Option<&SourceBlockEncoderCache>) -> u64 { fn main() { let symbol_size = 1280; println!( - "Symbol size: {} bytes (without operation vectors)", + "Symbol size: {} bytes (without pre-built plan)", symbol_size ); - black_box(benchmark(symbol_size, None)); + black_box(benchmark(symbol_size, false)); println!(); - let cache = SourceBlockEncoderCache::new(); - println!( - 
"Symbol size: {} bytes (with operation vectors)", - symbol_size - ); - black_box(benchmark(symbol_size, Some(&cache))); + println!("Symbol size: {} bytes (with pre-built plan)", symbol_size); + black_box(benchmark(symbol_size, true)); } diff --git a/src/decoder.rs b/src/decoder.rs index bba3b93..458801a 100644 --- a/src/decoder.rs +++ b/src/decoder.rs @@ -292,11 +292,10 @@ impl SourceBlockDecoder { #[cfg(test)] mod codec_tests { - use crate::Decoder; use crate::Encoder; use crate::SourceBlockDecoder; use crate::SourceBlockEncoder; - use crate::SourceBlockEncoderCache; + use crate::{Decoder, SourceBlockEncodingPlan}; use rand::seq::SliceRandom; use rand::Rng; @@ -377,7 +376,7 @@ mod codec_tests { println!("Completed {} symbols", symbol_count) } - let encoder = SourceBlockEncoder::new(1, symbol_size as u16, &data, None); + let encoder = SourceBlockEncoder::new(1, symbol_size as u16, &data); let mut decoder = SourceBlockDecoder::new(1, symbol_size as u16, elements as u64); decoder.set_sparse_threshold(sparse_threshold); @@ -395,43 +394,36 @@ mod codec_tests { #[test] #[ignore] fn repair_dense_extended() { - repair(99_999, 5000, true, None); + repair(99_999, 5000, true, false); } #[test] #[ignore] fn repair_sparse_extended() { - repair(0, 56403, true, None); + repair(0, 56403, true, false); } #[test] fn repair_dense() { - repair(99_999, 50, false, None); + repair(99_999, 50, false, false); } #[test] fn repair_sparse() { - repair(0, 50, false, None); + repair(0, 50, false, false); } #[test] - fn repair_dense_cache() { - let cache = SourceBlockEncoderCache::new(); - repair(99_999, 50, false, Some(&cache)); + fn repair_dense_pre_planned() { + repair(99_999, 50, false, true); } #[test] - fn repair_sparse_cache() { - let cache = SourceBlockEncoderCache::new(); - repair(0, 50, false, Some(&cache)); + fn repair_sparse_pre_planned() { + repair(0, 50, false, true); } - fn repair( - sparse_threshold: u32, - max_symbols: usize, - progress: bool, - cache: 
Option<&SourceBlockEncoderCache>, - ) { + fn repair(sparse_threshold: u32, max_symbols: usize, progress: bool, pre_plan: bool) { let symbol_size = 8; for symbol_count in 1..=max_symbols { let elements = symbol_size * symbol_count; @@ -444,7 +436,12 @@ mod codec_tests { println!("[repair] Completed {} symbols", symbol_count) } - let encoder = SourceBlockEncoder::new(1, 8, &data, cache); + let encoder = if pre_plan { + let plan = SourceBlockEncodingPlan::generate(symbol_count as u16); + SourceBlockEncoder::with_encoding_plan(1, 8, &data, &plan) + } else { + SourceBlockEncoder::new(1, 8, &data) + }; let mut decoder = SourceBlockDecoder::new(1, 8, elements as u64); decoder.set_sparse_threshold(sparse_threshold); diff --git a/src/encoder.rs b/src/encoder.rs index 934e8fa..6d9e9e6 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -17,8 +17,6 @@ use crate::systematic_constants::num_pi_symbols; use crate::systematic_constants::{calculate_p1, systematic_index}; use crate::ObjectTransmissionInformation; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; pub const SPARSE_MATRIX_THRESHOLD: u32 = 250; @@ -42,46 +40,50 @@ impl Encoder { assert_eq!(1, config.sub_blocks()); // let (tl, ts, nl, ns) = partition((config.symbol_size() / config.alignment() as u16) as u32, config.sub_blocks()); - let cache = SourceBlockEncoderCache::new(); let mut data_index = 0; let mut blocks = vec![]; - for i in 0..zl { - let offset = kl as usize * config.symbol_size() as usize; - blocks.push(SourceBlockEncoder::new( - i as u8, - config.symbol_size(), - &data[data_index..(data_index + offset)], - Some(&cache), - )); - data_index += offset; - } - - for i in 0..zs { - let offset = ks as usize * config.symbol_size() as usize; - if data_index + offset <= data.len() { - blocks.push(SourceBlockEncoder::new( + if zl > 0 { + let kl_plan = SourceBlockEncodingPlan::generate(kl as u16); + for i in 0..zl { + let offset = kl as usize * config.symbol_size() as 
usize; + blocks.push(SourceBlockEncoder::with_encoding_plan( i as u8, config.symbol_size(), &data[data_index..(data_index + offset)], - Some(&cache), - )); - } else { - // Should only be possible when Kt * T > F. See third to last paragraph in section 4.4.1.2 - assert!(kt as usize * config.symbol_size() as usize > data.len()); - // Zero pad the last symbol - let mut padded = Vec::from(&data[data_index..]); - padded.extend(vec![ - 0; - kt as usize * config.symbol_size() as usize - data.len() - ]); - blocks.push(SourceBlockEncoder::new( - i as u8, - config.symbol_size(), - &padded, - Some(&cache), + &kl_plan, )); + data_index += offset; + } + } + + if zs > 0 { + let ks_plan = SourceBlockEncodingPlan::generate(ks as u16); + for i in 0..zs { + let offset = ks as usize * config.symbol_size() as usize; + if data_index + offset <= data.len() { + blocks.push(SourceBlockEncoder::with_encoding_plan( + i as u8, + config.symbol_size(), + &data[data_index..(data_index + offset)], + &ks_plan, + )); + } else { + // Should only be possible when Kt * T > F. 
See third to last paragraph in section 4.4.1.2 + assert!(kt as usize * config.symbol_size() as usize > data.len()); + // Zero pad the last symbol + let mut padded = Vec::from(&data[data_index..]); + padded.extend(vec![ + 0; + kt as usize * config.symbol_size() as usize - data.len() + ]); + blocks.push(SourceBlockEncoder::new( + i as u8, + config.symbol_size(), + &padded, + )); + } + data_index += offset; } - data_index += offset; } Encoder { config, blocks } @@ -105,15 +107,22 @@ impl Encoder { } } -#[derive(Default)] -pub struct SourceBlockEncoderCache { - cache: Arc<RwLock<HashMap<usize, Vec<SymbolOps>>>>, +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SourceBlockEncodingPlan { + operations: Vec<SymbolOps>, + source_symbol_count: u16, } -impl SourceBlockEncoderCache { - pub fn new() -> SourceBlockEncoderCache { - let cache = Arc::new(RwLock::new(HashMap::new())); - SourceBlockEncoderCache { cache } +impl SourceBlockEncodingPlan { + // Generates an encoding plan that is valid for any combination of data length and symbol size + // where ceil(data_length / symbol_size) = symbol_count + pub fn generate(symbol_count: u16) -> SourceBlockEncodingPlan { + let symbols = vec![Symbol::new(vec![0]); symbol_count as usize]; + let (_, ops) = gen_intermediate_symbols(&symbols, 1, SPARSE_MATRIX_THRESHOLD, true); + SourceBlockEncodingPlan { + operations: ops.unwrap(), + source_symbol_count: symbol_count, + } } } @@ -125,59 +134,46 @@ pub struct SourceBlockEncoder { } impl SourceBlockEncoder { - pub fn new( - source_block_id: u8, - symbol_size: u16, - data: &[u8], - cache: Option<&SourceBlockEncoderCache>, - ) -> SourceBlockEncoder { + pub fn new(source_block_id: u8, symbol_size: u16, data: &[u8]) -> SourceBlockEncoder { assert_eq!(data.len() % symbol_size as usize, 0); let source_symbols: Vec<Symbol> = data .chunks(symbol_size as usize) .map(|x| Symbol::new(Vec::from(x))) .collect(); - let intermediate_symbols = match cache { - Some(c) => { - let key = source_symbols.len(); - let read_map = 
c.cache.read().unwrap(); - let value = read_map.get(&key); + let (intermediate_symbols, _) = gen_intermediate_symbols( + &source_symbols, + symbol_size as usize, + SPARSE_MATRIX_THRESHOLD, + false, + ); - match value { - None => { - drop(read_map); - let (is, ops_vec) = gen_intermediate_symbols( - &source_symbols, - symbol_size as usize, - SPARSE_MATRIX_THRESHOLD, - true, - ); - let mut write_map = c.cache.write().unwrap(); - write_map.insert(key, ops_vec.unwrap()); - drop(write_map); - is.unwrap() - } - Some(operation_vector) => { - let is = gen_intermediate_symbols_ops_vec( - &source_symbols, - symbol_size as usize, - &(*operation_vector), - ); - drop(read_map); - is - } - } - } - None => { - let (is, _ops_vec) = gen_intermediate_symbols( - &source_symbols, - symbol_size as usize, - SPARSE_MATRIX_THRESHOLD, - false, - ); - is.unwrap() - } - }; + SourceBlockEncoder { + source_block_id, + source_symbols, + intermediate_symbols: intermediate_symbols.unwrap(), + } + } + + pub fn with_encoding_plan( + source_block_id: u8, + symbol_size: u16, + data: &[u8], + plan: &SourceBlockEncodingPlan, + ) -> SourceBlockEncoder { + assert_eq!(data.len() % symbol_size as usize, 0); + let source_symbols: Vec<Symbol> = data + .chunks(symbol_size as usize) + .map(|x| Symbol::new(Vec::from(x))) + .collect(); + // TODO: this could be more lenient and support anything with the same extended symbol count + assert_eq!(source_symbols.len(), plan.source_symbol_count as usize); + + let intermediate_symbols = gen_intermediate_symbols_ops_vec( + &source_symbols, + symbol_size as usize, + &plan.operations, + ); SourceBlockEncoder { source_block_id, diff --git a/src/lib.rs b/src/lib.rs index 7a8260a..2610650 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ pub use crate::decoder::Decoder; pub use crate::decoder::SourceBlockDecoder; pub use crate::encoder::Encoder; pub use crate::encoder::SourceBlockEncoder; -pub use crate::encoder::SourceBlockEncoderCache; +pub use 
crate::encoder::SourceBlockEncodingPlan; #[cfg(feature = "benchmarking")] pub use crate::constraint_matrix::generate_constraint_matrix;