diff options
Diffstat (limited to 'crates/windlass/src/transport/mod.rs')
-rw-r--r-- | crates/windlass/src/transport/mod.rs | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/crates/windlass/src/transport/mod.rs b/crates/windlass/src/transport/mod.rs index f5c5fc3..6f2cc7c 100644 --- a/crates/windlass/src/transport/mod.rs +++ b/crates/windlass/src/transport/mod.rs @@ -1,5 +1,6 @@ -use read::{FrameReader, ReceivedFrame, ReceiverError}; +use read::{FrameReader, ReceivedFrame}; use std::{collections::VecDeque, sync::Arc, time::Duration}; +use tokio::io::AsyncReadExt; use tokio::{ io::{AsyncBufRead, AsyncWrite, AsyncWriteExt}, pin, select, spawn, @@ -7,7 +8,7 @@ use tokio::{ task::JoinHandle, time::{sleep_until, Instant}, }; -use tokio_util::sync::CancellationToken; +use tokio_util::{bytes::BytesMut, sync::CancellationToken}; use crate::encoding::crc16; @@ -89,9 +90,6 @@ pub enum TransportError { #[error("message encoding failed: {0}")] MessageEncode(#[from] MessageEncodeError), - #[error("receiver error: {0}")] - Receiver(#[from] ReceiverError), - #[error("transmitter error: {0}")] Transmitter(#[from] TransmitterError), @@ -144,7 +142,8 @@ impl RttState { /// State for the task which deals with transport state #[derive(Debug)] struct TransportState<R, W> { - rdr: FrameReader<R>, + frdr: FrameReader, + rdr: R, wr: W, data_send: UnboundedSender<Result<Vec<u8>, TransportError>>, @@ -176,7 +175,8 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> { cancel: CancellationToken, ) -> Self { Self { - rdr: FrameReader::new(rdr), + frdr: FrameReader::default(), + rdr, wr, data_send, cmd_recv, @@ -199,6 +199,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> { } async fn run(&mut self) -> Result<(), TransportError> { + let mut buf = BytesMut::with_capacity(MESSAGE_LENGTH_MAX); loop { if self.retransmit_now { self.retransmit_pending().await?; @@ -208,6 +209,10 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> { self.send_more_frames().await?; } + while let Some(frame) = self.frdr.read_frame() { + self.handle_frame(frame); + } + let retransmit_deadline = self .inflight_messages .front() @@ -220,15 +225,10 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> TransportState<R, W> { self.corked_until.map(sleep_until).into(); pin!(corked_timeout); - // FIXME: This is not correct because read_frame is not cancellation safe select! { - frame = self.rdr.read_frame() => { - let frame = frame?; - let frame = match frame { - Some(frame) => frame, - None => break, - }; - self.handle_frame(frame); + _ = self.rdr.read_buf(&mut buf) => { + self.frdr.receive_data(&buf); + buf.clear(); }, msg = self.cmd_recv.recv() => { |