summaryrefslogtreecommitdiff
path: root/crates/windlass/src/transport/mod.rs
diff options
context:
space:
mode:
authortcmal <me@aria.rip>2024-10-01 15:42:50 +0100
committertcmal <me@aria.rip>2024-10-11 23:57:19 +0100
commit34572a8f3839e37063641fa693029ad52265730a (patch)
treedd81dfba798039acb638aa1945a58cff023f1c38 /crates/windlass/src/transport/mod.rs
parentd7275a13989b82ea8fc7b2320ef0cf2f427f7d5d (diff)
Stricter clippy lints
Diffstat (limited to 'crates/windlass/src/transport/mod.rs')
-rw-r--r--crates/windlass/src/transport/mod.rs59
1 files changed, 30 insertions, 29 deletions
diff --git a/crates/windlass/src/transport/mod.rs b/crates/windlass/src/transport/mod.rs
index eecb98d..8a24b8a 100644
--- a/crates/windlass/src/transport/mod.rs
+++ b/crates/windlass/src/transport/mod.rs
@@ -1,4 +1,4 @@
-use frame::{FrameReader, MessageEncodeError, ReceivedFrame, SentFrame};
+use frame::{MessageEncodeError, Reader, ReceivedFrame, SentFrame};
use rtt::RttState;
use std::collections::VecDeque;
use tokio::io::AsyncReadExt;
@@ -14,16 +14,16 @@ use tokio_util::{bytes::BytesMut, sync::CancellationToken};
mod frame;
mod rtt;
-pub(crate) const MESSAGE_HEADER_SIZE: usize = 2;
-pub(crate) const MESSAGE_TRAILER_SIZE: usize = 3;
-pub(crate) const MESSAGE_LENGTH_MIN: usize = MESSAGE_HEADER_SIZE + MESSAGE_TRAILER_SIZE;
-pub(crate) const MESSAGE_LENGTH_MAX: usize = 64;
-pub(crate) const MESSAGE_LENGTH_PAYLOAD_MAX: usize = MESSAGE_LENGTH_MAX - MESSAGE_LENGTH_MIN;
-pub(crate) const MESSAGE_POSITION_SEQ: usize = 1;
-pub(crate) const MESSAGE_TRAILER_CRC: usize = 3;
-pub(crate) const MESSAGE_VALUE_SYNC: u8 = 0x7E;
-pub(crate) const MESSAGE_DEST: u8 = 0x10;
-pub(crate) const MESSAGE_SEQ_MASK: u8 = 0x0F;
+pub const MESSAGE_HEADER_SIZE: usize = 2;
+pub const MESSAGE_TRAILER_SIZE: usize = 3;
+pub const MESSAGE_LENGTH_MIN: usize = MESSAGE_HEADER_SIZE + MESSAGE_TRAILER_SIZE;
+pub const MESSAGE_LENGTH_MAX: usize = 64;
+pub const MESSAGE_LENGTH_PAYLOAD_MAX: usize = MESSAGE_LENGTH_MAX - MESSAGE_LENGTH_MIN;
+pub const MESSAGE_POSITION_SEQ: usize = 1;
+pub const MESSAGE_TRAILER_CRC: usize = 3;
+pub const MESSAGE_VALUE_SYNC: u8 = 0x7E;
+pub const MESSAGE_DEST: u8 = 0x10;
+pub const MESSAGE_SEQ_MASK: u8 = 0x0F;
/// Wrapper around a connection to a klipper firmware MCU, which deals with
/// retransmission, flow control, etc.
@@ -46,13 +46,14 @@ enum TransportCommand {
Exit,
}
-pub(crate) type TransportReceiver = UnboundedReceiver<Result<Vec<u8>, TransportError>>;
+#[allow(clippy::module_name_repetitions)]
+pub type TransportReceiver = UnboundedReceiver<Result<Vec<u8>, Error>>;
impl Transport {
- pub(crate) async fn connect(
+ pub(crate) fn connect(
rdr: impl AsyncBufRead + Unpin + Send + 'static,
wr: impl AsyncWrite + Unpin + Send + 'static,
- ) -> (Transport, TransportReceiver) {
+ ) -> (Self, TransportReceiver) {
let (data_send, data_recv) = unbounded_channel();
let (cmd_send, cmd_recv) = unbounded_channel();
@@ -65,7 +66,7 @@ impl Transport {
});
(
- Transport {
+ Self {
task_inner,
cmd_send,
},
@@ -86,7 +87,7 @@ impl Transport {
}
#[derive(thiserror::Error, Debug)]
-pub enum TransportError {
+pub enum Error {
#[error("message encoding failed: {0}")]
MessageEncode(#[from] MessageEncodeError),
@@ -94,17 +95,17 @@ pub enum TransportError {
Transmitter(#[from] TransmitterError),
#[error("io error: {0}")]
- IOError(#[from] std::io::Error),
+ IO(#[from] std::io::Error),
}
/// State for the task which deals with transport state
#[derive(Debug)]
struct TransportState<R, W> {
- frdr: FrameReader,
+ frdr: Reader,
rdr: R,
wr: W,
- data_send: UnboundedSender<Result<Vec<u8>, TransportError>>,
+ data_send: UnboundedSender<Result<Vec<u8>, Error>>,
cmd_recv: UnboundedReceiver<TransportCommand>,
cancel: CancellationToken,
@@ -128,12 +129,12 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
fn new(
rdr: R,
wr: W,
- data_send: UnboundedSender<Result<Vec<u8>, TransportError>>,
+ data_send: UnboundedSender<Result<Vec<u8>, Error>>,
cmd_recv: UnboundedReceiver<TransportCommand>,
cancel: CancellationToken,
) -> Self {
Self {
- frdr: FrameReader::default(),
+ frdr: Reader::default(),
rdr,
wr,
data_send,
@@ -156,7 +157,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
}
}
- async fn run(&mut self) -> Result<(), TransportError> {
+ async fn run(&mut self) -> Result<(), Error> {
let mut buf = BytesMut::with_capacity(MESSAGE_LENGTH_MAX);
loop {
if self.retransmit_now {
@@ -210,7 +211,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
// Timeout for when we are able to send again
}
- _ = self.cancel.cancelled() => {
+ () = self.cancel.cancelled() => {
break;
},
}
@@ -223,9 +224,9 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
let rseq = self.receive_sequence;
// wrap-around logic(?)
- let mut sequence = (rseq & !(MESSAGE_SEQ_MASK as u64)) | (frame.sequence as u64);
+ let mut sequence = (rseq & !u64::from(MESSAGE_SEQ_MASK)) | u64::from(frame.sequence);
if sequence < rseq {
- sequence += (MESSAGE_SEQ_MASK as u64) + 1;
+ sequence += u64::from(MESSAGE_SEQ_MASK) + 1;
}
// Frame acknowledges some messages
@@ -284,7 +285,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
}
/// Send as many more frames as possible from [`self.pending_messages`]
- async fn send_more_frames(&mut self) -> Result<(), TransportError> {
+ async fn send_more_frames(&mut self) -> Result<(), Error> {
while self.can_send() && !self.pending_messages.is_empty() {
self.send_new_frame().await?;
}
@@ -293,7 +294,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
}
/// Send a single new frame from [`self.pending_messages`]
- async fn send_new_frame(&mut self) -> Result<(), TransportError> {
+ async fn send_new_frame(&mut self) -> Result<(), Error> {
let mut buf = Vec::new();
while let Some(next) = self.pending_messages.front() {
if !buf.is_empty() && buf.len() + next.len() <= MESSAGE_LENGTH_PAYLOAD_MAX {
@@ -316,7 +317,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
}
/// Retransmit all inflight messages
- async fn retransmit_pending(&mut self) -> Result<(), TransportError> {
+ async fn retransmit_pending(&mut self) -> Result<(), Error> {
let len: usize = self
.inflight_messages
.iter()
@@ -325,7 +326,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> {
let mut buf = Vec::with_capacity(1 + len);
buf.push(MESSAGE_VALUE_SYNC);
let now = Instant::now();
- for msg in self.inflight_messages.iter_mut() {
+ for msg in &mut self.inflight_messages {
buf.extend_from_slice(&msg.payload);
msg.is_retransmit = true;
msg.sent_at = now;