summaryrefslogtreecommitdiff
path: root/crates/windlass/src/transport/frame.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/windlass/src/transport/frame.rs')
-rw-r--r--crates/windlass/src/transport/frame.rs195
1 files changed, 195 insertions, 0 deletions
diff --git a/crates/windlass/src/transport/frame.rs b/crates/windlass/src/transport/frame.rs
new file mode 100644
index 0000000..d33a2d8
--- /dev/null
+++ b/crates/windlass/src/transport/frame.rs
@@ -0,0 +1,195 @@
+use std::{collections::VecDeque, mem, sync::Arc};
+
+use tokio::time::Instant;
+use tracing::trace;
+
+use crate::{
+ encoding::crc16,
+ transport::{
+ MESSAGE_DEST, MESSAGE_HEADER_SIZE, MESSAGE_LENGTH_MAX, MESSAGE_LENGTH_MIN,
+ MESSAGE_POSITION_SEQ, MESSAGE_SEQ_MASK, MESSAGE_TRAILER_CRC, MESSAGE_TRAILER_SIZE,
+ MESSAGE_VALUE_SYNC,
+ },
+};
+
+/// A link-level frame. See [klipper docs](https://www.klipper3d.org/Protocol.html#message-blocks)
+#[derive(Debug)]
+pub(crate) struct ReceivedFrame {
+ pub receive_time: Instant,
+ pub sequence: u8,
+ pub payload: Vec<u8>,
+}
+
+/// Buffers and reads link-level frames. See [`ReceivedFrame`].
+#[derive(Debug, Default)]
+pub struct FrameReader {
+ buf: VecDeque<ReceivedFrame>,
+ partial: PartialFrameState,
+}
+
+impl FrameReader {
+ /// Process new data from `b`
+ pub fn receive_data(&mut self, mut b: &[u8]) {
+ while !b.is_empty() {
+ if let Some((frame, remaining)) = self.partial.receive_data(b) {
+ self.buf.push_back(frame);
+ b = remaining;
+ };
+ }
+ }
+
+ /// Attempt to read a parsed frame from the data already processed.
+ pub fn read_frame(&mut self) -> Option<ReceivedFrame> {
+ self.buf.pop_front()
+ }
+}
+
+/// State machine for receiving a frame.
+#[derive(Debug, Default)]
+pub enum PartialFrameState {
+ /// Waiting to sync with the other side
+ #[default]
+ Unsynced,
+
+ /// Synchronised and ready to receive data
+ Synced,
+
+ /// Received length byte, waiting to receive more data.
+ Receiving {
+ /// The total length of this frame, including header and footer.
+ len: usize,
+
+ /// Accumulated data so far.
+ so_far: Vec<u8>,
+
+ /// When the packet started being received.
+ receive_time: Instant,
+ },
+}
+
+impl PartialFrameState {
+ pub fn receive_data<'a>(&mut self, mut b: &'a [u8]) -> Option<(ReceivedFrame, &'a [u8])> {
+ while !b.is_empty() {
+ match self {
+ Self::Unsynced => {
+ // Wait for sync byte before doing anything else
+ if let Some(idx) = b.iter().position(|x| *x == MESSAGE_VALUE_SYNC) {
+ *self = Self::Synced;
+ b = &b[idx + 1..];
+ }
+ }
+ PartialFrameState::Synced => {
+ // Attempt to start a new frame
+ let len = b[0] as usize;
+ if !(MESSAGE_LENGTH_MIN..=MESSAGE_LENGTH_MAX).contains(&len) {
+ *self = Self::Unsynced;
+ continue;
+ }
+
+ let receive_time = Instant::now();
+ let mut so_far = Vec::with_capacity(len);
+ so_far.push(b[0]);
+ *self = Self::Receiving {
+ len,
+ so_far,
+ receive_time,
+ };
+ b = &b[1..];
+ }
+
+ PartialFrameState::Receiving { len, so_far, .. } => {
+ // Continue to receive data for frame
+ let len = *len;
+ let take = len - so_far.len();
+ so_far.extend_from_slice(&b[..take]);
+ if so_far.len() < len {
+ // Frame not yet done, most likely b is empty now.
+ b = &b[take + 1..];
+ continue;
+ }
+
+ let seq = so_far[MESSAGE_POSITION_SEQ];
+ trace!(frame = ?so_far, seq = seq, "Received frame");
+
+ // Check validity of frame
+ if seq & !MESSAGE_SEQ_MASK != MESSAGE_DEST {
+ *self = Self::Unsynced;
+ continue;
+ }
+
+ let actual_crc = crc16(so_far);
+ let frame_crc = (so_far[len - MESSAGE_TRAILER_CRC] as u16) << 8
+ | (so_far[len - MESSAGE_TRAILER_CRC + 1] as u16);
+ if frame_crc != actual_crc {
+ *self = Self::Unsynced;
+ continue;
+ }
+
+ // Set current state back to synced
+ let Self::Receiving {
+ so_far,
+ receive_time,
+ ..
+ } = mem::replace(self, Self::Synced)
+ else {
+ unreachable!()
+ };
+
+ // Return received frame and unused buffer.
+ return Some((
+ ReceivedFrame {
+ receive_time,
+ sequence: seq & MESSAGE_SEQ_MASK,
+ payload: so_far[MESSAGE_HEADER_SIZE..len - MESSAGE_TRAILER_SIZE].into(),
+ },
+ &b[take..],
+ ));
+ }
+ }
+ }
+ None
+ }
+}
+
+/// A frame to be sent to the MCU
+#[derive(Debug, Clone)]
+pub struct SentFrame {
+ /// When the frame was first sent
+ pub sent_at: Instant,
+
+ /// The payload of the message, including frame header
+ pub payload: Arc<Vec<u8>>,
+
+ /// True iff the frame has been retransmitted at least once
+ pub is_retransmit: bool,
+}
+
+impl SentFrame {
+ pub fn new(sequence: u64, data: &[u8]) -> Result<Self, MessageEncodeError> {
+ let len = MESSAGE_LENGTH_MIN + data.len();
+ if len > MESSAGE_LENGTH_MAX {
+ return Err(MessageEncodeError::MessageTooLong);
+ }
+
+ let mut payload = Vec::with_capacity(len);
+ payload.push(len as u8);
+ payload.push(MESSAGE_DEST | ((sequence as u8) & MESSAGE_SEQ_MASK));
+ payload.extend_from_slice(data);
+ let crc = crc16(&payload[0..len - MESSAGE_TRAILER_SIZE]);
+ payload.push(((crc >> 8) & 0xFF) as u8);
+ payload.push((crc & 0xFF) as u8);
+ payload.push(MESSAGE_VALUE_SYNC);
+
+ Ok(Self {
+ sent_at: Instant::now(),
+ payload: Arc::new(payload),
+ is_retransmit: false,
+ })
+ }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum MessageEncodeError {
+ #[error("message would exceed the maximum packet length of {MESSAGE_LENGTH_MAX} bytes")]
+ MessageTooLong,
+}