use std::{collections::VecDeque, mem}; use tokio::time::Instant; use tracing::trace; use crate::{ encoding::crc16, transport::{ MESSAGE_DEST, MESSAGE_HEADER_SIZE, MESSAGE_LENGTH_MAX, MESSAGE_LENGTH_MIN, MESSAGE_POSITION_SEQ, MESSAGE_SEQ_MASK, MESSAGE_TRAILER_CRC, MESSAGE_TRAILER_SIZE, MESSAGE_VALUE_SYNC, }, }; /// A link-level frame. See [klipper docs](https://www.klipper3d.org/Protocol.html#message-blocks) #[derive(Debug)] pub(crate) struct ReceivedFrame { pub receive_time: Instant, pub sequence: u8, pub payload: Vec, } /// Buffers and reads link-level frames. See [`ReceivedFrame`]. #[derive(Debug, Default)] pub struct FrameReader { buf: VecDeque, partial: PartialFrameState, } impl FrameReader { /// Process new data from `b` pub fn receive_data(&mut self, mut b: &[u8]) { while !b.is_empty() { if let Some((frame, remaining)) = self.partial.receive_data(b) { self.buf.push_back(frame); b = remaining; }; } } /// Attempt to read a parsed frame from the data already processed. pub fn read_frame(&mut self) -> Option { self.buf.pop_front() } } /// State machine for receiving a frame. #[derive(Debug, Default)] pub enum PartialFrameState { /// Waiting to sync with the other side #[default] Unsynced, /// Synchronised and ready to receive data Synced, /// Received length byte, waiting to receive more data. Receiving { /// The total length of this frame, including header and footer. len: usize, /// Accumulated data so far. so_far: Vec, /// When the packet started being received. receive_time: Instant, }, } impl PartialFrameState { pub fn receive_data<'a>(&mut self, mut b: &'a [u8]) -> Option<(ReceivedFrame, &'a [u8])> { while !b.is_empty() { match self { Self::Unsynced => { // Wait for sync byte before doing anything else if let Some(idx) = b.iter().position(|x| *x == MESSAGE_VALUE_SYNC) { *self = Self::Synced; b = &b[idx + 1..]; } } PartialFrameState::Synced => { // Attempt to start a new frame let len = b[0] as usize; if !(MESSAGE_LENGTH_MIN..=MESSAGE_LENGTH_MAX).contains(&len) { *self = Self::Unsynced; continue; } let receive_time = Instant::now(); let mut so_far = Vec::with_capacity(len); so_far.push(b[0]); *self = Self::Receiving { len, so_far, receive_time, }; b = &b[1..]; } PartialFrameState::Receiving { len, so_far, .. } => { // Continue to receive data for frame let len = *len; let take = len - so_far.len(); so_far.extend_from_slice(&b[..take]); if so_far.len() < len { // Frame not yet done, most likely b is empty now. b = &b[take + 1..]; continue; } let seq = so_far[MESSAGE_POSITION_SEQ]; trace!(frame = ?so_far, seq = seq, "Received frame"); // Check validity of frame if seq & !MESSAGE_SEQ_MASK != MESSAGE_DEST { *self = Self::Unsynced; continue; } let actual_crc = crc16(&so_far); let frame_crc = (so_far[len - MESSAGE_TRAILER_CRC] as u16) << 8 | (so_far[len - MESSAGE_TRAILER_CRC + 1] as u16); if frame_crc != actual_crc { *self = Self::Unsynced; continue; } // Set current state back to synced let Self::Receiving { so_far, receive_time, .. } = mem::replace(self, Self::Synced) else { unreachable!() }; // Return received frame and unused buffer. return Some(( ReceivedFrame { receive_time, sequence: seq & MESSAGE_SEQ_MASK, payload: so_far[MESSAGE_HEADER_SIZE..len - MESSAGE_TRAILER_SIZE].into(), }, &b[take..], )); } } } None } }