diff options
Diffstat (limited to 'crates/windlass/src')
-rw-r--r-- | crates/windlass/src/transport/frame.rs (renamed from crates/windlass/src/transport/read.rs) | 47 | ||||
-rw-r--r-- | crates/windlass/src/transport/mod.rs | 53 |
2 files changed, 54 insertions, 46 deletions
diff --git a/crates/windlass/src/transport/read.rs b/crates/windlass/src/transport/frame.rs index 1aaf20c..d33a2d8 100644 --- a/crates/windlass/src/transport/read.rs +++ b/crates/windlass/src/transport/frame.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, mem}; +use std::{collections::VecDeque, mem, sync::Arc}; use tokio::time::Instant; use tracing::trace; @@ -117,7 +117,7 @@ impl PartialFrameState { continue; } - let actual_crc = crc16(&so_far); + let actual_crc = crc16(so_far); let frame_crc = (so_far[len - MESSAGE_TRAILER_CRC] as u16) << 8 | (so_far[len - MESSAGE_TRAILER_CRC + 1] as u16); if frame_crc != actual_crc { @@ -150,3 +150,46 @@ impl PartialFrameState { None } } + +/// A frame to be sent to the MCU +#[derive(Debug, Clone)] +pub struct SentFrame { + /// When the frame was first sent + pub sent_at: Instant, + + /// The payload of the message, including frame header + pub payload: Arc<Vec<u8>>, + + /// True iff the frame has been retransmitted at least once + pub is_retransmit: bool, +} + +impl SentFrame { + pub fn new(sequence: u64, data: &[u8]) -> Result<Self, MessageEncodeError> { + let len = MESSAGE_LENGTH_MIN + data.len(); + if len > MESSAGE_LENGTH_MAX { + return Err(MessageEncodeError::MessageTooLong); + } + + let mut payload = Vec::with_capacity(len); + payload.push(len as u8); + payload.push(MESSAGE_DEST | ((sequence as u8) & MESSAGE_SEQ_MASK)); + payload.extend_from_slice(data); + let crc = crc16(&payload[0..len - MESSAGE_TRAILER_SIZE]); + payload.push(((crc >> 8) & 0xFF) as u8); + payload.push((crc & 0xFF) as u8); + payload.push(MESSAGE_VALUE_SYNC); + + Ok(Self { + sent_at: Instant::now(), + payload: Arc::new(payload), + is_retransmit: false, + }) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum MessageEncodeError { + #[error("message would exceed the maximum packet length of {MESSAGE_LENGTH_MAX} bytes")] + MessageTooLong, +} diff --git a/crates/windlass/src/transport/mod.rs b/crates/windlass/src/transport/mod.rs index 818a594..eecb98d 100644 --- a/crates/windlass/src/transport/mod.rs +++ b/crates/windlass/src/transport/mod.rs @@ -1,6 +1,6 @@ -use read::{FrameReader, ReceivedFrame}; +use frame::{FrameReader, MessageEncodeError, ReceivedFrame, SentFrame}; use rtt::RttState; -use std::{collections::VecDeque, sync::Arc}; +use std::collections::VecDeque; use tokio::io::AsyncReadExt; use tokio::{ io::{AsyncBufRead, AsyncWrite, AsyncWriteExt}, @@ -11,9 +11,7 @@ use tokio::{ }; use tokio_util::{bytes::BytesMut, sync::CancellationToken}; -use crate::encoding::crc16; - -mod read; +mod frame; mod rtt; pub(crate) const MESSAGE_HEADER_SIZE: usize = 2; @@ -177,6 +175,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> { .inflight_messages .front() .map(|msg| msg.sent_at + self.rtt_state.rto()); + let retransmit_timeout: futures::future::OptionFuture<_> = retransmit_deadline.map(sleep_until).into(); pin!(retransmit_timeout); @@ -306,15 +305,12 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> { } } - let frame = Arc::new(encode_frame(self.send_sequence, &buf)?); + self.inflight_messages + .push_back(SentFrame::new(self.send_sequence, &buf)?); self.send_sequence += 1; - self.inflight_messages.push_back(SentFrame { - sent_at: Instant::now(), - sequence: self.send_sequence, - payload: frame.clone(), - is_retransmit: false, - }); - self.wr.write_all(&frame).await?; + self.wr + .write_all(&self.inflight_messages.back().unwrap().payload) + .await?; Ok(()) } @@ -361,34 +357,3 @@ pub enum TransmitterError { #[error("connection closed")] ConnectionClosed, } - -#[derive(Debug, Clone)] -pub(crate) struct SentFrame { - pub sent_at: Instant, - #[allow(dead_code)] - pub sequence: u64, - pub payload: Arc<Vec<u8>>, - pub is_retransmit: bool, -} - -#[derive(thiserror::Error, Debug)] -pub enum MessageEncodeError { - #[error("message would exceed the maximum packet length of {MESSAGE_LENGTH_MAX} bytes")] - MessageTooLong, -} - -fn encode_frame(sequence: u64, payload: &[u8]) -> Result<Vec<u8>, MessageEncodeError> { - let len = MESSAGE_LENGTH_MIN + payload.len(); - if len > MESSAGE_LENGTH_MAX { - return Err(MessageEncodeError::MessageTooLong); - } - let mut buf = Vec::with_capacity(len); - buf.push(len as u8); - buf.push(MESSAGE_DEST | ((sequence as u8) & MESSAGE_SEQ_MASK)); - buf.extend_from_slice(payload); - let crc = crc16(&buf[0..len - MESSAGE_TRAILER_SIZE]); - buf.push(((crc >> 8) & 0xFF) as u8); - buf.push((crc & 0xFF) as u8); - buf.push(MESSAGE_VALUE_SYNC); - Ok(buf) -} |