From 052ebcaa2a6f66eb8c38b136580e03b776537d31 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Tue, 9 Dec 2025 16:43:09 -0800 Subject: [PATCH 01/13] refactor: perform sequence decoding in batch to stay in SIMD longer --- src/parallel.rs | 12 +++++ src/vbq/reader.rs | 123 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) diff --git a/src/parallel.rs b/src/parallel.rs index c0ba298..b46e49f 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -26,6 +26,18 @@ impl BinseqReader { } } + /// Set whether to decode sequences at once in each block + /// + /// Note: This setting applies to VBQ readers only. + pub fn set_decode_block(&mut self, decode_block: bool) { + match self { + Self::Bq(_) => { + // no-op + } + Self::Vbq(reader) => reader.set_decode_block(decode_block), + } + } + #[must_use] pub fn is_paired(&self) -> bool { match self { diff --git a/src/vbq/reader.rs b/src/vbq/reader.rs index 41fd4ac..9fe3731 100644 --- a/src/vbq/reader.rs +++ b/src/vbq/reader.rs @@ -180,6 +180,9 @@ pub struct RecordBlock { /// Reusable zstd decompression context dctx: zstd_safe::DCtx<'static>, + + /// Reusable decoding buffer for the block + dbuf: Vec, } impl RecordBlock { /// Creates a new empty `RecordBlock` with the specified block size @@ -205,6 +208,7 @@ impl RecordBlock { records: Vec::default(), sequences: Vec::default(), rbuf: Vec::default(), + dbuf: Vec::default(), dctx: zstd_safe::DCtx::create(), } } @@ -270,6 +274,7 @@ impl RecordBlock { self.index = 0; self.records.clear(); self.sequences.clear(); + self.dbuf.clear(); // Note: We keep rbuf allocated for reuse } @@ -438,6 +443,70 @@ impl RecordBlock { }); } } + + /// Decodes all sequences in the block at once. + /// + /// Note: + /// Each record's sequence is padded internally to the nearest u64. + /// Because of this the global decoding will include nucleotides that are not present in the original data. + /// We track the non-contiguous regions of the sequence separately. + pub fn decode_all(&mut self) -> Result<()> { + if self.sequences.is_empty() { + return Ok(()); + } + self.dbuf.clear(); + match self.bitsize { + BitSize::Two => { + let num_bp = self.sequences.len() * 32; + bitnuc::twobit::decode(&self.sequences, num_bp, &mut self.dbuf) + } + BitSize::Four => { + let num_bp = self.sequences.len() * 16; + bitnuc::fourbit::decode(&self.sequences, num_bp, &mut self.dbuf) + } + }?; + Ok(()) + } + + /// Get decoded primary sequence for a record by index + pub fn get_decoded_s(&self, record_idx: usize) -> Option<&[u8]> { + let meta = self.records.get(record_idx)?; + if self.dbuf.is_empty() { + return None; + } + + let bases_per_word = match self.bitsize { + BitSize::Two => 32, + BitSize::Four => 16, + }; + + // Calculate offset in decoded buffer (accounting for padding) + let offset = meta.s_seq_span.offset * bases_per_word; + let len = meta.slen as usize; + + Some(&self.dbuf[offset..offset + len]) + } + + /// Get decoded extended sequence for a record by index + pub fn get_decoded_x(&self, record_idx: usize) -> Option<&[u8]> { + let meta = self.records.get(record_idx)?; + if meta.xlen == 0 { + return Some(&[]); + } + if self.dbuf.is_empty() { + return None; + } + + let bases_per_word = match self.bitsize { + BitSize::Two => 32, + BitSize::Four => 16, + }; + + let offset = meta.x_seq_span.offset * bases_per_word; + let len = meta.xlen as usize; + + Some(&self.dbuf[offset..offset + len]) + } } pub struct RecordBlockIter<'a> { @@ -460,11 +529,14 @@ impl<'a> Iterator for RecordBlockIter<'a> { let meta = &self.block.records[self.pos]; let index = (self.block.index + self.pos) as u64; + let index_in_block = self.pos; self.pos += 1; Some(RefRecord { + block: self.block, bitsize: self.block.bitsize, index, + index_in_block, flag: meta.flag, slen: meta.slen, xlen: meta.xlen, @@ -482,8 +554,10 @@ impl<'a> Iterator for RecordBlockIter<'a> { /// Zero-copy record reference pub struct RefRecord<'a> { + block: &'a RecordBlock, bitsize: BitSize, index: u64, + index_in_block: usize, flag: Option, slen: u64, xlen: u64, @@ -549,6 +623,28 @@ impl<'a> BinseqRecord for RefRecord<'a> { fn xqual(&self) -> &[u8] { self.xqual } + + /// Override this method since we can make use of block information + fn decode_s(&self, buf: &mut Vec) -> Result<()> { + if let Some(decoded) = self.block.get_decoded_s(self.index_in_block) { + buf.extend_from_slice(decoded); + } else { + self.bitsize() + .decode(self.sbuf(), self.slen() as usize, buf)?; + } + Ok(()) + } + + /// Override this method since we can make use of block information + fn decode_x(&self, buf: &mut Vec) -> Result<()> { + if let Some(decoded) = self.block.get_decoded_x(self.index_in_block) { + buf.extend_from_slice(decoded); + } else { + self.bitsize() + .decode(self.xbuf(), self.xlen() as usize, buf)?; + } + Ok(()) + } } /// Memory-mapped reader for VBINSEQ files @@ -631,6 +727,9 @@ pub struct MmapReader { /// Total number of records read from the file so far total: usize, + + /// Whether to decode sequences at once in each block + decode_block: bool, } impl MmapReader { /// Creates a new `MmapReader` for a VBINSEQ file @@ -689,6 +788,7 @@ impl MmapReader { header, pos: SIZE_HEADER, total: 0, + decode_block: true, }) } @@ -714,6 +814,24 @@ impl MmapReader { RecordBlock::new(self.header.bits, self.header.block as usize) } + /// Sets whether to decode sequences at once in each block + /// + /// # Arguments + /// + /// * `decode_block` - Whether to decode sequences at once in each block + /// + /// # Examples + /// + /// ```rust,no_run + /// use binseq::vbq::MmapReader; + /// + /// let mut reader = MmapReader::new("example.vbq").unwrap(); + /// reader.set_decode_block(false); + /// ``` + pub fn set_decode_block(&mut self, decode_block: bool) { + self.decode_block = decode_block; + } + /// Returns the path where the index file would be located /// /// The index file is used for random access to blocks and has the same path as @@ -1166,6 +1284,11 @@ impl ParallelReader for MmapReader { // Update the record block index record_block.update_index(block_range.cumulative_records as usize); + // decode the data + if self.decode_block { + record_block.decode_all()?; + } + // Process records in this block that fall within our range for record in record_block.iter() { let global_record_idx = record.index as usize; From 8c85c2f7530baecb385474093b9cecb6f06da9be Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:27:00 -0800 Subject: [PATCH 02/13] refactor: improve header access logic for zero-copy --- src/bq/reader.rs | 36 ++++++++++++++++++++++++++++-------- src/context/traits.rs | 5 +++-- src/record.rs | 8 ++++---- src/vbq/reader.rs | 39 +++++++++++++++++++++++++++++---------- 4 files changed, 64 insertions(+), 24 deletions(-) diff --git a/src/bq/reader.rs b/src/bq/reader.rs index 8ce3d91..128ac33 100644 --- a/src/bq/reader.rs +++ b/src/bq/reader.rs @@ -41,6 +41,10 @@ pub struct RefRecord<'a> { buffer: &'a [u64], /// The configuration that defines the layout and size of record components config: RecordConfig, + /// Cached index string for the sequence header + header_buf: [u8; 20], + /// Length of the header in bytes + header_len: usize, } impl<'a> RefRecord<'a> { /// Creates a new record reference @@ -57,7 +61,13 @@ impl<'a> RefRecord<'a> { #[must_use] pub fn new(id: u64, buffer: &'a [u64], config: RecordConfig) -> Self { assert_eq!(buffer.len(), config.record_size_u64()); - Self { id, buffer, config } + Self { + id, + buffer, + config, + header_buf: [0; 20], + header_len: 0, + } } /// Returns the record's configuration /// @@ -66,6 +76,11 @@ impl<'a> RefRecord<'a> { pub fn config(&self) -> RecordConfig { self.config } + + pub fn set_id(&mut self, id: &[u8]) { + self.header_len = id.len(); + self.header_buf[..self.header_len].copy_from_slice(id); + } } impl BinseqRecord for RefRecord<'_> { @@ -76,16 +91,15 @@ impl BinseqRecord for RefRecord<'_> { self.id } /// Clear the buffer and fill it with the sequence header - fn sheader(&self, buffer: &mut Vec) { - buffer.clear(); - buffer.extend_from_slice(itoa::Buffer::new().format(self.id).as_bytes()); + fn sheader(&self) -> &[u8] { + &self.header_buf[..self.header_len] } /// Clear the buffer and fill it with the extended header - fn xheader(&self, buffer: &mut Vec) { - buffer.clear(); - buffer.extend_from_slice(itoa::Buffer::new().format(self.id).as_bytes()); + fn xheader(&self) -> &[u8] { + &self.header_buf[self.header_len..] } + fn flag(&self) -> Option { if self.config.flags { Some(self.buffer[0]) @@ -713,8 +727,14 @@ impl ParallelReader for MmapReader { return Ok(()); // No records for this thread } + let mut translater = itoa::Buffer::new(); + for (batch_idx, idx) in (start_idx..end_idx).enumerate() { - let record = reader.get(idx)?; + let id_str = translater.format(idx); + + let mut record = reader.get(idx)?; + record.set_id(id_str.as_bytes()); + processor.process_record(record)?; if batch_idx % BATCH_SIZE == 0 { diff --git a/src/context/traits.rs b/src/context/traits.rs index 84d5055..de0dd3d 100644 --- a/src/context/traits.rs +++ b/src/context/traits.rs @@ -99,9 +99,10 @@ pub trait HeaderContext { #[inline] fn fill_headers(&mut self, record: &R) { - record.sheader(self.sheader_mut()); + self.clear_headers(); + self.sheader_mut().extend_from_slice(&record.sheader()); if record.is_paired() { - record.xheader(self.xheader_mut()); + self.xheader_mut().extend_from_slice(&record.xheader()); } } } diff --git a/src/record.rs b/src/record.rs index 29a1a77..5029e29 100644 --- a/src/record.rs +++ b/src/record.rs @@ -22,11 +22,11 @@ pub trait BinseqRecord { /// Returns the flag value of this record fn flag(&self) -> Option; - /// Fills a buffer with the header of this record. - fn sheader(&self, buffer: &mut Vec); + /// Returns the header of this record + fn sheader(&self) -> &[u8]; - /// Fills a buffer with the header of the extended/paired sequence (empty if not paired) - fn xheader(&self, buffer: &mut Vec); + /// Returns the header of the extended/paired sequence (empty if not paired) + fn xheader(&self) -> &[u8]; /// Returns the length of the primary sequence of this record fn slen(&self) -> u64; diff --git a/src/vbq/reader.rs b/src/vbq/reader.rs index 9fe3731..772f744 100644 --- a/src/vbq/reader.rs +++ b/src/vbq/reader.rs @@ -512,11 +512,16 @@ impl RecordBlock { pub struct RecordBlockIter<'a> { block: &'a RecordBlock, pos: usize, + header_buffer: itoa::Buffer, } impl<'a> RecordBlockIter<'a> { #[must_use] pub fn new(block: &'a RecordBlock) -> Self { - Self { block, pos: 0 } + Self { + block, + pos: 0, + header_buffer: itoa::Buffer::new(), + } } } impl<'a> Iterator for RecordBlockIter<'a> { @@ -530,7 +535,19 @@ impl<'a> Iterator for RecordBlockIter<'a> { let meta = &self.block.records[self.pos]; let index = (self.block.index + self.pos) as u64; let index_in_block = self.pos; - self.pos += 1; + + let mut header_buf = [0; 20]; + let mut header_len = 0; + if meta.s_header_span.len == 0 && meta.x_header_span.len == 0 { + let header_str = self.header_buffer.format(index); + header_len = header_str.len(); + header_buf[..header_len].copy_from_slice(header_str.as_bytes()); + } + + // increment position + { + self.pos += 1; + } Some(RefRecord { block: self.block, @@ -548,6 +565,8 @@ impl<'a> Iterator for RecordBlockIter<'a> { xqual: meta.x_qual_span.slice(&self.block.rbuf), sheader: meta.s_header_span.slice(&self.block.rbuf), xheader: meta.x_header_span.slice(&self.block.rbuf), + header_buf, + header_len, }) } } @@ -567,6 +586,8 @@ pub struct RefRecord<'a> { xqual: &'a [u8], sheader: &'a [u8], xheader: &'a [u8], + header_buf: [u8; 20], + header_len: usize, } impl<'a> BinseqRecord for RefRecord<'a> { @@ -578,21 +599,19 @@ impl<'a> BinseqRecord for RefRecord<'a> { self.index } - fn sheader(&self, buffer: &mut Vec) { - buffer.clear(); + fn sheader(&self) -> &[u8] { if self.sheader.is_empty() { - buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes()); + &self.header_buf[..self.header_len] } else { - buffer.extend_from_slice(self.sheader); + self.sheader } } - fn xheader(&self, buffer: &mut Vec) { - buffer.clear(); + fn xheader(&self) -> &[u8] { if self.xheader.is_empty() { - buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes()); + &self.header_buf[..self.header_len] } else { - buffer.extend_from_slice(self.xheader); + self.xheader } } From 664cbc4203df5ea3bb081d008b7e1dcbcb79e19a Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 11:25:41 -0800 Subject: [PATCH 03/13] fix: remove broken api in docs --- src/vbq/mod.rs | 6 ++---- src/vbq/reader.rs | 12 ++++-------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/vbq/mod.rs b/src/vbq/mod.rs index 4e1b4ee..f46fb7a 100644 --- a/src/vbq/mod.rs +++ b/src/vbq/mod.rs @@ -129,16 +129,14 @@ //! //! // Process blocks one at a time //! let mut seq_buffer = Vec::new(); -//! let mut header_buffer = Vec::new(); //! while reader.read_block_into(&mut block).unwrap() { //! for record in block.iter() { //! record.decode_s(&mut seq_buffer).unwrap(); -//! record.sheader(&mut header_buffer); -//! println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap()); +//! let header = record.sheader(); +//! println!("Header: {}", std::str::from_utf8(header).unwrap()); //! println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap()); //! println!("Quality: {}", std::str::from_utf8(record.squal()).unwrap()); //! seq_buffer.clear(); -//! header_buffer.clear(); //! } //! } //! # std::fs::remove_file("example.vbq").unwrap_or(()); diff --git a/src/vbq/reader.rs b/src/vbq/reader.rs index 772f744..a497d33 100644 --- a/src/vbq/reader.rs +++ b/src/vbq/reader.rs @@ -37,18 +37,16 @@ //! //! // Read records with headers and quality scores //! let mut seq_buffer = Vec::new(); -//! let mut header_buffer = Vec::new(); //! while reader.read_block_into(&mut block).unwrap() { //! for record in block.iter() { //! record.decode_s(&mut seq_buffer).unwrap(); -//! record.sheader(&mut header_buffer); -//! println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap()); +//! let header = record.sheader(); +//! println!("Header: {}", std::str::from_utf8(header).unwrap()); //! println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap()); //! if !record.squal().is_empty() { //! println!("Quality: {}", std::str::from_utf8(record.squal()).unwrap()); //! } //! seq_buffer.clear(); -//! header_buffer.clear(); //! } //! } //! ``` @@ -710,7 +708,6 @@ impl<'a> BinseqRecord for RefRecord<'a> { /// /// // Create buffers for sequence data and headers /// let mut seq_buffer = Vec::new(); -/// let mut header_buffer = Vec::new(); /// let mut block = reader.new_block(); /// /// // Read blocks sequentially @@ -719,13 +716,12 @@ impl<'a> BinseqRecord for RefRecord<'a> { /// for record in block.iter() { /// // Decode sequence and header /// record.decode_s(&mut seq_buffer)?; -/// record.sheader(&mut header_buffer); +/// let header = record.sheader(); /// -/// println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap_or("")); +/// println!("Header: {}", std::str::from_utf8(&header).unwrap_or("")); /// println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap_or("")); /// /// seq_buffer.clear(); -/// header_buffer.clear(); /// } /// } /// Ok(()) From ce574ecca29b53d9ae969818e8a9b5ba24f1f8f4 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:00:34 -0800 Subject: [PATCH 04/13] feat: added an sseq and xseq function to the record trait which can be used when batch decoding --- src/record.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/record.rs b/src/record.rs index 5029e29..93c1570 100644 --- a/src/record.rs +++ b/src/record.rs @@ -70,6 +70,22 @@ pub trait BinseqRecord { Ok(()) } + /// Returns a reference to the primary decoded sequence of this record. + /// + /// This is not available on all types that implement the `Record` trait. + /// It should be available on types that implement it in this library however. + fn sseq(&self) -> &[u8] { + unimplemented!("This record does not implement direct sequence access"); + } + + /// Returns a reference to the extended decoded sequence of this record. + /// + /// This may not be available on all types that implement the `Record` trait. + /// It should be available on types that implement it in this library however. + fn xseq(&self) -> &[u8] { + unimplemented!("This record does not implement direct sequence access"); + } + /// Decodes the primary sequence of this record into a newly allocated buffer. /// /// Not advised to use this function as it allocates a new buffer every time. From 6bf6a5f95ba33fe37b38ca64306c12c974c6305c Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:00:59 -0800 Subject: [PATCH 05/13] refactor: improve bq decoding with batch decoding and implement sseq and xseq for bq and vbq --- src/bq/reader.rs | 163 +++++++++++++++++++++++++++++++++++++++--- src/context/traits.rs | 1 + src/vbq/reader.rs | 14 ++++ 3 files changed, 170 insertions(+), 8 deletions(-) diff --git a/src/bq/reader.rs b/src/bq/reader.rs index 128ac33..924a5f6 100644 --- a/src/bq/reader.rs +++ b/src/bq/reader.rs @@ -129,6 +129,83 @@ impl BinseqRecord for RefRecord<'_> { } } +/// A reference to a record in the map with a precomputed decoded buffer slice +pub struct BatchRecord<'a> { + /// Unprocessed buffer slice (with flags) + buffer: &'a [u64], + /// Decoded buffer slice + dbuf: &'a [u8], + /// Record ID + id: u64, + /// The configuration that defines the layout and size of record components + config: RecordConfig, + /// Cached index string for the sequence header + header_buf: [u8; 20], + /// Length of the header in bytes + header_len: usize, +} +impl<'a> BinseqRecord for BatchRecord<'a> { + fn bitsize(&self) -> BitSize { + self.config.bitsize + } + fn index(&self) -> u64 { + self.id + } + /// Clear the buffer and fill it with the sequence header + fn sheader(&self) -> &[u8] { + &self.header_buf[..self.header_len] + } + + /// Clear the buffer and fill it with the extended header + fn xheader(&self) -> &[u8] { + self.sheader() + } + + fn flag(&self) -> Option { + if self.config.flags { + Some(self.buffer[0]) + } else { + None + } + } + fn slen(&self) -> u64 { + self.config.slen + } + fn xlen(&self) -> u64 { + self.config.xlen + } + fn sbuf(&self) -> &[u64] { + if self.config.flags { + &self.buffer[1..=(self.config.schunk as usize)] + } else { + &self.buffer[..(self.config.schunk as usize)] + } + } + fn xbuf(&self) -> &[u64] { + if self.config.flags { + &self.buffer[1 + self.config.schunk as usize..] + } else { + &self.buffer[self.config.schunk as usize..] + } + } + fn decode_s(&self, dbuf: &mut Vec) -> Result<()> { + dbuf.extend_from_slice(self.sseq()); + Ok(()) + } + fn decode_x(&self, dbuf: &mut Vec) -> Result<()> { + dbuf.extend_from_slice(self.xseq()); + Ok(()) + } + /// Override this method since we can make use of block information + fn sseq(&self) -> &[u8] { + &self.dbuf[..self.config.slen()] + } + /// Override this method since we can make use of block information + fn xseq(&self) -> &[u8] { + &self.dbuf[self.config.slen() as usize..] + } +} + /// Configuration for binary sequence record layout /// /// This struct defines the size and layout of binary sequence records, @@ -399,6 +476,23 @@ impl MmapReader { let buffer = cast_slice(bytes); Ok(RefRecord::new(idx as u64, buffer, self.config)) } + + /// Returns a slice of the buffer containing the underlying u64 for that range + /// of records. + /// + /// Note: range 10..40 will return all u64s in the mmap between the record index 10 and 40 + pub fn get_buffer_slice(&self, range: Range) -> Result<&[u64]> { + if range.end > self.num_records() { + return Err(ReadError::OutOfRange(range.end, self.num_records()).into()); + } + let rsize = self.config.record_size_bytes(); + let total_records = range.end - range.start; + let lbound = SIZE_HEADER + (range.start * rsize); + let rbound = lbound + (total_records * rsize); + let bytes = &self.mmap[lbound..rbound]; + let buffer = cast_slice(bytes); + Ok(buffer) + } } /// A reader for streaming binary sequence data from any source that implements Read @@ -727,21 +821,74 @@ impl ParallelReader for MmapReader { return Ok(()); // No records for this thread } + // create a reusable buffer for translating record IDs let mut translater = itoa::Buffer::new(); - for (batch_idx, idx) in (start_idx..end_idx).enumerate() { - let id_str = translater.format(idx); + // initialize a decoding buffer + let mut dbuf = Vec::new(); + + // calculate the size of a record in the cast u64 slice + let rsize_u64 = reader.config.record_size_bytes() / 8; - let mut record = reader.get(idx)?; - record.set_id(id_str.as_bytes()); + // determine the required scalar size + let scalar = match reader.config.bitsize { + BitSize::Two => 32, + BitSize::Four => 16, + }; - processor.process_record(record)?; + // calculate the size of a record in the batch decoded buffer + let mut dbuf_rsize = { (reader.config.schunk() + reader.config.xchunk()) * scalar }; + if reader.config.flags { + dbuf_rsize += scalar; + } - if batch_idx % BATCH_SIZE == 0 { - processor.on_batch_complete()?; + // iterate over the range of indices + for range_start in (start_idx..end_idx).step_by(BATCH_SIZE) { + let range_end = (range_start + BATCH_SIZE).min(end_idx); + + // clear the decoded buffer + dbuf.clear(); + + // get the encoded buffer slice + let ebuf = reader.get_buffer_slice(range_start..range_end)?; + + // decode the entire buffer at once (with flags and extra bases) + reader + .config + .bitsize + .decode(ebuf, ebuf.len() * scalar, &mut dbuf)?; + + // iterate over each index in the range + for (inner_idx, idx) in (range_start..range_end).enumerate() { + // translate the index + let id_str = translater.format(idx); + + // create the index buffer + let mut header_buf = [0; 20]; + let header_len = id_str.len(); + header_buf[..header_len].copy_from_slice(id_str.as_bytes()); + + // find the buffer starts + let ebuf_start = inner_idx * rsize_u64; + let dbuf_start = inner_idx * dbuf_rsize as usize; + + // initialize the record + let record = BatchRecord { + buffer: &ebuf[ebuf_start..(ebuf_start + rsize_u64)], + dbuf: &dbuf[dbuf_start..(dbuf_start + dbuf_rsize as usize)], + id: idx as u64, + config: reader.config, + header_buf, + header_len, + }; + + // process the record + processor.process_record(record)?; } + + // process the batch + processor.on_batch_complete()?; } - processor.on_batch_complete()?; Ok(()) }); diff --git a/src/context/traits.rs b/src/context/traits.rs index de0dd3d..4308ab5 100644 --- a/src/context/traits.rs +++ b/src/context/traits.rs @@ -22,6 +22,7 @@ pub trait SequenceContext { self.xbuf_mut().clear(); } #[inline] + #[allow(deprecated)] fn fill_sequences(&mut self, record: &R) -> Result<()> { self.clear_sequences(); record.decode_s(self.sbuf_mut())?; diff --git a/src/vbq/reader.rs b/src/vbq/reader.rs index a497d33..a4aaedc 100644 --- a/src/vbq/reader.rs +++ b/src/vbq/reader.rs @@ -662,6 +662,20 @@ impl<'a> BinseqRecord for RefRecord<'a> { } Ok(()) } + + /// Override this method since we can make use of block information + fn sseq(&self) -> &[u8] { + self.block + .get_decoded_s(self.index_in_block) + .expect("Reader was built without batch-decoding") + } + + /// Override this method since we can make use of block information + fn xseq(&self) -> &[u8] { + self.block + .get_decoded_x(self.index_in_block) + .expect("Reader was built without batch-decoding") + } } /// Memory-mapped reader for VBINSEQ files From daa07157285ba13b4e8087394a0c0f71f86bd812 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:01:08 -0800 Subject: [PATCH 06/13] feat: added examples --- examples/parallel_range.rs | 7 +------ examples/read_write.rs | 17 ++++------------- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/examples/parallel_range.rs b/examples/parallel_range.rs index a7206b6..e17a1d4 100644 --- a/examples/parallel_range.rs +++ b/examples/parallel_range.rs @@ -8,7 +8,6 @@ struct RangeProcessor { tid: Option, range_start: usize, range_end: usize, - sbuf: Vec, } impl RangeProcessor { @@ -18,7 +17,6 @@ impl RangeProcessor { tid: None, range_start, range_end, - sbuf: Vec::new(), } } @@ -34,9 +32,6 @@ impl ParallelProcessor for RangeProcessor { // Print progress every 10,000 records if count % 10_000 == 0 { if let Some(tid) = self.tid { - // Decode the sequence to get its length - self.sbuf.clear(); - record.decode_s(&mut self.sbuf)?; println!( "Thread {}: Processed {} records (Range: {}-{}, Index: {}, Len: {})", tid, @@ -44,7 +39,7 @@ impl ParallelProcessor for RangeProcessor { self.range_start, self.range_end, record.index(), - self.sbuf.len() + record.sseq().len(), ); } } diff --git a/examples/read_write.rs b/examples/read_write.rs index 186acee..729269e 100644 --- a/examples/read_write.rs +++ b/examples/read_write.rs @@ -44,18 +44,16 @@ fn read_write_single(fastq_path: &str, binseq_path: &str, seq_size: usize) -> Re // Read the binary sequence let reader = MmapReader::new(binseq_path)?; let mut num_records_read = 0; - let mut record_buffer = Vec::new(); for idx in 0..reader.num_records() { let record = reader.get(idx)?; - record.decode_s(&mut record_buffer)?; + let seq = record.sseq(); // Check if the decoded sequence matches the original - let buf_str = std::str::from_utf8(&record_buffer)?; + let buf_str = std::str::from_utf8(&seq)?; let seq_str = std::str::from_utf8(&all_sequences[num_records_read])?; assert_eq!(buf_str, seq_str); num_records_read += 1; - record_buffer.clear(); } eprintln!("Finished reading {num_records_read} records (mmap)"); eprintln!( @@ -131,19 +129,15 @@ fn read_write_paired( // Read the binary sequence with mmap let reader = MmapReader::new(binseq_path)?; - let mut sbuf = Vec::new(); - let mut xbuf = Vec::new(); let mut n_processed = 0; for idx in 0..reader.num_records() { let record = reader.get(idx)?; - record.decode_s(&mut sbuf)?; - record.decode_x(&mut xbuf)?; // Check if the decoded sequence matches the original - let s_str = std::str::from_utf8(&sbuf)?; - let x_str = std::str::from_utf8(&xbuf)?; + let s_str = std::str::from_utf8(record.sseq())?; + let x_str = std::str::from_utf8(record.xseq())?; let s_exp = std::str::from_utf8(&r1_storage[n_processed])?; let x_exp = std::str::from_utf8(&r2_storage[n_processed])?; @@ -151,9 +145,6 @@ fn read_write_paired( assert_eq!(s_str, s_exp); assert_eq!(x_str, x_exp); - sbuf.clear(); - xbuf.clear(); - n_processed += 1; } eprintln!("Finished reading {n_processed} records"); From a2f7c2c02f7fd74f78d49c269a3b6eef1ca18835 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:09:37 -0800 Subject: [PATCH 07/13] docs: update docs --- src/vbq/reader.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/vbq/reader.rs b/src/vbq/reader.rs index a4aaedc..3e3ab12 100644 --- a/src/vbq/reader.rs +++ b/src/vbq/reader.rs @@ -36,17 +36,15 @@ //! let mut block = reader.new_block(); //! //! // Read records with headers and quality scores -//! let mut seq_buffer = Vec::new(); //! while reader.read_block_into(&mut block).unwrap() { //! for record in block.iter() { -//! record.decode_s(&mut seq_buffer).unwrap(); +//! let seq = record.sseq(); //! let header = record.sheader(); //! println!("Header: {}", std::str::from_utf8(header).unwrap()); -//! println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap()); +//! println!("Sequence: {}", std::str::from_utf8(seq).unwrap()); //! if !record.squal().is_empty() { //! println!("Quality: {}", std::str::from_utf8(record.squal()).unwrap()); //! } -//! seq_buffer.clear(); //! } //! } //! ``` @@ -716,6 +714,7 @@ impl<'a> BinseqRecord for RefRecord<'a> { /// use binseq::vbq::MmapReader; /// use binseq::{BinseqRecord, Result}; /// +/// #[allow(deprecated)] /// fn main() -> Result<()> { /// let path = "./data/subset.vbq"; /// let mut reader = MmapReader::new(path)?; // Index loaded automatically From ba56b1c9417385384086715ae7e701d97f3d1a33 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:13:15 -0800 Subject: [PATCH 08/13] chore(semver): bump - breaking --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c4ee831..b3a3bce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "binseq" -version = "0.7.8" +version = "0.8.0" edition = "2021" description = "A high efficiency binary format for sequencing data" license = "MIT" From 22f16522bd9c097e51c89e4547bc3a627aa4d290 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:13:27 -0800 Subject: [PATCH 09/13] fix: example --- examples/read_write.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/examples/read_write.rs b/examples/read_write.rs index 729269e..d030242 100644 --- a/examples/read_write.rs +++ b/examples/read_write.rs @@ -131,13 +131,18 @@ fn read_write_paired( let reader = MmapReader::new(binseq_path)?; let mut n_processed = 0; + let mut sbuf = Vec::new(); + let mut xbuf = Vec::new(); for idx in 0..reader.num_records() { let record = reader.get(idx)?; + record.decode_s(&mut sbuf)?; + record.decode_x(&mut xbuf)?; + // Check if the decoded sequence matches the original - let s_str = std::str::from_utf8(record.sseq())?; - let x_str = std::str::from_utf8(record.xseq())?; + let s_str = std::str::from_utf8(&sbuf)?; + let x_str = std::str::from_utf8(&xbuf)?; let s_exp = std::str::from_utf8(&r1_storage[n_processed])?; let x_exp = std::str::from_utf8(&r2_storage[n_processed])?; @@ -146,6 +151,8 @@ fn read_write_paired( assert_eq!(x_str, x_exp); n_processed += 1; + sbuf.clear(); + xbuf.clear(); } eprintln!("Finished reading {n_processed} records"); From 240a89bbd5d0ee483e027c0a05371fe23a902376 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:14:42 -0800 Subject: [PATCH 10/13] fix: example --- examples/read_write.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/read_write.rs b/examples/read_write.rs index d030242..4e9b7c4 100644 --- a/examples/read_write.rs +++ b/examples/read_write.rs @@ -44,16 +44,18 @@ fn read_write_single(fastq_path: &str, binseq_path: &str, seq_size: usize) -> Re // Read the binary sequence let reader = MmapReader::new(binseq_path)?; let mut num_records_read = 0; + let mut sbuf = Vec::new(); for idx in 0..reader.num_records() { let record = reader.get(idx)?; - let seq = record.sseq(); + record.decode_s(&mut sbuf)?; // Check if the decoded sequence matches the original - let buf_str = std::str::from_utf8(&seq)?; + let buf_str = std::str::from_utf8(&sbuf)?; let seq_str = std::str::from_utf8(&all_sequences[num_records_read])?; assert_eq!(buf_str, seq_str); num_records_read += 1; + sbuf.clear(); } eprintln!("Finished reading {num_records_read} records (mmap)"); eprintln!( From 2e91b16717140702201b83fc60fe1958a0cdb8ad Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:15:43 -0800 Subject: [PATCH 11/13] style(clippy): fix --- src/bq/reader.rs | 8 ++++---- src/context/traits.rs | 4 ++-- src/vbq/reader.rs | 6 ++++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/bq/reader.rs b/src/bq/reader.rs index 924a5f6..0d10170 100644 --- a/src/bq/reader.rs +++ b/src/bq/reader.rs @@ -144,7 +144,7 @@ pub struct BatchRecord<'a> { /// Length of the header in bytes header_len: usize, } -impl<'a> BinseqRecord for BatchRecord<'a> { +impl BinseqRecord for BatchRecord<'_> { fn bitsize(&self) -> BitSize { self.config.bitsize } @@ -202,7 +202,7 @@ impl<'a> BinseqRecord for BatchRecord<'a> { } /// Override this method since we can make use of block information fn xseq(&self) -> &[u8] { - &self.dbuf[self.config.slen() as usize..] + &self.dbuf[self.config.slen()..] } } @@ -870,12 +870,12 @@ impl ParallelReader for MmapReader { // find the buffer starts let ebuf_start = inner_idx * rsize_u64; - let dbuf_start = inner_idx * dbuf_rsize as usize; + let dbuf_start = inner_idx * dbuf_rsize; // initialize the record let record = BatchRecord { buffer: &ebuf[ebuf_start..(ebuf_start + rsize_u64)], - dbuf: &dbuf[dbuf_start..(dbuf_start + dbuf_rsize as usize)], + dbuf: &dbuf[dbuf_start..(dbuf_start + dbuf_rsize)], id: idx as u64, config: reader.config, header_buf, diff --git a/src/context/traits.rs b/src/context/traits.rs index 4308ab5..9712d9d 100644 --- a/src/context/traits.rs +++ b/src/context/traits.rs @@ -101,9 +101,9 @@ pub trait HeaderContext { #[inline] fn fill_headers(&mut self, record: &R) { self.clear_headers(); - self.sheader_mut().extend_from_slice(&record.sheader()); + self.sheader_mut().extend_from_slice(record.sheader()); if record.is_paired() { - self.xheader_mut().extend_from_slice(&record.xheader()); + self.xheader_mut().extend_from_slice(record.xheader()); } } } diff --git a/src/vbq/reader.rs b/src/vbq/reader.rs index 3e3ab12..2973457 100644 --- a/src/vbq/reader.rs +++ b/src/vbq/reader.rs @@ -431,9 +431,9 @@ impl RecordBlock { slen, xlen, s_seq_span, - x_seq_span, s_qual_span, s_header_span, + x_seq_span, x_qual_span, x_header_span, }); @@ -465,6 +465,7 @@ impl RecordBlock { } /// Get decoded primary sequence for a record by index + #[must_use] pub fn get_decoded_s(&self, record_idx: usize) -> Option<&[u8]> { let meta = self.records.get(record_idx)?; if self.dbuf.is_empty() { @@ -484,6 +485,7 @@ impl RecordBlock { } /// Get decoded extended sequence for a record by index + #[must_use] pub fn get_decoded_x(&self, record_idx: usize) -> Option<&[u8]> { let meta = self.records.get(record_idx)?; if meta.xlen == 0 { @@ -586,7 +588,7 @@ pub struct RefRecord<'a> { header_len: usize, } -impl<'a> BinseqRecord for RefRecord<'a> { +impl BinseqRecord for RefRecord<'_> { fn bitsize(&self) -> BitSize { self.bitsize } From 640521510e5a85d9e6f4f0d82226aa4d601c5139 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:23:34 -0800 Subject: [PATCH 12/13] fix: proper slicing when flags are set in bq --- src/bq/reader.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/bq/reader.rs b/src/bq/reader.rs index 0d10170..7b7827f 100644 --- a/src/bq/reader.rs +++ b/src/bq/reader.rs @@ -198,11 +198,21 @@ impl BinseqRecord for BatchRecord<'_> { } /// Override this method since we can make use of block information fn sseq(&self) -> &[u8] { - &self.dbuf[..self.config.slen()] + if self.config.flags { + let scalar = self.config.scalar(); + &self.dbuf[scalar..scalar + self.config.slen()] + } else { + &self.dbuf[..self.config.slen()] + } } /// Override this method since we can make use of block information fn xseq(&self) -> &[u8] { - &self.dbuf[self.config.slen()..] + if self.config.flags { + let scalar = self.config.scalar(); + &self.dbuf[scalar + self.config.slen()..] + } else { + &self.dbuf[self.config.slen()..] + } } } @@ -333,6 +343,14 @@ impl RecordConfig { (self.schunk + self.xchunk) as usize } } + + /// The number of nucleotides per word + pub fn scalar(&self) -> usize { + match self.bitsize { + BitSize::Two => 32, + BitSize::Four => 16, + } + } } /// A memory-mapped reader for binary sequence files @@ -831,10 +849,7 @@ impl ParallelReader for MmapReader { let rsize_u64 = reader.config.record_size_bytes() / 8; // determine the required scalar size - let scalar = match reader.config.bitsize { - BitSize::Two => 32, - BitSize::Four => 16, - }; + let scalar = reader.config.scalar(); // calculate the size of a record in the batch decoded buffer let mut dbuf_rsize = { (reader.config.schunk() + reader.config.xchunk()) * scalar }; From 4270301c4dc9f15e4d03209d50e6cbd985c15e16 Mon Sep 17 00:00:00 2001 From: noam teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:24:37 -0800 Subject: [PATCH 13/13] fix: proper return of index --- src/bq/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bq/reader.rs b/src/bq/reader.rs index 7b7827f..a60889c 100644 --- a/src/bq/reader.rs +++ b/src/bq/reader.rs @@ -97,7 +97,7 @@ impl BinseqRecord for RefRecord<'_> { /// Clear the buffer and fill it with the extended header fn xheader(&self) -> &[u8] { - &self.header_buf[self.header_len..] + self.sheader() } fn flag(&self) -> Option {