diff options
Diffstat (limited to 'crates/windlass/src/transport/frame.rs')
-rw-r--r-- | crates/windlass/src/transport/frame.rs | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/crates/windlass/src/transport/frame.rs b/crates/windlass/src/transport/frame.rs new file mode 100644 index 0000000..d33a2d8 --- /dev/null +++ b/crates/windlass/src/transport/frame.rs @@ -0,0 +1,195 @@ +use std::{collections::VecDeque, mem, sync::Arc}; + +use tokio::time::Instant; +use tracing::trace; + +use crate::{ + encoding::crc16, + transport::{ + MESSAGE_DEST, MESSAGE_HEADER_SIZE, MESSAGE_LENGTH_MAX, MESSAGE_LENGTH_MIN, + MESSAGE_POSITION_SEQ, MESSAGE_SEQ_MASK, MESSAGE_TRAILER_CRC, MESSAGE_TRAILER_SIZE, + MESSAGE_VALUE_SYNC, + }, +}; + +/// A link-level frame. See [klipper docs](https://www.klipper3d.org/Protocol.html#message-blocks) +#[derive(Debug)] +pub(crate) struct ReceivedFrame { + pub receive_time: Instant, + pub sequence: u8, + pub payload: Vec<u8>, +} + +/// Buffers and reads link-level frames. See [`ReceivedFrame`]. +#[derive(Debug, Default)] +pub struct FrameReader { + buf: VecDeque<ReceivedFrame>, + partial: PartialFrameState, +} + +impl FrameReader { + /// Process new data from `b` + pub fn receive_data(&mut self, mut b: &[u8]) { + while !b.is_empty() { + if let Some((frame, remaining)) = self.partial.receive_data(b) { + self.buf.push_back(frame); + b = remaining; + }; + } + } + + /// Attempt to read a parsed frame from the data already processed. + pub fn read_frame(&mut self) -> Option<ReceivedFrame> { + self.buf.pop_front() + } +} + +/// State machine for receiving a frame. +#[derive(Debug, Default)] +pub enum PartialFrameState { + /// Waiting to sync with the other side + #[default] + Unsynced, + + /// Synchronised and ready to receive data + Synced, + + /// Received length byte, waiting to receive more data. + Receiving { + /// The total length of this frame, including header and footer. + len: usize, + + /// Accumulated data so far. + so_far: Vec<u8>, + + /// When the packet started being received. + receive_time: Instant, + }, +} + +impl PartialFrameState { + pub fn receive_data<'a>(&mut self, mut b: &'a [u8]) -> Option<(ReceivedFrame, &'a [u8])> { + while !b.is_empty() { + match self { + Self::Unsynced => { + // Wait for sync byte before doing anything else + if let Some(idx) = b.iter().position(|x| *x == MESSAGE_VALUE_SYNC) { + *self = Self::Synced; + b = &b[idx + 1..]; + } + } + PartialFrameState::Synced => { + // Attempt to start a new frame + let len = b[0] as usize; + if !(MESSAGE_LENGTH_MIN..=MESSAGE_LENGTH_MAX).contains(&len) { + *self = Self::Unsynced; + continue; + } + + let receive_time = Instant::now(); + let mut so_far = Vec::with_capacity(len); + so_far.push(b[0]); + *self = Self::Receiving { + len, + so_far, + receive_time, + }; + b = &b[1..]; + } + + PartialFrameState::Receiving { len, so_far, .. } => { + // Continue to receive data for frame + let len = *len; + let take = len - so_far.len(); + so_far.extend_from_slice(&b[..take]); + if so_far.len() < len { + // Frame not yet done, most likely b is empty now. + b = &b[take + 1..]; + continue; + } + + let seq = so_far[MESSAGE_POSITION_SEQ]; + trace!(frame = ?so_far, seq = seq, "Received frame"); + + // Check validity of frame + if seq & !MESSAGE_SEQ_MASK != MESSAGE_DEST { + *self = Self::Unsynced; + continue; + } + + let actual_crc = crc16(so_far); + let frame_crc = (so_far[len - MESSAGE_TRAILER_CRC] as u16) << 8 + | (so_far[len - MESSAGE_TRAILER_CRC + 1] as u16); + if frame_crc != actual_crc { + *self = Self::Unsynced; + continue; + } + + // Set current state back to synced + let Self::Receiving { + so_far, + receive_time, + .. + } = mem::replace(self, Self::Synced) + else { + unreachable!() + }; + + // Return received frame and unused buffer. + return Some(( + ReceivedFrame { + receive_time, + sequence: seq & MESSAGE_SEQ_MASK, + payload: so_far[MESSAGE_HEADER_SIZE..len - MESSAGE_TRAILER_SIZE].into(), + }, + &b[take..], + )); + } + } + } + None + } +} + +/// A frame to be sent to the MCU +#[derive(Debug, Clone)] +pub struct SentFrame { + /// When the frame was first sent + pub sent_at: Instant, + + /// The payload of the message, including frame header + pub payload: Arc<Vec<u8>>, + + /// True iff the frame has been retransmitted at least once + pub is_retransmit: bool, +} + +impl SentFrame { + pub fn new(sequence: u64, data: &[u8]) -> Result<Self, MessageEncodeError> { + let len = MESSAGE_LENGTH_MIN + data.len(); + if len > MESSAGE_LENGTH_MAX { + return Err(MessageEncodeError::MessageTooLong); + } + + let mut payload = Vec::with_capacity(len); + payload.push(len as u8); + payload.push(MESSAGE_DEST | ((sequence as u8) & MESSAGE_SEQ_MASK)); + payload.extend_from_slice(data); + let crc = crc16(&payload[0..len - MESSAGE_TRAILER_SIZE]); + payload.push(((crc >> 8) & 0xFF) as u8); + payload.push((crc & 0xFF) as u8); + payload.push(MESSAGE_VALUE_SYNC); + + Ok(Self { + sent_at: Instant::now(), + payload: Arc::new(payload), + is_retransmit: false, + }) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum MessageEncodeError { + #[error("message would exceed the maximum packet length of {MESSAGE_LENGTH_MAX} bytes")] + MessageTooLong, +} |