summaryrefslogtreecommitdiff
path: root/crates/windlass/src/transport/mod.rs
diff options
context:
space:
mode:
authortcmal <me@aria.rip>2024-10-01 15:42:50 +0100
committertcmal <me@aria.rip>2024-10-01 16:09:47 +0100
commitd7275a13989b82ea8fc7b2320ef0cf2f427f7d5d (patch)
tree6096c6148879eed35e9ef576e2bd91d582032b57 /crates/windlass/src/transport/mod.rs
parent5861777f2a6e6e1a00670deb4502f7a1587f30c5 (diff)
Refactor out frame encoding stuff
Diffstat (limited to 'crates/windlass/src/transport/mod.rs')
-rw-r--r--crates/windlass/src/transport/mod.rs53
1 files changed, 9 insertions, 44 deletions
diff --git a/crates/windlass/src/transport/mod.rs b/crates/windlass/src/transport/mod.rs
index 818a594..eecb98d 100644
--- a/crates/windlass/src/transport/mod.rs
+++ b/crates/windlass/src/transport/mod.rs
@@ -1,6 +1,6 @@
-use read::{FrameReader, ReceivedFrame};
+use frame::{FrameReader, MessageEncodeError, ReceivedFrame, SentFrame};
use rtt::RttState;
-use std::{collections::VecDeque, sync::Arc};
+use std::collections::VecDeque;
use tokio::io::AsyncReadExt;
use tokio::{
io::{AsyncBufRead, AsyncWrite, AsyncWriteExt},
@@ -11,9 +11,7 @@ use tokio::{
};
use tokio_util::{bytes::BytesMut, sync::CancellationToken};
-use crate::encoding::crc16;
-
-mod read;
+mod frame;
mod rtt;
pub(crate) const MESSAGE_HEADER_SIZE: usize = 2;
@@ -177,6 +175,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
.inflight_messages
.front()
.map(|msg| msg.sent_at + self.rtt_state.rto());
+
let retransmit_timeout: futures::future::OptionFuture<_> =
retransmit_deadline.map(sleep_until).into();
pin!(retransmit_timeout);
@@ -306,15 +305,12 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
}
}
- let frame = Arc::new(encode_frame(self.send_sequence, &buf)?);
+ self.inflight_messages
+ .push_back(SentFrame::new(self.send_sequence, &buf)?);
self.send_sequence += 1;
- self.inflight_messages.push_back(SentFrame {
- sent_at: Instant::now(),
- sequence: self.send_sequence,
- payload: frame.clone(),
- is_retransmit: false,
- });
- self.wr.write_all(&frame).await?;
+ self.wr
+ .write_all(&self.inflight_messages.back().unwrap().payload)
+ .await?;
Ok(())
}
@@ -361,34 +357,3 @@ pub enum TransmitterError {
#[error("connection closed")]
ConnectionClosed,
}
-
-#[derive(Debug, Clone)]
-pub(crate) struct SentFrame {
- pub sent_at: Instant,
- #[allow(dead_code)]
- pub sequence: u64,
- pub payload: Arc<Vec<u8>>,
- pub is_retransmit: bool,
-}
-
-#[derive(thiserror::Error, Debug)]
-pub enum MessageEncodeError {
- #[error("message would exceed the maximum packet length of {MESSAGE_LENGTH_MAX} bytes")]
- MessageTooLong,
-}
-
-fn encode_frame(sequence: u64, payload: &[u8]) -> Result<Vec<u8>, MessageEncodeError> {
- let len = MESSAGE_LENGTH_MIN + payload.len();
- if len > MESSAGE_LENGTH_MAX {
- return Err(MessageEncodeError::MessageTooLong);
- }
- let mut buf = Vec::with_capacity(len);
- buf.push(len as u8);
- buf.push(MESSAGE_DEST | ((sequence as u8) & MESSAGE_SEQ_MASK));
- buf.extend_from_slice(payload);
- let crc = crc16(&buf[0..len - MESSAGE_TRAILER_SIZE]);
- buf.push(((crc >> 8) & 0xFF) as u8);
- buf.push((crc & 0xFF) as u8);
- buf.push(MESSAGE_VALUE_SYNC);
- Ok(buf)
-}