summaryrefslogtreecommitdiff
path: root/crates/windlass/src/transport
diff options
context:
space:
mode:
authortcmal <me@aria.rip>2024-10-01 15:42:50 +0100
committertcmal <me@aria.rip>2024-10-01 16:09:47 +0100
commitd7275a13989b82ea8fc7b2320ef0cf2f427f7d5d (patch)
tree6096c6148879eed35e9ef576e2bd91d582032b57 /crates/windlass/src/transport
parent5861777f2a6e6e1a00670deb4502f7a1587f30c5 (diff)
Refactor out frame encoding stuff
Diffstat (limited to 'crates/windlass/src/transport')
-rw-r--r--crates/windlass/src/transport/frame.rs (renamed from crates/windlass/src/transport/read.rs)47
-rw-r--r--crates/windlass/src/transport/mod.rs53
2 files changed, 54 insertions, 46 deletions
diff --git a/crates/windlass/src/transport/read.rs b/crates/windlass/src/transport/frame.rs
index 1aaf20c..d33a2d8 100644
--- a/crates/windlass/src/transport/read.rs
+++ b/crates/windlass/src/transport/frame.rs
@@ -1,4 +1,4 @@
-use std::{collections::VecDeque, mem};
+use std::{collections::VecDeque, mem, sync::Arc};
use tokio::time::Instant;
use tracing::trace;
@@ -117,7 +117,7 @@ impl PartialFrameState {
continue;
}
- let actual_crc = crc16(&so_far);
+ let actual_crc = crc16(so_far);
let frame_crc = (so_far[len - MESSAGE_TRAILER_CRC] as u16) << 8
| (so_far[len - MESSAGE_TRAILER_CRC + 1] as u16);
if frame_crc != actual_crc {
@@ -150,3 +150,46 @@ impl PartialFrameState {
None
}
}
+
+/// A frame to be sent to the MCU
+#[derive(Debug, Clone)]
+pub struct SentFrame {
+ /// When the frame was first sent
+ pub sent_at: Instant,
+
+ /// The payload of the message, including frame header
+ pub payload: Arc<Vec<u8>>,
+
+ /// True iff the frame has been retransmitted at least once
+ pub is_retransmit: bool,
+}
+
+impl SentFrame {
+ pub fn new(sequence: u64, data: &[u8]) -> Result<Self, MessageEncodeError> {
+ let len = MESSAGE_LENGTH_MIN + data.len();
+ if len > MESSAGE_LENGTH_MAX {
+ return Err(MessageEncodeError::MessageTooLong);
+ }
+
+ let mut payload = Vec::with_capacity(len);
+ payload.push(len as u8);
+ payload.push(MESSAGE_DEST | ((sequence as u8) & MESSAGE_SEQ_MASK));
+ payload.extend_from_slice(data);
+ let crc = crc16(&payload[0..len - MESSAGE_TRAILER_SIZE]);
+ payload.push(((crc >> 8) & 0xFF) as u8);
+ payload.push((crc & 0xFF) as u8);
+ payload.push(MESSAGE_VALUE_SYNC);
+
+ Ok(Self {
+ sent_at: Instant::now(),
+ payload: Arc::new(payload),
+ is_retransmit: false,
+ })
+ }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum MessageEncodeError {
+ #[error("message would exceed the maximum packet length of {MESSAGE_LENGTH_MAX} bytes")]
+ MessageTooLong,
+}
diff --git a/crates/windlass/src/transport/mod.rs b/crates/windlass/src/transport/mod.rs
index 818a594..eecb98d 100644
--- a/crates/windlass/src/transport/mod.rs
+++ b/crates/windlass/src/transport/mod.rs
@@ -1,6 +1,6 @@
-use read::{FrameReader, ReceivedFrame};
+use frame::{FrameReader, MessageEncodeError, ReceivedFrame, SentFrame};
use rtt::RttState;
-use std::{collections::VecDeque, sync::Arc};
+use std::collections::VecDeque;
use tokio::io::AsyncReadExt;
use tokio::{
io::{AsyncBufRead, AsyncWrite, AsyncWriteExt},
@@ -11,9 +11,7 @@ use tokio::{
};
use tokio_util::{bytes::BytesMut, sync::CancellationToken};
-use crate::encoding::crc16;
-
-mod read;
+mod frame;
mod rtt;
pub(crate) const MESSAGE_HEADER_SIZE: usize = 2;
@@ -177,6 +175,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
.inflight_messages
.front()
.map(|msg| msg.sent_at + self.rtt_state.rto());
+
let retransmit_timeout: futures::future::OptionFuture<_> =
retransmit_deadline.map(sleep_until).into();
pin!(retransmit_timeout);
@@ -306,15 +305,12 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
}
}
- let frame = Arc::new(encode_frame(self.send_sequence, &buf)?);
+ self.inflight_messages
+ .push_back(SentFrame::new(self.send_sequence, &buf)?);
self.send_sequence += 1;
- self.inflight_messages.push_back(SentFrame {
- sent_at: Instant::now(),
- sequence: self.send_sequence,
- payload: frame.clone(),
- is_retransmit: false,
- });
- self.wr.write_all(&frame).await?;
+ self.wr
+ .write_all(&self.inflight_messages.back().unwrap().payload)
+ .await?;
Ok(())
}
@@ -361,34 +357,3 @@ pub enum TransmitterError {
#[error("connection closed")]
ConnectionClosed,
}
-
-#[derive(Debug, Clone)]
-pub(crate) struct SentFrame {
- pub sent_at: Instant,
- #[allow(dead_code)]
- pub sequence: u64,
- pub payload: Arc<Vec<u8>>,
- pub is_retransmit: bool,
-}
-
-#[derive(thiserror::Error, Debug)]
-pub enum MessageEncodeError {
- #[error("message would exceed the maximum packet length of {MESSAGE_LENGTH_MAX} bytes")]
- MessageTooLong,
-}
-
-fn encode_frame(sequence: u64, payload: &[u8]) -> Result<Vec<u8>, MessageEncodeError> {
- let len = MESSAGE_LENGTH_MIN + payload.len();
- if len > MESSAGE_LENGTH_MAX {
- return Err(MessageEncodeError::MessageTooLong);
- }
- let mut buf = Vec::with_capacity(len);
- buf.push(len as u8);
- buf.push(MESSAGE_DEST | ((sequence as u8) & MESSAGE_SEQ_MASK));
- buf.extend_from_slice(payload);
- let crc = crc16(&buf[0..len - MESSAGE_TRAILER_SIZE]);
- buf.push(((crc >> 8) & 0xFF) as u8);
- buf.push((crc & 0xFF) as u8);
- buf.push(MESSAGE_VALUE_SYNC);
- Ok(buf)
-}